Flink 1.8 Release 解读

Flink 1.8

在距离上个 feature 版本发布近四个月之后, 近日 Apache Flink 发布了 1.8 版本。该版本处理了 420 个 issue,其中新 feature 及改进主要集中在 State、Connector 和 Table API 三者上,并 fix 了一些在生产部署中较为常见的问题。下文将选取一些笔者认为重要的新特性、improvement 和 bugfix 进行解读,详细的改动请参照 release notes [1]

State

State Schema Evolution 新增对内置序列化器的支持

Flink 1.7 版本新增 State Schema Evolution 的特性,可以在支持 State POJO 随业务逻辑演变(见之前一篇博客[9]),但要求序列化器向后兼容,而当时 Flink 内置的 POJOSerializer 并不能做到这点,因此 1.7 版本只提供了 AvroSerializer 的 State Schema Evolution。1.8 版本修复了这个尴尬的情况,现在用户新增或者移除 POJO 的任意字段不会不破坏 Savepoint 兼容性。

State 支持基于 TTL 的持续增量清理

Flink 自 1.6 版本提供的 State TTL 现已被广泛使用,但是用户经常反馈的一个痛点是 Keyed State 在一个 key 过期之后,只有被再次访问后才会被发现,然后通过后台线程物理上清理掉。这会造成一个问题是有的 key 只是在一段时期内出现,如果依赖 State TTL 的话它可能永远不会被发现和清理,造成 State size 可能一直增长,因此在实践中用户常常还要通过设置各种 Timer 来手动清理不会再出现的 key 的 State。

Flink 1.8 实现了 State 的持续增量清理(continuous cleanup),即每次 full snapshot(即一般 checkpoint/savepoint,或者 incremental checkpoint 的 full snapshot 运行周期)的时候扫描全部的 entry 来检查和清理其中过期的部分,不过代价也是显而易见的更长的 checkpoint 时间。

Connector

FlinkKafkaConsumer 支持访问 Kafka 消息 header

Flink 1.8 新增了 KafkaDeserializationSchema 来给予用户对 Kafka 消息的完整访问权限,在这之前用户只可以访问反序列后的消息 body 而不能访问 header。这个 feature 使得用户可以在 Flink Application 里实现更多的定制化需求,比如我们正想做的基于 Kafka header timestamp 的类似于 Prevega 的 StreamCut 功能,以满足重流数据的场景。

将从 State 恢复的 Kafka topic partition 从消费列表过滤掉

在 1.8 版本之前,FlinkKafkaConsumer 从 savepoint/checkpoint 恢复时会将读取所有 Kafka topic partition offset,在 topic name 或者 pattern 出现变化时,比如用户想从 topic A 改为 topic B,恢复后的作业会读取同时两个 topic。如果用户不再需要读取 topic A,一个办法是将 Kafka source 的 uid 改变来避免 FlinkKafkaConsumer 恢复状态。

Flink 1.8 修复了这个问题,并默认会根据新的 topic name 或者 pattern 判断需要恢复哪些 offset。如果用户希望保留之前的行为,可以通过 FlinkKafkaConsumer#disableFilterRestoredPartitionsWithSubscribedTopics() 来禁用过滤。

StreamingFileSink 支持 SequenceFile

Flink 1.6 新增了 StreamingFileSink 来代替 BucketingSink 成为写文件或对象到下游系统的主流 connector,但是并不支持 Hadoop SequenceFile 格式(虽然支持了 Parquet 和 ORC),这个问题同样在 1.8 版本得到修复。

Table API

重构 Table API

Table API 的重构主要是为 Merge 阿里巴巴贡献的 Blink 做准备。Blink 在 SQL 层和 Table 层上做了比较多的改进,因此为了实现一个更为平滑的 Merge 进程,社区提出了 FLIP-32 [3] 以重构 Table API,主要目的降低 Table API 内部以及自身和其他模块(DataSet/DataStream 等)的耦合性。

Merge 的进程会分为以下几个步骤实现:

  1. 移除 Table API 对于 Scala 的依赖。由于历史原因,Table API 主要使用 Scala 来编写,其中主要问题是 Scala class 中的 private 或者 protected 变量对于 Java class 来说是可见的,因此会造成潜在的安全问题且对用户不友好。
  2. 优化 API 设计。目前 flink-table 模块有 7 个 TableEnvironment(3 个 base class + 4 个 Java/Scala 的 TableEnvironment)作为编程接口,不够高内聚。
  3. Table API 与 DataStream/DataSet API 解耦。让 Table API self-container,在编程接口上不需要依赖 DataStream/DataSet。
  4. 统一批处理和流处理。这是 Blink 团队一直向社区推的一个重要 feature,也是 Flink 的核心理念”批处理是流处理的特例”的体现。这个目标会将 DataSet 从直接基于 runtime 迁移到 DataStream 之上,因此毋需额外为 DataSet 维护一个执行层栈,详情可见[4]
  5. 处理 Table API 向后兼容性。以上的步骤涉及到 TableEnvironmnet 等面向用户的 API 改动,因此需要提前识别出来并在过渡版本标记为 @Deprecated,以便在后续版本移除。
  6. Merga Blink 的 SQL feature 以及架构上的改进。该步也会分成多个阶段进行,比如首先是 Blink planer(计划器),然后再逐个 Operator 改动。

Flink 1.8 目前来说在步骤 1 2 3 上都有初步的进度。

Table API 支持 Kafka CSV 格式数据的 Schema

