基于 Log 的通用增量 Checkpoint​

摘要:本文整理自 Apache Flink Contributor 俞航翔 9 月 24 日在 Apache Flink Meetup 的分享。主要内容包括:


  1. Checkpoint 性能优化之路

  2. 解析 Changelog

  3. 一览 State/Checkpoint 优化

基于 Log 的通用增量 Checkpoint​
Flink 作为一个 Stateful 计算引擎,State 是其非常重要的概念,它支持 Stateful 算子通过 State 记录多个 Events 之间的信息,并在 Checkpoint 时做状态持久化,存储全局一致性快照,在恢复时通过 Resume Checkpoint 以及 Replay 来实现不同语义的一致性保障。

容错是长期运行的流式系统非常重要的部分,而 Checkpoint 的主要目的就是解决 Failover 问题。基于此目标,它的生命周期完全由 Flink 管理。因此对于不同的 StateBackend 可以用特定的原生格式进行存储,利用 StateBackend 的内部机制,比如 Incremental Checkpoint 做进一步优化等。

基于以上机制, Checkpoint 目前的设计目标就是更轻量以及更快速的恢复。

基于 Log 的通用增量 Checkpoint​
那 Flink 在 Chekpoint 性能上做了哪些优化呢?

最早期,在支持轻量级异步 Snapshot 算法后, Flink Checkpoint 的性能往前迈了一大步。在该机制下,Flink 将  Barrier 作为特殊 Record 在 Graph 中流动,同时将耗时较大的文件上传等工作放到异步过程中进行,极大降低了对主流程的影响。

在 1.0 版本中,Flink 开始支持 RocksDB StateBackend ,这对大状态下的存储提供了很好的支持。1.3 版本 Flink实现了基于 RocksDB Incremental Checkpoint 的机制,进一步提升了Checkpoint 在异步阶段的性能。1.11 版本 Flink 引入了 Unaligned Checkpoint,并在 1.13 版本达到了 Production-ready 状态,对于 Barrier 对齐有瓶颈的作业,这个技术让作业在反压比较严重的情况下依然可以做出 Checkpoint 。1.14 版本引入的 Buffer Debloating 可以通过动态调整 Network Buffer 大小来加速 Barrier 流动,进一步加速 Aligned Checkpoint 完成,减少 Unaligned Checkpoint 存储的数据量。1.15 和 1.16 中 Flink 引入了 Changelog StateBackend ,它通过更通用的 Incremental Checkpoint 机制进一步提升了 Checkpoint 的异步性能。

基于 Log 的通用增量 Checkpoint​

我们可以通过这张图来看一下这些技术在作业执行链路中的作用。

当 Checkpoint 触发时, Barrier 会随着 Graph 流动。当打开 Buffer Debloating 后,Flink 会通过计算吞吐等方式动态调整 Network Buffer 的大小来加速 Barrier 传递。当 Barrier 到达 Stateful 节点时,如果是 Aligned Checkpoint ,则算子会等待 Barrier 对齐再触发后续的 Checkpoint 行为;如果是 Unaligned Checkpoint ,则会直接将 Barrier 传递给后续算子,同时触发 StateBackend 上的 Checkpoint,并将 Buffer 中的内容存储到 HDFS ,不会阻塞,如图中间的实线所示。

在 StateBackend 内部触发 Checkpoint 时,基于异步 Checkpoint 算法,异步部分会进行文件上传,如图中实线所示。在开启了 RocksDB Incremental Checkpoint 时,会做增量文件上传,如上图中,只有更新后的 S2、S4 需要上传到远端。

Changelog StateBackend 的核心机制如上图最下方虚线所示,会将原先的 StateBackend 异步上传和 Changelog 部分进行解耦,引入了额外独立上传到 HDFS 的过程,这个过程将稳定、持续地发生在作业运行过程中。

基于 Log 的通用增量 Checkpoint​
我们可以通过 Checkpoint Metrics 相关信息来直观感受下这些技术的作用。

Metrics 上关于 Size 的关键指标有 Checkpointed DataSize 和 Full Checkpoint DataSize 两个,是 1.15 版本在 Flink UI 中透出的指标,我们可以通过这两个指标直接看出增量 Checkpoint 的性能以及存储在远端的空间大小。

