首页 理论教育 Spark RDD操作:行动操作与存储实现

Spark RDD操作:行动操作与存储实现

时间:2023-06-20 理论教育 版权反馈
【摘要】:Action操作是和transformation操作相对应的另外一种RDD的核心操作,在Spark的程序运行中,每调用一次Action操作,都会触发一次Spark的作业提交并返回相应的结果。Action操作将RDD直接保存到外部文件系统或者数据库中,比如将RDD保存到HDFS文件系统中。reduce方法相当于对RDD进行reduceleft方法的操作。reduceLeft先对两个元素Key-Value进行reduce操作,然后将结果和迭代器取出的下一个元素进行reduce操作,直到迭代器遍历完所有元素,得到最后结果。

Spark RDD操作:行动操作与存储实现

Action操作是和transformation操作相对应的另外一种RDD的核心操作,在Spark的程序运行中,每调用一次Action操作,都会触发一次Spark的作业提交并返回相应的结果。从目前Spark提供的API来看,Action操作可以分为以下两种类型:

(1)Action操作将标量或者集合返回给Spark的客户端程序,比如返回RDD中数据集的数量或者是返回RDD中的一部分符合条件的数据。

(2)Action操作将RDD直接保存到外部文件系统或者数据库中,比如将RDD保存到HDFS文件系统中。

1.Scala集合标量Action操作

(1)count方法返回RDD中元素的个数。

(2)collect方法相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数组

(3)reduce方法相当于对RDD进行reduceleft方法的操作。reduceLeft先对两个元素Key-Value进行reduce操作,然后将结果和迭代器取出的下一个元素进行reduce操作,直到迭代器遍历完所有元素,得到最后结果。

(4)take方法将RDD作为集合,返回结合中[0,num-1]下标的元素。

(5)first方法返回RDD的第一个元素。

(6)fold方法是aggregate的便利接口,其中,op操作既是seqOp操作也是combOp操作,且最终的返回类型也是T,即与RDD中的每一个元素的类型是一样的。

(7)foreach方法在数据集的每一个元素上,运行函数f。这通常用于更新一个累加器变量,或者和外部存储系统做交互。

(8)lookup方法是针对Key-Value类型的操作,对于给定的值,返回与此键值对应的所有值。

(9)takeOrdered方法返回最小的num个元素,并且在返回的数组中保持元素的顺序。

(10)top方法返回最大的num个元素。(www.xing528.com)

(11)aggregate操作主要需要提供两个函数,一个是seqOp函数,其将RDD(RDD中的每个元素的类型是T)中的每一个分区的数据聚合成类型为U的值。另一个是combOp函数将各个分区聚合起来的值合并在一起得到最终类型为U的返回值。这里的RDD的元素类型和返回值的类型U可以为同一个类型。

2.输出数据到外部文件存储系统的Action操作

RDD最后的归宿除了可以返回为集合和标量,也可以存储到外部文件系统或者数据库中,Spark系统与Hadoop系统是完全兼容的,所以对于MapReduce所支持的读写文件或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所有Spark为了能够兼容Hadoop所有的版本,也提供了两套API。

(1)saveAsObjectFile方法生成包含序列化对象的SequenceFile写到本地或者hadoop文件系统。

(2)saveAsTextFile方法将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其他hadoop支持的文件系统。具体来说,在SaveAsTextFile方法内部会先通过调用RDD的map(x=>(NullWritable.get(),new Text(x.toString)))方法将RDD中的每个元素转换为文件中的一行文本,然后在SaveAsTextFile方法内部会继续调用saveAsHadoopFile方法将数据保存到本地文件系统或者Hadoop支持的文件系统。

(3)saveAsHadoopDataset方法的参数类型JobConf,JobConf是Hadoop的配置对象,Job-Conf既可以通过它的setInputFormat方法来指定输入路径集合,也可以通过setOutputFormat方法设置任务结果输出路径,所以在这里saveAsHadoopDataset方法不仅能将RDD存储到HDFS文件系统中,也可以将RDD存储到其他数据库中,如Hbase、MangoDB、Cassandra等。

(4)saveAsHadoopFile方法支持RDD存储到Hadoop支持的文件系统(比如HDFS)中。将RDD保存到Hadoop支持的文件系统中通常情况下考虑五个参数,包括文件保存的路径、RDD中key的类型,RDD中value的类型、RDD的输出格式(outputFormat,如TextOutput-Format、SequenceFileOutputFormat),以及参数codec是否需要进行压缩。

1)第一个saveAsHadoopFile方法中的参数列表中需要传入path(文件保存的路径)、keyClass(RDD中key值的类型)、valueClass(RDD中value值的类型)、outputFormatClass(RDD的输入格式)以及参数codec的默认值None。

2)第二个saveAsHadoopFile方法中的参数列表中需要传入path(文件保存的路径)、keyClass(RDD中key值的类型)、valueClass(RDD中value值的类型)、outputFormatClass(RDD的输入格式)以及参数codec的值。

3)第三个saveAsHadoopFile方法中的参数列表中需要传入path(文件保存的路径)和参数codec的值。

4)第四个saveAsHadoopFile方法中的参数列表中只需要传入path(文件保存的路径)。

(5)针对新版本Hadoop API提供了三个action操作,与旧版本的Hadoop的函数使用方法类似,后两个API支持将RDD保存到HDFS中,而saveAsNewAPIHadoopDataser则支持所有MapReduce兼容的输入输出类型。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