首页 理论教育 RDD Lineage关系案例与源码解析

RDD Lineage关系案例与源码解析

时间:2023-06-25 理论教育 版权反馈
【摘要】:理解RDD的Lineage关系的案例与源码解析的内容:解析RDD中与Lineage关系相关的三个组成部分。RDD抽象概念对Lineage关系的天然支持,是构建Spark DAG这一高层调度机制的基石。参考章节1.3RDD的编程模型,从分区列表、计算每个分片的函数以及对父RDD的依赖列表这三个RDD的组成部分对Lineage关系进行解析:1)compute:计算每个分片的函数。SparkContext是Spark功能的主入口点,命名为sc,在接下来的案例解析中直接使用sc来表示。调用toDebugString查看其Lineage的关系:从HadoopRDD转换为MapPartitionsRDD。

RDD Lineage关系案例与源码解析

RDD作为Spark的基本计算单元,是Spark的一个最核心的抽象概念,可以通过一系列算子进行操作,包括Transformation和Action两种算子操作。本节针对前面的案例进行详细解析,主要从源码、界面监控信息等方面进行详细解析。

接下来采用Spark Shell交互式工具运行Spark应用程序,应用程序的运行方式参见章节2.1Spark应用程序部署。

在此先介绍一些代码阅读技巧,常用的一些代码操作技巧包括:

1)在Shell交互界面上,可以通过按【TAB】键自动补全代码。

2)IntellijIDEA中:按住【CTRL】键,同时单击某个符号(如类、方法等),可以跳到该符号的定义;或鼠标定位于该符号,然后按【F4】键或【Ctrl+B】组合键,跳转到该符号的定义;或在符号处单击右键,在弹出上下文菜单中,选择Go To→Declaration,如图2.16所示。

978-7-111-51909-6-Chapter02-74.jpg

图2.16 IDEA中符号声明的调整

通常在IDE中阅读源码时,可以通过查看菜单、右键的上下文菜单等信息,来获取比较常用功能的快捷键。

理解RDD的Lineage关系的案例与源码解析的内容:解析RDD中与Lineage关系相关的三个组成部分。RDD抽象概念对Lineage关系的天然支持,是构建Spark DAG这一高层调度机制的基石。参考章节1.3RDD的编程模型,从分区列表、计算每个分片的函数以及对父RDD的依赖列表这三个RDD的组成部分对Lineage关系进行解析:

1)compute:计算每个分片的函数。

2)getPartitions:分区列表。

3)getDependencies:对父RDD的依赖列表。

Lineage关系,即血统关系,可以从两个角度进行解析,一个是构建一个RDD的数据流的血统关系,一个是RDD构建的算子流的血统关系。

SparkContext是Spark功能的主入口点,命名为sc,在接下来的案例解析中直接使用sc来表示。

一、TextFile案例与解析

案例代码:

978-7-111-51909-6-Chapter02-75.jpg

978-7-111-51909-6-Chapter02-76.jpg

代码解析如下:

通过SparkContext的textFile接口,从外部存储系统加载数据,构建RDD,即textFile。调用toDebugString查看其Lineage的关系:从HadoopRDD转换为MapPartitionsRDD。

获取RDD的分区数的方法有两种,获取Spark的所有RDD的默认分区数的方法sc.defaultParallelism和获取Spark的具体RDD实例的分区数的方法rdd.partitions.size,如textFile.partitions.size,这里的textFile为加载文件数据后构建的RDD实例。

这里使用sc.getConf.getInt("spark.default.parallism",0)来获取当前设置的默认分区数对应的配置属性值。

二、HadoopRDD、MapPartitionsRDD源码解析

根据Lineage的关系从HadoopRDD转换为MapPartitionsRDD,解析源码:

从主入口点SparkContext的源码开始解析,打开IDEA,使用【Ctrl+N】组合键,在弹出窗口中输入SparkContext,按【Enter】键进入SparkContext源码后,查看textFile源码,如图2.17所示。

图2.17 IDEA中查找、打开类文件

使用【Ctrl+F】组合键,在SparkContext源码中查找textFile方法,找到的textFile方法的源码如下所示:

978-7-111-51909-6-Chapter02-78.jpg

根据Lineage的关系:从HadoopRDD转换为MapPartitionsRDD,首先解析HadoopRDD,按住【Ctrl】键,单击HadoopRDD类名,进入HadoopRDD的源码,进入的源码如下所示:

978-7-111-51909-6-Chapter02-79.jpg

继续查看HadoopRDD类,按住【Ctrl】键,单击HadoopRDD类名,进入HadoopRDD的源码,如下所示,在HadoopRDD的源码中可以看到,对应HadoopRDD的分区类为Hadoop-Partition(根据类的名字,以及getPartitions方法中构建的分区的类型),这是getPartitions方法中需要构建的分区类型:

978-7-111-51909-6-Chapter02-80.jpg

通过deps成员查看HadoopRDD的父依赖:在HadoopRDD的主构造函数中,我们可以看到其依赖的父RDD(即类的deps成员)为Nil,即HadoopRDD的父依赖RDD为空,这是因为它是从外部数据加载的,而不是从其他RDD转换得到的。

继续查看重载的getPartitions方法,解析HadoopRDD是如何记录各个分区的数据来源信息:

978-7-111-51909-6-Chapter02-81.jpg

其中,构建HadoopRDD分区时传入的minPartitions参数(参数表示最小分区个数)会作为输入文件split的个数的参考值。可以看到在代码中使用inputSplits构建出(即new出HadoopRDD的实例)HadoopRDD的全部分区。

到这一步,可以得出HadoopRDD的分区HadoopParition与Hadoop的InputFormat的Split一一对应,即HadoopRDD各个分区的数据来源对应于InputFormat指定的Split,通常对应于Hadoop文件的分块。

