Shuttle2.0: 大数据计算引擎推进器

admin 2023年4月27日20:49:35评论56 views字数 5073阅读16分54秒阅读模式

Shuttle2.0: 大数据计算引擎推进器

PART

00
前言

Shuttle [1]是 OPPO 大数据团队自研的高可用高性能的 Spark Remote Shuffle Service,2022年4月份已经对外开源 [2],自上线后一直稳定运行,显著提升了大数据任务计算效率和稳定性。并且在六月份实现了分布式内存对 Shuffle 的加速 [3]

但,Shuttle 的技术使命不仅仅作为一个 Spark 的 RSS 中间服务,Shuttle 作为大数据计算推进器,需要在更广的计算范畴提供加速能力,为此我们推出了 Shuttle2.0。

Shuttle2.0 具体有哪些新功能,下面进行详细介绍。

PART

01
兼容Flink引擎
目前 OPPO 大数据主流的计算引擎有 Spark,Flink 分别承担着离线批任务,实时任务。数据 Shuffle 对计算引擎的性能和稳定性影响是每一个分布式计算引擎都要面临的难题,Spark 引擎已经通过 RSS 方案比较好地解决了,但这些问题在 Flink 引擎上依然存在。因此,我们尝试将移植到  Flink 引擎,但 Flink 的 shuffle 模式与 Spark 有本质的区别,Spark Shuffle为 Pull 模式,而 Flink 引擎的 Shuffle 则为 Push 模式,即使 Flink 的批处理任务 Shuffle 过程也是 Push 模式。
Flink 引擎执行的 Pipeline based 模式与 Spark 引擎的 Stage based模式区别较大,主要难点如下:
Flink 在1.14版本实现的 SortShuffle,同时留出来 Shuffle Service 的接口,但是,将一个 Pull based RSS对接到 Flink 上, 存在下面几个问题:
  • 数据流上游推送给 Shuttle,怎么将数据推到下游

  • Flink 上下游不仅同步数据,还要同步广播数据和 Event,如何保障顺序性和正确性

  • Flink 流式数据通过 Network shuffle,效率较高,接入 Shuttle 如何保障高效


Flink 接入 Shuttle 模式
Shuttle2.0: 大数据计算引擎推进器

图1:Flink Shuffle on Shuttle两种方案

Shuttle 不提供数据存储,依赖外部分布式文件系统,如图1 所示,Flink Shuffle on Shuttle 的接入两种方案:
方案1中数据流转以 Shuttle Worker 为中心:
1. Shuttle Worker 接受上游Map任务推动数据
2. Shuttle Worker 将数据按分区聚合,落盘
3. Shuttle Worker 将数据从存储读取,推送到下游 Reduce 任务
这种方案对 Shuttle Worker 进程造成比较大的压力,同时,数据在存储和 Shuttle 直接多次拷贝,效率和稳定性不高。

方案2 中 Shuttle Worker 只负责数据聚合落盘,但是如何向下游 Reduce 任务推送数据?我们在数据 Reader 中做了一下数据中转,从分布式存储中读取数据,并推送给 Reduce 任务。Reader 作为 Reduce 任务的插件,绑定 Reduce 任务。这种方式降低 Shuttle Worker 进程发送数据压力,同时,减少数据拷贝次数,稳定性和效率更优,对比之后,我们选用方案2解决数据推动问题。


流批一体 Shuffle 数据模型

Flink 原生 shuffle 的数据类型分为分区数据,广播数据和广播事件,数据通过 Channel 向下游传递,因此,数据不同数据类型之间的顺序是有保障的,如图2所示。

Shuttle2.0: 大数据计算引擎推进器

 图2:Flink Shuffle 数据和广播流模型 

Shuttle 接入 Flink 数据 Shuffle,将不同的 Map 数据按分区聚合,方便下游任务顺序读取。分区数据按照分区聚合,广播数据和广播事件单独聚合,并且这两种数据单独发给一个 Shuttle Worker。

在 Reduce 端读取数据过程中,需要保障分区数据、广播数据和广播事件的顺序性,由于数据按照分区聚合,广播对所有下游任务都有影响,所以我们在 Map 发送数据包的时候增加了序号。数据包的序号与广播数据和广播事件包的序号保持单增,下游任务读取数据的时候,按照序号读取即可保障数据正确性。

数据聚合和顺序读取流程如图3所示:
Shuttle2.0: 大数据计算引擎推进器

图3:Flink Shuffle on Shuttle数据流模型

