Flink内存配置指南

配置Flink进程的内存

Apache Flink 基于 JVM 的高效处理能力,依赖于其对各组件内存用量的细致掌控。 考虑到用户在 Flink 上运行的应用的多样性,尽管社区已经努力为所有配置项提供合理的默认值,仍无法满足所有情况下的需求。 为了给用户生产提供最大化的价值, Flink 允许用户在整体上以及细粒度上对集群的内存分配进行调整。

本文接下来介绍的内存配置方法适用于 1.10 及以上版本的 TaskManager 进程和 1.11 及以上版本的 JobManager 进程。 Flink 在 1.101.11 版本中对内存配置部分进行了较大幅度的改动,从早期版本升级的用户请参考升级指南

配置总内存

Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 _Flink 总内存(Total Flink Memory)_包括 _JVM 堆内存(Heap Memory)和_堆外内存(Off-Heap Memory)。 其中堆外内存包括_直接内存(Direct Memory)和_本地内存(Native Memory)

Flink内存配置指南

配置 Flink 进程内存最简单的方法是指定以下两个配置项中的任意一个:

配置项 TaskManager 配置参数 JobManager 配置参数
Flink 总内存 taskmanager.memory.flink.size jobmanager.memory.flink.size
进程总内存 taskmanager.memory.process.size jobmanager.memory.process.size

关于本地执行,请分别参考 TaskManagerJobManager 的相关文档。

Flink 会根据默认值或其他配置参数自动调整剩余内存部分的大小。 关于各内存部分的更多细节,请分别参考 TaskManagerJobManager 的相关文档。

对于独立部署模式(Standalone Deployment),如果你希望指定由 Flink 应用本身使用的内存大小,最好选择配置 Flink 总内存。 _Flink 总内存_会进一步划分为 JVM 堆内存_和_堆外内存。 更多详情请参考如何为独立部署模式配置内存

通过配置_进程总内存_可以指定由 Flink _JVM 进程_使用的总内存大小。 对于容器化部署模式(Containerized Deployment),这相当于申请的容器(Container)大小,详情请参考如何配置容器内存KubernetesYarnMesos)。

此外,还可以通过设置 _Flink 总内存_的特定内部组成部分的方式来进行内存配置。 不同进程需要设置的内存组成部分是不一样的。 详情请分别参考 TaskManagerJobManager 的相关文档。

以上三种方式中,用户需要至少选择其中一种进行配置(本地运行除外),否则 Flink 将无法启动。 这意味着,用户需要从以下无默认值的配置参数(或参数组合)中选择一个给出明确的配置:

TaskManager: JobManager:
taskmanager.memory.flink.size jobmanager.memory.flink.size
taskmanager.memory.process.size jobmanager.memory.process.size
taskmanager.memory.task.heap.sizetaskmanager.memory.managed.size jobmanager.memory.heap.size

不建议同时设置_进程总内存_和 Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。

JVM参数

Flink 进程启动时,会根据配置的和自动推导出的各内存部分大小,显式地设置以下 JVM 参数:

JVM 参数 TaskManager 取值 JobManager 取值
-Xmx-Xms 框架堆内存 + 任务堆内存 JVM 堆内存 (*)
-XX:MaxDirectMemorySize(TaskManager 始终设置,JobManager 见注释) 框架堆外内存 + 任务堆外内存(**) + 网络内存 堆外内存 (**) (***)
-XX:MaxMetaspaceSize JVM Metaspace JVM Metaspace
(*) 请记住,根据所使用的 GC 算法,你可能无法使用到全部堆内存。一些 GC 算法会为它们自身分配一定量的堆内存。这会导致堆的指标返回一个不同的最大值。
(**) 请注意,堆外内存也包括了用户代码使用的本地内存(非直接内存)。
(**) 只有在 jobmanager.memory.enable-jvm-direct-memory-limit 设置为 true 时,JobManager 才会设置 JVM 直接内存限制*。

相关内存部分的配置方法,请同时参考 TaskManagerJobManager 的详细内存模型。

受限的等比内存部分

本节介绍下列内存部分的配置方法,它们都可以通过指定在总内存中所占比例的方式进行配置,同时受限于相应的的最大/最小值范围。

  • JVM 开销:可以配置占用_进程总内存_的固定比例
  • 网络内存:可以配置占用 _Flink 总内存_的固定比例(仅针对 TaskManager)

相关内存部分的配置方法,请同时参考 TaskManagerJobManager 的详细内存模型。