继续解析Hadoop的compute方法,该方法指定对分区的数据源的读取方式,并没有真正开始读取,在Action触发之后,调用该compute方法获取Iterator时才会开始读取数据。

978-7-111-51909-6-Chapter02-82.jpg

978-7-111-51909-6-Chapter02-83.jpg

978-7-111-51909-6-Chapter02-84.jpg(www.xing528.com)

在代码中,构建分区数据的Iterator时,使用RecordReader来读取当前Split的数据。至此,HadoopRDD的分区构建和分区计算都已经解析完成。继续MapPartitionsRDD源码的解析,对应MapPartitionsRDD源码的入口点如下:

978-7-111-51909-6-Chapter02-85.jpg

在textFile方法中通过map方法将HadoopRDD再次转换为MapPartitionsRDD。查看map方法的源码,可以看到HadoopRDD被转换为MapPartitionsRDD:

978-7-111-51909-6-Chapter02-86.jpg

其中,传入的参数f:T=>U被作用到iter上。

按【Ctrl】键,单击MapPartitionsRDD,进入MapPartitionsRDD源码:

978-7-111-51909-6-Chapter02-87.jpg

978-7-111-51909-6-Chapter02-88.jpg

同样,通过MapPartitionsRDD主构造函数可以得到依赖的父RDD,当前传入的RDD为HadoopRDD。

查看分区方法getPartitions源码,解析各个分区的数据来源:firstParent为传入的父RDD,getPartitions记录了MapPartitionsRDD分区的数据从依赖的父RDD分区中获取的关系。

查看重载的compute方法:记录对分区数据来源的操作,MapPartitionsRDD对父RDD分区的执行传入的f操作。

通过以上的解析,可以得到从外部存储系统的数据集到MapPartitionsRDD的整个Line-age的关系如图2.18所示。

978-7-111-51909-6-Chapter02-89.jpg

图2.18 Lineage数据流与操作算子流图

上图对应了前面的构建出MapPartitionsRDD的整个Lineage关系的过程,从该图中可以解析出通用的Lineage关系,具体从Lineage的数据流和处理流两个角度进行解析,即RDD的血统关系,记录了构建RDD时,各个点上的数据的流动关系,以及数据从上一个点到下一个点流动时,经过了什么样的处理,对应的两个关系图如下:

1)数据流关系图:每一个RDD记录了内部各个分区的数据来源,数据来源可以是外部存储系统,也可以是其他依赖的父RDDs,通过上图中的分区对应的数据来源的虚线箭头,可以看到Block数据和Partition数据之间的数据流关系。

2)处理流关系图:当该RDD作为其他RDD的数据来源,或作为外部存储系统的数据来源时,该RDD对各个分区数据的处理,通过上图中的分区数据上的算子的虚线箭头,可以看到Block数据和Partition数据之间在数据流动时,在流动数据上进行的算子操作,即对这些流动数据的处理。

整个Lineage记录了数据流关系和处理流关系,仅仅在Action操作时,才会将记录的关系提交到Job上,真正执行起来。

三、ReduceByKey的案例与源码解析

MapPartitionsRDD是一个窄依赖,为了进一步解析Lineage机制中的宽依赖,基于redu-ceByKey转换操作来解析如何形成ShuffledRDD。

查看reduceByKey的lineage关系图:

978-7-111-51909-6-Chapter02-90.jpg

其中,[Memory Deserialized 1x Replicated]是该RDD的缓存级别信息,在调用了cache等持久化操作后就会出现该信息。

MapPartitionsRDD已经解析过,下面解析ShuffledRDD。ShuffledRDD源码如下:

978-7-111-51909-6-Chapter02-91.jpg

从源码可知,ShuffledRDD的分区类型为ShuffledRDDPartition。从ShuffledRDD的主构造函数可知,其父依赖为传入的MapPartitionsRDD。

对应RDD的三个主要方法,源码如下:

978-7-111-51909-6-Chapter02-92.jpg

在getDependencies方法中,将父依赖RDD封装到了ShuffleDependency中,在DAG调度时,DAG是根据Shuffle来划分Stage的,而是否为Shuffle,则是通过判断RDD对父依赖的依赖类型是否为ShuffleDependency。

查看ShuffleDependency的源码:

978-7-111-51909-6-Chapter02-93.jpg

可以看到,每个ShuffleDependency分配了SparkContext中全局唯一的ID值shuffledId,可以通过这唯一的标识来找到Shuffle数据;同时,在构造ShuffleDependency时,向Shuffle-Manager注册了一个ShuffleHandle,它负责计算过程中,Shuffle过程的读写操作。

查看gePartitions方法的源码,可以看到该方法只是根据分区器part,构建了Shuffle-dRDD的各个分区,记录了数据来源;查看compute方法,该方法中使用了之前在ShuffleMa-nager注册过的ShuffleHandle,用ShuffleHandle的Reader来实际获取数据。

这部分可以参考下之前对HadoopRDD的compute(方法名),都是使用某种Reader(读取数据的类名)来获取数据,并没有对数据做处理。

下面通过DAG、ShuffledRDD的简单执行图,来加深解析。执行流程如图2.19所示。

978-7-111-51909-6-Chapter02-94.jpg

图2.19 Stage划分以及父子RDD数据流图

由上图可以看到,当DAG碰到Shuffle依赖时,会切分Stage,Stage前的RDD分区数据会通过ShuffledDependency的shuffleId作为Shuffle的唯一标识,利用ShuffleHander的Writer(写数据的类名)写数据。Stage的RDD在读取各个分区数据时,compute方法使用Shuffle-Hander的Reader,根据shuffleId去读取数据。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