flink使用apache paimon入门

apache hudi与apache paimon社区活跃度分析

数据湖作为统一、灵活且可扩展的数据存储平台,对于提高数据分析效率、降低成本、增强数据治理和推动企业数字化转型具有核心价值,是实现数据驱动决策和业务创新的基石。

Apache Paimon 是一个功能强大的流式数据湖存储平台,它结合了 Apache Flink 的流处理能力和 Lakehouse 架构的优势,为用户提供高速数据摄取、变更日志跟踪和高效的实时分析能力。随着其社区的不断发展,Apache Paimon 将在实时数据湖和流批处理领域发挥越来越重要的作用。

下面基于flink与paimon并参考官网,介绍了一些基本使用方法。

基本组件

  • paimon 0.9.0
  • flink 1.20
  • hadoop 3.3.4
  • hive 3.1.3

flink配置paimon

wget https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.20/0.9.0/paimon-flink-1.20-0.9.0.jar -O lib/paimon-flink-1.20-0.9.0.jar

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.20.0/flink-sql-connector-hive-3.1.3_2.12-1.20.0.jar -O lib/flink-sql-connector-hive-3.1.3_2.12-1.20.0.jar

将hadoop的hadoop-mapreduce-client-core-3.3.4.jar包复制到flink/lib目录下

启动yarn session模式

bin/yarn-session.sh -jm 2g -tm 2g -s 4 -yd

启动 sql-client.sh

bin/sql-client.sh

Paimon Hive Catalog使用

将hive-site.xml文件上传至hdfs的/hive-conf目录

hdfs dfs -mkdir /hive-conf

hdfs dfs -put hive-site.xml /hive-conf

创建paimon hive catalog

export HADOOP_CLASSPATH=`hadoop classpath`
create catalog my_hive with(
  'type'='paimon',
  'metastore'='hive',
  'hive-conf-dir'= 'hdfs://hadoop:9000/hive-conf');

创建testdb库,并在该库下创建表

use catalog my_hive;

create database testdb;

use testdb;

不分区表

创建不分区表,并写入数据

/*不分区表*/
CREATE TABLE my_table_1 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

insert into my_table_1 values(1, 1, 'go', '20240921', '12');

查看表目录

flink使用apache paimon入门

分区表

默认情况下,Paimon不会将新创建分区同步到Hive元数据存储。用户在Hive中会看到一个未分区的表。分区下推将由过滤下推来执行。

如果您希望在Hive中看到一个分区表,并且也将新创建的分区同步到Hive元数据存储,请设置表属性metastore.partitioned-table为true。

创建分区表,并写入数据。

/*分区表*/
CREATE TABLE my_table_2 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
with (
  'metastore.partitioned-table' = 'true',
  'file.format' = 'parquet'
);

insert into my_table_2 values(1, 1, 'go', '20240921', '12');

查看表目录

flink使用apache paimon入门

create table as select语法使用

/*
 * 直接执行这个语句,是个streaming的任务,
 * 所以需要先把执行模式设置为batch
 */

set execution.runtime-mode=batch;

create table my_table_3 as select * from my_table_2;

在hdfs上查看my_table_3表的路径
flink使用apache paimon入门

可以看到创建表在hdfs中没有分区路径。

现在修改上述sql,在with中添加额外参数。

CREATE TABLE my_table_4 WITH ('partition' = 'dt', 'file.format' = 'orc', 'primary-key' = 'dt,hh') AS SELECT * FROM my_table_2;

再次查看my_table_4表的路径

flink使用apache paimon入门

可以看到分区路径。

Clustering功能使用

在Paimon中,聚类是一个功能,它允许你在写入过程中根据某些列的值对Append Table中的数据进行聚类。这种数据组织方式可以显著提升后续读取数据任务的效率,因为它实现了更快、更精准的数据检索。该功能仅支持Append Table(没有定义主键的表,bucket = -1)和批量执行模式。

要利用聚类功能,您可以在创建或写入表时指定希望聚类的列。以下是一个如何启用聚类的简单示例:

