[关闭]
@tsing1226 2015-12-31T11:35:19.000000Z 字数 1857 阅读 987

mapreduce

MapReduce Shuffle过程

MapReduce里的Shuffle

描述着数据从map task输出到reduce task输入的这段过程。

map side

Map里的Shuffle过程描述

map函数开始输出时,并不是简简单单地将它写到磁盘。它利用缓冲的方式写到内存并出于效率的考虑进行预处理。

1、 每个Map Task都有一个环形缓冲内存,每个内存默认是100MB(大小可以通过mapreduce.task.io.sort.mb属性进行修改),当内存达到阈值大小时(mapreduce.map.sort.spill.percent,属性,默认大小为80%或0.8),溢出的部分可以溢写到磁盘上,当数据写到硬盘的过程中,Map输出还可以向缓冲内存中写入,除非这时内存已被写满。溢写本地磁盘完成时Map阶段结束。

2、在环形缓存区中进行分区、排序,分区(partitioner)决定了map输出的数据被哪个reduce task进行处理,排序会对分区中的数据进行排序。

  • 分区(Partitioner)

在Map输出写到本地磁盘之前,首先将数据进行相应的分区,使其能够传送到相应发reducer中。分区决定map输出文件,被哪个reduce任务进行处理。

  • 排序(Sorter)

在每一个分区后台线程能够对内存中的数据按key排序,如果将各个分区的数据合并,它将输出排序好的数据。合并(combiner)函数能让map输出比较紧凑,因此有较少的数据写入本地磁盘和传送给reducer()。

当分区、排序过后,需要将数据spill到本地磁盘,但是spill前需要将数据进行merge(合并),按规定相同reduce需要处理的数据进行合并,合并以后把每个分区里的数据进行排序,最后形成一个文件,分区完成的,并且各个分区中的数据已经排序完成

  • 合并(Combiner)(可选)

**3、当内存缓冲达到spill阈值时,新的spill就会被创建,之后map task就会接着上次记录继续输出,并存在于多个spill文件中。在任务结束前,spill文件将会被合并成一个分区并按序输出。配置文件mapreduce.task.io.sort.factor控制着最大数据合并数,默认是10.

当spill文件超过3个时(mapreduce.map.combine.minspills属性配置,在文件输出之前,Combiner函数将会再次运行,可能再次运行的Combiner可能对输出结果并没有太大影响。当只有两个或一个spill文件时,可能的reducer函数处理map输出数据大小不足以开销复杂的合并,这时将不对map输出的数据进行合并处理。**

  • 压缩(Compress)(可选)

在map输出数据在写入本地磁盘之前对数据压缩是不错的选择,这样做的优势就是能够使数据更快的写入本地磁盘,节约本地磁盘,减少向reduce传送量。默认情况下,map输出的数据并不会压缩,但是它可以通过设置mapreduce.map.output.compress实现。也可以在main()实现:

Reduce Side

Reduce里的Shuffle过程描述

1、复制文件

Map输出文件被放在机器的本地磁盘并运行Map任务(Map任务的输出通常写在本地磁盘,reduce输出则在本地磁盘或内存中)。但当机器运行reduce部分时,reduce任务需要特定区域的map输出并通过集群的执行几个reduce任务。这些的map任务可能在不同时间结束,reduce任务开始复制每个完成map任务的数据,这被称作为reduce的复制阶段。Reduce拥有比较小的复制阈值,能够保证及时并行的获取map输出值。这些线程默认是5个,但是可以通过属性mapreduce.reduce.shuffle.parallelcopies进行设定。

2、排序(合并)

当所有的map输出被复制完毕,reduce任务进入排序阶段,它能够合并map输出,并能够保持原有排序。这个过程需要几轮才可以完成。例如,假设有50个map输出合并因子为10(由mapreduce.task.io.sort.factor属性确定),这时就需要5轮才能完成reduce的复制。每轮将合并10个文件,那结果就是产生5个临时文件。

  • 分组(group)

最后一轮将会把所有文件合并成一个排序文件,这种合并减少了磁盘的访问并进入了reduce的最后一个阶段:reduce阶段。最后对来自内存和本地磁盘文件把相同key的value放在一起分组。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注