概要1
Shuffle是一种数据重组,由于分布式计算的特性和要求,在实现细节方面更加复杂复杂
在MapReduce框架中,Shuffle是连接Map和Reduce的桥梁,在Map阶段通过Shuffle读取数据并输出到相应的Reduce; Reduce阶段负责从Map端提取数据进行计算。 整个shuffle进程通常涉及大量磁盘和网络I/O。 因此,shuffle的性能高低也直接决定着整个程序的性能高低。 Spark也有自己的shuffle实现过程
在DAG调度过程中,Stage阶段的划分取决于是否存在shuffle过程,也就是说如果存在ShuffleDependency宽度的依赖,则需要进行shuffle,此时将作业job划分为多个Stage; 然后在分割Stage时,在构建ShuffleDependency时进行shuffle注册,获取后续数据读取所需的ShuffleHandle,最终在每个job提交后与一个ResultStage和几个shuffle ResultStage和ShuffleMapStage中的任务分别对应于ResultTask和ShuffleMapTask。 除最终ResultStage之外的几个ShuffleMapStage中的每个ShuffleMapTask都必须根据对应的分区对数据进行分组,以使分区中的数据持久化。
一混洗机制
1.1混列shuffle概述
在spark-1.6发行版之前使用的是HashShuffle,但从spark-1.6发行版开始使用的是Sort-Base Shuffle。 由于散列缓冲区存在不足,因此替换了散列缓冲区。
据了解,Spark的运行主要分为两个部分。 部分是驱动程序,其核心是SparkContext。 另一个是工作器节点的任务,它正在执行实际任务。 程序运行时,驱动程序和执行程序进程会相互作用。 执行哪些任务,即驱动程序将任务分配给执行程序,驱动程序与执行程序进行网络传输。 任务数据从哪里获取,即Task从驱动程序中获取其他上游Task的数据结果,因此在该过程中会陆续生成网络结果。 其中,下一个Stage向前一个Stage请求数据的过程称为Shuffle。
1.2以前的混洗shuffle机制没有优化
在未优化散列缓冲区之前,每个ShufflleMapTask会为每个ReduceTask创建一个bucket缓存,并为每个bucket创建一个文件。 保存在此bucket中的数据通过Partitioner操作默认为HashPartitioner )找到对应的bucket并放入,最后将数据保存到
将bucket缓存中的数据更新为磁盘,即相应的块文件。
然后,ShuffleMapTask将输出作为映射状态发送到DAGScheduler的MapOutputTrackerMaster。 每个映射状态包含每个结果任务提取的数据的位置和大小
然后,ResultTask利用BlockStoreShuffleFetcher从MapOutputTrackerMaster检索地图状态,以确定哪些数据属于自己。 然后,基础架构通过块管理器提取数据
拉进来的数据构成内部的ShuffleRDD,优先放入内存,内存不够就放入磁盘,ResulTask开始聚合,最后生成我们想要获取的MapPartitionRDD
缺点:
如上图所示,这里有一个工作器、两个执行器,每个执行器运行两个ShuffleMapTask,有三个ReduceTask,所以总共有4 * 3=12个bucket和12个bluble
#数据量大时,生成M*R个小文件。 例如,如果有100个ShuffleMapTask和100个ResultTask,则会生成100*100=10000个小文件
#备份缓存很重要。 必须将ShuffleMapTask中的所有数据写入bucket,然后刷新到磁盘。 Map端的数据过多容易发生内存溢出。 后面有一个优化,bucket写入的数据在达到刷新到磁盘的阈值后,会一点一点地刷新到磁盘,但磁盘I/O会变多
1.3优化的混洗shuffle
每个Executor进程都根据核心数确定任务的并发行数。 例如,如果执行程序核心数为2,则可以同时运行两个Task,一个则只能运行一个Task
骗人的
设executor核数是1,ShuffleMapTask数量是M,那么它依然会根据ResultTask的数量R,创建R个bucket缓存,然后对key进行hash,数据进入不同的bucket中,每一个bucket对应着一个block file,用于刷新bucket缓存里的数据
然后下一个task运行的时候,那么不会再创建新的bucket和block file,而是复用之前的task已经创建好的bucket和block file。即所谓同一个Executor进程里所有Task都会把相同的key放入相同的bucket缓冲区中
这样的话,生成文件的数量就是本地worker的executor数量*executor的cores*ResultTask数量)如上图所示,即2 * 1* 3 = 6个文件,每一个Executor的shuffleMapTask数量100,ReduceTask数量为100,那么
未优化的HashShuffle的文件数是2 *1* 100*100 =20000,优化之后的数量是2*1*100 = 200文件,相当于少了100倍
缺点:如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。
二 Sort-Based Shuffle
2.1 Sort-Based Shuffle概述
HashShuffle回顾
HashShuffle写数据的时候,内存有一个bucket缓冲区,同时在本地磁盘有对应的本地文件,如果本地有文件,那么在内存应该也有文件句柄也是需要耗费内存的。也就是说,从内存的角度考虑,即有一部分存储数据,一部分管理文件句柄。如果Mapper分片数量为1000,Reduce分片数量为1000,那么总共就需要1000000个小文件。所以就会有很多内存消耗,频繁IO以及GC频繁或者出现内存溢出。
而且Reducer端读取Map端数据时,Mapper有这么多小文件,就需要打开很多网络通道读取,很容易造成Reducer(下一个stage)通过driver去拉取上一个stage数据的时候,说文件找不到,其实不是文件找不到而是程序不响应,因为正在GC.
2.2 Sorted-Based Shuffle介绍
为了缓解Shuffle过程产生文件数过多和Writer缓存开销过大的问题,spark引入了类似于hadoop Map-Reduce的shuffle机制。该机制每一个ShuffleMapTask不会为后续的任务创建单独的文件,而是会将所有的Task结果写入同一个文件,并且对应生成一个索引文件。以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer缓存所占用的内存大小,而且同时避免GC的风险和频率。
Sort-Based Shuffle有几种不同的策略:BypassMergeSortShuffleWriter、SortShuffleWriter和UnasfeSortShuffleWriter。
对于BypassMergeSortShuffleWriter,使用这个模式特点:
# 主要用于处理不需要排序和聚合的Shuffle操作,所以数据是直接写入文件,数据量较大的时候,网络I/O和内存负担较重
# 主要适合处理Reducer任务数量比较少的情况下
# 将每一个分区写入一个单独的文件,最后将这些文件合并,减少文件数量;但是这种方式需要并发打开多个文件,对内存消耗比较大
因为BypassMergeSortShuffleWriter这种方式比SortShuffleWriter更快,所以如果在Reducer数量不大,又不需要在map端聚合和排序,而且
Reducer的数目 < spark.shuffle.sort.bypassMergeThrshold指定的阀值,就是用的是这种方式。
对于SortShuffleWriter,使用这个模式特点:
# 比较适合数据量很大的场景或者集群规模很大
# 引入了外部外部排序器,可以支持在Map端进行本地聚合或者不聚合
# 如果外部排序器enable了spill功能,如果内存不够,可以先将输出溢写到本地磁盘,最后将内存结果和本地磁盘的溢写文件进行合并
对于UnsafeShuffleWriter由于需要谨慎使用,我们暂不做分析。
另外这个Sort-Based Shuffle跟Executor核数没有关系,即跟并发度没有关系,它是每一个ShuffleMapTask都会产生一个data文件和index文件,所谓合并也只是将该ShuffleMapTask的各个partition对应的分区文件合并到data文件而已。所以这个就需要个Hash-BasedShuffle的consolidation机制区别开来。