Spark源码学习笔记(二):Executor运行机制

Executor是执行用户代码的“一线工人”(worker),理解Executor的运行机制无论对编写高效优雅的Spark Application还是对Task的troubleshooting都尤为重要。

注:吸取了broadcast包1.6版本和2.x版本差异较大的教训,我决定直接跳到今年7月发布的Spark 2.2.0版本,之后的所有笔记也会基于2.2.0版本。

Executor与Driver间的通信

executor-driver通讯
ExecutorBackend为executor后台服务,它与Driver通信,管理了executor从初始化注册、运行任务到停止释放资源的整个生命周期,并向Driver报告executor和在上面执行的task的状态。除了心跳则直接向Driver报告,executor所有与Driver的通信都通过ExecutorBackend完成。

Executor

Executor是一个JVM实例,持有可以同时运行多个task attempt的工作线程池,并负责task的监控和管理。

Executor Variable

  • currentFiles: Executor当前工作目录的文件。
  • currentJars: Executor当前classpath的jar。
  • EMPTY_BYTE_BUFFER: 空的byte数组,用于更新executor状态而不需要发送实际数据的情况。
  • conf: SparkConf实例,保存了用户设置与系统默认的Spark配置。
  • threadPool: 工作线程池,线程是UninterruptibleThread类型,以防被外部中断。
  • executorSource: 监控指标源,用于统计executor的执行情况指标。
  • taskReaperPool: task清理器线程池,用于cancel或者kill task。
  • taskReaperForTask: taskId->taskReaper的HashMap,它是同步的,以确保不会为某个task重复创建taskReaper。
  • userClassPathFirst: SparkConf中的”spark.executor.userClassPathFirst”配置项,用于决定executor先加载系统classpath还是用户classpath,默认false。
  • taskReaperEnabled: SparkConf中的”spark.task.reaper.enabled”,用于决定是否启用taskReaper特性,默认false。
  • urlClassLoader: 通过url加载class的ClassLoader。
  • replClassLoader: REPL使用的ClassLoader,会在运行时加载用户新定义的类。
  • maxDirectResultSize: SparkConf中的”spark.task.maxDirectResultSize”,直接发送计算结果的数据量阈值,超过这个阈值用BlockManager来发送,默认512M。
  • maxResultSize: executor允许的最大结果集,超过这个数量的结果集将不能返回给Driver,默认1G。
  • runningTasks: taskId->taskRunner的ConcurrentHashMap,维护taskId到taskRunner的映射。
  • heartbeater: 维持与Driver间心跳的线程池。
  • heartbeaterReceiverRef: Driver的心跳接收器引用,这个变量应该在开始executor心跳之前初始化。
  • HEARTBEAT_MAX_FAILURES: SparkConf中的”spark.executor.heartbeat.maxFailures”,最大允许的心跳连续失败次数。
  • heartbeatFailutres: 心跳的连续失败次数,它应该被heartThread访问,每次成功的心跳将它清零。

Executor Method

  • numRunningTasks:
    numRunningTasks
    返回正在运行的task数目。
  • launchTask:
    launchTask
    将taskDescription交给一个taskRunner,将后者注册到runningTasks并放入threadPool执行。
  • killTask:
    killTask
    根据taskId杀死task,interruptThread参数决定了是否要中断正在执行的线程. 这个函数在taskReaperEnabled为true的情况下会使用taskReaper来杀死task,否则直接从runningTasks中获取对应的taskRunner并杀死它。
  • killAllTasks:
    killAllTasks
    杀死该executor正在运行的所有task,该方法由executorBackend调用以杀死所有task,而不是直接关停JVM。
  • stop:
    stop
    关停executor。
  • computeTotalGcTime:
    computeTotalGcTime
    返回executor的总GC时间。
  • createClassLoader:
    createClassLoader
    创建task执行时的ClassLoader,并加载用户定义类。
  • addReplClassLoaderIfNeeded:
    addReplClassLoaderIfNeeded
    为REPL创建另一个ClassLoader,在用户使用REPL时,这个ClassLoader会实时读取用户定义的新类。
  • updateDependencies:
    updateDependencies
    当SparkContext受到新的文件或者jar时调用,从Driver下载缺少的包,并加载到ClassLoader, 通常是当SparkContext.addJar被调用的情况。
  • reportHeartBeat:
    reportHeartBeat
    向Driver报告活跃task的状态,Executor会将各个task的metrics作汇总,计算更新数据量,并通过Driver RPC服务heartbeatReceiverRef报告给Driver。
  • startDriverHeartbeater:
    startDriverHeartbeater
    启动一个任务用于报告executor心跳和部分metrics,交给heartbeater调度。

Executor.TaskRunner

TaskRunner是一个线程类,用于实际运行task,它持有ExecBackend对象可以直接向Driver报告task执行状态。

Executor.TaskRunner Variable

  • executorBackend: ExecutorBackend类型,executor的后台服务类,用于向Driver报告更新task状态。
  • taskDescription: TaskDescription类型,任务描述。
  • taskId: 任务id,从taskDescription中获取taskId。
  • taskName: 任务名,从taskDescription获取中taskName。
  • reasonIfKilled: 任务被杀死的原因,该变量volatitle,只在运行时有用。
  • threadId: taskRunner的线程id,同样是volatitle,只在运行时有用。
  • finished: 该taskRunner是否已经完成。
  • startGCTime: task启动时JVM的GC时间。
  • task: 从Driver发送过来的要执行的task。

