hudi支持HMS catalog啦!
功能亮点:当flink和spark同时接入hive metastore时,用hive metastore对hudi的元数据进行管理,无论是使用flink还是spark引擎建表,另外一种引擎或者hive都可以直接查询。
由于该功能为社区同学开发,还没合并到master分支,具体可以参考下述PR:
https://github.com/apache/hudi/pull/6013
本文以HDP集群为例,其他版本分别为:
flink 1.13.6
spark 3.2.1
在HDP集群中,hive的配置文件路径为/etc/hive/conf,所以在flink sql client中使用hive的配置文件来创建hudi-hive catalog从而将hudi元数据存储于hive metastore中。
在flink中写入数据
在flink sql client中进行如下操作:
create catalog hudi with(
'type' = 'hudi',
'mode' = 'hms',
'hive.conf.dir'='/etc/hive/conf'
);
--- 创建数据库供hudi使用
create database hudi.hudidb;
Flink sql client中建表
--- order表
CREATE TABLE hudi.hudidb.orders_hudi(
uuid INT,
ts INT,
num INT,
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ'
);
--- product表
CREATE TABLE hudi.hudidb.product_hudi(
uuid INT,
ts INT,
name STRING,
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ'
);
--- 宽表
CREATE TABLE hudi.hudidb.orders_product_hudi(
uuid INT,
ts INT,
name STRING,
num INT,
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ'
);
使用Flink SQL进行数据写入:
insert into hudi.hudidb.orders_hudi values
(1, 1, 2),
(2, 2, 3),
(3, 3, 4);
insert into hudi.hudidb.product_hudi values
(1, 1, 'tony'),
(2, 2, 'mike'),
(3, 3, 'funny');
insert into hudi.hudidb.orders_product_hudi
select
hudi.hudidb.orders_hudi.uuid as uuid,
hudi.hudidb.orders_hudi.ts as ts,
hudi.hudidb.product_hudi.name as name,
hudi.hudidb.orders_hudi.num as num
from hudi.hudidb.orders_hudi
inner join hudi.hudidb.product_hudi on hudi.hudidb.orders_hudi.uuid = hudi.hudidb.product_hudi.uuid;
在Flink SQL中查看数据
select * from hudi.hudidb.orders_hudi;
得到:
select * from hudi.hudidb.product_hudi;
得到:
select * from hudi.hudidb.orders_product_hudi;
得到:
在Spark中查看数据
hive为了连接集群hive metastore,只需要将hive的配置文件hive-site.xml放置到spark的配置文件目录即可。
通过beeline连接spark thriftserver,查看数据库:
show databases;
得到:
可以看到刚刚在flink中创建的hudidb数据库。现在查看里面的表:
use hudidb;
show tables;
由于在将数据写入hudi时,默认会新增_hoodie_commit_time、 _hoodie_record_key等字段用于内部使用,如果使用select * 表进行查询时会查出上述字段。
以查看orders_hudi表数据为例:
select * from orders_hudi;
得到
所以为了正确查询,需要指定字段:
select uuid, ts, num from orders_hudi;
得到
其他表同理。
在hive中查看数据
为了在hive引擎中查看,对于MERGE_ON_READ表,至少需要执行过一次压缩,也就是把avro文件压缩为parquet文件,才能够正常查看数据。由于上述操作为批量操作,默认是不会触发压缩操作的,所以需要手动触发压缩。
所以对product_hudi表进行手动压缩
./bin/flink run -c \
org.apache.hudi.sink.compact.HoodieFlinkCompactor \
lib/hudi-flink1.13-bundle_2.12-0.12.0-SNAPSHOT.jar \
--path hdfs://host146:8020/warehouse/tablespace/managed/hive/hudidb.db/product_hudi \
--schedule
注意:如果是使用flink将数据实时流式写入hudi的话,默认在写入五次时会自动触发压缩,不需要手动执行。
为了在hive中查看hudi所有数据,需要设置如下参数:
set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
如果设置该参数出现如下报错,
Error: Error while processing statement: Cannot modify hive.input.format at runtime. It is not in list of params that are allowed to be modified at runtime (state=42000,code=1)
那么通过如下方式修改hive-site.xml配置,新增如下配置,然后重启hive即可。
hive.security.authorization.sqlstd.confwhitelist.append=hive.input.format
进入hive客户端
设置变量:
set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
查看数据库
show databases;
得到:
查看hudidb库中product_hudi表数据
select * from hudidb.product_hudi;
得到
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/7294/
[…] 0.12.0版本,flink和spark都可以基于hive metastore进行元数据管理,具体可参考:hudi HMS Catalog指南。基于hudi hms […]
[…] metastore进行元数据管理,更多信息可参考:hudi HMS Catalog指南。也就是说基于hudi hms […]
[…] metastore进行元数据管理,更多信息可参考:hudi HMS Catalog指南。也就是说基于hudi hms […]
[…] metastore进行元数据管理,更多信息可参考:hudi HMS Catalog指南。也就是说基于hudi hms […]
[…] metastore进行元数据管理,更多信息可参考:hudi HMS Catalog指南。也就是说基于hudi hms […]
[…] metastore进行元数据管理,更多信息可参考:hudi HMS Catalog指南。也就是说基于hudi hms […]