一、函数
在 SQL 中,我们可以把一些数据的转换操作包装起来,嵌入到 SQL 查询中统一调用,这就是“函数”(functions)。
Flink 的Table API 和 SQL 同样提供了函数的功能。两者在调用时略有不同:Table API 中的函数是通过数据对象的方法调用来实现的;而 SQL 则是直接引用函数名称,传入数据作为参数。例如,要把一个字符串 str 转换成全大写的形式,Table API 的写法是调用 str 这个 String 对象的 upperCase()
方法:
str.upperCase();
而 SQL 中的写法就是直接引用UPPER()函数,将 str 作为参数传入:
UPPER(str)
由于Table API 是内嵌在 Java 语言中的,很多方法需要在类中额外添加,因此扩展功能比较麻烦,目前支持的函数比较少;而且 Table API 也不如 SQL 的通用性强,所以一般情况下较少使用。下面我们主要介绍 Flink SQL 中函数的使用。
Flink SQL 中的函数可以分为两类:一类是 SQL 中内置的系统函数,直接通过函数名调用就可以,能够实现一些常用的转换操作,比如之前我们用到的 COUNT()、CHAR_LENGTH()、UPPER()等等;而另一类函数则是用户自定义的函数(UDF),需要在表环境中注册才能使用。接下来我们就对这两类函数分别进行介绍。
1. 系统函数
系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL 提供了大量的系统函数,几乎支持所有的标准 SQL 中的操作,这为我们使用 SQL 编写流处理程序提供了极大的方便。
Flink SQL 中的系统函数又主要可以分为两大类:标量函数(
Scalar Functions
)和聚合函数(Aggregate Functions
)。
1.1 标量函数(Scalar Functions)
所谓的“标量”,是指只有数值大小、没有方向的量;所以标量函数指的就是只对输入数据做转换操作、返回一个值的函数。这里的输入数据对应在表中,一般就是一行数据中 1 个或多个字段,因此这种操作有点像流处理转换算子中的 map。另外,对于一些没有输入参数、直接可以得到唯一结果的函数,也属于标量函数。
标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准 SQL 中也有定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官网的完整函数列表。
-
比较函数(Comparison Functions)
比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。这个比较表达式可以是用、
>
、=
等符号连接两个值,也可以是用关键字定义的某种判断。例如:
(1)value1 = value2
判断两个值相等;
(2)value1 value2
判断两个值不相等
(3)value IS NOT NULL
判断value 不为空 -
逻辑函数(Logical Functions)
逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型的值。例如:
(1)boolean1 OR boolean2
布尔值boolean1 与布尔值 boolean2 取逻辑或
(2)boolean IS FALSE
判断布尔值 boolean 是否为 false
(3)NOT boolean
布尔值 boolean 取逻辑非 -
算术函数(Arithmetic Functions)
进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:
(1)numeric1 + numeric2
两数相加
(2)POWER(numeric1, numeric2)
幂运算,取数numeric1 的 numeric2 次方
(3)RAND()
返回(0.0, 1.0)区间内的一个double 类型的伪随机数 -
字符串函数(String Functions) 进行字符串处理的函数。例如:
(1) string1 || string2 两个字符串的连接
(2) UPPER(string) 将字符串 string 转为全部大写
(3) CHAR_LENGTH(string) 计算字符串 string 的长度 -
时间函数(Temporal Functions) 进行与时间相关操作的函数。例如:
(1) DATE string 按格式”yyyy-MM-dd”解析字符串 string,返回类型为 SQL Date
(2) TIMESTAMP string 按格式”yyyy-MM-dd HH:mm:ss[.SSS]”解析,返回类型为 SQL timestamp
(3) CURRENT_TIME 返回本地时区的当前时间,类型为 SQL time(与 LOCALTIME
等价)
(4) INTERVAL string range 返回一个时间间隔。string 表示数值;range 可以是DAY, MINUTE,DAT TO HOUR 等单位,也可以是YEAR TO MONTH 这样的复合单位。如“2 年10 个月”可以写成:INTERVAL ‘2-10’ YEAR TO MONTH
1.2 聚合函数(Aggregate Functions)
聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。
标准 SQL 中常见的聚合函数 Flink SQL 都是支持的,目前也在不断扩展,为流处理应用提供更强大的功能。例如:
-
COUNT(*)
返回所有行的数量,统计个数 -
SUM([ ALL | DISTINCT ] expression)
对某个字段进行求和操作。默认情况下省略了关键字 ALL,表示对所有行求和;如果指定 DISTINCT,则会对数据进行去重,每个值只叠加一次。 -
RANK()
返回当前值在一组值中的排名 -
ROW_NUMBER()
对一组值排序后,返回当前值的行号。与RANK()的功能相似
2. 自定义函数(UDF)
Flink 的Table API 和SQL 提供了多种自定义函数的接口,以抽象类的形式定义。当前 UDF主要有以下几类:
- 标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;
- 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;
- 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;
- 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据。
2.1 整体调用流程
要想在代码中使用自定义的函数,我们需要首先自定义对应UDF 抽象类的实现,并在表环境中注册这个函数,然后就可以在 Table API 和SQL 中调用了。
(1) 注册函数
注册函数时需要调用表环境的 createTemporarySystemFunction()
方法,传入注册的函数名以及UDF 类的Class 对象:
// 注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
我们自定义的 UDF 类叫作 MyFunction,它应该是上面四种 UDF 抽象类中某一个的具体实现;在环境中将它注册为名叫 MyFunction 的函数。
这里 createTemporarySystemFunction()方法的意思是创建了一个“临时系统函数”,所以 MyFunction 函数名是全局的,可以当作系统函数来使用 ;我们也可以用createTemporaryFunction()
方法,注册的函数就依赖于当前的数据库(database)和目录(catalog)了,所以这就不是系统函数,而是“目录函数”(catalog function),它的完整名称应该包括所属的 database 和 catalog。
一般情况下,我们直接用 createTemporarySystemFunction()
方法将 UDF 注册为系统函数就可以了。
(2)使用Table API 调用函数
在 Table API 中,需要使用 call()方法来调用自定义函数:
tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
这里 call()方法有两个参数,一个是注册好的函数名 MyFunction,另一个则是函数调用时本身的参数。这里我们定义 MyFunction 在调用时,需要传入的参数是 myField 字段。
此外,在Table API 中也可以不注册函数,直接用“内联”(inline)的方式调用 UDF:
tableEnv.from("MyTable").select(call(SubstringFunction.class, $("myField")));
区别只是在于 call()方法第一个参数不再是注册好的函数名,而直接就是函数类的 Class对象了。
(3)在 SQL 中调用函数
当我们将函数注册为系统函数之后,在 SQL 中的调用就与内置系统函数完全一样了:
tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");
可见,SQL 的调用方式更加方便,我们后续依然会以 SQL 为例介绍 UDF 的用法。接下来我们就对不同类型的 UDF 进行展开介绍。
2.2 标量函数(Scalar Functions)
自定义标量函数可以把 0 个、 1 个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数是“一对一”的转换。
想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类 ScalarFunction,并实现叫作eval()
的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的(public),而且名字必须是 eval。求值方法 eval 可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。
这里需要特别说明的是,ScalarFunction 抽象类中并没有定义 eval()方法,所以我们不能直接在代码中重写(override);但 Table API 的框架底层又要求了求值方法必须名字为 eval()。这是 Table API 和 SQL 目前还显得不够完善的地方,未来的版本应该会有所改进。
ScalarFunction 以及其它所有的UDF 接口,都在 org.apache.flink.table.functions
中。
下面我们来看一个具体的例子。我们实现一个自定义的哈希(hash)函数 HashFunction, 用来求传入对象的哈希值。
public class UdfTest_ScalarFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. 自定义数据源,从流转换
SingleOutputStreamOperatorEvent> eventStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssignerEvent>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
// 2. 将流转换成表
Table eventTable = tableEnv.fromDataStream(eventStream,
$("user"),
$("url"),
$("timestamp").rowtime().as("ts"));
tableEnv.createTemporaryView("EventTable", eventTable);
// 3. 注册自定义标量函数
tableEnv.createTemporarySystemFunction("MyHash", MyHash.class);
// 4. 调用UDF查询转换
Table resultTable = tableEnv.sqlQuery("select user, MyHash(user) from EventTable");
// 5. 输出到控制台
tableEnv.executeSql("create table output (" +
"uname STRING, " +
"myhash INT ) " +
"WITH (" +
"'connector' = 'print')");
resultTable.executeInsert("output");
}
// 自定义一个ScalarFunction
public static class MyHash extends ScalarFunction {
public int eval(String str){
return str.hashCode();
}
}
}
eval
的参数可以是任意类型。
2.3 表函数(Table Functions)
跟标量函数一样,表函数的输入参数也可以是 0 个、1 个或多个标量值;不同的是,它可以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系。之前我们介绍过的窗口 TVF,本质上就是表函数。
类似地,要实现自定义的表函数,需要自定义类来继承抽象类 TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction 类本身是有一个泛型参数T 的,这就是表函数返回数据的类型;而eval()方法没有返回类型,内部也没有return 语句,是通过调用 collect()方法来发送想要输出的行数据的。多么熟悉的感觉——回忆一下DataStream API 中的 FlatMapFunction 和 ProcessFunction,它们的 flatMap 和 processElement 方法也没有返回值,也是通过 out.collect()
来向下游发送数据的。
我们使用表函数,可以对一行数据得到一个表,这和 Hive 中的 UDTF 非常相似。那对于原先输入的整张表来说,又该得到什么呢?一个简单的想法是,就让输入表中的每一行,与它转换得到的表进行联结(join),然后再拼成一个完整的大表,这就相当于对原来的表进行了扩展。在Hive 的 SQL 语法中,提供了“侧向视图”(lateral view,也叫横向视图)的功能,可以将表中的一行数据拆分成多行;Flink SQL 也有类似的功能,是用 LATERAL TABLE 语法来实现的。
在 SQL 中调用表函数,需要使用 LATERAL TABLE(
来生成扩展的“侧向表”,然后与原始表进行联结(Join)。这里的 Join 操作可以是直接做交叉联结(cross join
),在 FROM 后用逗号分隔两个表就可以;也可以是以 ON TRUE
为条件的左联结(LEFT JOIN
)。
下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数 SplitFunction,可以将一个字符串转换成(字符串,长度)的二元组。
public class UdfTest_TableFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. 自定义数据源,从流转换
SingleOutputStreamOperatorEvent> eventStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssignerEvent>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
// 2. 将流转换成表
Table eventTable = tableEnv.fromDataStream(eventStream,
$("user"),
$("url"),
$("timestamp").rowtime().as("ts"));
tableEnv.createTemporaryView("EventTable", eventTable);
// 3. 注册自定义表函数
tableEnv.createTemporarySystemFunction("MySplit", MySplit.class);
// 4. 调用UDF查询转换
Table resultTable = tableEnv.sqlQuery("select user, url, word, length " +
"from EventTable, LATERAL TABLE( MySplit(url) ) AS T(word, length)");
// 5. 输出到控制台
tableEnv.executeSql("create table output (" +
"uname STRING, " +
"url STRING, " +
"word STRING, " +
"length INT) " +
"WITH (" +
"'connector' = 'print')");
resultTable.executeInsert("output");
}
// 自定义一个TableFunction,注意有泛型,这里输出的是两个字段,二元组
// 注意这里的类型标注,输出是 Row 类型,Row 中包含两个字段:word 和 length。
@FunctionHint(output = @DataTypeHint("ROW" ))
public static class MySplit extends TableFunctionTuple2String, Integer>>{
public void eval(String str){
String[] fields = str.split("\?"); // 转义问号,以及反斜杠本身
for (String field : fields){
collect(Tuple2.of(field, field.length()));
}
}
}
}
这里我们直接将表函数的输出类型定义成了 ROW,这就是得到的侧向表中的数据类型; 每行数据转换后也只有一行。我们分别用交叉联结和左联结两种方式在 SQL 中进行了调用, 还可以对侧向表的中字段进行重命名。
2.4 聚合函数(Aggregate Functions)
用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据
(也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换。
聚合函数的概念我们之前已经接触过多次,如 SUM()、MAX()、MIN()、AVG()、COUNT() 都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定义聚合函数来实现功能了。
自定义聚合函数需要继承抽象类 AggregateFunction。AggregateFunction 有两个泛型参数
,T
表示聚合输出的结果类型,ACC
则表示聚合的中间状态类型。
Flink SQL 中的聚合函数的工作原理如下:
(1) 首先,它需要创建一个累加器(accumulator),用来存储聚合的中间结果。这与 DataStream API 中的 AggregateFunction 非常类似,累加器就可以看作是一个聚合状态。调用createAccumulator()
方法可以创建一个空的累加器。
(2) 对于输入的每一行数据,都会调用 accumulate()方法来更新累加器,这是聚合的核心过程。
(3) 当所有的数据都处理完之后,通过调用 getValue()方法来计算并返回最终的结果。
所以,每个 AggregateFunction 都必须实现以下几个方法:
-
createAccumulator()
这是创建累加器的方法。没有输入参数,返回类型为累加器类型ACC。 -
accumulate()
这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是当前的累加器,类型为 ACC,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。需要注意的是,accumulate()与之前的求值方法 eval()类似,也是底层架构要求的,必须为 public,方法名必须为 accumulate,且无法直接 override、只能手动实现。 -
getValue()
这是得到最终返回结果的方法。输入参数是ACC 类型的累加器,输出类型为T。
在遇到复杂类型时,Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明, 这是通过 getAccumulatorType() 和getResultType()两个方法来指定的。
除了上面的方法,还有几个方法是可选的。这些方法有些可以让查询更加高效,有些是在某些特定场景下必须要实现的。比如,如果是对会话窗口进行聚合,merge()方法就是必须要实现的,它会定义累加器的合并操作,而且这个方法对一些场景的优化也很有用;而如果聚合函数用在 OVER 窗口聚合中,就必须实现 retract()方法,保证数据可以进行撤回操作; resetAccumulator()方法则是重置累加器,这在一些批处理场景中会比较有用。
AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且名字必须跟上面写的完全一样。 createAccumulator 、 getValue 、 getResultType 以及getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以 override;而其他则都是底层架构约定的方法。
下面举一个具体的示例。在常用的系统内置聚合函数里,可以用 AVG()来计算平均值;如果我们现在希望计算的是某个字段的“加权平均值”,又该怎么做呢?系统函数里没有现成的实现,所以只能自定义一个聚合函数 WeightedAvg 来计算了。
比如我们要从学生的分数表 ScoreTable 中计算每个学生的加权平均分。为了计算加权平均值,应该从输入的每行数据中提取两个值作为参数:要计算的分数值 score,以及它的权重weight。而在聚合过程中,累加器(accumulator)需要存储当前的加权总和 sum,以及目前数据的个数 count。这可以用一个二元组来表示,也可以单独定义一个类 WeightedAvgAccum, 里面包含 sum 和 count 两个属性,用它的对象实例来作为聚合的累加器。
具体代码如下:
public class UdfTest_AggregateFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. 自定义数据源,从流转换
SingleOutputStreamOperatorEvent> eventStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssignerEvent>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
// 2. 将流转换成表
Table eventTable = tableEnv.fromDataStream(eventStream,
$("user"),
$("url"),
$("timestamp").as("ts"),
$("rt").rowtime());
tableEnv.createTemporaryView("EventTable", eventTable);
// 3. 注册自定义表函数
tableEnv.createTemporarySystemFunction("WeightedAverage", WeightedAverage.class);
// 4. 调用UDF查询转换,这里权重直接给1
Table resultTable = tableEnv.sqlQuery("select user, " +
" WeightedAverage(ts, 1) as weighted_avg " +
"from EventTable " +
"group by user");
// 5. 输出到控制台
tableEnv.executeSql("create table output (" +
"uname STRING, " +
"weighted_avg BIGINT) " +
"WITH (" +
"'connector' = 'print')");
resultTable.executeInsert("output");
}
// 单独定义一个累加器类型
public static class WeightedAvgAccumulator {
public long sum = 0; // 加权和
public int count = 0; // 数据个数
}
// 自定义一个AggregateFunction,求加权平均值
public static class WeightedAverage extends AggregateFunctionLong, WeightedAvgAccumulator>{
@Override
public Long getValue(WeightedAvgAccumulator accumulator) {
if (accumulator.count == 0)
return null; // 防止除数为0
else
return accumulator.sum / accumulator.count;
}
@Override
public WeightedAvgAccumulator createAccumulator() {
return new WeightedAvgAccumulator();
}
// 累加计算方法,类似于add
public void accumulate(WeightedAvgAccumulator accumulator, Long iValue, Integer iWeight){
accumulator.sum += iValue * iWeight; // 这个值要算iWeight次
accumulator.count += iWeight;
}
}
}
聚合函数的 accumulate()方法有三个输入参数。第一个是 WeightedAvgAccum 类型的累加器;另外两个则是函数调用时输入的字段:要计算的值 ivalue 和 对应的权重 iweight。这里我们并不考虑其它方法的实现,只要有必须的三个方法就可以了。
2.5 表聚合函数(Table Aggregate Functions)
用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换。
自定义表聚合函数需要继承抽象类 TableAggregateFunction。TableAggregateFunction 的结构和原理与AggregateFunction 非常类似,同样有两个泛型参数
,用一个 ACC 类型的累加器( accumulator) 来存储聚合的中间结果。聚合函数中必须实现的三个方法, 在TableAggregateFunction 中也必须对应实现:
-
createAccumulator()
创建累加器的方法,与AggregateFunction 中用法相同。 -
accumulate()
聚合计算的核心方法,与AggregateFunction 中用法相同。 -
emitValue()
所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着 AggregateFunction 中的getValue()
方法;区别在于 emitValue 没有输出类型,而输入参数有两个:第一个是 ACC 类型的累加器,第二个则是用于输出数据的“收集器”out,它的类型为 Collect。所以很明显,表聚合函数输出数据不是直接 return,而是调用 out.collect()方法,调用多次就可以输出多行数据了;这一点与表函数非常相似。另外,emitValue()在抽象类中也没有定义,无法 override,必须手动实现。
表聚合函数得到的是一张表;在流处理中做持续查询,应该每次都会把这个表重新计算输出。如果输入一条数据后,只是对结果表里一行或几行进行了更新(Update),这时我们重新计算整个表、全部输出显然就不够高效了。为了提高处理效率,TableAggregateFunction 还提供了一个 emitUpdateWithRetract()方法,它可以在结果表发生变化时,以“撤回”(retract)老数据、发送新数据的方式增量地进行更新。如果同时定义了 emitValue()和 emitUpdateWithRetract() 两个方法,在进行更新操作时会优先调用 emitUpdateWithRetract()。
表聚合函数相对比较复杂,它的一个典型应用场景就是 Top N 查询。比如我们希望选出一组数据排序后的前两名,这就是最简单的 TOP-2 查询。没有线程的系统函数,那么我们就可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每当来一条新数据就在 accumulate() 方法中进行比较更新,最终在 emitValue() 中调用两次out.collect()
将前两名数据输出。
public class UdfTest_TableAggregatteFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. 自定义数据源,从流转换
SingleOutputStreamOperatorEvent> eventStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssignerEvent>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
// 2. 将流转换成表
Table eventTable = tableEnv.fromDataStream(eventStream,
$("user"),
$("url"),
$("timestamp").as("ts"),
$("rt").rowtime());
tableEnv.createTemporaryView("EventTable", eventTable);
// 3. 开滚动窗口聚合,得到每个用户在每个窗口中的浏览量
Table windowAggTable = tableEnv.sqlQuery("select user, count(url) as cnt, " +
"window_end " +
"from TABLE(" +
" TUMBLE( TABLE EventTable, DESCRIPTOR(rt), INTERVAL '10' SECOND )" +
")" +
"group by user," +
" window_start," +
" window_end");
tableEnv.createTemporaryView("AggTable", windowAggTable);
// 4. 注册表聚合函数函数
tableEnv.createTemporarySystemFunction("Top2", Top2.class);
// 5. 在Table API中调用函数
Table resultTable = tableEnv.from("AggTable")
.groupBy($("window_end"))
.flatAggregate(call("Top2", $("cnt")).as("value", "rank"))
.select($("window_end"), $("value"), $("rank"));
// 6. 输出到控制台
tableEnv.toChangelogStream(resultTable).print();
env.execute();
}
// 聚合累加器的类型定义,包含最大的第一和第二两个数据
public static class Top2Accumulator {
public Long first;
public Long second;
}
// 自定义表聚合函数,查询一组数中最大的两个,返回值为(数值,排名)的二元组
public static class Top2 extends TableAggregateFunctionTuple2Long, Integer>, Top2Accumulator> {
@Override
public Top2Accumulator createAccumulator() {
Top2Accumulator acc = new Top2Accumulator();
acc.first = Long.MIN_VALUE; // 为方便比较,初始值给最小值
acc.second = Long.MIN_VALUE;
return acc;
}
// 每来一个数据调用一次,判断是否更新累加器
public void accumulate(Top2Accumulator acc, Long value) {
if (value > acc.first) {
acc.second = acc.first;
acc.first = value;
} else if (value > acc.second) {
acc.second = value;
}
}
// 输出(数值,排名)的二元组,输出两行数据
public void emitValue(Top2Accumulator acc, CollectorTuple2Long, Integer>> out) {
if (acc.first != Long.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Long.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
}
这里使用了flatAggregate()方法,它就是专门用来调用表聚合函数的接口。对 MyTable 中数据按 myField 字段进行分组聚合,统计 value 值最大的两个;并将聚合结果的两个字段重命名为 value 和 rank,之后就可以使用 select()将它们提取出来了。
二、SQL 客户端
具体使用流程如下:
(1)首先启动本地集群
./bin/start-cluster.sh
(2)启动 Flink SQL 客户端
./bin/sql-client.sh
SQL 客户端的启动脚本同样位于 Flink 的 bin 目录下。默认的启动模式是 embedded,也就
是说客户端是一个嵌入在本地的进程,这是目前唯一支持的模式。未来会支持连接到远程 SQL
客户端的模式。
(3)设置运行模式
启动客户端后,就进入了命令行界面,这时就可以开始写 SQL 了。一般我们会在开始之前对环境做一些设置,比较重要的就是运行模式。
首先是表环境的运行时模式,有流处理和批处理两个选项。默认为流处理
Flink SQL> SET 'execution.runtime-mode' = 'streaming';
其次是 SQL 客户端的“执行结果模式”,主要有 table、 changelog、 tableau 三种,默认为table 模式:
Flink SQL> SET 'sql-client.execution.result-mode' = 'table';
table 模式就是最普通的表处理模式,结果会以逗号分隔每个字段;changelog 则是更新日志模式,会在数据前加上“+”(表示插入)或“-”(表示撤回)的前缀;而 tableau 则是经典的可视化表模式,结果会是一个虚线框的表格。
此外我们还可以做一些其它可选的设置,比如之前提到的空闲状态生存时间(TTL):
Flink SQL> SET 'table.exec.state.ttl' = '1000';
除了在命令行进行设置,我们也可以直接在 SQL 客户端的配置文件 sql-cli-defaults.yaml 中进行各种配置,甚至还可以在这个 yaml 文件里预定义表、函数和 catalog。关于配置文件的更多用法,大家可以查阅官网的详细说明。
(4) 执行 SQL 查询
接下来就可以愉快的编写 SQL 语句了,这跟操作 MySQL、Oracle 等关系型数据库没什么区别。
我们可以尝试把一开始举的简单聚合例子写一下:
Flink SQL> CREATE TABLE EventTable(
> user STRING,
> url STRING,
> </span>timestamp<span class="token punctuation">
BIGINT
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 'events.csv',
> 'format' = 'csv'
> );
Flink SQL> CREATE TABLE ResultTable (
> user STRING,361
> cnt BIGINT
> ) WITH (
> 'connector' = 'print'
> );
Flink SQL> INSERT INTO ResultTable SELECT user, COUNT(url) as cnt FROM EventTable
GROUP BY user;
三、连接到外部系统
此处不做详细描述,可查看参考资料
1. Kafka
Kafka 的 SQL 连接器可以从 Kafka 的主题(topic)读取数据转换成表,也可以将表数据写入Kafka 的主题。换句话说,创建表的时候指定连接器为Kafka,则这个表既可以作为输入表,也可以作为输出表。
1. 引入依赖
想要在 Flink 程序中使用 Kafka 连接器,需要引入如下依赖:
dependency>
groupId>org.apache.flinkgroupId>
artifactId>flink-connector-kafka_${scala.binary.version}artifactId>
version>${flink.version}version>
dependency>
这里我们引入的 Flink 和 Kafka 的连接器,与之前DataStream API 中引入的连接器是一样的。如果想在 SQL 客户端里使用 Kafka 连接器,还需要下载对应的 jar 包放到 lib 目录下。
另外,Flink 为各种连接器提供了一系列的“表格式”(table formats),比如 CSV、JSON、 Avro、Parquet 等等。这些表格式定义了底层存储的二进制数据和表的列之间的转换方式,相当于表的序列化工具。对于 Kafka 而言,CSV、JSON、Avro 等主要格式都是支持的,
根据Kafka 连接器中配置的格式,我们可能需要引入对应的依赖支持。以CSV 为例:
dependency>
groupId>org.apache.flinkgroupId>
artifactId>flink-csvartifactId>
version>${flink.version}version>
dependency>
由于 SQL 客户端中已经内置了 CSV、JSON 的支持,因此使用时无需专门引入;而对于没有内置支持的格式(比如 Avro),则仍然要下载相应的 jar 包。关于连接器的格式细节详见官网说明,我们后面就不再讨论了。
2. 创建连接到 Kafka 的表
创建一个连接到Kafka 表,需要在CREATE TABLE 的 DDL 中在 WITH 子句里指定连接器为Kafka,并定义必要的配置参数。
具体例子:
CREATE TABLE KafkaTable (
</span>user<span class="token punctuation">
STRING,
</span>url<span class="token punctuation">
STRING,
</span>ts<span class="token punctuation">
TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka', 'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
这里定义了 Kafka 连接器对应的主题(topic),Kafka 服务器,消费者组 ID,消费者起始模式以及表格式。需要特别说明的是,在 KafkaTable 的字段中有一个 ts,它的声明中用到了METADATA FROM,这是表示一个“元数据列”(metadata column),它是由 Kafka 连接器的元数据“timestamp”生成的。这里的 timestamp 其实就是Kafka 中数据自带的时间戳,我们把它直接作为元数据提取出来,转换成一个新的字段 ts。
3. Upsert Kafka
正常情况下,Kafka 作为保持数据顺序的消息队列,读取和写入都应该是流式的数据,对应在表中就是仅追加(append-only)模式。如果我们想要将有更新操作(比如分组聚合)的结果表写入Kafka,就会因为 Kafka 无法识别撤回(retract)或更新插入(upsert)消息而导致异常。
为了解决这个问题,Flink 专门增加了一个“更新插入Kafka”(Upsert Kafka)连接器。这个连接器支持以更新插入(UPSERT)的方式向 Kafka 的 topic 中读写数据。
具体来说,Upsert Kafka 连接器处理的是更新日志(changlog)流。如果作为 TableSource, 连接器会将读取到的topic 中的数据(key, value),解释为对当前key 的数据值的更新(UPDATE),也就是查找动态表中key 对应的一行数据,将 value 更新为最新的值;因为是Upsert 操作,所以如果没有 key 对应的行,那么也会执行插入(INSERT)操作。另外,如果遇到 value 为空
(null),连接器就把这条数据理解为对相应 key 那一行的删除(DELETE)操作。
如果作为 TableSink,Upsert Kafka 连接器会将有更新操作的结果表,转换成更新日志
(changelog)流。如果遇到插入(INSERT)或者更新后(UPDATE_AFTER)的数据,对应的是一个添加(add)消息,那么就直接正常写入 Kafka 主题;如果是删除(DELETE)或者更新前的数据,对应是一个撤回(retract)消息,那么就把 value 为空(null)的数据写入 Kafka。由于 Flink 是根据键(key)的值对数据进行分区的,这样就可以保证同一个 key 上的更新和删除消息都会落到同一个分区中。
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'avro',
'value.format' = 'avro'
);
CREATE TABLE pageviews (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP,
user_region STRING,
WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',364
'properties.bootstrap.servers' = '...',
'format' = 'json'
);
-- 计算 pv、 uv 并插入到 upsert-kafka 表中
INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*),
COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
这里我们从 Kafka 表 pageviews 中读取数据,统计每个区域的 PV(全部浏览量)和 UV
(对用户去重),这是一个分组聚合的更新查询,得到的结果表会不停地更新数据。为了将结果表写入Kafka 的 pageviews_per_region 主题,我们定义了一个 Upsert Kafka 表,它的字段中需要用PRIMARY KEY 来指定主键,并且在WITH 子句中分别指定key和value的序列化格式。
2. 文件系统
另一类非常常见的外部系统就是文件系统(File System)了。Flink 提供了文件系统的连接器,支持从本地或者分布式的文件系统中读写数据。这个连接器是内置在 Flink 中的,所以使用它并不需要额外引入依赖。
下面是一个连接到文件系统的示例:
CREATE TABLE MyTable (
column_name1 INT,
column_name2 STRING,
...
part_name1 INT,
part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
'connector' = 'filesystem', -- 连接器类型
'path' = '...', -- 文件路径
'format' = '...' -- 文件格式
)
这里在 WITH 前使用了 PARTITIONED BY 对数据进行了分区操作。文件系统连接器支持对分区文件的访问。
其余还有JDBC ES Hive Hbase等,详情请看参考资料
参考资料
Word版:https://download.csdn.net/download/mengxianglong123/85035166
PDF版:https://download.csdn.net/download/mengxianglong123/85035172
原文链接:Flink Table API和SQL(下)
本文转载自落花雨时,原文链接:https://blog.csdn.net/mengxianglong123/article/details/124078287。