@w460461339
2016-11-15T18:16:22.000000Z
字数 17226
阅读 1166
Hadoop
Map部分:
1、每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阈值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
2、写磁盘前,要partition,sort。如果有combiner,combine排序后数据(即顺序为 排序,分组,combine)。
3、等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
Reduce部分:
1、Reducer通过Http方式得到输出文件的分区。
2、TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。
3、排序阶段合并map输出。然后走Reduce阶段。
这个自定义排序主要靠自定义类来实现。但是,不同于之前的自定义类只要实现Writable接口;需要自定义排序的类需要实现WritableComparable接口。
package day_0330_sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
 * MapReduce job that sorts two-column numeric records.
 *
 * <p>Goal: sort the records by the first column in ascending order; when the
 * first columns are equal, sort by the second column in ascending order. The
 * ordering itself comes from {@code MyData.compareTo}, because {@code MyData}
 * is used as the map output key.
 */
public class Mysort {
    public static String IN_PATH = "hdfs://192.168.145.133:9000/data/sortdata2";
    public static String OUT_PATH = "hdfs://192.168.145.133:9000/day0330";

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args unused
     * @throws Exception if the file system cannot be reached or the job cannot be submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, Mysort.class.getName());
        // Lets the framework locate the jar that contains the mapper/reducer classes.
        job.setJarByClass(Mysort.class);

        // Remove a previous run's output so the job can be re-run. The file system
        // is resolved from OUT_PATH itself so the cluster address is not duplicated.
        Path outPath = new Path(OUT_PATH);
        FileSystem fs = FileSystem.get(new URI(OUT_PATH), conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        // 1.1 input path and format
        FileInputFormat.setInputPaths(job, new Path(IN_PATH));
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 mapper and its output types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(MyData.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 1.3 partitioning
        job.setPartitionerClass(HashPartitioner.class);

        // 1.4 sorting and grouping inside each partition use the key class's compareTo.
        // 1.5 no combiner is configured.

        // 2.1 on the reduce side, sorting and grouping are again driven by the key.
        // 2.2 reducer and its output types
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // 2.3 output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, outPath);

        // Submit, wait, and report the outcome through the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses each input line as two whitespace-separated longs and emits
     * (MyData(first, second), second).
     */
    static class MyMapper extends Mapper<LongWritable, Text, MyData, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, MyData, LongWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                // Blank lines carry no record.
                return;
            }
            // Accept tabs or repeated spaces between the two columns.
            String[] spWords = line.split("\\s+");
            long first = Long.parseLong(spWords[0]);
            long second = Long.parseLong(spWords[1]);
            // Both columns form the key; the second column travels as the value.
            context.write(new MyData(first, second), new LongWritable(second));
        }
    }

    /**
     * Writes the two columns of each group's key once; the grouped values are not read,
     * so keys that compare as equal produce a single output line.
     */
    static class MyReducer extends Reducer<MyData, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(MyData k2, Iterable<LongWritable> v2s,
                Reducer<MyData, LongWritable, LongWritable, LongWritable>.Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(k2.k2), new LongWritable(k2.v2));
        }
    }
}
//注意这里是实现WritableComparable接口
//普通的是实现Writable接口,两个要实现的接口不一样~~
/**
 * Composite map-output key holding the two columns of a record.
 *
 * <p>A key that must be sorted has to implement {@code WritableComparable};
 * implementing only {@code Writable} is not enough.
 */
class MyData implements WritableComparable<MyData> {
    // Boxed Long is kept so code that reads these fields directly keeps working.
    // Because the fields are objects, they must be compared with compareTo/equals,
    // never with == or != (which compare references).
    Long k2;
    Long v2;

    /** No-argument constructor; the fields stay unset until {@link #readFields} fills them. */
    public MyData() {
    }

    /**
     * @param k2 first column
     * @param v2 second column
     */
    public MyData(long k2, long v2) {
        this.k2 = k2;
        this.v2 = v2;
    }

    @Override
    public String toString() {
        return "MyData [k2=" + k2 + ", v2=" + v2 + "]";
    }

    /** Serializes the key as two consecutive longs: first column, then second column. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(k2);
        out.writeLong(v2);
    }

    /** Deserializes the key in the same order {@link #write} produced it. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.k2 = in.readLong();
        this.v2 = in.readLong();
    }

    /**
     * Orders by the first column ascending, then by the second column ascending.
     *
     * <p>Returns a negative number, zero, or a positive number. Zero must be returned
     * when both columns match: the result of this comparison also decides grouping,
     * so a comparison that never returns zero (for example
     * {@code (this.k2 - o.k2) > 0 ? 1 : -1}) would keep identical keys such as
     * (3,3) and (3,3) in separate groups.
     *
     * @param o the key to compare with
     * @return the sign of the comparison as described above
     */
    @Override
    public int compareTo(MyData o) {
        // Long.compareTo avoids the overflow/truncation of casting a long difference to int.
        int byFirst = this.k2.compareTo(o.k2);
        if (byFirst != 0) {
            return byFirst;
        }
        return this.v2.compareTo(o.v2);
    }

    @Override
    public int hashCode() {
        // Weighted so that (a, b) and (b, a) do not hash to the same value.
        return 31 * this.k2.hashCode() + this.v2.hashCode();
    }

    /** Two keys are equal when both columns hold the same values. */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof MyData)) {
            return false;
        }
        MyData o = (MyData) obj;
        return this.k2.equals(o.k2) && this.v2.equals(o.v2);
    }
}
原始数据:
3 3
3 3
3 2
3 1
2 2
2 1
2 1
1 1
程序中,将第一列第二列一起作为k2,第二列作为v2进行传输
结果日志:
Map-Reduce Framework
Map input records=8
Map output records=8 //map输出是8条(没有分组)
Map output bytes=192
Map output materialized bytes=214
Input split bytes=107
Combine input records=0
Combine output records=0
Reduce input groups=6 //这里,将(3,3)(3,3)视为一组
//将(2,1)(2,1)视为一组
//总共就只有6组了(这是用两列数据作为分组依据的结果)
Reduce shuffle bytes=0
Reduce input records=8
Reduce output records=6
Spilled Records=16
Shuffled Maps =0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=0
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=534773760
测试:仅用k2中第一列的数据作为分组依据
package day0330_group;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * MapReduce job that sorts records by both columns but groups them for the
 * reducer by the first column only, using a custom grouping comparator.
 */
