[关闭]
@zhangyy 2017-08-28T13:05:20.000000Z 字数 12692 阅读 170

mapreduce 的二次排序

hadoop的部分


  • 一: 理解二次排序的功能, 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序)
  • 二: 编写实现二次排序功能, 提供源码文件。
  • 三:理解mapreduce join 的几种 方式,编码实现reduce join,提供源代码,说出思路。

一: 理解二次排序的功能, 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序)

  1. 1) partitioner
  2. job.setPartitionerClass(FirstPartitioner.class);
  3. 2) sort
  4. job.setSortComparatorClass(cls);
  5. 3) combine
  6. job.setCombinerClass(cls);
  7. 4) compress
  8. set by configuration
  9. 5) group
  10. job.setGroupingComparatorClass(FirstGroupingComparator.class);

二: 编写实现二次排序功能, 提供源码文件。

  1. package org.apache.hadoop.studyhadoop.sort;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.hadoop.util.Tool;
  15. import org.apache.hadoop.util.ToolRunner;
  16. /**
  17. *
  18. * @author zhangyy
  19. *
  20. */
  21. public class SecondarySortMapReduce extends Configured implements Tool{
  22. // step 1: mapper class
  23. /**
  24. * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  25. */
  26. public static class SecondarySortMapper extends //
  27. Mapper<LongWritable,Text,PairWritable,IntWritable>{
  28. private PairWritable mapOutputKey = new PairWritable() ;
  29. private IntWritable mapOutputValue = new IntWritable() ;
  30. @Override
  31. public void map(LongWritable key, Text value, Context context)
  32. throws IOException, InterruptedException {
  33. // line value
  34. String lineValue = value.toString();
  35. // split
  36. String[] strs = lineValue.split(",") ;
  37. // invalidate
  38. if(2 != strs.length){
  39. return ;
  40. }
  41. // set map output key and value
  42. mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));
  43. mapOutputValue.set(Integer.valueOf(strs[1]));
  44. // output
  45. context.write(mapOutputKey, mapOutputValue);
  46. }
  47. }
  48. // step 2: reducer class
  49. /**
  50. * public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
  51. */
  52. public static class SecondarySortReducer extends //
  53. Reducer<PairWritable,IntWritable,Text,IntWritable>{
  54. private Text outputKey = new Text() ;
  55. @Override
  56. public void reduce(PairWritable key, Iterable<IntWritable> values,
  57. Context context)
  58. throws IOException, InterruptedException {
  59. // set output key
  60. outputKey.set(key.getFirst());
  61. // iterator
  62. for(IntWritable value : values){
  63. // output
  64. context.write(outputKey, value);
  65. }
  66. }
  67. }
  68. // step 3: driver
  69. public int run(String[] args) throws Exception {
  70. // 1: get configuration
  71. Configuration configuration = super.getConf() ;
  72. // 2: create job
  73. Job job = Job.getInstance(//
  74. configuration, //
  75. this.getClass().getSimpleName()//
  76. );
  77. job.setJarByClass(this.getClass());
  78. // 3: set job
  79. // input -> map -> reduce -> output
  80. // 3.1: input
  81. Path inPath = new Path(args[0]) ;
  82. FileInputFormat.addInputPath(job, inPath);
  83. // 3.2: mapper
  84. job.setMapperClass(SecondarySortMapper.class);
  85. job.setMapOutputKeyClass(PairWritable.class);
  86. job.setMapOutputValueClass(IntWritable.class);
  87. // ===========================Shuffle======================================
  88. // 1) partitioner
  89. job.setPartitionerClass(FirstPartitioner.class);
  90. // 2) sort
  91. // job.setSortComparatorClass(cls);
  92. // 3) combine
  93. // job.setCombinerClass(cls);
  94. // 4) compress
  95. // set by configuration
  96. // 5) group
  97. job.setGroupingComparatorClass(FirstGroupingComparator.class);
  98. // ===========================Shuffle======================================
  99. // 3.3: reducer
  100. job.setReducerClass(SecondarySortReducer.class);
  101. job.setOutputKeyClass(IntWritable.class);
  102. job.setOutputValueClass(IntWritable.class);
  103. // set reducer number
  104. job.setNumReduceTasks(2);
  105. // 3.4: output
  106. Path outPath = new Path(args[1]);
  107. FileOutputFormat.setOutputPath(job, outPath);
  108. // 4: submit job
  109. boolean isSuccess = job.waitForCompletion(true);
  110. return isSuccess ? 0 : 1 ;
  111. }
  112. public static void main(String[] args) throws Exception {
  113. args = new String[]{
  114. "hdfs://namenode01.hadoop.com:8020/input/sort" ,//
  115. "hdfs://namenode01.hadoop.com:8020/output"
  116. };
  117. // create configuration
  118. Configuration configuration = new Configuration();
  119. // run job
  120. int status = ToolRunner.run(//
  121. configuration, //
  122. new SecondarySortMapReduce(), //
  123. args
  124. ) ;
  125. // exit program
  126. System.exit(status);
  127. }
  128. }

