Flink DataStream API与Data Table API/SQL集成

在定义数据处理管道时,Table API 和 DataStream API 同样重要。

DataStream API 在一个相对较低级别的命令式编程 API 中提供了流处理的原语(即时间、状态和数据流管理)。 Table API 抽象了许多内部结构,并提供了结构化和声明性的 API。

两种 API 都可以处理有界和无界流。

处理历史数据时需要管理有界流。 无限流发生在可能首先用历史数据初始化的实时处理场景中。

为了高效执行,这两个 API 都以优化的批处理执行模式提供处理有界流。 但是,由于批处理只是流的一种特殊情况,因此也可以在常规流执行模式下运行有界流的管道。

目前,DataStream API 和 Table API 都提供了自己的方式来启用批处理执行模式。 在不久的将来,这将进一步统一。

一个 API 中的管道可以端到端定义,而不依赖于另一个 API。 但是,出于各种原因,混合使用这两种 API 可能会很有用:

  • 在 DataStream API 中实现主管道之前,使用表生态系统轻松访问目录或连接到外部系统。
  • 在 DataStream API 中实现主管道之前,访问一些用于无状态数据规范化和清理的 SQL 函数。
  • 如果 Table API 中不存在更底层的操作(例如自定义计时器处理),请不时切换到 DataStream API。

Flink 提供了特殊的桥接功能,使与 DataStream API 的集成尽可能顺畅。

在 DataStream 和 Table API 之间切换会增加一些转换开销。 例如,部分处理二进制数据的表运行时的内部数据结构(即 RowData)需要转换为对用户更友好的数据结构(即 Row)。 通常,可以忽略此开销,但为了完整起见,在此提及。

DataStream和Table之间的转换

Flink 在 Java 和 Scala 中提供了一个专门的 StreamTableEnvironment 用于与 DataStream API 集成。 这些环境使用其他方法扩展常规 TableEnvironment,并将 DataStream API 中使用的 StreamExecutionEnvironment 作为参数。

目前 StreamTableEnvironment 暂不支持开启批量执行模式。 尽管如此,有界流可以使用流式执行模式处理,但效率较低。 但是请注意,通用 TableEnvironment 可以在流式执行或优化的批处理执行模式下工作。

以下代码显示了如何在两个 API 之间来回切换的示例。 Table 的列名和类型自动派生自 DataStream 的 TypeInformation。 由于 DataStream API 本身不支持变更日志处理,因此代码在流到表和表到流的转换过程中假定仅附加/仅插入语义。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a DataStream
DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");

// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream);

// register the Table object as a view and query it
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");

// interpret the insert-only Table as a DataStream again
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();

// prints:
// +I[Alice]
// +I[Bob]
// +I[John]

fromDataStream 和 toDataStream 的完整语义可以在下面的专用部分中找到。 特别是,本节讨论了如何使用更复杂和嵌套的类型来影响模式派生。 它还涵盖了使用事件时间和水印。

根据查询的类型,在许多情况下,生成的动态表是一个管道,它不仅在将表覆盖到 DataStream 时产生仅插入更改,而且还会产生撤回和其他类型的更新。 在表到流的转换过程中,这可能会导致类似于以下的异常

Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

在这种情况下,需要再次修改查询或切换到 toChangelogStream。

以下示例显示了如何转换更新表。 每个结果行都表示更改日志中的一个条目,该条目带有一个更改标志,可以通过对其调用 row.getKind() 进行查询。 在该示例中,Alice 的第二个分数在 (-U) 更改之前和 (+U) 更改之后创建更新。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a DataStream
DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 12),
    Row.of("Bob", 10),
    Row.of("Alice", 100));

// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");

// register the Table object as a view and query it
// the query contains an aggregation that produces updates
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery(
    "SELECT name, SUM(score) FROM InputTable GROUP BY name");

// interpret the updating Table as a changelog DataStream
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);

// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();

// prints:
// +I[Alice, 12]
// +I[Bob, 10]
// -U[Alice, 12]
// +U[Alice, 112]

fromChangelogStream 和 toChangelogStream 的完整语义可以在下面的专用部分中找到。 特别是,本节讨论了如何使用更复杂和嵌套的类型来影响模式派生。 它涵盖了使用事件时间和水印。 它讨论了如何为输入和输出流声明主键和更改日志模式。

依赖与导入

