什么是 Flink State Evolution?

State Evolution 是 Apache Flink (下简称 Flink)1.7 版本引入的新特性,目的是为用户提供迭代或修改 State 的方法,以适应长期运行的作业的版本迭代需求,比如迁移 State 到不同的序列化框架,或者对 State 的数据结构进行改变,甚至直接对 State 的内容进行修改。该特性对于企业级应用来说有着重大的意义。

作为 Statefull 计算框架,Flink 作业的状态通常用 State API 来保存而不是存在的数据库等外部存储,这样在获得更好的一致性和数据本地性的同时,也牺牲了使用数据库的灵活性和可访问性。State 以序列化的形式(Savepoint/Checkpoint)存在,对于很多不熟悉 Flink 序列化的用户来说相当于黑盒子,难以验证其中的数据正确性,更不用说修改其数据结构或者序列化格式,而 State Evolution 则是 Flink 在 State 管理上的一次很好的探索。

根据 Gordon Tai 在 2018 年 Flink Forward Berlin 的分享[1],State Evolution 主要分为 State Schema Migration、Savepoint Management 和 Upgradability Dry-Runs 三个部分,其中 State Schema Migration 已经在 1.7 版本支持,其余的官方支持尚在开发中。

State Schema Migration

State Schema Migration 的目的在于使得 State 可以随作业程序更新而更新,其工作原理和 Flink 流处理作业其他类型的更新一样,都是利用 Savepoint 来迁移作业状态。作业运行时,随着计算的进行 Flink 会持续地读写本地的 State,然后在 Savepoint 的时候将本地的 State 上传至分布式的文件系统,新版本的作业启动时只需要读取 Savepoint 即可恢复原先的作业状态。

旧版本作业 cancel with savepoint

新版本作业 start with savepoint

不过由于 State 涉及到不同的序列化框架,比起像改变 JobGraph 或者作业并行度等其他类型的更新,它的更新方法更加复杂一些,主要问题在于在从 Savepoint 恢复时我们如何读取之前的版本的序列化对象。这需要将 State 更新分为 State Schema 更新和 State 序列化框架的更新这两种情况考虑。

State Schema 的更新主要是和业务逻辑的变更相关,比如新增一个字段或者移除一个字段,这种版本升级主要需要考虑序列化框架的兼容性,比如 Java 默认的对象序列化框架,可以兼容新增字段但不能兼容移除或重命名字段。如果变更是可兼容的,那么无需额外操作即可迁移,否则可能需要考虑更换序列号框架。然而 Flink 默认情况下会根据 State 的 POJO 类型来生成 Adhoc 的(反)序列化器(PojoSerializer),任何 POJO 的变更都会导致不同的序列化器,因此对于 Flink 来说 State Schema 的更新通常等同于 State 序列化框架的更新。

更新 State 序列化框架的实现里有很多有意思的细节,其中一个是不同的 StateBackend 对于序列化器的使用方式不同。基于内存的 StateBackend (Heap Based StateBackend),比如 FsStateBackend,属于 “Lazy serialization, eager deserialzition”,意思是仅当 Savepoint 时才会将内存的 State 序列化,而读取 Savepoint 时会全部反序列化到内存。

Heap-based StateBackend snapshot

举个例子,假设我们从 v1 序列化器迁移至 v2 序列化器,在 Savepoint 的时候 Heap Based StateBackend 会将所有 key 的 State 全部用 v1 序列化器写出,在作业重启恢复时序列化器已经更新为 v2,但是只要我们还可以得到 v1 的(反)序列化器就可以顺利迁移,因此一个简单的方法是每次 Savepoint 时将当前使用的序列化器也一并使用 Java 序列化来写入 Savepoint,然后在读取 Savepoint 时我们首先提取(反)序列化器,再通过这个(反)序列化器来读取 State。

Heap-based StateBackend restore

然而在使用 Off Heap StateBackend 的时候,情况则变得更加复杂。因为 Off Heap StateBackend,比如 RocksDBStateBackend,是 “Eager serialization, lazy deserialization” 的,即每当 State 有更新时就会将对应 key 的 State 写到 Savepoint,而读取 Savepoint 时仅会读取被访问到的 key 的 State。

Off-heap StateBackend snapshot

这样造成的一个问题就是在一次作业运行中,可能同时存在使用不同版本的序列化器的 State。比如在前一次运行的 Savepoint,Flink 使用 v1 序列化器写了 5 个 key 的 State,而在第二次运行时我们只更新了其中 2 个 key,那么被更新的 key 是使用 v2 序列化器,而没有被更新的 key 仍然使用 v1 序列化器。因此在使用 Off Heap StateBackend 的情况下,我们不仅需要上一次运行使用的序列化器,还需要之前所有运行使用的序列化器。

Off-heap StateBackend restore

这种情况下保存每次运行时使用的序列化器显然是不现实的,因此 Flink 提供了两种办法来更新序列化器: 一是序列化器保证向后兼容,二是 State Migration Process。对于序列化器来说,向后兼容性是十分重要的特性,大部分流行的序列化器,比如 ProtoBuf、Avro、Thrift,都提供了向后兼容的能力。如果用户使用定制化的序列化器,Flink 也提供了编程 API 的支持,让用户可以根据序列化器的版本号来维护向后兼容性。具体来说,每次 Savepoint snapshot 的时候,Flink 会将序列化器的配置信息也存储下来,在作业读取 Savepoint 时用户定制的序列化器可以根据相关信息来配置自己,再反序列化 State。

