首页 理论教育 TaskScheduler原理解析

TaskScheduler原理解析

时间:2023-06-29 理论教育 版权反馈
【摘要】:TaskSchedulerImpl的start方法中还会根据配置判断是否周期性地检查任务的推测执行。TaskSchedulerImpl启动后,就可以接收DAGScheduler的submitMissingTasks方法提交过来的TaskSet进行进一步处理。如果Task执行过程中有错误导致失败,会调用TaskSetManager来处理Task失败的情况,进而通知DAGScheduler结束当前的Task。TaskSetManager会将失败的Task再次添加到待执行的Task队列中。注意,Spark Task允许失败的次数默认是4次,在TaskSchedulerImpl初始化时通过spark.task.maxFailures设置该默认值。如果Task执行完毕,执行的结果会反馈给TaskSetManager,由TaskSetManager通知DAGScheduler。

TaskScheduler原理解析

通过之前DAGScheduler的介绍可以知道,DAGScheduler将划分的一系列Stage(每个Stage封装一个TaskSet),按照Stage的先后顺序依次提交给底层的TaskScheduler去执行。下面来分析TaskScheduler接收到DAGScheduler的Stage任务后,是如何管理Stage(TaskSet)的生命周期的。

首先,回顾一下DAGScheduler在SparkContext中实例化时,TaskScheduler和Scheduler-Backend就已经先在SparkContext的createTaskScheduler创建出实例对象了。

注意,虽然Spark支持多种资源部署模式(包括Local、Standalone、YARN和Mesos等),但是底层调度器TaskScheduler接口的实现类都是TaskSchedulerImpl。并且,为了方便读者理解TaskScheduler,对于SchedulerBackend的实现也只专注Standalone部署模式下的具体实现SparkDeploySchedulerBackend来做分析。

TaskSchedulerImpl在createTaskScheduler方法中实例化后,就立即调用自己的initialize方法把SparkDeploySchedulerBackend的实例对象传进来,从而赋值给TaskSchedulerImpl的backend。在TaskSchedulerImpl的initialize方法中,根据调度模式的配置创建实现了Schedul-erBuilder接口的相应实例对象,并且创建的对象会立即调用buildPools创建相应数量的Pool存放和管理TaskSetManager的实例对象。实现SchedulerBuilder接口的具体类都是Scheduler-Builder的内部类。

1)FIFOSchedulableBuilder:调度模式是SchedulingMode.FIFO,使用先进先出策略调度。这是默认模式,在该模式下,只有一个TaskSetManager池。

2)FairSchedulableBuilder:调度模式是SchedulingMode.FAIR,使用公平策略调度。

在createTaskScheduler方法返回后,TaskSchedulerImpl通过DAGScheduler的实例化过程设置DAGScheduler的实例对象,然后调用自己的start方法。在TaskSchedulerImpl调用start方法时,会调用SparkDeploySchedulerBackend的start方法,在SparkDeploySchedulerBackend的start方法中会最终注册应用程序AppClient。TaskSchedulerImpl的start方法中还会根据配置判断是否周期性地检查任务的推测执行。

TaskSchedulerImpl启动后,就可以接收DAGScheduler的submitMissingTasks方法提交过来的TaskSet进行进一步处理。TaskSchedulerImpl在submitTasks中初始化一个TaskSetManag-er对其生命周期进行管理,当TaskSchedulerImpl得到Worker结点上的Executor计算资源时,会通过TaskSetManager来发送具体的Task到Executor上执行计算。

如果Task执行过程中有错误导致失败,会调用TaskSetManager来处理Task失败的情况,进而通知DAGScheduler结束当前的Task。TaskSetManager会将失败的Task再次添加到待执行的Task队列中。

注意,Spark Task允许失败的次数默认是4次,在TaskSchedulerImpl初始化时通过spark.task.maxFailures设置该默认值。

如果Task执行完毕,执行的结果会反馈给TaskSetManager,由TaskSetManager通知DAGScheduler。DAGScheduler根据是否还存在待执行的Stage,继续迭代提交对应的TaskSet给TaskScheduler去执行,或者输出Job的结果。

结合第5章执行器(Executor)可知,通过下面的调度链,Executor把Task执行的结果返回给调度器(Scheduler)。(www.xing528.com)

1)Executor.run。

2)CoarseGrainedExecutorBackend.statusUpdate(发送StatusUpdate消息)。

3)CoarseGrainedSchedulerBackend.receive(处理StatusUpdate消息)。

4)TaskSchedulerImpl.statusUpdate。

5)TaskResultGetter.enqueueSuccessfulTask或者enqueueFailedTask。

6)TaskSchedulerImpl.handleSuccessfulTask或者handleFailedTask。

7)TaskSetManager.handleSuccessfulTask或者handleFailedTask。

8)DAGScheduler.taskEnded。

9)DAGScheduler.handleTaskCompletion。

上面的调度链值得关注的是:第7)步中,TaskSetManager的handleFailedTask方法会将失败的Task再次添加到待执行Task队列中。在第6)步中,TaskSchedulerImpl的handle-FailedTask方法在TaskSetManager的handleFailedTask方法返回后,会调用CoarseGrained-SchedulerBackend的reviveOffers方法给重新执行的Task获取资源。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