In a previous article we walked through getting started with writing to Paimon from Flink; this article performs the same operations with the Spark engine.
Component versions
paimon 0.9.0
hadoop 3.3.4
hive 3.1.3
spark 3.5.2
Basic configuration
wget https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.5/0.9.0/paimon-spark-3.5-0.9.0.jar -O spark/jars/paimon-spark-3.5-0.9.0.jar
Copy hive-site.xml into the spark/conf configuration directory; the Hive catalog is used by default.
Starting spark-sql
bin/spark-sql ... \
--conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
Creating tables
Non-partitioned table
CREATE TABLE my_table_spark_1 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
insert into my_table_spark_1 values(1, 1, 'go', '20240921', '12');
Partitioned table
By default, Paimon does not synchronize newly created partitions to the Hive metastore, so the table appears unpartitioned in Hive; partition pushdown is instead handled by filter pushdown.
If you want the table to show up as partitioned in Hive and have newly created partitions synchronized to the Hive metastore, set the table property metastore.partitioned-table to true.
CREATE TABLE my_table_spark_2 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id',
'metastore.partitioned-table' = 'true'
);
insert into my_table_spark_2 values(1, 1, 'go', '20240921', '12');
A possible error:
spark-sql (default)> insert into my_table_spark_2 values(1, 1, 'go', '20240921', '12');
Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
at org.apache.spark.sql.hive.execution.V1WritesHiveUtils.getDynamicPartitionColumns(V1WritesHiveUtils.scala:82)
at org.apache.spark.sql.hive.execution.V1WritesHiveUtils.getDynamicPartitionColumns$(V1WritesHiveUtils.scala:45)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$.getDynamicPartitionColumns(InsertIntoHiveTable.scala:288)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$.apply(InsertIntoHiveTable.scala:316)
at org.apache.spark.sql.hive.HiveAnalysis$$anonfun$apply$3.applyOrElse(HiveStrategies.scala:166)
at org.apache.spark.sql.hive.HiveAnalysis$$anonfun$apply$3.applyOrElse(HiveStrategies.scala:161)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:32)
at org.apache.spark.sql.hive.HiveAnalysis$.apply(HiveStrategies.scala:161)
at org.apache.spark.sql.hive.HiveAnalysis$.apply(HiveStrategies.scala:160)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:240)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:236)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:187)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:236)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:202)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:223)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:222)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:68)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:501)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:619)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:613)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:613)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:310)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1029)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
This is a dynamic-partition issue. As the error message suggests, configure hive.exec.dynamic.partition.mode=nonstrict in Hive; setting it directly in Spark SQL also works.
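For example, a minimal fix in the current spark-sql session (an illustrative sketch; configuring the property in hive-site.xml achieves the same):
-- relax the dynamic partition mode for this session, as suggested by the error message
SET hive.exec.dynamic.partition.mode=nonstrict;
-- retry the insert from above
insert into my_table_spark_2 values(1, 1, 'go', '20240921', '12');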
Creating tables with CREATE TABLE AS SELECT
A table can be created and populated from the result of a query. For example, given CREATE TABLE table_b AS SELECT id, name FROM table_a, the resulting table table_b is equivalent to creating the table and inserting the data with: CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;
With CREATE TABLE AS SELECT you can specify the primary key or the partitioning; see the following SQL examples for the exact syntax.
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT
);
CREATE TABLE my_table_as AS SELECT * FROM my_table;
/* partitioned table*/
CREATE TABLE my_table_partition (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE my_table_partition_as PARTITIONED BY (dt) AS SELECT * FROM my_table_partition;
/* change TBLPROPERTIES */
CREATE TABLE my_table_options (
user_id BIGINT,
item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE my_table_options_as TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM my_table_options;
/* primary key */
CREATE TABLE my_table_pk (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_pk_as TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM my_table_pk;
/* primary key + partition */
CREATE TABLE my_table_all (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;
Writing data
Overwriting the whole non-partitioned table
INSERT OVERWRITE my_table SELECT ...
Overwriting specific partitions
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
Dynamic overwrite
Spark's default overwrite mode is static partition overwrite. To enable dynamic overwrite, set the Spark session configuration spark.sql.sources.partitionOverwriteMode to dynamic.
Example:
CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt);
INSERT INTO my_table VALUES (1, 'p1'), (2, 'p2');
-- Static overwrite (Overwrite the whole table)
INSERT OVERWRITE my_table VALUES (3, 'p1');
SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
| 3| p1|
+---+---+
*/
-- Dynamic overwrite (Only overwrite pt='p1')
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table VALUES (3, 'p1');
SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
| 2| p2|
| 3| p1|
+---+---+
*/
Deleting all data in a table
TRUNCATE TABLE my_table;
Updating a table
Spark supports updating primitive types (PrimitiveType) and struct types (StructType), for example:
-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;
CREATE TABLE t (
id INT,
s STRUCT<c1: INT, c2: STRING>,
name STRING)
TBLPROPERTIES (
'primary-key' = 'id',
'merge-engine' = 'deduplicate'
);
-- you can use
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
Deleting data from a table
DELETE FROM my_table WHERE currency = 'UNKNOWN';
Using MERGE INTO
Paimon currently supports the MERGE syntax in Spark 3+, which allows a set of updates, insertions, and deletions based on a source table to be executed in a single transaction.
- Updating primary key columns is not supported in the update clause.
- The WHEN NOT MATCHED BY SOURCE syntax is not supported.
Example 1:
A simple example: if a row exists in the target table, update it; if it does not, insert it.
-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.
MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Example 2:
A demonstration with multiple conditional clauses.
-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.
MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED AND target.a = 5 THEN
UPDATE SET b = source.b + target.b -- when matched and meet the condition 1, then update b;
WHEN MATCHED AND source.c > 'c2' THEN
UPDATE SET * -- when matched and meet the condition 2, then update all the columns;
WHEN MATCHED THEN
DELETE -- when matched, delete this row in target table;
WHEN NOT MATCHED AND c > 'c9' THEN
INSERT (a, b, c) VALUES (a, b * 1.1, c) -- when not matched but meet the condition 3, then transform and insert this row;
WHEN NOT MATCHED THEN
INSERT * -- when not matched, insert this row without any transformation;
Streaming write & schema evolution
Currently, neither of these features is supported through Spark SQL.
Querying data
Paimon's batch read returns all the data in a snapshot of the table. By default, a batch read returns the latest snapshot.
Paimon's batch read with time travel can specify a snapshot or a tag and read the corresponding data.
Requires Spark 3.3 or later.
You can use VERSION AS OF and TIMESTAMP AS OF in queries for time travel:
-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;
-- read the snapshot from specified timestamp
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;
-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';
-- read the snapshot from specified watermark. will match the first snapshot after the watermark
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';
If a tag's name is a number and equals a snapshot ID, the VERSION AS OF syntax considers the tag first. For example, if you have a tag named '1' based on snapshot 2, then SELECT * FROM t VERSION AS OF '1' actually queries snapshot 2, not snapshot 1.
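As an illustration of that precedence, a tag can be created through Paimon's Spark procedure support. This is a sketch that assumes the sys.create_tag procedure and its named arguments (table, tag, snapshot) as described in the Paimon Spark documentation:
-- create a tag named '1' pointing at snapshot 2 (procedure signature assumed from the Paimon docs)
CALL sys.create_tag(table => 'default.t', tag => '1', snapshot => 2);
-- this now reads snapshot 2, because the tag named '1' takes precedence over snapshot id 1
SELECT * FROM t VERSION AS OF '1';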
Batch incremental query
Reads the incremental changes between the start snapshot (exclusive) and the end snapshot.
For example:
- '5,10' means the changes between snapshot 5 and snapshot 10.
- 'TAG1,TAG3' means the changes between tag TAG1 and tag TAG3.
By default, for tables that produce changelog files, the changelog files are scanned; otherwise, the newly changed files are scanned. You can also force the scan mode with 'incremental-between-scan-mode'.
Requires Spark 3.2 or later.
Paimon supports incremental queries in Spark SQL, implemented as a Spark table-valued function.
You can use paimon_incremental_query in a query to extract incremental data:
-- read the incremental data between snapshot id 12 and snapshot id 20.
SELECT * FROM paimon_incremental_query('tableName', 12, 20);
In batch SQL, DELETE records are not allowed to be returned, so records marked -D are dropped. If you want to see DELETE records, you can query the audit_log system table.
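A minimal sketch of querying the audit_log system table, with my_table as a placeholder name; Paimon exposes system tables via the table$audit_log suffix, which needs backquoting in Spark SQL:
-- the rowkind column carries the change type, e.g. +I, -U, +U, -D
SELECT * FROM `my_table$audit_log`;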
Streaming read
Streaming reads are not currently supported in pure Spark SQL; for usage, refer to the Streaming Query documentation.
Query optimization
It is strongly recommended to specify partition and primary key filters in your queries, which speeds up data skipping (see the example after the list below).
The following filter functions can accelerate data skipping:
- =
- <
- <=
- >
- >=
- IN (...)
- LIKE 'abc%'
- IS NULL
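For instance, a partition filter on the partitioned table created earlier in this article (my_table_spark_2, partitioned by dt and hh) lets Paimon skip unrelated partitions entirely:
-- dt and hh are partition columns, so this query prunes partitions before reading data files
SELECT * FROM my_table_spark_2 WHERE dt = '20240921' AND hh = '12';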
Paimon sorts data by primary key, which speeds up point queries and range queries. When a composite primary key is used, query filters should ideally form the leftmost prefix of the primary key for the best acceleration.
Suppose a table has the following definition:
CREATE TABLE orders (
catalog_id BIGINT,
order_id BIGINT,
.....,
) TBLPROPERTIES (
'primary-key' = 'catalog_id,order_id'
);
The following queries achieve good acceleration by specifying range filters on the leftmost prefix of the composite primary key.
SELECT * FROM orders WHERE catalog_id=1025;
SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
SELECT * FROM orders
WHERE catalog_id=1025
AND order_id>2035 AND order_id<6000;
However, the following filters do not accelerate queries well.
SELECT * FROM orders WHERE order_id=29495;
SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
Altering table schema/properties
Setting table properties
ALTER TABLE my_table SET TBLPROPERTIES (
'write-buffer-size' = '256 MB'
);
Removing table properties
ALTER TABLE my_table UNSET TBLPROPERTIES ('write-buffer-size');
Modifying/adding table properties
ALTER TABLE my_table SET TBLPROPERTIES (
'comment' = 'table comment'
);
Renaming a table
ALTER TABLE my_table RENAME TO my_table_new;
--- or
ALTER TABLE [catalog.[database.]]test1 RENAME to [database.]test2;
Adding columns
ALTER TABLE my_table ADD COLUMNS (
c1 INT,
c2 STRING
);
Renaming a column
ALTER TABLE my_table RENAME COLUMN c0 TO c1;
Dropping columns
ALTER TABLE my_table DROP COLUMNS (c1, c2);
Dropping a partition
ALTER TABLE my_table DROP PARTITION (`id` = 1, `name` = 'paimon');
Changing a column comment
ALTER TABLE my_table ALTER COLUMN buy_count COMMENT 'buy count';
Adding a column at a specified position
ALTER TABLE my_table ADD COLUMN c INT FIRST;
ALTER TABLE my_table ADD COLUMN c INT AFTER b;
Changing a column's position
ALTER TABLE my_table ALTER COLUMN col_a FIRST;
ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;
Changing a column's type
ALTER TABLE my_table ALTER COLUMN col_a TYPE DOUBLE;
Flink/Hive/Spark compatibility with Paimon
In a previous article we created Paimon tables and wrote data with Flink; below we test cross-engine compatibility between Flink and Spark, plus reads from Hive.
- Create the table with Flink, write with Spark: supported
- Create the table with Flink, write a batch of data, then continue writing with Spark: supported
- Create the table with Spark, write with Flink: supported
- Create the table with Spark, write a batch of data, then continue writing with Flink: supported
- Create the table with Flink, write data, query with Hive: supported
- Create the table with Spark, write data, query with Hive: supported
This is an original article by xiaozhch5 from the blog From Big Data to Artificial Intelligence, licensed under CC 4.0 BY-SA. Please include a link to the original article and this notice when reprinting.
Original link: https://lrting.top/backend/14414/