[关闭]
@w460461339 2016-11-15T18:16:22.000000Z 字数 17226 阅读 1166

Hadoop学习Day7(自定义排序,自定义分组,以及其他算法)

Hadoop


1、shuffle阶段

Map部分:

1、每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
2、写磁盘前,要partition,sort。如果有combiner,combine排序后数据(即顺序为 排序,分组,combine)。
3、等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。

Reduce部分:

1、Reducer通过Http方式得到输出文件的分区。
2、TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。
3、排序阶段合并map输出。然后走Reduce阶段。

2、自定义排序

这个自定义排序主要靠自定义类来实现。但是,不同于之前的自定义类只要实现writable接口;需要自定义排序的类需要实现writablecomparable接口。

  1. package day_0330_sort;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.FileSystem;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.NullWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.io.WritableComparable;
  13. import org.apache.hadoop.io.WritableComparator;
  14. import org.apache.hadoop.mapreduce.Job;
  15. import org.apache.hadoop.mapreduce.Mapper;
  16. import org.apache.hadoop.mapreduce.Reducer;
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  20. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  21. import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
  22. public class Mysort {
  23. /**
  24. * target:
  25. * sort the data by the first line in aes;
  26. * if the first line is equal,sort them by second in aes.
  27. */
  28. public static String IN_PATH="hdfs://192.168.145.133:9000/data/sortdata2";
  29. public static String OUT_PATH="hdfs://192.168.145.133:9000/day0330";
  30. public static void main(String[] args) throws Exception {
  31. Configuration conf=new Configuration();
  32. Job job=new Job(conf,Mysort.class.getName());
  33. FileSystem fs=FileSystem.get(new URI("hdfs://192.168.145.133:9000/"), conf);
  34. if(fs.exists(new Path(OUT_PATH))){
  35. fs.delete(new Path(OUT_PATH),true);
  36. }
  37. //1.1 set input path
  38. FileInputFormat.setInputPaths(job,new Path(IN_PATH));
  39. job.setInputFormatClass(TextInputFormat.class);
  40. //1.2 set mymapper
  41. job.setMapperClass(MyMapper.class);
  42. job.setMapOutputKeyClass(MyData.class);
  43. job.setMapOutputValueClass(LongWritable.class);
  44. //1.3 partition
  45. job.setPartitionerClass(HashPartitioner.class);
  46. //job.setNumReduceTasks(1);
  47. //1.4 在各自分区内进行排序和分组
  48. //排序由自定义类执行
  49. //1.5 归约combiner
  50. //这里不设置combiner
  51. //2.1 reduce端的排序和分组 排序都按照key进行排序
  52. //2.2 设置reduce
  53. job.setReducerClass(MyReducer.class);
  54. job.setOutputKeyClass(LongWritable.class);
  55. job.setOutputValueClass(LongWritable.class);
  56. //2.3设置输出路径和格式
  57. job.setOutputFormatClass(TextOutputFormat.class);
  58. FileOutputFormat.setOutputPath(job,new Path(OUT_PATH));
  59. //submit
  60. job.waitForCompletion(true);
  61. }
  62. static class MyMapper extends Mapper<LongWritable, Text, MyData, LongWritable>{
  63. @Override
  64. protected void map(LongWritable key, Text value,
  65. Mapper<LongWritable, Text, MyData, LongWritable>.Context context)
  66. throws IOException, InterruptedException {
  67. // TODO Auto-generated method stub
  68. //super.map(key, value, context);
  69. String[] spWords=value.toString().split(" ");
  70. Long l1=Long.parseLong(spWords[0]);
  71. Long l2=Long.parseLong(spWords[1]);
  72. MyData k2=new MyData(l1, l2);
  73. context.write(k2, new LongWritable(l1));
  74. }
  75. }
  76. static class MyReducer extends Reducer<MyData, LongWritable,LongWritable, LongWritable>{
  77. @Override
  78. protected void reduce(MyData k2, Iterable<LongWritable> v2s,
  79. Reducer<MyData, LongWritable, LongWritable, LongWritable>.Context context)
  80. throws IOException, InterruptedException {
  81. // TODO Auto-generated method stub
  82. //super.reduce(arg0, arg1, arg2);
  83. // System.out.println(k2.k2+" "+k2.v2);
  84. context.write(new LongWritable(k2.k2),new LongWritable(k2.v2));
  85. }
  86. }
  87. }
  88. //注意这里是实现WritableComparable接口
  89. //普通的是实现Writable接口,两个要实现的接口不一样~~
  90. class MyData implements WritableComparable<MyData>{
  91. //注意,这里类型不能使8大基本类型啊
  92. //要是这样的包装类型
  93. Long k2;
  94. Long v2;
  95. public MyData() {
  96. }
  97. public MyData(long k2, long v2) {
  98. this.k2 = k2;
  99. this.v2 = v2;
  100. }
  101. @Override
  102. public String toString() {
  103. return "MyData [k2=" + k2 + ", v2=" + v2 + "]";
  104. }
  105. //编码
  106. @Override
  107. public void write(DataOutput out) throws IOException {
  108. // TODO Auto-generated method stub
  109. out.writeLong(k2);
  110. out.writeLong(v2);
  111. }
  112. //反编码
  113. @Override
  114. public void readFields(DataInput in) throws IOException {
  115. // TODO Auto-generated method stub
  116. this.k2=in.readLong();
  117. this.v2=in.readLong();
  118. }
  119. @Override
  120. public int compareTo(MyData o) {
  121. // TODO Auto-generated method stub
  122. //注意,自定义返回值的时候应该返回1(大于零的数)(表示前者大),-1(小于零的数)(表示后者大),0表示相同(一般不返回0)
  123. //千万注意别返回0 啊!!!!以及不要漏掉任何一种情况
  124. //第一列不同
  125. //这个compareTo的结果应该也参与到分组中
  126. //另外,像我之前那样 (this.k2-o.k2)>0?1:-1,
  127. //相当于没有考虑this.k2==o.k2的情况(因为没有返回0)
  128. //因此(3,3) (3,3)这样的key不会被视为相同(因为返回是-1~)
  129. if(this.k2 != o.k2){
  130. System.out.println("first line "+
  131. (this.k2-o.k2)+" "+this.k2+" "+o.k2);
  132. return (int)(this.k2-o.k2);
  133. }else{
  134. //第一列相同
  135. System.out.println("second line "+
  136. (this.k2-o.k2)+" "+this.k2+" "+o.k2);
  137. return (int)(this.v2-o.v2);
  138. }
  139. //return 0;
  140. }
  141. @Override
  142. public int hashCode() {
  143. // TODO Auto-generated method stub
  144. return this.k2.hashCode()+this.v2.hashCode();
  145. }
  146. //注意这里怎么写equals方法的!
  147. @Override
  148. public boolean equals(Object obj) {
  149. // TODO Auto-generated method stub
  150. //注意这个判断某个对象是属于什么类 用的关键字
  151. if(obj instanceof MyData){
  152. MyData o=(MyData)obj;
  153. return (this.k2==o.k2)&&(this.v2==o.v2);
  154. }else{
  155. return false;
  156. }
  157. }
  158. }

