首页 理论教育 RDD转换:其他常用操作

RDD转换:其他常用操作

时间:2023-06-29 理论教育 版权反馈
【摘要】:从父RDD转换可得到新的RDD。通过Spark内核给用户提供的Transformation来对RDD进行各种算子的转化,形成新的RDD实现算法。接下来分析HadoopRDD的compute方法,源代码如下。接下来分析getPreferredLocations方法,该方法通过调用InputSplit接口的getLocations方法获得RDD所在的位置。由此可以得出两个结论:①getPartitions直接沿用了父RDD的分片信息。②compute函数是通过RDD遍历每一个数时接受一个匿名函数f进行处理的。

RDD转换:其他常用操作

从父RDD转换可得到新的RDD。通过Spark内核给用户提供的Transformation来对RDD进行各种算子的转化,形成新的RDD实现算法。下面通过一段常用代码来讲解一下,代码如下。

这里涉及两个RDD,第一个RDD是textFile,textFile是一个HadoopRDD经过map后的MapPartitionsRDD,第二个RDD是经过filter方法之后生成的一个FilteredRDD,即返回一个新的RDD(原RDD的子集)。

接下来将要讲解上述代码片段在Spark内部的处理流程。首先来看textFile方法,进入SparkContext这个类找到它,源代码如下。

由SparkContext类中textFile方法的源代码可知,hadoopFile后面加了一个map方法,取pair的第二个参数,最后在shell里面可看到它是一个MappredRDD。默认的defaultMinParti-tions的大小为2。源代码语句如下。

查看hadoopFile方法的源代码可知,hadoopFile做了3个操作,把Hadoop的配置文件保存到广播变量里,设置了路径,new了一个HadoopRDD返回。HadoopFile方法的源代码如下。

接下来分析HadoopRDD,通过以下源代码可知,HadoopRDD是一个对象。

HadoopRDD也是RDD,由于RDD具有五大特性,所以重点关注它的getPartitions方法、compute方法和getPreferredLocations方法。先看getPartitions方法,源代码如下。

它调用的是inputFormat自带的getSplits方法来计算分片,然后把分片HadoopPartition包装到Array数组里面返回。

接下来分析HadoopRDD的compute方法,源代码如下。(www.xing528.com)

HadoopRDD的输入值是一个Partition,返回值是一个Iterator[(K,V)]类型的数据,这里只关注两点即可:把Partition转换成HadoopPartition,然后通过InputSplit创建一个Recor-dReader;重写Iterator的getNext方法,通过创建的reader调用next方法读取下一个值。从这里可以看出,compute方法是通过分片来获得Iterator接口的,遍历分片的数据。

接下来分析getPreferredLocations方法,该方法通过调用InputSplit接口的getLocations方法获得RDD所在的位置。源代码如下。

最后分析MapPartitionsRDD,首先了解一下RDD类里面的map方法,map方法的源代码如下。

从map方法的源代码可知,map方法内部直接new了一个MapPartitionsRDD,还把匿名函数f处理来再传进去,接下来展开MapPartitionsRDD,MapPartitionsRDD的源代码如下:

由MapPartitionsRDD的源代码可知,MapPartitionsRDD把父类RDD的partitioner、getPar-titions和compute给重写了,而且每个分区的compute中都用到了firstParent[T]方法,first-Parent[T]方法的源代码如下。

继续追踪方法dependencies,dependencies的源代码如下。

由此可以得出两个结论:①getPartitions直接沿用了父RDD的分片信息。②compute函数是通过RDD遍历每一个数时接受一个匿名函数f进行处理的。现在应该可以看出compute函数有两个显著的作用,一个是在没有依赖的条件下,根据分片的信息生成遍历数据的Iter-able接口;另一个是在有前置依赖的条件下,在父RDD的Iterable接口上遍历每个元素时再接受一个方法处理。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