深入理解流计算中的 Watermark

近年来流计算技术发展迅猛,甚至有后来居上一统原本批处理主导的分布式计算之势,其中 Watermark 机制作为流计算结果准确性和延迟的桥梁扮演着不可或缺的角色。然而由于缺乏高质量的学习资源加上计算 Watermark 确实不是一件容易的事情,不少有着批处理计算背景的用户在流计算作业的开发中可能并不理解 Watermark 的重要意义,从而多走了很多弯路。为此,本文将基于笔者的学习积累和开发经验,谈谈个人对 Watermark 的理解,希望起到抛砖引玉的作用。

本文将首先说明 Watermark 提出的背景,然后详细解析 Watermark 的原理,最后结合工业案例说明 Watermark 在实践中如何被应用。

Watermark 背景

自 Google 的三篇论文和 Hadoop 出现后,工业界的分布式计算技术进入了百花齐放的时期,然而相比于离线批处理计算的蓬勃发展,作为后来者的流计算却有点停滞不前。流计算和批处理在对于每条记录的单独处理上基本一致,不同之处在于聚合类的计算。批处理计算结果的输出依赖于输入数据集合的结束,而流计算的输入数据集通常是无边界的,不可能等待输入结束再输出结果。针对这个问题流处理引入了窗口的特性,简单来说就是将无限的数据流按照时间范围切分为一个个有限的数据集,所以我们依然能够沿用批处理的计算模型。来到这时,业界在流计算和批处理的关系上出现了两种截然不同的观点,一个观点认为流计算是批处理的特例,另一个观点则认为批处理是流处理的特例。

实时计算与离线计算的分离

流计算是批处理的特例的观点在早期占据了主导的地位,其中最为典型的便是以 Spark Streaming 为代表的 micro-batching 类型的实时处理框架的流行。Micro-batching 的主要思想是以分钟甚至秒级别的执行间隔来将批处理应用到数据流上,但不久后人们意识到这种计算模型依然不能完全满足低延迟高准确性的要求,主要问题除了批处理调度导致的延迟外,还有一点是窗口变小后,数据收集延迟对结果准确性的影响大大增强了。比如说计算一个游戏服务器每 5 min 的新登录玩家数,但因为网络或者客户端设备故障等因素,12:00 的玩家登录日志可能在 12:10 才被收集到服务器,如果实时计算在 12:05:00 就输出结果,必然会漏掉这条迟到的数据。在离线计算中这样的问题并不明显,因为一个批次的时间跨度较大且对延迟要求不高,因此计算的时间可以设置一个安全的延迟,比如 1 个小时,确保数据都已经收集完成后再开始计算,即使有大量数据是在 1 个小时后才收集到,只需要重算结果即可。然而这样的实践经验并不能应用于实时计算,一是引入额外的安全延迟对于很多对延迟敏感的场景不可接受,二是实时计算的重算要比批处理重算的成本高出很多。因此业界普遍是采用 结合离线和实时处理的 Lambda 架构来应对这个问题,其主要思想是同时运行实时和离线两个数据处理管道,实时管道提供最近小时内的临时结算结果,而离线管道提供小时以前的计算结果并覆盖掉对应时间段的实时计算结果,查询时将两者的结果再进行合并产生最终的结果[1]

实时计算与离线计算的融合

实时计算与离线计算的分离说明了用批处理模型不足以表达流计算,于是人们开始探索批处理是流计算特例的模型。2015 年 Google 发表名为 The Dataflow Model 的论文,这篇论文较为详细地阐述了实时流计算和离线批计算的统一模型(出于篇幅原因不展开讲,详情请见[2]),而该模型基于批处理是流计算特例的观点。The Dataflow Model 将计算分为四个要素,即 what、where、when 和 how:

  • what 表示要计算什么结果,即对数据的一系列转换操作;
  • where 表示结果计算上下文,即窗口如何定义;
  • when 表示何时输出和物化计算结果;
  • how 表示如何清理已经输出的结果。

在 what 和 where 两点上流计算和批处理是相似的,而主要不同之处在于 when 和 how 两点,这两点在批处理里基本不会涉及,但在流计算里却影响着计算结果的准确性,实际上它们分别对应了上文所说的批处理经验不能应用于实时计算的两个问题。本文主要讨论的 watermark 就是属于 when 要素里的一种技术,因而下文将主要关注 when。

在批处理中 when 是输入数据集结束的时候,how 是以覆盖的形式来清理之前的输出结果,处理模式都是固定的,因此用户并不需要考虑。举个例子,假设要计算一个游戏每天的玩家充值金额,用离线计算时我们会考虑如何将充值金额从日志中提取出来并累加到一起,此为 what;再考虑批处理的运行时间,比如每天 00:30,所以每次计算是处理 24 小时采集到的数据,此为 where;而批处理的 when 是和 where 绑定的,即 00:30 计算开始,结束后马上输出结果;至于 how,不同批次的批处理运行的结果是互不相干的,同一批次的运行结果会覆盖前一次运行的结果。

