Flink Table API和SQL(中)

一、时间属性和窗口

  基于时间的操作(比如时间窗口),需要定义相关的时间语义和时间数据来源的信息。在 Table API 和 SQL 中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。
  所以所谓的时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的DDL 里直接定义为一个字段,也可以在 DataStream 转换成表时定义。一旦定义了时间属性,它就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。
  时间属性的数据类型为TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。
按照时间语义的不同,我们可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种情况。

1. 事件时间

  我们在实际应用中,最常用的就是事件时间。在事件时间语义下,允许表处理程序根据每个数据中包含的时间戳(也就是事件发生的时间)来生成结果。
  事件时间语义最大的用途就是处理乱序事件或者延迟事件的场景。我们通过设置水位线(watermark)来表示事件时间的进展,而水位线可以根据数据的最大时间戳设置一个延迟时间。这样即使在出现乱序的情况下,对数据的处理也可以获得正确的结果。
  为了处理无序事件,并区分流中的迟到事件。Flink 需要从事件数据中提取时间戳,并生成水位线,用来推进事件时间的进展。
  事件时间属性可以在创建表DDL中定义,也可以在数据流和表的转换中定义。

1.1 在创建表的DDL 中定义

  在创建表的 DDL(CREATE TABLE 语句)中,可以增加一个字段,通过 WATERMARK 语句来定义事件时间属性。WATERMARK 语句主要用来定义水位线(watermark)的生成表达式,这个表达式会将带有事件时间戳的字段标记为事件时间属性,并在它基础上给出水位线的延迟时间。具体定义方式如下:

