谈谈 Flink Shuffle 演进

在分布式计算中,Shuffle 是非常关键但常常容易被忽视的一环。比如著名的 MapReduce 的命名跳过 Shuffle ,只包含其前后的 Map 跟 Reduce。背后原因一方面是 Shuffle 是底层框架在做的事情,用户基本不会感知到其存在,另一方面是 Shuffle 听起来似乎是比较边缘的基础服务。然而,笔者认为大数据计算与在线服务最基础的区别之一正在于 Shuffle。

众所周知,分布式算法的基础在于分治,而分治的三步为: 分解(Divide)、解决(Conquer)、合并(Combine),其中最为核心的分解与合并两步都与 Shuffle 密不可分。除了数据同步之类的完全并行(Embarrassingly Parallel)作业,大多数分布式计算作业都会包含一到多轮 Shuffle,而这些 Shuffle 的本质则是将上一轮计算的中间结果按照下一轮计算需要的方式重新组织。例如,在著名的 WordCount 案例中,在 Map 阶段的数据是随机分布的,而在 Shuffle 过后则是按照单词为分区 Key 来分布。而剩余的完全并行作业,本质上并不是在处理一个需要分治的大问题,而是处理重复的大量小问题,这样的需求其实跟普通的 Web 服务是类似的,若不考虑效率,完全可以用微服务框架来实现。

如果说数据分治的核心在于 Shuffle,那么计算分治的核心则在于调度器,两者相辅相成,比如流式调度和批式调度会搭配不同的 Shuffle。对于实践流批一体的 Flink 而言,Shuffle 面临的问题比其他计算引擎更加复杂,因此 Flink 做了更多的优化,包括流计算的 Pipelined Shuffle、批计算的 Blocking Shuffle 以及结合二者特点的 Hybird Shuffle。

流计算的 Pipelined Shuffle

Flink 流计算的 Shuffle 相对简单,主要原因是所有 Task 同时在运行,上下游 Task 可以通过网络流式地传输中间结果,不需要落盘,这种 Shuffle 被称为 Pipelined Shuffle。

相信不少读者都接触过 Flink DAG 的边类型。当我们在 DAG 构建中使用 Partition (将数据分区)相关的操作,比如 DataStream 的 keyByrescale、SQL 中的 Group By,Flink 会引入一轮 Shuffle,体现在可视化的 DAG 上就是上下游划分到不同的两个节点,两者以一条边相连。边的类型有 HASHBROADCASTREBALANCE 等等(见下图)。

图1. Flink DAG 的边

尽管逻辑上的 Partition 有多种多样的算法,产生的边五花八门,但它们的区别仅在于产出的结果如何划分给不同的下游 Subtask,所以从底层的 Shuffle 看来要做的事情是一样的:将中间结果提供给不同的下游 Subtask 读取。结合下图用 Flink 的话语讲,Partition 算法决定如何划分出 Subpartition,而 Shuffle 决定如何将 Subpartition 传递给 InputGate。

图2. Flink Shuffle 实现

面对 Pipelined Shuffle 的需求,最容易想到的实现方式便是上游 Subtask 所在 TaskManager 直接通过网络推给下游 Subtask 的 TaskManager。事实上,Flink 也的确是这么做的。Flink 在 TaskManager 里内嵌了基于 Netty 的 Shuffle Service,计算得出的中间数据会存到 TaskManager 的缓存池中,由 Netty 去定时轮询发送给下游。

图3. 内置 Netty Shuffle

Pipelined Shuffle 实现上有很多值得研究的地方,其中最重要的是 Flink 1.5 版本引入的 Credit-Based 流控机制。简单来说,Credit-Based 流控实现了类似 TCP 滑动窗口的机制,让上游 Subtask 依据下游 Subtask 的空闲 buffer(Credit)来发送数据,避免多个 Subtask 共用的一条 TCP 链接因为其中一个 Subtask 被阻塞。感兴趣的读者推荐阅读《批流统一计算引擎的动力源泉—Shuffle机制的重构与优化》这篇博客[11]。

批计算的 Blocking Shuffle

批计算的上下游 Subtask 通常不会同时调度起来,所以上游产出数据首先需要落盘存储,等下游调度起来再去读取,这种方式被称为 Blocking Shuffle。自 Flink 开始定位为流批一体计算引擎后,社区便持续对 Flink 批计算的 Blocking Shuffle 进行改良。

首先是 Flink 1.9 将 Shuffle Service 与计算解耦,改为插件化的架构(见 FLIP-31[3])。在此之前,Shuffle Service 作为 TaskManager 职责之一,绑定使用 TaskManager 内置的 Netty Shuffle Service。Netty Shuffle Service 在 Pipelined Shuffle 的场景下会直接通过 TCP 流式发送数据,而在 Blocking Shuffle 的场景下则会先写本地文件,再等下游 Subtask 拉取。然而,后一种情况会导致问题是,上游已经结束的 Subtask 想要释放 TM 的资源,必须先等下游 Subtask 被调度起来并拉完数据,这会造成资源的浪费甚至死锁。更加重要的是,在某些批计算场景下(比如交互式查询),同一批中间数据可能会被消费多次,这是 TaskManager 兼任的 Shuffle Service 无法满足的。

熟悉 Spark 的读者可能会想起 Spark 的 ESS (External Shuffle Service) 和 RSS (Remote Shuffle Service)。前者支持 Spark Executor 本地部署 Shuffle Service,比如部署在 YARN NodeManager 里的 YARN Shuffle Service,而后者支持在远端部署 Shuffle Service,比如阿里的 Aliyun RSS[12]、腾讯刚贡献给 Apache 的 Uniffle(原名 Firestorm)[13]。Flink 参考Spark 的经验,在 FLIP-31 中同时考虑了 ESS 和 RSS 的需求,为后续迭代奠定了良好基础。

其次,Flink 1.12、1.13 引入并完善了 Blocking Shuffle 的 Sort-Merge 实现(见 FLIP-148[4])。Blocking Shuffle 有 Hash Shuffle 和 Sort-Merge Shuffle 两种常见策略。在此之前,Flink 只支持比较简单的 Hash Shuffle,而缺少性能更好更适合生产使用的 Sort-Merge Shuffle。

简单而言,Hash Shuffle 是将数据按照下游每个消费者一个文件的形式组织,当并行度高时会产生大量的文件,容易耗光操作系统的文件描述符,并产生大量随机 IO 对 HDD 磁盘不友好,此外每个文件需要一个独立 Buffer 占内存过多。

图4. Hash Shuffle

相比之下,Sort-Merge Shuffle 是将上游所有的结果写入同一个文件,文件内部再按照下游消费者的 ID 进行排序并维护索引,下游有读取数据请求时,则按照索引来读取大文件中的某一段。

图5. Sort Shuffle

参考 Spark,Spark 在 1.1 版本引入 Sort-Merge Shuffle,并在 1.2 版本用其替代 Hase Shuffle,成为默认的 Shuffle 策略。说句题外话,一方面 Spark 1.1 2014 年发布,而 Flink 1.12 2020 年发布,Flink 在批计算落后于 Spark 6 年,而另一方面,Spark 今年(2022)新宣布的流计算 ProjectLightspeed(Structured Streaming 升级版)[14]要做的特性基本上是 Flink 5 年前已经实现的,可谓有趣的对称。Flink 在批计算上落后于 Spark,正如同 Spark 在流计算上落后于 Flink。

批场景下流批一体的 Hybrid Shuffle

如上文所讲,流计算用 Pipelined Shuffle,批计算用 Blocking Shuffle,那么流批一体用什么 Shuffle 呢?大家很容易联系到本节要讨论的 Hybrid Shuffle,但遗憾的是这句话大概只对一半,因为目前的 Hybrid Shuffle 只针对批场景有效。

众所周知,Flink 遵循 “批是流的特殊案例” 的流批一体理念,因而 Flink 中的批计算是能以流计算的方式去跑的。然而,大多数情况下我们不会这么做,因为批场景有额外的特点能让我们进行优化,比如借助 Blocking Shuffle 可以解耦上下游,让它们无需同时运行,相当于用时间换空间,让作业资源门槛比 Pipelined Shuffle 更低。这点也体现在 Flink 的配置上: Flink 的批作业可以通过 execution.batch-shuffle-mode 指定 Shuffle 模式,默认为 Blocking 模式(其余模式还有 Pipelined 和 Hybird)。

Blocking Shuffle 带来的一个限制是排斥上下游同时运行,因为上游计算结束之前,下游是没办法访问到其不完整的结果数据的,即使调度下游 Subtask 也只会让其空跑。这点在批计算的角度看来很正常,但对于流批一体的 Flink 而言其实是有优化空间的。设想如果在执行上游作业时,集群有空余资源能跑下游作业,那么我们是不是可以尽量 fallback 回 Pipelined Shuffle,用空间换时间,让作业更快完成?

基于这个思路,Flink 社区在 FLIP-235 [8] 中提出了 Hybird Shuffle。Hybird Shuffle 支持以内存(Pipelined Shuffle 风格)或文件(Blocking Shuffle 风格)的方式存储上游产出的结果数据,原则是优先内存,内存满了后 spill 到文件(见下图)。无论是在内存或者文件中,所有数据在产出后即对下游可见,因此可以同时支持流式的消费或批式的消费。

图6. Hybird Shuffle 的数据生产和消费

以 WordCount 作业为例,假设一共有 2 个 Map 和 2 个 Reduce,但现在计算资源只有 3 个 slot,采用不同的 Shuffle 有以下效果:

  • Blocking Shuffle: 先调度 2 个 Map,再调度 2 个 Reduce,有 1 个 slot 被浪费。
  • Pipelined Shuffle: 要求 4 个 slot,因此作业无法运行。
  • Hybird Shuffle: 先调度 2 个 Map 和 1 个 Reduce,剩余一个 Reduce 等三者任意一个完成后再调度(见图 7)。

图7. Hybird Shuffle 下的 Wordcount

从图中可以看到,Map 产出的 Subpartition 1 被下游的 Reduce 1 流式读取,因此数据很可能是缓存在内存中;而 Subpartition 2 由于消费者 Reduce 2 还未运行,所以数据可能会在内存满之后 spill 到磁盘,等 Reduce 2 启动后再读取。

总结

Shuffle 是分布式计算中关键的一环,它与计算调度相辅相成,成为分布式计算分治的基础。对于流批一体的 Flink 而言,Shuffle 不仅要满足流计算调度、批计算调度,还要满足流批一体的调度。前两个场景的 Shuffle 经过多年的发展目前在业界已经比较成熟,而最后的流批一体 Shuffle 还有不少探索的空间。Flink 1.16 版本即将引入的 Hybird Shuffle 针对批场景的流批一体 Shuffle 进行优化,使得 Flink 可以在运行时根据资源情况灵活决定使用类似流计算的 Shuffle 还是批计算的 Shuffle,以提高空闲资源利用率。

参考

  1. Sort-Based Blocking Shuffle Implementation in Flink - Part One
  2. Sort-Based Blocking Shuffle Implementation in Flink - Part Two
  3. FLIP-31: Pluggable Shuffle Service
  4. FLIP-148: Introduce Sort-Based Blocking Shuffle to Flink
  5. FLIP-184: Refine ShuffleMaster lifecycle management for pluggable shuffle service framework
  6. FLIP-199: Change some default config values of blocking shuffle for better usability
  7. FLIP-209: Support to run multiple shuffle plugins in one session cluster
  8. FLIP-235: Hybrid Shuffle Mode Skip to end of metadata
  9. Flink 1.13,面向流批一体的运行时与 DataStream API 优化
  10. Advanced Flink Application Patterns Vol.2: Dynamic Updates of Application Logic
  11. 批流统一计算引擎的动力源泉—Shuffle机制的重构与优化
  12. Aliyun Remote Shuffle Service
  13. Firestorm
  14. Project Lightspeed: Faster and Simpler Stream Processing With Apache Spark
  15. Improvements in task scheduling for batch workloads in Apache Flink 1.12