然而如果游戏策划急于知道某个活动是否有带动玩家充值,希望看到每分钟更新的实时数据,那么上述题目改为用实时流计算去实现,此时要考虑的东西会复杂一点。首先,我们可以依旧可以复用批处理的 what 和 where,即定义一个时间范围为 24 小时的窗口,计算逻辑和之前一样;在 when 方面,为了可以实时地得到最新的计算结果,我们需要定义每分钟输出一次最新的计算结果,直到达到 24 小时后输出最终结果;而在 how 方面,我们每次的输出结果只需要覆盖之前的结果即可。然而 when 的问题并没有这么简单。还记得我们之前说过数据采集延迟吗?可能一个用户充值的时间在 16:00,但中间采集的延迟可能有 1 min,导致到达服务器却是 16:01 分,如果基于充值记录被处理的时间(即 processing time)来进行窗口划分,用户充值记录可能会被计入错误的窗口,所以我们应该以用户充值这个时间(即 event time)发生的时间为准。这里的难点在于我们计算时并不能判断所有 event time 窗口内的数据被收集完,因为数据的延迟是不可预知的,这被称为窗口完整性问题。针对窗口完整性问题,The Dataflow Model 提出了 Watermark 的解决方案。

Watermark 原理解析

Watermark 并没有很正式的官方定义,最接近定义的是 Streaming 102[3] 里的一段描述。

A watermark is a notion of input completeness with respect to event times. A watermark with a value of time X makes the statement: “all input data with event times less than X have been observed.” As such, watermarks act as a metric of progress when observing an unbounded data source with no known end.

简单来说 Watermark 是一个时间戳,表示已经收集完毕的数据的最大 event time,换句话说 event time 小于 Watermark 的数据不应该再出现,基于这个前提我们才有可能将 event time 窗口视为完整并输出结果。Watermark 设计的初衷是处理 event time 和 processing time 之间的延迟问题,三者的关系可以用下图展示:

图 1. Event Time/Processing Time/Watermark 三者关系

理想的情况下数据没有延迟,因此 processing time 是等于 event time 的,理想的 Watermark 应该是斜率为 45 度的直线。然而在真实环境下,processing time 和 event time 之间总有不确定的延迟,表现出来的 Watermark 会类似图 1 中的红色的曲线。其中红色曲线与理想 Watermark 的纵坐标差值称为 processing-time lag,表示在真实世界中的数据延迟,而横坐标的差值表示 event-time skew,表示该延迟带来的 event-time 落后量。

Watermark 通常是基于已经观察到的数据的 event time 来判断(当然也可以引入 processing time 或者其他外部参数),具体需要用户根据数据流的 event time 特征来决定,比如最简单的算法就是取目前为止观察到的最大 event time。在数据流真实 event time 曲线是单调非减的情况下,比如 event time 是 Kafka producer timestamp 时,我们是可以计算出完美符合实际的 Watermark 的,然而绝大多数情况下数据流的 event time 都是乱序的,因此计算完美的 Watermark 是不现实的(实际上也是没有必要的),通常我们会以启发性的 Watermark 算法来代替。

启发性的 Watermark 算法目的在于在计算结果的延迟和准确性之间找到平衡点。如果采用激进的 Watermark 算法,那么 Watermark 会快于真实的 event time,导致在窗口数据还不完整的情况下过早输地出计算结果,影响数据的准确性;如果采用保守的 Watermark 算法,那么 Watermark 会落后于真实的 event time,导致窗口数据收集完整后不能及时输出计算结果,造成数据的延迟。实际上上文所说的 Watermark 取观察到的最大 event time 和批处理使用的设置一个足够大的安全延迟的办法分别就属于 Watermark 算法的两个极端。很多情况下用户偏向于牺牲一定的延时来换取准确性,不过在像金融行业的欺诈检测场景中,低延迟是首要的,否则准确性再高也没有意义。针对这种情况 The Dataflow Model 提供了 allow lateness 的机制,工作的原理是用户可以设置一个时间阈值,如果在计算结果输出后的这个阈值时间内发现迟到的数据,计算结果会被重新计算和输出,但如果超出这个阈值的迟到数据就会被丢弃。

这时你们可以看到要开发一个高质量的实时作业是多么不易了,这也是很多实时应用开发者最为头疼的地方,或许以后利用机器学习去计算 Watermark 是个不错的主意(然后我们的工作就可以愉快地从调 Watermark 算法参数变为调机器学习模型参数了 :) )。

Watermark 实践