CREATE TABLE EventTable( user STRING,
url STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
...

  这里我们把 ts 字段定义为事件时间属性,而且基于 ts 设置了 5 秒的水位线延迟。这里的“5 秒”是以“时间间隔”的形式定义的,格式是INTERVAL

INTERVAL '5' SECOND

  这里的数值必须用单引号引起来,而单位用 SECOND 和 SECONDS 是等效的。
  Flink 中支持的事件时间属性数据类型必须为TIMESTAMP 或者TIMESTAMP_LTZ。这里TIMESTAMP_LTZ 是指带有本地时区信息的时间戳(TIMESTAMP WITH LOCAL TIME ZONE);一般情况下如果数据中的时间戳是“年-月-日-时-分-秒”的形式,那就是不带时区信息的,可以将事件时间属性定义为TIMESTAMP 类型。
  而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件时间属性,类型定义为TIMESTAMP_LTZ 会更方便:

CREATE TABLE events ( user STRING,
url STRING, ts BIGINT,
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);

  这里我们另外定义了一个字段ts_ltz,是把长整型的 ts 转换为TIMESTAMP_LTZ 得到的;进而使用 WATERMARK 语句将它设为事件时间属性,并设置 5 秒的水位线延迟。

1.2 在数据流转换为表时定义

  事件时间属性也可以在将DataStream 转换为表的时候来定义。我们调用 fromDataStream() 方法创建表时,可以追加参数来定义表中的字段结构;这时可以给某个字段加上.rowtime() 后缀,就表示将当前字段指定为事件时间属性。这个字段可以是数据中本不存在、额外追加上去的“逻辑字段”,就像之前 DDL 中定义的第二种情况;也可以是本身固有的字段,那么这个字段就会被事件时间属性所覆盖,类型也会被转换为 TIMESTAMP。不论那种方式,时间属性字段中保存的都是事件的时间戳(TIMESTAMP 类型)。
  需要注意的是,这种方式只负责指定时间属性,而时间戳的提取和水位线的生成应该之前就在 DataStream 上定义好了。由于 DataStream 中没有时区概念,因此 Flink 会将事件时间属性解析成不带时区的TIMESTAMP 类型,所有的时间值都被当作 UTC 标准时间。

在代码中的定义方式如下:

// 方法一:
// 流中数据类型为二元组 Tuple2,包含两个字段;需要自定义提取时间戳并生成水位线
DataStreamTuple2String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").rowtime());

// 方法二:
// 流中数据类型为三元组 Tuple3,最后一个字段就是事件时间戳
DataStreamTuple3String, String, Long>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 不再声明额外字段,直接用最后一个字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").rowtime());

2. 处理时间

  相比之下处理时间就比较简单了,它就是我们的系统时间,使用时不需要提取时间戳
(timestamp)和生成水位线(watermark)。因此在定义处理时间属性时,必须要额外声明一个字段,专门用来保存当前的处理时间。
  类似地,处理时间属性的定义也有两种方式:创建表 DDL 中定义,或者在数据流转换成表时定义。

2.1 在创建表的DDL 中定义

  在创建表的 DDL(CREATE TABLE 语句)中,可以增加一个额外的字段,通过调用系统内置的 PROCTIME()函数来指定当前的处理时间属性,返回的类型是TIMESTAMP_LTZ

CREATE TABLE EventTable( user STRING,
url STRING,
ts AS PROCTIME()
) WITH (
...

  这里的时间属性,其实是以“计算列”(computed column)的形式定义出来的。所谓的计算列是 Flink SQL 中引入的特殊概念,可以用一个 AS 语句来在表中产生数据中不存在的列, 并且可以利用原有的列、各种运算符及内置函数。在前面事件时间属性的定义中,将 ts 字段转换成 TIMESTAMP_LTZ 类型的 ts_ltz,也是计算列的定义方式。

2.2 在数据流转换为表时定义

  处理时间属性同样可以在将 DataStream 转换为表的时候来定义。 我们调用fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。
  代码中定义处理时间属性的方法如下:

DataStreamTuple2String, String>> stream = ...;

// 声明一个额外的字段作为处理时间属性字段
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").proctime());

3. 窗口(Window)

3.1 分组窗口(Group Window,老版本)

  在 Flink 1.12 之前的版本中,Table API 和 SQL 提供了一组“分组窗口”(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在 SQL 中就是调用 TUMBLE()、HOP()、SESSION(),传入时间属性字段、窗口大小等参数就可以了。以滚动窗口为例:

TUMBLE(ts, INTERVAL '1' HOUR)

  这里的 ts 是定义好的时间属性字段,窗口大小用“时间间隔”INTERVAL 来定义。
  在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。基本使用方式如下:

Table result = tableEnv.sqlQuery(
    "SELECT " +
    "user, " +
    "TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " +
    "COUNT(url) AS cnt " + 
    "FROM EventTable " +
    "GROUP BY " + // 使用窗口和用户名进行分组
    "user, " +
    "TUMBLE(ts, INTERVAL '1' HOUR)" // 定义 1 小时滚动窗口
);

  这里定义了 1 小时的滚动窗口,将窗口和用户 user 一起作为分组的字段。用聚合函数COUNT()对分组数据的个数进行了聚合统计,并将结果字段重命名为cnt;用TUPMBLE_END()函数获取滚动窗口的结束时间,重命名为 endT 提取出来。
  分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用(deprecated)的状态。

3.2 窗口表值函数(Windowing TVFs,新版本)

  从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions, Windowing TVFs)来定义窗口。窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。表函数(table function)可以看作是返回一个表的函数,关于这部分内容, 我们会在后面进行介绍。

目前 Flink 提供了以下几个窗口TVF:

  • 滚动窗口(Tumbling Windows);
  • 滑动窗口(Hop Windows,跳跃窗口);
  • 累积窗口(Cumulate Windows);
  • 会话窗口(Session Windows,目前尚未完全支持)。

  窗口表值函数可以完全替代传统的分组窗口函数。窗口 TVF 更符合 SQL 标准,性能得到了优化,拥有更强大的功能;可以支持基于窗口的复杂计算,例如窗口Top-N、窗口联结(window join)等等。当然,目前窗口 TVF 的功能还不完善,会话窗口和很多高级功能还不支持,不过正在快速地更新完善。可以预见在未来的版本中,窗口 TVF 将越来越强大,将会是窗口处理的唯一入口。
  在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列: “窗口起始点”(window_start)、“窗口结束点”(window_end)、“窗口时间”(window_time)。
起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于
window_end - 1ms,所以相当于是窗口中能够包含数据的最大时间戳。
  在 SQL 中的声明方式,与以前的分组窗口是类似的,直接调用 TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。下面我们就分别对这几种窗口TVF 进行介绍。
(1)滚动窗口(TUMBLE)
  滚动窗口在SQL 中的概念与 DataStream API 中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。
  在 SQL 中通过调用 TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:

TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)

  这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。

(2)滑动窗口(HOP)
  滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL 中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size) 和滑动步长(slide)两个参数。

HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));

  这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。