serializer backward compatabilitye

但目前来说 Flink 默认的 PojoSerializer 并不能提供这样的向后兼容性,这就需要下面的 State Migration Process。State Migration Process 即在恢复作业状态时扫描所有 key 的 State 并强制用上次运行的序列化器反序列化,从而保证使用 Off Heap StateBackend 的情况下 State 不会存在多个序列化版本。但是在 State 比较大的情况下,在作业启动时进行 State Migration Process 可能会带来很长的恢复时间,因此该方法还是需要结合实际慎重使用。

Savepoint Management

不同于 State Schema Migration 在作业恢复时执行,Savepoint Management 的目的是提供离线读写 State 的能力。这种对 State 的管理能力对于生产级应用是至关重要的,因为程序 bug 是不能完全避免的,而当 bug 导致作业 State 错误时我们需要有可以修复这种错误的能力。在之前我们大多数情况下的做法是修复程序 bug 并重流数据来重新计算正确的 State,但是这种做法成本太高,甚至有时是不可行的,比如 Kafka 的消息已经被清理或者作业外部依赖的状态已经改变。而通过 Savepoint Management,只要知道如何根据错误的 State 计算得出正确的 State,我们就可以离线地修复 State 的问题并应用到线上。此外,我们甚至可以用外部的数据来计算出一个之前不存在的 State,然后用于作业首次运行时的算子状态的初始化。

截止至 Flink 1.7 版本,官方还没有这方面的支持,不过社区有个可用的项目 Bravo [2]可以一定程度上满足这方面的需求,在和 Flink 社区达成共识之后 Bravo 也会被合并到 Flink 项目里。Bravo 是由荷兰的 King 银行,Flink 的重度用户之一,开发的一个用于读写 Savepoint 的工具。其原理是利用 Flink 序列化器提供 OperatorStateReader 和 OperatorStateWriter 两个主要 API,用户可以利用它们来将 Savepoint 读入转为 Flink DataSet 或者将 DataSet 写出为 Savepoint。

State 的读取是以算子为单位的,我们需要指定 uid、State 类型、POJO 类型来定位具体的 State。一个简单的使用 Demo 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// First we start by taking a savepoint/checkpoint of our running job...
// Now it's time to load the metadata
Savepoint savepoint = StateMetadataUtils.loadSavepoint(savepointPath);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// We create a KeyedStateReader for accessing the state of the operator with the UID "CountPerKey"
OperatorStateReader reader = new OperatorStateReader(env, savepoint, "CountPerKey");
// The reader now has access to all keyed states of the "CountPerKey" operator
// We are going to read one specific value state named "Count"
// The DataSet contains the key-value tuples from our state
DataSet<Tuple2<Integer, Integer>> countState = reader.readKeyedStates(
KeyedStateReader.forValueStateKVPairs("Count", new TypeHint<Tuple2<Integer, Integer>>() {}));

Bravo 是基于 Flink 1.5 版本开发的,目前已经支持 1.6 版本引入的 State TTL,不过用户还是比较少,建议大家去尝试下(Bravo 维护者 Gyula Fora 非常 nice,提 issue 很快可以得到解决)。

Upgradability Dry-Runs

State Evolution 还提供的一项很方便的功能是 Upgradability Dry-Runs,这项功能用于离线检查作业的版本兼容性,以帮助用户提前发现兼容性问题。常见的兼容性问题主要有:

  • 作业拓扑的变更。这可能会导致算子 uid 的匹配失败,进而导致作业恢复后状态的不完整。目前来说 Flink 默认会在作业提交时进行安全性检查,用户可以通过 -n,--allowNonRestoredState 参数来允许不完整的状态恢复,但是这种检查应该测试阶段而不是在部署阶段完成。
  • State Schema 的变更。如上文所说,这会导致新版本的作业恢复失败并需要马上回滚。

目前来说社区还没有这方面的讨论和开发计划,但 Gordon 分享了一些可行的办法。离线验证的关键点在于从用户作业程序提取出需要的信息,并对比找出其中不兼容的点。对于作业拓扑图来说,我们可以计算出两者的 StreamGraph 并比较它们的 uid。而对于 State Schema 来说则可能绕一点,因为按照 Flink 目前的设计注册 State 的过程是只对 StateBackend 可见,因此可能需要入侵 StateBackend 来支持兼容性检查。而另外一个办法则是引入新的 annotation 并在声明 State 的时候标记这个 State,这样我们就可以在解析用户代码时得出 State 的信息,并用于检测兼容性。这种功能暂时被命名为 Eager State Declaration,具体的一个 Demo 如下。

eager-state-declaration

总结

综上所述,State Evolution 是关注于 State 如何管理的一系列特性,用户可以利用这些特性来获得在 State 版本演进或者类似传统数据库的 CRUD 上的更强的控制能力和更大操作空间,以更好地维护长期运行并不断迭代的作业。另外,从 1.7 版本开始我们可以看到 Flink 社区越来越重视版本兼容和跨版本迁移,包括 State Evolution 和 REST 等 API 的版本化,这也是一个项目走向成熟和准备好企业级生产的标志之一。

参考资料

1.Upgrading Apache Flink Applications: State of the Union
2.Bravo: Utilities for processing Flink checkpoints/savepoints
3.Apache Flink 官方文档: State Schema Evolution