Flink 应用要想大规模可靠运行,必须满足两个条件:
- 应用程序需要能够可靠地进行checkpoint操作
- 故障后资源需要足够赶上输入数据流
第一部分讨论如何大规模执行checkpoint。 最后一部分解释了一些关于规划要使用多少资源的最佳实践。
监控检查点行为的最简单方法是通过 UI 的检查点部分。 检查点监控的文档显示了如何访问可用的检查点指标。
扩大检查点时特别感兴趣的两个问题(都通过任务级别指标和 Web 界面公开)是:
- 当触发 Checkpoint 的时间一直很高,也就是Operator 收到第一个 Checkpoint Barrier 的时间较长时,这意味着 Checkpoint Barrier 需要很长时间才能从源头到达 Operator。 这通常表明系统在恒定背压下运行。
- 对齐持续时间,定义为接收第一个和最后一个检查点屏障之间的时间。 在未对齐的仅一次检查点和至少一次检查点期间,子任务正在处理来自上游子任务的所有数据而没有任何中断。 但是,对于一次性检查点对齐,已经收到检查点屏障的通道将被阻止发送进一步的数据,直到所有剩余通道都赶上并接收它们的检查点屏障(对齐时间)。
理想情况下,这两个值都应该很低 – 较高的数量意味着由于一些背压检查点屏障缓慢地通过作业图,(没有足够的资源来处理传入的记录)。 这也可以通过增加处理记录的端到端延迟来观察。 请注意,在存在瞬时背压、数据倾斜或网络问题的情况下,这些数字有时会很高。
非对齐的检查点可用于加快检查点障碍的传播时间。 但是请注意,这并不能解决导致背压的根本问题(并且端到端记录延迟将保持很高)。
Checkpointing调优
应用程序可以配置定期触发检查点。 当检查点的完成时间超过检查点间隔时,在进行中的检查点完成之前不会触发下一个检查点。 默认情况下,一旦正在进行的检查点完成,将立即触发下一个检查点。
当检查点最终经常花费比基本间隔更长的时间(例如,因为状态增长大于计划,或者存储检查点的存储暂时很慢),系统会不断地获取检查点(一旦完成,新的检查点就会立即启动) . 这可能意味着过多的资源一直被检查点所占用,而Operator的处理太少。 此行为对使用异步检查点状态的流式应用程序的影响较小,但仍可能对整体应用程序性能产生影响。
为了防止这种情况,应用程序可以定义检查点之间的最小持续时间:
StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
此持续时间是最近一个检查点结束和下一个检查点开始之间必须经过的最小时间间隔。 下图说明了这如何影响检查点。
注意:可以(通过 CheckpointConfig)配置应用程序以允许同时进行多个检查点。 对于 Flink 中状态较大的应用程序,这通常会将过多的资源绑定到检查点中。 当手动触发保存点时,它可能与正在进行的检查点同时进行。
RocksDB调优
许多大型 Flink 流应用程序的状态存储主力是 RocksDB 状态后端。 后端的扩展性远远超出了主内存,并且可靠地存储了大的keyed状态。
RocksDB 的性能可能因配置而异,本节概述了使用 RocksDB 状态后端调整作业的一些最佳实践。
增量Checkpoints
在减少检查点花费的时间方面,激活增量检查点应该是首要考虑因素之一。 与完整检查点相比,增量检查点可以显着减少检查点时间,因为增量检查点仅记录与先前完成的检查点相比的更改,而不是生成状态后端的完整、自包含备份。
RocksDB 或 JVM 堆中的计时器
计时器默认存储在 RocksDB 中,这是更健壮和可扩展的选择。
当性能调整作业只有几个计时器(没有窗口,不使用 ProcessFunction 中的计时器)时,将这些计时器放在堆上可以提高性能。 请谨慎使用此功能,因为基于堆的计时器可能会增加检查点时间,并且自然无法扩展到内存之外。
RocksDB内存调优
RocksDB 状态后端的性能很大程度上取决于它可用的内存量。 为了提高性能,增加内存会很有帮助,或者调整内存的功能。
默认情况下,RocksDB 状态后端使用 Flink 为 RocksDB 缓冲区和缓存管理的内存预算(state.backend.rocksdb.memory.managed: true)。 有关该机制如何工作的背景信息,请参阅 RocksDB 内存管理。
要调整与内存相关的性能问题,以下步骤可能会有所帮助:
- 尝试提高性能的第一步应该是增加托管内存的数量。 这通常会大大改善这种情况,而不会增加调整底层RocksDB 选项的复杂性。
- 尤其是对于大型容器/进程大小,大部分总内存通常可以流向 RocksDB,除非应用程序逻辑本身需要大量 JVM 堆。 默认托管内存分数 (0.4) 是保守的,并且在使用具有多 GB 进程大小的 TaskManager 时通常可以增加。
- RocksDB 中写入缓冲区的数量取决于应用程序中的状态数量(管道中所有Operator的状态)。 每个状态对应一个 ColumnFamily,它需要自己的写缓冲区。 因此,具有许多状态的应用程序通常需要更多内存才能获得相同的性能。
- 您可以通过设置 state.backend.rocksdb.memory.managed: false 来尝试比较使用托管内存的 RocksDB 与使用每列族内存的 RocksDB 的性能。特别是针对基线进行测试(假设没有或适当的容器内存限制)或测试与早期版本的 Flink 相比的回归,这可能很有用。
- 与托管内存设置(恒定内存池)相比,不使用托管内存意味着 RocksDB 分配的内存与应用程序中的状态数成正比(内存占用量随着应用程序的变化而变化)。根据经验,非托管模式(除非应用 ColumnFamily 选项)的上限约为“140MB num-states-across-all-tasks num-slots”。计时器也算作状态!
- 如果您的应用程序有许多状态并且您看到频繁的 MemTable 刷新(写入端瓶颈),但您无法提供更多内存,您可以增加写入缓冲区的内存比例(state.backend.rocksdb.memory.write-buffer-比率)。有关详细信息,请参阅 RocksDB 内存管理。
- 一个高级选项(专家模式)可以通过 RocksDBOptionsFactory 调整 RocksDB 的 ColumnFamily 选项(arena块大小、最大后台刷新线程等)来减少具有许多状态的设置中的 MemTable 刷新次数:
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
@Override
public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
// increase the max background flush threads when we have many states in one operator,
// which means we would have many column families in one DB instance.
return currentOptions.setMaxBackgroundFlushes(4);
}
@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
// decrease the arena block size from default 8MB to 1MB.
return currentOptions.setArenaBlockSize(1024 * 1024);
}
@Override
public OptionsFactory configure(ReadableConfig configuration) {
return this;
}
}
容量规划
本节讨论如何确定 Flink 作业应该使用多少资源才能可靠运行。 容量规划的基本经验法则是:
- 正常运行应有足够的能力,不会在恒定背压下运行。 有关如何检查应用程序是否在背压下运行的详细信息,请参阅背压监控。
- 在无故障时间内无背压运行程序所需的资源之上提供一些额外资源。 需要这些资源来“赶上”在应用程序恢复期间积累的输入数据。 这应该是多少取决于恢复操作通常需要多长时间(这取决于需要在故障转移时加载到新 TaskManager 中的状态大小)以及场景需要故障恢复的速度。
- 重要提示:应该在激活检查点的情况下建立基线,因为检查点会占用一些资源(例如网络带宽)。
- 临时背压通常是可以的,并且在负载峰值期间、追赶阶段或外部系统(写入接收器中)出现临时减速期间执行流控制的重要部分。
- 某些操作(如大窗口)会导致其下游操作符的负载激增:在窗口的情况下,下游操作符可能在构建窗口时几乎无事可做,而在发出窗口时有负载要做。 下游并行性的规划需要考虑到窗口发出多少以及需要以多快的速度处理这种峰值。
重要:为了允许以后添加资源,请确保将数据流程序的最大并行度设置为合理的数字。 最大并行度定义了在重新缩放程序时(通过保存点)可以设置程序并行度的高度。
Flink 的内部簿记以 max-parallelism-many 键组的粒度跟踪并行状态。 Flink 的设计力求使最大并行度具有非常高的值变得高效,即使以低并行度执行程序也是如此。
压缩
Flink 为所有检查点和保存点提供可选的压缩(默认:关闭)。 目前,压缩始终使用 snappy 压缩算法(版本 1.1.4),但我们计划在未来支持自定义压缩算法。 压缩作用于keyed状态下键组的粒度,即每个键组可以单独解压缩,这对于重新缩放很重要。
可以通过 ExecutionConfig 激活压缩:
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setUseSnapshotCompression(true);
注意压缩选项对增量快照没有影响,因为它们使用的是 RocksDB 的内部格式,该格式始终使用开箱即用的 snappy 压缩。
本地任务恢复
动机
在 Flink 的检查点中,每个任务都会生成其状态的快照,然后将其写入分布式存储。 每个任务通过发送描述状态在分布式存储中的位置的句柄来向Jobmanager确认状态的成功写入。 Jobmanager反过来收集所有任务的句柄并将它们捆绑到一个检查点对象中。
在恢复的情况下,Jobmanager打开最新的检查点对象并将句柄发送回相应的任务,然后可以从分布式存储中恢复它们的状态。 使用分布式存储来存储状态有两个重要的优势。 首先,存储是容错的,其次,分布式存储中的所有状态都可以被所有节点访问,并且可以很容易地重新分配(例如,用于重新缩放)。
但是,使用远程分布式存储也有一个很大的缺点:所有任务都必须通过网络从远程位置读取它们的状态。 在很多情况下,recovery 可以将失败的任务重新调度到与上次运行时相同的Taskmanager(当然也有机器故障等例外),但我们仍然需要读取远程状态。 这可能导致大型状态的恢复时间很长,即使单台机器上只有一个小故障。
方法
任务本地状态恢复正是针对这个恢复时间长的问题,其主要思想是:对于每个检查点,每个任务不仅将任务状态写入分布式存储,而且在一个备份中保存一份状态快照的副本。 任务本地的存储(例如在本地磁盘或内存中)。 请注意,快照的主存储仍然必须是分布式存储,因为本地存储在节点故障下无法确保持久性,并且也不提供其他节点重新分配状态的访问权限,因此此功能仍然需要主副本。
但是,对于每个可以重新调度到先前位置进行恢复的任务,我们可以从辅助的本地副本恢复状态,并避免远程读取状态的成本。 鉴于许多故障不是节点故障,并且节点故障通常一次只影响一个或很少的节点,因此在恢复过程中,大多数任务很可能可以返回到它们之前的位置并发现它们的本地状态完好无损。 这就是使本地恢复有效地减少恢复时间的原因。
请注意,根据所选的状态后端和检查点策略,创建和存储辅助本地状态副本的每个检查点可能会产生一些额外费用。 例如,在大多数情况下,实现将简单地将分布式存储的写入复制到本地文件。
主(分布式存储)和从(任务本地)状态快照的关系
任务本地状态始终被视为辅助副本,检查点状态的基本事实是分布式存储中的主副本。 这对检查点和恢复期间的本地状态问题有影响:
- 对于检查点,主副本必须成功,并且生成辅助本地副本的失败不会使检查点失败。如果无法创建主副本,即使已成功创建辅助副本,检查点也会失败。
- 只有主副本由Jobmanager确认和管理,辅助副本由Taskmanager拥有,并且它们的生命周期可以独立于它们的主副本。例如,可以保留 3 个最新检查点的历史记录作为主副本,并且只保留最新检查点的任务本地状态。
- 对于恢复,如果有匹配的辅助副本可用,Flink 将始终首先尝试从任务本地状态恢复。如果在从副本恢复过程中出现任何问题,Flink 会透明地重试从主副本恢复任务。仅当主副本和(可选)辅助副本失败时,恢复才会失败。在这种情况下,根据配置,Flink 仍可能回退到旧的检查点。
- 任务本地副本可能仅包含完整任务状态的一部分(例如,写入一个本地文件时出现异常)。 在这种情况下,Flink 会首先尝试在本地恢复本地部分,非本地状态从主副本恢复。 主状态必须始终是完整的,并且是任务本地状态的超集。
- 任务本地状态可以具有与主状态不同的格式,它们不需要字节相同。 例如,任务本地状态甚至可能是由堆对象组成的内存中,而不是存储在任何文件中。
- 如果task manager丢失,则其所有任务的本地状态都会丢失。
配置从本地恢复
任务本地恢复默认是停用的,可以通过 Flink 的配置使用 CheckpointingOptions.LOCAL_RECOVERY 中指定的 key state.backend.local-recovery 来激活。 此设置的值可以是 true 以启用或 false(默认)以禁用本地恢复。
请注意,未对齐的检查点当前不支持任务本地恢复。
关于不同状态后端任务本地恢复的详细信息
限制:目前,任务本地恢复仅涵盖keyed状态后端。 keyed状态通常是该状态的最大部分。 在不久的将来,我们还将介绍操作员状态和计时器。
以下状态后端可以支持任务本地恢复。
- FsStateBackend:keyed状态支持任务本地恢复。 该实现会将状态复制到本地文件。 这会引入额外的写入成本并占用本地磁盘空间。 将来,我们可能还会提供一种将任务本地状态保存在内存中的实现。
- RocksDBStateBackend:支持keyed状态的任务本地恢复。对于完整的检查点,状态被复制到本地文件。这会引入额外的写入成本并占用本地磁盘空间。对于增量快照,本地状态基于 RocksDB 的原生检查点机制。这种机制也被用作创建主副本的第一步,这意味着在这种情况下,创建辅助副本不会引入额外的成本。我们只是保留本地检查点目录,而不是在上传到分布式存储后将其删除。此本地副本可以与 RocksDB 的工作目录共享活动文件(通过硬链接),因此对于活动文件,增量快照的任务本地恢复也不会消耗额外的磁盘空间。使用硬链接还意味着 RocksDB 目录必须与所有可用于存储本地状态的配置本地恢复目录位于同一物理设备上,否则建立硬链接可能会失败(参见 FLINK-10954)。目前,当 RocksDB 目录配置为位于多个物理设备上时,这也会阻止使用本地恢复。
分配调度
任务本地恢复假设在故障下保留分配的任务调度,其工作原理如下。 每个任务都会记住其先前的分配并请求完全相同的插槽以重新启动恢复。 如果此槽不可用,任务将向资源管理器请求一个新的新槽。 这样,如果任务管理器不再可用,则无法返回其先前位置的任务将不会将其他正在恢复的任务赶出其先前的插槽。 我们的理由是,只有当任务管理器不再可用时,前一个插槽才会消失,在这种情况下,某些任务无论如何都必须请求新的插槽。 使用我们的调度策略,我们让最大数量的任务有机会从它们的本地状态中恢复,并避免任务从彼此之间窃取之前的插槽的级联效应。
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/4000/