public class MyGroupDemo {
    public static String IN_PATH = "hdfs://192.168.145.133:9000/data/sortdata2";
    public static String OUT_PATH = "hdfs://192.168.145.133:9000/day0330_group";

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args unused
     * @throws Exception if the file system cannot be reached or the job cannot be submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, MyGroupDemo.class.getName());
        // Lets the framework locate the jar that contains the mapper/reducer classes.
        job.setJarByClass(MyGroupDemo.class);

        // Remove a previous run's output so the job can be re-run.
        Path outPath = new Path(OUT_PATH);
        FileSystem fs = FileSystem.get(new URI(OUT_PATH), conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        // 1.1 input path and format
        FileInputFormat.setInputPaths(job, new Path(IN_PATH));
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 mapper and its output types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(MyData.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 1.3 partitioning: the default partitioner is used.
        // 1.4 sorting uses MyData.compareTo; grouping is overridden here.
        job.setGroupingComparatorClass(MyGrouper.class);
        // 1.5 no combiner is configured.

        // 2.2 reducer and its output types
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // 2.3 output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, outPath);

        // Submit, wait, and report the outcome through the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses each input line as two whitespace-separated longs and emits
     * (MyData(first, second), second).
     */
    static class MyMapper extends Mapper<LongWritable, Text, MyData, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, MyData, LongWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                // Blank lines carry no record.
                return;
            }
            // Accept tabs or repeated spaces between the two columns.
            String[] spWords = line.split("\\s+");
            long first = Long.parseLong(spWords[0]);
            long second = Long.parseLong(spWords[1]);
            context.write(new MyData(first, second), new LongWritable(second));
        }
    }

    /**
     * Writes one line per group, using the two columns of the key passed to
     * {@code reduce}; the grouped values are not read.
     */
    static class MyReducer extends Reducer<MyData, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(MyData k2, Iterable<LongWritable> v2s,
                Reducer<MyData, LongWritable, LongWritable, LongWritable>.Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(k2.first), new LongWritable(k2.second));
        }
    }

    /**
     * Grouping comparator: two keys belong to the same group when their first
     * columns are equal (comparison result 0). Implements the raw (byte-level)
     * comparator interface {@code RawComparator}.
     */
    static class MyGrouper implements RawComparator<MyData> {
        /** Compares two deserialized keys by their first column only. */
        @Override
        public int compare(MyData o1, MyData o2) {
            // Long.compareTo avoids the overflow of casting a long difference to int.
            return o1.first.compareTo(o2.first);
        }

        /**
         * Compares two serialized keys by their first column only.
         *
         * @param b1 buffer holding the first key
         * @param s1 offset in {@code b1} where the first key starts
         * @param l1 length in bytes of the first key
         * @param b2 buffer holding the second key
         * @param s2 offset in {@code b2} where the second key starts
         * @param l2 length in bytes of the second key
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // MyData.write puts the first column first, as one 8-byte long, so decoding
            // a long at each key's start offset yields exactly that column. Decoding
            // (rather than comparing raw bytes) keeps the sign consistent with the
            // object-level compare above for negative values.
            long first1 = WritableComparator.readLong(b1, s1);
            long first2 = WritableComparator.readLong(b2, s2);
            return first1 < first2 ? -1 : (first1 == first2 ? 0 : 1);
        }
    }
}
/**
 * Composite map-output key holding the two columns of a record; sorts by the
 * first column, then by the second, both ascending.
 */
class MyData implements WritableComparable<MyData> {
    // Boxed values: compare them with compareTo/equals, never with == (reference comparison).
    Long first;
    Long second;

    /** No-argument constructor; the fields stay unset until {@link #readFields} fills them. */
    public MyData() {
        super();
    }

    /**
     * @param first  first column
     * @param second second column
     */
    public MyData(Long first, Long second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public String toString() {
        return "MyData [first=" + first + ", second=" + second + "]";
    }

    /** Serializes the key as two consecutive longs: first column, then second column. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    /** Deserializes the key in the same order {@link #write} produced it. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    /**
     * Orders by the first column ascending, then by the second column ascending.
     *
     * @param o the key to compare with
     * @return a negative number, zero, or a positive number
     */
    @Override
    public int compareTo(MyData o) {
        // Long.compareTo avoids the overflow/truncation of casting a long difference to int.
        int byFirst = this.first.compareTo(o.first);
        if (byFirst != 0) {
            return byFirst;
        }
        return this.second.compareTo(o.second);
    }

    @Override
    public int hashCode() {
        // Weighted so that (a, b) and (b, a) do not hash to the same value.
        return 31 * this.first.hashCode() + this.second.hashCode();
    }

    /** Two keys are equal when both columns hold the same values. */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof MyData)) {
            return false;
        }
        MyData o2 = (MyData) obj;
        return this.first.equals(o2.first) && this.second.equals(o2.second);
    }
}
运行结果:
Map-Reduce Framework
Map input records=8
Map output records=8 //map输出是8条数据
Map output bytes=192
Map output materialized bytes=214
Input split bytes=107
Combine input records=0
Combine output records=0
Reduce input groups=3 //按照分类,只有三组了
Reduce shuffle bytes=0
Reduce input records=8
//相当于进来时8条数据,分组之后只有3条了因为有部分value合并成(value value value)的情况了
Reduce output records=3 //按组出去,只有三条了
Spilled Records=16
Shuffled Maps =0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=0
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=534773760
这个分组,最后的结果是在同一个文件里,主要是对mapper的输出结果进行分组;
之前的分区,是会将最终的结果分别写在不同的文件里。
那个第一列升序第二列降序的,修改一下自定义类中的compareTo方法,里面把两个对象的顺序变一下就好。比如原本是
this.second-o.second,变成o.second-this.second,第二列就变成降序了
/**
 * Orders by the first column ascending; ties are broken by the second column
 * in descending order (the operands of the second comparison are swapped).
 *
 * @param o the key to compare with
 * @return a negative number, zero, or a positive number
 */