接下来我们将结合工业生产的案例来说明实战中 Watermark 是如何影响流计算的。Watermark 在不同计算引擎的实现并不相同,本文将以笔者使用最多的 Apache Flink (下文简称 Flink)作为例子来说明。

对于游戏行业来说,游戏的日活跃玩家数是个很常见的指标,游戏策划或者运营通常可以根据日活跃玩家数的变动来实时地监控某个活动是否收到玩家欢迎的程度,但是游戏可能有海外服务器,数据收集的延迟可能差别较大,造成数据流 event time 乱序比较严重,在这种情况下设计 Watermark 算法是个比较大的挑战。

假设我们有 A、B、C 共 3 台服务器,其中 A、B 为国内服务器,延迟较低且稳定,而 C 为海外服务器,延迟较高且不稳定,而我们需要计算每分钟内的登录玩家数。

图 2. 数据流延迟

我们现在面临两种可能带来 event time 乱序的因素:一是不同服务器间的延迟不同,比如可能先收到服务器 A 在 t2 的数据,再收到服务 C 在 t1 的数据;二是同一服务器的不同数据的延迟不同,比如可能先收到服务器 C t2 的数据再收到 t1 的数据。针对第二种因素,我们可以对不同服务器的数据分别计算 Watermark,再取其中的最小值作为 Watermark,而针对第一种因素,我们则需要设计出针对单个服务器数据流的合理 Watermark 算法。

在算法实现上,Flink 提供两种触发 Watermark 更新的方法,即在收到特殊的消息时触发或者定时触发,我们这里将选用定时触发的方法。因为窗口是一分钟比较小,我们这里将定时的间隔设为 5 秒,也就是说 Watermark 大约落后真实 Watermark 5 秒,然后这 5 秒内 Watermark 是不会提升的,所以可以容忍局部的 processing lag。

我们试着取目前为止观察到的最大时间戳作为 Watermark,那么 Watermark 的效果如下(为了在消费端更加直观,我们将坐标系调转,现在 x 轴表示 processing time)。

图 3. Watermark 算法实现

其中 t0-t3 分别表示 Watermark 提升的时间点,黄虚线表示在一个 Watermark 周期内的最大 event time,红线表示 Watermark。可以看到在 t0-t1 的 Watermark 周期内出现了轻微的 event time 乱序,但是并不影响计算的准确性。接下来在 t1-t2 和 t2-t3 两个周期间也发生了相似的乱序,但是这个乱序并不在同一个 Watermark 周期,因此导致正常延迟的数据被误认为是迟到数据。解决方法是引入一定可容忍的 event time skew,比如说最简单的设置一个 skew 阈值,即每次计算 Watermark 的结果都减去这个值。根据数据流延迟的不同,我们还可以给不同服务器设置不同的 skew 阈值。

上述 Watermark 算法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class WatermarkProcessor implements AssignerWithPeriodicWatermarks<UserLogin> {
private static final long ALLOWED_EVENT_TIME_SKEW = 1000L;
private static final Map<String, Long> maxTimestampPerServer = new HashMap<>(3);
@Nullable
public Watermark getCurrentWatermark() {
Optional<Long> maxTimestamp = maxTimestampPerServer.values().stream()
.min(Comparator.comparingLong(Long::valueOf));
if (maxTimestamp.isPresent()) {
return new Watermark(maxTimestamp.get() - ALLOWED_EVENT_TIME_SKEW);
} else{
return null;
}
}
public long extractTimestamp(UserLogin userLogin, long previousElementTimestamp) {
String server = userLogin.getServer();
long eventTime = userLogin.getEventTime();
if (!maxTimestampPerServer.containsKey(server) ||
userLogin.getEventTime() > maxTimestampPerServer.get(server)) {
maxTimestampPerServer.put(server, eventTime);
}
return eventTime;
}
}

总结

流计算和批处理谁是表达能力更强的计算模式,这个问题或许还将继续被争论下去,不过根据 The Dataflow Model 我们已经有足够的理论支撑来开发低延迟高准确并且可容错的流计算应用。其中流计算的准确性很大程度上决定于数据流时间的乱序程度,因此我们在开发实时流计算应用时,比起开发离线批处理应用,很大的一个不同是要考虑数据是以什么顺序到达,并针对性地设计 Watermark 算法来处理数据流时间的乱序。Watermark 算法需要平衡低延迟和高准确性两者,在引入最低延迟成本的情况下准确判断窗口的计算和输出结果的时机,通常可以从 processing lag 和 event time skew 两者的容忍阈值入手。

参考文献

1.How to beat the CAP theorem
2.The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive Scale, Unbounded, Out of Order Data Processing
3.Streaming 102: The world beyond batch
4.Tyler AkidauSlava, Chernyak, Reuven Lax. (2018). Streaming Systems.