Getting Started with Apache Paimon on Spark

In a previous article we covered getting started with writing to Paimon from Flink; in this article we use the Spark engine instead.

Component versions

paimon 0.9.0
hadoop 3.3.4
hive 3.1.3
spark 3.5.2

Basic setup

wget https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.5/0.9.0/paimon-spark-3.5-0.9.0.jar -O spark/jars/paimon-spark-3.5-0.9.0.jar

Copy hive-site.xml into the spark/conf directory; the Hive catalog is used by default.

Starting spark-sql

bin/spark-sql ... \
  --conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \
  --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
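
Once the CLI is up, a quick sanity check (a minimal sketch, assuming the default database) confirms that the Hive metastore-backed catalog is wired in:

-- List the databases visible through the metastore-backed catalog.
SHOW DATABASES;
-- Work in the default database for the examples below.
USE default;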

Creating tables

Non-partitioned table

CREATE TABLE my_table_spark_1 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);

insert into my_table_spark_1 values(1, 1, 'go', '20240921', '12');
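
Because the table declares a primary key and Paimon's default merge engine is deduplicate, writing the same key again upserts the row. A small check (a sketch, reusing the table above):

-- Write the same primary key (dt, hh, user_id) with a new item_id.
INSERT INTO my_table_spark_1 VALUES (1, 2, 'stay', '20240921', '12');
-- Expect a single row for (dt='20240921', hh='12', user_id=1) with item_id = 2.
SELECT * FROM my_table_spark_1;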

Partitioned table

By default, Paimon does not synchronize newly created partitions into the Hive metastore, so the table appears unpartitioned in Hive. Partition pushdown is carried out by filter pushdown instead.

If you want the table to show up as partitioned in Hive and have newly created partitions synchronized into the Hive metastore, set the table property metastore.partitioned-table to true.

CREATE TABLE my_table_spark_2 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id',
    'metastore.partitioned-table' = 'true'
);

insert into my_table_spark_2 values(1, 1, 'go', '20240921', '12');

A possible error:

spark-sql (default)> insert into my_table_spark_2 values(1, 1, 'go', '20240921', '12');
Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
        at org.apache.spark.sql.hive.execution.V1WritesHiveUtils.getDynamicPartitionColumns(V1WritesHiveUtils.scala:82)
        at org.apache.spark.sql.hive.execution.V1WritesHiveUtils.getDynamicPartitionColumns$(V1WritesHiveUtils.scala:45)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$.getDynamicPartitionColumns(InsertIntoHiveTable.scala:288)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$.apply(InsertIntoHiveTable.scala:316)
        at org.apache.spark.sql.hive.HiveAnalysis$$anonfun$apply$3.applyOrElse(HiveStrategies.scala:166)
        at org.apache.spark.sql.hive.HiveAnalysis$$anonfun$apply$3.applyOrElse(HiveStrategies.scala:161)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:32)
        at org.apache.spark.sql.hive.HiveAnalysis$.apply(HiveStrategies.scala:161)
        at org.apache.spark.sql.hive.HiveAnalysis$.apply(HiveStrategies.scala:160)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:240)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:236)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:187)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:236)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:202)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:223)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:222)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:68)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:501)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:619)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:613)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:613)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:310)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1029)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This is a dynamic partition issue. As the message suggests, configure hive.exec.dynamic.partition.mode=nonstrict in Hive; setting it directly in Spark SQL also works.
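
For example, directly in the spark-sql session (a minimal sketch):

-- Relax dynamic partition checking for the current session, then retry the insert.
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO my_table_spark_2 VALUES (1, 1, 'go', '20240921', '12');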

Creating tables with CREATE TABLE AS SELECT

A table can be created and populated from the result of a query. For example, given CREATE TABLE table_b AS SELECT id, name FROM table_a, the resulting table table_b is equivalent to creating the table and then inserting the data: CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;

With CREATE TABLE AS SELECT you can specify a primary key or partitioning. See the SQL examples below for the exact syntax.

CREATE TABLE my_table (
     user_id BIGINT,
     item_id BIGINT
);
CREATE TABLE my_table_as AS SELECT * FROM my_table;

/* partitioned table*/
CREATE TABLE my_table_partition (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE my_table_partition_as PARTITIONED BY (dt) AS SELECT * FROM my_table_partition;

/* change TBLPROPERTIES */
CREATE TABLE my_table_options (
       user_id BIGINT,
       item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE my_table_options_as TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM my_table_options;

/* primary key */
CREATE TABLE my_table_pk (
     user_id BIGINT,
     item_id BIGINT,
     behavior STRING,
     dt STRING,
     hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_pk_as TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM my_table_pk;

/* primary key + partition */
CREATE TABLE my_table_all (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;
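
To confirm which primary key, partitioning, and properties actually carried over to a CTAS result, you can inspect the generated DDL (a quick check, using the last table above):

-- Show the full definition of the table produced by CTAS.
SHOW CREATE TABLE my_table_all_as;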

Writing data

Overwriting the whole non-partitioned table

INSERT OVERWRITE my_table SELECT ...

Overwriting specific partitions

INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
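
For example, against the partitioned table created earlier (a sketch; the values are assumptions):

-- Overwrite only the (dt='20240921', hh='12') partition, leaving other partitions untouched.
INSERT OVERWRITE my_table_spark_2 PARTITION (dt = '20240921', hh = '12')
VALUES (2, 5, 'buy');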

Dynamic overwrite

Spark's default overwrite mode is static partition overwrite. To enable dynamic overwrite, set the Spark session configuration spark.sql.sources.partitionOverwriteMode to dynamic.

Example:

CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt);
INSERT INTO my_table VALUES (1, 'p1'), (2, 'p2');

-- Static overwrite (Overwrite the whole table)
INSERT OVERWRITE my_table VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  3| p1|
+---+---+
*/

-- Dynamic overwrite (Only overwrite pt='p1')
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  2| p2|
|  3| p1|
+---+---+
*/

Truncating the whole table

TRUNCATE TABLE my_table;

Updating tables

Spark supports updating both primitive types (PrimitiveType) and struct types (StructType), for example:

-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;

CREATE TABLE t (
  id INT, 
  s STRUCT<c1: INT, c2: STRING>, 
  name STRING)
TBLPROPERTIES (
  'primary-key' = 'id', 
  'merge-engine' = 'deduplicate'
);

-- you can use
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
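
A quick end-to-end check of the struct update (a sketch, assuming the table t above and using named_struct to build the struct value):

-- Seed one row, update a nested field, then read it back.
INSERT INTO t VALUES (1, named_struct('c1', 1, 'c2', 'a'), 'a');
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
-- Expected result: id = 1, s.c1 = 1, s.c2 = 'a_new', name = 'a'.
SELECT id, s.c1, s.c2, name FROM t;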

Deleting rows

DELETE FROM my_table WHERE currency = 'UNKNOWN';

Using MERGE INTO

Paimon currently supports the MERGE syntax on Spark 3+, which allows a set of updates, inserts, and deletes based on a source table to be performed in a single transaction. Note:

  1. Updating primary key columns is not supported in the UPDATE clause.
  2. The WHEN NOT MATCHED BY SOURCE syntax is not supported.

Example 1:

This is a simple example: if the row exists in the target table, update it; otherwise insert it.

-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
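
For reference, the target and source tables used in these examples could be created as follows (a sketch; the schema comes from the comment above, and creating both as Paimon tables with a as the primary key of target is an assumption):

CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key' = 'a');
CREATE TABLE source (a INT, b INT, c STRING);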

Example 2:

This is a demo with multiple conditional clauses.

-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED AND target.a = 5 THEN
   UPDATE SET b = source.b + target.b      -- when matched and meet the condition 1, then update b;
WHEN MATCHED AND source.c > 'c2' THEN
   UPDATE SET *    -- when matched and meet the condition 2, then update all the columns;
WHEN MATCHED THEN
   DELETE      -- when matched, delete this row in target table;
WHEN NOT MATCHED AND c > 'c9' THEN
   INSERT (a, b, c) VALUES (a, b * 1.1, c)      -- when not matched but meet the condition 3, then transform and insert this row;
WHEN NOT MATCHED THEN
   INSERT *      -- when not matched, insert this row without any transformation;

Streaming writes && schema evolution

Neither feature is currently supported in Spark SQL.

Querying data

Paimon's batch read returns all the data in a snapshot of the table. By default, a batch read returns the latest snapshot.

A Paimon batch read with time travel can specify a snapshot or a tag and read the corresponding data.

Requires Spark 3.3+.

You can use VERSION AS OF and TIMESTAMP AS OF in queries to time travel:

-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;

-- read the snapshot from specified timestamp 
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';

-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;

-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';

-- read the snapshot from specified watermark. will match the first snapshot after the watermark
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';

If a tag's name is a number and equals a snapshot ID, the VERSION AS OF syntax considers the tag first. For example, if you have a tag named '1' based on snapshot 2, then SELECT * FROM t VERSION AS OF '1' actually queries snapshot 2, not snapshot 1.

Batch incremental query

Reads the incremental changes between the start snapshot (exclusive) and the end snapshot.

For example:

  1. '5,10' means the changes between snapshot 5 and snapshot 10.
  2. 'TAG1,TAG3' means the changes between TAG1 and TAG3.

By default, for tables that produce changelog files, the changelog files are scanned; otherwise, newly changed files are scanned. You can also force a particular behavior with 'incremental-between-scan-mode'.

Requires Spark 3.2+.

Paimon supports incremental queries in Spark SQL, implemented as a Spark table valued function.

You can use paimon_incremental_query in a query to extract incremental data:

-- read the incremental data between snapshot id 12 and snapshot id 20.
SELECT * FROM paimon_incremental_query('tableName', 12, 20);

In batch SQL, DELETE records are not allowed to be returned, so records with the -D row kind are dropped. If you want to see DELETE records, you can query the audit_log table.
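
For example, the audit_log system table can be read directly from Spark SQL (a sketch, assuming the table name my_table_spark_1; note the backticks around the system table name):

-- The audit_log system table exposes a rowkind column (+I / -U / +U / -D) alongside the data columns.
SELECT * FROM `my_table_spark_1$audit_log`;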

Streaming read

Pure Spark SQL does not currently support streaming reads; for details, see: Streaming Query

Query optimization

It is highly recommended to specify partition and primary key filters in queries; they speed up data skipping.

The following filter functions can accelerate data skipping (a combined example follows the list):

  • =
  • <
  • <=
  • >
  • >=
  • IN (...)
  • LIKE 'abc%'
  • IS NULL
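
For example, on the partitioned table created earlier (a sketch; the literal values are assumptions):

-- Partition filter plus primary key filter: both can be used for data skipping.
SELECT * FROM my_table_spark_2
  WHERE dt = '20240921' AND hh = '12' AND user_id = 1;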

Paimon sorts data by primary key, which speeds up point and range queries. When using a composite primary key, the query filter should ideally form a leftmost prefix of the primary key for the best acceleration.

Suppose a table has the following definition:

CREATE TABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,
    .....,
) TBLPROPERTIES (
    'primary-key' = 'catalog_id,order_id'
);

Queries are accelerated well when they specify range filters over a leftmost prefix of the composite primary key.

SELECT * FROM orders WHERE catalog_id=1025;

SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;

SELECT * FROM orders
  WHERE catalog_id=1025
  AND order_id>2035 AND order_id<6000;

The following filters, however, do not accelerate the query well.

SELECT * FROM orders WHERE order_id=29495;

SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

Altering table schema / properties

Setting table properties

ALTER TABLE my_table SET TBLPROPERTIES (
    'write-buffer-size' = '256 MB'
);

Removing table properties

ALTER TABLE my_table UNSET TBLPROPERTIES ('write-buffer-size');
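
You can verify the result with SHOW TBLPROPERTIES (a quick check):

-- Confirm that write-buffer-size is now present or absent as expected.
SHOW TBLPROPERTIES my_table;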

Changing/adding a table comment

ALTER TABLE my_table SET TBLPROPERTIES (
    'comment' = 'table comment'
);

Renaming a table

ALTER TABLE my_table RENAME TO my_table_new;
--- or
ALTER TABLE [catalog.[database.]]test1 RENAME TO [database.]test2;

Adding columns

ALTER TABLE my_table ADD COLUMNS (
    c1 INT,
    c2 STRING
);

Renaming a column

ALTER TABLE my_table RENAME COLUMN c0 TO c1;

Dropping columns

ALTER TABLE my_table DROP COLUMNS (c1, c2);

Dropping partitions

ALTER TABLE my_table DROP PARTITION (`id` = 1, `name` = 'paimon');

Changing a column comment

ALTER TABLE my_table ALTER COLUMN buy_count COMMENT 'buy count';

Adding a column at a specific position

ALTER TABLE my_table ADD COLUMN c INT FIRST;

ALTER TABLE my_table ADD COLUMN c INT AFTER b;

Changing column position

ALTER TABLE my_table ALTER COLUMN col_a FIRST;

ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;

Changing a column type

ALTER TABLE my_table ALTER COLUMN col_a TYPE DOUBLE;

Flink/Hive/Spark compatibility with Paimon

In a previous article, we created a Paimon table with Flink and wrote data into it. Below are the results of testing compatibility between Flink and Spark.

  1. Table created by Flink, written to by Spark: supported
  2. Table created by Flink with one batch of data written, then Spark continues writing: supported
  3. Table created by Spark, written to by Flink: supported
  4. Table created by Spark with one batch of data written, then Flink continues writing: supported
  5. Table created and written by Flink, queried from Hive: supported
  6. Table created and written by Spark, queried from Hive: supported

This is an original article by xiaozhch5, blogger at 从大数据到人工智能 (From Big Data to AI), licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.

Original link: https://lrting.top/backend/14414/
