测试环境
组件版本
首先请确保以下组件正常启动:
-
mysql
-
hivemetastore
-
hiveserver2
-
hdfs
-
yarn
hudi适配hive 3.1.2源码编译
0.9.0版本的hudi在适配hive3时,其hudi/package/hudi-flink-bundle/pom.xml文件使用的flink-connector-hive版本有问题,所以需要修改pom文件。
修改点一:
143行,修改为:
<include>org.apache.flink:flink-sql-connector-hive-${hive.version}_${scala.binary.version}</include>
642行,修改为:
<artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId>
编译命令:
mvn clean install -DskipTests -Pflink-bundle-shade-hive3 -Dhadoop.version=3.2.0 -Dhive.version=3.1.2 -Pinclude-flink-sql-connector-hive -U -Dscala.version=2.12.10 -Dscala.binary.version=2.12
将编译后得到的hudi/package/hudi-flink-bundle/target/hudi-flink-bundle_2.12-0.9.0.jar拷贝到flink/lib目录下,将得到的hudi/package/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.9.0.jar拷贝到hive/auxlib目录下,如果没有这个目录则新建一个即可。
关于flink操作hudi的相关方法如果有疑惑的可先看本系列的其他文章,例如使用flink插入数据到hudi数据湖初探, Flink SQL Client实战CDC数据入湖等。
生成测试数据
使用datafaker生成100000条数据,放到mysql数据库中的stu4表。
datafaker工具使用方法见datafaker — 测试数据生成工具
首先在mysql中新建表test.stu4
create database test;
use test;
create table stu4 (
id int unsigned auto_increment primary key COMMENT '自增id',
name varchar(20) not null comment '学生名字',
school varchar(20) not null comment '学校名字',
nickname varchar(20) not null comment '学生小名',
age int not null comment '学生年龄',
score decimal(4,2) not null comment '成绩',
class_num int not null comment '班级人数',
phone bigint not null comment '电话号码',
email varchar(64) comment '家庭网络邮箱',
ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;
新建meta.txt文件,文件内容为:
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
生成10000条数据并写入到mysql中的test.stu3表
datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu4 100000 --meta meta.txt
datafaker工具有详细使用方法,请参考。
导入mysql数据
使用flink sql client进行如下操作
构建源表
create table stu4(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8',
'username' = 'root',
'password' = 'Pass-123-root',
'table-name' = 'stu4'
);
构建目标表
create table stu4_tmp_1(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:9000/tmp/stu4_tmp_1',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://hadoop:9083',
'hive_sync.jdbc_url' = 'jdbc:hive2://hadoop:10000',
'hive_sync.table' = 'stu_tmp_1',
'hive_sync.db' = 'test',
'hive_sync.username' = 'hive',
'hive_sync.password' = 'hive'
);
插入数据
insert into stu4_tmp_1 select * from stu4;
hive数据查询
使用hive命令进入hive cli
执行如下命令查询数据
select * from test.stu_tmp_1 limit 10;
结果:
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/2052/
[…] 温馨提示要完成如下任务,请确保已经使用其他方法将hudi数据同步到hive中。如果没有同步hive数据,可参考文章:使用flink SQL Client将mysql数据写入到hudi并同步到hive。并且,以下内容中的presto查询,即是基于上述参考文章所同步的hive表进行查询的,建议可先阅读上述参考文章。以下presto安装以单节点为例。 […]
问个问题哈 flink cdc 可以直接同步msyql binlog数据到hive吗
可以的
请教您一个问题,我在用flinkcdc写入数据到hudi时,同步到hive这步会报异常:ClassNotFoundException: org.apache.hudi.org.apache.hadoop.hive.ql.metadata.Hive。我是在idea下测试的,使用的是flink-sql-connector-hive包,但这个包是没有被shade的,也就是.org.apache.hadoop.hive.ql.metadata.Hive。请问,这个如何解决?