6月 26 号,由示说网主办,上海白玉兰开源开放研究院、云启资本、开源社联合主办的上海开源大数据技术 Meetup 如期举行。Apache Doris 社区受邀参与本次 Meetup ,来自韵达科技的 高级研发工程师张浩 以及 数据模型工程师郭文杰 为大家带来了题为“ Apache Doris 在韵达物流领域的应用实践 ”的主题分享,主要介绍了韵达科技的业务背景、平台需求与选型、基于 Apache Doris 的实时数仓设计、以及 Doris 在韵达业务场景中的实际应用,以下是分享内容。
张浩:
非常荣幸今天能在 Meetup 给大家分享一下, Apache Doris 在韵达的实际应用。这是今天演讲的几个方面,然后我给大家带来前两个小节的分享,后面两个小节由我的同事来分享一下。
韵达简介
首先来介绍一下韵达科技。下面这张图,是最近 10 年中国快递行业的总数据,在刚过去的 2020 年,全国的数据总量、快递的票件量一年达到了 830 亿件,总收入达到了 8750 亿。过去 10 年每年增长都在 30% 以上,可以看出来增速是非常快的。拿韵达来说,在 2010 年,也就是 10年前,双 11 单日的票件量是在百万级,而在去年的双 11 ,单日的票件量已经达到了亿级以上。10 年的时间票件量达到了百倍以上的增长,但是数据量可不仅仅是百倍,因为有了更多的维度,更多的指标,所以数据量呈几百倍的增长。
面对如此大的数据量,我们不仅有票件数据,还有智能硬件、比如说车辆摄像头等一些监控方面的数据,我们要对这些数据进行处理,以便公司做出管理决策。所以我们韵达建立了以数据中台和业务中台为基础的平台,对公司内部的这十几个系统提供了统一的数据服务。
多样化的数据服务需求
韵达不仅有大家熟悉的韵达快递,还有包括韵达快运、韵达供应链、韵达国际和韵达科技在内,一共五大板块。
平台需求与选型
01 平台需求
既然有这么大的数据量,对于平台首先要满足以下几个需求:
-
交互式分析与简单易用
-
性能与复杂分析支持
-
便于运维
-
接口与生态丰富
因为有大量的数据,我们需要有一个交互的分析平台,以便业务人员执行一些 Adhoc 去查询不同的数据,还要支持通用的 SQL 语法,降低学习成本,并且需要查询快捷。
我们每天的数据量达到了百亿级以上,不仅要求它性能要快,还要有票单量的明细级多表,去 Join 亿级的多个票单量维表。再就是需要多表的票单量精确去重,并保证数据的准确性。
还有一个是运维方面,我们要把运维的时间成本降低,把精力放在业务开发上。同时平台还要支持多种报表的导出和多种数据源的导入。
02 系统选型
我们对比了市面上常见的几种 OLAP 引擎,根据我们的需求,比如票单量需要明细级的查询、业务数据需要支持点更新、高并发以及需要大数据量的 Join ,我们发现, Doris 可以比较好地满足我们的应用场景。
OLAP技术选型
综上,我们选择了 Doris,并简单总结了以下几点原因:
运维方面
Doris 的运维非常方便,只有 FE 和 BE 两种角色;其次,它的扩容也非常快捷,只需要把 FE 或者 BE 的 binary 分发到新的节点上,然后在 FE 执行一个 SQL (只需要一条 SQL 就可以完成扩容),扩容之后会把旧的 BE 的数据自动发到新的 BE 上, Rebalance 也都是自动的;然后,它的滚动升级也很方便,做升级的时候只需要先任意挑选一个 BE ,在 BE 上把新的 BE 的 Binary 替换进去,测试数据可以正常读写,就可以把其他所有的 BE 都可以直接替换掉,再逐个去重启每一个 BE ,重启完 BE 再重启 FE ,替换完就可以在不影响业务的情况下完成集群的升级。
性能方面
它的性能也完全满足我们的业务需求,不会产生中断的情况。此外它运用的是列式存储,即按需去读取所需要的数据,而不需要把每一行所有的数据都读出来。
易用性方面
易用性方面,它支持标准的 SQL 语法,所以业务人员不需要学习新的 SQL 语法,就可以在 Doris 上查询数据。同时还支持动态分区,比如我们一些业务保留最近7天,需要经常查询,就把最近7天的数据放在 SSD 里。比如说我们要查一些历史数据,只需要保留一个月,保留一个月我们就会把大于7天小于等于30天的数据放在 HDD 里,这样就可以降低我们的存储成本。比如有些业务的数据只需要保留一个月,大于一个月的之后要删除,动态分区这里也可以设置只保留30天的数据,30天后自动删除,不需要写调度脚本就可以删掉,导入导出也很方便。
03 系统介绍
下图是 Doris 的架构图, Doris 只有 FE 和 BE 两种角色,对外只需要一个通用的 MySQL客户端,即可直接连接 BE 进行查询。FE 负责解析生成和调度查询计划, BE 负责执行查询计划和数据的存储,任何节点都可以做线性扩展。
下图是 Doris 三种数据模型,第一个是 Aggregate Key 模型,可以对 Doris 的数据进行预聚合,相同 Key 列的数据可以指定不同方式去聚合。 Unique Key 模型是一种特殊的 Aggregate Key 模型,相当于是所有的 Value 列都用 Replace 的一个聚合方式,新导入的数据会完全替换之前的数据。而 Duplicate Key 模型 就是一个没有主键的模型,不做聚合,保留全部明细数据。
下图是 Doris 的数据导入方式,可以看到它有多种导入方式,我们在业务上用的是左边这几种。
第一个是 Broker Load , Broker Load 是用于导入一些离线的大文件,像我们应用在 HDFS 和 Hive 的数据,就用 Broker Load 来导入。
Routine Load 是一个用SQL语法就可以直接进行的导入方式,用于把 Kafka 的数据直接导入到 Doris 里面。还有一个最常用的是 Stream Load ,Stream Load 通过 HTTP 的 put 请求就可以把数据放导入到 Doris ,它不仅支持离线的数据,也支持实时导入,我们是结合 Flink 去自定义的 sink 去导入数据。还有就是 ODBC , ODBC 主要用于导入 MySQL 或者 Oracle 、 PostgreSQL 等数据库的数据,只要支持 ODBC 都可以连接。最后就是 Insert Into 在 Doris 内部去跑一些调度,我们结合 DolphinSchedule 在 Doris 跑一些批量的处理。
基于 Doris 的实时数仓设计
这是我们基于 Doris 的一个简化的系统架构图。
我们看到左边的数据从 Flume 或者 Canal 导入到 Kafka 之后,实时数据在 Kafka 里不需要经过复杂处理,直接用一个简单的 SQL 就可以导入到 Doris 。一些业务逻辑比较复杂的,通过 Flink 去处理,再用 Redis 或 PIKA 做缓存,处理完之后再用 Stream Load 的方式导入到 Doris ;离线数据是用 Broker Load 把数据直接从 HDFS 导入到 Doris ,之后再通过用 SQL 方式,用 DolphinSchedule 去跑批处理一些复杂业务,最后再汇总起来成为汇总表。
在业务层,我们会直接对接数据服务大厅,由数据服务大厅去对接BI工具和前端报表。而对于一些相对固定的数据,我们就直接输出到前端报表中。对于Adhoc的数据,我们提供BI工具供业务人员查询。
以下是我们使用 Doris 中遇到的一些问题。
导入 Kakfa 的数据之前用的是 Routine Load,会发现有一些数据用 Routine Load 会有卡死的情况——任务是 Running 状态,但是发现 offset 不会再继续提交,任务会卡死。我们做了一个自定义的 Routine Load 监控,去实时读取它的 offset ,如果发现了 offset 不再增长之后,就会手动通过监控系统去执行停掉 Routine Load 的任务,并重新启动 Routine Load, offset 用之前的,这样既能保证数据不丢,也不会重复,同时恢复 Routine Load。
关于 Routine Load 卡住以及 Compaction 的问题,Doris 社区已经在新版本中进行和修复和优化,后续我们也会尝试进行版本升级来解决这些问题。
还有就是我们最常用的 Stream Load ,我们自定义了 Flink 的 sink ,这个操作也比较简单,就直接用 http 把数据导入到 Doris 。我们之前最开始用的时候,发现 Doris BE 的节点会有经常挂掉的情况,经过我们分析 log 和百度 Doris 团队的支持,发现是由于我们导入的数据量大、事务数比较多,导致 Doris 这边 compaction 执行不动,会有数据的积压。
我们可以看到上图是修改之前,下图是修改之后。修改之前, Doris 的 compaction score 会达到300左右,修改之后只有50左右。百度 Doris 团队给我们指出调整执行 compaction 的线程数,增大了一倍之后,可以看到效果是立竿见影,从300直接降到了50。
在刚刚过去的618活动, Doris 也是平稳的运行,在此也感谢一下百度 Doris 团队 在我们使用过程当中提供的帮助。
好,下面请我的同事文杰同学给大家介绍一下业务方面的使用,谢谢。
郭文杰:
各位朋友大家好,我是韵达的数据模型工程师,下面由我来给大家分享一下 Doris 在韵达的业务场景,以及我们是怎么用 Doris 去对票件的物流环节进行运营的监控以及分析的。
业务场景
韵达作为一家快递公司,其物流服务质量是它在行业中的立身之本,那么分拨中心作为在物流环节中最为重要的一环,公司需要从数据层面对分拨中心的操作效率进行监控,及时找到它在操作中出现的问题,进行纠正,从而提升我们的物流服务质量。
我接下来要介绍的数据模型就是基于分拨中心的数据。这个数据的特点是:第一,它有海量的数据,主体是扫描数据,然后它会和别的明细层级的数据进行 Join。第二,时效性要求高,要求实时准实时,而这也是现在各行业的一个趋势。第三,提供秒级的接口响应,服务于我们的各个应用等。最后第四点,数据的准确性要保障:作为一个实时的数据,它和离线的数据误差不能超过 5%。
业务痛点方面,除了刚刚介绍的几点之外,我另外要说的就是业务口径的一致性。我们也是大概半年前才开始做实时数仓,所以从一开始我们就希望可以统一我们的各个业务口径,为未来做规划。
下面介绍几个 Doris 的特色功能,以及我们是如何去应用它的。
01 Aggregate 聚合模型
第一个是 Aggregate 聚合模型,这几种聚合类型是我们在建表的时候就可以使用的。
-
SUM 可以帮助我们去做票件量的统计;
-
MIN 和 MAX 用于取得票证最早最晚进出站时间;
-
Replace 方法在我们的业务场景下不太适用,因为我们经常会去做单个字段的一个更新,如果用 Replace 的话,这个字段更新了会把别的字段变成空的,这个是不能满足我们的需要的;
-
用 Replace if not null 的方法,就可以对空值不会把它更新到数据里,所以我们一般方法是取数据的最后一条记录;
-
bitmap_union 方法是用来做一个去重的统计。
通过聚合模型,我们实现了把 ODS 的数据写到 Doris 里面,它就会自动变成 DWD 层票件层级的一个聚合数据,或者是 DWS层 的一个汇总的数据。
02 物化视图和 BITMAP
那么物化视图帮助我们解决了什么?
物化视图,帮助我们实现了汇总数据和基表数据的实时统一。当我们在查询的时候,命中物化视图可以帮助我们提升效率,所以我们在建一张多维的汇总表的时候,我们会给它做一些物化视图,我们只要在查询的时候时候去命中这些字段,就可以提升效率。除此之外, BITMAP 方法可以帮助我们做一个 0~2 的 64 次方减一这个整形范围内的精确去重;现在我们的票件量一般是13位,未来就算扩容了票件量的位数也一样,可以满足我们的需求。
而 BITMAP 方法相比 distinct 方法,它的计算效率也更高。
03 实际应用
下图是我们中心效能模型的架构图,它目前分为上下两层,上层是实时的数据,主要是从 Kafka 过来的明细数据,通过 Flink 进行预处理,然后进到 DWD 层聚合成明细层级的一个数据,也可以直接变成 DWS 的表,去提供实时的一张汇总表。这里面主要是一些效率的统计或者状态的统计,然后这种表只要在加上物化视图就可以直接对外提供接口。
下层是离线的数据,主要是一些配置表的信息,来自 MySQL 或者 Hive ,通过 Load 的方式导进来。这上面实时的数据里面,比较核心的就是这张中心进出站明细的实时,这张明细表会和别的明细表进行 Join ,再和下面的这些配置表进行复杂逻辑的计算,最后再写到这样下面的中心进出站明细的离线表,这张离线表的主体其实就是上面实施表的数据,只是比它会增加更多的一些字段,满足我们后面的各种报表的需要。
因为这是模型里面最核心的一张表,所以只要通过这张表就可以获取所有的数据,比如遗失、时效延误、以及落货的数据,进而实现了数据的一致性。接下来,这些明细数据再进行汇总,对外提供一个汇总的数据,而报表也会按需要根据汇总数据下降到明细数据,再从这个里面提取。
下图是部分的实时中心进出站明细表的结构,它的组件是进站日期、分拨中心还有运单号。只要是相同组件的数据写进来,就会对这张实时表进行更新。比如我们可以更新最早最晚的进出站时间,还可以看到票件进来的时候,是小件单独进行扫描的,还是小件跟着大包进行扫描的;它可以实时更新预测的目的地,然后通过 Flink 把本站点的扫描数据附带上上一站点的扫描数据,就可以更新下一站。
中心进出站模型
此外,实施表跟会用会进行分钟级的跑批,跑的是最近更新的一些数据,然后这些数据去和别的明细表、配置表进行复杂逻辑 Join 最后写到离线表里。
这是 DWS 层的一张实时的表,用于统计一段时间内一个中心月台员工操作的票件量,票件的重量,以及计算它最后的效率。
这边主要说两点,第一点是用 BITMAP_UNION 去做一个30秒的时间切片,因为借助 BITMAP_UNION 我们可以对数据进行去重,不需要再额外在外面用 Group by 去计算实际有效操作时间,而是直接用 BITMAP_UNION 当中 BITMAP count 的方法,即可汇总出它的实际操作时间。
第二点,我们要计算一个去重的票件量,但是我们没有去用 BITMAP_UNION ,是因为如果用 BITMAP_UNION 这个方法进行去重,这个数据是会写进来的,就会对别的字段进行影响。比如这边有一个操作件重,那么如果票件量用 BITMAP_UNION 去做,那么操作件重就会有重复的数据,这是不可以的。所以说在这种情况下,我们还是需要在外部系统写 Doris 之前,用外部系统对这种组件重复的票件进行过滤,然后再写到 Doris 里面。
这是一张中心实时状态的一张表,用于统计月台的实际运营情况。这张表每更新一条数据,就会去更新某一个月台的数据,比如一个车辆的发车凭证;通过这张表我们就可以去驱动下面数字看板的 3D 模型,月台只要数据被更新,模型中月台就会发生变化。然后通过这张表在配合一些分钟级的跑批,比如实时交叉带的效率、车辆的进度、员工的效率等等,就构成了整个看板。
应用现状与收益
现在 Doris 主要是服务于韵达的分拨的数据,满足了 90% 的分拨中心核心指标,为多个部门的数据产品提供数据,保证了我们集团数据的一致性。我们现在还有更多的离线的报表想往 Doris 上进行迁移。现在每天在 Doris 里面会增加亿级的数据量,这个数据其实是聚合后的数据,实际的数据是远大于此的,几乎到百亿级的数据量。同时这也体现了 Doris 的聚合帮助我们减少了存储的成本,并且现在在 Doris 上跑的调度数和指标数都有 100 多个。
Doris 在韵达的应用现状
相同的业务我们用 Doris、 Greenplum 还有 Kudu 都去做过对比。在相同的业务场景下,Doris 的耗时是 Kudu 的1/2,是 GP 的1/10。
在开发的时候,我们相同的字段 GP、Kudu 和 Doris 都是怎么做的?比如说是 GP 的话,我们要取一个最早进站的时间,首先因为它是数仓,所以会有大量的数据,意味着所有的数据都会存在 GP 里面。
那么当我们要取最早的进站时间的话,毫无疑问会用一个开窗函数的方法取第一条,或者说我们用 Group by 去取一个 MIN 值;那么如果我们有几十个这样的字段去看这个票件的话,就会做几十次的这种这种操作,然后再 Join 到我们的主表上。这个操作首先非常繁琐,计算的时间也非常长,同时因为 GP 把所有数据都存进来了,所以它数据量也大。另一边,当我们使用 Kudu 的时候, Kudu 是有 upset 的功能,所以它里面存的数据会少一点,但是在我们做最早进站时间的时候,会需要外部系统去帮我们去进行统计,因为 Kudu 里面没有聚合模型,所以我们会再用 Flink 和 Redis 结合的方法去计算最早进站时间,然后等所有的资料算好了之后,再一起写进 Kudu里。
然而 Doris 就不一样了,我们只要一开始就建这么一个表,设置好聚合类型,接下来就只要不停的往里写就可以,因为数据写进去之后就变成了我们想要的样子。
开发难度对比
通过 Doris 降低了开发难度,同时也帮助我们减少了开发周期。同样的任务放在 Kudu 上,我们需要5天去做,在 Doris 上可能2天就完成了,同时也释放了我们外部的资源,比如 Flink 和 Redis 的计算资源。
未来展望
在未来,我们希望让 Doris 可以服务于韵达更多的业务,使更多的离线指标实时化。因为我们现在使用了 Doris 大半年的时间,其实也并没有很长,我们还希望可以通过优化模型调参等等方法去优化 Doris 集群的性能,也想去继续探索别的大数据组件和 Doris 应用的可能性。最后,我们也希望可以为 Doris 社区贡献我们的力量。
未来展望
本文转载自 张浩、郭文杰@ApacheDoris,原文链接:https://mp.weixin.qq.com/s/Z_PhWk92ctZ7slz4SrVZ9Q。