剩余的指标主要是关于耗时的,StartDelay 指标过大通常是因为作业反压,一般需要先排查作业逻辑。此外,StartDelay 过大时,如果作业逻辑允许,可开启 Unaligned Checkpoint 和 Buffer Debloating 以加速 Checkpoint 完成。如果 Aligned Duration 指标比较长,可以考虑开启 Unaligned Checkpoint 。

同步 Duration 和异步 Duration 是整个 Checkpoint 是我们通常最关注的两个部分,而异步部分的耗时通常是最常见的瓶颈点。异步部分的优化可以通过开启 Incremental Checkpoint 以减少异步上传量,或者通过开启 Changelog StateBackend 来进一步缩短异步耗时。

02

解析 Changelog

基于 Log 的通用增量 Checkpoint​

Changelog 的核心目标如下:
  1. 更稳定的 Checkpoint:通过解耦 Compaction 和 Checkpoint 过程,使 Checkpoint 更稳定,大幅减少 Checkpoint Duration 突增的情况,还可进一步减少 CPU 抖动,使网络带宽变得更平稳。
    在大规模、大状态作业上经常会出现 CPU 随着 Checkpoint 周期性抖动,进而影响作业和集群稳定性的情况。Changelog 通过解耦 Checkpoint 触发 Compaction 的过程,可以使 CPU 变得更平稳。另外,在异步过程中,Compaction 导致的大量文件同时上传有时会将网络带宽打满, 而 Changelog 是能够缓解该状况的。
  2. 更快速的 Checkpoint:Checkpoint 期间会上传相对固定的增量,可以达到秒级完成 Checkpoint 的目标。
  3. 更小的端到端延迟:Flink 中实现端到端的 Exactly-once 语义主要依赖于 Checkpoint 的完成时间。Checkpoint 完成越快,Transactional sink 可以提交得更频繁,保证更好的数据新鲜度。后续可与 Table Store 结合,保证 Table Store 上的数据更新鲜。
  4. 更少的数据回追:通过设置更小的 Checkpoint Interval 加速 Failover 过程,可以减少数据回追。
    虽然目前 Changelog 的机制下,Restore 时在 TM 上会有额外的 Replay 时间开销,但总体来看,耗费的时间还是相对减少的。
基于 Log 的通用增量 Checkpoint​

那 RocksDB Incremental Checkpoint 为什么做不到快速且稳定呢?

我们先看下 RocksDB 的访问机制:当一条 Record 写到 RocksDB 时,首先会写到 Memtable ,数据量达到 Memtable 阈值后会 Memtable 变为 Immutable Memtable;当数据量再达到整个 Memory 所有 Memtable 的阈值后,会 Flush 到磁盘,形成 SST Files 。L0 的 SST files 之间是有 Overlap 的 。Flink 默认使用 RocksDB 的 Level Compaction 机制 ,因此在 L0 达到阈值后,会继续触发 Level Compaction,与 L1 进行 Compaction ,进一步可能触发后续 Level Compaction。
我们再来看一下 Checkpoint 同步阶段和异步阶段做了些什么。在同步过程中,Checkpoint 首先会触发 Memtable 强制 Flush,这一过程可能会触发后面级联的 Level Compaction,该步骤可能导致大量文件需要重新上传。同时,同步过程中会做 Local Checkpoint ,这是 RocksDB 本地的 Checkpoint 机制,对 Rocksdb 而言其实就是 Hard Link 一些 SST Files,是相对轻量的。异步过程会将这些 SST Files 上传,同时写入 Meta 信息。

我们可以看到两个重要部分:

  1. 数据量达到阈值,或者 cp 的同步阶段,是会触发 Memtable Flush,进一步触发级联 Level Compation,进一步导致大量文件需要重新上传的
  2. 在大规模作业中,每次 Checkpoint 可能都会因为某一个 Subtask 异步时间过长而需要重新上传很多文件。端到端 Duration 会因为 Compaction 机制而变得很长。

基于 Log 的通用增量 Checkpoint​
那么 Changelog 做了什么改进呢?

在介绍 Changelog 的机制之前,我先介绍下几个内部的术语

State Table 是本地状态数据读写结构,比如 RocksDB。我们更倾向于将 Changelog 理解为 StateBackend 上的功能增强,已有的 StateBackend(HashmapStateBackend/RocksDBStateBackend,或者自定义的一种StateBackend)均可以打开该功能 。而且我们在 1.16 中实现了 Changelog 开到关和关到开的兼容性,用户可以非常方便地在存量作业中使用。

