Hudi在兴盛优选数据湖应用中的实践

Hudi在兴盛优选数据湖应用中的实践

1.背景

1.1 业务背景


Hudi作为主流的数据湖产品之一,解决了Hive无法更新的场景,由于支持ACID事务能力,所以也就能够很好的支撑实时读写,这在架构上就为流批一体提供了条件,越来越多的被业界同行的数仓团队所采用。在我们的数仓架构中,Hudi作为实时数仓的主要存储方式,如下图所示:

Hudi在兴盛优选数据湖应用中的实践图一 数仓架构图
以ODS层的数据为例,通过同步工具将业务的数据(Binlog)写入Kafka,然后通过Spark Streaming写入Hudi,此种方式也是Hudi社区目前主推的写入方式。

但随着业务的发展,使用Spark Streaming写入Hudi这种入湖方式带来的成本问题也越来越严重,目前需要写入Hudi的业务表接近1000多张,单这一块消耗的计算成本就在9W+每月。为此,如何降低Hudi的写入资源消耗是一个极具挑战且有实践意义的问题。本文将介绍数据存储团队通过分析Hudi的源码架构,然后改造Hudi写流程,从而达到节约85%以上成本的实现方案。

1.2 技术背景


Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了方便数据更新和删除的能力以及读取历史快照数据的能力,这些是Hive所不具备的特性。目前社区提供的数据导入方式主要是基于Spark和Flink等,在实际写入中,不管多少数据,一个数据表至少需要1个Core,1G内存(简称1C1G),如果算上Spark的Dirver或者flink ResouceManger等就需要2C2G。

另外,以Hudi(参见图二)为例,实际写入过程中链路很长,对于MOR(原理参考第二章)场景,每次写入数据必须和现有数据进行Tag(判断数据是否已经存在),严重影响了写入的性能。社区提供了主要两种索引,一种是基于bloomFilter,一种是基于Hbase。BloomeFilter因为存在假阳性,所以当判断为“存在”时,需要分区内历史存量数据进行二次确认比较。而HBase索引方案需要存储历史所有的Index,所以成本也比较高。

Hudi在兴盛优选数据湖应用中的实践图二 SparkStream写Hudi流程
综上,目前的数据湖写入主要有两个问题:1、采用主流的Spark和Flink等大数据工具写入小量数据到数据湖时有存在资源的浪费,哪怕每天只有几条数据写入,至少需要2C2G来完成该功能。2、对于MOR表,每次写入数据都必须tag已有磁盘数据,严重影响的写入性能。

1.3 业务特征


特征1

Hudi以解决数据的更新为主,按照业务情况更新情况如表1所示,在实际场景中99.9%都是前5种场景,要么是数据本身比较小,比如维度数据,要么是周期性数据,周期数据一般采用时间分区,或基于每小时,或基于每天构建一个分区,只有最近少量周期的数据进行更新,历史数据的更新情况比较低,前五种情况的共同结论是只需要对少量的数据进行索引即可。从我们的业务来看,目前只存在前面5种情况,第6种情况不存在。

Hudi在兴盛优选数据湖应用中的实践
表一 数据更新分布和索引策略
特征2

某些数据表的数据更新操作符合齐普夫分布,如下图所示,部分长尾的分区数据少量更新却要消耗大量的资源,不是特别合理。

Hudi在兴盛优选数据湖应用中的实践图三 长尾分布类图

1.4 总结


基于以上业务和技术背景,我们设计了一种数据湖优化写的方法,以达到节约数据湖写入资源的目的。

2.Hudi数据写入原理

Hive只能基于Partition做更新和删除,无法基于主键做实时的更新,那Hudi是怎么解决该问题的呢?

本章将详细介绍Hudi的2种数据写入方式:COW(Copy on Write)和MOR(Merge on Read),通过这两种写入方式的介绍进行Hudi的数据更新流程的剖析。

2.1 COW(Copy on Write)写入原理


COW每次写入数据涉及更新时,都会将历史数据进行Copy,然后和新数据合并,写入一份新的数据,图四是使用COW模式单分区写入的示意图。在COW模式的分区中,每次新数据进来,都会与历史数据进行合并,data version0的数据,会写到version1,然后写到version2,最后到versionN+1,每次写数据在分区内都和历史数据进行合并,写出新的数据,从而达到更新。

