[关闭]
@awsekfozc 2015-12-09T11:23:13.000000Z 字数 3257 阅读 1657

WordCount Partitioner

MapReduce

WordCountMr

  package com.zc.hadoop.mapreduce.partition;

  import java.io.IOException;
  import java.util.regex.Pattern;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  /**
   * Word-count MapReduce job whose map output is routed to reducers by {@link WCPartitioner}.
   *
   * <p>Usage: {@code WordCountMr <input path> <output path>}
   */
  public class WordCountMr extends Configured implements Tool {

    /**
     * Number of reduce tasks. WCPartitioner assigns words to partitions 0, 1 and 2, so three
     * reducers give each word class its own output file.
     */
    private static final int NUM_REDUCE_TASKS = 3;

    /**
     * Entry point. Delegates to ToolRunner so that generic Hadoop options (-D, -conf, ...) are
     * parsed and injected into this Tool's Configuration before {@link #run} is called.
     *
     * @param args generic Hadoop options followed by the input path and the output path
     */
    public static void main(String[] args) throws Exception {
      int status = ToolRunner.run(new Configuration(), new WordCountMr(), args);
      System.exit(status);
    }

    /** Map phase: emits (word, 1) for every whitespace-separated token of a line. */
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
      /** Compiled once; splitting on a single " " would yield empty tokens and miss tabs. */
      private static final Pattern WHITESPACE = Pattern.compile("\\s+");
      private static final IntWritable ONE = new IntWritable(1);

      /** Reused output key to avoid allocating a Text per token. */
      private final Text mapOutPutkey = new Text();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        for (String token : WHITESPACE.split(value.toString())) {
          // Leading whitespace or an empty line produces an empty first token; skip it so
          // "" is never counted as a word.
          if (token.isEmpty()) {
            continue;
          }
          mapOutPutkey.set(token);
          context.write(mapOutPutkey, ONE);
        }
      }
    }

    /** Reduce phase: sums the counts for each word. Also used as the combiner. */
    public static class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
      private final IntWritable outputvalue = new IntWritable();

      @Override
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
          sum += value.get();
        }
        outputvalue.set(sum);
        context.write(key, outputvalue);
      }
    }

    /**
     * Configures and submits the job, then waits for it to finish.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 on success, 1 if the job failed, 2 on a usage error
     */
    @Override
    public int run(String[] args) throws Exception {
      if (args == null || args.length < 2) {
        System.err.println("Usage: " + getClass().getSimpleName() + " <input path> <output path>");
        return 2;
      }

      // Prefer the Configuration injected by ToolRunner; fall back to a fresh one when run()
      // is invoked directly on an instance that was never given one.
      Configuration configuration = getConf() != null ? getConf() : new Configuration();

      Job job = Job.getInstance(configuration, getClass().getSimpleName());
      job.setJarByClass(getClass());

      // Input
      FileInputFormat.addInputPath(job, new Path(args[0]));

      // Map
      job.setMapperClass(WCMapper.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);

      // Shuffle: pre-aggregate on the map side (summing is associative), then route each
      // word to a reducer with the custom partitioner.
      job.setCombinerClass(WCReduce.class);
      job.setPartitionerClass(WCPartitioner.class);
      job.setNumReduceTasks(NUM_REDUCE_TASKS);

      // Reduce
      job.setReducerClass(WCReduce.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      // Output
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      return job.waitForCompletion(true) ? 0 : 1;
    }
  }

WCPartitioner

  package com.zc.hadoop.mapreduce.partition;

  import java.util.regex.Pattern;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Partitioner;

  /**
   * Partitions map output (word, count) by the letter case of the word:
   *
   * <ul>
   *   <li>0 - anything else (null key, mixed case, digits, punctuation, empty)
   *   <li>1 - words made only of lowercase ASCII letters
   *   <li>2 - words made only of uppercase ASCII letters
   * </ul>
   *
   * <p>With three reduce tasks each class is written to its own output file. The class number
   * is reduced modulo {@code numPartitions}, so the partitioner never returns an out-of-range
   * partition when the job is configured with fewer reducers.
   *
   * @author zc
   */
  public class WCPartitioner extends Partitioner<Text, IntWritable> {

    private static final int OTHER = 0;
    private static final int LOWER_CASE = 1;
    private static final int UPPER_CASE = 2;

    // Compiled once instead of on every call, as String.matches would do.
    private static final Pattern LOWER_CASE_WORD = Pattern.compile("[a-z]+");
    private static final Pattern UPPER_CASE_WORD = Pattern.compile("[A-Z]+");

    /**
     * Returns the partition for a map output record.
     *
     * @param key the map output key (a word); may be null
     * @param value the map output value (ignored)
     * @param numPartitions the number of reduce tasks
     * @return a partition index in {@code [0, numPartitions)}, or 0 if {@code numPartitions <= 1}
     */
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      if (numPartitions <= 1) {
        // A single reducer (or a map-only job) leaves no choice; also avoids % by zero.
        return 0;
      }
      return classify(key) % numPartitions;
    }

    /** Maps a word to its case class: OTHER, LOWER_CASE or UPPER_CASE. */
    private static int classify(Text key) {
      if (key == null) {
        return OTHER;
      }
      String word = key.toString();
      if (LOWER_CASE_WORD.matcher(word).matches()) {
        return LOWER_CASE;
      }
      if (UPPER_CASE_WORD.matcher(word).matches()) {
        return UPPER_CASE;
      }
      return OTHER;
    }
  }

在此输入正文

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注