Materialization 是 State Table 持久化的过程,可以理解为 RocksDBStateBackend 或 HashmapStateBackend 做 Checkpoint 的过程。目前会定时触发,完成一次成功的 Materialization 后会 Truncate 掉 Changelog ,即做 Changelog 的清理。

DSTL 是 Changelog 的存储组件。Changelog 的写入需要提供持久化、低延迟、一致性及并发支持。目前基于 DFS 实现了 DSTL,后续我们将继续探索其他实现方式。

基于 Log 的通用增量 Checkpoint​
Changelog 的机制很像 WAL 的机制。

如图所示,图中下面部分为 State Table ,上面为 Changelog 的存储部分即 DSTL 。

首先,在状态写入时,会同时写到 State Table 和 DSTL,如果 State Table 是 Rocksdb,那么它的后续流程就像我们刚才提到的一样,包括写 Memtable,Flush,触发 Compaction 等等过程。DSTL 这个部分会以操作日志的方式追加写入 DSTL,我们也支持了不同 State 类型的各种操作的写入。

其中 DSTL 会有一套完整的定时持久化机制持久化到远端存储中,所有 Changelog 将会在运行过程中连续上传,同时在 Checkpoint 上传较小的增量。

State Table 会定时进行 Materialization,在完成一次完整的 Materialization 后将会对 Changelog 进行 Truncate,清理掉失效的 Changelog,然后新的 Checkpoint 将以这个 Materialization 为基准继续持续上传增量。

我们按读写流程、Checkpoint 流程、Restore 流程再总结下这个过程。

在状态写入时,会双写到 State Table 和 Dstl,读取时会从 State Table 中读取,即读写链路上只是多了个双写,且 Dstl 的部分是 append-only 的写入,会非常轻量级。

在 Checkpoint 时,依赖于定时 Materilize State Table,以及定期 Persist Changelog,需要上传的增量会非常小,在上传小增量后只需要把 Materialization 部分的 Handle 和 Changelog 部分的 Handle 组合返回给 jm 即可。同时我们也会依赖于 Truncate 机制去及时清理无效 Changelog。

在 Restore 时,我们拿到了 State Table 的 Handle 和 Changelog 的 Handle,State Table 部分会按之前的方式进行 Retsore,比如是 Rocksdb,那么在没开启 Local Recovery 时,会先下载远端 SST Files,再 Rebuild。Changelog 部分再没有开启 Local Recovery 时,会先下载远端 State Change,然后重新 Apply 到 State Table,Rebuild Changelog 部分。

基于 Log 的通用增量 Checkpoint​
Changelog 开启后,Checkpoint 文件上传过程会变得非常平滑。此前,只有在 Checkpoint 时才会上传文件;而现在有了 Materialization 以及定期做 Changelog 增量上传,实际做 Checkpoint 时需要上传的增量变得非常小。

基于 Log 的通用增量 Checkpoint​
上图为相关常用参数的含义和使用方法。

基于 Log 的通用增量 Checkpoint​
Changelog 能够使 Checkpoint 更快速以及更稳定,但是会存在三个额外开销:

  • 额外的存储空间。Truncate 之前,State Changelog 会一直占用额外的存储空间。
  • 额外的恢复开销。Restore 过程需要额外 Apply 以及额外下载,因此也需要额外的恢复,恢复过程会占用耗时。
  • 额外的性能开销。State Changelog 会做定时上传,存在一定的性能开销。
基于 Log 的通用增量 Checkpoint​

我们基于 RocksDB incremental 与 Changelog 做了 Benchmark 。Changelog 下使用的 State Table 为 RocksDB,开启 Incremental ,使用 OSS 作为存储介质;将 Checkpoint Interval 设置为 1 秒,对 RocksDB 而言意味着尽可能快地执行 Checkpoint ;将 Materialization Interval 设置为 3 分钟,Source Rate 设置为 10k /s,该速率对于两者而言都是比较日常的流量。

基于 Log 的通用增量 Checkpoint​
结果显示,RocksDB 侧 Duration 不稳定,时少时多。Checkpoint Datasize 存在周期性特征,每隔 4 个会增加,这是由于 Checkpoint 期间会 Flush Memtable 触发 Compaction 导致,且 RocksDB 配置 L0 的阈值为 4。而 Changelog 部分执行很稳定。

