问题描述
在hudi 0.12.0版本,flink和spark都可以基于hive metastore进行元数据管理,更多信息可参考:hudi HMS Catalog指南。也就是说基于hudi hms catalog,flink建表之后,flink或者spark都可以写,或者spark建表之后,spark或者flink都可以写。但是目前 hudi 0.12.0版本中存在一个问题,当使用flink hms catalog建hudi表之后,spark sql结合spark hms catalog将hive数据进行批量导入时存在无法导入的情况,具体复现方式与版本如下:
hudi 0.12.0
flink 1.13.6
spark 3.3.0
hive (HDP 3.1.4版本)
flink sql建表
create catalog hudi with(
'type' = 'hudi',
'mode' = 'hms',
'hive.conf.dir'='/etc/hive/conf'
);
--- 创建数据库供hudi使用
create database hudi.hudidb;
CREATE TABLE IF NOT EXISTS hudi.hudidb.store_returns_hudi_4(
sr_returned_date_sk BIGINT NOT NULL,
sr_return_time_sk BIGINT,
sr_item_sk BIGINT,
sr_customer_sk BIGINT,
sr_cdemo_sk BIGINT,
sr_hdemo_sk BIGINT,
sr_addr_sk BIGINT,
sr_store_sk BIGINT,
sr_reason_sk BIGINT,
sr_ticket_number BIGINT,
sr_return_quantity INT,
sr_return_amt DOUBLE,
sr_return_tax DOUBLE,
sr_return_amt_inc_tax DOUBLE,
sr_fee DOUBLE,
sr_return_ship_cost DOUBLE,
sr_refunded_cash DOUBLE,
sr_reversed_charge DOUBLE,
sr_store_credit DOUBLE,
sr_net_loss DOUBLE,
insert_time STRING,
PRIMARY KEY(sr_returned_date_sk) NOT ENFORCED
)
with (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'hive_sync.conf.dir' = '/etc/hive/conf',
'write.precombine.field' = 'sr_ticket_number',
'hoodie.datasource.write.hive_style_partitioning'='false',
'index.bootstrap.enabled' = 'true'
);
hive中建表以及导入数据:
CREATE TABLE store_returns (
sr_returned_date_sk BIGINT,
sr_return_time_sk BIGINT,
sr_item_sk BIGINT,
sr_customer_sk BIGINT,
sr_cdemo_sk BIGINT,
sr_hdemo_sk BIGINT,
sr_addr_sk BIGINT,
sr_store_sk BIGINT,
sr_reason_sk BIGINT,
sr_ticket_number BIGINT,
sr_return_quantity INT,
sr_return_amt DOUBLE,
sr_return_tax DOUBLE,
sr_return_amt_inc_tax DOUBLE,
sr_fee DOUBLE,
sr_return_ship_cost DOUBLE,
sr_refunded_cash DOUBLE,
sr_reversed_charge DOUBLE,
sr_store_credit DOUBLE,
sr_net_loss DOUBLE)
TBLPROPERTIES (
'bucketing_version' = '2',
'transient_lastDdlTime' = '1658370656');
insert into store_returns select 1,29496,26309,41563,325271,1894,196917,86,6,240001,43,79.55,4.77,84.32,63.0,212.42,62.84,0.83,15.88,280.19;
使用spark sql导入数据
insert into hudidb.store_returns_hudi_4(
sr_returned_date_sk,
sr_return_time_sk,
sr_item_sk,
sr_customer_sk,
sr_cdemo_sk,
sr_hdemo_sk,
sr_addr_sk,
sr_store_sk,
sr_reason_sk,
sr_ticket_number,
sr_return_quantity,
sr_return_amt,
sr_return_tax,
sr_return_amt_inc_tax,
sr_fee,
sr_return_ship_cost,
sr_refunded_cash,
sr_reversed_charge,
sr_store_credit,
sr_net_loss,
insert_time
)
select sr_returned_date_sk,
sr_return_time_sk,
sr_item_sk,
sr_customer_sk,
sr_cdemo_sk,
sr_hdemo_sk,
sr_addr_sk,
sr_store_sk,
sr_reason_sk,
sr_ticket_number,
sr_return_quantity,
sr_return_amt,
sr_return_tax,
sr_return_amt_inc_tax,
sr_fee,
sr_return_ship_cost,
sr_refunded_cash,
sr_reversed_charge,
sr_store_credit,
sr_net_loss,cast(current_timestamp() as string);
具体报错如下:
Error: org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table 'hudidb.store_returns_hudi_4':
- Cannot write nullable values to non-null column 'sr_returned_date_sk'
at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:43)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:325)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:230)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:230)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:225)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:239)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table 'hudidb.store_returns_hudi_4':
- Cannot write nullable values to non-null column 'sr_returned_date_sk'
at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotWriteIncompatibleDataToTableError(QueryCompilationErrors.scala:1535)
at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.resolveOutputColumns(TableOutputResolver.scala:59)
at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.resolveOutputColumns(HoodieSpark3CatalystPlanUtils.scala:38)
at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.coerceQueryOutputColumns(InsertIntoHoodieTableCommand.scala:159)
at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.alignQueryOutput(InsertIntoHoodieTableCommand.scala:142)
at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:99)
at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:60)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:291)
... 16 more (state=,code=0)
问题分析
通过分析代码以及查看表属性,发现flink建表对应的hive metastore中spark.sql.sources.schema.part.0配置对应的value中字段sr_returned_date_sk的nullable属性为false,而如果通过spark建上述表的话,该字段属性是true的。
可判断flink在创建hive metastore中创建hudi表时,构建的给spark用的参数存在问题,也就是对应
HoodieHiveCatalog.instantiateHiveTable中的
serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys));
其中translateFlinkTableProperties2Spark方法如下
public static Map<String, String> translateFlinkTableProperties2Spark(
CatalogTable catalogTable,
Configuration hadoopConf,
Map<String, String> properties,
List<String> partitionKeys) {
Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
partitionKeys,
sparkVersion,
4000,
messageType);
properties.putAll(sparkTableProperties);
return properties.entrySet().stream()
.filter(e -> KEY_MAPPING.containsKey(e.getKey()) && !catalogTable.getOptions().containsKey(KEY_MAPPING.get(e.getKey())))
.collect(Collectors.toMap(e -> KEY_MAPPING.get(e.getKey()),
e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
}
于是,我们可以考虑将spark.sql.sources.schema.part.0对应的value中字段的nullable属性改为true,即对上述方法进行如下修改即可:
public static Map<String, String> translateFlinkTableProperties2Spark(
CatalogTable catalogTable,
Configuration hadoopConf,
Map<String, String> properties,
List<String> partitionKeys) {
Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
partitionKeys,
sparkVersion,
4000,
messageType);
properties.putAll(replaceSparkTableProperties(sparkTableProperties));
return properties.entrySet().stream()
.filter(e -> KEY_MAPPING.containsKey(e.getKey()) && !catalogTable.getOptions().containsKey(KEY_MAPPING.get(e.getKey())))
.collect(Collectors.toMap(e -> KEY_MAPPING.get(e.getKey()),
e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
}
public static Map<String, String> replaceSparkTableProperties(Map<String, String> originProperties) {
for (String key : originProperties.keySet()) {
if (key.contains("spark.sql.sources.schema.part.")) {
originProperties.put(key, originProperties.get(key)
.replaceAll(" ", "").replaceAll("\"nullable\":false", "\"nullable\":true"));
}
}
return originProperties;
}
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/10360/