这些内存部分的大小必须在相应的最大值、最小值范围内,否则 Flink 将无法启动。 最大值、最小值具有默认值,也可以通过相应的配置参数进行设置。 例如,如果仅配置下列参数:

  • 进程总内存 = 1000Mb
  • JVM 开销最小值 = 64Mb
  • JVM 开销最大值 = 128Mb
  • JVM 开销占比 = 0.1

那么 _JVM 开销_的实际大小将会是 1000Mb x 0.1 = 100Mb,在 64-128Mb 的范围内。

如果将最大值、最小值设置成相同大小,那相当于明确指定了该内存部分的大小。

如果没有明确指定内存部分的大小,Flink 会根据总内存和占比计算出该内存部分的大小。 计算得到的内存大小将受限于相应的最大值、最小值范围。 例如,如果仅配置下列参数:

  • 进程总内存 = 1000Mb
  • JVM 开销最小值 = 128Mb
  • JVM 开销最大值 = 256Mb
  • JVM 开销占比 = 0.1

那么 _JVM 开销_的实际大小将会是 128Mb,因为根据总内存和占比计算得到的内存大小 100Mb 小于最小值。

如果配置了总内存和其他内存部分的大小,那么 Flink 也有可能会忽略给定的占比。 这种情况下,受限的等比内存部分的实际大小是总内存减去其他所有内存部分后剩余的部分。 这样推导得出的内存大小必须符合最大值、最小值范围,否则 Flink 将无法启动。 例如,如果仅配置下列参数:

  • 进程总内存 = 1000Mb
  • 任务堆内存 = 100Mb(或 JobManager 的 JVM 堆内存
  • JVM 开销最小值 = 64Mb
  • JVM 开销最大值 = 256Mb
  • JVM 开销占比 = 0.1

进程总内存_中所有其他内存部分均有默认大小,包括 TaskManager 的_托管内存_默认占比或 JobManager 的默认_堆外内存。 因此,_JVM 开销_的实际大小不是根据占比算出的大小(1000Mb x 0.1 = 100Mb),而是_进程总内存_中剩余的部分。 这个剩余部分的大小必须在 64-256Mb 的范围内,否则将会启动失败。

配置TaskManager内存

Flink 的 TaskManager 负责执行用户代码。 根据实际需求为 TaskManager 配置内存将有助于减少 Flink 的资源占用,增强作业运行的稳定性。

本文接下来介绍的内存配置方法适用于 1.10 及以上版本。 Flink 在 1.10 版本中对内存配置部分进行了较大幅度的改动,从早期版本升级的用户请参考升级指南

本篇内存配置文档仅针对 TaskManager! 与 JobManager 相比,TaskManager 具有相似但更加复杂的内存模型。

配置总内存

Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 其中,Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)、托管内存(Managed Memory)以及其他直接内存(Direct Memory)或本地内存(Native Memory)。

Flink内存配置指南

如果你是在本地运行 Flink(例如在 IDE 中)而非创建一个集群,那么本文介绍的配置并非所有都是适用的,详情请参考本地执行

其他情况下,配置 Flink 内存最简单的方法就是配置总内存。 此外,Flink 也支持更细粒度的内存配置方式

Flink 会根据默认值或其他配置参数自动调整剩余内存部分的大小。 接下来的章节将介绍关于各内存部分的更多细节。

配置堆内存和托管内存

配置总内存中所述,另一种配置 Flink 内存的方式是同时设置任务堆内存托管内存。 通过这种方式,用户可以更好地掌控用于 Flink 任务的 JVM 堆内存及 Flink 的托管内存大小。

Flink 会根据默认值或其他配置参数自动调整剩余内存部分的大小。 关于各内存部分的更多细节,请参考相关文档

提示 如果已经明确设置了任务堆内存和托管内存,建议不要再设置_进程总内存_或 Flink 总内存,否则可能会造成内存配置冲突。

任务(算子)堆内存

如果希望确保指定大小的 JVM 堆内存给用户代码使用,可以明确指定任务堆内存taskmanager.memory.task.heap.size)。 指定的内存将被包含在总的 JVM 堆空间中,专门用于 Flink 算子及用户代码的执行。

托管内存

托管内存_是由 Flink 负责分配和管理的本地(堆外)内存。 以下场景需要使用_托管内存

可以通过以下两种范式指定_托管内存_的大小:

当同时指定二者时,会优先采用指定的大小(Size)。 若二者均未指定,会根据默认占比进行计算。

请同时参考如何配置 State Backend 内存以及如何配置批处理作业内存

消费者权重

对于包含不同种类的托管内存消费者的作业,可以进一步控制托管内存如何在消费者之间分配。 通过 taskmanager.memory.managed.consumer-weights 可以为每一种类型的消费者指定一个权重,Flink 会按照权重的比例进行内存分配。 目前支持的消费者类型包括:

  • OPERATOR: 用于内置算法。
  • STATE_BACKEND: 用于流处理中的 RocksDB State Backend。
  • PYTHON:用户 Python 进程。

