@w460461339
2016-11-15T10:16:22.000000Z
字数 17226
阅读 1397
Hadoop
Map部分:
1、每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阈值0.8(io.sort.spill.percent),一个后台线程就把内容溢写(spill)到磁盘指定目录(mapred.local.dir)下新建的一个溢出写文件。
2、写磁盘前,要partition,sort。如果有combiner,combine排序后数据(即顺序为 排序,分组,combine)。
3、等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
Reduce部分:
1、Reducer通过Http方式得到输出文件的分区。
2、TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。
3、排序阶段合并map输出。然后走Reduce阶段。
这个自定义排序主要靠自定义类来实现。但是,不同于之前的自定义类只要实现Writable接口;需要参与排序的自定义类必须实现WritableComparable接口。
package day_0330_sort;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class Mysort {/*** target:* sort the data by the first line in aes;* if the first line is equal,sort them by second in aes.*/public static String IN_PATH="hdfs://192.168.145.133:9000/data/sortdata2";public static String OUT_PATH="hdfs://192.168.145.133:9000/day0330";public static void main(String[] args) throws Exception {Configuration conf=new Configuration();Job job=new Job(conf,Mysort.class.getName());FileSystem fs=FileSystem.get(new URI("hdfs://192.168.145.133:9000/"), conf);if(fs.exists(new Path(OUT_PATH))){fs.delete(new Path(OUT_PATH),true);}//1.1 set input pathFileInputFormat.setInputPaths(job,new Path(IN_PATH));job.setInputFormatClass(TextInputFormat.class);//1.2 set mymapperjob.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(MyData.class);job.setMapOutputValueClass(LongWritable.class);//1.3 partitionjob.setPartitionerClass(HashPartitioner.class);//job.setNumReduceTasks(1);//1.4 在各自分区内进行排序和分组//排序由自定义类执行//1.5 归约combiner//这里不设置combiner//2.1 reduce端的排序和分组 排序都按照key进行排序//2.2 
设置reducejob.setReducerClass(MyReducer.class);job.setOutputKeyClass(LongWritable.class);job.setOutputValueClass(LongWritable.class);//2.3设置输出路径和格式job.setOutputFormatClass(TextOutputFormat.class);FileOutputFormat.setOutputPath(job,new Path(OUT_PATH));//submitjob.waitForCompletion(true);}static class MyMapper extends Mapper<LongWritable, Text, MyData, LongWritable>{@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, MyData, LongWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stub//super.map(key, value, context);String[] spWords=value.toString().split(" ");Long l1=Long.parseLong(spWords[0]);Long l2=Long.parseLong(spWords[1]);MyData k2=new MyData(l1, l2);context.write(k2, new LongWritable(l1));}}static class MyReducer extends Reducer<MyData, LongWritable,LongWritable, LongWritable>{@Overrideprotected void reduce(MyData k2, Iterable<LongWritable> v2s,Reducer<MyData, LongWritable, LongWritable, LongWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stub//super.reduce(arg0, arg1, arg2);// System.out.println(k2.k2+" "+k2.v2);context.write(new LongWritable(k2.k2),new LongWritable(k2.v2));}}}//注意这里是实现WritableComparable接口//普通的是实现Writable接口,两个要实现的接口不一样~~class MyData implements WritableComparable<MyData>{//注意,这里类型不能使8大基本类型啊//要是这样的包装类型Long k2;Long v2;public MyData() {}public MyData(long k2, long v2) {this.k2 = k2;this.v2 = v2;}@Overridepublic String toString() {return "MyData [k2=" + k2 + ", v2=" + v2 + "]";}//编码@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeLong(k2);out.writeLong(v2);}//反编码@Overridepublic void readFields(DataInput in) throws IOException {// TODO Auto-generated method stubthis.k2=in.readLong();this.v2=in.readLong();}@Overridepublic int compareTo(MyData o) {// TODO Auto-generated method stub//注意,自定义返回值的时候应该返回1(大于零的数)(表示前者大),-1(小于零的数)(表示后者大),0表示相同(一般不返回0)//千万注意别返回0 
啊!!!!以及不要漏掉任何一种情况//第一列不同//这个compareTo的结果应该也参与到分组中//另外,像我之前那样 (this.k2-o.k2)>0?1:-1,//相当于没有考虑this.k2==o.k2的情况(因为没有返回0)//因此(3,3) (3,3)这样的key不会被视为相同(因为返回是-1~)if(this.k2 != o.k2){System.out.println("first line "+(this.k2-o.k2)+" "+this.k2+" "+o.k2);return (int)(this.k2-o.k2);}else{//第一列相同System.out.println("second line "+(this.k2-o.k2)+" "+this.k2+" "+o.k2);return (int)(this.v2-o.v2);}//return 0;}@Overridepublic int hashCode() {// TODO Auto-generated method stubreturn this.k2.hashCode()+this.v2.hashCode();}//注意这里怎么写equals方法的!@Overridepublic boolean equals(Object obj) {// TODO Auto-generated method stub//注意这个判断某个对象是属于什么类 用的关键字if(obj instanceof MyData){MyData o=(MyData)obj;return (this.k2==o.k2)&&(this.v2==o.v2);}else{return false;}}}
原始数据:
3 3
3 3
3 2
3 1
2 2
2 1
2 1
1 1
程序中,将第一列第二列一起作为k2,第二列作为v2进行传输
结果日志:
Map-Reduce Framework
Map input records=8
Map output records=8 //map输出是8条(没有分组)
Map output bytes=192
Map output materialized bytes=214
Input split bytes=107
Combine input records=0
Combine output records=0
Reduce input groups=6 //这里,将(3,3)(3,3)视为一组
//将(2,1)(2,1)视为一组
//总共就只有6组了(这是用两列数据作为分组依据的结果)
Reduce shuffle bytes=0
Reduce input records=8
Reduce output records=6
Spilled Records=16
Shuffled Maps =0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=0
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=534773760
测试仅用k2中第一列的数据作为分组依据来进行分组:
package day0330_group;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.mapred.Counters.Group;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class MyGroupDemo {public static String IN_PATH = "hdfs://192.168.145.133:9000/data/sortdata2";public static String OUT_PATH = "hdfs://192.168.145.133:9000/day0330_group";//第一次运行忘记写main了……public static void main(String[] args) throws Exception{Configuration conf=new Configuration();Job job=new Job(conf,MyGroupDemo.class.getName());FileSystem fs=FileSystem.get(new URI("hdfs://192.168.145.133:9000/"), conf);if(fs.exists(new Path(OUT_PATH))){fs.delete(new Path(OUT_PATH), true);}//1.1 设置输入路径和格式FileInputFormat.setInputPaths(job, new Path(IN_PATH));job.setInputFormatClass(TextInputFormat.class);//1.2 设置自定义mapperjob.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(MyData.class);job.setMapOutputValueClass(LongWritable.class);//1.3设置分区 采用默认的,就不写了//1.4 设置排序,分组//就这样调用一下job.setGroupingComparatorClass(MyGrouper.class);//1.5 设置combiner 不写了//2.1 map输出传到reduce端//2.2 设置自定义reducejob.setReducerClass(MyReducer.class);job.setOutputKeyClass(LongWritable.class);job.setOutputValueClass(LongWritable.class);//2.3 设置输出格式和输出路径job.setOutputFormatClass(TextOutputFormat.class);FileOutputFormat.setOutputPath(job, new 
Path(OUT_PATH));//submitjob.waitForCompletion(true);}static class MyMapper extends Mapper<LongWritable, Text, MyData, LongWritable> {@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, MyData, LongWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stub// super.map(key, value, context);String[] spWords = value.toString().split(" ");MyData k2 = new MyData(Long.parseLong(spWords[0]), Long.parseLong(spWords[1]));LongWritable v2 = new LongWritable(Long.parseLong(spWords[1]));context.write(k2, v2);}}static class MyReducer extends Reducer<MyData, LongWritable, LongWritable, LongWritable> {@Overrideprotected void reduce(MyData k2, Iterable<LongWritable> v2s,Reducer<MyData, LongWritable, LongWritable, LongWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stub//super.reduce(arg0, arg1, arg2);context.write(new LongWritable(k2.first),new LongWritable(k2.second));}}//这里注意是实现这个接口 生的比较器 RawComparatorstatic class MyGrouper implements RawComparator<MyData>{@Override//两个相减,等于0就是同一个组的public int compare(MyData o1, MyData o2) {// TODO Auto-generated method stubreturn (int)(o1.first-o2.first);}//使用位比较,b1表示第一个数据,s1表示起始比较位,为0;//s2表示要比较多少位,//long是8位的,比较第一个数字,所以就是从默认起始比较位0,开始读8位,读到第一个数字,进行比较@Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {// TODO Auto-generated method stubreturn WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);}}}class MyData implements WritableComparable<MyData> {Long first;Long second;public MyData() {super();}public MyData(Long first, Long second) {// super();this.first = first;this.second = second;}@Overridepublic String toString() {return "MyData [first=" + first + ", second=" + second + "]";}@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeLong(first);out.writeLong(second);}@Overridepublic void readFields(DataInput in) throws IOException {// TODO 
Auto-generated method stubthis.first = in.readLong();this.second = in.readLong();}@Overridepublic int compareTo(MyData o) {// TODO Auto-generated method stubif (this.first == o.first) {return (int) (this.second - o.second);} else {return (int) (this.first - o.first);}}@Overridepublic int hashCode() {// TODO Auto-generated method stubreturn this.first.hashCode() + this.second.hashCode();}@Overridepublic boolean equals(Object obj) {// TODO Auto-generated method stubif (!(obj instanceof MyData)) {return false;} else {MyData o2 = (MyData) obj;return (this.first == o2.first) && (this.second == o2.second);}}}
运行结果:
Map-Reduce Framework
Map input records=8
Map output records=8 //map输出是8条数据
Map output bytes=192
Map output materialized bytes=214
Input split bytes=107
Combine input records=0
Combine output records=0
Reduce input groups=3 //按照分类,只有三组了
Reduce shuffle bytes=0
Reduce input records=8
//相当于进来时8条数据,分组之后只有3条了因为有部分value合并成(value value value)的情况了
Reduce output records=3 //按组出去,只有三条了
Spilled Records=16
Shuffled Maps =0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=0
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=534773760
这个分组,最后的结果是在同一个文件里,主要是对mapper的输出结果进行分组;
之前的分区,是会将最终的结果分别写在不同的文件里。
那个第一列升序、第二列降序的需求,修改一下自定义类中的compareTo方法,把里面两个对象的顺序换一下就好。比如原本是
this.second-o.second,变成o.second-this.second,第二列就变成降序了(第一列的比较保持不变,仍是升序)。
@Overridepublic int compareTo(MyData o) {// TODO Auto-generated method stubif (this.first == o.first) {return (int) (o.second - this.second);} else {return (int) (this.first - o.first);}}
结果:
1 1
2 2
2 1
3 3
3 2
3 1
求出100w个数字里面的最大值,有重复
最重要的是里面那个如何让map多执行,最后只输出一次的方法
package day0330_maxnumber;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import sun.security.krb5.internal.PAData;public class MaxNumber {public static String IN_PATH="hdfs://192.168.145.133:9000/data/seq100w.txt";public static String OUT_PATH="hdfs://192.168.145.133:9000/day0330_max";public static void main(String[] args) throws Exception{Configuration conf=new Configuration();Job job=new Job(conf, MaxNumber.class.getName());//1.1FileInputFormat.setInputPaths(job, new Path(IN_PATH));job.setInputFormatClass(TextInputFormat.class);//1.2job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(LongWritable.class);job.setMapOutputValueClass(NullWritable.class);//1.3//1.4//1.5//2.1//2.2job.setReducerClass(MyReducer.class);job.setOutputKeyClass(LongWritable.class);job.setOutputValueClass(NullWritable.class);//2.3job.setOutputFormatClass(TextOutputFormat.class);FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));//submitjob.waitForCompletion(true);}//妈蛋,数据中最大的就是32767,根本没有到100w,只是由100w个数字,很多重复的而已= -//好了,想想怎么输出前100个最大值= -//想到了……利用它自己的排序功能= -static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{//定义了一个成员变量;//map里面只执行业务逻辑,不进行提交public static Long maxnumber=Long.MIN_VALUE;@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stub//super.map(key, value, 
context);long a=Long.parseLong(value.toString());if(a>maxnumber){maxnumber=a;}}//最终的context会通过这个方法进行提交,因此,只要将context.write写在这个里面,就会执行100w次map,最终只提交一次//reduce里面的同理。@Overrideprotected void cleanup(Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stub//super.cleanup(context);context.write(new LongWritable(maxnumber), NullWritable.get());}}static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{public static Long maxnumber2=Long.MIN_VALUE;@Overrideprotected void reduce(LongWritable k2, Iterable<NullWritable> v2s,Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stub//super.reduce(arg0, arg1, arg2);long a=k2.get();if(a>maxnumber2){maxnumber2=a;}}@Overrideprotected void cleanup(Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stub//super.cleanup(context);context.write(new LongWritable(maxnumber2), NullWritable.get());}}}
这个我是这么做的:
1、利用它自己的排序,将这100w个数排好
2、排序好的数输入到reduce中时,是经过分组的,我不希望分组
3、可以自定义分组方法,或者自己写一个数据类型,里面覆盖compareTo方法等
4、在MyReducer里面定义成员变量,作为计数器;当计数器>(100w-1-10)的时候,开始输出即可。
package day0330_top10;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import day0330_maxnumber.MaxNumber;public class MyTop10 {public static String IN_PATH = "hdfs://192.168.145.133:9000/data/seq100w.txt";public static String OUT_PATH = "hdfs://192.168.145.133:9000/day0330_top10";public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = new Job(conf, MaxNumber.class.getName());FileSystem fs=FileSystem.get(new URI("hdfs://192.168.145.133:9000/"), conf);if(fs.exists(new Path(OUT_PATH))){fs.delete(new Path(OUT_PATH), true);}// 1.1FileInputFormat.setInputPaths(job, new Path(IN_PATH));job.setInputFormatClass(TextInputFormat.class);// 1.2job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(MyLong.class);job.setMapOutputValueClass(NullWritable.class);// 1.3// 1.4// 1.5// 2.1// 2.2job.setReducerClass(MyReduce.class);job.setOutputKeyClass(MyLong.class);job.setOutputValueClass(NullWritable.class);// 2.3job.setOutputFormatClass(TextOutputFormat.class);FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));// submitjob.waitForCompletion(true);}static class MyMapper extends Mapper<LongWritable, Text, MyLong, NullWritable> {@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, MyLong, NullWritable>.Context 
context)throws IOException, InterruptedException {// TODO Auto-generated method stub// super.map(key, value, context);context.write(new MyLong(Long.parseLong(value.toString())), NullWritable.get());}}static class MyReduce extends Reducer<MyLong, NullWritable, MyLong, NullWritable> {public static int count = 0;@Overrideprotected void reduce(MyLong k2, Iterable<NullWritable> v2s,Reducer<MyLong, NullWritable, MyLong, NullWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stub// super.reduce(arg0, arg1, arg2);// 1000000if (count >= 999990) {context.write(k2, NullWritable.get());}count++;}}}//突然想到可以不用自定义类,自定义一个分组类就好= - 实现那个什么生的比较接口 rawcomparable接口= -class MyLong implements WritableComparable<MyLong> {Long number;public MyLong() {// super();}public MyLong(long number) {// super();this.number = number;}@Overridepublic String toString() {return "MyLong [number=" + number + "]";}@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeLong(number);}@Overridepublic void readFields(DataInput in) throws IOException {// TODO Auto-generated method stubthis.number=in.readLong();}@Overridepublic int compareTo(MyLong o) {// TODO Auto-generated method stubreturn ((this.number-o.number)>0)?1:-1;}@Overridepublic int hashCode() {// TODO Auto-generated method stubreturn this.number.hashCode();}@Overridepublic boolean equals(Object obj) {// TODO Auto-generated method stubreturn super.equals(obj);}}