Flink 流批一体中的数据边界

众所周知,流场景和批场景最为根本的区别在于 Data Boundness(数据集有界性)。Data Boundness 将数据分为 Bounded 和 Un-Bounded。在业界过去多年的实践中,两者分别绑定对应领域的存储系统和计算引擎,然而在流批一体的趋势下,领域的边界在逐渐弱化。例如,消息队列通常用作流场景,但 Pravega 的 StreamCut 支持将指定队列中某一段消息作为批处理的输入[1]。在混合使用流批的场景下,不少原本大家习以为常的设定都需要重新去审视,其中的一项便是数据集内部的边界。

存储边界与计算边界

数据边界不仅包括数据集整体的逻辑边界,也包括数据集内部的存储单元逻辑边界,比如 HDFS 等文件系统的文件及底层的 Block、Kafka 等消息队列的 Partition 等等。数据边界在批处理中扮演着十分关键的角色,比如作为分治基础,比如标识计算的结束。

在批计算中,整个 Job 会被数据边界划分为多个小 Task,每个小 Task 都可以视为一个事务,计算是由数据的边界驱动的。如果将事务看成计算的逻辑单元,那么一个计算的逻辑单元的数据输入就对应一至多个存储的逻辑单元,因此我们可以说计算和存储是对齐的。例如,在 MapReduce 中,一个 Map 的输入对应一个 Split,而一个 Split 由一至多个 HDFS Block 组成,但不会出现一个 Map 对应 1.5 个 Block 的情况。

图1. 批计算的对齐边界

在流计算中,虽然计算是连续不断的,但出于容错等原因,仍然会将计算划分为多个事务处理。以主流实时计算引擎 Apache Flink 来说,Flink 通常会定时触发两阶段提交(2PC)事务,也就是常说的 Checkpoint。Checkpoint 会向数据流注入 Checkpoint Barrier,作为每个 Checkpoint 对应数据的边界。以 Checkpoint Barrier 划分的数据单元和数据源本身的逻辑存储单元并无关系,因此两者的边界通常不会重合,我们可以说它们是非对齐的。例如 Flink 读取 Kafka 数据,并不需要感知到 Partition 底层的 Segment,而 Kafka 也没有将这样的数据边界暴露给用户。

图2. 流计算的非对齐边界

在流批一体场景下,引擎常要读写有边界的数据集。取决于不同存储系统,不对齐的边界可能导致流计算的容错、可维护性都大打折扣。主要问题有数据血缘、结束条件和可重复读,下文逐一分析。

数据血缘

数据血缘指的是输入数据到输出数据之间依赖关系。如上文所说,批计算的输入数据边界与计算边界是对齐的,而计算边界很自然地又体现在输出数据的边界上。这点很容易理解,因为一个计算事务结束必然会 commit 数据,而这些数据会以文件、对象为单元独立存储,不会跟其他事务的数据混在一起。以文件、对象为单位,我们很容易追踪到数据上下游的血缘关系。

图3. 流计算的血缘关系

清晰的血缘关系能大大提高数据的可维护性。如果出现脏数据或者程序 bug 等异常,需要回滚计算时,我们可以方便地识别出异常的数据,删掉重新计算或者写 ad-hoc 脚本修复数据。比如在上图中的输入文件 1 出现问题,那么我们只需要处理事务 1 的输出数据即可,影响范围是十分明确的。此外,批计算的输入输出通常是以时间索引的(比如 Hive 中常用的天或小时分区),因此我们还可以依据时间来回滚事务。

然而,在流式计算中,即使输入数据是存在边界的,这样的边界信息并不会体现在计算上,计算仍是连续不断的,辅以周期性的事务。在触发 Checkpoint 快照的时候,Flink 会记录当前正在读取和正在写的文件的 Offset,作为对应事务的数据边界。

图4. 流计算的血缘关系

这意味着 Flink 计算时是无视存储逻辑单元边界的,边界信息被限制在与存储系统打交道的 Connector 中,这样的设计更符合单一职责原则,更加优雅,但也导致了存储边界信息以及血缘关系的丢失。当出需要回滚事务时,我们很难识别出影响范围,只能基于时间来过滤数据而不能直接回滚对应事务。

比如若发现上图中的文件 1 某条数据不准确,我们很难识别出需要回滚事务 1 还是事务 2,或者两者都需要,因此只能选择比较安全的做法,回滚全部事务。更加严重的问题在于,如果异常作业除了 Source/Sink 有还别的有状态的算子,那么我们无法直接丢弃原先的 Checkpoint 重新开始,只能从有限的几个可选 Checkpoint 中选一个来恢复,而这个 Checkpoint 记录的输入输出文件及其 Offset 又不一定符合当前最新状态,可能造成作业恢复状态后提交事务失败。

解决数据血缘丢失的关键在于,Flink Checkpoint 记录的数据存储 Offset 应当同步持久化到外部,最好可以有存储系统的原生支持。如此一来,即使事务数据即使没有对齐存储单元,要追踪和操作事务涉及的数据也比较方便。举个大家熟悉的例子就是 Kafka 的 Consumer Group Offset。不过 Kafka 仍有个问题在于 Consumer Group Offset 没有版本控制,所以只能记录最新的一组 Offset。在这点上,Pravega 允许多组 StreamCut 则更加友好。

结束条件

