Overview
This post uses the datafaker tool to generate data and write it to MySQL, uses the Flink CDC connector to ship the MySQL binlog data to Kafka, and finally reads the data back from Kafka and writes it into Hudi. While the data is being written to Hudi, the table is also queried at the same time.
Component versions and dependencies
- datafaker 0.6.3
- mysql 5.7
- zookeeper 3.6.3
- kafka 2.8.0
- hadoop 3.2.0
- flink 1.12.2
- hudi 0.9.0
To follow along, make sure MySQL, ZooKeeper, Kafka, and Hadoop are installed and running, and that binlog is enabled in MySQL. Installation guides for these components can be found via the search box at the top right of this site.
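If binlog is not enabled yet, a minimal my.cnf sketch looks like the following (the server-id and retention values are illustrative assumptions, not taken from the original setup); restart MySQL afterwards and confirm with SHOW VARIABLES LIKE 'log_bin':
[mysqld]
server-id        = 1
# flink-cdc requires row-based binlog
log-bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL
expire_logs_days = 3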
Two hosts are used for this test, named hadoop and hadoop1. The components installed on each host are listed below:
| hadoop | hadoop1 |
|---|---|
| namenode | zookeeper |
| datanode | kafka |
| resourcemanager | |
| nodemanager | |
| mysql | |
| flink | |
Generate test data with datafaker and send it to MySQL
- Create the stu3 table in the database:
mysql -u root -p
create database test;
use test;
create table stu3 (
id int unsigned auto_increment primary key COMMENT '自增id',
name varchar(20) not null comment '学生名字',
school varchar(20) not null comment '学校名字',
nickname varchar(20) not null comment '学生小名',
age int not null comment '学生年龄',
class_num int not null comment '班级人数',
phone bigint not null comment '电话号码',
email varchar(64) comment '家庭网络邮箱',
ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;
- Create a meta.txt file with the following content:
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
- Generate 10,000 rows and write them to the test.stu3 table in MySQL:
datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu3 10000 --meta meta.txt
Note: if you want to generate test data again later, change the 1 in the auto-increment rule to a number larger than 10000, otherwise you will run into primary-key conflicts.
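For example, the first line of meta.txt would become the following (this is exactly the change made in the real-time section later in this post):
id||int||自增id[:inc(id,10001)]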
Hudi, flink-mysql-cdc, and flink-kafka jar packages
Download the jar packages into Flink's lib directory:
cd flink-1.12.2/lib
wget https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/hudi/flink-sql-client-cdc-datalake/hudi-flink-bundle_2.12-0.9.0.jar
wget https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/hudi/flink-sql-client-cdc-datalake/flink-connector-kafka_2.12-1.12.2.jar
wget https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/hudi/flink-sql-client-cdc-datalake/flink-sql-connector-mysql-cdc-1.2.0.jar
Note: the hudi-flink-bundle_2.12-0.9.0.jar above already fixes an upstream bug that prevented default configuration options from being loaded, so using the jar provided above is recommended.
If you run into missing-class errors while starting or running Flink jobs, download the corresponding jar packages and place them in the flink-1.12.2/lib directory. The packages found to be missing during this experiment are listed below (click to download):
Start a Flink session cluster on YARN
First make sure HADOOP_CLASSPATH is configured. For open-source Hadoop 3.2.0 it can be set as follows:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/client/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/tools/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/etc/hadoop/*
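If the hadoop command is already on the PATH, an equivalent shortcut (assuming a standard Hadoop client installation) is:
export HADOOP_CLASSPATH=`hadoop classpath`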
Flink needs checkpointing enabled; modify the flink-conf.yaml configuration file:
execution.checkpointing.interval: 150000ms
state.backend: rocksdb
state.checkpoints.dir: hdfs://hadoop:9000/flink-chk
state.backend.rocksdb.localdir: /tmp/rocksdb
Start the Flink session cluster:
cd flink-1.12.2
bin/yarn-session.sh -s 4 -jm 2048 -tm 2048 -nm flink-hudi-test -d
The launched application can be seen on YARN.
Clicking the ApplicationMaster link on the right opens the Flink web UI.
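The session can also be checked from the command line (assuming the YARN client is configured on this host):
# the session started above shows up as an application named flink-hudi-test
yarn application -list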
Start the Flink SQL client
cd flink-1.12.2
bin/sql-client.sh embedded -s yarn-session -j ./lib/hudi-flink-bundle_2.12-0.9.0.jar shell
This opens the Flink SQL client.
Read the MySQL binlog with Flink and write it to Kafka
In the Flink SQL client we build a streaming job that writes the MySQL binlog to Kafka in real time.
Create the MySQL source table:
create table stu3_binlog(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string
) with (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'database-name' = 'test',
'table-name' = 'stu3'
);
The properties in the with() clause are the MySQL connection settings.
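As an optional sanity check, the new source table can be queried directly in the SQL client before wiring it to Kafka (the query runs as a continuous streaming job, so cancel it once rows appear):
-- optional: read the CDC source directly to confirm MySQL connectivity
select id, name, school from stu3_binlog;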
Create the Kafka sink table:
create table stu3_binlog_sink_kafka(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
) with (
'connector' = 'kafka'
,'topic' = 'cdc_mysql_stu3_sink'
,'properties.zookeeper.connect' = 'hadoop1:2181'
,'properties.bootstrap.servers' = 'hadoop1:9092'
,'format' = 'debezium-json'
);
Create the job that writes the MySQL binlog to Kafka:
insert into stu3_binlog_sink_kafka
select * from stu3_binlog;
The job submission message is printed, and the job also shows up in the Flink web UI.
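To confirm that change records are actually reaching the topic, the Kafka console consumer can be used; the installation path below is an assumption based on a standard Kafka 2.8.0 download:
cd kafka_2.12-2.8.0
# print a few debezium-json change records from the sink topic
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic cdc_mysql_stu3_sink --from-beginning --max-messages 5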
Read the Kafka data with Flink and write it into the Hudi data lake
Create the Kafka source table:
create table stu3_binlog_source_kafka(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string
) with (
'connector' = 'kafka',
'topic' = 'cdc_mysql_stu3_sink',
'properties.bootstrap.servers' = 'hadoop1:9092',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup'
);
Create the Hudi sink table:
create table stu3_binlog_sink_hudi(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.operation' = 'insert',
'write.precombine.field' = 'id'
);
Create the job that writes the Kafka data into Hudi:
insert into stu3_binlog_sink_hudi
select * from stu3_binlog_source_kafka;
The job submission message is printed, and the job also appears in the Flink web UI.
Check data consumption in the Flink UI
Count the rows ingested into Hudi
create table stu3_binlog_hudi_view(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.precombine.field' = 'id'
);
select count(*) from stu3_binlog_hudi_view;
View the Hudi data on HDFS
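A minimal sketch of what to expect (the partition directory names below are assumptions based on the school enum values in meta.txt and Hudi's default partition-path layout):
# list the hudi table root; the .hoodie directory holds the timeline metadata
hdfs dfs -ls /tmp/stu3_binlog_sink_hudi
# each value of the school partition field gets its own directory, e.g.
hdfs dfs -ls /tmp/stu3_binlog_sink_hudi/qinghua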
Watch data arriving in the lake in real time
Next we use datafaker to generate test data again.
Change meta.txt to:
id||int||自增id[:inc(id,10001)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
Generate 100,000 rows:
datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu3 100000 --meta meta.txt
Watch the data arriving in real time with a streaming-read table:
create table stu3_binlog_hudi_streaming_view(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.precombine.field' = 'id',
'read.streaming.enabled' = 'true'
);
select * from stu3_binlog_hudi_streaming_view;
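With 'read.streaming.enabled' = 'true' the query becomes a continuous incremental scan, so the rows from the second datafaker run show up without re-running the statement. By default the connector polls the Hudi timeline for new commits roughly once a minute; a sketch of the relevant with() options if you want faster feedback (the 4-second value is just an example):
'read.streaming.enabled' = 'true',
-- poll for new commits every 4 seconds instead of the default
'read.streaming.check-interval' = '4'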
This is an original article by xiaozhch5 of the blog "From Big Data to AI" (从大数据到人工智能), licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://lrting.top/backend/2049/
From the comments on the original post:
Q: Hi, following the SQL above, Flink SQL queries against a table with decimal columns keep failing; I'm on Flink 1.12 + Hudi 0.9.
A: The decimal issue is a bug in the official 0.9 release and was fixed in 0.10, so you can try the newer version, or apply the 0.9.0 patch at https://github.com/apache/hudi/pull/3519/commits, rebuild, and test again (see also https://lrting.top/backend/2062/). A build of 0.10.0 queries decimal columns correctly, so either the patch or the upgrade works.