(3) 累积窗口(CUMULATE)
  滚动窗口和滑动窗口,可以用来计算大多数周期性的统计指标。不过在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。
  例如,我们按天来统计网站的 PV(Page View,页面浏览量),如果用 1 天的滚动窗口,那需要到每天 24 点才会计算一次,输出频率太低;如果用滑动窗口,计算频率可以更高,但统计的就变成了“过去 24 小时的 PV”。所以我们真正希望的是,还是按照自然日统计每天的 PV,不过需要每隔 1 小时就输出一次当天到目前为止的 PV 值。这种特殊的窗口就叫作“累积窗口”(Cumulate Window)。

Flink Table API和SQL(中)

  累积窗口是窗口 TVF 中新增的窗口功能,它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。如图所示,开始时,创建的第一个窗口大小就是步长 step;之后的每个窗口都会在之前的基础上再扩展 step 的长度,直到达到最大窗口长度。在 SQL 中可以用CUMULATE()函数来定义,具体如下:

CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))

  这里我们基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1
小时的累积窗口。注意第三个参数为步长 step,第四个参数则是最大窗口长度。
  上面所有的语句只是定义了窗口,类似于 DataStream API 中的窗口分配器;在 SQL 中窗口的完整调用,还需要配合聚合操作和其它操作。我们会在下一节详细讲解窗口的聚合。

二、聚合(Aggregation)查询

聚合查询分成两种:流处理中特有的聚合(主要指窗口聚合),以及 SQL 原生的聚合查询方式。

1. 分组聚合

  SQL 中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如SUM()MAX()MIN()AVG()以及 COUNT()。它们的特点是对多条输入数据进行计算,得到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数:

Table eventCountTable = tableEnv.sqlQuery("select COUNT(*) from EventTable");

  而更多的情况下,我们可以通过 GROUP BY 子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。例如之前我们举的例子,可以按照用户名进行分组,统计每个用户点击 url 的次数:

SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user

  这种聚合方式,就叫作“分组聚合”(group aggregation)。从概念上讲,SQL 中的分组聚合可以对应 DataStream API 中 keyBy 之后的聚合转换,它们都是按照某个 key 对数据进行了划分,各自维护状态来进行聚合统计的。在流处理中,分组聚合同样是一个持续查询,而且是一个更新查询,得到的是一个动态表;每当流中有一个新的数据到来时,都会导致结果表的更新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream) 或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成 DataStream 打印输出, 需要调用 toChangelogStream()。
  另外,在持续查询的过程中,由于用于分组的 key 可能会不断增加,因此计算结果所需要维护的状态也会持续增长。为了防止状态无限增长耗尽资源,Flink Table API 和 SQL 可以在表环境中配置状态的生存时间(TTL):

TableEnvironment tableEnv = ...

// 获取表环境的配置
TableConfig tableConfig = tableEnv.getConfig();
// 配置状态保持时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60));

或者也可以直接设置配置项 table.exec.state.ttl:

TableEnvironment tableEnv = ...
Configuration configuration = tableEnv.getConfig().getConfiguration(); configuration.setString("table.exec.state.ttl", "60 min");

  这两种方式是等效的。需要注意,配置 TTL 有可能会导致统计结果不准确,这其实是以牺牲正确性为代价换取了资源的释放。
  此外,在 Flink SQL 的分组聚合中同样可以使用 DISTINCT 进行去重的聚合处理;可以使用 HAVING 对聚合结果进行条件筛选;还可以使用GROUPING SETS(分组集)设置多个分组情况分别统计。这些语法跟标准 SQL 中的用法一致,这里就不再详细展开了。
  可以看到,分组聚合既是 SQL 原生的聚合查询,也是流处理中的聚合操作,这是实际应用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求也可以进行自定义。关于自定义函数(UDF),我们会在后面详细介绍。

2. 窗口聚合

  在流处理中,往往需要将无限数据流划分成有界数据集,这就是所谓的“窗口”。在 11.4.3 小节中已经介绍了窗口的声明方式,这相当于 DataStream API 中的窗口分配器(window assigner),只是明确了窗口的形式以及数据如何分配;而窗口具体的计算处理操作,在 DataStream API 中还需要窗口函数(window function)来进行定义。
  在 Flink 的 Table API 和 SQL 中,窗口的计算是通过“窗口聚合”(window aggregation)来实现的。与分组聚合类似,窗口聚合也需要调用 SUM()、MAX()、MIN()、COUNT()一类的聚合函数,通过GROUP BY 子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作为分组 key 的一部分定义出来。在 Flink 1.12 版本之前,是直接把窗口自身作为分组 key 放在GROUP BY 之后的,所以也叫“分组窗口聚合”;而 1.13 版本开始使用了 “窗口表值函数”(Windowing TVF),窗口本身返回的是就是一个表,所以窗口会出现在 FROM后面,GROUP BY 后面的则是窗口新增的字段 window_startwindow_end

