数据湖作为统一、灵活且可扩展的数据存储平台,对于提高数据分析效率、降低成本、增强数据治理和推动企业数字化转型具有核心价值,是实现数据驱动决策和业务创新的基石。
Apache Paimon 是一个功能强大的流式数据湖存储平台,它结合了 Apache Flink 的流处理能力和 Lakehouse 架构的优势,为用户提供高速数据摄取、变更日志跟踪和高效的实时分析能力。随着其社区的不断发展,Apache Paimon 将在实时数据湖和流批处理领域发挥越来越重要的作用。
下面基于flink与paimon并参考官网,介绍了一些基本使用方法。
基本组件
- paimon 0.9.0
- flink 1.20
- hadoop 3.3.4
- hive 3.1.3
flink配置paimon
wget https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.20/0.9.0/paimon-flink-1.20-0.9.0.jar -O lib/paimon-flink-1.20-0.9.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.20.0/flink-sql-connector-hive-3.1.3_2.12-1.20.0.jar -O lib/flink-sql-connector-hive-3.1.3_2.12-1.20.0.jar
将hadoop的hadoop-mapreduce-client-core-3.3.4.jar包复制到flink/lib目录下
启动yarn session模式
bin/yarn-session.sh -jm 2g -tm 2g -s 4 -yd
启动 sql-client.sh
bin/sql-client.sh
Paimon Hive Catalog使用
将hive-site.xml文件上传至hdfs的/hive-conf目录
hdfs dfs -mkdir /hive-conf
hdfs dfs -put hive-site.xml /hive-conf
创建paimon hive catalog
export HADOOP_CLASSPATH=`hadoop classpath`
create catalog my_hive with(
'type'='paimon',
'metastore'='hive',
'hive-conf-dir'= 'hdfs://hadoop:9000/hive-conf');
创建testdb库,并在该库下创建表
use catalog my_hive;
create database testdb;
use testdb;
不分区表
创建不分区表,并写入数据
/*不分区表*/
CREATE TABLE my_table_1 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
insert into my_table_1 values(1, 1, 'go', '20240921', '12');
查看表目录
分区表
默认情况下,Paimon不会将新创建分区同步到Hive元数据存储。用户在Hive中会看到一个未分区的表。分区下推将由过滤下推来执行。
如果您希望在Hive中看到一个分区表,并且也将新创建的分区同步到Hive元数据存储,请设置表属性metastore.partitioned-table为true。
创建分区表,并写入数据。
/*分区表*/
CREATE TABLE my_table_2 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
with (
'metastore.partitioned-table' = 'true',
'file.format' = 'parquet'
);
insert into my_table_2 values(1, 1, 'go', '20240921', '12');
查看表目录
create table as select语法使用
/*
* 直接执行这个语句,是个streaming的任务,
* 所以需要先把执行模式设置为batch
*/
set execution.runtime-mode=batch;
create table my_table_3 as select * from my_table_2;
在hdfs上查看my_table_3表的路径
可以看到创建表在hdfs中没有分区路径。
现在修改上述sql,在with中添加额外参数。
CREATE TABLE my_table_4 WITH ('partition' = 'dt', 'file.format' = 'orc', 'primary-key' = 'dt,hh') AS SELECT * FROM my_table_2;
再次查看my_table_4表的路径
可以看到分区路径。
Clustering功能使用
在Paimon中,聚类是一个功能,它允许你在写入过程中根据某些列的值对Append Table中的数据进行聚类。这种数据组织方式可以显著提升后续读取数据任务的效率,因为它实现了更快、更精准的数据检索。该功能仅支持Append Table(没有定义主键的表,bucket = -1)和批量执行模式。
要利用聚类功能,您可以在创建或写入表时指定希望聚类的列。以下是一个如何启用聚类的简单示例:
CREATE TABLE my_table_5 (
a STRING,
b STRING,
c STRING
) WITH (
'sink.clustering.by-columns' = 'a,b',
);
insert into my_table_5 values('1', '2', '3');
也可以使用sql hint语法动态配置
INSERT INTO my_table_5 /*+ OPTIONS('sink.clustering.by-columns' = 'a,b,c') */ values('2', '2', '3');
数据使用自动选择的策略(如ORDER、ZORDER或HILBERT)进行聚类,但您可以通过设置sink.clustering.strategy来手动指定聚类策略。聚类依赖于抽样和排序。如果聚类过程耗时过长,您可以通过设置sink.clustering.sample-factor来减少总的样本数,或者通过设置sink.clustering.sort-in-cluster为false来禁用排序步骤。
可参考:FlinkConnectorOptions进行设置。
insert overwrite语法使用
不分区表:
INSERT OVERWRITE my_table SELECT ...
指定partiton实现overwrite
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
Flink的默认覆盖模式是动态分区覆盖(这意味着Paimon只会删除覆盖数据中出现的分区)。您可以通过配置dynamic-partition-overwrite来将其更改为静态覆盖。
-- my_table is a Partitioned Table
-- Dynamic overwrite
INSERT OVERWRITE my_table SELECT ...
-- Static overwrite (Overwrite whole table)
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...
truncate table语法使用
flink 1.17-版本
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM my_table WHERE false;
flink 1.18+版本
TRUNCATE TABLE my_table;
清理分区
目前,Paimon支持两种清理分区的方式。
与清理表类似,您可以使用INSERT OVERWRITE来通过向分区中插入空值来清除其数据。
方法 #1 不支持删除多个分区。如果您需要删除多个分区,可以通过提交drop_partition作业通过flink run来操作。
-- Syntax
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM my_table WHERE false;
-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
k0 INT,
k1 INT,
v STRING
) PARTITIONED BY (k0, k1);
-- you can use
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
PARTITION (k0 = 0) SELECT k1, v FROM my_table WHERE false;
-- or
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
PARTITION (k0 = 0, k1 = 0) SELECT v FROM my_table WHERE false;
更新表
重要的表属性设置:
- 仅支持主键表使用此功能。
MergeEngine需要是去重(deduplicate)或部分更新(partial-update)* 才能支持此功能。 - 不支持更新主键。
目前,Paimon支持使用Flink 1.17及更高版本中的UPDATE语句来更新记录。您可以在Flink的批量模式下执行UPDATE操作。
-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;
-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
a STRING,
b INT,
c INT,
PRIMARY KEY (a) NOT ENFORCED
) WITH (
'merge-engine' = 'deduplicate'
);
-- you can use
UPDATE my_table SET b = 1, c = 2 WHERE a = 'myTable';
delete语法使用
重要的表属性设置:
- 仅支持主键表使用此功能。
- 如果表中有主键,MergeEngine需要是去重(deduplicate)模式才能支持此功能。
- 不支持在流式模式下从表中删除数据。
-- Syntax
DELETE FROM table_identifier WHERE conditions;
-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
id BIGINT NOT NULL,
currency STRING,
rate BIGINT,
dt String,
PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'merge-engine' = 'deduplicate'
);
-- you can use
DELETE FROM my_table WHERE currency = 'UNKNOWN';
分区标记完成
对于分区表,每个分区可能都需要调度以触发下游的批量计算。因此,选择合适的时机来指示它已准备好进行调度,并最小化调度过程中的数据漂移量是必要的。我们称这个过程为:“分区标记完成”。
CREATE TABLE my_partitioned_table (
f0 INT,
f1 INT,
f2 INT,
...
dt STRING
) PARTITIONED BY (dt) WITH (
'partition.timestamp-formatter'='yyyyMMdd',
'partition.timestamp-pattern'='$dt',
'partition.time-interval'='1 d',
'partition.idle-time-to-done'='15 m'
);
首先,您需要定义分区的时间解析器和分区间的时间间隔,以便确定何时可以正确地标记分区为完成状态。
其次,您需要定义空闲时间,这决定了分区在没有任何新数据时需要多长时间,然后它才会被标记为完成。
第三,默认情况下,分区标记完成会创建一个_SUCCESS文件,该文件的内容是JSON格式,包含creationTime和modificationTime,这些可以帮助您了解是否有延迟数据。您还可以配置其他操作。
批量查询
SET 'execution.runtime-mode' = 'batch';
时间旅行查询
动态参数使用方法:
-- read the snapshot with id 1L
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
-- read the snapshot from specified timestamp in unix milliseconds
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
-- read the snapshot from specified timestamp string ,it will be automatically converted to timestamp in unix milliseconds
-- Supported formats include:yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, yyyy-MM-dd HH:mm:ss.SSS, use default local time zone
SELECT * FROM t /*+ OPTIONS('scan.timestamp' = '2023-12-09 23:09:12') */;
-- read tag 'my-tag'
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
-- read the snapshot from watermark, will match the first snapshot after the watermark
SELECT * FROM t /*+ OPTIONS('scan.watermark' = '1678883047356') */;
flink 1.18+支持语法:
-- read the snapshot from specified timestamp
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';
-- you can also use some simple expressions (see flink document to get supported functions)
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + INTERVAL '1' DAY
批量增量查询
读取从起始快照(排除)到结束快照之间的增量变化。
例如:
‘5,10’ 表示在快照5和快照10之间的变化。 ‘TAG1,TAG3’ 表示在TAG1和TAG3之间的变化。
-- incremental between snapshot ids
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
-- incremental between snapshot time mills
SELECT * FROM t /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;
默认情况下,会扫描生成变更日志文件的表的变更日志文件。否则,会扫描新更改的文件。您也可以强制指定 ‘incremental-betweencan-mode’。
在批量SQL中,不允许返回 DELETE 记录,因此带有 -D 的记录将被丢弃。如果您想查看 DELETE 记录,可以使用 audit_log 表:
SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
流式查询
默认情况下,流式读取在首次启动时会产生表上的最新快照,并继续读取最新的更改。
Paimon 默认确保您的启动过程得到妥善处理,包含所有数据。
SET 'execution.runtime-mode' = 'streaming';
您还可以在不使用快照数据的情况下进行流式读取,可以使用最新扫描模式:
-- Continuously reads latest changes without producing a snapshot at the beginning.
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
流式时间旅行查询
SELECT * FROM t WHERE dt > '2023-06-26';
动态参数使用方法:
-- read changes from snapshot id 1L
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
-- read changes from snapshot specified timestamp
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
-- read snapshot id 1L upon first startup, and continue to read the changes
SELECT * FROM t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
flink 1.18+支持语法:
-- read the snapshot from specified timestamp
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';
-- you can also use some simple expressions (see flink document to get supported functions)
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + INTERVAL '1' DAY
时间旅行流式读取依赖于快照,但默认情况下,快照只保留1小时内的数据,这可能会阻止您读取更早的增量数据。因此,Paimon还提供了另一种流式读取模式,即 scan.file-creation-time-millis,该模式提供了一个粗略的过滤机制,以保留在 timeMillis生成的文件。
SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
Consumer ID
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.expiration-time' = '1 d', 'consumer.mode' = 'at-least-once') */;
在流式读取 Paimon 表时,下一个快照 ID 将被记录到文件系统中。这有几个优点:
当之前的作业停止时,新启动的作业可以继续从之前的进度开始消费,而无需从状态恢复。新读取将从消费者文件中找到的下一个快照 ID 开始读取。如果您不希望这种行为,可以将 ‘consumer.ignore-progress’ 设置为 true。
当决定一个快照是否已过期时,Paimon 会查看文件系统中该表的所有消费者,如果还有消费者仍然依赖于这个快照,那么这个快照将不会因为过期而被删除。
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/bigdata/paimon/14411/