3、自定义分组

原始数据:

3 3
3 3
3 2
3 1
2 2
2 1
2 1
1 1

程序中,将第一列第二列一起作为k2,第二列作为v2进行传输
结果日志:

Map-Reduce Framework
    Map input records=8
    Map output records=8 //map输出是8条(没有分组)
    Map output bytes=192
    Map output materialized bytes=214
    Input split bytes=107
    Combine input records=0
    Combine output records=0
    Reduce input groups=6 //这里,将(3,3)(3,3)视为一组
                          //将(2,1)(2,1)视为一组
          //总共就只有6组了(这是用两列数据作为分组依据的结果)
    Reduce shuffle bytes=0
    Reduce input records=8
    Reduce output records=6
    Spilled Records=16
    Shuffled Maps =0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=0
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=534773760

测试仅用k2中第一列的数据作为分组依据来

  1. package day0330_group;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.FileSystem;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.RawComparator;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.io.WritableComparable;
  13. import org.apache.hadoop.io.WritableComparator;
  14. import org.apache.hadoop.mapred.Counters.Group;
  15. import org.apache.hadoop.mapreduce.Job;
  16. import org.apache.hadoop.mapreduce.Mapper;
  17. import org.apache.hadoop.mapreduce.Reducer;
  18. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  19. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  20. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  21. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  22. public class MyGroupDemo {
  23. public static String IN_PATH = "hdfs://192.168.145.133:9000/data/sortdata2";
  24. public static String OUT_PATH = "hdfs://192.168.145.133:9000/day0330_group";
  25. //第一次运行忘记写main了……
  26. public static void main(String[] args) throws Exception{
  27. Configuration conf=new Configuration();
  28. Job job=new Job(conf,MyGroupDemo.class.getName());
  29. FileSystem fs=FileSystem.get(new URI("hdfs://192.168.145.133:9000/"), conf);
  30. if(fs.exists(new Path(OUT_PATH))){
  31. fs.delete(new Path(OUT_PATH), true);
  32. }
  33. //1.1 设置输入路径和格式
  34. FileInputFormat.setInputPaths(job, new Path(IN_PATH));
  35. job.setInputFormatClass(TextInputFormat.class);
  36. //1.2 设置自定义mapper
  37. job.setMapperClass(MyMapper.class);
  38. job.setMapOutputKeyClass(MyData.class);
  39. job.setMapOutputValueClass(LongWritable.class);
  40. //1.3设置分区 采用默认的,就不写了
  41. //1.4 设置排序,分组
  42. //就这样调用一下
  43. job.setGroupingComparatorClass(MyGrouper.class);
  44. //1.5 设置combiner 不写了
  45. //2.1 map输出传到reduce端
  46. //2.2 设置自定义reduce
  47. job.setReducerClass(MyReducer.class);
  48. job.setOutputKeyClass(LongWritable.class);
  49. job.setOutputValueClass(LongWritable.class);
  50. //2.3 设置输出格式和输出路径
  51. job.setOutputFormatClass(TextOutputFormat.class);
  52. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  53. //submit
  54. job.waitForCompletion(true);
  55. }
  56. static class MyMapper extends Mapper<LongWritable, Text, MyData, LongWritable> {
  57. @Override
  58. protected void map(LongWritable key, Text value,
  59. Mapper<LongWritable, Text, MyData, LongWritable>.Context context)
  60. throws IOException, InterruptedException {
  61. // TODO Auto-generated method stub
  62. // super.map(key, value, context);
  63. String[] spWords = value.toString().split(" ");
  64. MyData k2 = new MyData(Long.parseLong(spWords[0]), Long.parseLong(spWords[1]));
  65. LongWritable v2 = new LongWritable(Long.parseLong(spWords[1]));
  66. context.write(k2, v2);
  67. }
  68. }
  69. static class MyReducer extends Reducer<MyData, LongWritable, LongWritable, LongWritable> {
  70. @Override
  71. protected void reduce(MyData k2, Iterable<LongWritable> v2s,
  72. Reducer<MyData, LongWritable, LongWritable, LongWritable>.Context context)
  73. throws IOException, InterruptedException {
  74. // TODO Auto-generated method stub
  75. //super.reduce(arg0, arg1, arg2);
  76. context.write(new LongWritable(k2.first),new LongWritable(k2.second));
  77. }
  78. }
  79. //这里注意是实现这个接口 生的比较器 RawComparator
  80. static class MyGrouper implements RawComparator<MyData>{
  81. @Override
  82. //两个相减,等于0就是同一个组的
  83. public int compare(MyData o1, MyData o2) {
  84. // TODO Auto-generated method stub
  85. return (int)(o1.first-o2.first);
  86. }
  87. //使用位比较,b1表示第一个数据,s1表示起始比较位,为0;
  88. //s2表示要比较多少位,
  89. //long是8位的,比较第一个数字,所以就是从默认起始比较位0,开始读8位,读到第一个数字,进行比较
  90. @Override
  91. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  92. // TODO Auto-generated method stub
  93. return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
  94. }
  95. }
  96. }
  97. class MyData implements WritableComparable<MyData> {
  98. Long first;
  99. Long second;
  100. public MyData() {
  101. super();
  102. }
  103. public MyData(Long first, Long second) {
  104. // super();
  105. this.first = first;
  106. this.second = second;
  107. }
  108. @Override
  109. public String toString() {
  110. return "MyData [first=" + first + ", second=" + second + "]";
  111. }
  112. @Override
  113. public void write(DataOutput out) throws IOException {
  114. // TODO Auto-generated method stub
  115. out.writeLong(first);
  116. out.writeLong(second);
  117. }
  118. @Override
  119. public void readFields(DataInput in) throws IOException {
  120. // TODO Auto-generated method stub
  121. this.first = in.readLong();
  122. this.second = in.readLong();
  123. }
  124. @Override
  125. public int compareTo(MyData o) {
  126. // TODO Auto-generated method stub
  127. if (this.first == o.first) {
  128. return (int) (this.second - o.second);
  129. } else {
  130. return (int) (this.first - o.first);
  131. }
  132. }
  133. @Override
  134. public int hashCode() {
  135. // TODO Auto-generated method stub
  136. return this.first.hashCode() + this.second.hashCode();
  137. }
  138. @Override
  139. public boolean equals(Object obj) {
  140. // TODO Auto-generated method stub
  141. if (!(obj instanceof MyData)) {
  142. return false;
  143. } else {
  144. MyData o2 = (MyData) obj;
  145. return (this.first == o2.first) && (this.second == o2.second);
  146. }
  147. }
  148. }

运行结果:

Map-Reduce Framework
    Map input records=8
    Map output records=8  //map输出是8条数据
    Map output bytes=192
    Map output materialized bytes=214
    Input split bytes=107
    Combine input records=0
    Combine output records=0
    Reduce input groups=3    //按照分类,只有三组了
    Reduce shuffle bytes=0
    Reduce input records=8
    //相当于进来时8条数据,分组之后只有3条了因为有部分value合并成(value value value)的情况了
    Reduce output records=3 //按组出去,只有三条了
    Spilled Records=16
    Shuffled Maps =0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=0
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=534773760

这个分组,最后的结果是在同一个文件里,主要是对mapper的输出结果进行分组;
之前的分区,是会将最终的结果分别写在不同的文件里。

那个第一列升序第二列降序的,修改一下自定义类中的compareTo方法,里面把两个对象的顺序变一下就好。比如元本书

this.first-o.first变成o.first-this.first 就变升序了
  1. @Override
  2. public int compareTo(MyData o) {
  3. // TODO Auto-generated method stub
  4. if (this.first == o.first) {
  5. return (int) (o.second - this.second);
  6. } else {
  7. return (int) (this.first - o.first);
  8. }
  9. }

结果:

1   1
2   2
2   1
3   3
3   2
3   1

4、最大值