将 Table API 与 DataStream API 结合的项目需要添加以下桥接模块之一。 它们包括对 flink-table-api-java 或 flink-table-api-scala 的传递依赖以及相应的特定于语言的 DataStream API 模块。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.13.5</version>
  <scope>provided</scope>
</dependency>

配置

TableEnvironment 将采用来自传递的 StreamExecutionEnvironment 的所有配置选项。 但是,不能保证对 StreamExecutionEnvironment 的配置的进一步更改会在 StreamTableEnvironment 实例化后传播到它。 此外,不支持将选项从 Table API 反向传播到 DataStream API。

我们建议在切换到 Table API 之前尽早在 DataStream API 中设置所有配置选项。

import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// create Java DataStream API

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// set various configuration early

env.setMaxParallelism(256);

env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// then switch to Java Table API

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// set configuration early

tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));

// start defining your pipelines in both APIs...

执行行为

这两个 API 都提供了执行管道的方法。 换句话说:如果需要,他们会编译一个作业图,该作业图将提交到集群并触发执行。 结果将流式传输到声明的接收器。

通常,这两个 API 都使用方法名称中的术语执行来标记此类行为。 但是,Table API 和 DataStream API 的执行行为略有不同。

DataStream API

DataStream API 的 StreamExecutionEnvironment 充当构建器模式来构建复杂的管道。 管道可能会分成多个分支,这些分支可能会或可能不会以接收器结束。

必须至少定义一个接收器。 否则,将引发以下异常:

java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

StreamExecutionEnvironment.execute() 提交整个构建的管道并随后清除构建器。 换句话说:不再声明源和接收器,并且可以将新管道添加到构建器中。 因此,每个 DataStream 程序通常以调用 StreamExecutionEnvironment.execute() 结束。 或者,DataStream.executeAndCollect() 隐式定义了一个接收器,用于将结果流式传输到本地客户端,并且只执行当前分支。

Table API

在 Table API 中,仅在 StatementSet 中支持分支管道,其中每个分支都必须声明一个最终接收器。 TableEnvironment 和 StreamTableEnvironment 都不提供专用的通用 execute() 方法。 相反,它们提供了提交单个源到接收器管道或语句集的方法:

// execute with explicit sink
tableEnv.from("InputTable").executeInsert("OutputTable")

tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")

tableEnv.createStatementSet()
    .addInsert("OutputTable", tableEnv.from("InputTable"))
    .addInsert("OutputTable2", tableEnv.from("InputTable"))
    .execute()

tableEnv.createStatementSet()
    .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
    .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
    .execute()

// execute with implicit local sink

tableEnv.from("InputTable").execute().print()

tableEnv.executeSql("SELECT * FROM InputTable").print()

为了结合这两种执行行为,对 StreamTableEnvironment.toDataStream 或 StreamTableEnvironment.toChangelogStream 的每次调用都将具体化(即编译)表 API 子管道并将其插入到 DataStream API 管道构建器中。 这意味着之后必须调用 StreamExecutionEnvironment.execute() 或 DataStream.executeAndCollect。 Table API 中的执行不会触发这些“外部部分”。

// (1)

// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print()

// (2)

// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print()

// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute()

处理(仅插入)流

StreamTableEnvironment 提供以下方法来转换和转换为 DataStream API:

  • fromDataStream(DataStream):将仅插入更改和任意类型的流解释为表。默认情况下不传播事件时间和水印。
  • fromDataStream(DataStream, Schema):将仅插入更改和任意类型的流解释为表。可选模式允许丰富列数据类型并添加时间属性、水印策略、其他计算列或主键。
  • createTemporaryView(String, DataStream):在一个名称下注册流,以便在 SQL 中访问它。它是 createTemporaryView(String, fromDataStream(DataStream)) 的快捷方式。
  • createTemporaryView(String, DataStream, Schema):在一个名称下注册流,以便在 SQL 中访问它。它是 createTemporaryView(String, fromDataStream(DataStream, Schema)) 的快捷方式。
  • toDataStream(DataStream):将表转换为只插入更改的流。默认流记录类型是 org.apache.flink.types.Row。单个行时间属性列被写回到 DataStream API 的记录中。水印也被传播。
  • toDataStream(DataStream, AbstractDataType):将表转换为只插入更改的流。此方法接受一种数据类型来表达所需的流记录类型。规划器可能会插入隐式强制转换和重新排序列以将列映射到(可能是嵌套的)数据类型的字段。
  • toDataStream(DataStream, Class):toDataStream(DataStream, DataTypes.of(Class)) 的快捷方式,可以快速反射地创建所需的数据类型。

