Spark源码学习笔记(三):Spark-on-YARN调度器初始化

作为提交Spark应用的高级接口,SparkContext封装了分布式计算的绝大部分底层逻辑,API简单易用对用户十分友好。然而在简单的一行new SparkContext()命令背后,Spark为执行用户代码做了大量的准备,包括启动BlockManager、HeartbeatReceiver、MetricsSystem等等。不过最为关键的一点还是启动Spark调度器,这点从Spark应用的applicationId取自TaskScheduler的applicationId便可看出。

调度器初始化入口

每个应用都从SparkContext的实例化开始,而SparkContext初始化阶段的重要工作之一就是启动SchedulerBackend守护进程。SchedulerBackend作为Driver与Executor的RPC通信接口,应该Driver初始化时启动以便Executor启动后可以向Driver注册自己。同时SchedulerBackend的门面(Facade),TaskScheduler会被传递给SparkContext的DAGScheduler持有。此后DAGScheduler负责高级的作业(Job)和阶段(Stage)管理,将应用分解到TaskSet级别,交由TaskScheduler完成底层的任务(Task)分发、执行和监控等操作。

SparkContext创建TaskScheduler部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
master match {
//为简单起见,删除了local模式的case。
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
case masterUrl =>
//通过master url匹配合适的集群管理器并加载该类。
//masterUrl即spark-submit的`--master`参数,或SparkConf的setMaster指定的参数。
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
//以SchedulerBackend作为初始化TaskScheduler
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}

YARN模式下调度器的初始化流程

官方支持的集群管理器有Standalone、Mesos和YARN三种,分别对应StandaloneSchedulerBackend, MesosCoarseGrainedSchedulerBackend和MesosFineGrainedSchedulerBackend, YarnClusterSchedulerBackend和YarnClientSchedulerBackend三种调度器(后台)。

其中和Mesos一样作为外部集群管理器,YARN并没有常驻的Spark的守护进程,所以每个Spark应用需要在初始化期间启动这些守护进程(线程),并在应用结束时销毁他们。因此Spark守护进程需要在YARN的框架内实现,如果对YARN编程不太熟悉可以先阅读董西成的如何编写YARN应用程序

Spark-on-YARN有cluster和client两种不同的部署模式,两者区别在之前一篇博客提过,而它们的应用初始化流程区别就是如何启动YarnSchedulerBackend。

YARN cluster部署模式

YARN cluster模式的应用初始化

步骤如下:

  1. Spark通过YARN Client申请application id,提交AM初始化上下文后YARN Client退出。
  2. RM分配到一个container启动AM。AM初始化后先启动一个独立线程执行Spark Driver,后者通过反射获取并执行用户代码main方法。main方法本身会初始化SparkContext来注册Driver的RPC端口和监控端口,YarnClusterSchedulerBackend会在这个阶段启动。
  3. 等Driver成功启动后,AM启动自己的RPC终端并通过YarnRMClient向RM注册自己,然后不断轮询向RM请求container,并在通过NMClient通知NM在申请到的container上启动Spark Executor,直到executor数目达到预设数目。
  4. Executor初始化后启动自己的RPC终端,并向Driver注册自己,等待Driver调度。

YARN client部署模式

YARN client模式的应用初始化

具体步骤如下:

  1. Spark Submit直接运行用户提交的应用,期间会启动Spark Driver。SparkContext初始化时会启动YarnClientSchedulerBackend,后者通过YARN Client申请application id,提交AM初始化上下文后不断轮询RM获取application最新状态。
  2. RM分配到一个container启动AM。AM初始化后AM启动自己的RPC终端并通过YarnRMClient向RM注册自己。
  3. AM不断轮询向RM请求container,并在通过NMClient通知NM在申请到的container上启动Spark Executor,直到executor数目达到预设数目。
  4. Executor初始化后启动自己的RPC终端,并向Driver注册自己,等待Driver调度。

后记

这篇博客其实从今年较早的时候就开始写了,但YARN调度器这部分代码逻辑实在有点复杂,一边要实现YARN的API(花了不少时间去看YARN的协议),一边要完成Spark守护进程的功能。这让我也花了很多时间来整理,不过离深入浅出依然有很大差距。如果有可以改善的地方,希望读者不吝指教。

本文是原创文章,转载请注明:时间与精神的小屋 - Spark源码学习笔记(三):Spark-on-YARN调度器初始化