-
多云部署架构 -
业务场景 -
Helm 集群管理模式 -
Native Flink on Kubernetes -
流批一体作业管控平台 -
未来展望
一、多云部署架构
二、业务场景
-
第一,Summary 标签数据会和推荐引擎推荐出来笔记的特征数据进行关联,这个关联也是在 Flink 任务中进行的,内部称其为 FeatureJoiner 任务。接着会产出一个算法训练的样本,这个样本经过算法训练之后产出一个推荐模型,而这个模型最终会反馈到实时推荐引擎中。 -
第二,Summary 标签数据会通过 Flink 实时写到 OLAP 引擎中,比如写到 Hologres 或 Clickhouse 中。 -
最后, Summary 标签数据会通过 Flink 写入到离线 Hive 表中,提供给后续离线报表使用。
三、Helm 集群管理模式
-
第一,可以管理比较复杂的 K8s 应用,创建 Flink 集群时会创建很多 K8s 相关的资源,例如 service 或者 config map 以及 Deployment 等, Helm 可以将这些资源统一打包成一个 Helm chart,然后进行统一管理,从而不需要感知每一种资源对应的底层描述文件。 -
第二,比较方便升级和回滚,只需要执行一条简单命令就可以进行升级或者回滚。同时因为它的代码是和 Flink Client 的代码做了隔离,因此在升级过程中不需要去修改 Flink Client 的代码,实现了代码解耦。 -
第三,非常易于共享,将 Helm chart 部署在公司私有服务器上之后,已经可以同时支持多个云产品的 Flink 集群管理。
-
首先是最基础的 JobManager 和 TaskManager Deployment; -
第二部分是 ConfigMap,主要是针对 log4j 的配置和各大云厂商提供的云存储产品相关的配置; -
第三部分是 Ingress,目前主要用于 Flink web UI 使用以及访问 JobManager 当前任务状态; -
第四部分是 Nodeport Service,每启动一个 JobManager,就会在 JM 上启动一个 Nodeport Service,并与 Ingress 做绑定; -
第五部分是指磁盘资源,主要有以下两个应用场景:使用 RocksDB Backend 的时候需要去挂载高效云盘、批处理任务需要挂载磁盘做中间数据交换; -
最后一部分是 ServiceMesh,TaskManager 内部会通过 sidecar 形式去访问第三方服务,比如说 Redkv service,这些 service 的配置也是在这里面创建的。
-
第一是 K8s 资源瓶颈问题。因为每启动一个 JobManager 就会创建一个 NodePort Service,而这个 Service 会在整个集群范围内占用一个端口和一个 ClusterIP。当作业规模达到一定程度的时候,这些端口资源以及 IP 资源就会遇到性能瓶颈了。 -
第二个是 ServiceMesh 配置成本过高。上文提到 TaskManager 内部会访问第三方服务,比如说 redkv service,那么每增加一个 redkv service,就需要去修改对应的配置并完成发版,过程的成本是比较高的。 -
第三个是存在一定的资源泄露问题。所有的资源创建以及销毁都是通过执行 Helm 命令来完成的,在某些异常情况下,job 失败会导致 Helm delete 命令没有被执行,这个时候就有可能会存在资源泄露的问题。 -
第四个是镜像版本比较难以收敛。在日常的生产过程中,某些线上任务出现了问题,会临时出一个 hotfix 版本镜像并上线运行,久而久之线上就会存在很多版本镜像在运行,这对于后面的运维工作以及问题排查产生了非常大的挑战。 -
最后一个问题是 UDF 管理复杂度比较高,这是任何分布式计算平台都会遇到的一个问题。
四、Native Flink on Kubernetes
-
更短的 Failover 时间; -
可以实现资源托管,不需要手动创建 TaskManager 的 pod,也可以自动完成销毁; -
具有更加便捷的 HA。在 Flink 1.12 之前,实现 JobManager HA 还是依赖于第三方的 zookeeper。但在 Native Flink on K8s 模式下,可以依赖于原生 K8s 的 leader 选举机制来完成 JobManager 的 HA。
-
第一类是平台内置的,将平时的生产工作中经常使用到的 UDF 进行抽象归纳总结,并内置到镜像里面。镜像里有关于 UDF 的配置文件,其中有 UDF 的名称以及类型,同时指定了它对应的实现类。 -
另外一类是 User-defined UDF,在 Helm 管理模式下,针对用户自定义的 UDF 管理是比较粗放的,将用户 project 下所有 UDF 相关的 JAR 包统一加载到 classloader 下,这会导致类冲突问题。而在 Native Flink 模式下,实现了一个 create function using JAR 的语法,可以按需加载用户所需要的 UDF 对应的 JAR 包,可以极大地缓解类冲突的问题。
-
第一,兼容转化工具。这个工具会对 SQL 进行转化,保证 SQL 在 1.13 运行的语法校验不会出错。1.10 到 1.13 经历过几个大版本的变更, SQL 的定义在众多方面已经不兼容,比如在 1.10 和 1.11 的时候,Kafka connector 的取值是 0.11,到 1.13 之后,对应取值已经变成 universal,如果不做任何转化,原始 SQL 肯定在 1.13 上没有办法运行。 -
第二,兼容检测工具。这个工具的目的是为了检查 SQL 运行在 1.13 的时候能不能从一个低版本的 savepoint 去进行恢复。主要从以下几个方面去做了检查:operator ID 升级之后,名称有没有发生变化;新旧两个版本对应的 max parallelism 有没有发生变化,因为 max parallelism 发生变化的时候,在某部分场景下是没有办法从一个老的 savepoint 来恢复的。 -
第三,预编译。在 1.13 上对转换之后的 SQL 进行预编译,看编译的结果是否能够正常通过。在兼容检测工具的过程中,也发现了很多从低版本到高版本不兼容的地方,引入了新的数据类型机制,1.11 没有使用 ExternalSerializer,而 1.12 及以后使用 ExternalSerializer 进行包装;BaseRowSerializer 已经在 Flink 1.11 时候改名成了 RowDataSerializer;数据类型里面有一个 seriaVersionUID,之前它是一个随机的 long 类型的数字,而在 1.13 统一固定成了 1。上述种种不兼容会导致 1.13 没有办法直接从一个低版本的 savepoint 来恢复的。因此针对这些问题,在引擎侧做了一些改造。 -
第四,迁移工具。这个工具的目标主要有以下三点: -
首先,对用户作业的影响时间尽可能降到最低,为了达成这个目标,我们对 Native Flink on K8s 的 application mode 做了比较大的改造。原生的 application mode 是一边调度一边申请资源,为了在升级过程中降低对用户作业的影响,实现了 application mode 下可以提前申请好资源并完成 SQL 的编译 (即 JobManager 的预启动),这个过程完成之后,将旧的 job 停掉然后启动新的 job,整个过程对用户作业的影响能够控制在 30 秒以内 (中等规模任务)。 -
其次,在迁移的过程中要保证状态不丢失,因为所有迁移都是基于 savepoint 来启动的,所以这块的数据是不会有任何丢失的。 -
最后,如果在升级过程中发生了异常,可以支持异常情况下自动完成回滚。
五、流批一体作业管控平台
-
第一个因素是它运行所属的云环境; -
第二个因素是业务类型; -
第三个因素是资源池提供给流还是批任务使用。
六、未来展望
-
第一,动态资源调整。目前, Flink job 一旦提交运行,就无法在运行期间修改某个 operator 占用的资源。所以希望未来能够在 job 不进行 restart 的情况下,调整某个算子所占用的资源。 -
第二,跨云多活方案。目前公司核心 P0 作业基本都是双链路的,但都仅限于在单朵云上。希望针对这些核心任务,实现跨云双活方案,其中一个云上任务出现问题的时候,能够稳定切换到另外一朵云上。 -
第三,批任务资源调度优化。因为批任务大多是在凌晨以后开始执行,同时会调度很多任务,有的任务可能因为抢占不到资源导致无法及时运行,在任务调度执行策略上仍有可以优化的空间。
本文转载自Apache Flink,原文链接:https://mp.weixin.qq.com/s/Tb36BalDm-vqcs3HHHAqMA。