首页 理论教育 使用SparkSQL操作MongoDB的步骤与技巧

使用SparkSQL操作MongoDB的步骤与技巧

时间:2023-07-02 理论教育 版权反馈
【摘要】:本节Spark SQL操作MongoDB案例将对学生信息表的相关记录进行查询统计操作。2)Spark中引入MongoDB的相关JAR包。3)设置MongoDB数据库连接的URI信息:地址、端口、数据库及文档信息。4)设置MongoDB的查询条件。在用Spark操作MongoDB的过程中,首先使用命令./mongod--dbpath../data/--log-path../log---logappend启动MongoDB的服务。3)对newAPIHadoopRDD查询MongoDB的结果遍历,打印输出。操作结果如下图:示例四:删除历史分数小于90分的键1)创建Hadoop的Configuration配置类,设置MongoDB的输入i

使用SparkSQL操作MongoDB的步骤与技巧

本节Spark SQL操作MongoDB案例将对学生信息表的相关记录进行查询统计操作。具体步骤如下:

1)MongoDB服务器启动MongoDB服务。

2)Spark中引入MongoDB的相关JAR包。

3)设置MongoDB数据库连接的URI信息:地址、端口、数据库及文档信息。

4)设置MongoDB的查询条件。

5)Spark代码打成JAR包,提交集群运行。Spark从MongoDB中查询统计学生信息的数据。

●查询性别为男的所有学生。

●查询性别为男、数学成绩高于80分的文档。

●数学成绩低于90的分数加上5分成绩。

●删除历史History分数小于90的键。

●查询结果保存到MongoDB数据库中。

Spark中读取MongoDB数据库,需要依赖的JAR包包括:MongoDB-driver-3.0.2.jar,MongoDB-driver-core-3.0.2.jar,bson-3.0.2.jar,mongo-hadoop-core-1.4.0.jar,mon-go-java-driver-3.0.2.jar,spark-MongoDB_2.10-0.10.1.jar,可以在Maven的配置文件pom.xml中增加以下依赖关系:

其中mongo-hadoop-core-1.4.0.jar是连接器,可以用它来实现从MongoDB上读写数据,其配置参数使用配置对象传递,其中最重要的两个参数是mongo.input.uri和mongo.output.uri,这两个参数提供了MongoDB主机、端口、权限、数据库和数据集合名字。

在用Spark操作MongoDB的过程中,首先使用命令./mongod--dbpath../data/--log-path../log---logappend启动MongoDB的服务。

然后用Spark操作MngoDB需要引入如下的类(org.bson.BasicBSONObject、org.bson.BSONObject、com.mongodb.hadoop.MongoInputFormat、com.mongodb.hadoop.Mongo Output-Format、org.apache.hadoop.conf.Configuration)

引入MongoDB的JAR包以后,在main主程序中封装各方法操作MongoDB,步骤如下:

初始化Spark Context。

●在主程序业务代码中分别调用各方法:queryDocuments(sc)、querySubcollection(sc)、updateMath(sc)、removeHistory(sc)、saveToMongo(sc)

●关闭Spark Context。

main主函数代码如下:

(1)示例一:查询性别为男的所有学生

1)创建Hadoop的Configuration配置类,设置MongoDB的输入input URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;设置MongoDB的input输入类型为Mon-goInputFormat;设置MongoDB的查询条件mongo.input.query:性别是男。

2)调用SparkContext的newAPIHadoopRDD方法,newAPIHadoopRDD的第一个参数Con-figuration用于设置数据集的配置,Configuration将被放进Spark广播中。这里传入MongoDB的配置类;第二个参数InputFormat为输入类型是MongoInputFormat格式;第三个参数是返回结果的Key值,类型为Object;第四个参数是返回结果的Value值,类型为BSONObject;(www.xing528.com)

3)对newAPIHadoopRDD查询MongoDB的结果遍历,打印输出。第一个元素是Object ID,第二个元素是MongoDB中MyDB数据库MyCollection集合中性别为男的文档document记录。

查询结果如下:

(2)示例二:查询性别是男并且数学分数大于80分的文档

1)创建Hadoop的Configuration配置类,设置MongoDB的输入input URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;设置MongoDB的input输入类型为Mon-goInputFormat;设置MongoDB的查询条件mongo.input.query:信息Info性别为男,分数Score为数学成绩大于80分。

2)调用SparkContext的newAPIHadoopRDD方法。

3)对newAPIHadoopRDD查询MongoDB的结果遍历,打印输出。第一个元素是Object ID,第二个元素是MongoDB中MyDB数据库MyCollection集合中性别是男并且数学大于80分的记录。

(3)示例三:数学小于90分的分数加5分

1)创建Hadoop的Configuration配置类,设置MongoDB的输入input URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;设置MongoDB的输出output URI连接属性:地址、端口、test数据库及foo集合信息;设置MongoDB的input输入类型为MongoIn-putFormat;设置MongoDB的更新操作mongo.input.update:数学小于90分就加5分。

2)调用SparkContext的newAPIHadoopRDD方法。将数学成绩低于90分的加5分以后的输出结果保存到MongoDB test数据库的foo集合。

3)对newAPIHadoopRDD查询MongoDB的结果遍历,打印输出。第一个元素是Object ID,第二个元素是MongoDB中MyDB数据库MyCollection集合中数学成绩低于90分就加5分的记录。

操作结果如下图:

(4)示例四:删除历史分数小于90分的键

1)创建Hadoop的Configuration配置类,设置MongoDB的输入input URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;设置MongoDB的input输入类型为Mon-goInputFormat;设置MongoDB的更新操作mongo.input.update:删除历史分数小于90分的键值

2)调用SparkContext的newAPIHadoopRDD方法。

3)对newAPIHadoopRDD查询MongoDB的结果遍历,打印输出。第一个元素是Object ID,第二个元素是删除历史分数小于90分的集合记录,即X同学的历史分数是87分小于90分,因此X同学记录中的历史分数键值对被删除。

(5)示例五:保存数据到mongodbMongoDB中

1)创建Hadoop的Configuration配置类,设置MongoDB的输入input URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;设置MongoDB的输出output URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;

2)调用Spark的parallelize方法生成data RDD。

3)遍历data,将data的数据写入到BasicBSONObject,BasicBSONObject对象obj放入name、age键值对,map方法遍历以后返回元组Key-Value(null,obj)键值对,元组的第一个元素为null,因为保存至Hadoop时第一个元素是NullWritable;第二个元素为BasicB-SONObject对象obj。

4)调用SparkContext的saveAsNewAPIHadoopFile方法保存记录,saveAsNewAPIHa-doopFile方法的第一个参数是MongoDB结果保存的路径,即保存在MyDB数据库的MyCol-lection集合中,第二个参数是输入Key的类型Any,第三个参数是输入Value的类型Any,第四个参数是输出的类型MongoOutputFormat[Any,Any],第五个参数是Hadoop的配置类mongoConfig。

保存的结果如下所示:

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