Shuttle [1]是 OPPO 大数据团队自研的高可用高性能的 Spark Remote Shuffle Service,2022年4月份已经对外开源 [2],自上线后一直稳定运行,显著提升了大数据任务计算效率和稳定性。并且在六月份实现了分布式内存对 Shuffle 的加速 [3]。
但,Shuttle 的技术使命不仅仅作为一个 Spark 的 RSS 中间服务,Shuttle 作为大数据计算推进器,需要在更广的计算范畴提供加速能力,为此我们推出了 Shuttle2.0。
Shuttle2.0 具体有哪些新功能,下面进行详细介绍。
-
数据流上游推送给 Shuttle,怎么将数据推到下游
-
Flink 上下游不仅同步数据,还要同步广播数据和 Event,如何保障顺序性和正确性
-
Flink 流式数据通过 Network shuffle,效率较高,接入 Shuttle 如何保障高效
图1:Flink Shuffle on Shuttle两种方案
方案2 中 Shuttle Worker 只负责数据聚合落盘,但是如何向下游 Reduce 任务推送数据?我们在数据 Reader 中做了一下数据中转,从分布式存储中读取数据,并推送给 Reduce 任务。Reader 作为 Reduce 任务的插件,绑定 Reduce 任务。这种方式降低 Shuttle Worker 进程发送数据压力,同时,减少数据拷贝次数,稳定性和效率更优,对比之后,我们选用方案2解决数据推动问题。
Flink 原生 shuffle 的数据类型分为分区数据,广播数据和广播事件,数据通过 Channel 向下游传递,因此,数据不同数据类型之间的顺序是有保障的,如图2所示。
图2:Flink Shuffle 数据和广播流模型
Shuttle 接入 Flink 数据 Shuffle,将不同的 Map 数据按分区聚合,方便下游任务顺序读取。分区数据按照分区聚合,广播数据和广播事件单独聚合,并且这两种数据单独发给一个 Shuttle Worker。
在 Reduce 端读取数据过程中,需要保障分区数据、广播数据和广播事件的顺序性,由于数据按照分区聚合,广播对所有下游任务都有影响,所以我们在 Map 发送数据包的时候增加了序号。数据包的序号与广播数据和广播事件包的序号保持单增,下游任务读取数据的时候,按照序号读取即可保障数据正确性。
图3:Flink Shuffle on Shuttle数据流模型
图5:分布式排序数据流程图
1. ShuffleWorker 将分区数据按照固定大小切分写入分布式内存缓存
2. ShuffleSorter 将缓存中的未排序的数据块排序
后面的功能测试均基于该环境进行。
进一步对比,前面分析,在任务数据量比较大的场景,这种单点排序性能会成为瓶颈。那么,最能体现这种方式优势的任务是在大数据量的 SortMergeJoin 的任务上,为此,我们专门测试对比了一个这种类型的任务:任务 Input 数据量、Shuffle 数据量都在1TB左右,3表 Join 的任务,测试结果见表2:
Shuttle 分布式排序性能对比原生 Spark 提升36%,对比 Shuttle 的 RSS 提升23%,性能提升的主要原因是在 ShuffleRead 阶段磁盘溢写数据量降低为0。
Spark3.x 版本引入的 AQE 中,支持动态决策执行 BroadcastJoin 或 SortMergeJoin。但是,线上任务依然会出现因为 BroadcastJoin 导致的 OOM 错误。经过分析,我们认为 Spark 的 BroadcastJoin 存在以下几个问题:
b. 不应该广播的,进行广播,导致OOM任务失败
图6:Spark AQE决策BroadcastJoin流程
我们结合 Shuttle 提出了自己的 Adaptive BroadcastJoin 解决方案,先看主要的流程:
由图7,广播数据不再发送到 Driver 端,而是落到 Shuttle上,降低 Driver进程的压力,彻底解决 Driver 因为广播数据过大导致的 OOM 失败。Driver 通过上游真实输出数据决策是执行BroadcastJoin还是 HashJoin,避免预测数据大小导致的误判。
注意:这里用了 HashJoin 代替了 SortMergeJoin,这又是什么优化逻辑?
HashJoin 比 SortMergeJoin 少了在 Reduce 端的排序过程,效率更高,但是,要求小表数据的分区数据全部加载到内存,数据量过大容易造成 OOM。那这里为啥敢用这个 HashJoin,不怕 OOM 吗?不怕。小表数据发送到 Shuttle 的数据本身不大,同时,发送给 Shuttle 的过程中按照 HashJoin 的分区器做了分区,那么单个分区的数据就更小。最后,在 Driver决策的时候有真实数据和 Executor 内存大小作为判断依据,不会导致 OOM。
数据流程架构如图8所示:
图8:Adaptive Broadcast数据流程示意图
注:图8中的圈1是指 HashJoin 的模式,2是指 Broadcast 模式,3是 Subquery Join 的模式
测试任务使用简单的两个表 join,查询条件相同,表大小规模不同。
表3:Shuttle2.0 Adaptive Broadcast 性能对比
第一组对比测试,就 Spark 原生广播已经报 OOM 错,所以,后面的对比测试只对比 Shuttle2.0 广播和 Spark 原生的 SortMergeJoin 的性能。从结果来看,主表越大,Shuttle2.0 的广播带来的性能提升越明显。我们统计了线上任务分布,适合用 Shuttle2.0 广播加速的任务占总任务的15%左右,但是,这些任务都是大任务,总成本占总体成本的80%。
Shuttle 未来将朝着作为一个大数据计算引擎辅助引擎的方向发展,借助数据中转流通,加速相关的算子。同时,将扩展到不同的计算引擎,包括机器学习训练引擎,作为一个统一的高效数据流转平台。
David Fu OPPO大数据计算平台架构师 负责大数据计算平台技术演进设计开发,曾供职于阿里云,去哪儿网大数据平台,拥有10年大数据架构,开发经验。
OPPO 安第斯智能云(AndesBrain)是服务个人、家庭与开发者的泛终端智能云,致力于“让终端更智能”。作为 OPPO 三大核心技术之一,安第斯智能云提供端云协同的数据存储与智能计算服务,是万物互融的“数智大脑”。
原文始发于微信公众号(安第斯智能云):Shuttle2.0: 大数据计算引擎推进器
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论