Executor.TaskRunner Method

  • getThreadId:
    getThreadId
    返回threadId。
  • isFinished:
    isFinished
    返回finished。
  • kill:
    kill
    通过task的kill方法杀死task。这种kill方法属于“尽最大努力”的操作,它只是把task的标识为killed,实际上是否起作用决定于上层的Spark代码是否还会读取这个标示位。所以更保险的方法是使用下文讲到的TaskReaper。
  • setTaskFinishedAndClearInterruptStatus:
    setTaskFinishedAndClearInterruptStatus
    将finished标识位置为true,并清理线程的中断状态。
  • run:
    run
    TaskRunner的核心方法,是task.run的封装,负责了task的上下文管理和统计信息。步骤如下:
    1.初始化taskRunner状态(threadIdthreadName、反序列化消耗的时间、task开始时间、cpu时间等),设置ClassLoader和序列化器。
    2.初始化task状态,更新task依赖,反序列化task对象。
    3.执行task,调用task.run,每次视为task的一个attempt。执行完释放资源,包括blockManager上的锁以及在taskMemoryManager上为该task attempt分配的内存。如果用户代码有报错,taskRunner会将其打印到错误日志。该步骤如果没有异常则转步骤4,否则跳到步骤5。
    4.计算task metrics统计信息,并将计算结果序列化后返回给Driver,更新task的状态为成功。
    5.异常处理,根据异常模式匹配分为几种情况:

    • FetchFailedException: 顾名思义,该异常表示有某些block无法从其他机器上获取。这种情况下Spark会将该task中断并标记为Failed。
    • TaskKilledException: 检测到task已经被杀死,将该task中断并标记为Killed。这种情况在用户通过YARN等集群管理器杀死任务时很常见。
    • InterruptedException: Executor本身被中断,将该task中断并标记为Killed。这种情况可能发生在抢占式的资源调度池,任务占用了本不属于自己的资源,而这些资源由于新的任务进入资源池而被收回。
    • CommitDeniedException: 无法提交task运行结果。这种异常还没见过。
    • Throwable: 不属于上面任何情况的异常,这时taskRunner会判断是否属于fatal,若是则将task的metrics和异常信息序列化后报告给Driver,然后通过uncaughtExceptionHandler来停止该executor。但如果恰好这时executor处于shut down中,该异常不会报告给Driver,因为有可能和用户设置的shutdown钩子冲突。

    6.从runningTasks映射中删除该task。

Executor.TaskReaper

直译为任务收割器,实现了Runnable接口,负责杀死或者取消任务。这是从Spark 2.1.1和Spark 2.2.0开始引入的特性,用于修复任务的杀死和取消机制的不可靠问题。因为task的杀死和取消机制是被动的,task.kill方法只是将killed标识置为true,但是具体什么时候task才停止决定于task是否interruptable以及task的运行是否会再次读取killed标识并作出反应。因此,有可能用户从监控页面看到task已经被杀死,而其实还有僵尸task一直占用该executor。TaskReaper针对上述问题提供了主动杀死task的方法:它会一直监视已经标记为killed的task,直到该task退出。另外,TaskReaper也提供超时机制,若task超过约定时间没有退出,则直接杀死JVM以释放资源。

Executor.TaskReaper Varibale

  • taskRunner: 正在执行该需要杀死的task的TaskRunner实例。
  • interruptThread: 是否中断线程。
  • reason: 杀死任务的原因。
  • taskId: 要杀死的taskId。
  • killPollingIntervalMs: taskReaper轮询的时间间隔,可以通过spark.task.reaper.pollingInterval来设置,默认10s。
  • killTimeoutMs: task退出的超时时长,可以通过spark.task.reaper.killTimeout来设置,默认-1,即不设置超时。
  • takeThreadDump: 是否需要对task进行threadDump,可以通过spark.task.reaper.threadDump配置,默认为true。

Executor.TaskReaper Method

  • run:
    run
    不断轮询检测task状态,直到task退出。步骤如下:
  1. 调用taskRunner.kill常规方法来杀死task。
  2. 轮询检测taskRunner状态。若未完成则等待(wait)一个轮询周期,意味着在这期间任何其他对takRunner的调用都会被阻塞;若已经完成,则如果需要的话进行threadDump。
  3. 同步更新taskReaperForTask映射,将该taskId对应的Entry移除。

总结

Executor类只负责核心的与task运行紧密相关的操作,其他服务类的模块比如Block管理、内存管理都封装在SparkEnv中,会在用到时通过全局变量SparkEnv获取,符合高内聚低耦合的设计原则,因此可以兼容多种集群模式。Executor并未直接与其他进程通信,相关的操作是由ExecutorBackend类在后台通过RPC服务完成的。

本文是原创文章,转载请注明:时间与精神的小屋 - Spark源码学习笔记(二):Executor运行机制