CREATE TABLE my_table_5 (
    a STRING,
    b STRING,
    c STRING
) WITH (
  'sink.clustering.by-columns' = 'a,b',
);

insert into my_table_5 values('1', '2', '3');

也可以使用sql hint语法动态配置

INSERT INTO my_table_5 /*+ OPTIONS('sink.clustering.by-columns' = 'a,b,c') */ values('2', '2', '3');

数据使用自动选择的策略(如ORDER、ZORDER或HILBERT)进行聚类,但您可以通过设置sink.clustering.strategy来手动指定聚类策略。聚类依赖于抽样和排序。如果聚类过程耗时过长,您可以通过设置sink.clustering.sample-factor来减少总的样本数,或者通过设置sink.clustering.sort-in-cluster为false来禁用排序步骤。

可参考:FlinkConnectorOptions进行设置。

insert overwrite语法使用

不分区表:

INSERT OVERWRITE my_table SELECT ...

指定partiton实现overwrite

INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...

Flink的默认覆盖模式是动态分区覆盖(这意味着Paimon只会删除覆盖数据中出现的分区)。您可以通过配置dynamic-partition-overwrite来将其更改为静态覆盖。

-- my_table is a Partitioned Table

-- Dynamic overwrite
INSERT OVERWRITE my_table SELECT ...

-- Static overwrite (Overwrite whole table)
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...

truncate table语法使用

flink 1.17-版本

INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM my_table WHERE false;

flink 1.18+版本

TRUNCATE TABLE my_table;

清理分区

目前,Paimon支持两种清理分区的方式。

与清理表类似,您可以使用INSERT OVERWRITE来通过向分区中插入空值来清除其数据。

方法 #1 不支持删除多个分区。如果您需要删除多个分区,可以通过提交drop_partition作业通过flink run来操作。

-- Syntax
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ 
PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM my_table WHERE false;

-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
    k0 INT,
    k1 INT,
    v STRING
) PARTITIONED BY (k0, k1);

-- you can use
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ 
PARTITION (k0 = 0) SELECT k1, v FROM my_table WHERE false;

-- or
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ 
PARTITION (k0 = 0, k1 = 0) SELECT v FROM my_table WHERE false;

更新表

重要的表属性设置:

  • 仅支持主键表使用此功能。
    MergeEngine需要是去重(deduplicate)或部分更新(partial-update)* 才能支持此功能。
  • 不支持更新主键。

目前,Paimon支持使用Flink 1.17及更高版本中的UPDATE语句来更新记录。您可以在Flink的批量模式下执行UPDATE操作。

-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;

-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
    a STRING,
    b INT,
    c INT,
    PRIMARY KEY (a) NOT ENFORCED
) WITH ( 
    'merge-engine' = 'deduplicate' 
);

-- you can use
UPDATE my_table SET b = 1, c = 2 WHERE a = 'myTable';

delete语法使用

重要的表属性设置:

  • 仅支持主键表使用此功能。
  • 如果表中有主键,MergeEngine需要是去重(deduplicate)模式才能支持此功能。
  • 不支持在流式模式下从表中删除数据。
-- Syntax
DELETE FROM table_identifier WHERE conditions;

-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
    id BIGINT NOT NULL,
    currency STRING,
    rate BIGINT,
    dt String,
    PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH ( 
    'merge-engine' = 'deduplicate' 
);

-- you can use
DELETE FROM my_table WHERE currency = 'UNKNOWN';

分区标记完成

对于分区表,每个分区可能都需要调度以触发下游的批量计算。因此,选择合适的时机来指示它已准备好进行调度,并最小化调度过程中的数据漂移量是必要的。我们称这个过程为:“分区标记完成”。

CREATE TABLE my_partitioned_table (
    f0 INT,
    f1 INT,
    f2 INT,
    ...
    dt STRING
) PARTITIONED BY (dt) WITH (
    'partition.timestamp-formatter'='yyyyMMdd',
    'partition.timestamp-pattern'='$dt',
    'partition.time-interval'='1 d',
    'partition.idle-time-to-done'='15 m'
);

首先,您需要定义分区的时间解析器和分区间的时间间隔,以便确定何时可以正确地标记分区为完成状态。

