摘要:本文整理自字节跳动基础架构工程师,Apache Flink Contributor 马越在 Flink Forward Asia 2021 平台建设专场的演讲。主要内容包括:
背景
State Processor API 介绍
StateMeta Snapshot 机制
State as Database
使用 Flink Batch SQL 查询任务状态
未来展望
01
背景
众所周知,Flink 中的 State 保存了算子计算过程的中间结果。当任务出现异常时,可以通过查询任务快照中的 State 获取有效线索。
但目前对于 Flink SQL 任务来说,当我们想要查询作业 State 时,通常会因为无法获知 State 的定义方式和具体类型等信息,而导致查询 State 的成本过高。
为了解决这个问题,字节跳动流式计算团队在内部提出了 State Query on Flink SQL 的解决方案——用户通过写 SQL 的方式就可以简单地查询 State。本文将主要介绍字节跳动在 Flink 状态查询这方面所进行的相关工作。
02
State Processor API 介绍
提到状态查询,我们自然会联想到 Flink 在 1.9 版本提出的特性 — State Processor API。使用 State Processor API,我们可以将作业产生的 Savepoint 转换成 DataSet,然后使用 DataSet API 完成对 State 的查询、修改和初始化等操作。
有些同学可能会问,既然社区已经提供了查询 State 的功能,我们为什么还要去做同样的工作呢?主要是因为我们在使用 State Processor API 的过程中发现一些问题:
每次查询 State 我们都需要独立开发一个 Flink Batch 任务,对用户来说具有一定的开发成本;
实现 ReaderFunction 的时候需要比较清晰地了解任务状态的定义方式,包括 State 的名称、类型以及 State Descriptor 等信息,对用户来说使用门槛高较高;
使用 State Processor API 时,只能查询单个算子状态,无法同时查询多个算子的状态;
无法直接查询任务状态的元信息,比如查询任务使用了哪些状态,或者查询某个状态的类型。
总体来说,我们的目标有两个,一是降低用户的使用成本;二是增强状态查询的功能。我们希望用户在查询 State 时能用最简单的方式;同时也不需要知道任何信息。此外,我们还希望用户能同时查询多个算子的 State ,也可以直接查询作业使用了哪些 State,每个 State 的类型是什么。因此,我们提出了 State Query on Flink SQL 的解决方案。简单来说是把 State 当成数据库一样,让用户通过写 SQL 的方式就可以很简单地查询 State。在这个方案中,我们需要解决两个问题:
如何对用户屏蔽 State 的信息:参考 State Processor API 我们可以知道,查询 State 需要提供非常多的信息,比如 Savepoint 路径、 StateBacked 类型、算子 id 、State Descriptor 等等。通过 SQL 语句显然难以完整地表述这些复杂的信息,那么查询状态到底需要哪些内容,我们又如何对用户屏蔽 State 里复杂的细节呢?这是我们面对的第一个难点。
之后在状态查询时就只需解析 Savepoint 中的 stateInfo 文件,而不再需要用户通过代码去输入这些 State 的元信息。通过这样的方式可以很大程度地降低用户查询状态的成本。
04
State as Database
接下来我们来回答第二个问题,我们如何用 SQL 来表达 State。其实社区在设计 State Processor API 的时候就提出了一些解决思路,也就是 State As Database。在传统的数据库中,通常用 Catalog、Database、Table 这个三个元素来表示一个 Table,其实我们也可以将用样的逻辑到映射到 Flink State 上。我们可以把 Flink 的 State 当作一种特殊的数据源,作业每次产生的 Savepoint 都当作一个独立 DB 。在这个 DB 中,我们将 State 元信息、State 的明细数据,都抽象成不同的 Table 暴露给用户,用户直接查询这些 Table 就可以获取任务的状态信息。首先我们来看如何把 State 表示为 Table。我们都知道在 Flink 中,常用的 State 有两种类型,分别是 KeyedState 和 OperatorState。
对于 OperatorState 来说,它只有 Value 这一个属性,用来表示这个 State 具体的值。因此我们可以把 OperatorState 表示为只包含一个 Value 字段的表结构。
当我们抽象出了单个 State 之后,想要表示多个 State 就比较容易了。可以看到在上图的例子中,这个算子包含 3 个 State,分别是两个 KeyedState 和一个 OperatorState,我们只需要将这些 Table 简单的 union 起来,再通过 state_name 字段去区分不同的 State,就可以表示这个算子中所有的 State。最后还有一个问题,我们如何知道一个任务到底用了哪些 State 或者这些 State 的具体类型呢?为了解决这个问题,我们定义了一种特殊表 — StateMeta ,用来表示一个 Flink 任务中所有 State 的元信息。StateMeta 中包含一个任务中每个 State 的名称、State 所在的算子 ID 、算子名称 、Key 的类型和 Value 的类型等等,这样用户直接查询 StateMeta 这个表就能获取任务中所有状态的元信息。