比如,我们将前面分组窗口的聚合,用窗口TVF 重新实现一下:

Table result = tableEnv.sqlQuery(
    "SELECT " +
    "user, " +
    "window_end AS endT, " + "COUNT(url) AS cnt " +
    "FROM TABLE( " +
    "TUMBLE( TABLE EventTable, " + "DESCRIPTOR(ts), " + "INTERVAL '1' HOUR)) " +
    "GROUP BY user, window_start, window_end "
    );

  这里我们以 ts 作为时间属性字段、基于 EventTable 定义了 1 小时的滚动窗口,希望统计出每小时每个用户点击 url 的次数。用来分组的字段是用户名 user, 以及表示窗口的window_start 和window_end;而 TUMBLE()是表值函数,所以得到的是一个表(Table),我们的聚合查询就是在这个Table 中进行的。这就是前面窗口聚合的实现方式。
  Flink SQL 目前提供了滚动窗口TUMBLE()、滑动窗口 HOP()和累积窗口(CUMULATE) 三种表值函数(TVF)。在具体应用中,我们还需要提前定义好时间属性。下面是一段窗口聚合的完整代码,以累积窗口为例:

import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class CumulateWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 读取数据源,并分配时间戳、生成水位线
        SingleOutputStreamOperatorEvent> eventStream = env
                .fromElements(
                        new Event("Alice", "./home", 1000L),
                        new Event("Bob", "./cart", 1000L),
                        new Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
                        new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
                        new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
                        new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
                        new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssignerEvent>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        // 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 将数据流转换成表,并指定时间属性
        Table eventTable = tableEnv.fromDataStream(
                eventStream,
                $("user"),
                $("url"),
                $("timestamp").rowtime().as("ts")
        );

        // 为方便在SQL中引用,在环境中注册表EventTable
        tableEnv.createTemporaryView("EventTable", eventTable);

        // 设置累积窗口,执行SQL统计查询
        Table result = tableEnv
                .sqlQuery(
                        "SELECT " +
                                "user, " +
                                "window_end AS endT, " +
                                "COUNT(url) AS cnt " +
                                "FROM TABLE( " +
                                "CUMULATE( TABLE EventTable, " +    // 定义累积窗口
                                "DESCRIPTOR(ts), " +
                                "INTERVAL '30' MINUTE, " +
                                "INTERVAL '1' HOUR)) " +
                                "GROUP BY user, window_start, window_end "
                );

        tableEnv.toDataStream(result).print();

        env.execute();
    }
}

  这里我们使用了统计周期为 1 小时、累积间隔为 30 分钟的累积窗口。可以看到,代码的架构和处理逻辑与前面的实现完全一致,只是将滚动窗口 TUMBLE()换成了累积窗口CUMULATE()。代码执行结果如下:

+I[Alice, 1970-01-01T00:30, 2]
+I[Bob, 1970-01-01T00:30, 1]
+I[Alice, 1970-01-01T01:00, 3]
+I[Bob, 1970-01-01T01:00, 1]
+I[Bob, 1970-01-01T01:30, 1]
+I[Cary, 1970-01-01T02:00, 2]
+I[Bob, 1970-01-01T02:00, 1]

  与分组聚合不同,窗口聚合不会将中间聚合的状态输出,只会最后输出一个结果。我们可以看到,所有数据都是以 INSERT 操作追加到结果动态表中的,因此输出每行前面都有+I 的前缀。所以窗口聚合查询都属于追加查询,没有更新操作,代码中可以直接用 toDataStream() 将结果表转换成流。
  具体来看,上面代码输入的前三条数据属于第一个半小时的累积窗口,其中Alice 的访问数据有两条,Bob 的访问数据有 1 条,所以输出了两条结果[Alice, 1970-01-01T00:30, 2]和[Bob, 1970-01-01T00:30, 1];而之后又到来的一条Alice 访问数据属于第二个半小时范围,同时也属于第一个 1 小时的统计周期, 所以会在之前两条的基础上进行叠加, 输出[Alice,1970-01-01T00:30, 3],而 Bob 没有新的访问数据,因此依然输出[Bob, 1970-01-01T00:30, 1]。从第二个小时起,数据属于新的统计周期,就全部从零开始重新计数了。
  相比之前的分组窗口聚合,Flink 1.13 版本的窗口表值函数(TVF)聚合有更强大的功能。除了应用简单的聚合函数、提取窗口开始时间(window_start)和结束时间(window_end)之外, 窗口 TVF 还提供了一个 window_time 字段,用于表示窗口中的时间属性;这样就可以方便地进行窗口的级联(cascading window)和计算了。另外,窗口TVF 还支持 GROUPING SETS, 极大地扩展了窗口的应用范围。
  基于窗口的聚合,是流处理中聚合统计的一个特色,也是与标准 SQL 最大的不同之处。在实际项目中,很多统计指标其实都是基于时间窗口来进行计算的,所以窗口聚合是 Flink SQL 中非常重要的功能;基于窗口 TVF 的聚合未来也会有更多功能的扩展支持,比如窗口 Top N、会话窗口、窗口联结等等。

3. 开窗(Over)聚合

  在标准 SQL 中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值。比如说,我们可以以每一行数据为基准,计算它之前 1 小时内所有数据的平均值;也可以计算它之前 10 个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样,这就是所谓的“开窗函数”。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口 TVF 聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。与标准 SQL 中一致,Flink SQL 中的开窗函数也是通过 OVER 子句来实现的,所以有时开窗聚合也叫作“OVER 聚合”(Over Aggregation)。基本语法如下:

SELECT
    聚合函数> OVER (
    [PARTITION BY 字段 1>[, 字段 2>, ...]]
    ORDER BY 时间属性字段>
    开窗范围>),
    ...
FROM ...

这里OVER 关键字前面是一个聚合函数,它会应用在后面 OVER 定义的窗口上。在 OVER子句中主要有以下几个部分:

  • PARTITION BY(可选)
    用来指定分区的键(key),类似于 GROUP BY 的分组,这部分是可选的;
  • ORDER BY
      OVER 窗口是基于当前行扩展出的一段数据范围,选择的标准可以基于时间也可以基于数量。不论那种定义,数据都应该是以某种顺序排列好的;而表中的数据本身是无序的。所以在OVER 子句中必须用 ORDER BY 明确地指出数据基于那个字段排序。在 Flink 的流处理中, 目前只支持按照时间属性的升序排列,所以这里 ORDER BY 后面的字段必须是定义好的时间属性。
  • 开窗范围
      对于开窗函数而言,还有一个必须要指定的就是开窗的范围,也就是到底要扩展多少行来做聚合。这个范围是由BETWEEN AND 来定义的,也就是“从下界到上界” 的范围。目前支持的上界只能是 CURRENT ROW,也就是定义一个“从之前某一行到当前行” 的范围,所以一般的形式为:
BETWEEN ... PRECEDING AND CURRENT ROW

  前面我们提到,开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还应该在两种模式之间做出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)。

  • 范围间隔
      范围间隔以RANGE 为前缀,就是基于ORDER BY 指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。例如开窗范围选择当前行之前 1 小时的数据:
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  • 行间隔
    行间隔以 ROWS 为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。例如开窗范围选择当前行之前的 5 行数据(最终聚合会包括当前行,所以一共 6 条数据):
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW

下面是一个具体示例:

SELECT user, ts,
    COUNT(url) OVER ( PARTITION BY user ORDER BY ts
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS cnt
FROM EventTable

  这里我们以 ts 作为时间属性字段,对 EventTable 中的每行数据都选取它之前 1 小时的所有数据进行聚合,统计每个用户访问 url 的总次数,并重命名为 cnt。最终将表中每行的 user, ts 以及扩展出 cnt 提取出来。
  可以看到,整个开窗聚合的结果,是对每一行数据都有一个对应的聚合值,因此就像将表中扩展出了一个新的列一样。由于聚合范围上界只能到当前行,新到的数据一般不会影响之前数据的聚合结果,所以结果表只需要不断插入(INSERT)就可以了。执行上面 SQL 得到的结果表,可以用toDataStream()直接转换成流打印输出。
  开窗聚合与窗口聚合(窗口 TVF 聚合)本质上不同,不过也还是有一些相似之处的:它们都是在无界的数据流上划定了一个范围,截取出有限数据集进行聚合统计;这其实都是“窗口”的思路。事实上,在Table API 中确实就定义了两类窗口:分组窗口(GroupWindow)和开窗窗口(OverWindow);而在 SQL 中,也可以用 WINDOW 子句来在 SELECT 外部单独定义一个OVER 窗口:

SELECT user, ts,
    COUNT(url) OVER w AS cnt,
    MAX(CHAR_LENGTH(url)) OVER w AS max_url FROM EventTable
    WINDOW w AS ( PARTITION BY user ORDER BY ts
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

  上面的 SQL 中定义了一个选取之前 2 行数据的 OVER 窗口,并重命名为w;接下来就可以基于它调用多个聚合函数,扩展出更多的列提取出来。比如这里除统计url 的个数外,还统计了url 的最大长度:首先用 CHAR_LENGTH()函数计算出url 的长度,再调用聚合函数 MAX() 进行聚合统计。这样,我们就可以方便重复引用定义好的 OVER 窗口了,大大增强了代码的可读性。

4. 应用实例 —— Top N

只放代码,详细解释看参考资料

4.1 普通 Top N

public class TopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'input/clicks.csv', " +
                " 'format' =  'csv' " +
                ")";

        tableEnv.executeSql(createDDL);

        // 普通Top N,选取当前所有用户中浏览量最大的2个

        Table topNResultTable = tableEnv.sqlQuery("SELECT user, cnt, row_num " +
                "FROM (" +
                "   SELECT *, ROW_NUMBER() OVER (" +
                "      ORDER BY cnt DESC" +
                "   ) AS row_num " +
                "   FROM (SELECT user, COUNT(url) AS cnt FROM clickTable GROUP BY user)" +
                ") WHERE row_num );

        tableEnv.toChangelogStream(topNResultTable).print("top 2: ");

        env.execute();
    }
}

4.2 窗口 Top N

public class WindowTopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 读取数据源,并分配时间戳、生成水位线
        SingleOutputStreamOperatorEvent> eventStream = env
                .fromElements(
                        new Event("Alice", "./home", 1000L),
                        new Event("Bob", "./cart", 1000L),
                        new Event("Alice", "./prod?id=1",  25 * 60 * 1000L),
                        new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
                        new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
                        new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
                        new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssignerEvent>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        // 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 将数据流转换成表,并指定时间属性
        Table eventTable = tableEnv.fromDataStream(
                eventStream,
                $("user"),
                $("url"),
                $("timestamp").rowtime().as("ts")
                // 将timestamp指定为事件时间,并命名为ts
        );

        // 为方便在SQL中引用,在环境中注册表EventTable
        tableEnv.createTemporaryView("EventTable", eventTable);

        // 定义子查询,进行窗口聚合,得到包含窗口信息、用户以及访问次数的结果表
        String subQuery =
                "SELECT window_start, window_end, user, COUNT(url) as cnt " +
                        "FROM TABLE ( " +
                        "TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )) " +
                        "GROUP BY window_start, window_end, user ";

        // 定义Top N的外层查询
        String topNQuery =
                "SELECT * " +
                        "FROM (" +
                        "SELECT *, " +
                        "ROW_NUMBER() OVER ( " +
                        "PARTITION BY window_start, window_end " +
                        "ORDER BY cnt desc " +
                        ") AS row_num " +
                        "FROM (" + subQuery + ")) " +
                        "WHERE row_num ;

        // 执行SQL得到结果表
        Table result = tableEnv.sqlQuery(topNQuery);

        tableEnv.toDataStream(result).print();

        env.execute();
    }
}

三、联结(Join)查询

Flink SQL 中的联结查询大体上也可以分为两类:SQL 原生的联结查询方式,和流处理中特有的联结查询。

1. 常规联结查询

  常规联结(Regular Join)是 SQL 中原生定义的 Join 方式,是最通用的一类联结操作。它的具体语法与标准SQL 的联结完全相同,通过关键字 JOIN 来联结两个表,后面用关键字 ON 来指明联结条件。按照习惯,我们一般以“左侧”和“右侧”来区分联结操作的两个表。
  在两个动态表的联结中,任何一侧表的插入(INSERT)或更改(UPDATE)操作都会让联结的结果表发生改变。例如,如果左侧有新数据到来,那么它会与右侧表中所有之前的数据进行联结合并,右侧表之后到来的新数据也会与这条数据连接合并。所以,常规联结查询一般是更新(Update)查询。
  与标准 SQL 一致,Flink SQL 的常规联结也可以分为内联结(INNER JOIN)和外联结
(OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。目前仅支持“等值条件”作为联结条件,也就是关键字 ON 后面必须是判断两表中字段相等的逻辑表达式。

1.1 等值内联结(INNER Equi-JOIN)

  内联结用 INNER JOIN 来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。
  例如之前提到的“订单表”(定义为 Order)和“商品表”(定义为 Product)的联结查询,就可以用以下SQL 实现:

SELECT *
FROM Order
INNER JOIN Product
ON Order.product_id = Product.id

  这里是一个内联结,联结条件是订单数据的 product_id 和商品数据的 id 相等。由于订单表中出现的商品id 一定会在商品表中出现,因此这样得到的联结结果表,就包含了订单表Order 中所有订单数据对应的详细信息。

1.2 等值外联结(OUTER Equi-JOIN)

  与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL 支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行 返回。例如,订单表中未必包含了商品表中的所有 ID,为了将哪些没有任何订单的商品信息也查询出来,我们就可以使用右外联结(RIGHT JOIN)。当然,外联结查询目前也仅支持等值联结条件。具体用法如下:

SELECT *
FROM Order
LEFT JOIN Product
ON Order.product_id = Product.id

SELECT *
FROM Order
RIGHT JOIN Product
ON Order.product_id = Product.id

SELECT *
FROM Order
FULL OUTER JOIN Product
ON Order.product_id = Product.id

2. 间隔联结查询

  间隔联结(Interval Join)返回的,同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要点:

  • 两表的联结
    间隔联结不需要用 JOIN 关键字,直接在 FROM 后将要联结的两表列出来就可以,用逗号分隔。这与标准 SQL 中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。
  • 联结条件
    联结条件用 WHERE 子句来定义,用一个等值表达式描述。交叉联结之后再用 WHERE进行条件筛选,效果跟内联结 INNER JOIN … ON …非常类似。
  • 时间间隔限制
    我们可以在WHERE 子句中,联结条件后用 AND 追加一个时间间隔的限制条件;做法是提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种,这里分别用 ltime 和 rtime 表示左右表中的时间字段:
    (1)ltime = rtime
    (2)ltime >= rtime AND ltime
    (3)ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

  判断两者相等,这是最强的时间约束,要求两表中数据的时间必须完全一致才能匹配;一般情况下,我们还是会放宽一些,给出一个间隔。间隔的定义可以用=,>这一类的关系不等式,也可以用BETWEEN … AND …这样的表达式。
  例如,我们现在除了订单表 Order 外,还有一个“发货表”Shipment,要求在收到订单后四个小时内发货。那么我们就可以用一个间隔联结查询,把所有订单与它对应的发货信息连接合并在一起返回。

SELECT *
FROM Order o, Shipment s WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

  在流处理中,间隔联结查询只支持具有时间属性的“仅追加”(Append-only)表。
  那对于有更新操作的表,又怎么办呢?除了间隔联结之外,Flink SQL 还支持时间联结
(Temporal Join),这主要是针对“版本表”(versioned table)而言的。所谓版本表,就是记录了数据随着时间推移版本变化的表,可以理解成一个“更新日志”(change log),它就是具有时间属性、还会进行更新操作的表。当我们联结某个版本表时,并不是把当前的数据连接合并起来就行了,而是希望能够根据数据发生的时间,找到当时的“版本”;这种根据更新时间提取当时的值进行联结的操作,就叫作“时间联结”(Temporal Join)。这部分内容由于涉及版本表的定义,我们就不详细展开了,感兴趣的可以查阅官网资料。

参考资料

Word版:https://download.csdn.net/download/mengxianglong123/85035166
PDF版:https://download.csdn.net/download/mengxianglong123/85035172

    
原文链接:Flink Table API和SQL(中)
0 0 投票数
文章评分

本文转载自落花雨时,原文链接:https://blog.csdn.net/mengxianglong123/article/details/124072705。

(0)
上一篇 2022-07-07 09:35
下一篇 2022-07-07 09:44

相关推荐

订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x