@Override
public int compareTo(MyData o) {
    // The fields are boxed Long values: compare them with compareTo, not with ==
    // (reference comparison) or a subtraction cast to int (overflow).
    int byFirst = this.first.compareTo(o.first);
    if (byFirst != 0) {
        return byFirst;
    }
    return o.second.compareTo(this.second);
}
结果:
1 1
2 2
2 1
3 3
3 2
3 1
求出100w个数字里面的最大值,有重复
最重要的是里面那个如何让map多执行,最后只输出一次的方法
package day0330_maxnumber;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import sun.security.krb5.internal.PAData;
/**
 * MapReduce job that finds the largest number in a text file holding one number per line.
 *
 * <p>The key technique: {@code map}/{@code reduce} only update a running maximum,
 * and the single output record is written from {@code cleanup}, which runs once
 * after a task has processed all of its input.
 */
public class MaxNumber {
    public static String IN_PATH = "hdfs://192.168.145.133:9000/data/seq100w.txt";
    public static String OUT_PATH = "hdfs://192.168.145.133:9000/day0330_max";

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args unused
     * @throws Exception if the file system cannot be reached or the job cannot be submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, MaxNumber.class.getName());
        // Lets the framework locate the jar that contains the mapper/reducer classes.
        job.setJarByClass(MaxNumber.class);

        // Remove a previous run's output so the job can be re-run.
        Path outPath = new Path(OUT_PATH);
        if (outPath.getFileSystem(conf).exists(outPath)) {
            outPath.getFileSystem(conf).delete(outPath, true);
        }

        // 1.1 input path and format
        FileInputFormat.setInputPaths(job, new Path(IN_PATH));
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 mapper and its output types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 2.2 reducer and its output types. A single reducer is required so that
        // one task sees every candidate and the output holds one global maximum.
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // 2.3 output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, outPath);

        // Submit, wait, and report the outcome through the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // Note from the original author: the largest value in the sample data is 32767;
    // the file has one million numbers, but many of them repeat.

    /** Tracks the largest number seen by this map task and emits it once, from cleanup. */
    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        // Per-task state. Instance (not static) fields so tasks sharing a JVM
        // cannot see each other's values.
        private long maxnumber = Long.MIN_VALUE;
        private boolean sawRecord = false;

        /** Updates the running maximum; writes nothing. */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                // Blank lines carry no number.
                return;
            }
            long a = Long.parseLong(line);
            if (!sawRecord || a > maxnumber) {
                maxnumber = a;
                sawRecord = true;
            }
        }

        /**
         * Writes this task's maximum. Because the write happens here rather than in
         * {@code map}, the task emits at most one record however many lines it read.
         */
        @Override
        protected void cleanup(Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // A task that saw no numbers has no maximum to report.
            if (sawRecord) {
                context.write(new LongWritable(maxnumber), NullWritable.get());
            }
        }
    }

    /** Tracks the largest key seen by this reduce task and emits it once, from cleanup. */
    static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        private long maxnumber2 = Long.MIN_VALUE;
        private boolean sawRecord = false;

        /** Updates the running maximum; writes nothing. */
        @Override
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2s,
                Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            long a = k2.get();
            if (!sawRecord || a > maxnumber2) {
                maxnumber2 = a;
                sawRecord = true;
            }
        }

        /** Writes the maximum once, mirroring the mapper's cleanup. */
        @Override
        protected void cleanup(Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            if (sawRecord) {
                context.write(new LongWritable(maxnumber2), NullWritable.get());
            }
        }
    }
}
这个我是这么做的:
1、利用它自己的排序,将这100w个数排好
2、排序好的数输入到reduce中时,是经过分组的,我不希望分组
3、可以自定义分组方法,或者自己写一个数据类型,里面覆盖compareTo方法等
4、在MyReducer里面定义成员变量,作为计数器;当计数器>(100w-1-10)的时候,开始输出即可。
package day0330_top10;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import day0330_maxnumber.MaxNumber;
/**
 * MapReduce job that outputs the 10 largest numbers (duplicates included) of a
 * one-million-line input, relying on the framework's ascending key sort.
 */