数据湖的业务一般采用微批的方式进行数据的写入,不会像OLTP的业务一样单条数据提交。尽管做了攒批的处理,每次提交的时候仍然需要读取历史数据,进行合并,写入新的文件,存在明显的写放大情况,这样会导致写入时的性能受到影响,特别是长尾数据的情形,写放大更为明显,因此这种方式一般用于写少读多的场景。

Hudi在兴盛优选数据湖应用中的实践图四 COW格式表的数据写入原理

2.2 MOR(Merge on Read)写入原理


相对应COW格式表造成的写放大问题,还存在一种MOR格式表,MOR对于每次新来的数据,都会和历史数据进行比较(Tag过程),拆分为insert(历史不存在相同key数据时)和update(历史存在相同key数据时)两种数据,然后分别写入base文件和log文件。当log文件达到一定量时,也会将base和log文件进行合并,减少文件个数,以加速查询,具体流程如图五。每次新来的数据,在分区内都会和base文件先比较,根据比较结果分为update数据和insert数据,insert数据append到base file里去,update会写到log文件中,这种方式减少了base文件的写放大,因此相比COW格式表来说更加适合写多读少场景。

Hudi在兴盛优选数据湖应用中的实践图五 MOR格式表的数据写入原理
在我们实际的业务中都是采用的是MOR的方式,写多读少的方式。MOR能解决多次合并的带来的读写放大问题,但是每次都需要和分区内的历史数据比较(Tag过程),用时且消耗资源过多。

2.3 总结


从以上的两个流程分析来看,不管是COW还是MOR格式的表,都存在资源消耗过大的问题,需要进一步的优化方案去解决资源消耗的问题。

3.Hudi写入架构优化

3.1 整体架构


整体架构采用Master-Worker架构,Master负责接收新任务,Worker负责执行拉取数据完成整个数据的接收和写入Hudi表中。

Hudi在兴盛优选数据湖应用中的实践图六 Hudi服务化写架构
Master负责任务分发,故障处理以及负责均衡,worker负责具体的业务处理。

采用一个Worker服务器管理一个或多个表,一个表有一个或多个分区,一个分区管理一个Index,所述方法包括以下步骤:

  • 数据通过客户端写入,或直接设置kafka consumer读取数据进行写入。
  • 数据写入时会进行分区所属的确定,同时对于该条数据会查询对应分区的Index进行判断该数据是属于更新还是新增,更新和新增两个不同的数据分别写入不同的内存区域,同时更新Index数据并将数据写入WAL文件。
  • 当数据达到一定规模或者时间达到一定阈值时,将内存的数据flush进入数据湖的文件系统中,新增数据写入insert文件中(同base文件),更新数据写入update文件(同log文件)。
  • Index采用BloomFilter+RocksDB构建,按分区构建Index,Index只保留一段周期,一般为一周或一个月,定时删除历史分区的Index。
  • 通过Java线程替换了Spark/Flink进程,对于小数据量的表节约了大量的资源。通过Index的优化,加速了tag过程。

3.2 Index构建处理


通过索引的分区裁剪和索引结构优化,在成本和效率上都有极大提高,但是对于分区Index的构建和释放是否可以进一步优化,本节进行进一步的探讨,如图七所示的三种场景:

Hudi在兴盛优选数据湖应用中的实践图七 MOR的三种数据场景
场景1,已经存在base文件,新来数据k=2,v=5(key为主键,value为其他值),因为base文件中存在k=2,写入log文件。

场景2,已经存在base文件,新来数据k=2,v=5,但是此时base文件里没有k=2,将新来数据追加到base文件。

场景3,存在base文件,新来数据k=2,v=5,虽然此时base文件中不存在k=2,但与现有技术不同的是,本设计依旧将新来数据强制写入log文件。

以上三种场景,查询的结果是一样的,以场景1和场景3对比,不管base文件中是否存在数据,都写入log文件,依旧能正常查询到数据。

从上可以看出,如果对于新写进来的数据当前的读取采用了类似full join+group by的方式进行,既然采用了full join的方式,那么就涉及两张表的join的效率问题,因为log文件当中可能包含重复key,所以数据量可能会比较大,当分区进入某一阶段的时候,即新增的数据不是特别多的时候,就可以直接将数据写入log文件,从而不影响读的性能。从前文我们可以知道,当前分区Index主要用于写入数据是更新还是新增的判断,我们在一定阶段时候,可以将全部的数据写入log文件,那么此分区的索引就可以删除,便于节省索引的存储成本。

在讨论写入log文件之前,我们先来看下hudi分区的文件组成,如下图所示:

Hudi在兴盛优选数据湖应用中的实践
图八 Hudi分区当中的文件组成
一个分区由多个basefile和logfile组成,basefile存储的是经过合并过之后的文件,里面的数据不存在重复的主键,logfile当中存储的是更新的数据,里面可能存在重复的数据,同时根据key的分布,basefile和logfile是一一对应的,这意味着logfile1当中的数据必须出现在basefile1当中。但是数据落在哪个baseifle当中是根据key去索引当中排查的,当分区索引被删除的时候,就需要一种规则去规范key和basefile的关系,这样新写入的数据不又查索引就可以根据规则写入logfile了。关于key和basefile的映射关系可以基于hash或范围分组。图九以基于hash为例:

Hudi在兴盛优选数据湖应用中的实践图九 基于hashrepartition过程
图九采用的是hash的规则进行重新分组,重新分组后,每条数据能确定唯一的所属文件,比如新写入数据hash(key)=123,重新分区后的文件个数为10,那么数据将写入第四(123%10+1=4)个文件。因此通过repartition方案,每条数据都能找到对应base文件,base文件repartition时会构建Bloom Index用来加速查询。repartition后分区内将不再产生新的base文件,所有的数据都写入对应的log文件,读时采用full join方式。系统也会定时将base文件和log文件合并,但这种合并不会改变分组规则。随着单个base文件大小增大,可以再次进行repartition。

综上,社区的MOR架构中,可以增加日增量的统计,并进行数据的拟合,然后通过公式求出分割点,也可更具数据条数,或者设定阈值进行求出分割点,分割点前的数据正常处理,将new data分为update和insert分别写入log文件和base文件,分割点后的数据不进行base扫描,直接写入log文件,具体如图十所示。

Hudi在兴盛优选数据湖应用中的实践图十 repartition流程
对于完全Index方式,在分割点后,可以将Index移除,不再需要通过Index来判断是update还是insert,全部写入log文件中。对于非完全Index,同样减少了tag过程,将数据全部写入log文件。

对于COW场景,在repartition后,每批数据也能快速找到对于的base文件,然后通过Bloom Index比较,如果完全都是insert数据,可直接append,如果有upsert数据,直接copy和merge。

当数据量小于一定值时,无需进行拟合,可直接进入优化方法处理流程;如果业务可预知性切割点,也可不用拟合。

3.3 优化效果评估


通过设计出分布式的写Hudi架构,替换了原始的Spark Streaming方案,将资源使用最大化,通过自研的完善tag流程和repartition流程,加速了Tag过程,减少了Index占用,以及加速查询过程。

提供的基于服务化的数据湖产品,能够节约90%的资源内核和cpu。主要节约点在,小数据量任务不需要像Spark Streaming或者Flink一样长期占用资源,通过资源共享来节约资源。大任务加快了tag过程,同时通过缓存元数据,能加速写入过程。通过提高效率来节约资源。实际测试结果从28KB/s/1c3g提升到 10MB/s/1c1g。提升效果300倍以上,考虑到Index对资源消耗,综合考虑,资源情况也在10倍以上的优化效果。

repartition后Index数据可以删除,因此相比长期维护Index而言,节约了95%以上(数据一年内有更新,既Index保留1年)。

repartition过程会重新规整数据的分别,不仅有利于写入过程,而且有利于读过程,很大程度上降低了查询时间,主要原因是数据进行repartition后,能加快定位功能,减少了文件的扫描。

4.总结与展望


本次主要改造了Hudi的写入逻辑,通过提升数据写入性能,以及基于数据的特性,完成对Index的清除,实现了资源的85%以上的降本,以目前的运行情况,预估每年实现降本百万元以上,随着导入任务的增加,节约成本成线性的增加。

数据入湖只是数据湖的第一步,入湖之后存在计算和查询。目前的计算成本仍旧较高,后续计划在计算上发力,目前已完全将数据湖融入到调度系统中,提供大量方便计算的计算算子。其次就查询而言,基于SparkSql或者Presto的对于Hudi的查询性能还有待提升,目前我们的另外一个项目ShapleyDB引擎也在逐步完善,后续会将该引擎与Hudi或者Hive对接,加速查询效率,可以实现实时数仓,实现全方位的湖仓一体。

0 0 投票数
文章评分

本文转载自兴盛优选技术社区,原文链接:https://mp.weixin.qq.com/s/A3uEw5bjnC0c-HAo0dRSfA。

(1)
上一篇 2023-01-01 19:41
下一篇 2023-01-02 15:02

相关推荐

订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x