例如,一个流处理作业同时使用到了 RocksDB State Backend 和 Python UDF,消费者权重设置为 STATE_BACKEND:70,PYTHON:30,那么 Flink 会将 70% 的托管内存用于 RocksDB State Backend,30% 留给 Python 进程。

提示 只有作业中包含某种类型的消费者时,Flink 才会为该类型分配托管内存。 例如,一个流处理作业使用 Heap State Backend 和 Python UDF,消费者权重设置为 STATE_BACKEND:70,PYTHON:30,那么 Flink 会将全部托管内存用于 Python 进程,因为 Heap State Backend 不使用托管内存。

提示 对于未出现在消费者权重中的类型,Flink 将不会为其分配托管内存。 如果缺失的类型是作业运行所必须的,则会引发内存分配失败。 默认情况下,消费者权重中包含了所有可能的消费者类型。 上述问题仅可能出现在用户显式地配置了消费者权重的情况下。

配置对外内存(直接内存或本地内存)

用户代码中分配的堆外内存被归为任务堆外内存(Task Off-heap Memory),可以通过 taskmanager.memory.task.off-heap.size 指定。

提示 你也可以调整框架堆外内存(Framework Off-heap Memory)。 这是一个进阶配置,建议仅在确定 Flink 框架需要更多的内存时调整该配置。

Flink 将_框架堆外内存_和_任务堆外内存_都计算在 JVM 的_直接内存_限制中,请参考 JVM 参数

提示 本地内存(非直接内存)也可以被归在_框架堆外内存_或_任务堆外内存_中,在这种情况下 JVM 的_直接内存_限制可能会高于实际需求。

提示 _网络内存(Network Memory)_同样被计算在 JVM _直接内存_中。 Flink 会负责管理网络内存,保证其实际用量不会超过配置大小。 因此,调整_网络内存_的大小不会对其他堆外内存有实质上的影响。

请参考内存模型详解

内存模型详解

Flink内存配置指南

如上图所示,下表中列出了 Flink TaskManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分 配置参数 描述
框架堆内存(Framework Heap Memory) taskmanager.memory.framework.heap.size 用于 Flink 框架的 JVM 堆内存(进阶配置)。
任务堆内存(Task Heap Memory) taskmanager.memory.task.heap.size 用于 Flink 应用的算子及用户代码的 JVM 堆内存。
托管内存(Managed memory) taskmanager.memory.managed.sizetaskmanager.memory.managed.fraction 由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。
框架堆外内存(Framework Off-heap Memory) taskmanager.memory.framework.off-heap.size 用于 Flink 框架的堆外内存(直接内存或本地内存)(进阶配置)。
任务堆外内存(Task Off-heap Memory) taskmanager.memory.task.off-heap.size 用于 Flink 应用的算子及用户代码的堆外内存(直接内存或本地内存)
网络内存(Network Memory) taskmanager.memory.network.mintaskmanager.memory.network.maxtaskmanager.memory.network.fraction 用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于 Flink 总内存受限的等比内存部分
JVM Metaspace taskmanager.memory.jvm-metaspace.size Flink JVM 进程的 Metaspace。
JVM 开销 taskmanager.memory.jvm-overhead.mintaskmanager.memory.jvm-overhead.maxtaskmanager.memory.jvm-overhead.fraction 用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存受限的等比内存部分

我们可以看到,有些内存部分的大小可以直接通过一个配置参数进行设置,有些则需要根据多个参数进行调整。

框架内存

通常情况下,不建议对_框架堆内存_和_框架堆外内存_进行调整。 除非你非常肯定 Flink 的内部数据结构及操作需要更多的内存。 这可能与具体的部署环境及作业结构有关,例如非常高的并发度。 此外,Flink 的部分依赖(例如 Hadoop)在某些特定的情况下也可能会需要更多的直接内存或本地内存。

提示 不管是堆内存还是堆外内存,Flink 中的框架内存和任务内存之间目前是没有隔离的。 对框架和任务内存的区分,主要是为了在后续版本中做进一步优化。

本地执行

如果你是将 Flink 作为一个单独的 Java 程序运行在你的电脑本地而非创建一个集群(例如在 IDE 中),那么只有下列配置会生效,其他配置参数则不会起到任何效果:

