[关闭]
@zhangyy 2017-02-04T07:03:23.000000Z 字数 4374 阅读 136

mapreduce 高级案例倒排索引

hadoop的部分


  • 理解【倒排索引】的功能
  • 熟悉mapreduce 中的combine 功能
  • 根据需求编码实现【倒排索引】的功能,旨在理解mapreduce 的功能。

一:理解【倒排索引】的功能

倒排索引:
由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引
简单来说根据单词,返回它在哪个文件中出现过,而且频率是多少的结果。例如:就像百度里的搜索,你输入一个关键字,那么百度引擎就迅速的在它的服务器里找到有该关键字的文件,并根据频率和其他一些策略(如页面点击投票率)等来给你返回结果。

二:熟悉mapreduce 中的combine 功能

1 Map过程:Map过程首先分析输入的对,得到索引中需要的信息:单词,文档URI 和词频。key:单词和URI.value:出现同样单词的次数。

2 Combine过程:经过map方法处理后,Combine过程将key值相同的value值累加,得到一个单词在文档中的词频。

3 Reduce过程:经过上述的俩个过程后,Reduce过程只需要将相同的key值的value值组合成倒排引索文件的格式即可,其余的事情直接交给MapReduce框架进行处理
4.png-15.3kB

三:根据需求编码实现【倒排索引】的功能,旨在理解mapreduce 的功能。

InvertedIndexMapReduce.java

  1. package org.apache.hadoop.studyhadoop.index;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Mapper.Context;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.util.Tool;
  16. import org.apache.hadoop.util.ToolRunner;
  17. /**
  18. *
  19. * @author zhangyy
  20. *
  21. */
  22. public class InvertedIndexMapReduce extends Configured implements Tool {
  23. // step 1 : mapper
  24. /**
  25. * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  26. */
  27. public static class WordCountMapper extends //
  28. Mapper<LongWritable, Text, Text, Text> {
  29. private Text mapOutputKey = new Text();
  30. private Text mapOutputValue = new Text("1");
  31. @Override
  32. public void map(LongWritable key, Text value, Context context)
  33. throws IOException, InterruptedException {
  34. // split1
  35. String[] lines = value.toString().split("##");
  36. // get url
  37. String url = lines[0];
  38. // split2
  39. String[] strs = lines[1].split(" ");
  40. for (String str : strs) {
  41. mapOutputKey.set(str + "," + url);
  42. context.write(mapOutputKey, mapOutputValue);
  43. }
  44. }
  45. }
  46. // set combiner class
  47. public static class InvertedIndexCombiner extends //
  48. Reducer<Text, Text, Text, Text> {
  49. private Text CombinerOutputKey = new Text();
  50. private Text CombinerOutputValue = new Text();
  51. @Override
  52. public void reduce(Text key, Iterable<Text> values, Context context)
  53. throws IOException, InterruptedException {
  54. // split
  55. String[] strs = key.toString().split(",");
  56. // set key
  57. CombinerOutputKey.set(strs[0] + "\n");
  58. // set value
  59. int sum = 0;
  60. for (Text value : values) {
  61. sum += Integer.valueOf(value.toString());
  62. }
  63. CombinerOutputValue.set(strs[1] + ":" + sum);
  64. context.write(CombinerOutputKey, CombinerOutputValue);
  65. }
  66. }
  67. // step 2 : reducer
  68. public static class WordCountReducer extends //
  69. Reducer<Text, Text, Text, Text> {
  70. private Text outputValue = new Text();
  71. @Override
  72. public void reduce(Text key, Iterable<Text> values, Context context)
  73. throws IOException, InterruptedException {
  74. // TODO
  75. String result = new String();
  76. for (Text value : values) {
  77. result += value.toString() + "\t";
  78. }
  79. outputValue.set(result);
  80. context.write(key, outputValue);
  81. }
  82. }
  83. // step 3 : job
  84. public int run(String[] args) throws Exception {
  85. // 1 : get configuration
  86. Configuration configuration = super.getConf();
  87. // 2 : create job
  88. Job job = Job.getInstance(//
  89. configuration,//
  90. this.getClass().getSimpleName());
  91. job.setJarByClass(InvertedIndexMapReduce.class);
  92. // job.setNumReduceTasks(tasks);
  93. // 3 : set job
  94. // input --> map --> reduce --> output
  95. // 3.1 : input
  96. Path inPath = new Path(args[0]);
  97. FileInputFormat.addInputPath(job, inPath);
  98. // 3.2 : mapper
  99. job.setMapperClass(WordCountMapper.class);
  100. // TODO
  101. job.setMapOutputKeyClass(Text.class);
  102. job.setMapOutputValueClass(Text.class);
  103. // ====================shuffle==========================
  104. // 1: partition
  105. // job.setPartitionerClass(cls);
  106. // 2: sort
  107. // job.setSortComparatorClass(cls);
  108. // 3: combine
  109. job.setCombinerClass(InvertedIndexCombiner.class);
  110. // 4: compress
  111. // set by configuration
  112. // 5 : group
  113. // job.setGroupingComparatorClass(cls);
  114. // ====================shuffle==========================
  115. // 3.3 : reducer
  116. job.setReducerClass(WordCountReducer.class);
  117. // TODO
  118. job.setOutputKeyClass(Text.class);
  119. job.setOutputValueClass(Text.class);
  120. // 3.4 : output
  121. Path outPath = new Path(args[1]);
  122. FileOutputFormat.setOutputPath(job, outPath);
  123. // 4 : submit job
  124. boolean isSuccess = job.waitForCompletion(true);
  125. return isSuccess ? 0 : 1;
  126. }
  127. public static void main(String[] args) throws Exception {
  128. args = new String[] {
  129. "hdfs://namenode01.hadoop.com:8020/input/index.txt",
  130. "hdfs://namenode01.hadoop.com:8020/outputindex/"
  131. };
  132. // get configuration
  133. Configuration configuration = new Configuration();
  134. // configuration.set(name, value);
  135. // run job
  136. int status = ToolRunner.run(//
  137. configuration,//
  138. new InvertedIndexMapReduce(),//
  139. args);
  140. // exit program
  141. System.exit(status);
  142. }
  143. }

上传文件:
hdfs dfs -put index.txt /input

代码运行结果:
5.png-125.6kB
6.png-47.1kB
输出结果:
7.png-22.5kB

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注