@awsekfozc
2015-12-09T03:23:13.000000Z
MapReduce
The WordCount job below wires a custom partitioner into the shuffle so that map output keys are routed to three reduce tasks by letter case.

package com.zc.hadoop.mapreduce.partition;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class WordCountMr extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // For local debugging, set hadoop.home.dir and hard-code the paths:
        // System.setProperty("hadoop.home.dir", "E:\\zc\\大数据\\工具\\hadoop-2.5.0\\");
        // args = new String[] {
        //         "/user/zc/tmp/input",
        //         "/user/zc/tmp/output" + System.currentTimeMillis() };
        int status = new WordCountMr().run(args);
        System.exit(status);
    }

    // Map
    public static class WCMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        private Text mapOutputKey = new Text();
        private static final IntWritable mapOutputValue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each input line on spaces and emit (word, 1) pairs.
            String lineValue = value.toString();
            String[] strs = lineValue.split(" ");
            for (String str : strs) {
                mapOutputKey.set(str);
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // Reduce
    public static class WCReduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outputValue = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // Sum the counts for each word.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);
            context.write(key, outputValue);
        }
    }

    // Driver
    @Override
    public int run(String[] args) throws Exception {
        // 1 set configuration
        Configuration configuration = new Configuration();

        // 2 create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3.1 set input path
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        // 3.2 set mapper
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // ----------- Shuffle: partitioning -----------
        // 1) custom partitioner
        job.setPartitionerClass(WCPartitioner.class);
        // reduce task number; it must cover every partition index the
        // partitioner can return, otherwise the job fails with an
        // "Illegal partition" error
        job.setNumReduceTasks(3);
        // ----------- Shuffle --------------------------

        // 3.3 set reducer
        job.setReducerClass(WCReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4 set output
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // submit
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
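Since WordCountMr already implements Tool, the job could also be launched through Hadoop's ToolRunner, which parses generic options (-D, -files, and so on) before handing the remaining arguments to run(). A minimal sketch, assuming run() were changed to read its Configuration via getConf() so those options actually reach the job; the class name WordCountMrToolRunner is just for illustration:

package com.zc.hadoop.mapreduce.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class WordCountMrToolRunner {
    public static void main(String[] args) throws Exception {
        // ToolRunner strips the generic Hadoop options from args, applies
        // them to the Configuration, and passes the rest to run().
        int status = ToolRunner.run(new Configuration(), new WordCountMr(), args);
        System.exit(status);
    }
}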
package com.zc.hadoop.mapreduce.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WCPartitioner extends Partitioner<Text, IntWritable> {

    /**
     * @ClassName: WCPartitioner
     * @Description: Partition by letter case: lowercase keys go to partition 1,
     *               uppercase keys to partition 2, everything else to partition 0.
     *               (Three partitions are set up here, so three reduce tasks are
     *               created, each writing to its own output file: 0, 1 and 2.)
     * @param key   k2, the map output key
     * @param value v2, the map output value
     * @author zc
     */
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (null == key) {
            return 0;
        }
        String str = key.toString();
        if (str.matches("^[a-z]+$")) {
            return 1;
        }
        if (str.matches("^[A-Z]+$")) {
            return 2;
        }
        return 0;
    }
}
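To see the routing concretely, the partitioner can be called directly with a few sample keys; a minimal sketch (PartitionDemo is a made-up class for illustration, not part of the original job):

package com.zc.hadoop.mapreduce.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class PartitionDemo {
    public static void main(String[] args) {
        WCPartitioner partitioner = new WCPartitioner();
        IntWritable one = new IntWritable(1);
        // All-lowercase key -> partition 1
        System.out.println(partitioner.getPartition(new Text("hello"), one, 3));    // 1
        // All-uppercase key -> partition 2
        System.out.println(partitioner.getPartition(new Text("WORLD"), one, 3));    // 2
        // Mixed-case or non-letter key -> partition 0
        System.out.println(partitioner.getPartition(new Text("Hello123"), one, 3)); // 0
    }
}

With job.setNumReduceTasks(3) in the driver, partitions 0, 1 and 2 correspond to the output files part-r-00000, part-r-00001 and part-r-00002.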