@linux1s1s
2018-02-07T10:48:01.000000Z
字数 3755
阅读 1504
Big-Data
2018-02
本文自写wordcount实例,以期熟悉Hadoop操作,以下为详细记录.
新建wordcount目录,并在该目录下新建两个子目录分别为src和classes.分别存放源码和编译结果.
最简单的MapReduce程序,只需要三个Java文件即可
package com.linroid.hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordMapper extends Mapper<Object, Text, Text, IntWritable> {
/**
* IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类实现了WritableComparable接口,
* 都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为int,String 的替代品。
* 声明one常量和word用于存放单词的变量
*/
IntWritable one = new IntWritable(1);
Text word = new Text();
/**
* Mapper中的map方法:
* void map(K1 key, V1 value, Context context)
* 映射一个单个的输入k/v对到一个中间的k/v对
* 输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。
* Context:收集Mapper输出的<k,v>对。
* Context的write(k, v)方法:增加一个(k,v)对到context
* 程序员主要编写Map和Reduce函数.这个Map函数使用StringTokenizer函数对字符串进行分隔,通过write方法把单词存入word中
* write方法存入(单词,1)这样的二元组到context中
*/
public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
package com.linroid.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable result = new IntWritable();
/**
* Reducer类中的reduce方法:
* void reduce(Text key, Iterable<IntWritable> values, Context context)
* 中k/v来自于map函数中的context,可能经过了进一步处理(combiner),同样通过context输出
*/
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
int sum = 0;
for(IntWritable val:values) {
sum += val.get();
}
result.set(sum);
context.write(key,result);
}
}
package com.linroid.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[] args) throws Exception {
/**
* Configuration:map/reduce的j配置类,向hadoop框架描述map-reduce执行的工作
*/
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "wordcount"); //设置一个用户定义的job名称
job.setJarByClass(WordCount.class);
job.setMapperClass(WordMapper.class); //为job设置Mapper类
job.setCombinerClass(WordReducer.class); //为job设置Combiner类
job.setReducerClass(WordReducer.class); //为job设置Reducer类
job.setOutputKeyClass(Text.class); //为job的输出数据设置Key类
job.setOutputValueClass(IntWritable.class); //为job输出设置value类
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //为job设置输入路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //为job设置输出路径
System.exit(job.waitForCompletion(true)?0:1);
}
}
Hadoop2.x 版本编译不再集中到一个hadoop-core-*.jar中,而是需要多个jar,如下图找到这些jar的目录
然后使用javac命令进行编译
javac -classpath /usr/local/hadoop/share/hadoop/common/hadoop-common-2.6.5.jar:/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.5.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar -d classes/ src/*.java
如图所示:
- classpath,设置源代码里使用的各种类库所在的路径,多个路径用":"隔开。
- d,设置编译后的 class 文件保存的路径。
- src/*.java,待编译的源文件。
jar -cvf wordcount.jar classes
如图所示:
su - hadoop #输入密码
start-all.sh #启动hadoop
hadoop jar /home/mutex/word-count/wordcount.jar com.linroid.hadoop.WordCount /input /otput
然后再otput目录中查看结果
hdfs dfs -cat /otput/part-r-00000
如图所示: