@awsekfozc
2015-12-09T11:23:13.000000Z
MapReduce
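The example below shows how to plug a custom Partitioner into a WordCount job: the map output is split into three partitions by letter case, and each partition is handled by its own reduce task and written to its own output file.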
package com.zc.hadoop.mapreduce.partition;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
public class WordCountMr extends Configured implements Tool{
public static void main(String[] args) throws Exception{
// System.setProperty("hadoop.home.dir","E:\\zc\\大数据\\工具\\hadoop-2.5.0\\");
// args = new String[]{
// "/user/zc/tmp/input" ,
// "/user/zc/tmp/output"+System.currentTimeMillis() };
int status = new WordCountMr().run(args);
System.exit(status);
}
// Map: split each input line into words and emit (word, 1)
public static class WCMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private Text mapOutputKey = new Text();
private final static IntWritable mapOutputValue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String lineValue = value.toString();
String[] strs = lineValue.split(" ");
for(String str:strs)
{
mapOutputKey.set(str);
context.write(mapOutputKey, mapOutputValue);
}
}
}
// Reduce: sum the counts for each word
public static class WCReduce extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outputValue = new IntWritable();
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value:values){
sum += value.get();
}
outputValue.set(sum);
context.write(key, outputValue);
}
}
/**
* Configure and submit the word-count job.
* @param args args[0] is the input path, args[1] is the output path
*/
// Driver
public int run(String[] args) throws Exception {
// 1 set configuration
Configuration configuration = new Configuration();
// 2 create job
Job job = Job.getInstance(configuration, this.getClass()
.getSimpleName());
job.setJarByClass(this.getClass());
// 3.1 set input path
Path path = new Path(args[0]);
FileInputFormat.addInputPath(job, path);
// 3.2 set mapper
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//-----------Shuffle: partitioning--------------------
// 1) custom partitioner
job.setPartitionerClass(WCPartitioner.class);
//reduce task number
job.setNumReduceTasks(3);
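// each value returned by getPartition (0, 1 or 2 here) selects one reduce task,
// so it must be smaller than the reduce task count set above; every reduce task
// writes its own output file (part-r-00000, part-r-00001, part-r-00002)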
//-----------------Shuffle--------------------
// 3.3 set reduce
job.setReducerClass(WCReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 3.4 set output path
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// submit the job and wait for completion
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
}
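Because WordCountMr extends Configured and implements Tool, the job can also be launched through ToolRunner, which parses generic Hadoop options (for example -D properties) before calling run(). A minimal alternative entry point, sketched under the assumption that the class above is on the classpath (WordCountMrLauncher is just an illustrative name):

package com.zc.hadoop.mapreduce.partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
public class WordCountMrLauncher {
    public static void main(String[] args) throws Exception {
        // ToolRunner calls setConf() on the Tool before run(), so run() could
        // use this.getConf() instead of creating a new Configuration
        int status = ToolRunner.run(new Configuration(), new WordCountMr(), args);
        System.exit(status);
    }
}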
package com.zc.hadoop.mapreduce.partition;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WCPartitioner extends Partitioner<Text, IntWritable> {
/**
* @ClassName: WCPartitioner
* @Description: partition by letter case: other words go to partition 0, all-lowercase words
* to partition 1, all-uppercase words to partition 2 (three partitions are created here, so
* three reduce tasks write to separate output files for partitions 0, 1 and 2)
* @param K k2 (the map output key), V v2 (the map output value)
* @author zc
*/
@Override
public int getPartition(Text key, IntWritable value,
int numPartitions) {
if(null==key)
return 0;
String str = key.toString();
if(str.matches("^[a-z]+$")){
return 1;
}
if(str.matches("^[A-Z]+$")){
return 2;
}
return 0;
}
}
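With three reduce tasks, the job's output directory contains part-r-00000 (partition 0: words that are neither all-lowercase nor all-uppercase), part-r-00001 (partition 1: all-lowercase words) and part-r-00002 (partition 2: all-uppercase words). The partitioner can also be sanity-checked locally without submitting a job; a small sketch, assuming the WCPartitioner above is on the classpath (WCPartitionerCheck is just an illustrative name):

package com.zc.hadoop.mapreduce.partition;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class WCPartitionerCheck {
    public static void main(String[] args) {
        WCPartitioner partitioner = new WCPartitioner();
        IntWritable one = new IntWritable(1);
        // all-lowercase -> partition 1, all-uppercase -> partition 2, anything else -> partition 0
        System.out.println(partitioner.getPartition(new Text("hadoop"), one, 3)); // 1
        System.out.println(partitioner.getPartition(new Text("HADOOP"), one, 3)); // 2
        System.out.println(partitioner.getPartition(new Text("Hadoop"), one, 3)); // 0
    }
}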