其次,您需要定义空闲时间,这决定了分区在没有任何新数据时需要多长时间,然后它才会被标记为完成。

第三,默认情况下,分区标记完成会创建一个_SUCCESS文件,该文件的内容是JSON格式,包含creationTime和modificationTime,这些可以帮助您了解是否有延迟数据。您还可以配置其他操作。

批量查询

SET 'execution.runtime-mode' = 'batch';

时间旅行查询

动态参数使用方法:

-- read the snapshot with id 1L
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;

-- read the snapshot from specified timestamp in unix milliseconds
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

-- read the snapshot from specified timestamp string ,it will be automatically converted to timestamp in unix milliseconds
-- Supported formats include:yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, yyyy-MM-dd HH:mm:ss.SSS, use default local time zone
SELECT * FROM t /*+ OPTIONS('scan.timestamp' = '2023-12-09 23:09:12') */;

-- read tag 'my-tag'
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

-- read the snapshot from watermark, will match the first snapshot after the watermark
SELECT * FROM t /*+ OPTIONS('scan.watermark' = '1678883047356') */; 

flink 1.18+支持语法:

-- read the snapshot from specified timestamp
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';

-- you can also use some simple expressions (see flink document to get supported functions)
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + INTERVAL '1' DAY

批量增量查询

读取从起始快照(排除)到结束快照之间的增量变化。

例如:

‘5,10’ 表示在快照5和快照10之间的变化。 ‘TAG1,TAG3’ 表示在TAG1和TAG3之间的变化。

-- incremental between snapshot ids
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;

-- incremental between snapshot time mills
SELECT * FROM t /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;

默认情况下,会扫描生成变更日志文件的表的变更日志文件。否则,会扫描新更改的文件。您也可以强制指定 ‘incremental-betweencan-mode’。

在批量SQL中,不允许返回 DELETE 记录,因此带有 -D 的记录将被丢弃。如果您想查看 DELETE 记录,可以使用 audit_log 表:

SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;

流式查询

默认情况下,流式读取在首次启动时会产生表上的最新快照,并继续读取最新的更改。

Paimon 默认确保您的启动过程得到妥善处理,包含所有数据。

SET 'execution.runtime-mode' = 'streaming';

您还可以在不使用快照数据的情况下进行流式读取,可以使用最新扫描模式:

-- Continuously reads latest changes without producing a snapshot at the beginning.
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;

流式时间旅行查询

SELECT * FROM t WHERE dt > '2023-06-26';

动态参数使用方法:

-- read changes from snapshot id 1L 
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;

-- read changes from snapshot specified timestamp
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

-- read snapshot id 1L upon first startup, and continue to read the changes
SELECT * FROM t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;

flink 1.18+支持语法:

-- read the snapshot from specified timestamp
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';

-- you can also use some simple expressions (see flink document to get supported functions)
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + INTERVAL '1' DAY

时间旅行流式读取依赖于快照,但默认情况下,快照只保留1小时内的数据,这可能会阻止您读取更早的增量数据。因此,Paimon还提供了另一种流式读取模式,即 scan.file-creation-time-millis,该模式提供了一个粗略的过滤机制,以保留在 timeMillis生成的文件。

SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;

Consumer ID

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.expiration-time' = '1 d', 'consumer.mode' = 'at-least-once') */;

在流式读取 Paimon 表时,下一个快照 ID 将被记录到文件系统中。这有几个优点:

当之前的作业停止时,新启动的作业可以继续从之前的进度开始消费,而无需从状态恢复。新读取将从消费者文件中找到的下一个快照 ID 开始读取。如果您不希望这种行为,可以将 ‘consumer.ignore-progress’ 设置为 true。

当决定一个快照是否已过期时,Paimon 会查看文件系统中该表的所有消费者,如果还有消费者仍然依赖于这个快照,那么这个快照将不会因为过期而被删除。

0 0 投票数
文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://lrting.top/backend/bigdata/paimon/14411/

(1)
上一篇 2024-09-14 14:55
下一篇 2024-09-23

相关推荐

订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x