PART

02
分布式排序加速
SortMergeJoin 是 Spark SQL 最常用的一种 join 算子,SortMergeJoin 性能瓶颈主要有两点:Shuffle 和 Sort;Shuffle 的性能问题在之前的文章 [1]中做过详细介绍,通过 Shuttle 这种 Remote Shuffle Service 已经得到解决。
在 Reduce 任务拉取 Shuffle 数据后,在单点做排序,如果数据量过大,排好序的数据溢写到磁盘,多次溢写后进行 Merge。分析该过程,需要多次溢写磁盘,同时在对单点性能压力比较大。我们可以观察到线上这种大数据量的 SortMergeJoin 任务,Shuffle 后 Reduce 任务需要花费大量的时间进行排序。那么,如何解决这种单点排序的性能问题?
Shuttle2.0 的解决方案:将 Shuffle 过程和排序结合,将 Shuffle 和 Sort 从串行变成 Pipeline 执行形式,如图4所示;同时将单点排序扩展到分布式排序,解决单点排序多次溢写磁盘问题;

Shuttle2.0: 大数据计算引擎推进器

图4:Shuffle和Sort流水线化示意图
如何结合 Shuttle 将 Reduce 单点排序转换成分布式排序?Shuttle Worker 在收到上游 Map 任务发送的数据后,按照固定大小(默认128M)排序,然后将排好序的数据写入分布式内存文件系统(或者磁盘)。数据流程如图5所示:

Shuttle2.0: 大数据计算引擎推进器

图5:分布式排序数据流程图

图5关键点说明:

1. ShuffleWorker 将分区数据按照固定大小切分写入分布式内存缓存

2. ShuffleSorter 将缓存中的未排序的数据块排序

3. Reduce 任务使用多路归并模式顺序读取各个排好序的数据块,达到全局有序
以上,是基于 Shuttle 实现的 Spark 分布式加速 SortMergeJoin 的创新实现。在实现过程中,还有一个棘手的问题,ShuffleSorter 如果排序不及时怎么办?分布式内存容不下这么大数据量怎么办?
第一个问题:Reader 端实现兼容排序和非排序数据两种模式,所以,对于排序的数据量我们采用的是 Best Effort 方式。
第二个问题:分布式缓存系统容不下 shuffle 和 sort 数据后,新增数据直接写磁盘,磁盘数据不进行排序,避免多次磁盘 IO。

测试结果
测试环境:20台物理机,单机48core,384G内存,24块HDD硬盘
测试任务:数据量1T的TPCH测试集

后面的功能测试均基于该环境进行。

Shuttle2.0: 大数据计算引擎推进器
表1:Spark 原生 vs RSS vs Shuttle Sorter
TPCH 22个任务测试对比,Shuttle Sorter 比 Shuttle RSS 平均性能提升11.7%,比 Spark 原生(ESS)平均性能提升20.9%。

进一步对比,前面分析,在任务数据量比较大的场景,这种单点排序性能会成为瓶颈。那么,最能体现这种方式优势的任务是在大数据量的 SortMergeJoin 的任务上,为此,我们专门测试对比了一个这种类型的任务:任务 Input 数据量、Shuffle 数据量都在1TB左右,3表 Join 的任务,测试结果见表2:

Shuttle2.0: 大数据计算引擎推进器
表2:SortMergeJoin大任务性能对比

Shuttle 分布式排序性能对比原生 Spark 提升36%,对比 Shuttle 的 RSS 提升23%,性能提升的主要原因是在 ShuffleRead 阶段磁盘溢写数据量降低为0。

PART

03
Adaptive Broadcast
在 Spark 计算模式中,在大表与小表的 Join 计算过程中,BroadcastJoin 大表数据数据 Shuffle。BroadcastJoin 相比 SortMergeJoin 具有明显的性能和成本优势。其中主要原因是:尽可能少地移动数据,记住,这一点很重要,几乎是大数据计算提升性能的核心。

Spark3.x 版本引入的 AQE 中,支持动态决策执行 BroadcastJoin 或 SortMergeJoin。但是,线上任务依然会出现因为 BroadcastJoin 导致的 OOM 错误。经过分析,我们认为 Spark 的 BroadcastJoin 存在以下几个问题:

Driver 端可容纳数据有限,限制广播表数据量(默认10M),数据大会导致 Driver OOM
在上游存在过滤条件等复杂逻辑,导致数据预估不准确
数据预估不准,对于决策执行广播有两种问题:
a. 应该广播的,没有广播,导致计算效率损失