从 Table API 的角度来看,与 DataStream API 之间的转换类似于读取或写入已使用 SQL 中的 CREATE TABLE DDL 定义的虚拟表连接器。

virtual CREATE TABLE name (schema) WITH (options) 语句中的 schema 部分可以从 DataStream 的类型信息中自动派生、丰富或完全使用 org.apache.flink.table.api.Schema 手动定义。

虚拟 DataStream 表连接器为每一行公开以下元数据:

Key Data Type Description R/W
rowtime TIMESTAMP_LTZ(3) NOT NULL Stream record’s timestamp. R/W

虚拟 DataStream 表源实现 SupportsSourceWatermark,因此允许调用 SOURCE_WATERMARK() 内置函数作为水印策略,以采用来自 DataStream API 的水印。

fromDataStream 例子

下面的代码展示了如何将 fromDataStream 用于不同的场景。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import java.time.Instant;

// some example POJO
public static class User {
  public String name;

  public Integer score;

  public Instant event_time;

  // default constructor for DataStream API
  public User() {}

  // fully assigning constructor for Table API
  public User(String name, Integer score, Instant event_time) {
    this.name = name;
    this.score = score;
    this.event_time = event_time;
  }
}

// create a DataStream
DataStream<User> dataStream =
    env.fromElements(
        new User("Alice", 4, Instant.ofEpochMilli(1000)),
        new User("Bob", 6, Instant.ofEpochMilli(1001)),
        new User("Alice", 10, Instant.ofEpochMilli(1002)));

// === EXAMPLE 1 ===

// derive all physical columns automatically

Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9)
// )

// === EXAMPLE 2 ===

// derive all physical columns automatically
// but add computed columns (in this case for creating a proctime attribute column)

Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT NOT NULL,
//  `event_time` TIMESTAMP_LTZ(9),
//  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
//)

// === EXAMPLE 3 ===

// derive all physical columns automatically
// but add computed columns (in this case for creating a rowtime attribute column)
// and a custom watermark strategy

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
            .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9),
//  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
//  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )

// === EXAMPLE 4 ===

// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime attribute column
// also rely on the watermarks generated in the DataStream API

// we assume that a watermark strategy has been defined for `dataStream` before
// (not part of this example)
Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9),
//  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
//  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )

// === EXAMPLE 5 ===

// define physical columns manually
// in this example,
//   - we can reduce the default precision of timestamps from 9 to 3
//   - we also project the columns and put `event_time` to the beginning

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("event_time", "TIMESTAMP_LTZ(3)")
            .column("name", "STRING")
            .column("score", "INT")
            .watermark("event_time", "SOURCE_WATERMARK()")
            .build());
table.printSchema();
// prints:
// (
//  `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
//  `name` VARCHAR(200),
//  `score` INT
// )
// note: the watermark strategy is not shown due to the inserted column reordering projection

示例 1 说明了一个不需要基于时间的操作的简单用例。

示例 4 是最常见的用例,其中基于时间的操作(例如窗口或间隔连接)应该是管道的一部分。 示例 2 是这些基于时间的操作应该在处理时间内工作的最常见用例。

示例 5 完全依赖于用户的声明。 这对于将 DataStream API 中的泛型类型(在 Table API 中为 RAW)替换为适当的数据类型很有用。

由于 DataType 比 TypeInformation 更丰富,我们可以轻松启用不可变 POJO 和其他复杂的数据结构。 以下 Java 示例显示了可能的情况。 另请查看 DataStream API 的 Data Types & Serialization 页面以获取有关那里支持的类型的更多信息。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;

// the DataStream API does not support immutable POJOs yet,
// the class will result in a generic type that is a RAW type in Table API by default
public static class User {

    public final String name;

    public final Integer score;

    public User(String name, Integer score) {
        this.name = name;
        this.score = score;
    }
}

// create a DataStream
DataStream<User> dataStream = env.fromElements(
    new User("Alice", 4),
    new User("Bob", 6),
    new User("Alice", 10));

// since fields of a RAW type cannot be accessed, every stream record is treated as an atomic type
// leading to a table with a single column `f0`

Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
//  `f0` RAW('User', '...')
// )