PairWritable.java

  1. package org.apache.hadoop.studyhadoop.sort;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.io.WritableComparable;
  6. public class PairWritable implements WritableComparable<PairWritable> {
  7. private String first;
  8. private int second;
  9. public PairWritable() {
  10. }
  11. public PairWritable(String first, int second) {
  12. this.set(first, second);
  13. }
  14. public void set(String first, int second) {
  15. this.first = first;
  16. this.setSecond(second);
  17. }
  18. public String getFirst() {
  19. return first;
  20. }
  21. public void setFirst(String first) {
  22. this.first = first;
  23. }
  24. public int getSecond() {
  25. return second - Integer.MAX_VALUE;
  26. }
  27. public void setSecond(int second) {
  28. this.second = second + Integer.MAX_VALUE;
  29. }
  30. public void write(DataOutput out) throws IOException {
  31. out.writeUTF(first);
  32. out.writeInt(second);
  33. }
  34. public void readFields(DataInput in) throws IOException {
  35. this.first = in.readUTF();
  36. this.second = in.readInt();
  37. }
  38. public int compareTo(PairWritable o) {
  39. // compare first
  40. int comp =this.first.compareTo(o.getFirst()) ;
  41. // eqauls
  42. if(0 != comp){
  43. return comp ;
  44. }
  45. // compare
  46. return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond())) ;
  47. }
  48. }

FirstPartitioner.java

  1. package org.apache.hadoop.studyhadoop.sort;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.mapreduce.Partitioner;
  4. public class FirstPartitioner extends Partitioner<PairWritable,IntWritable> {
  5. @Override
  6. public int getPartition(PairWritable key, IntWritable value,
  7. int numPartitions) {
  8. return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
  9. }
  10. }

FirstGroupingComparator.java

  1. package org.apache.hadoop.studyhadoop.sort;
  2. import org.apache.hadoop.io.RawComparator;
  3. import org.apache.hadoop.io.WritableComparator;
  4. public class FirstGroupingComparator implements RawComparator<PairWritable> {
  5. // object compare
  6. public int compare(PairWritable o1, PairWritable o2) {
  7. return o1.getFirst().compareTo(o2.getFirst());
  8. }
  9. // bytes compare
  10. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  11. return WritableComparator.compareBytes(b1, 0, l1 - 4, b2, 0, l2 - 4);
  12. }
  13. }

上传数据处理:
hdfs dfs -put sort /input

运行输出:
12.png-116.8kB
2.png-41.5kB

3.png-10.7kB

三:理解mapreduce join 的几种 方式,编码实现reduce join,提供源代码,说出思路。

mapreduce join 有三种:

  1. 3.1 map 的端的join
  2. 之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。
  3. Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
  4. 为了支持文件的复制,Hadoop提供了一个类DistributedCache 去实现。
  5. 3.2 reduce 的端的join
  6. 在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。
  7. 在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list,然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
  8. 3.3 SemiJoin
  9. SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO
  10. 实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。

