Hudi in practice: streaming regular inner join of Hudi tables written into a wide table

Environment

  • MySQL 5.7
  • Hadoop 3.2.2
  • Flink 1.14.4
  • Hudi 0.11.0
  • flink-cdc-mysql 2.2

Steps

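  1. Use Flink CDC to sync the data of two MySQL tables into Hudi tables
  2. Read the Hudi tables incrementally (streaming read) and join their data incrementally
  3. Write the joined data into a wide table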

Implementation

Create the tables in MySQL

create database hudi_test;
use hudi_test;

create table orders(id int primary key not null, num int);
create table product(id int primary key not null, name varchar(50));

Insert data into the two tables

insert into orders values(1, 2);
insert into product values(1, "phone");

Start the Flink SQL client.
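Everything below depends on checkpointing: Hudi's Flink writer only commits data (makes it visible to readers) when a checkpoint completes, and the out-of-order test later in this article assumes a 3-minute checkpoint interval. A minimal sketch of the session settings, assuming they are issued in the SQL client before any INSERT job is submitted (the tableau result mode is optional and only changes how query results are printed):

```sql
-- Hudi's Flink writer commits on checkpoint completion;
-- 3 minutes matches the interval assumed in the out-of-order test
SET 'execution.checkpointing.interval' = '3min';
-- Optional: print query results inline in the console
SET 'sql-client.execution.result-mode' = 'tableau';
```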

Create a Hudi catalog in Flink SQL

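create catalog hudi with ('type'='hudi','catalog.path'='hdfs://bigdata:9000/user/hive/warehouse');
-- CREATE CATALOG does not switch the current catalog, so create the database inside the hudi catalog explicitly
create database hudi.huditest;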

Create the Hudi tables in Flink SQL

-- orders table
CREATE TABLE hudi.huditest.orders_hudi(
  id INT,
  num INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'compaction.async.enabled' = 'false'
);

-- product table
CREATE TABLE hudi.huditest.product_hudi(
  id INT,
  name STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'compaction.async.enabled' = 'false'
);

-- wide table
CREATE TABLE hudi.huditest.orders_product_hudi(
  id INT,
  name STRING,
  num INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'compaction.async.enabled' = 'false'
);

Create the MySQL source tables in Flink SQL

CREATE TABLE orders_mysql (
  id INT,
  num INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'database-name' = 'hudi_test',
  'table-name' = 'orders');

CREATE TABLE product_mysql (
  id INT,
  name STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'database-name' = 'hudi_test',
  'table-name' = 'product');
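The mysql-cdc connector reads MySQL's binlog, so the binlog must be enabled in ROW format on the MySQL server (in MySQL 5.7 this is configured in my.cnf together with a server-id). A quick check, run in the MySQL client rather than in Flink SQL:

```sql
-- Run in the MySQL client, not in Flink SQL
SHOW VARIABLES LIKE 'log_bin';        -- must be ON for CDC
SHOW VARIABLES LIKE 'binlog_format';  -- must be ROW for CDC
```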

Write the data

insert into hudi.huditest.orders_hudi select * from orders_mysql;

insert into hudi.huditest.product_hudi select * from product_mysql;

The Flink web UI is shown in the figure.

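The two INSERT statements above are submitted as two separate Flink jobs. If a single job is preferred, the Flink 1.14 SQL client also accepts a statement set. A sketch using the same tables, offered as an alternative rather than the setup used in this article:

```sql
-- Submit both CDC-to-Hudi pipelines as one Flink job
BEGIN STATEMENT SET;
insert into hudi.huditest.orders_hudi select * from orders_mysql;
insert into hudi.huditest.product_hudi select * from product_mysql;
END;
```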

Joining the two Hudi tables

Use an inner join:

insert into hudi.huditest.orders_product_hudi
select
  hudi.huditest.orders_hudi.id as id,
  hudi.huditest.product_hudi.name as name,
  hudi.huditest.orders_hudi.num as num
from hudi.huditest.orders_hudi
inner join hudi.huditest.product_hudi on hudi.huditest.orders_hudi.id = hudi.huditest.product_hudi.id;
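Fully qualified column names make the query long. The same inner join can be written with table aliases. This is an equivalent query shown only for readability, and the select list still follows the wide table's column order (id, name, num):

```sql
-- Same regular inner join, with table aliases o and p
insert into hudi.huditest.orders_product_hudi
select o.id, p.name, o.num
from hudi.huditest.orders_hudi as o
inner join hudi.huditest.product_hudi as p on o.id = p.id;
```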

Query the data

select * from hudi.huditest.orders_product_hudi;

The result:

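The wide table is declared with 'read.streaming.enabled' = 'true', so the SELECT above runs as a continuous streaming query that keeps printing new join results. For a one-off snapshot, the option can be overridden per query with a table hint. This is a sketch: it assumes dynamic table options must be switched on first, since they are disabled by default in Flink 1.14:

```sql
-- Table hints are off by default in Flink 1.14
SET 'table.dynamic-table-options.enabled' = 'true';
-- Bounded snapshot read of the wide table instead of a streaming read
select * from hudi.huditest.orders_product_hudi /*+ OPTIONS('read.streaming.enabled' = 'false') */;
```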

Insert another row into the orders table:

insert into orders values(2, 13);


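In this case the join holds the order row and waits for the matching product row to arrive before emitting a result downstream. Now insert the matching row into the product table: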

insert into product values(2, "door");


Querying the data again gives:


Out-of-order streams

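Next, make the orders and product streams arrive out of order by writing rows one after another. The Flink checkpoint interval is set to 3 minutes, and Hudi only commits data to storage when a checkpoint completes, so each write below becomes visible to the join only after the next checkpoint. Proceed as follows: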

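  1. Insert a row into the orders stream:
    insert into orders values(3, 33);
  2. Insert a row into the product stream:
    insert into product values(4, "flowers");

    At this point the join operator has received both rows, but because their ids do not match, no join result is produced.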


  3. Once the join has received that data, insert the product row with id 3:
    insert into product values(3, "screen");

Query the data again:

As shown, out-of-order data is joined correctly.
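Out-of-order rows join correctly because a regular join keeps every row from both inputs in Flink state with no time limit, so whichever side arrives later still finds its match. The trade-off is that this state grows without bound. If that matters, Flink's idle-state TTL can cap it, at the cost of missing matches whose counterpart arrives after the state has expired. A sketch with an assumed one-day value, set before submitting the join job:

```sql
-- Idle-state retention for stateful operators such as regular joins;
-- '1d' is an arbitrary example value, not a recommendation
SET 'table.exec.state.ttl' = '1d';
```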


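This is an original article by xiaozhch5, author of the blog 从大数据到人工智能 (From Big Data to AI), licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.

Original article: https://lrting.top/backend/bigdata/hudi/hudi-advanced/6479/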