组成部分 配置参数 本地执行时的默认值
任务堆内存 taskmanager.memory.task.heap.size 无穷大
任务堆外内存 taskmanager.memory.task.off-heap.size 无穷大
托管内存 taskmanager.memory.managed.size 128Mb
网络内存 taskmanager.memory.network.mintaskmanager.memory.network.max 64Mb

本地执行模式下,上面列出的所有内存部分均可以但不是必须进行配置。 如果未配置,则会采用默认值。 其中,任务堆内存和_任务堆外内存_的默认值无穷大(Long.MAX_VALUE 字节),以及托管内存的默认值 128Mb 均只针对本地执行模式。

提示 这种情况下,任务堆内存的大小与实际的堆空间大小无关。 该配置参数可能与后续版本中的进一步优化相关。 本地执行模式下,JVM 堆空间的实际大小不受 Flink 掌控,而是取决于本地执行进程是如何启动的。 如果希望控制 JVM 的堆空间大小,可以在启动进程时明确地指定相关的 JVM 参数,即 -Xmx-Xms

配置JobManager内存

JobManager 是 Flink 集群的控制单元。 它由三种不同的组件组成:ResourceManager、Dispatcher 和每个正在运行作业的 JobMaster。 本篇文档将介绍 JobManager 内存在整体上以及细粒度上的配置方法。

本文接下来介绍的内存配置方法适用于 1.11 及以上版本。 Flink 在 1.11 版本中对内存配置部分进行了较大幅度的改动,从早期版本升级的用户请参考升级指南

提示 本篇内存配置文档仅针对 JobManager! 与 TaskManager 相比,JobManager 具有相似但更加简单的内存模型。

内置总内存

配置 JobManager 内存最简单的方法就是进程的配置总内存本地执行模式下不需要为 JobManager 进行内存配置,配置参数将不会生效。

详细配置

Flink内存配置指南

如上图所示,下表中列出了 Flink JobManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分 配置参数 描述
JVM 堆内存 jobmanager.memory.heap.size JobManager 的 JVM 堆内存
堆外内存 jobmanager.memory.off-heap.size JobManager 的堆外内存(直接内存或本地内存)
JVM Metaspace jobmanager.memory.jvm-metaspace.size Flink JVM 进程的 Metaspace。
JVM 开销 jobmanager.memory.jvm-overhead.minjobmanager.memory.jvm-overhead.maxjobmanager.memory.jvm-overhead.fraction 用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存受限的等比内存部分

配置JVM堆内存

配置总内存中所述,另一种配置 JobManager 内存的方式是明确指定 _JVM 堆内存_的大小(jobmanager.memory.heap.size)。 通过这种方式,用户可以更好地掌控用于以下用途的 _JVM 堆内存_大小。

  • Flink 框架
  • 在作业提交时(例如一些特殊的批处理 Source)及 Checkpoint 完成的回调函数中执行的用户代码

Flink 需要多少 JVM 堆内存,很大程度上取决于运行的作业数量、作业的结构及上述用户代码的需求。

提示 如果已经明确设置了 JVM 堆内存,建议不要再设置_进程总内存_或 Flink 总内存,否则可能会造成内存配置冲突。

在启动 JobManager 进程时,Flink 启动脚本及客户端通过设置 JVM 参数 -Xms-Xmx 来管理 JVM 堆空间的大小。 请参考 JVM 参数

配置堆外内存

_堆外内存_包括 JVM 直接内存本地内存。 可以通过配置参数 jobmanager.memory.enable-jvm-direct-memory-limit 设置是否启用 JVM 直接内存限制。 如果该配置项设置为 true,Flink 会根据配置的_堆外内存_大小设置 JVM 参数 -XX:MaxDirectMemorySize。 请参考 JVM 参数

可以通过配置参数 jobmanager.memory.off-heap.size 设置堆外内存的大小。 如果遇到 JobManager 进程抛出 “OutOfMemoryError: Direct buffer memory” 的异常,可以尝试调大这项配置。 请参考常见问题

以下情况可能用到堆外内存:

  • Flink 框架依赖(例如 Akka 的网络通信)
  • 在作业提交时(例如一些特殊的批处理 Source)及 Checkpoint 完成的回调函数中执行的用户代码

提示 如果同时配置了 Flink 总内存JVM 堆内存,且没有配置堆外内存,那么_堆外内存_的大小将会是 Flink 总内存减去JVM 堆内存。 这种情况下,_堆外内存_的默认大小将不会生效。

本地执行

如果你是在本地运行 Flink(例如在 IDE 中)而非创建一个集群,那么 JobManager 的内存配置将不会生效。

0 0 投票数
文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://lrting.top/backend/7539/

(0)
上一篇 2022-07-09 13:37
下一篇 2022-07-10 13:37

相关推荐

订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x