3.4 编程代码:
DataJoinMapReduce.java


  1. package org.apache.hadoop.studyhadoop.join;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.conf.Configured;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.NullWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.mapreduce.Job;
  13. import org.apache.hadoop.mapreduce.Mapper;
  14. import org.apache.hadoop.mapreduce.Mapper.Context;
  15. import org.apache.hadoop.mapreduce.Reducer;
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import org.apache.hadoop.util.Tool;
  19. import org.apache.hadoop.util.ToolRunner;
  20. /**
  21. *
  22. * @author zhangyy
  23. *
  24. */
  25. public class DataJoinMapReduce extends Configured implements Tool {
  26. // step 1 : mapper
  27. /**
  28. * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  29. */
  30. public static class WordCountMapper extends //
  31. Mapper<LongWritable, Text, LongWritable, DataJoinWritable> {
  32. private LongWritable mapOutputKey = new LongWritable();
  33. private DataJoinWritable mapOutputValue = new DataJoinWritable();
  34. @Override
  35. public void map(LongWritable key, Text value, Context context)
  36. throws IOException, InterruptedException {
  37. // split
  38. String[] strs = value.toString().split(",");
  39. // invalidate
  40. if ((3 != strs.length) && (4 != strs.length)) {
  41. return;
  42. }
  43. // set mapoutput key
  44. Long cid = Long.valueOf(strs[0]);
  45. mapOutputKey.set(cid);
  46. // set name
  47. String name = strs[1];
  48. // customer
  49. if (3 == strs.length) {
  50. String phone = strs[2];
  51. mapOutputValue.set("customer", name + "," + phone);
  52. }
  53. // order
  54. if (4 == strs.length) {
  55. String price = strs[2];
  56. String date = strs[3];
  57. mapOutputValue.set("order", name + "," + price + "," + date);
  58. }
  59. context.write(mapOutputKey, mapOutputValue);
  60. }
  61. }
  62. // step 2 : reducer
  63. public static class WordCountReducer extends //
  64. Reducer<LongWritable, DataJoinWritable, NullWritable, Text> {
  65. private Text outputValue = new Text();
  66. @Override
  67. public void reduce(LongWritable key, Iterable<DataJoinWritable> values,
  68. Context context) throws IOException, InterruptedException {
  69. String customerInfo = new String();
  70. List<String> orderList = new ArrayList<String>();
  71. for (DataJoinWritable value : values) {
  72. if ("customer".equals(value.getTag())) {
  73. customerInfo = value.getData();
  74. } else if ("order".equals(value.getTag())) {
  75. orderList.add(value.getData());
  76. }
  77. }
  78. for (String order : orderList) {
  79. outputValue.set(key.toString() + "," + customerInfo + ","
  80. + order);
  81. context.write(NullWritable.get(), outputValue);
  82. }
  83. }
  84. }
  85. // step 3 : job
  86. public int run(String[] args) throws Exception {
  87. // 1 : get configuration
  88. Configuration configuration = super.getConf();
  89. // 2 : create job
  90. Job job = Job.getInstance(//
  91. configuration,//
  92. this.getClass().getSimpleName());
  93. job.setJarByClass(DataJoinMapReduce.class);
  94. // job.setNumReduceTasks(tasks);
  95. // 3 : set job
  96. // input --> map --> reduce --> output
  97. // 3.1 : input
  98. Path inPath = new Path(args[0]);
  99. FileInputFormat.addInputPath(job, inPath);
  100. // 3.2 : mapper
  101. job.setMapperClass(WordCountMapper.class);
  102. // TODO
  103. job.setMapOutputKeyClass(LongWritable.class);
  104. job.setMapOutputValueClass(DataJoinWritable.class);
  105. // ====================shuffle==========================
  106. // 1: partition
  107. // job.setPartitionerClass(cls);
  108. // 2: sort
  109. // job.setSortComparatorClass(cls);
  110. // 3: combine
  111. // job.setCombinerClass(cls);
  112. // 4: compress
  113. // set by configuration
  114. // 5 : group
  115. // job.setGroupingComparatorClass(cls);
  116. // ====================shuffle==========================
  117. // 3.3 : reducer
  118. job.setReducerClass(WordCountReducer.class);
  119. // TODO
  120. job.setOutputKeyClass(NullWritable.class);
  121. job.setOutputValueClass(Text.class);
  122. // 3.4 : output
  123. Path outPath = new Path(args[1]);
  124. FileOutputFormat.setOutputPath(job, outPath);
  125. // 4 : submit job
  126. boolean isSuccess = job.waitForCompletion(true);
  127. return isSuccess ? 0 : 1;
  128. }
  129. public static void main(String[] args) throws Exception {
  130. args = new String[] {
  131. "hdfs://namenode01.hadoop.com:8020/join",
  132. "hdfs://namenode01.hadoop.com:8020/output3/"
  133. };
  134. // get configuration
  135. Configuration configuration = new Configuration();
  136. // configuration.set(name, value);
  137. // run job
  138. int status = ToolRunner.run(//
  139. configuration,//
  140. new DataJoinMapReduce(),//
  141. args);
  142. // exit program
  143. System.exit(status);
  144. }
  145. }

DataJoinWritable.java

  1. package org.apache.hadoop.studyhadoop.join;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.io.Writable;
  6. public class DataJoinWritable implements Writable {
  7. private String tag ;
  8. private String data ;
  9. public DataJoinWritable() {
  10. }
  11. public DataJoinWritable(String tag, String data) {
  12. this.set(tag, data);
  13. }
  14. public void set(String tag, String data) {
  15. this.setTag(tag);
  16. this.setData(data);
  17. }
  18. public String getTag() {
  19. return tag;
  20. }
  21. public void setTag(String tag) {
  22. this.tag = tag;
  23. }
  24. public String getData() {
  25. return data;
  26. }
  27. public void setData(String data) {
  28. this.data = data;
  29. }
  30. @Override
  31. public int hashCode() {
  32. final int prime = 31;
  33. int result = 1;
  34. result = prime * result + ((data == null) ? 0 : data.hashCode());
  35. result = prime * result + ((tag == null) ? 0 : tag.hashCode());
  36. return result;
  37. }
  38. @Override
  39. public boolean equals(Object obj) {
  40. if (this == obj)
  41. return true;
  42. if (obj == null)
  43. return false;
  44. if (getClass() != obj.getClass())
  45. return false;
  46. DataJoinWritable other = (DataJoinWritable) obj;
  47. if (data == null) {
  48. if (other.data != null)
  49. return false;
  50. } else if (!data.equals(other.data))
  51. return false;
  52. if (tag == null) {
  53. if (other.tag != null)
  54. return false;
  55. } else if (!tag.equals(other.tag))
  56. return false;
  57. return true;
  58. }
  59. public void write(DataOutput out) throws IOException {
  60. out.writeUTF(this.getTag());
  61. out.writeUTF(this.getData());
  62. }
  63. public void readFields(DataInput in) throws IOException {
  64. this.setTag(in.readUTF());
  65. this.setData(in.readUTF());
  66. }
  67. @Override
  68. public String toString() {
  69. return tag + "," + data ;
  70. }
  71. }

上传文件:
hdfs dfs -put customers.txt /join
hdfs dfs -put orders.txt /join

运行结果:
1.png-142.4kB
2.png-49.7kB
3.png-28.3kB

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注