相对于数据血缘主要是业务应用层面问题,结束条件则更多是计算引擎层面的问题,而且是流批一体最大的障碍之一。幸运的是这些问题在最新的 Flink 1.14 都得到了基本解决。

在批计算中,计算的事务和输入数据的边界是对齐的,因此输入数据结束则代表事务可提交;而在流计算中,计算的事务是由周期性 Checkpoint 而不是输入数据边界驱动的,因此事务可提交的标识是输入数据结束加上 Checkpoint 快照成功。这点在 Flink 1.14 中有所体现,现在 Flink 可以在 Bounded Source 结束以后会马上触发一个 Checkpoint,来提交最后一个事务的数据,不过为保持与之前版本的行为一致,这个功能暂时在默认情况下是关闭的。

另外一个跟结束条件相关的问题是,在混合使用 Bounded 数据集和 Un-Bounded 数据集的情况下,会遇到 Bounded 数据集已经输入完毕(因此 Task 为 Finished)但整体作业还在运行的情况,这时 Flink 需要继续能进行正确 Checkpoint。这个问题听起来不算难,但其实有非常复杂的实现细节需要考虑,感兴趣的同学可以阅读 FLIP-147 [4]。本文只列举其中最为核心的三个备选解决方案,其中最后一个为被采纳的最终方案:

  • 让已经结束的 Source Task 继续保持在 Running 状态,不要转为 Finished 状态。这个方案比较投机取巧,但有点滥用了 Task 状态,带来的后果就是不能依靠 Source 结束产生的 EndOfPartition 事件来代表输出结束,而是要另外引入新的事件。
  • 让 Task 转为 Finished,同时记录 Finished Task 的 State 到 Checkpoint。这个行为听起来很自然,但实现起来有诸多问题,比如 Task 转为 Finished 前的最后一次 Checkpoint 包含着这个 Task 最终的 State,而作业后续的每次 Checkpoint 都会引用它,导致 Checkpoint 难以清理。这是因为 Task 变为 Finished 后状态不再可以访问,所以不能从当前的算子从获取。
  • 让 Task 转为 Finished,但不记录 Finished Task的 State 到 Checkpoint。这个方案相当于将 Finished Task 的 State 丢弃掉,因此在这之前的一个 Checkpoint 需要触发相关算子将中间结果全部 flush,效果类似 stop-with-savepoint --drain 命令。

数据源可重复读

数据源可重复读是 Flink Checkpoint 机制对数据源的要求之一,意味着 Flink 作业在进行主动或被动的重启之后,仍然可以依据 Checkpoint 记录的状态重新读取跟之前相同的数据。数据源可重复读是数据准确性基本保证。跟数据库领域的可重复读类似,数据源可重复读要求数据源在被读取期间不被同时发生的更新修改操作所影响。

从严格意义来说,数据源可重复与本文主题数据边界并没有必然关系。但 Bounded 存储系统通常基于文件、对象等可更新的抽象概念,而 Un-Bounded 存储系统通常基于消息队列这样不可更新的抽象概念,所以比起 Un-Bounded 存储,Bounded 存储需要额外考虑可重复读的问题。

如果一个文件、对象被流计算作业所读,可以认为它涉入了一个生命周期等同于作业能回滚的最大时长的长事务。由于 Bounded 存储系统通常没有类似数据库 MVCC 的多版本控制,因此在这个长事务期间,文件必须保持不变,以确保若作业出现事务回滚(也就是作业恢复至之前的某个 Checkpoint)时,读取到的数据还是跟以前一致的。这对于主要用作数据仓库或者数据归档场景的 HDFS、S3 来说问题并不是很大,因为数据写入之后常常不会再更新,但也有一些例外的情况,比如要对数据进行压缩合并或者作为冷数据降级到更便宜的存储系统上。所以在实际生产中,一般还是明确需要限制流计算可以回滚的最大时长,在超过这个阈值之后解除数据不可更新的限制。

另外一个更加有趣的场景发生在流计算直接读取数据库时(虽然生产环境很少这么做)。数据库的更新操作要比大数据存储频繁得多,而且优先级更高,没有办法要求数据库锁表不更新,只能依靠 MVCC 来保证写不影响读。然而,MVCC 的作用范围只有单个数据库事务,对齐到 Flink 端就是单个 Checkpoint,而 Flink 要求的可重复读是横跨多个 Checkpoint 的。这个问题是笔者在开发一个第三方的 Flink MongoDB Connector [5]时遇到的,以直接读取的方式实现的 Source 很难配合 Flink Checkpoint 机制,因此还是应该以 CDC 方式来读取数据库。

总结

不难看出,Flink 虽然已经实现流批一体引擎及其跟各种存储系统的接口,但在批场景下的结合传统 Bounded 存储系统的使用体验距离传统批计算引擎还有一定的距离或差异。当然,这也是 Iceberg、Hudi 等数据湖在近年来异军突起的原因。在批计算场景下,这些数据湖屏蔽底层文件、并发写和多版本控制的特性可以很好地弥补传统 Bounded 存储系统与 Flink 的间隙,同时也支持接近数据库的 ACID,满足 Serving 需求。

参考

  1. Pravega: StreamCut with BatchClient
  2. Flink 执行引擎:流批一体的融合之路
  3. Apache Flink 1.14.0 Release Announcement
  4. FLIP-147: Support Checkpoints After Tasks Finished
  5. MongoFlink