[关闭]
@awsekfozc 2016-01-08T19:09:37.000000Z 字数 3361 阅读 1639

MapReducew五大过程

MapReduce

mapreduce五大过程.png-47kB

Input

  • 将输入日志数据,数据库数据输入到map函数 (key-value形式)
  • 一行记录对于一个键值对(key-value)

Map

1)每个节点都需要执行代码(需要打成jar包)
2)对输入的数据做逻辑处理。
3)接受一个键值对(key-value),产生一组中间键值对。map函数产生的中间键值对里键相同的值传递给一个reduce函数。
4)map任务结束之前,将处理的结果放置再内存中(默认大小为100M),称之为“环形缓冲区”。当内存占到一定比率时(默认80%),把内存中的数据溢写到磁盘(spill)

Shuffle

map shuffle

1)分区partitioner:当map在内存中的数据溢写到磁盘之前。根据reduce的不同。自动将数据分为到不同的区域。决定了map输出的数据被那个对应的reduce处理。
2)排序sorter:对分区中的数据排序,此时数据还在内存之中,排序速度快。
3)溢写spill:上述过程完成后,将数据写到spill到本地磁盘。
4)合并merge:当map处理数据结束以后。合并各个分区的数据,排序合并后的数据。形成一个分区完成排序完成的文件。
5)map端的reduce(combiner)(可选,不是所有mapreduce都能设置combiner)
6)压缩:压缩Map输出的数据

参数设置

  1. <!-- mapred-site.xml -->
  2. <!--Map任务CPU最小核数-->
  3. <property>
  4. <name>mapreduce.map.cpu.vcores</name>
  5. <value>1</value>
  6. </property>
  7. <!--Reduce任务CPU最小核数-->
  8. <property>
  9. <name>mapreduce.reduce.cpu.vcores</name>
  10. <value>1</value>
  11. </property>
  12. <!--在排序时合并的流的数目-->
  13. <property>
  14. <name>mapreduce.task.io.sort.factor</name>
  15. <value>10</value>
  16. </property>
  17. <!--环形缓存的大小-->
  18. <property>
  19. <name>mapreduce.task.io.sort.mb</name>
  20. <value>100</value>
  21. </property>
  22. <!--缓存区的限制(80%)-->
  23. <property>
  24. <name>mapreduce.map.sort.spill.percent</name>
  25. <value>0.80</value>
  26. </property>

分区设置(partitioner)

  1. /**
  2. * map shuffle 分区设置(partitioner)
  3. */
  4. package com.zc.hadoop.shuffle;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Partitioner;
  7. public class TestPartitioner extends Partitioner<Text, Text>{
  8. public int getPartition(Text arg0, Text arg1, int arg2) {
  9. // TODO Auto-generated method stub
  10. return 0;
  11. }
  12. }
  13. /**
  14. * mapreduce中调用
  15. */
  16. job.setPartitionerClass(TestPartitioner.class);

排序,分组设置(srot,group)

  1. /**
  2. * map shuffle 排序,分组设置(srot,group)
  3. */
  4. package com.zc.hadoop.shuffle;
  5. import org.apache.hadoop.io.RawComparator;
  6. import com.zc.hadoop.mapreduce.WordCountMr;
  7. import com.zc.hadoop.mapreduce.WordCountMr.WCMapper;
  8. public class TestRawComparator implements RawComparator<WordCountMr.WCMapper> {
  9. public int compare(WCMapper arg0, WCMapper arg1) {
  10. // TODO Auto-generated method stub
  11. return 0;
  12. }
  13. public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
  14. int arg5) {
  15. // TODO Auto-generated method stub
  16. return 0;
  17. }
  18. }
  19. /**
  20. * mapreduce中调用
  21. */
  22. job.setCombinerClass(TestRawComparator.class);
  23. job.setGroupingComparatorClass(TestRawComparator.class);

combiner设置

  1. /**
  2. * combiner
  3. */
  4. package com.zc.hadoop.shuffle;
  5. import java.io.IOException;
  6. import java.util.Iterator;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapred.OutputCollector;
  10. import org.apache.hadoop.mapred.Reporter;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. public class TestCombiner extends Reducer<Text, IntWritable, IntWritable, Text>{
  13. public void reduce(Text key, Iterator<IntWritable> values,
  14. OutputCollector<IntWritable, Text> output, Reporter reporter)
  15. throws IOException {
  16. // TODO Auto-generated method stub
  17. }
  18. }
  19. /**
  20. * mapreduce中调用
  21. */
  22. job.setCombinerClass(WordCountCombiner.class);
combiner结果

c2.png-35.9kB

压缩设置(comperss)

  1. /**
  2. * 设置配置(代码设置方式)
  3. * param1:
  4. * param2:
  5. */
  6. /**
  7. * param1:启用压缩
  8. * param2:true 启用
  9. */
  10. configuration.set("mapreduce.map.output.compress", "true");
  11. /**
  12. * param1:设置压缩类型
  13. * param2:类路径
  14. */
  15. configuration.set( "mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
  1. <!--配置设置方式-->
  2. <!--启用压缩-->
  3. <property>
  4. <name>mapreduce.map.output.compress</name>
  5. <value>true</value>
  6. </property>
  7. <!--设置压缩方式-->
  8. <property>
  9. <name>mapreduce.map.output.compress.codec</name>
  10. <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  11. </property>

reduce shuffle

1)读取数据:已完成的map task存储在本地磁盘的数据(map shuffle过程一将之,分组,合并,排序)。为各个reduce task去拉取每个reduce要处理的数据。将相同key的数据合并在一起,放置再内存中。

Reduce

1)接收合并好的数据,业务处理
2)结果写入磁盘

Output

输出reduce后的数据到文件系统中。


在此输入正文

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注