public class MyTop10 {
    public static String IN_PATH = "hdfs://192.168.145.133:9000/data/seq100w.txt";
    public static String OUT_PATH = "hdfs://192.168.145.133:9000/day0330_top10";

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args unused
     * @throws Exception if the file system cannot be reached or the job cannot be submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Name the job after this class rather than after the MaxNumber job.
        Job job = new Job(conf, MyTop10.class.getName());
        // Lets the framework locate the jar that contains the mapper/reducer classes.
        job.setJarByClass(MyTop10.class);

        // Remove a previous run's output so the job can be re-run.
        Path outPath = new Path(OUT_PATH);
        FileSystem fs = FileSystem.get(new URI(OUT_PATH), conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        // 1.1 input path and format
        FileInputFormat.setInputPaths(job, new Path(IN_PATH));
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 mapper and its output types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(MyLong.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 2.2 reducer and its output types. A single reducer is required: the
        // record counter in MyReduce only makes sense over one globally sorted stream.
        job.setReducerClass(MyReduce.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(MyLong.class);
        job.setOutputValueClass(NullWritable.class);

        // 2.3 output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, outPath);

        // Submit, wait, and report the outcome through the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Emits every input number as a MyLong key with no value. */
    static class MyMapper extends Mapper<LongWritable, Text, MyLong, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, MyLong, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                // Blank lines carry no number.
                return;
            }
            context.write(new MyLong(Long.parseLong(line)), NullWritable.get());
        }
    }

    /**
     * Counts the records as they arrive in ascending order and writes only the
     * last {@code TOP_N} of the expected {@code TOTAL_RECORDS}.
     */
    static class MyReduce extends Reducer<MyLong, NullWritable, MyLong, NullWritable> {
        /** Number of records the input is expected to contain. */
        private static final long TOTAL_RECORDS = 1000000L;
        /** How many of the largest records to output. */
        private static final long TOP_N = 10L;

        // Per-task record counter (instance state, not shared between tasks).
        private long count = 0;

        @Override
        protected void reduce(MyLong k2, Iterable<NullWritable> v2s,
                Reducer<MyLong, NullWritable, MyLong, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Count every record rather than every reduce() call, so the position
            // stays correct even if equal keys are delivered as one group.
            for (NullWritable ignored : v2s) {
                if (count >= TOTAL_RECORDS - TOP_N) {
                    context.write(k2, NullWritable.get());
                }
                count++;
            }
        }
    }
}
//突然想到可以不用自定义类,自定义一个分组比较器就好= - 实现那个“生的”比较接口,即RawComparator接口= -
/**
 * Map-output key wrapping a single long.
 *
 * <p>{@link #compareTo} deliberately never returns 0, so two keys holding the
 * same number are not reported as equal by the comparison. The intent, per the
 * accompanying notes, is that duplicates are not merged into one reduce group.
 * This knowingly breaks the {@code Comparable} contract.
 */
class MyLong implements WritableComparable<MyLong> {
    Long number;

    /** No-argument constructor; the field stays unset until {@link #readFields} fills it. */
    public MyLong() {
    }

    /**
     * @param number the wrapped value
     */
    public MyLong(long number) {
        this.number = number;
    }

    @Override
    public String toString() {
        return "MyLong [number=" + number + "]";
    }

    /** Serializes the wrapped value as one long. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(number);
    }

    /** Deserializes the wrapped value. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.number = in.readLong();
    }

    /**
     * Ascending order by value; returns 1 when this value is larger and -1
     * otherwise, including when the values are equal (see the class comment).
     *
     * @param o the key to compare with
     * @return 1 or -1, never 0
     */
    @Override
    public int compareTo(MyLong o) {
        // Compare directly instead of testing the sign of a difference, which can overflow.
        return (this.number > o.number) ? 1 : -1;
    }

    @Override
    public int hashCode() {
        return this.number.hashCode();
    }

    /** Identity-based equality inherited from Object. */
    @Override
    public boolean equals(Object obj) {
        return super.equals(obj);
    }
}