b. 不应该广播的,进行广播,导致OOM任务失败

Shuttle2.0: 大数据计算引擎推进器

图6:Spark AQE决策BroadcastJoin流程

如图6所示,当 AQE 估算数据量出现严重偏差(Scene2和3经常出现),可能导致下游执行过程中不必要的大表 Shuffle 或者 Driver 端 OOM,尤其是 OOM 导致任务失败,对线上任务稳定性挑战比较大。

我们结合 Shuttle 提出了自己的 Adaptive BroadcastJoin 解决方案,先看主要的流程:

Shuttle2.0: 大数据计算引擎推进器
图7:Adaptive BroadcastJoin On Shuttle

由图7,广播数据不再发送到 Driver 端,而是落到 Shuttle上,降低 Driver进程的压力,彻底解决 Driver 因为广播数据过大导致的 OOM 失败。Driver 通过上游真实输出数据决策是执行BroadcastJoin还是 HashJoin,避免预测数据大小导致的误判。

注意:这里用了 HashJoin 代替了 SortMergeJoin,这又是什么优化逻辑?

HashJoin 比 SortMergeJoin 少了在 Reduce 端的排序过程,效率更高,但是,要求小表数据的分区数据全部加载到内存,数据量过大容易造成 OOM。那这里为啥敢用这个 HashJoin,不怕 OOM 吗?不怕。小表数据发送到 Shuttle 的数据本身不大,同时,发送给 Shuttle 的过程中按照 HashJoin 的分区器做了分区,那么单个分区的数据就更小。最后,在 Driver决策的时候有真实数据和 Executor 内存大小作为判断依据,不会导致 OOM。

数据流程架构如图8所示:

Shuttle2.0: 大数据计算引擎推进器

图8:Adaptive Broadcast数据流程示意图

注:图8中的圈1是指 HashJoin 的模式,2是指 Broadcast 模式,3是 Subquery Join 的模式


测试结果
测试环境同上,我们测试不同规模的广播的性能:

测试任务使用简单的两个表 join,查询条件相同,表大小规模不同。

Shuttle2.0: 大数据计算引擎推进器

表3:Shuttle2.0 Adaptive Broadcast 性能对比

第一组对比测试,就 Spark 原生广播已经报 OOM 错,所以,后面的对比测试只对比 Shuttle2.0 广播和 Spark 原生的 SortMergeJoin 的性能。从结果来看,主表越大,Shuttle2.0 的广播带来的性能提升越明显。我们统计了线上任务分布,适合用 Shuttle2.0 广播加速的任务占总任务的15%左右,但是,这些任务都是大任务,总成本占总体成本的80%。

PART

04
总结与展望

Shuttle 未来将朝着作为一个大数据计算引擎辅助引擎的方向发展,借助数据中转流通,加速相关的算子。同时,将扩展到不同的计算引擎,包括机器学习训练引擎,作为一个统一的高效数据流转平台。


附录
[1] Shuttle:高可用 高性能 Spark Remote Shuffle Service  https://mp.weixin.qq.com/s/FMvKGvVYcxNG4dNOFQlF0g
[2] Shuttle GitHub:https://github.com/cubefs/shuttle
[3] Shuttle + Alluxio 加速内存Shuffle起飞 https://mp.weixin.qq.com/s/lk3LGfUeJJf2OfZk0nSxNw

Shuttle2.0: 大数据计算引擎推进器
作者介绍

David Fu  OPPO大数据计算平台架构师 负责大数据计算平台技术演进设计开发,曾供职于阿里云,去哪儿网大数据平台,拥有10年大数据架构,开发经验。

Jack Xu OPPO高级数据平台工程师 目前就职于OPPO数据架构团队,主要负责Spark计算引擎和Shuttle的开发,拥有丰富大数据架构和开发经验。
END
About AndesBrain

安第斯智能云

OPPO 安第斯智能云(AndesBrain)是服务个人、家庭与开发者的泛终端智能云,致力于“让终端更智能”。作为 OPPO 三大核心技术之一,安第斯智能云提供端云协同的数据存储与智能计算服务,是万物互融的“数智大脑”。

原文始发于微信公众号(安第斯智能云):Shuttle2.0: 大数据计算引擎推进器

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年4月27日20:49:35
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Shuttle2.0: 大数据计算引擎推进器http://cn-sec.com/archives/1694469.html

发表评论

匿名网友 填写信息