// instead, declare a more useful data type for columns using the Table API's type system
// in a custom schema and rename the columns in a following `as` projection

Table table = tableEnv
    .fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("f0", DataTypes.of(User.class))
            .build())
    .as("user");
table.printSchema();
// prints:
// (
//  `user` *User<`name` STRING,`score` INT>*
// )

// data types can be extracted reflectively as above or explicitly defined

Table table3 = tableEnv
    .fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column(
                "f0",
                DataTypes.STRUCTURED(
                    User.class,
                    DataTypes.FIELD("name", DataTypes.STRING()),
                    DataTypes.FIELD("score", DataTypes.INT())))
            .build())
    .as("user");
table.printSchema();
// prints:
// (
//  `user` *User<`name` STRING,`score` INT>*
// )

createTemporaryView 示例

DataStream 可以直接注册为视图(可能通过模式丰富)。

从 DataStream 创建的视图只能注册为临时视图。 由于它们的内联/匿名性质,无法将它们注册到永久目录中。

下面的代码展示了如何在不同的场景下使用 createTemporaryView。

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// create some DataStream
DataStream<Tuple2<Long, String>> dataStream = env.fromElements(
    Tuple2.of(12L, "Alice"),
    Tuple2.of(0L, "Bob"));

// === EXAMPLE 1 ===

// register the DataStream as view "MyView" in the current session
// all columns are derived automatically

tableEnv.createTemporaryView("MyView", dataStream);

tableEnv.from("MyView").printSchema();

// prints:
// (
//  `f0` BIGINT NOT NULL,
//  `f1` STRING
// )

// === EXAMPLE 2 ===

// register the DataStream as view "MyView" in the current session,
// provide a schema to adjust the columns similar to `fromDataStream`

// in this example, the derived NOT NULL information has been removed

tableEnv.createTemporaryView(
    "MyView",
    dataStream,
    Schema.newBuilder()
        .column("f0", "BIGINT")
        .column("f1", "STRING")
        .build());

tableEnv.from("MyView").printSchema();

// prints:
// (
//  `f0` BIGINT,
//  `f1` STRING
// )

// === EXAMPLE 3 ===

// use the Table API before creating the view if it is only about renaming columns

tableEnv.createTemporaryView(
    "MyView",
    tableEnv.fromDataStream(dataStream).as("id", "name"));

tableEnv.from("MyView").printSchema();

// prints:
// (
//  `id` BIGINT NOT NULL,
//  `name` STRING
// )

toDataStream 示例

下面的代码展示了如何在不同的场景中使用 toDataStream。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import java.time.Instant;

// POJO with mutable fields
// since no fully assigning constructor is defined, the field order
// is alphabetical [event_time, name, score]
public static class User {

    public String name;

    public Integer score;

    public Instant event_time;
}

tableEnv.executeSql(
    "CREATE TABLE GeneratedTable "
    + "("
    + "  name STRING,"
    + "  score INT,"
    + "  event_time TIMESTAMP_LTZ(3),"
    + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
    + ")"
    + "WITH ('connector'='datagen')");

Table table = tableEnv.from("GeneratedTable");

// === EXAMPLE 1 ===

// use the default conversion to instances of Row

// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated

DataStream<Row> dataStream = tableEnv.toDataStream(table);

// === EXAMPLE 2 ===

// a data type is extracted from class `User`,
// the planner reorders fields and inserts implicit casts where possible to convert internal
// data structures to the desired structured type

// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated

DataStream<User> dataStream = tableEnv.toDataStream(table, User.class);

// data types can be extracted reflectively as above or explicitly defined

DataStream<User> dataStream =
    tableEnv.toDataStream(
        table,
        DataTypes.STRUCTURED(
            User.class,
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("score", DataTypes.INT()),
            DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));

请注意,toDataStream 仅支持非更新表。 通常,基于时间的操作(例如窗口、间隔连接或 MATCH_RECOGNIZE 子句)非常适合与投影和过滤器等简单操作相邻的仅插入管道。

具有产生更新的操作的管道可以使用 toChangelogStream。

处理变更流

在内部,Flink 的表运行时是一个变更日志处理器。 概念页面描述了动态表和流如何相互关联。