基于 Log 的通用增量 Checkpoint​
上图展示了 Checkpoint Duration 情况,我们截取了 P99 的数据做了两组参数,主要针对不同单并发 State Size 。结果显示,CP 端到端的延时,Changelog 可以在 1s 以内完成,RocksDB 延迟约 17 秒。

基于 Log 的通用增量 Checkpoint​
大 State 在 1GB 单并发场景下,空间开销约为 1.2-1.5 倍。在内存中会更多一些,因为 RocksDB 在内存中数据会变得更紧致。

实际测试中,Sliding Window 场景下的空间消耗会更明显,可能会超出两倍。因为 Sliding Window 时,每个 Windows 之间 State 不共享,会存储多份。另外,Sliding Window 在 Checkpoint 期间会不断触发 Purge 操作,Sliding Size 设置越小,Purge 越多,RocksDB 相对能更好地合并 Push 操作。另外,因为 Changelog 使用了操作日志方式存储,Truncate 比较慢,部分全量会放得很大。

针对于 Sliding Window 的优化也一直在讨论中,比如支持 Changelog 之间的 Merge 方式。目前机制下,是否启用 Changelog 是空间放大与 Checkpoint 稳定性和更快速度之间的取舍。目前云上的空间相对廉价,因此在大多数情况下,牺牲空间换取性能和稳定性的方案是可以接受的。

基于 Log 的通用增量 Checkpoint​
将 Local Recovery 关闭后,RocksDB 与 Changelog 之间存在 9 秒的时间差。结合 Local Recovery 功能将 download 部分的差距抹掉,最终时间差距为 3 秒左右。在极限 TPS 上,Changelog 会有 10% 的损耗。测试时将整个 Benchmark 打满,打到反压后测试其极限状态。后续我们将针对 DSTL 上传部分做优化。

值得注意的是,这里测试的是极限情况。日常情况下,两者的 TPS 性能相差不多,甚至 Changelog 更优。因为在 同样的 Interval 设置下, RocksDB Compaction 会变得更频繁。

基于 Log 的通用增量 Checkpoint​
未来,我们的优化方向主要包含以下三个:


  • 第一,减少空间消耗,以及优化极限 TPS 场景。

  • 第二,结合 Failover 2.0,使 Checkpoint 做得更快、使 Recovery 变得更快以及实现 Reactive Mode 功能。因为在 Reactive Mode 下,增加了资源后会依赖于该机制做 Failover 后重启。如果能够将 Checkpoint 做得更快,状态恢复也会变得更快。

  • 第三,让 Table Store 获得更好的数据新鲜度。


03

一览 State/Checkpoint 优化

基于 Log 的通用增量 Checkpoint​

1.16 版本在 State 和 Checkpoint 方面也做了不少优化。

可用性方面,做了针对于 RocksDB 监控和可用性的提升,导出了 Database Level 监控。同时也提高了 Unaligned Checkpoint 和 Aligned  Checkpoint 之间的切换可用性。

性能方面,基于 DeleteRange 将 RocksDB Rescale 性能提升 2-10 倍,基于 Overdraft Buffer 提升了 Checkpoint 性能。

■ 后面在 Flink Forward Asia 2022 核心技术专场分享的文章中,会有更加完整的展示!


Q&A

Q:RocksDB 与 Changelog 两种存储,相当于在 HDFS 上存了两份,文件系统容量 1.2 是如何计算得出的?
A:按 Full Checkpoint Size 计算得出。

Q:在 HDFS上的容量应该相当于两倍?
A:不一定是两倍。做完一次 Materialization 后,Changelog 部分的增量是会变化的。HDFS 上也会做定时清理,避免膨胀。
Q:HDFS 有两份,在恢复时也下载了两份数据,需要做数据版本合并吗?
A:是的。RocksDB 先恢复,然后将 Changelog Apply ,类似于合并操作。
Q:Changelog 数据应该已经比较准确,再做合并操作是否冗余?
A:Changelog 机制是基于 State Table 上做增量,因此作业恢复时还是需要全量数据。

0 0 投票数
文章评分

本文转载自俞航翔 @ 阿里 Apache Flink,原文链接:https://mp.weixin.qq.com/s/pU2RVm0X8sp9FC_y4mYOzQ。

(0)
上一篇 2023-01-07 15:44
下一篇 2023-01-11 14:03

相关推荐

订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x