hudi 异步clustering

在之前的一篇文章中,我们引入了一种新的名为clustering的表服务,它可以重组数据,从而在不影响写入速度的情况下提高查询性能。 我们学习了如何设置inline clustering。 在这篇文章中,我们将讨论自那以后发生的变化,并看看如何使用HoodieClusteringJob和DeltaStreamer实用工具来设置异步clustering。

总览

在较高的层次上,集群基于可配置的策略创建计划,根据特定的标准对符合条件的文件进行分组,然后执行计划。 Hudi支持多写入器,它在多个表服务之间提供快照隔离,从而允许写入器在后台运行clustering时继续输入。 要了解clustering架构的更详细概述,请查看之前的博客文章。

Clustering 策略

如前所述,clustering计划和执行都依赖于可配置策略。 这些策略大致可分为三类:clustering计划策略、执行策略和更新策略。

计划策略

此策略在创建clustering计划时发挥作用。 它有助于决定应该对哪些文件组进行clustering。 让我们看看Hudi的不同计划策略。 注意,使用这个配置,这些策略都是插件式的。

SparkSizeBasedClusteringPlanStrategy:它根据基本文件的小文件限制选择文件片,并创建clustering组,每个组的最大文件大小为允许的最大文件大小。 最大大小可以使用这个配置来指定。 这种策略对于将中等大小的文件拼接到更大的文件中,以减少大量文件在冷分区上的传播非常有用。

SparkRecentDaysClusteringPlanStrategy:它回顾以前的“N”天分区,并创建一个计划,将这些分区中的“小”文件片clustering起来。 这是默认策略。 当工作负载是可预测的并且数据是按时间划分的时候,它会很有用。

SparkSelectedPartitionsClusteringPlanStrategy:如果您只想在一个范围内clustering特定的分区,无论这些分区是旧的还是新的,那么这个策略可能是有用的。 要使用这个策略,需要额外设置以下两个配置(开始分区和结束分区都包括在内):

hoodie.clustering.plan.strategy.cluster.begin.partition
hoodie.clustering.plan.strategy.cluster.end.partition

所有的策略都是分区感知的,后两个策略仍然受第一个策略的大小限制。

执行策略

在规划阶段构建clustering组之后,Hudi对每个组应用执行策略,主要基于排序列和大小。 可以使用此配置指定策略。

SparkSortAndSizeExecutionStrategy是默认策略。 当使用此配置进行clustering时,用户可以指定要对数据进行排序的列。 除此之外,我们还可以为clustering生成的parquet文件设置最大文件大小。 该策略使用大容量插入将数据写入新文件,在这种情况下,Hudi隐式地使用分区程序根据指定的列进行排序。 通过这种方式,该策略改变了数据布局,不仅提高了查询性能,还自动平衡了重写开销。

现在,这个策略可以作为单个spark作业执行,也可以作为多个作业执行,这取决于在规划阶段创建的clustering组的数量。 默认情况下,Hudi将提交多个spark作业并合并结果。 如果你想强制Hudi使用单个spark作业,设置执行策略类配置为SingleSparkJobExecutionStrategy。

更新策略

目前,clustering只能被调度到没有接收到任何并发更新的表/分区。 默认情况下,更新策略的配置设置为SparkRejectUpdateStrategy。 如果某个文件组在集群期间有更新,那么它将拒绝更新并抛出异常。 然而,在某些用例中,更新非常稀疏,并且不涉及大多数文件组。 简单地拒绝更新的默认策略似乎不公平。 在这些用例中,用户可以将配置设置为SparkAllowUpdateStrategy。

我们讨论了关键的有用配置。 这里列出了与cluster相关的所有其他配置。 在这个列表中,一些非常有用的配置是:

Config key Remarks Default
hoodie.clustering.async.enabled 启用clustering服务的运行,当写入发生在表上时异步运行。 False
hoodie.clustering.async.max.commits 通过指定应该触发多少提交clustering,来控制异步clustering的频率。 4
hoodie.clustering.preserve.commit.metadata 重写数据时,保留已有的_hoodie_commit_time。 这意味着用户可以在集群数据上运行增量查询,而不会产生任何副作用。 False

异步Clustering

在前面,我们已经看到了用户如何设置inline cluster。 此外,用户可以利用HoodieClusteringJob来设置两步异步clustering。

HoodieClusteringJob

随着Hudi 0.9.0版本的发布,我们可以在同一个步骤中调度和执行clustering。 我们只需要指定-mode或-m选项。 有三种模式:

  1. schedule:制定clustering计划。 这提供了一个可以在执行模式中传递的瞬间。

  2. execute:在给定的瞬间执行clustering计划,这意味着这里需要指定–instant-time。

  3. scheduleAndExecute:首先制定一个clustering计划,然后立即执行。

注意,要在原始写入器仍在运行时运行此作业,请启用多写入:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider

设置HoodieClusteringJob的spark-submit命令示例如下:

spark-submit \
--class org.apache.hudi.utilities.HoodieClusteringJob \
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
--props /path/to/config/clusteringjob.properties \
--mode scheduleAndExecute \
--base-path /path/to/hudi_table/basePath \
--table-name hudi_table_schedule_clustering \
--spark-memory 1g

clusteringjob.properties如下

hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=column1,column2

HoodieDeltaStreamer

这就引出了用户在Hudi中最喜欢的实用程序。 现在,我们可以用DeltaStreamer触发异步clustering。 只需将hoodie.cluster .async.enabled配置设置为true,并在属性文件中指定其他clustering配置,这些配置的位置可以在启动deltastreamer时作为-props传递(就像HoodieClusteringJob的情况一样)。

设置HoodieDeltaStreamer的spark-submit命令示例如下:

spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
--props /path/to/config/clustering_kafka.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--table-type COPY_ON_WRITE \
--target-base-path /path/to/hudi_table/basePath \
--target-table impressions_cow_cluster \
--op INSERT \
--hoodie-conf hoodie.clustering.async.enabled=true \
--continuous

Spark Structured Streaming

我们还可以使用Spark结构化的流接收器启用异步clustering,如下所示。

val commonOpts = Map(
   "hoodie.insert.shuffle.parallelism" -> "4",
   "hoodie.upsert.shuffle.parallelism" -> "4",
   DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
   DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
   DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
   HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)

def getAsyncClusteringOpts(isAsyncClustering: String, 
                           clusteringNumCommit: String, 
                           executionStrategy: String):Map[String, String] = {
   commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
           HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
           HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy
   )
}

def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = {
   val streamingInput = // define the source of streaming
   Future {
      println("streaming starting")
      streamingInput
              .writeStream
              .format("org.apache.hudi")
              .options(hudiOptions)
              .option("checkpointLocation", basePath + "/checkpoint")
              .mode(Append)
              .start()
              .awaitTermination(10000)
      println("streaming ends")
   }
}

def structuredStreamingWithClustering(): Unit = {
   val df = //generate data frame
   val hudiOptions = getClusteringOpts("true", "1", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
   val f1 = initStreamingWriteFuture(hudiOptions)
   Await.result(f1, Duration.Inf)
}

结论与未来工作

在这篇文章中,我们讨论了不同的clustering策略以及如何建立异步cluster。 故事还没有结束,未来的工作包括:

  1. 支持updates clustering。

  2. CLI工具支持clustering。

请跟随本JIRA了解更多关于这个问题的积极发展。

3 1 投票
文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://lrting.top/backend/2023/

(0)
上一篇 2021-11-12 19:25
下一篇 2021-11-12 19:30

相关推荐

订阅评论
提醒
guest
0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x