在 1.8 版本以前,在 Table API 使用 CSV 格式数据时没有充分利用 CVS 的 header 来定义 Schema,导致用户需要在 Table 定义中和 CSV 格式中重复定义 Schema,比如:

1
2
3
4
5
6
7
8
9
10
11
.withFormat(
new Csv()
.field("field1", Types.STRING) // required: ordered format fields
.field("field2", Types.TIMESTAMP)
.fieldDelimiter(",") // optional: string delimiter "," by default
.lineDelimiter("\n") // optional: string delimiter "\n" by default
.quoteCharacter('"') // optional: single character for string values, empty by default
.commentPrefix('#') // optional: string to indicate comments, empty by default
.ignoreFirstLine() // optional: ignore the first line, by default it is not skipped
.ignoreParseErrors() // optional: skip records with parse error instead of failing by default
)

1.8 版本引入符合 RFC-4180 的新版本 CVS 格式,当 CSV header 和 Table 定义的 Schema 吻合时,可以自动生成数据 Schema,因此代码可以简化为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
.withFormat(
new Csv()
// required: define the schema either by using type information
.schema(Type.ROW(...))
// or use the table's schema
.deriveSchema()
.fieldDelimiter(';') // optional: field delimiter character (',' by default)
.lineDelimiter("\r\n") // optional: line delimiter ("\n" by default;
// otherwise "\r" or "\r\n" are allowed)
.quoteCharacter('\'') // optional: quote character for enclosing field values ('"' by default)
.allowComments() // optional: ignores comment lines that start with '#' (disabled by default);
// if enabled, make sure to also ignore parse errors to allow empty rows
.ignoreParseErrors() // optional: skip fields and rows with parse errors instead of failing;
// fields are set to null in case of errors
.arrayElementDelimiter("|") // optional: the array element delimiter string for separating
// array and row element values (";" by default)
.escapeCharacter('\\') // optional: escape character for escaping values (disabled by default)
.nullLiteral("n/a") // optional: null literal string that is interpreted as a
// null value (disabled by default)
)

SQL CEP 函数支持 UDF

Flink 1.7 支持了 SQL MATCH_RECOGNIZE 函数,用户可以在 SQL 中使用 CEP,但仍不能在 MATCH_RECOGNIZE 语句里使用 UDF,这会给使用场景带来比较大的局限。1.8 版本解决了这个问题。

Deployment

完全移除 legacy 部署模式

自 1.4 版本,Flink 社区一直在重构 Flink 的 runtime 架构,并在 1.5 版本后提供了 legacy 和 new 来种运行模式。在 1.8 版本,Flink 已经完全移除掉 legacy 的 runtime 架构。详情请见 [5]

修复 on YARN 模式请求过多 container 的问题

在 1.8 版本之前,Flink 存在的一个问题是当 JobManager 向 YARN RM 请求 container 成功,但是 TaskManager 无法利用分配到的 container 启动时(比如 Kerberos 权限错误或者无法读取用于恢复作业的 Savepoint),JobManager 没有移除相应 container request。这样的话 YARN RM 一直认为 Flink 的 AM 持有这些 container,导致 Flink 占用的 container 数会随着 JobManager 不断重试而增长,最终可能吃完整个队列。Flink 1.8 修复了这个问题。

资源清理问题

Flink on YARN 模式通常依赖于 Zookeeper 和 HDFS 来做元数据的持久化,但是在这些资源的管理上并不是很优雅,作业运行过后偶尔会留下一些痕迹需要手动异步清理,这在大规模部署的情况下可能会造成意想不到的问题。

Flink 1.8 修复了部分这样的问题,其中包括修复 Application 提交失败后 blob server 未清理的问题[6]和修复作业结束后 ZK 元数据未清理的问题[7],但仍有 checkpoint 目录未清理的问题[8]

Build

默认不再提供包含 Hadoop 的发行包

在之前版本为方便用户部署,Flink 在发行包中默认包括了 Hadoop,但这导致发行包十分臃肿,并且在使用率上可能并不太高,造成资源的浪费。自 Flink 1.8 开始,Apache Flink 默认不再提供包含 Hadoop 的发行包。

在 Flink 发展的初期,为了方便用户从 Storm 迁移过来,Flink 提供了 flink-storm 的兼容包来运行用 Storm 编写的作业。但时过境迁,flink-storm 无法跟上 Flink 本身的迭代,现在 flink-storm 只能利用 Flink 的一些低级功能,而且维护成本也逐渐增加,因此社区在调查之后决定废弃掉 flink-storm 模块。

在之前的发行版,Flink lib 默认包含 flink-dist、flink-shaded-hadoop2-uber 和 flink-python 三个 Flink 模块,然而 flink-python 的使用率似乎并不高,因此后续会将其移到 opt 下。加上移除 Hadoop shaded 包,之后 lib 下应该只有 flink-dist 和日志相关的 lib。

参考文献

  1. Apache Flink 1.8.0 Release Announcement
  2. Add KafkaDeserializationSchema that directly uses ConsumerRecord
  3. FLIP-32: Restructure flink-table for future contributions
  4. Unified Core API for Streaming and Batch
  5. FLIP-6 - Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.
  6. FLINK-10848: Flink’s Yarn ResourceManager can allocate too many excess containers
  7. FLINK-11383: Dispatcher does not clean up blobs of failed submissions
  8. FLINK-11789: Flink HA didn’t remove ZK metadata
  9. 什么是 Flink State Evolution