Apache Hudi基于hadoop兼容存储提供如下流原型
-
更新/删除记录
-
修改流
关键概念
Timeline(时间轴)
在其核心,Hudi维护了在不同时刻对表执行的所有操作的时间轴,这有助于提供表的瞬时视图,与此同时也有效地支持按到达顺序的数据检索。Hudi的瞬时特性包括以下部分:
-
瞬时动作:基于表的操作类型
-
瞬时时间:通常是一个时间戳(201901117010349),随着动作的执行时间单调递增
-
状态:顺势状态
Hudi保证基于时间轴的动作是原子性的以及基于瞬时时间的时间轴一致性。
主要动作包括:
-
COMMITS – 原子写操作一批数据到表中
-
CLEANS – 清除后台处于活动状态并且是不被需要的的表的旧版本文件
-
DELTA_COMMIT – 原子写一批书到MergeOnRead类型的表中,也就是收这些数据仅仅写入到delta logs中。
-
COMPACTION – 在后台合并Hudi不同的数据结构,比如说把基于行的日志文件更新到列格式的文件中。在Hudi内部,压缩显示为时间轴上的一个特殊提交
-
ROLLBACK – 表示commit/delta提交不成功并回滚,删除在写过程中产生的任何部分文件
-
SAVEPOINT – 将某些文件组标记为“已保存”,这样cleaner就不会删除它们。在灾难/数据恢复场景下,它有助于将表恢复到时间轴上的某个点。
状态包括:
-
REQUESTED – 表示有动作被列入执行计划中,但是并未被执行
-
INFLIGHT – 表示当前动作正在被执行
-
COMPLETED – 表示在timeline上动作执行完成
上面的例子显示了在10:00到10:20之间在Hudi表上发生的upserts,大约每5分钟,在Hudi时间轴上留下提交元数据,以及其他后台清理/压缩。 需要做的一个关键观察是,提交时间指示数据的到达时间(10:20AM),而实际数据组织反映实际时间或事件时间,数据的目的是(从07:00开始的每小时桶)。 在权衡延迟和数据完整性时,这是两个关键概念。
当有延迟到达的数据(原定为9:00到达的数据在10:20到达,晚了1个多小时)时,我们可以看到upsert将新数据更新插入到更旧的时间桶/文件夹中。 在时间轴的帮助下,尝试获取从10:00小时以来成功提交的所有新数据的增量查询,能够非常有效地只使用更改的文件,而不必扫描所有时间桶> 07:00的数据。
File management
Hudi将表组织到DFS的根路径下的目录结构中。 表被分成多个分区,分区是包含该分区数据文件的文件夹,非常类似于Hive表。 每个分区由它的partitionpath惟一标识,partitionpath是相对于根路径的。
在每个分区中,文件被组织成文件组,由文件id唯一标识。 每个文件组包含几个文件片,其中每个片包含在某个提交/压缩即时时间生成的基本文件(.parquet),以及一组日志文件(.log.*),这些日志文件包含自基本文件生成以来对基本文件的插入/更新。 Hudi采用MVCC设计,其中压缩操作合并日志和基本文件生成新的文件片,清理操作清除未使用/旧的文件片,回收DFS上的空间。
Index
Hudi提供了高效的upserts,通过索引机制将给定的hoodie键(记录键+分区路径)一致地映射到一个文件id。 记录键和文件组/文件id之间的映射,在记录的第一个版本被写入文件后不会改变。 简言之,映射文件组包含一组记录的所有版本。
Table Types & Queries
Hudi表类型定义了如何在DFS上索引和布局数据,以及如何在这样的组织上实现上述基本单元和时间轴活动(即数据是如何写入的)。 反过来,查询类型定义如何将底层数据暴露给查询(即如何读取数据)。
Table Type | Supported Query types |
---|---|
Copy On Write | Snapshot Queries + Incremental Queries |
Merge On Read | Snapshot Queries + Incremental Queries + Read Optimized Queries |
Table Types
Hudi支持以下表类型:
-
Copy on write: 仅使用列式文件格式(如parquet)存储数据。 通过在写入期间执行同步合并,简单地更新版本和重写文件。
-
Merge on read:使用基于列(如parquet)+基于行(如avro)的文件格式的组合存储数据。 更新被记录到增量文件中(基于行),然后被压缩以同步或异步地生成新版本的列式文件。
下表总结了这两种表类型之间的权衡:
Query types
Hudi支持如下查询类型:
-
快照查询:查询查看给定提交或压缩操作时表的最新快照。 对于读表上的merge,它通过动态合并最新文件片的基文件和增量文件来获取接近实时的数据(几分钟)。 对于写表上的复制,它提供了现有parquet表的临时替代,同时提供了插入/删除和其他写侧功能。
-
增量查询:根据给定的提交/压缩,查询只要查询写入表的新数据。 这有效地提供了更改流来支持增量数据管道。
-
读优化查询:查询给定提交/压缩操作时的表的最新快照。 仅公开最新文件片中的基/列文件,并保证与非hudi列表相比具有相同的列查询性能。
下表展示了不同查询之间的权衡:
Copy On Write Table
Copy-On-Write表中的文件片只包含基/列式文件,并且每次提交都会生成新版本的基文件。换句话说,我们隐式地压缩了每个提交,这样只存在列数据。因此,写放大(输入数据的1个字节所写的字节数)要高得多,而读放大为零。这是分析工作负载非常需要的属性,分析工作负载主要是读负载。
下面演示了从概念上讲,当数据写入copy-on-write表并在其上运行两个查询时,这是如何工作的。
当写入数据时,对现有文件组的更新将为该文件组生成一个带有提交瞬时时间的新片,同时插入分配一个新文件组并为该文件组写入它的第一个片。这些文件片和它们的提交时间在上面用颜色编码。针对这样一个表运行的SQL查询(例如:select count(*)计算该分区中的总记录),首先检查最近提交的时间轴,然后过滤每个文件组中除最近的文件片以外的所有文件片。如您所见,旧查询没有看到当前用粉红色编码的正在提交的文件,但在提交后开始的新查询将获得新数据。因此,查询不受任何写失败/部分写的影响,只在已提交的数据上运行。
写表复制的目的,是从根本上改进目前表的管理方式
-
支持在文件级原子更新数据,而不是重写整个表/分区
-
量地消费更改,而不是浪费的扫描或启发式搜索
-
制文件大小以保持优异的查询性能(小文件会极大地影响查询性能)。
Merge On Read Table
读表合并是写表复制的超集,在某种意义上,它仍然支持对表进行读优化的查询,方法是只查询最新文件片中的基/列文件。此外,它将每个文件组传入的upserts存储到基于行的增量日志中,以便在查询期间动态地将增量日志应用到每个文件id的最新版本中,从而支持快照查询。因此,这种表类型试图智能地平衡读和写放大,以提供接近实时的数据。这里最重要的变化是压缩器,它现在仔细选择需要将哪些增量日志文件压缩到它们的列式基文件中,以保持查询性能(较大的增量日志文件在查询端合并数据时会导致更长的合并时间)
下面演示了该表的工作原理,并展示了两种查询类型——快照查询和读取优化查询。
在这个示例中发生了许多有趣的事情,这些事情揭示了该方法的微妙之处。
-
在大约每1分钟提交一次,这在其他表类型中是做不到的。
-
文件id组中,现在有一个增量日志文件,它在基础列文件中记录更新。在这个示例中,增量日志文件保存了从10:05到10:10的所有数据。与之前一样,基本列文件仍然使用提交进行版本控制。因此,如果只看基本文件,那么表布局看起来就像写表的副本。
-
压缩过程将从增量日志中协调这些更改,并生成一个新版本的基本文件,就像在示例中10:05发生的事情一样。
-
同的底层表有两种方法:读优化查询和快照查询,这取决于我们选择的是查询性能还是数据的新鲜度。
-
优化查询,何时提交的数据可用的语义会以一种微妙的方式改变。注意,这种在10:10运行的查询不会看到上面10:05之后的数据,而快照查询总是看到最新的数据。
-
触发压缩时,决定压缩的是什么,这是解决这些难题的关键。通过实现压缩策略,将最新的分区与旧的分区进行比较,我们可以确保读优化查询以一致的方式查看X分钟内发布的数据。
对读表进行合并的目的是直接在DFS上进行接近实时的处理,而不是将数据复制到可能无法处理数据量的专门系统。这个表还有一些次要的好处,比如通过避免数据的同步合并减少了写的放大,也就是说,在批处理中每1字节的数据写入的数据量。
Writing(写操作)
在hudi中的写操作分为三种,分别是upsert、insert以及bulk-insert。
-
upsert:是默认的写操作,通过查找索引,输入记录首先被标记为插入或者更新,并最终在运行启发式操作后写入记录,以确定如何最好地将他们打包到存储上,以优化诸如文件大小之类的事情。这个操作推荐用于数据库更改捕获这样的用例,因为输入几乎肯定包含更新。
-
insert:这个操作在启发式/文件大小方面与upsert非常相似,但完全跳过了索引查找步骤。 因此,对于日志重复删除之类的用例,它可能比upserts快得多(结合下面提到的过滤重复项的选项)。 这也适用于数据集可以容忍重复,但只需要Hudi的事务性写/增量拉取/存储管理功能的用例。
-
bulk insert:upsert和insert操作都将输入记录保存在内存中,以加快存储启发式计算的速度(以及其他一些事情),因此对于最初加载/引导一个Hudi数据集可能会很麻烦。 bulk insert提供了与insert相同的语义,同时实现了基于排序的数据写入算法,该算法可以很好地扩展到几百tb的初始负载。 然而,与像insert /upserts那样保证文件大小相比,这只是在调整文件大小方面做了最大的努力。
Compaction(压缩)
压缩(compaction)是hudi本身的一个操作,用于合并日志文件片,生成一个新的压缩文件。压缩只适用于MOR类型的表,且什么样的文件片被压缩是在写操作之后由压缩算法决定的(默认是选择具有最大未压缩日志文件大小的文件片)
从高层次来说,有两种类型的压缩方法,一种是同步的,另一种则是异步的。
-
Synchronous compaction:在这里,压缩是由写入进程本身在每次写入之后同步执行的,也就是说,只有压缩完成,下一个写入操作才能开始。 就操作而言,这是最简单的,因为不需要调度单独的压缩过程,但数据新鲜度保证较低。 然而,这种风格在某些情况下仍然非常有用,比如可以在每次写操作时压缩最新的表分区,同时延迟对晚到/老的分区的压缩。
-
Asynchronous compaction:在这种风格中,压缩过程可以通过写操作并发和异步运行。 这样做的明显好处是压缩不会阻塞下一批写操作,从而产生接近实时的数据新鲜度。 像Hudi DeltaStreamer这样的工具支持一种方便的连续模式,在这种模式下,压缩和写入操作以这种方式在单个spark运行时集群中进行。
Cleaning(清理)
清理(cleaning)是hudi本身的一个操作,用于删除旧的文件片,以及限制表空间的增长,清理操作在每次写操作之后自动被执行。同时利用缓存在timelineserver上的timeline metadata来防止扫描整个表。
清理操作支持如下两种方式:
-
Clean by commits/deltacommits:这是增量查询中最常见和必须使用的模式。 在这种风格中,cleaner保留了在最近N次commits/delta commits中写入的所有文件片,从而有效地提供了跨这些操作增量查询任何范围的能力。 虽然这对于增量查询很有用,但在一些高写工作负载上可能需要更大的存储空间,因为它为配置范围保留了所有版本的文件片。
-
Clean by file-slices retained:这是一种更简单的清理风格,我们只保留每个文件组中的最后N个文件片。 像Apache hive这样的查询引擎处理非常大的查询,可能需要几个小时才能完成,在这种情况下,需要将N设置足够大,这样才能够防止需要查询的文件片被删除。
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/1957/