StreamTableEnvironment 提供以下方法来公开这些变更数据捕获 (CDC) 功能:

  • fromChangelogStream(DataStream):将变更日志条目流解释为表格。流记录类型必须是 org.apache.flink.types.Row,因为它的 RowKind 标志是在运行时评估的。默认情况下不传播事件时间和水印。此方法需要一个包含各种更改的更改日志(在 org.apache.flink.types.RowKind 中枚举)作为默认的 ChangelogMode。
  • fromChangelogStream(DataStream, Schema):允许为 DataStream 定义类似于 fromDataStream(DataStream, Schema) 的模式。否则语义等于 fromChangelogStream(DataStream)。
  • fromChangelogStream(DataStream, Schema, ChangelogMode):完全控制如何将流解释为变更日志。传递的 ChangelogMode 有助于规划器区分仅插入、更新插入或收回行为。
  • toChangelogStream(Table):fromChangelogStream(DataStream)的逆操作。它生成一个包含 org.apache.flink.types.Row 实例的流,并在运行时为每条记录设置 RowKind 标志。该方法支持各种更新表。如果输入表包含单个行时间列,它将被传播到流记录的时间戳中。水印也将被传播。
  • toChangelogStream(Table, Schema):fromChangelogStream(DataStream, Schema)的逆操作。该方法可以丰富产生的列数据类型。如有必要,计划者可能会插入隐式强制转换。可以将行时间写为元数据列。
  • toChangelogStream(Table, Schema, ChangelogMode):完全控制如何将表转换为变更日志流。传递的 ChangelogMode 有助于规划器区分仅插入、更新插入或收回行为。

从 Table API 的角度来看,与 DataStream API 之间的转换类似于读取或写入已使用 SQL 中的 CREATE TABLE DDL 定义的虚拟表连接器。

因为 fromChangelogStream 的行为与 fromDataStream 类似,我们建议在继续之前阅读上一节

此虚拟连接器还支持读取和写入流记录的行时元数据。

虚拟表源实现 SupportsSourceWatermark

fromChangelogStream 使用示例

下面的代码展示了如何将 fromChangelogStream 用于不同的场景。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// === EXAMPLE 1 ===

// interpret the stream as a retract stream

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table = tableEnv.fromChangelogStream(dataStream);

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+

// === EXAMPLE 2 ===

// interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table =
    tableEnv.fromChangelogStream(
        dataStream,
        Schema.newBuilder().primaryKey("f0").build(),
        ChangelogMode.upsert());

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -U |                          Alice |          12 |
// | +U |                          Alice |         100 |
// +----+--------------------------------+-------------+

示例 1 中显示的默认 ChangelogMode 对于大多数用例来说应该足够了,因为它接受各种更改。

但是,示例 2 显示了如何通过使用 upsert 模式将更新消息的数量减少 50% 来限制传入更改的种类以提高效率。 可以通过为 toChangelogStream 定义主键和 upsert 更改日志模式来减少结果消息的数量。

toChangelogStream使用示例

下面的代码展示了如何在不同的场景下使用 toChangelogStream。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.*;

// create Table with event-time
tableEnv.executeSql(
    "CREATE TABLE GeneratedTable "
    + "("
    + "  name STRING,"
    + "  score INT,"
    + "  event_time TIMESTAMP_LTZ(3),"
    + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
    + ")"
    + "WITH ('connector'='datagen')");

Table table = tableEnv.from("GeneratedTable");

// === EXAMPLE 1 ===

// convert to DataStream in the simplest and most general way possible (no event-time)

Table simpleTable = tableEnv
    .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
    .as("name", "score")
    .groupBy($("name"))
    .select($("name"), $("score").sum());

tableEnv
    .toChangelogStream(simpleTable)
    .executeAndCollect()
    .forEachRemaining(System.out::println);

// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]

// === EXAMPLE 2 ===

// convert to DataStream in the simplest and most general way possible (with event-time)

DataStream<Row> dataStream = tableEnv.toChangelogStream(table);

// since `event_time` is a single time attribute in the schema, it is set as the
// stream record's timestamp by default; however, at the same time, it remains part of the Row

dataStream.process(
    new ProcessFunction<Row, Void>() {
        @Override
        public void processElement(Row row, Context ctx, Collector<Void> out) {

             // prints: [name, score, event_time]
             System.out.println(row.getFieldNames(true));

             // timestamp exists twice
             assert ctx.timestamp() == row.<Instant>getFieldAs("event_time").toEpochMilli();
        }
    });
