@songlaf · 2016-05-12

Assignment 10: Advanced MapReduce Exercises

Beifeng Big Data Training


I) Implementing a Secondary Sort

1) The MapReduce execution flow

[Figure: MapReduce job execution flow]
(a) Mapper tasks receive input splits and call the map function on each record, transforming it into new output.
(b) The map output is partitioned by the partition function; records in different partitions are sent to different Reducer tasks.
(c) Within each partition, records are sorted by key. The key must implement the WritableComparable interface, which extends Comparable and therefore supports comparison and sorting.
(d) The sorted records are then grouped by key: records with identical keys fall into the same group, and each group triggers exactly one call to the reduce function.
(e) The sorted and grouped data is sent to the Reducer nodes.

2) Secondary sort

MapReduce sorts by key by default, but sometimes we also need the values sorted. One way to meet this need is to sort the collected values during the reduce phase, but when a key has a huge number of values this can cause out-of-memory errors. Instead, we combine the key and the field to be sorted into a new composite key, call it newKey, and have the map side emit newKey.

If partitioning and grouping keep using newKey, the number of partitions and groups becomes very large and consumes a lot of resources. The fix is to override the partitioning and grouping rules so that they still work on the original key.

3) Implementation

3.1) Custom composite key (newKey)

    package njt.song.study.hadoop;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    // Composite key: the original key (first) plus the field to sort by (second).
    public class PairWritable implements WritableComparable<PairWritable> {

        private String first;
        private int second;

        public PairWritable() {
            super();
        }

        public PairWritable(String first, int second) {
            super();
            this.first = first;
            this.second = second;
        }

        public void Set(String first, int second) {
            this.setFirst(first);
            this.setSecond(second);
        }

        public String getFirst() {
            return first;
        }

        public void setFirst(String first) {
            this.first = first;
        }

        public int getSecond() {
            return second;
        }

        public void setSecond(int second) {
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.setFirst(in.readUTF());
            this.setSecond(in.readInt());
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.getFirst());
            out.writeInt(this.getSecond());
        }

        @Override
        public int compareTo(PairWritable o) {
            // Sort by the original key first, then by the second field, both ascending.
            int comp = this.getFirst().compareTo(o.getFirst());
            if (0 != comp) {
                return comp;
            }
            return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond()));
        }

        @Override
        public String toString() {
            return this.getFirst() + "," + this.getSecond();
        }
    }
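
Note that compareTo() orders the composite key first by the original key and then by the second field, both ascending; reversing the second comparison would deliver each key's values in descending order instead.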

3.2) Custom partitioner

    package njt.song.study.hadoop;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Partitions by the original key (the first field) only, so all records that
    // share the same original key land in the same reduce partition.
    public class FirstPartitioner extends Partitioner<PairWritable, IntWritable> {

        @Override
        public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
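
Masking the hash code with Integer.MAX_VALUE clears the sign bit, so the partition index stays non-negative even when hashCode() returns a negative value; the modulo then keeps it within [0, numPartitions).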

3.3) Custom grouping comparator

    package njt.song.study.hadoop;

    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;

    // Groups records by the first field only, so that one reduce() call receives
    // all values that share the same original key.
    public class FirstGroupCompair implements RawComparator<PairWritable> {

        @Override
        public int compare(PairWritable o1, PairWritable o2) {
            return o1.getFirst().compareTo(o2.getFirst());
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // PairWritable is serialized as writeUTF(first) followed by writeInt(second),
            // so comparing everything except the trailing 4 bytes compares only "first".
            return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
        }
    }
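
The byte-level compare above depends on PairWritable's exact serialized layout (a UTF-encoded string followed by a 4-byte int). As a minimal alternative sketch, not part of the original assignment, the same grouping can be written by extending WritableComparator and comparing deserialized keys; it is slower than a raw comparator but independent of the byte layout (the class name FirstGroupingComparator is illustrative):

    package njt.song.study.hadoop;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Alternative grouping comparator that works on deserialized keys.
    public class FirstGroupingComparator extends WritableComparator {

        public FirstGroupingComparator() {
            // "true" asks the parent class to create key instances for deserialization.
            super(PairWritable.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            PairWritable left = (PairWritable) a;
            PairWritable right = (PairWritable) b;
            // Group solely on the first (original) key; the second field is ignored.
            return left.getFirst().compareTo(right.getFirst());
        }
    }

It would be registered the same way, via job.setGroupingComparatorClass(FirstGroupingComparator.class).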

3.4) The MapReduce program

    package njt.song.study.hadoop;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class WordCount extends Configured implements Tool {

        // Emits the composite key (original key + value) together with the value itself.
        public static class WordCountMapper extends Mapper<Object, Text, PairWritable, IntWritable> {

            private IntWritable outPutValue = new IntWritable();
            private PairWritable outPutKey = new PairWritable();

            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] words = line.split(",");
                outPutKey.Set(words[0], Integer.valueOf(words[1]));
                outPutValue.set(Integer.valueOf(words[1]));
                context.write(outPutKey, outPutValue);
            }
        }

        // Writes the original key with its values, which now arrive already sorted.
        public static class WordCountReducer extends Reducer<PairWritable, IntWritable, Text, IntWritable> {

            private Text outPutkey = new Text();

            @Override
            protected void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                outPutkey.set(key.getFirst());
                for (IntWritable value : values) {
                    context.write(outPutkey, value);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            int status = ToolRunner.run(conf, new WordCount(), args);
            System.exit(status);
        }

        public int run(String[] args) throws Exception {
            Configuration configuration = super.getConf();
            Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
            job.setJarByClass(WordCount.class);

            Path inPath = new Path(args[0]);
            FileInputFormat.addInputPath(job, inPath);

            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(PairWritable.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Partition and group by the original key instead of the composite key.
            job.setPartitionerClass(FirstPartitioner.class);
            job.setGroupingComparatorClass(FirstGroupCompair.class);

            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            Path outPath = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outPath);

            boolean isSuccess = job.waitForCompletion(true);
            return isSuccess ? 0 : 1;
        }
    }
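
One detail worth noting: the custom partitioner only takes effect when the job runs with more than one reduce task (for example after calling job.setNumReduceTasks(2) in run()); with a single reducer every key lands in the same partition anyway, and only the sort and grouping comparators matter.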

3.5) Running the job

    # Package as SecondSortjar.jar, then run
    bin/yarn jar /home/sjf/SecondSortjar.jar /input/word1.txt /Out21

    # Input before the job
    i,20
    b,1
    c,40
    a,10
    b,20
    c,300

    # Output after the job
    a 10
    b 1
    b 20
    c 40
    c 300
    i 20
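
In the output, the values for b and c reach the reducer already in ascending order even though reduce() only copies them through: sorting used the composite key, while partitioning and grouping used only the original key, which is exactly the secondary-sort effect.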

[Figure: screenshot of the job execution]

II) Implementing a Join

1) Three ways to join

1.1) Reduce join

The join is performed on the Reduce side and suits joining one large table to another large table: records coming from different files are brought together by their shared key, and the values with the same key are joined.

1.2) Map join

The join is performed on the map side. Of the two files, one is large and one is small; the small one needs few resources and can be held in memory, which is implemented with the DistributedCache. The mapper iterates over the large table and, for each record, looks up the matching data in the in-memory small table by key; the reduce side needs almost no changes. A minimal sketch follows.
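
As a rough illustration of this idea, here is a minimal map-side join sketch; it is not part of the original assignment, and the file name customer.txt plus the field layout (id,name,phone for customers and id,name,price,date for orders) are assumptions. The small customer file is distributed with the job, loaded into a HashMap in setup(), and every order record is probed against it in map(); no reducer is required.

    package njt.song.study.hadoop;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Map-side join: the small table lives in memory, the big table is streamed.
    public class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

        private final Map<String, String> customers = new HashMap<String, String>();
        private final Text outputValue = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Assumes the driver shipped the small file, e.g.
            // job.addCacheFile(new URI("/input/customer.txt")); cached files are
            // available in the task's working directory under their base name.
            BufferedReader reader = new BufferedReader(new FileReader("customer.txt"));
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");        // id,name,phone
                customers.put(fields[0], line);
            }
            reader.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(","); // id,name,price,date
            String customer = customers.get(fields[0]);
            if (customer != null) {                        // inner join: drop unmatched orders
                outputValue.set(customer + "," + value.toString());
                context.write(NullWritable.get(), outputValue);
            }
        }
    }

In the driver, the job would register the cache file and could run with job.setNumReduceTasks(0), since the joined output needs no shuffle.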

1.3) Semi Join

A semi join combines the strengths of the reduce join and the map join and uses both the map and reduce sides. In a pure reduce-side join the amount of data shipped across machines is very large, so to improve performance the map side first filters out records that cannot participate in the join, and only the records that may join are sent on to the Reduce side; the reduce-side processing is then almost the same as a reduce join.
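
In practice the map-side filter is usually built by first extracting the distinct join keys from the smaller dataset and shipping that key set to every mapper (for example via the DistributedCache, or as a Bloom filter when even the key set is too large to hold exactly); any record whose key is not in the set is dropped before the shuffle.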

2) Implementing the join in code

2.1) How it works

In the map phase, the map function reads from several files at once. To tell the key/value pairs from different files apart, every record is given a tag. In the reduce phase, the reduce function uses the tag to separate data coming from different files and then joins the value lists that share the same key.

2.2) Code

2.2.1) The tag class

    package njt.song.study.hadoop;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    // Map output value: the record's payload plus a tag ("customer" or "order")
    // identifying which input file it came from.
    public class DataJoinWritable implements Writable {

        private String tag;
        private String data;

        public DataJoinWritable() {
            super();
        }

        public void Set(String tag, String data) {
            this.tag = tag;
            this.data = data;
        }

        public String getTag() {
            return tag;
        }

        public void setTag(String tag) {
            this.tag = tag;
        }

        public String getData() {
            return data;
        }

        public void setData(String data) {
            this.data = data;
        }

        @Override
        public String toString() {
            return "DataJoinWritable Tag = [" + this.getTag() + "], Data = [" + this.getData() + "]";
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.setTag(in.readUTF());
            this.setData(in.readUTF());
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.getTag());
            out.writeUTF(this.getData());
        }
    }

2.2.2) The MapReduce program

    package njt.song.study.hadoop;

    import java.io.IOException;
    import java.util.ArrayList;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class WordCount extends Configured implements Tool {

        // Tags each record as "customer" (3 fields) or "order" (4 fields), keyed by customer id.
        public static class WordCountMapper extends Mapper<LongWritable, Text, LongWritable, DataJoinWritable> {

            private DataJoinWritable outPutValue = new DataJoinWritable();
            private LongWritable outPutKey = new LongWritable();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] words = value.toString().split(",");
                if (words.length != 3 && words.length != 4) {
                    return;                          // skip malformed lines
                }
                Long id = Long.valueOf(words[0]);
                outPutKey.set(id);
                String name = words[1];
                if (words.length == 3) {             // customer record: id,name,phone
                    String phone = words[2];
                    outPutValue.Set("customer", name + "," + phone);
                } else if (words.length == 4) {      // order record: id,name,price,date
                    String price = words[2];
                    String date = words[3];
                    outPutValue.Set("order", name + "," + price + "," + date);
                }
                System.out.print("****Map********" + outPutKey.toString());  // debug output
                context.write(outPutKey, outPutValue);
            }
        }

        // Joins the customer record with all order records that share the same id.
        public static class WordCountReducer extends Reducer<LongWritable, DataJoinWritable, NullWritable, Text> {

            private Text outPutValue = new Text();

            @Override
            protected void reduce(LongWritable key, Iterable<DataJoinWritable> values, Context context)
                    throws IOException, InterruptedException {
                String customerInfo = new String();
                ArrayList<String> orderList = new ArrayList<String>();
                for (DataJoinWritable value : values) {
                    System.out.print("****Reduce********" + value.toString());  // debug output
                    if (value.getTag().equals("customer")) {
                        customerInfo = value.getData();
                    } else if (value.getTag().equals("order")) {
                        orderList.add(value.getData());
                    }
                }
                for (String order : orderList) {
                    outPutValue.set(key.toString() + "," + customerInfo + "," + order);
                    context.write(NullWritable.get(), outPutValue);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            int status = ToolRunner.run(conf, new WordCount(), args);
            System.exit(status);
        }

        public int run(String[] args) throws Exception {
            Configuration configuration = super.getConf();
            Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
            job.setJarByClass(WordCount.class);

            Path inPath = new Path(args[0]);
            FileInputFormat.addInputPath(job, inPath);

            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(DataJoinWritable.class);

            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            Path outPath = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outPath);

            boolean isSuccess = job.waitForCompletion(true);
            return isSuccess ? 0 : 1;
        }
    }
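
One caveat of this reducer: because the values for a key arrive in no guaranteed order, the customer record may show up after some of its orders, so all order strings are buffered in an ArrayList before the joined rows are written. For a key with a very large number of orders this runs into the same memory concern raised for value sorting in part I.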

2.2.3) Running the job

    # Package as a jar file, then run
    bin/yarn jar /home/sjf/datajoin.jar /orderinfo /Out10

Input data and job output

    # customer data
    1,TI,137888888
    2,LG,13966666

    # order data
    1,TI,20,2009-1-1
    1,TI,21,2010-2-2
    1,TI,22,2016-1-1
    2,LG,30,2015-1-1
    2,LG,31,2014-3-3

    # job output
    1,TI,137888888,TI,22,2016-1-1
    1,TI,137888888,TI,21,2010-2-2
    1,TI,137888888,TI,20,2009-1-1
    2,LG,13966666,LG,31,2014-3-3
    2,LG,13966666,LG,30,2015-1-1

[Figure: screenshot of the join job execution]
