基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓

本文基于Flink SQL与hudi构建准实时数仓,在Flink从kafka接入数据之后,即将所有数据存于hudi中,包括所有中间处理数据以及最终数据。文章《实时数仓|基于Flink1.11的SQL构建实时数仓探索实践 (qq.com)》描述了基于Flink SQL与kafka构建的实时数仓,本文以上述文章为基础。

在完成本文实践的同时可以同步参考上述文章。

最终结果:

基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓
基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓
基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓
基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓
基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓

背景介绍

本文电商业务为例,展示准实时数仓的数据处理流程。

组件与配置说明

Flink 1.13.3

flink cdc 2.0.2

hudi 0.10.0 (2021.12.08最新发布版本,地址:https://github.com/apache/hudi/releases/tag/release-0.10.0)

hadoop 3.2.0

zeppelin 0.10.0

mysql 5.7(开启binlog)

kafka 2.5.0

由于zeppelin的便捷性,本文全部基于zeppelin进行任务提交,如果您还不会用zeppelin,那么您可以参考:https://lrting-top.blog.csdn.net/article/details/120681666。当然,如果您不想用zeppelin,用Flink SQL Client提交也是完全没有问题的。

本实验Flink开启checkpoint,设置为60s。

在完成以下任务之前,请确保您已经

  • 部署好Flink 1.13.3,并将hudi对应的Jar包已经正确打包并且放置到Flink的lib目录下,将flink cdc对应的jar包放置到lib目录下。
  • 部署并启动zeppelin 0.10.0,在zeppelin的Flink interpreter上指定了FLINK_HOME以及HADOOP_CLASSPATH
  • 同时还有启动hadoop、mysql、kafka

处理流程

基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓

MySQL建表与原始数据载入

首先从以下地址获取mysql建表语句以及模拟数据:

https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/hudi/flink-hudi-realtime/realtime_table.zip

下载了上述建表语句之后,进入mysql,新建realtime_dw_demo_1,进入数据库 realtime_dw_demo_1 ,初始化数据库

mysql -u root -p

create database realtime_dw_demo_1;

use database realtime_dw_demo_1;

source realtime_table.sql

将mysql表数据同步到kafka

使用flink cdc将mysql数据同步到kafka中,以下为相关sql语句:

读取mysql源表数据

%flink.ssql

drop table if exists base_category1;
drop table if exists base_category2;
drop table if exists base_category3;
drop table if exists base_province;
drop table if exists base_region;
drop table if exists base_trademark;
drop table if exists date_info;
drop table if exists holiday_info;
drop table if exists holiday_year;
drop table if exists order_detail;
drop table if exists order_info;
drop table if exists order_status_log;
drop table if exists payment_info;
drop table if exists sku_info;
drop table if exists user_info;

---mysql table

CREATE TABLE `base_category1` (
  `id` bigint NOT NULL,
  `name` string NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category1'
);

CREATE TABLE `base_category2` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint  NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category2'
);


CREATE TABLE `base_category3` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint  NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category3'
);

CREATE TABLE `base_province` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_province'
);

CREATE TABLE `base_region` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_region'
);

CREATE TABLE `base_trademark` (
  `tm_id` string  NULL COMMENT '品牌id',
  `tm_name` string  NULL COMMENT '品牌名称',
  PRIMARY KEY (`tm_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_trademark'
);


CREATE TABLE `date_info` (
  `date_id` int NOT NULL,
  `week_id` int  NULL,
  `week_day` int  NULL,
  `day` int  NULL,
  `month` int  NULL,
  `quarter` int  NULL,
  `year` int  NULL,
  `is_workday` int  NULL,
  `holiday_id` int  NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'date_info'
);


CREATE TABLE `holiday_info` (
  `holiday_id` int NOT NULL,
  `holiday_name` string  NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_info'
);

CREATE TABLE `holiday_year` (
  `year_id` int  NULL,
  `holiday_id` int  NULL,
  `start_date_id` int  NULL,
  `end_date_id` int  NULL,
  PRIMARY KEY (`end_date_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_year'
);

CREATE TABLE `order_detail` (
  `id` bigint NOT NULL COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` string  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_detail'
);


CREATE TABLE `order_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_info'
);


CREATE TABLE `order_status_log` (
  `id` int NOT NULL,
  `order_id` int  NULL,
  `order_status` int  NULL,
  `operate_time` timestamp(3)  NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_status_log'
);


CREATE TABLE `payment_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `out_trade_no` string  NULL COMMENT '对外业务编号',
  `order_id` string  NULL COMMENT '订单编号',
  `user_id` string  NULL COMMENT '用户编号',
  `alipay_trade_no` string  NULL COMMENT '支付宝交易流水编号',
  `total_amount` decimal(16,2)  NULL COMMENT '支付金额',
  `subject` string  NULL COMMENT '交易内容',
  `payment_type` string  NULL COMMENT '支付方式',
  `payment_time` string  NULL COMMENT '支付时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'payment_info'
);


CREATE TABLE `sku_info` (
  `id` bigint NOT NULL COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'sku_info'
);

CREATE TABLE `user_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `login_name` string  NULL COMMENT '用户名称',
  `nick_name` string  NULL COMMENT '用户昵称',
  `passwd` string  NULL COMMENT '用户密码',
  `name` string  NULL COMMENT '用户姓名',
  `phone_num` string  NULL COMMENT '手机号',
  `email` string  NULL COMMENT '邮箱',
  `head_img` string  NULL COMMENT '头像',
  `user_level` string  NULL COMMENT '用户级别',
  `birthday` date  NULL COMMENT '用户生日',
  `gender` string  NULL COMMENT '性别 M男,F女',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'user_info'
);