env.execute();

// === EXAMPLE 3 ===

// convert to DataStream but write out the time attribute as a metadata column which means
// it is not part of the physical schema anymore

DataStream<Row> dataStream = tableEnv.toChangelogStream(
    table,
    Schema.newBuilder()
        .column("name", "STRING")
        .column("score", "INT")
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        .build());

// the stream record's timestamp is defined by the metadata; it is not part of the Row

dataStream.process(
    new ProcessFunction<Row, Void>() {
        @Override
        public void processElement(Row row, Context ctx, Collector<Void> out) {

            // prints: [name, score]
            System.out.println(row.getFieldNames(true));

            // timestamp exists once
            System.out.println(ctx.timestamp());
        }
    });
env.execute();

// === EXAMPLE 4 ===

// for advanced users, it is also possible to use more internal data structures for efficiency

// note that this is only mentioned here for completeness because using internal data structures
// adds complexity and additional type handling

// however, converting a TIMESTAMP_LTZ column to `Long` or STRING to `byte[]` might be convenient,
// also structured types can be represented as `Row` if needed

DataStream<Row> dataStream = tableEnv.toChangelogStream(
    table,
    Schema.newBuilder()
        .column(
            "name",
            DataTypes.STRING().bridgedTo(StringData.class))
        .column(
            "score",
            DataTypes.INT())
        .column(
            "event_time",
            DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
        .build());

// leads to a stream of Row(name: StringData, score: Integer, event_time: Long)

有关示例 4 中的数据类型支持哪些转换的更多信息,请参阅 Table API 的数据类型页面。

toChangelogStream(Table).executeAndCollect() 的行为等同于调用 Table.execute().collect()。 但是, toChangelogStream(Table) 可能对测试更有用,因为它允许在 DataStream API 的后续 ProcessFunction 中访问生成的水印。

TypeInformation 和 DataType 之间的映射

DataStream API 使用 org.apache.flink.api.common.typeinfo.TypeInformation 的实例来描述在流中传输的记录类型。特别是,它定义了如何将记录从一个 DataStream 运算符序列化和反序列化到另一个。它还有助于将状态序列化为保存点和检查点。

Table API 使用自定义数据结构在内部表示记录,并向用户公开 org.apache.flink.table.types.DataType 以声明将数据结构转换为的外部格式,以便在源、接收器、UDF 或 DataStream 中使用API。

DataType 比 TypeInformation 更丰富,因为它还包含有关逻辑 SQL 类型的详细信息。因此,在转换过程中会隐式添加一些细节。

Table 的列名和类型自动派生自 DataStream 的 TypeInformation。使用 DataStream.getType() 检查是否已通过 DataStream API 的反射类型提取工具正确检测到类型信息。如果最外层记录的 TypeInformation 是 CompositeType,则在派生表的 schema 时将在第一级展平。

TypeInformation 转为 DataType

将 TypeInformation 转换为 DataType 时适用以下规则:

  • TypeInformation 的所有子类都映射到逻辑类型,包括与 Flink 内置序列化器对齐的可空性。
  • TupleTypeInfoBase 的子类被转换为行(对于 Row)或结构化类型(对于元组、POJO 和案例类)。
  • BigDecimal 默认转换为 DECIMAL(38, 18)。
  • PojoTypeInfo 字段的顺序由以所有字段作为参数的构造函数确定。 如果在转换过程中未找到,则字段顺序将按字母顺序排列。
  • 无法表示为列出的 org.apache.flink.table.api.DataTypes 之一的 GenericTypeInfo 和其他 TypeInformation 将被视为黑盒 RAW 类型。 当前会话配置用于实现原始类型的序列化程序。 届时将无法访问复合嵌套字段。
  • 有关完整的翻译逻辑,请参阅 TypeInfoDataTypeConverter。

使用 DataTypes.of(TypeInformation) 在自定义模式声明或 UDF 中调用上述逻辑。

DataType转为TypeInformation

表运行时将确保正确地将输出记录序列化到 DataStream API 的第一个运算符。

之后,需要考虑 DataStream API 的类型信息语义。

0 0 投票数
文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://lrting.top/backend/3912/

(0)
上一篇 2022-02-21 02:51
下一篇 2022-02-26 13:38

相关推荐

订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x