@ghimi
2018-05-25T12:50:20.000000Z
字数 4369
阅读 859
MapReduce 天气
package mapreduce.tq2;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.util.Date;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class Main {//分组比较器/** 进入同一个reduce的key是按照顺序排好的,该类使得:* 如果连续(注意,一定连续)的两条或多条记录满足同组(即compare方法返回0)的条件,* 即使key不相同,他们的value也会进入同一个values,执行一个reduce方法。* 相反,如果原来key相同,但是并不满足同组的条件,他们的value也不会进入一个valeus。* 最后返回的key是:满足这些条件的一组key中排在最后的那个。*/public static class TGroupingComparator extends WritableComparator {public TGroupingComparator() {super(TQ.class, true);}@SuppressWarnings("rawtypes")@Overridepublic int compare(WritableComparable a, WritableComparable b) {TQ t1 = (TQ) a;TQ t2 = (TQ) b;int compare = Integer.compare(t1.getYear(), t2.getYear());// 分组比较器比较年和月份,使得同一年月的记录位于同一ReduceTask当中return compare == 0 ? Integer.compare(t1.getMonth(), t2.getMonth()) : compare;}}//排序比较器,进行 Mapper 端的排序,如果没有制定的话,默认使用 KEY 进行比较public static class TSorter extends WritableComparator {public TSorter() {super(TQ.class, true);}@SuppressWarnings("rawtypes")@Overridepublic int compare(WritableComparable a, WritableComparable b) {TQ t1 = (TQ) a;TQ t2 = (TQ) b;int compare = Integer.compare(t1.getYear(), t2.getYear());if (compare == 0) {compare = Integer.compare(t1.getMonth(), t2.getMonth());if (compare == 0) {return -Integer.compare(t1.getWd(), t2.getWd());}}return compare;}}//分区器,用来对不同的 key 进行分区public static class TPartitioner extends Partitioner<TQ, IntWritable> {@Overridepublic int getPartition(TQ key, IntWritable value, int numPartitions) {return key.getYear() % numPartitions;}}public class TQ implements WritableComparable<TQ> {private int year;private int month;private int day;private int wd;public int getYear() {return this.year;}public void setYear(int year) {this.year = year;}public int getMonth() {return this.month;}public void setMonth(int month) {this.month = month;}public int getDay() {return this.day;}public void setDay(int day) {this.day = day;}public int getWd() {return this.wd;}public void setWd(int wd) {this.wd = wd;}@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(year);out.writeInt(month);out.writeInt(day);out.writeInt(wd);}@Overridepublic void readFields(DataInput in) throws IOException {this.year = in.readInt();this.month = in.readInt();this.day = in.readInt();this.wd = in.readInt();}@Overridepublic int compareTo(TQ o) {int compare = Integer.compare(this.year, o.getYear());if (compare == 0) {compare = Integer.compare(this.month, o.getMonth());if (compare == 0) {return Integer.compare(this.day, o.getDay());}}return compare;}}// Reducer 端的聚合逻辑public static class TReducer extends Reducer<TQ, IntWritable, Text, IntWritable> {private Text rkey = new Text();private IntWritable rval = new IntWritable();@Overrideprotected void reduce(TQ key, Iterable<IntWritable> values,Reducer<TQ, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int flg = 0;int day = 0;for (IntWritable intWritable : values) {if(flg == 0){rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());}}}}//Mapper 端的聚合逻辑public static class TMapper extends Mapper<LongWritable, Text, Text, IntWritable> {}//作业程序的主要入口public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// TODO Auto-generated method stubConfiguration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(Main.class);job.setMapperClass(TMapper.class);job.setMapOutputKeyClass(TQ.class);job.setMapOutputValueClass(IntWritable.class);// mapTask中的分区器job.setPartitionerClass(TPartitioner.class);// mapTask中的排序比较器job.setSortComparatorClass(TSorter.class);// mapTask中的分组比较器job.setGroupingComparatorClass(TGroupingComparator.class);job.setReducerClass(TReducer.class);job.setNumReduceTasks(2);Path infile = new Path("/data/tq/input");FileInputFormat.addInputPath(job, infile);//当输出路径存在时删除输出路径Path outfile = new Path("/data/tq/output/");//当输出路径存在时删除输出路径if(outfile.getFileSystem(conf).exists(outfile)){outfile.getFileSystem(conf).delete(outfile,true);}//设置输出路径FileOutputFormat.setOutputPath(job, outfile);job.waitForCompletion(true);}}