流处理系统最重要的特性是端到端的延迟,端到端延迟是指开始处理输入数据到输出该数据产生的结果所需的时间。Flink,作为流式计算的标杆,其端到端延迟包括容错的快慢主要取决于检查点机制(Checkpointing),所以如何将 Checkpoint 做得高效稳定是 Flink 流计算的首要任务。我们在 “Flink 新一代流计算和容错——阶段总结和展望” 一文中介绍了 Flink 从社区 1.12 版本开始所做的提升 Checkpointing 机制的努力,本文将着重介绍其中刚刚在 Flink 1.15 版本发布的 Generic Log-Based Incremental Checkpointing 这个功能。
Generic Log-Based Incremental Checkpointing 的设计初衷是我们将全量的状态快照和增量的检查点机制分隔开,通过持续上传增量 Changelog 的方法,来确保每次 Checkpointing 可以稳定快速的完成,从而减小 Checkpointing 之间的间隔,提升 Flink系统端到端的延迟。拓展开来说,主要有如下三点提升:
那是怎么做到的呢?我们知道影响 Flink Checkpointing 时间的主要因素有以下几点:
对Flink Checkpoint 机制不太了解的读者可以参考 1。
Flink 1.12 版本引入的 Unaligned Checkpoint 和 1.14 版本中引入的 Buffer Debloating 主要解决了上述第 1 个问题,尤其是在反压的情况下。更早之前引入的 Incremental Checkpoint 是为了减少每次 Checkpointing 所需要持久化存储状态的大小,以减小第 2 个影响因素,但在实际中也不完全能做到:现有 Incremental Checkpoint 是基于 RocksDB 来完成的,RocksDB 出于空间放大和读性能的考虑会定期做 Compaction。Compaction 会产生新的、相对较大的文件,会增加上传所需要的时间。每一个执行 Flink 作业的物理节点(Task)至少有一个 RocksDB 实例,所以Checkpoint 被延迟的概率会随着物理节点增多而变大。这导致在 Flink 的大型作业中,几乎每次完成 Checkpointing 时都有可能会因为某个节点而延迟,如下图所示。
图1: 每次 Checkpoint 都可能因为某个节点上传文件缓慢而延迟
另外值得一提的是在现有的 Checkpointing 机制下,Task 只有在收到至少一个 Checkpoint Barrier 之后,才会做状态快照并且开始持久化状态快照到高可用存储,从而增加了 Checkpoint 完成时间,如下图所示。
图2: 在现有机制下,快照在 Checkpoint Barrier 到达之后才会开始上传
在新的设计中,我们通过持续上传增量 Changelog 的方法,可以避免这个限制,加速 Checkpoint 完成时间。下面我们来看看详细的设计。
Generic Log-Based Incremental Checkpointing 的核心思想是引入 State Changelog(状态变化日志),这样可以更细粒度地持久化状态:
新的设计中需要在做 Checkpoint 的时候上传的数据量变得很少,不仅可以把 Checkpoint 做得更稳定,还可以做得更高频。整个工作流程如下图所示:
图3: Generic Log-Based Incremental Checkpointing 工作流程
Generic Log-Based Incremental Checkpointing 类似传统数据库系统的 WAL 机制:
这种和 WAL 类似的机制可以有效提升 Checkpoint 完成的速度,但也带来一些额外的开销:
我们在后面的 Benchmark 对比中,也会对这三方面的影响进行分析。特别对于第 3 点,额外的重放 Changelog 所带来的容错恢复时间增加会在一定程度上因为可以做更频繁的 Checkpoint 所弥补,因为更频繁的 Checkpoint 意味着容错恢复后需要回放的处理数据更少。
Generic Log-Based Incremental Checkpointing 的很重要的一个组件是 State Changelog 存储这个部分,我们称之为 Durable Short-term Log(DSTL,短存 Log)。DSTL 需要满足以下几个特性:
State Changelog 是组成 Checkpoint 的一个部分,所以也需要能持久化存储。同时,State Changelog 只需要保存从最近一次持久化 State Table 到当前做 Checkpoint 时的 Changelog,因此只需要保存很短时间(几分钟)的数据。
只有在 Restore 或者 Rescale 的情况下才需要读取 Changelog,大部分情况下只有 append 操作,并且一旦写入,数据就不能再被修改。
引入 State Changelog 是为了能将 Checkpoint 做得更快(1s 以内)。因此,单次写请求需要至少能在期望的 Checkpoint 时间内完成。
如果我们有多个 State Changelog 的副本,就会产生多副本之间的一致性问题。一旦某个副本的 State Changelog 被持久化并被 JM 确认,恢复时需要以此副本为基准保证语义一致性。
从上面的特性也可以看出为什么我们将 Changelog 存储命名为 DSTL 短存 Log。
DSTL 可以有多种方式实现,例如分布式日志(Kafka)、分布式文件系统(DFS),甚至是数据库。在 Flink 1.15 发布的 Generic Log-Based Incremental Checkpointing MVP 版本中,我们选择 DFS 来实现 DSTL,基于如下考虑:
另一方面,使用 DFS 有以下缺点:
经过一些初步实验,我们认为目前大部分 DFS 实现(例如 S3,HDFS 等)的性能可以满足 80% 的用例,后面的 Benchmark 会提供更多数据。
下图以 RocksDB 为例展示了基于 DFS 的 DSTL 架构图。状态更新通过 Changelog State Backend 双写,一份写到 RocksDB,另一份写到 DSTL。RocksDB 会定期进行 Materialization,也就是将当前的 SST 文件 上传到 DFS;而 DSTL 会将 state change 持续写入 DFS,并在 Checkpointing 的时候完成 flush,这样 Checkpoint 完成时间只取决于所需 flush 的数据量。需要注意的是 Materialization 完全独立于 Checkpointing 的过程,并且 Materialization 也可以比 Checkpointing 的频率慢很多,系统默认值是 10 分钟。
图4: 以 RocksDB 为例基于 DFS 的 DSTL 架构图
这里还有几个问题值得补充讨论一下:
前面有提到在新的架构中,一个 Checkpoint 由两部分组成:1)State Table 和 2)State Change Log。这两部分都需要按需清理。1)这个部分的清理复用 Flink 已有的 Checkpoint 机制;2)这个部分的清理相对较复杂,特别是 State Change Log 在当前的设计中为了避免小文件的问题,是以 TM 为粒度的。在当前的设计中,我们分两个部分来清理 State Change Log:一是 Change Log 本身的数据需要在 State Table 物化后删除其相对应的部分;二是 Change Log 中成为 Checkpoint 的部分的清理融合进已有的 Flink Checkpoint 清理机制。
DFS 相关问题
DFS 的一个问题是每个 Checkpoint 会创建很多小文件,并且因为 Changleog State Backend 可以提供更高频的 Checkpoint,小文件问题会成为瓶颈。为了缓解这种情况,我们将同一个 Task Manager 上同一作业的所有 State Change 写到同一个文件中。因此,同一个 Task Manager 会共享同一个 State Change Log。
为了解决 DFS 高长尾延迟问题,DFS 写入请求会在允许超时时间(默认为 1 秒)内无法完成时重试。
Generic Log-Based Incremental Checkpointing 对于 Checkpoint 速度和稳定性的提升取决于以下几个因素:
在 Benchmark 实验中,我们使用如下配置:
我们第一部分的实验,主要针对每次更新的 Key 值都不一样的负载;这种负载因为上述第 2 点和第 4 点的原因,Changelog 的提升是比较明显的:Checkpoint 完成时间缩短了 10 倍(99.9 pct),Checkpoint 大小增加 30%,恢复时间增加 66% - 225%,如下表所示。
表1: 基于 ValueState Workload 的 Changelog 各项指标对比
下面我们来更详细的看一下 Checkpoint Size 这个部分:
表2: 基于 ValueState Workload 的 Changelog(开启/关闭)的 Checkpoint 相关指标对比
这里使用的是 Sliding Window。如下表所示,Changelog 对 checkpoint 完成时间加速 3 倍左右;但存储放大要高得多(消耗的空间接近 45 倍):
表3: 基于 Window Workload 的 Changelog(开启/关闭)的 Checkpoint 相关指标对比
Full Checkpoint Data 存储空间放大主要原因来自于:
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。