在定义数据处理管道时,Table API 和 DataStream API 同样重要。
DataStream API 在一个相对较低级别的命令式编程 API 中提供了流处理的原语(即时间、状态和数据流管理)。 Table API 抽象了许多内部结构,并提供了结构化和声明性的 API。
两种 API 都可以处理有界和无界流。
处理历史数据时需要管理有界流。 无限流发生在可能首先用历史数据初始化的实时处理场景中。
为了高效执行,这两个 API 都以优化的批处理执行模式提供处理有界流。 但是,由于批处理只是流的一种特殊情况,因此也可以在常规流执行模式下运行有界流的管道。
目前,DataStream API 和 Table API 都提供了自己的方式来启用批处理执行模式。 在不久的将来,这将进一步统一。
一个 API 中的管道可以端到端定义,而不依赖于另一个 API。 但是,出于各种原因,混合使用这两种 API 可能会很有用:
- 在 DataStream API 中实现主管道之前,使用表生态系统轻松访问目录或连接到外部系统。
- 在 DataStream API 中实现主管道之前,访问一些用于无状态数据规范化和清理的 SQL 函数。
- 如果 Table API 中不存在更底层的操作(例如自定义计时器处理),请不时切换到 DataStream API。
Flink 提供了特殊的桥接功能,使与 DataStream API 的集成尽可能顺畅。
在 DataStream 和 Table API 之间切换会增加一些转换开销。 例如,部分处理二进制数据的表运行时的内部数据结构(即 RowData)需要转换为对用户更友好的数据结构(即 Row)。 通常,可以忽略此开销,但为了完整起见,在此提及。
DataStream和Table之间的转换
Flink 在 Java 和 Scala 中提供了一个专门的 StreamTableEnvironment 用于与 DataStream API 集成。 这些环境使用其他方法扩展常规 TableEnvironment,并将 DataStream API 中使用的 StreamExecutionEnvironment 作为参数。
目前 StreamTableEnvironment 暂不支持开启批量执行模式。 尽管如此,有界流可以使用流式执行模式处理,但效率较低。 但是请注意,通用 TableEnvironment 可以在流式执行或优化的批处理执行模式下工作。
以下代码显示了如何在两个 API 之间来回切换的示例。 Table 的列名和类型自动派生自 DataStream 的 TypeInformation。 由于 DataStream API 本身不支持变更日志处理,因此代码在流到表和表到流的转换过程中假定仅附加/仅插入语义。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// create a DataStream
DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream);
// register the Table object as a view and query it
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");
// interpret the insert-only Table as a DataStream again
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();
// prints:
// +I[Alice]
// +I[Bob]
// +I[John]
fromDataStream 和 toDataStream 的完整语义可以在下面的专用部分中找到。 特别是,本节讨论了如何使用更复杂和嵌套的类型来影响模式派生。 它还涵盖了使用事件时间和水印。
根据查询的类型,在许多情况下,生成的动态表是一个管道,它不仅在将表覆盖到 DataStream 时产生仅插入更改,而且还会产生撤回和其他类型的更新。 在表到流的转换过程中,这可能会导致类似于以下的异常
Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].
在这种情况下,需要再次修改查询或切换到 toChangelogStream。
以下示例显示了如何转换更新表。 每个结果行都表示更改日志中的一个条目,该条目带有一个更改标志,可以通过对其调用 row.getKind() 进行查询。 在该示例中,Alice 的第二个分数在 (-U) 更改之前和 (+U) 更改之后创建更新。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// create a DataStream
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100));
// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
// register the Table object as a view and query it
// the query contains an aggregation that produces updates
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery(
"SELECT name, SUM(score) FROM InputTable GROUP BY name");
// interpret the updating Table as a changelog DataStream
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();
// prints:
// +I[Alice, 12]
// +I[Bob, 10]
// -U[Alice, 12]
// +U[Alice, 112]
fromChangelogStream 和 toChangelogStream 的完整语义可以在下面的专用部分中找到。 特别是,本节讨论了如何使用更复杂和嵌套的类型来影响模式派生。 它涵盖了使用事件时间和水印。 它讨论了如何为输入和输出流声明主键和更改日志模式。
依赖与导入
将 Table API 与 DataStream API 结合的项目需要添加以下桥接模块之一。 它们包括对 flink-table-api-java 或 flink-table-api-scala 的传递依赖以及相应的特定于语言的 DataStream API 模块。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.13.5</version>
<scope>provided</scope>
</dependency>
配置
TableEnvironment 将采用来自传递的 StreamExecutionEnvironment 的所有配置选项。 但是,不能保证对 StreamExecutionEnvironment 的配置的进一步更改会在 StreamTableEnvironment 实例化后传播到它。 此外,不支持将选项从 Table API 反向传播到 DataStream API。
我们建议在切换到 Table API 之前尽早在 DataStream API 中设置所有配置选项。
import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// create Java DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set various configuration early
env.setMaxParallelism(256);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// then switch to Java Table API
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// set configuration early
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));
// start defining your pipelines in both APIs...
执行行为
这两个 API 都提供了执行管道的方法。 换句话说:如果需要,他们会编译一个作业图,该作业图将提交到集群并触发执行。 结果将流式传输到声明的接收器。
通常,这两个 API 都使用方法名称中的术语执行来标记此类行为。 但是,Table API 和 DataStream API 的执行行为略有不同。
DataStream API
DataStream API 的 StreamExecutionEnvironment 充当构建器模式来构建复杂的管道。 管道可能会分成多个分支,这些分支可能会或可能不会以接收器结束。
必须至少定义一个接收器。 否则,将引发以下异常:
java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
StreamExecutionEnvironment.execute() 提交整个构建的管道并随后清除构建器。 换句话说:不再声明源和接收器,并且可以将新管道添加到构建器中。 因此,每个 DataStream 程序通常以调用 StreamExecutionEnvironment.execute() 结束。 或者,DataStream.executeAndCollect() 隐式定义了一个接收器,用于将结果流式传输到本地客户端,并且只执行当前分支。
Table API
在 Table API 中,仅在 StatementSet 中支持分支管道,其中每个分支都必须声明一个最终接收器。 TableEnvironment 和 StreamTableEnvironment 都不提供专用的通用 execute() 方法。 相反,它们提供了提交单个源到接收器管道或语句集的方法:
// execute with explicit sink
tableEnv.from("InputTable").executeInsert("OutputTable")
tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
tableEnv.createStatementSet()
.addInsert("OutputTable", tableEnv.from("InputTable"))
.addInsert("OutputTable2", tableEnv.from("InputTable"))
.execute()
tableEnv.createStatementSet()
.addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
.addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
.execute()
// execute with implicit local sink
tableEnv.from("InputTable").execute().print()
tableEnv.executeSql("SELECT * FROM InputTable").print()
为了结合这两种执行行为,对 StreamTableEnvironment.toDataStream 或 StreamTableEnvironment.toChangelogStream 的每次调用都将具体化(即编译)表 API 子管道并将其插入到 DataStream API 管道构建器中。 这意味着之后必须调用 StreamExecutionEnvironment.execute() 或 DataStream.executeAndCollect。 Table API 中的执行不会触发这些“外部部分”。
// (1)
// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print()
// (2)
// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print()
// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute()
处理(仅插入)流
StreamTableEnvironment 提供以下方法来转换和转换为 DataStream API:
- fromDataStream(DataStream):将仅插入更改和任意类型的流解释为表。默认情况下不传播事件时间和水印。
- fromDataStream(DataStream, Schema):将仅插入更改和任意类型的流解释为表。可选模式允许丰富列数据类型并添加时间属性、水印策略、其他计算列或主键。
- createTemporaryView(String, DataStream):在一个名称下注册流,以便在 SQL 中访问它。它是 createTemporaryView(String, fromDataStream(DataStream)) 的快捷方式。
- createTemporaryView(String, DataStream, Schema):在一个名称下注册流,以便在 SQL 中访问它。它是 createTemporaryView(String, fromDataStream(DataStream, Schema)) 的快捷方式。
- toDataStream(DataStream):将表转换为只插入更改的流。默认流记录类型是 org.apache.flink.types.Row。单个行时间属性列被写回到 DataStream API 的记录中。水印也被传播。
- toDataStream(DataStream, AbstractDataType):将表转换为只插入更改的流。此方法接受一种数据类型来表达所需的流记录类型。规划器可能会插入隐式强制转换和重新排序列以将列映射到(可能是嵌套的)数据类型的字段。
- toDataStream(DataStream, Class):toDataStream(DataStream, DataTypes.of(Class)) 的快捷方式,可以快速反射地创建所需的数据类型。
从 Table API 的角度来看,与 DataStream API 之间的转换类似于读取或写入已使用 SQL 中的 CREATE TABLE DDL 定义的虚拟表连接器。
virtual CREATE TABLE name (schema) WITH (options) 语句中的 schema 部分可以从 DataStream 的类型信息中自动派生、丰富或完全使用 org.apache.flink.table.api.Schema 手动定义。
虚拟 DataStream 表连接器为每一行公开以下元数据:
Key | Data Type | Description | R/W |
---|---|---|---|
rowtime |
TIMESTAMP_LTZ(3) NOT NULL |
Stream record’s timestamp. | R/W |
虚拟 DataStream 表源实现 SupportsSourceWatermark,因此允许调用 SOURCE_WATERMARK() 内置函数作为水印策略,以采用来自 DataStream API 的水印。
fromDataStream
例子
下面的代码展示了如何将 fromDataStream 用于不同的场景。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import java.time.Instant;
// some example POJO
public static class User {
public String name;
public Integer score;
public Instant event_time;
// default constructor for DataStream API
public User() {}
// fully assigning constructor for Table API
public User(String name, Integer score, Instant event_time) {
this.name = name;
this.score = score;
this.event_time = event_time;
}
}
// create a DataStream
DataStream<User> dataStream =
env.fromElements(
new User("Alice", 4, Instant.ofEpochMilli(1000)),
new User("Bob", 6, Instant.ofEpochMilli(1001)),
new User("Alice", 10, Instant.ofEpochMilli(1002)));
// === EXAMPLE 1 ===
// derive all physical columns automatically
Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9)
// )
// === EXAMPLE 2 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a proctime attribute column)
Table table = tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.build());
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT NOT NULL,
// `event_time` TIMESTAMP_LTZ(9),
// `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
//)
// === EXAMPLE 3 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a rowtime attribute column)
// and a custom watermark strategy
Table table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
.build());
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )
// === EXAMPLE 4 ===
// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime attribute column
// also rely on the watermarks generated in the DataStream API
// we assume that a watermark strategy has been defined for `dataStream` before
// (not part of this example)
Table table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )
// === EXAMPLE 5 ===
// define physical columns manually
// in this example,
// - we can reduce the default precision of timestamps from 9 to 3
// - we also project the columns and put `event_time` to the beginning
Table table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.column("event_time", "TIMESTAMP_LTZ(3)")
.column("name", "STRING")
.column("score", "INT")
.watermark("event_time", "SOURCE_WATERMARK()")
.build());
table.printSchema();
// prints:
// (
// `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
// `name` VARCHAR(200),
// `score` INT
// )
// note: the watermark strategy is not shown due to the inserted column reordering projection
示例 1 说明了一个不需要基于时间的操作的简单用例。
示例 4 是最常见的用例,其中基于时间的操作(例如窗口或间隔连接)应该是管道的一部分。 示例 2 是这些基于时间的操作应该在处理时间内工作的最常见用例。
示例 5 完全依赖于用户的声明。 这对于将 DataStream API 中的泛型类型(在 Table API 中为 RAW)替换为适当的数据类型很有用。
由于 DataType 比 TypeInformation 更丰富,我们可以轻松启用不可变 POJO 和其他复杂的数据结构。 以下 Java 示例显示了可能的情况。 另请查看 DataStream API 的 Data Types & Serialization 页面以获取有关那里支持的类型的更多信息。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
// the DataStream API does not support immutable POJOs yet,
// the class will result in a generic type that is a RAW type in Table API by default
public static class User {
public final String name;
public final Integer score;
public User(String name, Integer score) {
this.name = name;
this.score = score;
}
}
// create a DataStream
DataStream<User> dataStream = env.fromElements(
new User("Alice", 4),
new User("Bob", 6),
new User("Alice", 10));
// since fields of a RAW type cannot be accessed, every stream record is treated as an atomic type
// leading to a table with a single column `f0`
Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
// `f0` RAW('User', '...')
// )
// instead, declare a more useful data type for columns using the Table API's type system
// in a custom schema and rename the columns in a following `as` projection
Table table = tableEnv
.fromDataStream(
dataStream,
Schema.newBuilder()
.column("f0", DataTypes.of(User.class))
.build())
.as("user");
table.printSchema();
// prints:
// (
// `user` *User<`name` STRING,`score` INT>*
// )
// data types can be extracted reflectively as above or explicitly defined
Table table3 = tableEnv
.fromDataStream(
dataStream,
Schema.newBuilder()
.column(
"f0",
DataTypes.STRUCTURED(
User.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT())))
.build())
.as("user");
table.printSchema();
// prints:
// (
// `user` *User<`name` STRING,`score` INT>*
// )
createTemporaryView
示例
DataStream 可以直接注册为视图(可能通过模式丰富)。
从 DataStream 创建的视图只能注册为临时视图。 由于它们的内联/匿名性质,无法将它们注册到永久目录中。
下面的代码展示了如何在不同的场景下使用 createTemporaryView。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
// create some DataStream
DataStream<Tuple2<Long, String>> dataStream = env.fromElements(
Tuple2.of(12L, "Alice"),
Tuple2.of(0L, "Bob"));
// === EXAMPLE 1 ===
// register the DataStream as view "MyView" in the current session
// all columns are derived automatically
tableEnv.createTemporaryView("MyView", dataStream);
tableEnv.from("MyView").printSchema();
// prints:
// (
// `f0` BIGINT NOT NULL,
// `f1` STRING
// )
// === EXAMPLE 2 ===
// register the DataStream as view "MyView" in the current session,
// provide a schema to adjust the columns similar to `fromDataStream`
// in this example, the derived NOT NULL information has been removed
tableEnv.createTemporaryView(
"MyView",
dataStream,
Schema.newBuilder()
.column("f0", "BIGINT")
.column("f1", "STRING")
.build());
tableEnv.from("MyView").printSchema();
// prints:
// (
// `f0` BIGINT,
// `f1` STRING
// )
// === EXAMPLE 3 ===
// use the Table API before creating the view if it is only about renaming columns
tableEnv.createTemporaryView(
"MyView",
tableEnv.fromDataStream(dataStream).as("id", "name"));
tableEnv.from("MyView").printSchema();
// prints:
// (
// `id` BIGINT NOT NULL,
// `name` STRING
// )
toDataStream
示例
下面的代码展示了如何在不同的场景中使用 toDataStream。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import java.time.Instant;
// POJO with mutable fields
// since no fully assigning constructor is defined, the field order
// is alphabetical [event_time, name, score]
public static class User {
public String name;
public Integer score;
public Instant event_time;
}
tableEnv.executeSql(
"CREATE TABLE GeneratedTable "
+ "("
+ " name STRING,"
+ " score INT,"
+ " event_time TIMESTAMP_LTZ(3),"
+ " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
+ ")"
+ "WITH ('connector'='datagen')");
Table table = tableEnv.from("GeneratedTable");
// === EXAMPLE 1 ===
// use the default conversion to instances of Row
// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
DataStream<Row> dataStream = tableEnv.toDataStream(table);
// === EXAMPLE 2 ===
// a data type is extracted from class `User`,
// the planner reorders fields and inserts implicit casts where possible to convert internal
// data structures to the desired structured type
// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
DataStream<User> dataStream = tableEnv.toDataStream(table, User.class);
// data types can be extracted reflectively as above or explicitly defined
DataStream<User> dataStream =
tableEnv.toDataStream(
table,
DataTypes.STRUCTURED(
User.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT()),
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));
请注意,toDataStream 仅支持非更新表。 通常,基于时间的操作(例如窗口、间隔连接或 MATCH_RECOGNIZE 子句)非常适合与投影和过滤器等简单操作相邻的仅插入管道。
具有产生更新的操作的管道可以使用 toChangelogStream。
处理变更流
在内部,Flink 的表运行时是一个变更日志处理器。 概念页面描述了动态表和流如何相互关联。
StreamTableEnvironment 提供以下方法来公开这些变更数据捕获 (CDC) 功能:
- fromChangelogStream(DataStream):将变更日志条目流解释为表格。流记录类型必须是 org.apache.flink.types.Row,因为它的 RowKind 标志是在运行时评估的。默认情况下不传播事件时间和水印。此方法需要一个包含各种更改的更改日志(在 org.apache.flink.types.RowKind 中枚举)作为默认的 ChangelogMode。
- fromChangelogStream(DataStream, Schema):允许为 DataStream 定义类似于 fromDataStream(DataStream, Schema) 的模式。否则语义等于 fromChangelogStream(DataStream)。
- fromChangelogStream(DataStream, Schema, ChangelogMode):完全控制如何将流解释为变更日志。传递的 ChangelogMode 有助于规划器区分仅插入、更新插入或收回行为。
- toChangelogStream(Table):fromChangelogStream(DataStream)的逆操作。它生成一个包含 org.apache.flink.types.Row 实例的流,并在运行时为每条记录设置 RowKind 标志。该方法支持各种更新表。如果输入表包含单个行时间列,它将被传播到流记录的时间戳中。水印也将被传播。
- toChangelogStream(Table, Schema):fromChangelogStream(DataStream, Schema)的逆操作。该方法可以丰富产生的列数据类型。如有必要,计划者可能会插入隐式强制转换。可以将行时间写为元数据列。
- toChangelogStream(Table, Schema, ChangelogMode):完全控制如何将表转换为变更日志流。传递的 ChangelogMode 有助于规划器区分仅插入、更新插入或收回行为。
从 Table API 的角度来看,与 DataStream API 之间的转换类似于读取或写入已使用 SQL 中的 CREATE TABLE DDL 定义的虚拟表连接器。
因为 fromChangelogStream 的行为与 fromDataStream 类似,我们建议在继续之前阅读上一节。
此虚拟连接器还支持读取和写入流记录的行时元数据。
虚拟表源实现 SupportsSourceWatermark。
fromChangelogStream
使用示例
下面的代码展示了如何将 fromChangelogStream 用于不同的场景。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
// === EXAMPLE 1 ===
// interpret the stream as a retract stream
// create a changelog DataStream
DataStream<Row> dataStream =
env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
// interpret the DataStream as a Table
Table table = tableEnv.fromChangelogStream(dataStream);
// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
.print();
// prints:
// +----+--------------------------------+-------------+
// | op | name | score |
// +----+--------------------------------+-------------+
// | +I | Bob | 5 |
// | +I | Alice | 12 |
// | -D | Alice | 12 |
// | +I | Alice | 100 |
// +----+--------------------------------+-------------+
// === EXAMPLE 2 ===
// interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)
// create a changelog DataStream
DataStream<Row> dataStream =
env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
// interpret the DataStream as a Table
Table table =
tableEnv.fromChangelogStream(
dataStream,
Schema.newBuilder().primaryKey("f0").build(),
ChangelogMode.upsert());
// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
.print();
// prints:
// +----+--------------------------------+-------------+
// | op | name | score |
// +----+--------------------------------+-------------+
// | +I | Bob | 5 |
// | +I | Alice | 12 |
// | -U | Alice | 12 |
// | +U | Alice | 100 |
// +----+--------------------------------+-------------+
示例 1 中显示的默认 ChangelogMode 对于大多数用例来说应该足够了,因为它接受各种更改。
但是,示例 2 显示了如何通过使用 upsert 模式将更新消息的数量减少 50% 来限制传入更改的种类以提高效率。 可以通过为 toChangelogStream 定义主键和 upsert 更改日志模式来减少结果消息的数量。
toChangelogStream
使用示例
下面的代码展示了如何在不同的场景下使用 toChangelogStream。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.*;
// create Table with event-time
tableEnv.executeSql(
"CREATE TABLE GeneratedTable "
+ "("
+ " name STRING,"
+ " score INT,"
+ " event_time TIMESTAMP_LTZ(3),"
+ " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
+ ")"
+ "WITH ('connector'='datagen')");
Table table = tableEnv.from("GeneratedTable");
// === EXAMPLE 1 ===
// convert to DataStream in the simplest and most general way possible (no event-time)
Table simpleTable = tableEnv
.fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
.as("name", "score")
.groupBy($("name"))
.select($("name"), $("score").sum());
tableEnv
.toChangelogStream(simpleTable)
.executeAndCollect()
.forEachRemaining(System.out::println);
// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]
// === EXAMPLE 2 ===
// convert to DataStream in the simplest and most general way possible (with event-time)
DataStream<Row> dataStream = tableEnv.toChangelogStream(table);
// since `event_time` is a single time attribute in the schema, it is set as the
// stream record's timestamp by default; however, at the same time, it remains part of the Row
dataStream.process(
new ProcessFunction<Row, Void>() {
@Override
public void processElement(Row row, Context ctx, Collector<Void> out) {
// prints: [name, score, event_time]
System.out.println(row.getFieldNames(true));
// timestamp exists twice
assert ctx.timestamp() == row.<Instant>getFieldAs("event_time").toEpochMilli();
}
});
env.execute();
// === EXAMPLE 3 ===
// convert to DataStream but write out the time attribute as a metadata column which means
// it is not part of the physical schema anymore
DataStream<Row> dataStream = tableEnv.toChangelogStream(
table,
Schema.newBuilder()
.column("name", "STRING")
.column("score", "INT")
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.build());
// the stream record's timestamp is defined by the metadata; it is not part of the Row
dataStream.process(
new ProcessFunction<Row, Void>() {
@Override
public void processElement(Row row, Context ctx, Collector<Void> out) {
// prints: [name, score]
System.out.println(row.getFieldNames(true));
// timestamp exists once
System.out.println(ctx.timestamp());
}
});
env.execute();
// === EXAMPLE 4 ===
// for advanced users, it is also possible to use more internal data structures for efficiency
// note that this is only mentioned here for completeness because using internal data structures
// adds complexity and additional type handling
// however, converting a TIMESTAMP_LTZ column to `Long` or STRING to `byte[]` might be convenient,
// also structured types can be represented as `Row` if needed
DataStream<Row> dataStream = tableEnv.toChangelogStream(
table,
Schema.newBuilder()
.column(
"name",
DataTypes.STRING().bridgedTo(StringData.class))
.column(
"score",
DataTypes.INT())
.column(
"event_time",
DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
.build());
// leads to a stream of Row(name: StringData, score: Integer, event_time: Long)
有关示例 4 中的数据类型支持哪些转换的更多信息,请参阅 Table API 的数据类型页面。
toChangelogStream(Table).executeAndCollect() 的行为等同于调用 Table.execute().collect()。 但是, toChangelogStream(Table) 可能对测试更有用,因为它允许在 DataStream API 的后续 ProcessFunction 中访问生成的水印。
TypeInformation 和 DataType 之间的映射
DataStream API 使用 org.apache.flink.api.common.typeinfo.TypeInformation 的实例来描述在流中传输的记录类型。特别是,它定义了如何将记录从一个 DataStream 运算符序列化和反序列化到另一个。它还有助于将状态序列化为保存点和检查点。
Table API 使用自定义数据结构在内部表示记录,并向用户公开 org.apache.flink.table.types.DataType 以声明将数据结构转换为的外部格式,以便在源、接收器、UDF 或 DataStream 中使用API。
DataType 比 TypeInformation 更丰富,因为它还包含有关逻辑 SQL 类型的详细信息。因此,在转换过程中会隐式添加一些细节。
Table 的列名和类型自动派生自 DataStream 的 TypeInformation。使用 DataStream.getType() 检查是否已通过 DataStream API 的反射类型提取工具正确检测到类型信息。如果最外层记录的 TypeInformation 是 CompositeType,则在派生表的 schema 时将在第一级展平。
TypeInformation 转为 DataType
将 TypeInformation 转换为 DataType 时适用以下规则:
- TypeInformation 的所有子类都映射到逻辑类型,包括与 Flink 内置序列化器对齐的可空性。
- TupleTypeInfoBase 的子类被转换为行(对于 Row)或结构化类型(对于元组、POJO 和案例类)。
- BigDecimal 默认转换为 DECIMAL(38, 18)。
- PojoTypeInfo 字段的顺序由以所有字段作为参数的构造函数确定。 如果在转换过程中未找到,则字段顺序将按字母顺序排列。
- 无法表示为列出的 org.apache.flink.table.api.DataTypes 之一的 GenericTypeInfo 和其他 TypeInformation 将被视为黑盒 RAW 类型。 当前会话配置用于实现原始类型的序列化程序。 届时将无法访问复合嵌套字段。
- 有关完整的翻译逻辑,请参阅 TypeInfoDataTypeConverter。
使用 DataTypes.of(TypeInformation) 在自定义模式声明或 UDF 中调用上述逻辑。
DataType转为TypeInformation
表运行时将确保正确地将输出记录序列化到 DataStream API 的第一个运算符。
之后,需要考虑 DataStream API 的类型信息语义。
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/3912/