hudi时间旅行查询完整版

本文从头开始讲述使用Flink引擎实现hudi数据湖基于commit_time的查询语义。基本使用可参考前面文章hudi时间旅行查询

基本要求:

  • 有一台机器部署docker用于安装数据生成工具datafaker
  • MySQL数据库开启binlog
  • Flink 1.13.6
  • Flink CDC 2.2.0
  • Hudi 0.11.0

基本流程:

  • 使用datafaker生成测试数据写入MySQL表中
  • 使用Flink CDC工具将MySQL中的数据写到Hudi表
  • 查询Hudi表中的数据
  • 手动修改MySQL中的数据
  • 区间查询hudi中的数据
  • 流式写入数据到hudi表中
  • 再次查询数据

使用datafaker生成测试数据

假定MySQL连接信息为:

  • hostname:192.168.1.2
  • port:3306
  • username:root
  • password:Pass-123-root

MySQL中创建hudi数据库和student表。

create database hudi;
CREATE TABLE `hudi`.`student` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` varchar(20) NOT NULL COMMENT '学生名字',
  `school` varchar(20) NOT NULL COMMENT '学校名字',
  `nickname` varchar(20) NOT NULL COMMENT '学生小名',
  `age` int(11) NOT NULL COMMENT '学生年龄',
  `class_num` int(11) NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` bigint(20) NOT NULL COMMENT '电话号码',
  `email` varchar(64) DEFAULT NULL COMMENT '家庭网络邮箱',
  `ip` varchar(32) DEFAULT NULL COMMENT 'IP地址',
  `address` text COMMENT '家庭地址',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8;

新建测试数据生成工具元数据文件,保存如下数据为meta.txt

id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(zhongda, beida, xidian, huagong)]
nickname||varchar(20)||学生小名[:enum(鬼泣, 高小王子, 歌神, 逗比)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
address||text||家庭地址[:address]

生成10条测试数据并写入MySQL表

(在装有docker的服务器执行,由于自增ID从1开始,第二次执行需要修改自增ID起始值)

docker run -v /root/xzc/datafaker/meta.txt:/opt/meta.txt xiaozhch5/datafaker:v1.0 rdb mysql+mysqldb://root:Pass-123-root@192.168.1.2:3306/hudi?charset=utf8 student 10 --meta /opt/meta.txt

查询student表中的数据

select * from hudi.student limit 10;

hudi时间旅行查询完整版

删除id为1的数据以及更新id为2的数据

delete from hudi.student where id = 1;

update hudi.student set name = "BQmBbNb" where id = 2;

将测试数据同步到hudi表

yarn上启动flink session

export HADOOP_CLASSPATH
bin/yarn-session.sh -s 8 -jm 2048 -tm 2048 -nm flink-hudi-test -d

启动flink sql

bin/sql-client.sh

以hdfs上的/user/hudi/warehouse为目录创建catalog

create catalog hudi with('type'='hudi','catalog.path'='hdfs://bigdata:8020/user/hudi/warehouse');

创建test数据库和student表:

create database hudi.test;

drop table if exists hudi.test.student;

create table hudi.test.student (
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ'
);

Flink SQL中创建stu表读取数据

create table student_mysql (
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
  'connector'='mysql-cdc',
  'hostname'='192.168.1.2',
  'port'='3306',
  'username'='root',
  'password'='Pass-123-root',
  'database-name'='hudi',
  'table-name'='student'
);

将mysql数据同步到hudi表

insert into hudi.test.student select * from student_mysql;

查看hudi的commit_time

hudi每次数据写入时都会生成一个时间戳,用于表示数据写入的时间,基于该特性,在进行数据查询时可使用该时间对hudi中数据进行查询。

查看所有时间戳数据:

create table student_commit_time_view(
  `_hoodie_commit_time` string,
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student'
  )
select distinct _hoodie_commit_time from student_commit_time_view

查到有两个时间戳,分别为:

hudi时间旅行查询完整版

  • 20220622152536101 这个时间点为数据写入操作
  • 20220622152707516 这个时间点为数据删除和更新操作

现在查询时间戳小于20220622152536101的数据,再次创建hudi表

create table student_view_1(
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student',
  'read.start-commit' = 'earliest',
  'read.end-commit' = '20220622152536101'
  );
SET 'sql-client.execution.result-mode' = 'tableau';

select * from student_view_1;

得到如下10条数据

hudi时间旅行查询完整版

现在查询时间戳小于20220622152707516的数据,再次创建hudi表

create table student_view_11(
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student',
  'read.start-commit' = 'earliest',
  'read.end-commit' = '20220622152707516'
  );

查询数据,得到:

select * from student_view_11;

hudi时间旅行查询完整版

重点

但是,这个_hoodie_commit_time是有保存时间限制的,保存时间由clean.retain_commits配置项和checkpoint interval确定。

clean.retain_commits表示数据写入到hudi的次数

checkpoint interval表示每次checkpoint的时间间隔

当_hoodie_commit_time距离现在的时间超过clean.retain_commits * checkpoint interval,那么数据会被清除。

比如说,现在设置checkpoint的时间间隔为3分钟,设置clean.retain_commits的次数为10次,那么通过_hoodie_commit_time最多只能查询到30分钟以前的数据,再之前的数据查不到。

在本次实验中,设置的checkpoint时间间隔为20秒,clean.retain_commits的次数为30次,也就是说,如果有数据持续写入的话,通过时间戳的方式,最多查询到600秒以内的数据。

changelog 模式下,这个参数可以控制 changelog 的保留时间.

再次往hudi表中持续插入100万条数据。我们发现在数据持续插入600秒之后,之前数据查不到了:

select * from student_view_11;

hudi时间旅行查询完整版

select * from student_view_1;

hudi时间旅行查询完整版

现在的时间戳是2022062216460000,我们查询这个时间之前600秒的数据

create table student_view_33(
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student',
  'read.start-commit' = '20220622163600000',
  'read.end-commit' = '20220622164600000'
  );

查询数据总量

select count(*) from student_view_33;

得到:

hudi时间旅行查询完整版

0 0 投票数
文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://lrting.top/backend/bigdata/hudi/hudi-basic/6828/

(0)
上一篇 5天前
下一篇 5天前

相关推荐

订阅评论
提醒
guest
0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x