@songlaf 2016-05-14

Assignment 11 [Advanced MapReduce Case Study: Inverted Index]

Beifeng Big Data Training


1) The Inverted Index Concept

An inverted index maps a keyword back to the records that contain it. The keyword may appear in a key or in a value; from it you can look up related information, such as where it occurs and how often. A search engine is the classic example: given a keyword, it finds which pages contain that keyword and how frequently it appears on each.
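Before moving to MapReduce, the idea can be shown in plain Java. The sketch below (class and method names are mine, not part of the assignment) builds the index as word -> (url -> count) in memory, which is the same structure the job in section 3 produces at scale:

```java
import java.util.*;

// A minimal in-memory inverted index: word -> (url -> occurrence count).
// Illustrative only; the MapReduce job below builds the same structure at scale.
public class SimpleInvertedIndex {
    private final Map<String, Map<String, Integer>> index = new TreeMap<>();

    // Record every word of a page under its URL.
    public void add(String url, String text) {
        for (String word : text.split(" ")) {
            index.computeIfAbsent(word, w -> new TreeMap<>())
                 .merge(url, 1, Integer::sum);
        }
    }

    // Look up which URLs contain the word, and how often.
    public Map<String, Integer> lookup(String word) {
        return index.getOrDefault(word, Collections.emptyMap());
    }

    public static void main(String[] args) {
        SimpleInvertedIndex idx = new SimpleInvertedIndex();
        idx.add("url-01", "the apache hadoop");
        idx.add("url-02", "apache framework hadoop");
        idx.add("url-03", "the common apache");
        idx.add("url-01", "apache the hadoop");
        idx.add("url-02", "apache framework");
        System.out.println("apache -> " + idx.lookup("apache"));
        // prints: apache -> {url-01=2, url-02=2, url-03=1}
    }
}
```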

2) Implementation Walkthrough

This section walks through how the inverted index is built, using a concrete example for each of the Map, Combine, and Reduce phases. The combiner runs on the map side.
Note:
i) The map's output is the combiner's input, and the combiner's output is the reducer's input.
ii) The combiner must not change the types of the map's output: its input and output key/value classes have to match the map output classes (here all are Text). Strictly speaking, Hadoop may run a combiner zero or more times, so a combiner that reshapes the key, as this one does, is a simplification that assumes it runs exactly once.

Take the following input data:

```
url-01##the apache hadoop
url-02##apache framework hadoop
url-03##the common apache
url-01##apache the hadoop
url-02##apache framework
```

2.1) The Map Phase

On the map side, the word and the URL are joined with a comma to form the key, and the string "1" is emitted as the value. The map output therefore looks like this:

```
Key             Value
the,url-01      1
apache,url-01   1
hadoop,url-01   1
... (remaining pairs omitted)
```

2.2) The Combine Phase

The combiner sums the values of identical keys, giving the total number of times a word appears on a given URL. It then splits the key on the comma and emits key = the word, value = url:count for the reducer to consume. The combiner output looks like this:

```
Key       Value
apache    url-01:2
apache    url-02:2
apache    url-03:1
... (remaining entries omitted)
```

2.3) The Reduce Phase

The reducer merges the values arriving from the combiner into the final display format: for each word, it concatenates the url:count entries into one line. For the key apache, for example, the values [url-01:2, url-02:2, url-03:1] become the single output line apache url-01:2 url-02:2 url-03:1.

3) Code

3.1) The Combiner Class

```java
package njt.song.study.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Combiner: sums the counts for each "word,url" key and re-emits
// key = word, value = "url:count" for the reducer.
public class wordInverse extends Reducer<Text, Text, Text, Text> {

    private Text combineOutputKey = new Text();
    private Text combineOutputValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The map key has the form "word,url"; split it apart.
        String[] parts = key.toString().split(",");
        combineOutputKey.set(parts[0]);

        // Sum the per-occurrence counts ("1") for this word on this URL.
        int sum = 0;
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }

        combineOutputValue.set(parts[1] + ":" + sum);
        context.write(combineOutputKey, combineOutputValue);
    }
}
```
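If you want to check the combiner on its own before submitting a job, a driver-based unit test works well. The sketch below assumes the MRUnit test library (org.apache.hadoop.mrunit) and JUnit are on the classpath; neither is part of the original assignment.

```java
package njt.song.study.hadoop;

import java.util.Arrays;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

// Unit test for the combiner: two "1" values for "apache,url-01"
// should come out as key "apache", value "url-01:2".
public class WordInverseTest {

    @Test
    public void combinesCountsPerUrl() throws Exception {
        ReduceDriver<Text, Text, Text, Text> driver =
                ReduceDriver.newReduceDriver(new wordInverse());
        driver.withInput(new Text("apache,url-01"),
                         Arrays.asList(new Text("1"), new Text("1")))
              .withOutput(new Text("apache"), new Text("url-01:2"))
              .runTest();
    }
}
```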

3.2) The MR Program

```java
package njt.song.study.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

    // Mapper: splits each line "url##text" and emits ("word,url", "1")
    // for every word on the page.
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text("1");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] lines = value.toString().split("##");
            if (lines.length != 2) {
                return; // skip malformed lines
            }
            String url = lines[0];
            String[] words = lines[1].split(" ");
            for (String word : words) {
                outputKey.set(word + "," + url);
                context.write(outputKey, outputValue);
            }
        }
    }

    // Reducer: concatenates the "url:count" values for each word.
    public static class WordCountReducer extends Reducer<Text, Text, Text, Text> {

        private Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder result = new StringBuilder();
            for (Text value : values) {
                result.append(value.toString()).append("\t");
            }
            outputValue.set(result.toString());
            context.write(key, outputValue);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new WordCount(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration configuration = super.getConf();
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(WordCount.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(wordInverse.class);

        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
```

3.3) Running the Job

```
# Package the classes into a jar, then run it against the sample data above
bin/yarn jar /home/sjf/WordInverse.jar /input2 /Out20
```

Output:

```
bin/hdfs dfs -cat /Out20/part-r-00000
apache      url-01:2    url-02:2    url-03:1
common      url-03:1
framework   url-02:2
hadoop      url-01:2    url-02:1
the         url-01:2    url-03:1
```

(The fields are tab-separated in the actual output file.)
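To connect this back to the search-engine use case in section 1: once the output file is copied out of HDFS, a few lines of plain Java can serve keyword lookups from it. The sketch below is illustrative only; the local file name assumes `bin/hdfs dfs -get /Out20/part-r-00000 .` was run first.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

// Loads the job output (word \t url:count \t url:count ...) into a map
// and answers keyword lookups, like a toy search engine.
public class IndexQuery {
    public static void main(String[] args) throws IOException {
        Map<String, String> index = new HashMap<>();
        for (String line : Files.readAllLines(Paths.get("part-r-00000"))) {
            String[] parts = line.split("\t", 2); // key, then the rest of the line
            if (parts.length == 2) {
                index.put(parts[0], parts[1].trim());
            }
        }
        System.out.println("apache -> " + index.getOrDefault("apache", "(not found)"));
        // Expected: apache -> url-01:2  url-02:2  url-03:1
    }
}
```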

Screenshot of the job execution: (image omitted)