求出100w个数字里面的最大值,有重复
最重要的是里面那个如何让map多执行,最后只输出一次的方法

  1. package day0330_maxnumber;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.NullWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  15. import sun.security.krb5.internal.PAData;
  16. public class MaxNumber {
  17. public static String IN_PATH="hdfs://192.168.145.133:9000/data/seq100w.txt";
  18. public static String OUT_PATH="hdfs://192.168.145.133:9000/day0330_max";
  19. public static void main(String[] args) throws Exception{
  20. Configuration conf=new Configuration();
  21. Job job=new Job(conf, MaxNumber.class.getName());
  22. //1.1
  23. FileInputFormat.setInputPaths(job, new Path(IN_PATH));
  24. job.setInputFormatClass(TextInputFormat.class);
  25. //1.2
  26. job.setMapperClass(MyMapper.class);
  27. job.setMapOutputKeyClass(LongWritable.class);
  28. job.setMapOutputValueClass(NullWritable.class);
  29. //1.3
  30. //1.4
  31. //1.5
  32. //2.1
  33. //2.2
  34. job.setReducerClass(MyReducer.class);
  35. job.setOutputKeyClass(LongWritable.class);
  36. job.setOutputValueClass(NullWritable.class);
  37. //2.3
  38. job.setOutputFormatClass(TextOutputFormat.class);
  39. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  40. //submit
  41. job.waitForCompletion(true);
  42. }
  43. //妈蛋,数据中最大的就是32767,根本没有到100w,只是由100w个数字,很多重复的而已= -
  44. //好了,想想怎么输出前100个最大值= -
  45. //想到了……利用它自己的排序功能= -
  46. static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
  47. //定义了一个成员变量;
  48. //map里面只执行业务逻辑,不进行提交
  49. public static Long maxnumber=Long.MIN_VALUE;
  50. @Override
  51. protected void map(LongWritable key, Text value,
  52. Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context)
  53. throws IOException, InterruptedException {
  54. // TODO Auto-generated method stub
  55. //super.map(key, value, context);
  56. long a=Long.parseLong(value.toString());
  57. if(a>maxnumber){
  58. maxnumber=a;
  59. }
  60. }
  61. //最终的context会通过这个方法进行提交,因此,只要将context.write写在这个里面,就会执行100w次map,最终只提交一次
  62. //reduce里面的同理。
  63. @Override
  64. protected void cleanup(Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context)
  65. throws IOException, InterruptedException {
  66. // TODO Auto-generated method stub
  67. //super.cleanup(context);
  68. context.write(new LongWritable(maxnumber), NullWritable.get());
  69. }
  70. }
  71. static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
  72. public static Long maxnumber2=Long.MIN_VALUE;
  73. @Override
  74. protected void reduce(LongWritable k2, Iterable<NullWritable> v2s,
  75. Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)
  76. throws IOException, InterruptedException {
  77. // TODO Auto-generated method stub
  78. //super.reduce(arg0, arg1, arg2);
  79. long a=k2.get();
  80. if(a>maxnumber2){
  81. maxnumber2=a;
  82. }
  83. }
  84. @Override
  85. protected void cleanup(Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)
  86. throws IOException, InterruptedException {
  87. // TODO Auto-generated method stub
  88. //super.cleanup(context);
  89. context.write(new LongWritable(maxnumber2), NullWritable.get());
  90. }
  91. }
  92. }

