我们使用Spark Submit方式提交作业为例说明Standalone模式结合的运行原理,这里需要注意的是,一个Spark应用可以包含多个作业,在这里为了简单明了地讲清楚原理,我们一般把一个Spark应用看成只有一个作业,而对于Spark应用程序的提交运行最重要的就是作业的调度了,所以在下面的内容我们常会用作业来代指一个Spark应用程序。由于根据Driver(驱动程序)在集群中所处结点位置Standalone模式也可以分为两种情况:一种是Driver在Worker结点上运行的Cluster模式,一种是Driver在Client(客户端)结点运行的Client模式。在这里,我们选择Standalone的Cluster模式来讲解,而对Standalone的Client 模式的分析我们会在第5章讲解Spark的运行机制时再分析。
这里涉及的结点主要有Master结点、Worker结点、Client结点。涉及的进程主要有客户端提交任务进程Client、Master、Worker、CoarseGrainedExecutorBackend。下面我们会以图4-7所示的通过Spark Submit的方式向Spark集群提交作业的流程图,以及通过Spark 1.2的源代码来分析以Standalone模式运行的Spark内部实现原理。
图4-7 Spark Submit方式提交作业流程图
Spark作业(应用)运行的主要流程如下(以Spark Submit模式提交):
(1)我们已经知道启动Spark集群是通过Spark的sbin目录下的start-all.sh命令来进行的,那么现在我们用vim文本编辑器打开start-all.sh文件来看一下它的具体实现。
1)在start-all.sh这个shell脚本文件中可以看到,最终会调用两个可执行文件start-master.sh和start-slaves.sh来分别启动Spark集群的Master结点和Worker结点。
2)继续跟踪start-master.sh文件,在用vim打开的start-master.sh文件里,最终会通过org.apache.spark.deploy.master.Master类的伴生对象启动Master。
3)在start-slaves文件里,通过执行slaves.sh命令,会在$SPARK_HOME/conf目录下的slaves文件里找到集群搭建时设置的Worker结点的主机名。最终会通过调用org.apache.spark.deploy.worker.Worker类的伴生对象来启动所有Worker结点。
(2)Master类和Worker类都实现了基于Akka通信的Actor类,所以他们之间的消息通信都是借助Akka通信框架来进行的。在Master类和Worker类的初始化过程中,除了要初始化内部的成员变量,还会自动调用各自的preStart方法来完成Master结点和Worker结点的启动过程中的一些配置操作。当然对于Worker结点来说,它在启动的过程中,还需要向Mas-ter发送消息,申请注册。
1)在Worker类中,会在preStart()方法内部调用Worker类的registerWithMaster()来向已经启动的Master结点进行注册。
2)我们继续跟踪registerWithMaster()方法,看它的内部实现,这时它会继续调用Worker类的tryRegisterAllMasters()方法来完成注册。
3)在tryRegisterAllMasters()方法内部,会首先通过Worker初始化时绑定的Master结点的masterUrl拿到当前启动的Master结点的引用,然后通过Master结点的引用向Master发送RegisterWorker消息。
(4)Master类和Worker类都是通过各自的receiveWithLogging方法进行消息的接受和处理,当Master类的receiveWithLogging方法收到RegisterWorker消息后会根据RegisterWorker 所携带的要注册的Worker结点的一系列信息(比如workerHost、workerPort、cores等)对其进行注册,当注册成功后会发送一个RegisteredWorker消息给要注册的Worker结点,否则会发送一个RegisterWorkerFailed消息来告诉Worker结点注册失败。
(5)当Worker类(Worker结点)的receiveWithLogging方法接受到Master发送过来的消息,结合Scala的模式匹配进行消息的判断,如果是RegisteredWorker(注册成功)的消息,就会调用changeMaster()方法重新绑定当前启动的Master结点的masterUrl,并同时设置心跳(HEARTBEAT)的配置,定时向Master发送心跳。如果收到的消息是RegisterWorker-Failed(注册失败),很简单,它会打印当前Work结点注册失败的日志消息并调用Sys-tem.exit(1)结束当前运行的进程(每个开启的Work结点是在一个JVM中运行的进程)。
到这里,我们的Spark集群已经完全开启,下面就是介绍如何使用Spark Sumit工具来向集群提交我们自己编写的Spark应用程序了(在提交应用的过程我们重点讲述的是Spark应用程序中的作业调度)。
(3)在Spark中我们向集群中提交作业用的Spark Sumit工具,它的具体实现在Spark的bin目录下的spark-submit shell脚本文件中。
1)首先我们进入Spark的bin目录,然后用ls命令列出当前目录下的所有可执行文件。
2)我们用vim命令打开spark-submit脚本文件,里面的内容如下:
在上面展示的spark-submit脚本文件的最后一行我们可以看到,在提交任务时实际执行的是org.apache.spark.deploy.SparkSubmit类(当然这里需要注意的是SparkSubmit其实是一个对象,因为在Scala语言中,main方法只能放在对象里)。既然这样,我们就需要看一下SparkSubmit对象的具体实现。
3)在SparkSubmit对象中会先初始化相关成员变量,比如Cluster managers(集群资源管理器)和Deploy modes(部署模式)。当然在SparkSubmit对象中最重要的就是main方法里对SparkSubmitArguments类初始化,以及createLaunchEnv方法和launch方法的调用。SparkSubmitArguments类的初始化是对SparkSubmit脚本提交任务时带的附加参数进行解析并把解析后的结果赋值给SparkSubmitArguments的成员变量。createLaunchEnv方法通过传给它的SparkSubmitArguments对象实例,创建Launch环境(提交作业的Spark环境)。在这里我们使用的Standalone运行模式下的Cluster部署模式,所以launch方法用来启动Client伴生对象的main方法。
4)这里我们结合SparkSubmit内部的launch方法来分析一下它是如何启动Client类的main方法。在此强调一下:因为我们选用的Standalone运行模式下的Cluster部署模式,在SparkSubmit的launch方法里会通过反射调用org.apace.spark.deploy.yarn.Client伴生对象。具体实现如下:
(4)通过以上的launch方法,最后直接执行了Client伴生对象的main()方法,在main()方法主要做一件事,就是启动Client类(也就是客户端)的Actor——ClientActor,ClientActor类主要负责Client结点和Master结点的通信,比如ClientActor可以向Master发送注册Driver的消息。
1)在Client的main创建ClientActor对象。在Spark中各个模块(比如Client、Master和Worker结点)之间的通信采用的是Akka通信框架,而在Spark中使用AkkaUtils工具类对Akka进行了封装,这样做的好处在创建负责通信的模块(比如ClientActor类)的时候不需要关注Akka本身的很多实现细节。对于Akka的介绍,我们会在第5章讲Spark的运行机制时再分析。下面的Spark源代码中就是创建负责消息通信的ClientActor对象。
2)在ClientActor类中向Master注册Driver。这时在ClientActor类主要做了三件事:第一是根据ClientActor类初始化时传递进来的参数driverArgs来获得当前正在运行的Master结点的引用,ClientActor通过Master结点的引用向Master发送消息;第二是也是通过参数driv-erArgs并结合Scala语言的模式匹配来生成DriverDescription对象,DriverDescription对象中封装了要向Master结点中的Driver的所有信息;第三是通过Akka通信框架向Master发送Re-questSubmitDriver消息。
(5)在Master结点的receiveWithLogging方法中,对接受到的消息会结合Scala语言的模式匹配进行匹配,然后进行处理。
1)在这里Master结点收到了ClientActor类发送过来的RequestSubmitDriver消息,经过匹配判断后,它会先判断当前的Master结点是否处于活动状态(RecoveryState.ALIVE),如果当前的Master处于活动状态,那么它会先调用createDriver方法初始化Driver的描述信息,然后添加Driver的信息到Master的waitingDrivers中(waitingDrivers是一个ArrayBuffer),最关键的是最后对schedule方法的调用,在schedule方法中,会根据在Master结点注册的Worker结点的资源情况来进行调度。而我们这里的Driver的启动就是在schedule方法里实现的。
2)下面我们来看Master类的schedule方法,在schedule方法中,首先会在Master结点注册的所有Worker结点中随机选取一些来启动Driver,接着会从Master的成员变量waiting-Drivers中提取出等待启动的Driver,然后调用Master的launchDriver方法在对应的Worker结点启动Driver。
3)我们接着跟踪Master类的launchDriver方法的源代码,在Master的launchDriver方法中,会发送LaunchDriver消息给Worker结点(这里的worker.actor就是Worker类的引用)。
(6)Worker结点的receiveWithLogging方法接收到Master结点发送过来的LaunchDriver消息后,首先会做两件事:第一是实例化一个DriverRunner,DriverRunner对象负责管理Driver的执行。第二是Driver启动后会向Master结点发送RegisterApplication消息(注册Spark应用)。
1)receiveWithLogging方法收到的消息匹配LaunchDriver(driverId,driverDesc)样例类后先实例化DriverRunner对象,需要被启动的Driver的信息(driverId,driverDesc)作为参数传递给DriverRunner对象。接下来DriverRunner调用自己的start方法,在start方法内部会开启一个线程来运行和管理Driver。此时Driver会初始化SparkContext、DAGScheduler、Task-Scheduler等。
(7)Driver启动后会向Master发送RegisterApplication消息来申请注册Spark应用,Mas-ter类的receiveWithLogging方法收到该消息后主要做三件事:第一是调用registerApplication注册该Spark应用;第二是发送RegisteredApplication消息给Driver;第三是调用schedule()方法来启动Driver请求的Executor资源。
1)下面是Master的eceiveWithLogging方法接受到RegisterApplication消息后进行的一系列处理。
2)这时我们继续跟踪schedule()方法的实现,在schedule()方法中根据Master结点中注册的Worker结点的资源情况来调用Master的launchExecutor()方法,在launchExecutor()方法里会向Worker发送LaunchExecutor消息来启动Executor。
(www.xing528.com)
(8)Worker根据Master的资源分配结果来创建Executor。
1)在Worker类的receiveWithLogging方法收到LaunchExecutor消息后,会实例化一个ExecutorRunner对象,并且会调用该对象的start方法。ExecutorRunner对象负责管理Coar-seGrainedExecutorBackend进程的运行。
2)在ExecutorRunner的start()方法中,会开启一个线程,在这个线程中会调用fetchAn-dRunExecutor()方法,该方法负责从Driver上下载并运行准备好的ApplicationDescription。
3)接下来在fetchAndRunExecutor()方法中会启动ApplicationDescription中携带的org.apache.spark.executor.CoarseGrainedExecutorBackend类。其中CommandUtils类的buildPro-cessBuilder()方法中的参数appDesc指的就是ApplicationDescription。
4)CoarseGrainedExecutorBackend启动后,会向Driver端的CoarseGrainedSchedulerBack-end中的DriverActor发送RegisterExecutor(executorId,hostPort,cores)消息,DriverActor会回复RegisteredExecutor消息,此时CoarseGrainedExecutorBackend的receiveWithLogging方法收到消息后会创建一个org.apache.spark.executor.Executor对象。
至此,Executor创建完毕。Executor创建完毕后,Driver就可以向Spark的Worker结点提交作业了,下面我们开始分析Driver是如何一步步把作业划分成任务并提交给Worker上的Executor进行执行。
(9)对于我们编写的Spark应用程序,它向集群提交的唯一入口就是SparkContext类,而Spark应用里的每一个作业只有等到有Action操作的时候才会真正地向集群提交作业。比如sc.textFile("README.md").count()这段代码(这是一个常见的统计加入到内存的文件有多少行的例子),其中sc表示的是SparkContext的实例化对象,在这段代码中RDD的count()方法是一个Action操作,它会触发Spark作业的提交。
1)我们看一下Spark 1.2源码中的count()方法的实现,可以发现它会调用SparkContext的runJob()方法来向Worker结点提交作业,这里的runJob()方法最终会返回一个单机数组,该数组调用sum方法完成单词计数。
2)接下来我们就需要看一下SparkContext类中的runJob()方法的实现。在SparkContext类中对runJob()方法实现了一系列的重载,在我们上面使用sc.runJob()方法提交作业的过程中,它会连续调用自己重载过的方法,每个runJob()方法调用的自己的重载方法都会通过在参数列表增加参数来实现功能的添加,最终调用的下面的SparkContext的runJob()方法。例如在该runJob()方法中,参数allowLocal表示是否允许作业在本地运行,参数re-sultHandler是一个匿名函数,它是一个对最终的计算结果进行处理的句柄。
在以上的runJob方法中,最关键的一段代码就是dagScheduler.runJob(......),因为对于向Spark集群提交的作业,首先要经过DagScheduler调度器对其进行划分Stage。下面我们进入DagScheduler类的源码,看一下它的具体实现。
(10)在DagScheduler类中会对要提交的作业进行划分Stage,并决定各个Stage之间的依赖关系并以TaskSet的方式向TaskScheduler调度器进行Stage的提交,其中TaskSet是对Stage的封装,每一个TaskSet对应一个Stage。
1)我们首先看一下DAGScheduler的runJob()方法,在该方法内部会继续调用dag-Scheduler的submitJob()方法,其中submitJob()方法会返回一个JobWaiter类的对象waiter这里的waiter会处于一个阻塞状态,直到该作业执行完毕或者作业被消息后才会解除阻塞状态。
2)我们继续跟踪dagScheduler的submitJob()方法。在submitJob()方法中主要做的一件事是,把作业相关的信息封装成一个JobSubmitted样例类,然后会发送JobSubmitted消息给自己的消息处理器dagSchedulerEventProcessActor去集中处理。
3)dagSchedulerEventProcessActor的receive方法收到JobSubmitted消息后,会继续调用dagScheduler自己的handleJobSubmitted()方法去继续处理作业的提交。
4)在handleJobSubmitted()方法中会根据传递进来的参数finalRDD来创建finalStage finalStage,顾名思义,就是对要提交的作业划分Stage后,处在这个相互依赖的Stage链的最后的那个Stage。在创建好finalStage后,会首先判断该finalStage是否满足本地模式(Local)运行的条件,在下面源代码中,如果变量shouldRunLocally的值为真,则会调用runLocally(job)方法在Local模式下运行,否则会调用submitStage(finalStage)方法继续提交作业。
5)在submitStage()方法中,会根据finalStage的依赖getMissingParentStages来求出它的所有父Stage,并调用List集合的sortBy(_.id)方法按照id的大小进行排序后赋值给变量missing。接着会判断变量missing是否为Nil,如果missing==Nil,表明传递进来的Stage没有父Stage,这时就会调用submitMissingTasks(stage,jobId.get)继续向集群提交Stage;否则会遍历该Stage的父Stage,然后继续回调submitStage()方法。
6)如果一个Stage的所有的父Stage都已经计算完成,那么它会调用dagScheduler的sub-mitMissingTasks()方法来提交该Stage所包含的Tasks。在submitMissingTasks()方法中主要实现了Stage是如何生成TaskSet的,并调用TaskScheduler的submitTasks方法进行TaskSet(一系列Task)的提交。当然这里的TaskScheduler实际上是它的实现子类TaskSchedulerImpl。
(11)在以上的submitMissingTasks()方法中,被提交的Stage中包含的一系列Task被封装成了一个TaskSet,并通过调用TaskSchedulerImpl的submitTasks()方法继续向Spark集群提交任务,在这里再强调一下,一个TaskSet对应一个Stage。下面我们进入TaskScheduler调度器的实现子类TaskSchedulerImpl来看一下它是如何进行任务的提交的。
1)我们继续跟踪TaskSchedulerImpl类的submitTasks()方法,在该方法中首先实例化了TaskSetManager,TaskSetManager主要对TaskSet中的需要执行的任务进行管理;然后判断任务不是本机执行和非hasReceivedTask,并启动一个Timer对象的scheduleAtFixedRate()方法,以一定的时间间隔提交Task;最后调用backend的reviveOffers()进行下一步操作。这里的backend是SparkDeploySchedulerBackend,但是reviveOffers()方法是在它的父类CoarseG-rainedSchedulerBackend中实现的。
2)我们继续跟踪CoarseGrainedSchedulerBackend类中的reviveOffers()方法,这里的re-viveOffer()方法最终会向CoarseGrainedSchedulerBackend中的DriverActor类发送消息进行处理。
3)在DriverActor的receiveWithLogging方法中,对收到的ReviveOffers消息会继续调用CoarseGrainedSchedulerBackend的makeOffers()方法。
4)在makeOffers()中通过调用CoarseGrainedSchedulerBackend的launchTasks()方法发送消息给CoarseGrainedExecutorBackend。
5)我们继续跟踪CoarseGrainedSchedulerBackend的launchTasks()方法,在该方法中,它的参数tasks是通过scheduler.resourceOffers()方法进行资源申请后返回来的一系列Task,最终,每个Task会被循环地发送到与其绑定的Worker结点的已经启动的CoarseGrained-SchedulerBackend进程中。
至此,经过Driver端的DAGScheduler调度要提交的作业进行Stage划分,以及Task-Scheduler调度器对Stage进一步划分为一个个Task,并发送Task到Worker结点的CoarseG-rainedSchedulerBackend进程的Executor中执行,那么下面我们就要进入Worker结点相关的类中作进一步的分析。
(12)我们接下来会通过Worker结点相关的CoarseGrainedExecutorBackend类、Executor类TaskRunner类的源码实现来分析一下它们是如何处理从Driver端的CoarseGrainedSchedul-erBackend对象中发送过来的Task。
1)首先,在CoarseGrainedExecutorBackend的receiveWithLogging方法接受到CoarseG-rainedSchedulerBackend对象发送过来的LaunchTask信息后,会调用Executor的launchTask()方法让Task在绑定的Executor上运行。
2)我们继续跟踪Executor的launchTask()方法,在这个方法内部主要做了两件事情:一是初始化了一个TaskRunner对象来对Task进行管理,一是调用threadPool.execute(tr)Ex-ecutor方法把TaskRunner对象加入到一个Java的线程池中去运行。
3)下面我们的重心就是TaskRunner的源码实现了,看它是如何管理Task的实际运行的。TaskRunner是一个Runnable接口,它最重要的实现都在它的run()方法中,在run()方法中首先会对收到的Task信息进行反序列化,然后调用task.run(taskId.toInt)方法运行Task。这里需要注意的是如果运行的是ShuffleMapTask,会将结果保存到本地文件中,汇报ShuffleMapTask的存储信息给Driver,等待ResultTask或者其他ShuffleMapTask获取;如果是ResultTask,它会根据Driver中提供的信息进行结果的获取和并进行Reduce操作,最后执行结果汇报给Driver。
至此,一个Task就运行结束了,对于Task在Executor中运行的细节,我们在第5章讲Spark运行机制的作业调度的时候做了更加详细的分析,所以这里不在做更深入的探讨。
(13)最后,当Spark应用程序中所有的作业都提交并运行结束后。会调用SparkContext的stop方法结束应用。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。