分享嘉宾:黄彬耕 腾讯 数据工程师
编辑整理:田长远
出品平台:DataFunTalk
导读:今天主要分享Iceberg在微视的一些使用情况。全文将围绕下面三点展开:
-
为什么会使用Iceberg
-
目前是怎么用的,解决了哪些问题,以及计划如何使用它
-
在 Iceberg 的表维护场景下的实践
01
为什么会使用Iceberg?
首先看下数仓架构。数仓的数据接入主要有两个来源,一个是客户端的上报,还有一个是业务后台 DB 的上报,这两份数据都会通过一个消息队列接入数仓。我们的数据仓库采用lambda架构,总体分为离线和实时两套体系,分别有自己的计算和存储体系。离线主要是以 Hive 作为存储载体,计算以 Spark 为主,Map Reduce为辅。实时数据处理主要用了 Flink,再辅以Kafka和OLAP。
要解决这些问题,需要分析问题产生的原因。其中一个主要原因是一些传统存储组件,无法很好地在一个组件上支撑实时和离线场景。
首先看Kafka,它的成本相对比较高。跟Hive相比,Kafka每单位的存储成本超过了Hive的10倍。其次,它的数据压缩的效果也比较差,同样的数据写入Kafka,压缩后的数据量相比于Hive,也接近10倍。所以这两个叠加之后,它的成本会比Hive高两个数量级。Kafka虽然可以很好地支持实时读写,但是它对离线场景下的一些需求又支持得不好。在批处理时,它不能像Hive那样做分区过滤和相关优化。在数据回溯的场景,第一,Kafka的存储成本比较高,不适合留存比较久的历史数据;第二,它只能基于一个偏移量去做数据回溯,无法确定这个偏移量对应的数据是什么数据。所以相比之下,Hive基于分区的回溯更能符合我们的回溯需求。
然而 Hive 虽然在离线场景下比较成熟,但是在实时场景提供的支持并不好。
首先,它的延迟比较高。目前Hive提供的延迟比较低的需求是通过使用 Hive 小时表来提供一个大概延迟在两到三个小时的数据。如果这个延时想要再缩减,可能就需要把Hive的分区做到更细的粒度。更细粒度的分区也可能会带来一些问题,比如小文件的问题,会对HDFS的NameNode造成比较大压力,同时它的读取效率也不高。另外,HMS的扩展性问题,Hive的元数据主要是使用MySQL来做存放,MySQL的扩展性不好。而如果我们的分区粒度越细,分区数据越多,那MySQL就更容易遇到扩展性问题。同时,太多的分区也会对查询的效率造成一定影响。因为Hive会首先到元数据去获取这个分区信息的目录,然后再到那个HDFS里面对这些目录做一个list文件的操作,拿到文件之后再去做数据的读取。这个过程涉及到的分区越多,查询就会越慢。
所以基于这些问题,我们希望有一个存储系统,能够很好地同时支持实时和离线的场景。在成本比较低的情况下,满足我们的实时需求。
现在的数据湖文件系统,都可以对实时和离线提供比较好的支持。我们公司内部主要在使用Iceberg,我们把Iceberg与Hive和Kafka做一个对比。
从设计上来看,Hive对离线场景的支持Iceberg都可以做到,而且在某些方面做得更好。比如在批处理中,Iceberg的谓词下推可以做到文件级别的过滤,而Hive主要是在分区级别做过滤,然后在文件内部,利用文件的格式再去进行谓词下推。还有Iceberg通过版本控制,可以做到更好的读写分离。在Hive场景下对一些历史分区做数据压缩,可能会影响线上的读取任务,而Iceberg 不存在这种情况。并且,更为重要的一点是Iceberg提供了更低延时的读写特性。所以从离线存储的角度来讲,用Iceberg替换Hive可以得到很好的收益。
从实时的角度来看,Iceberg与Kafka对比,一个显著优势是Iceberg的成本可以低很多。因为Iceberg跟 Hive 使用的底层存储比较类似,是基于HDFS的,成本可以做到比Kafka低两个数量级;相比于Kafka,Iceberg在实时性上会差一点。因为Kafka可以做到流式的读写,而Iceberg只能做到分钟级别的延迟。但在我们的数据场景中,强实时的场景比较少,因此Iceberg可以很好地支持我们的实时场景,可以在一些新的增量模型中承载流批一体的存储。
02
我们如何使用Iceberg?
我们目前的架构,主要是用Iceberg来替换掉之前Kafka以及OLAP组件来承接一部分实时数据需求,以此降低实数据需求的实现成本。
出于初期实验的目的,我们仍然使用lambda架构,实时和离线的数据并存,虽然Iceberg有能力替换Hive提供的功能,但是由于我们之前已经在 Hive 上实现了存量数仓,整体的迁移成本和风险都比较高,可能会影响数据提供的稳定性,所以目前做整体的迁移是不太现实的。因此,我们初期的应用主要是用Iceberg来提供一个实时数据场景,后续在一些新的模型上再使用Iceberg去完成流批一体的统一存储。
下面介绍一个已经实现的实时需求方案。这个需求是给我们的运营系统提供一些实时的累积数据。我们通过 Hive 先提供一个 T+1 天级别的累积数据,然后再把当天的增量数据通过iceberg来落地。然后在下游配置一个定时调度的推送模块,去做数据的合并,得到最终的实时累计数据,推送到Kafka里面。交付给下游系统。
我们用到了Iceberg的增量读取接口,不需要推送大量的全量数据,只需要推送实时的增量数据即可,同时也不会出现少推数据的情况。这也涉及到了实时维表的需求,因为实时累计数据需要通过一个最终累计的状态表来做。我们一开始尝试了用 Iceberg的upsert功能。但是由于Iceberg只支持copy-on-write的格式,而我们的维表每次更新的数据占全量数据的占比很低,可能只有万分之一。如果用 copy-on-write 模式,每次更新数据就要做一个全量的写入,这样对资源的消耗比较大。所以我们最后还是使用了lambda架构的模式,通过Hive的累计数据和Iceberg的增量数据,在推送时再去做merge。后续也希望可以尝试用Iceberg提供的 merge-on-read 模式去生成实时累计数据来简化流程。
对于这个模式的需求落地,除了从实验角度考虑之外,也需要考虑到表的复用性。所以Iceberg 的模型会参照之前的离线数仓去进行建模的规范,同时还要考虑到数据的复用性。在做早期需求的同时,也为后期更多需求落地打好一个基础。用这种Iceberg的方式实现,相比于之前实时链路的 Kafka 加 OLAP 的方案,成本降低超过99%。
我们并不只是希望 Iceberg 去提供实时的数据,还希望在新的场景中,Iceberg可以承载流批一体的存储。为了实现这个目标,还需要研究Iceberg是否具有在离线的场景下的一些功能,其中一个是数据回溯功能,像表增加字段或者修改计算口径等操作,都需要去数据回溯。另外,如果上游有数据修复,也会需要去回溯重跑一段历史分区。
在离线场景下,数据回溯实现比较简单。
以填表为例,每个任务的实例必须处理一个某一天分区的数据。如果我们想要回溯3月的1-3 号 3 天的数据,只要在调度平台上把这三个实例任务重跑就可以实现了。但是在流批一体存储的场景下,表可能是使用Flink生成的,Flink的回溯可能会稍有不同,因为它是一个线上一直在运行的任务,无法通过直接重跑的方式去做回溯。可能需要通过复制Flink应用的方式来复用它的数据生产代码,再通过修改参数的方式让Flink的source进入一个回溯的批读模式,最后再通过传参的方式指定回溯的数据时间范围。当然,现在社区在新版本Flink中,source也新增了可以实现这个功能的接口。但是它还存在一些缺陷,比如Flink的source是没有状态的,也就是在回溯任务失败,重启执行后可能会产生一些重复数据。这是因为在第一次跑的时候已经有一些 check 成功了,提交了部分数据,而这时发生了故障失败重启,重启之后的任务又会重新读取source 数据,那么,第一次运行时提交的数据就变成了重复数据。在数仓里,数据回溯之后产生重复数据是不能接受的。所以还是希望它可以实现exactly once的语义,也就是数据一致性。
我们通过给source增加状态的方式来实现这个功能。在之前一个比较老版本的Flink上,我们给它的source增加了一个切块的功能。
首先Flink source由两个部分组成。
第一部分是一个 source 算子,主要负责一个单线程的文件扫描,然后把扫描的文件下发到下游多节点的FlatMap算子上。然后FlatMap主要负责把这个文件数据读取出来,再下发给下游做数据处理。我们就可以在source节点上增加一个状态。把它读取出来的文件,首先按照表的 partition做聚合,得到一个map的结构。然后map的每一个key对应一个分区的数据,接下来就可以按分区做文件的下发,先下发完一个分区,再接着下发第二个分区的数据。第二个要做的事是把checkpoint按照partition对齐,保证每一个checkpoint提交上去的都是一个完整的partition。我们实验的方法是:如果partition在没有完成下发的情况下到了checkpoint,就抛异常,失败掉,这样可以保证在一个partition下发完成之后,再等待一个checkpoint成功,来保证一次提交是一个完整的分区。
这种处理方式,在partition等待checkpoint的过程中有一些性能的开销,我们可以通过传参的方式指定一个checkpoint提交的分区数量。将等待的时间平摊到多个分区上,来提升整体的性能。加入了这个状态之后,在失败重启的场景下, source可以跳过已经消费过的分区,下发就不会产生重复的数据,这样就实现了exactly once的语义。
还需要支持的另一种场景是流转批场景,如果使用Iceberg做流批一体的存储,在上游的明细表,主要是ODS和DWD层的表可能会使用Flink生成。但这个表的二次加工可能会使用批处理去做计算。假设批处理是一个3月2号的实例,我们需要在3月3号时触发计算。那这个触发的批处理任务就需要知道上游的数据表什么时候的数据是完备的。如果简单地用一个延时调起方法的话,在一些异常的场景下会出现问题,比如上游的 Flink链路出现了问题,导致数据没有产生或者迟到了,那批处理的任务处理的就是一个不完整的数据或者是空跑。如果是人为发现了之后再去做回溯,成本会较高。
对此,我们使用的方案是通过在Flink的Sink里面,从数据中选取一个时间字段写入表的快照的方式去通知下游当前的数据进度,可以看到这里的Flink Sink也包含了两个算子。第一个是一个 writer 算子,它负责把数据写入文件,writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit中,然后commit从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入表的元数据。
在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个monitor任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。
我们在完善Iceberg在批处理场景下的功能之后,可以设计一个流批一体的架构,虽然看起来总体上还是一个lambda架构,但它有一些改进。首先,它在生成DWD的过程中,统一使用了Flink计算引擎去生成,并进行双写,一份写入Iceberg,另外一份写入Kafka。如果没有强实时的需求,很多数据都不需要再走Kafka这条链路。所以在DWD层可以做到计算引擎的统一。其次MQ的数据除了被Flink任务消费,还会同步一部分落地到ODS层,用作回溯数据的支持。
在我们用Iceberg替换掉Hive之后可以做一个准实时以及离线场景下的一个流批统一存储,很大程度上解决我们之前遇到的很多指标重复计算带来的口径不一致问题,还有一些冗余的存储开销,节约我们的准实时需求的成本。
我们会在新的一些场景下去落地这个流批一体的方案。
03
在Iceberg的表维护场景下的实践
Iceberg表在维护过程中比Hive稍微复杂一些,Hive只需要清除过期数据即可,但是Iceberg除了清除过期数据,还要做过期快照的删除、小文件的合并、元数据的合并,还包括清除一些孤儿文件。
这些功能大部分平台侧都可以完成。这里讲一下小文件合并的实践。因为Iceberg会用Flink去生成表的数据,Flink的提交的批次间隔比较小,就带来了更多的小文件,因此需要定期去做合并。合并主要是通过Spark实现的。Spark对小文件合并主要有两种策略,一种是BinPack背包策略,另外一种是加入了排序逻辑的Sort策略。背包策略主要是把小文件加入到相同大小到背包里面去做合并,最后每一个背包就是一个合并后的文件。Sort策略在合并小文件的基础上,会做一个分组排序的功能,使用我们指定的一个字段去做分组排序,使得这个字段在各个分区之间整体是有序的,同时每个分区内部也是有序的。分组排序获得的收益主要是可以减少表的大小。
我们一般在明细表上做分组排序,用Sort策略做小文件合并。相比于背包策略,合成后表的大小可以缩减 40%到70%。表大小的缩减带来的直接收益是二次读写的时间缩短和点查效率大幅提升,这主要得益于使用这个排序字段做点查时,Flink提供了一个谓词下推的文件过滤效果。
简单分析一下原理。
首先是存储收益,因为Iceberg主要是基于Parquet列存,这种存储格式会通过对数据进行编码以及压缩算法来压缩数据。如果某一列数据的局部相似度很高,那压缩算法就可以更好地发挥它的作用,生成一个更小的数据文件。微视的明细表有一个特征就是大量的字段都跟用户相关。所以我们按照用户ID做数据排序之后,大量的字段相邻的值都是相同或者相似的。最终落地出来的文件大小会比没排序之前小很多。
第二是文件过滤,这个主要得益于Iceberg在元数据的My Manifest文件里面保存了每一个列值的上下界。在查询时,可以将查询的条件和每一列的上下界做对比。如果发现这个值不可能存在这个Data范围中,那么在文件扫码阶段就可以把这个文件过滤掉。
上图是我们做的数据对比,左边一列是使用BinPack策略合并小文件的结果,右边是用Sort策略合并小文件的结果。对于同一份数据进行合并,合并之后的总文件数量大概是70多个,此时查询3个用户的数据,在使用BinPack策略合并后,需要扫描77个文件,也就是没有做任何的文件过滤。但是在Sort排续策略之后,由于已经将其他部分的文件进行了过滤,用户只需要扫描3个文件就可以。如果是查询1个用户的话,就只需要访问一个文件,这个效果是比较明显的。
04
问答环节
Q:数据在Iceberg中进行打宽或者聚合,用SQL方式多久调度一次?在涉及到 join的场景下,如果是表A的增量去join表B的全量,那么是表B的增量去校验A表的全量吗?
A:对于这两个问题,首先调度的频率是根据数据需求的实效性来决定的,如果这是一个离线的T+1的需求的话,就是天级别的调度,如果对实效性要求比较高,可以考虑用Flink去做分钟级别的聚合。基本上实效性要求越高,需要的代价就越高。对于Join的场景,是这样去做的。这里主要考虑的是实时场景下的Join,因为在离线场景中Join是没有什么问题的,因为它的实效性要求不高。如果是在实时场景下,我们需要做一些维表规划,如果数据有很强的实效性要求,应该尽量避免出现Join的情况。
Q:Iceberg数据湖底层存储跟离线数仓底层存储都是使用同一个集群吗?
A:可以是同一个集群,但不是必须的。
Q:使用Iceberg代替Hive来进行建模,对于一些实时需求,更新频率是怎么样的?每一层都需要更新是否影响性能?
A:对一些实时的需求,时效性要求比较高,那层次就不能太多,因为每一层都会引入一些数据延时。对于需求场景,实时需求可能从DWD层就直接出这个指标。如果DWD层的读取效率比较低的话,在下游的ODS层可以做一些聚合,然后再从DWS层输出数据。总体而言,实时性要求越高的场景,模型的层次就不能太多。
在文末分享、点赞、在看,给个3连击呗~
分享嘉宾:
本文为从大数据到人工智能博主「jetty」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/7908/