5、求出最大的前10个数

这个我是这么做的:

1、利用它自己的排序,将这100w个数排好
2、排序好的数输入到reduce中时,是经过分组的,我不希望分组
3、可以自定义分组方法,或者自己写一个数据类型,里面覆盖compareTo方法等
4、在MyReducer里面定义成员变量,作为计数器;当计数器>(100w-1-10)的时候,开始输出即可。
  1. package day0330_top10;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.FileSystem;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.NullWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.io.WritableComparable;
  13. import org.apache.hadoop.mapreduce.Job;
  14. import org.apache.hadoop.mapreduce.Mapper;
  15. import org.apache.hadoop.mapreduce.Reducer;
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  20. import day0330_maxnumber.MaxNumber;
  21. public class MyTop10 {
  22. public static String IN_PATH = "hdfs://192.168.145.133:9000/data/seq100w.txt";
  23. public static String OUT_PATH = "hdfs://192.168.145.133:9000/day0330_top10";
  24. public static void main(String[] args) throws Exception {
  25. Configuration conf = new Configuration();
  26. Job job = new Job(conf, MaxNumber.class.getName());
  27. FileSystem fs=FileSystem.get(new URI("hdfs://192.168.145.133:9000/"), conf);
  28. if(fs.exists(new Path(OUT_PATH))){
  29. fs.delete(new Path(OUT_PATH), true);
  30. }
  31. // 1.1
  32. FileInputFormat.setInputPaths(job, new Path(IN_PATH));
  33. job.setInputFormatClass(TextInputFormat.class);
  34. // 1.2
  35. job.setMapperClass(MyMapper.class);
  36. job.setMapOutputKeyClass(MyLong.class);
  37. job.setMapOutputValueClass(NullWritable.class);
  38. // 1.3
  39. // 1.4
  40. // 1.5
  41. // 2.1
  42. // 2.2
  43. job.setReducerClass(MyReduce.class);
  44. job.setOutputKeyClass(MyLong.class);
  45. job.setOutputValueClass(NullWritable.class);
  46. // 2.3
  47. job.setOutputFormatClass(TextOutputFormat.class);
  48. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  49. // submit
  50. job.waitForCompletion(true);
  51. }
  52. static class MyMapper extends Mapper<LongWritable, Text, MyLong, NullWritable> {
  53. @Override
  54. protected void map(LongWritable key, Text value,
  55. Mapper<LongWritable, Text, MyLong, NullWritable>.Context context)
  56. throws IOException, InterruptedException {
  57. // TODO Auto-generated method stub
  58. // super.map(key, value, context);
  59. context.write(new MyLong(Long.parseLong(value.toString())), NullWritable.get());
  60. }
  61. }
  62. static class MyReduce extends Reducer<MyLong, NullWritable, MyLong, NullWritable> {
  63. public static int count = 0;
  64. @Override
  65. protected void reduce(MyLong k2, Iterable<NullWritable> v2s,
  66. Reducer<MyLong, NullWritable, MyLong, NullWritable>.Context context)
  67. throws IOException, InterruptedException {
  68. // TODO Auto-generated method stub
  69. // super.reduce(arg0, arg1, arg2);
  70. // 1000000
  71. if (count >= 999990) {
  72. context.write(k2, NullWritable.get());
  73. }
  74. count++;
  75. }
  76. }
  77. }
  78. //突然想到可以不用自定义类,自定义一个分组类就好= - 实现那个什么生的比较接口 rawcomparable接口= -
  79. class MyLong implements WritableComparable<MyLong> {
  80. Long number;
  81. public MyLong() {
  82. // super();
  83. }
  84. public MyLong(long number) {
  85. // super();
  86. this.number = number;
  87. }
  88. @Override
  89. public String toString() {
  90. return "MyLong [number=" + number + "]";
  91. }
  92. @Override
  93. public void write(DataOutput out) throws IOException {
  94. // TODO Auto-generated method stub
  95. out.writeLong(number);
  96. }
  97. @Override
  98. public void readFields(DataInput in) throws IOException {
  99. // TODO Auto-generated method stub
  100. this.number=in.readLong();
  101. }
  102. @Override
  103. public int compareTo(MyLong o) {
  104. // TODO Auto-generated method stub
  105. return ((this.number-o.number)>0)?1:-1;
  106. }
  107. @Override
  108. public int hashCode() {
  109. // TODO Auto-generated method stub
  110. return this.number.hashCode();
  111. }
  112. @Override
  113. public boolean equals(Object obj) {
  114. // TODO Auto-generated method stub
  115. return super.equals(obj);
  116. }
  117. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注