kafka sink表建表语句

%flink.ssql

drop table if exists base_category1_topic;
drop table if exists base_category2_topic;
drop table if exists base_category3_topic;
drop table if exists base_province_topic;
drop table if exists base_region_topic;
drop table if exists base_trademark_topic;
drop table if exists date_info_topic;
drop table if exists holiday_info_topic;
drop table if exists holiday_year_topic;
drop table if exists order_detail_topic;
drop table if exists order_info_topic;
drop table if exists order_status_log_topic;
drop table if exists payment_info_topic;
drop table if exists sku_info_topic;
drop table if exists user_info_topic;

CREATE TABLE `base_category1_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category1'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_category2_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint  NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category2'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `base_category3_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint  NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category3'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_province_topic` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码'
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_province'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_region_topic` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_region'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_trademark_topic` (
  `tm_id` string  NULL COMMENT '品牌id',
  `tm_name` string  NULL COMMENT '品牌名称',
  PRIMARY KEY (`tm_id`) NOT ENFORCED

)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_trademark'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `date_info_topic` (
  `date_id` int NOT NULL,
  `week_id` int  NULL,
  `week_day` int  NULL,
  `day` int  NULL,
  `month` int  NULL,
  `quarter` int  NULL,
  `year` int  NULL,
  `is_workday` int  NULL,
  `holiday_id` int  NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.date_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `holiday_info_topic` (
  `holiday_id` int NOT NULL,
  `holiday_name` string  NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.holiday_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `holiday_year_topic` (
  `year_id` int  NULL,
  `holiday_id` int  NULL,
  `start_date_id` int  NULL,
  `end_date_id` int  NULL
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.holiday_year'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `order_detail_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` string  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_detail'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `order_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `order_status_log_topic` (
  `id` int NOT NULL ,
  `order_id` int  NULL,
  `order_status` int  NULL,
  `operate_time` timestamp(3)  NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_status_log'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `payment_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `out_trade_no` string  NULL COMMENT '对外业务编号',
  `order_id` string  NULL COMMENT '订单编号',
  `user_id` string  NULL COMMENT '用户编号',
  `alipay_trade_no` string  NULL COMMENT '支付宝交易流水编号',
  `total_amount` decimal(16,2)  NULL COMMENT '支付金额',
  `subject` string  NULL COMMENT '交易内容',
  `payment_type` string  NULL COMMENT '支付方式',
  `payment_time` string  NULL COMMENT '支付时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.payment_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `sku_info_topic` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.sku_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `user_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `login_name` string  NULL COMMENT '用户名称',
  `nick_name` string  NULL COMMENT '用户昵称',
  `passwd` string  NULL COMMENT '用户密码',
  `name` string  NULL COMMENT '用户姓名',
  `phone_num` string  NULL COMMENT '手机号',
  `email` string  NULL COMMENT '邮箱',
  `head_img` string  NULL COMMENT '头像',
  `user_level` string  NULL COMMENT '用户级别',
  `birthday` date  NULL COMMENT '用户生日',
  `gender` varchar(1)  NULL COMMENT '性别 M男,F女',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.user_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

insert语句,将mysql binlog数据导入kafka对应的topic

%flink.ssql(runAsOne=true)
insert into base_category1_topic select * from base_category1;
insert into base_category2_topic select * from base_category2;
insert into base_category3_topic select * from base_category3;
insert into base_province_topic select * from base_province;
insert into base_region_topic select * from base_region;
insert into base_trademark_topic select * from base_trademark;
insert into date_info_topic select * from date_info;
insert into holiday_info_topic select * from holiday_info;
insert into holiday_year_topic select * from holiday_year;
insert into order_detail_topic select * from order_detail;
insert into order_info_topic select * from order_info;
insert into order_status_log_topic select * from order_status_log;
insert into payment_info_topic select * from payment_info;
insert into sku_info_topic select * from sku_info;
insert into user_info_topic select * from user_info;

将维表数据导入hudi

将my5.base_province和my1.base_region两张区域维表信息写入hudi COW表中

%flink.ssql
drop table if exists base_province_topic_source;
drop table if exists base_province_hudi;

CREATE TABLE `base_province_topic_source` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码'
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_province'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_province_hudi` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_province_hudi select * from base_province_topic_source;
%flink.ssql
drop table if exists base_region_topic_source;
drop table if exists base_region_hudi;

CREATE TABLE `base_region_topic_source` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_region'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_region_hudi` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_region_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_region_hudi select * from base_region_topic_source;

使用上述两张维表创建dim_province表

%flink.ssql
DROP TABLE IF EXISTS dim_province_hudi;
create table dim_province_hudi (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
) with (
    'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/dim_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'province_id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql

insert into dim_province_hudi
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  bp.area_code AS area_code,
  br.id AS region_id,
  br.region_name AS region_name
FROM base_region_hudi br 
     JOIN base_province_hudi bp ON br.id= bp.region_id
;

将商品维表my5.base_category1和my5.base_category2两张商品维表信息写入hudi COW表

%flink.ssql
drop table if exists base_category1_topic_source;
drop table if exists base_category1_hudi;
CREATE TABLE `base_category1_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category1'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category1_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category1_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category1_hudi select * from base_category1_topic_source;
%flink.ssql
drop table if exists base_category2_topic_source;
drop table if exists base_category2_hudi;
CREATE TABLE `base_category2_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category1_id` bigint NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category2'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category2_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category1_id` bigint NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category2_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category2_hudi select * from base_category2_topic_source;
%flink.ssql
drop table if exists base_category3_topic_source;
drop table if exists base_category3_hudi;
CREATE TABLE `base_category3_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category2_id` bigint NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category3'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category3_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category2_id` bigint NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category3_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category3_hudi select * from base_category3_topic_source;

将商品表导入hudi

%flink.ssql
drop table if exists sku_info_topic_source;
drop table if exists sku_info_topic_hudi;

CREATE TABLE `sku_info_topic_source` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.sku_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `sku_info_topic_hudi` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/sku_info_topic_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into sku_info_topic_hudi select * from sku_info_topic_source;

基于上述步骤,我们把商品维表的基础数据同步到hudi中,同样我们使用商品维表创建dim_sku_info视图

%flink.ssql
drop view if exists dim_sku_info;
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c3.id AS c1_id,
  c3.name AS c1_name
FROM
  sku_info_topic_hudi si 
  JOIN base_category3_hudi c3 ON si.category3_id = c3.id
  JOIN base_category2_hudi c2 ON c3.category2_id =c2.id
  JOIN base_category1_hudi c1 ON c2.category1_id = c1.id
;

DWD层数据处理

经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。

%flink.ssql
drop table if exists ods_order_detail_topic;
drop table if exists ods_order_info_topic;
drop table if exists dwd_paid_order_detail_hudi;

CREATE TABLE `ods_order_detail_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` int  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_detail'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset' 
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `ods_order_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset' 
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE dwd_paid_order_detail_hudi
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,0),
  create_time TIMESTAMP(3),
  pay_time  TIMESTAMP(3),
  primary key (detail_id) not enforced
 ) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/dwd_paid_order_detail_hudi',
    'scan.startup.mode' = 'earliest-offset',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);
%flink.ssql

insert into dwd_paid_order_detail_hudi
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info_topic
    WHERE order_status = '2' -- 已支付
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail_topic
    ) od 
    ON oi.id = od.order_id;

ADS层数据

经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了hudi中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。

ads_province_index_hudi

%flink.ssql
drop table if exists ads_province_index_hudi;
drop table if exists tmp_province_index_hudi;

-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个省份的订单数
--      2.每天每个省份的订单金额
-- ---------------------------------

CREATE TABLE ads_province_index_hudi(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/ads_province_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表
-- ---------------------------------

CREATE TABLE tmp_province_index_hudi(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE,
    primary key(province_id) not enforced
)WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_province_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'

);

%flink.ssql
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index_hudi
SELECT
      province_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY province_id,TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'))
%flink.ssql

INSERT INTO ads_province_index_hudi
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_hudi pc
  JOIN dim_province_hudi as dp 
  ON dp.province_id = pc.province_id;

查看ADS层的ads_province_index_hudi表数据:

基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓

ads_sku_index_hudi

%flink.ssql

-- ---------------------------------
-- 使用 DDL创建hudi中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
--      2.每天每个商品对应的订单金额
--      3.每天每个商品对应的数量
-- ---------------------------------


drop table if exists ads_sku_index_hudi;
CREATE TABLE ads_sku_index_hudi
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) with (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/ads_sku_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
drop table if exists tmp_sku_index_hudi;
CREATE TABLE tmp_sku_index_hudi(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    order_sku_num BIGINT,
    pay_date DATE,
    PRIMARY KEY (sku_id) NOT ENFORCED
)WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_sku_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);
%flink.ssql

-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------

INSERT INTO tmp_sku_index_hudi
SELECT
      sku_id,
      count(distinct order_id) order_count, -- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      sum(sku_num) order_sku_num,
      TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY sku_id,TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'))
%flink.ssql
INSERT INTO ads_sku_index_hudi
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_hudi sc 
  JOIN dim_sku_info ds
  ON ds.id = sc.sku_id
%flink.ssql
select * from ads_sku_index_hudi;
基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓
0 0 投票数
文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://lrting.top/uncategorized/2744/

(0)
上一篇 2021-12-11 13:34
下一篇 2021-12-14 01:22

相关推荐

订阅评论
提醒
guest
0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x