@zhangyy
2017-08-28T05:05:20.000000Z
The Hadoop part:
- 1: Understand what secondary sort does and explain it in your own words (including the custom data type, partitioning, grouping, and sorting); a small worked example follows this list.
- 2: Implement secondary sort and provide the source files.
- 3: Understand the different ways to do a MapReduce join, implement a reduce join in code, provide the source code, and explain the approach.
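For example (the values here are made up purely for illustration), if the sort input file contains lines such as `a,3`, `a,1` and `b,2`, the job described below groups records by the first field and delivers the values of each group to the reducer already sorted in ascending order, so the output is `a 1`, `a 3`, `b 2`.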
1) partition: job.setPartitionerClass(FirstPartitioner.class);
2) sort: job.setSortComparatorClass(cls);
3) combine: job.setCombinerClass(cls);
4) compress: set by configuration
5) group: job.setGroupingComparatorClass(FirstGroupingComparator.class);
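Of these five shuffle hooks, the secondary sort job below only sets 1) the partitioner and 5) the grouping comparator explicitly; the sort order itself comes from PairWritable.compareTo, while combine and compression are left at their defaults.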
SecondarySortMapReduce.java

```
package org.apache.hadoop.studyhadoop.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author zhangyy
 */
public class SecondarySortMapReduce extends Configured implements Tool {

    // step 1: mapper class
    public static class SecondarySortMapper extends
            Mapper<LongWritable, Text, PairWritable, IntWritable> {

        private PairWritable mapOutputKey = new PairWritable();
        private IntWritable mapOutputValue = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // line value
            String lineValue = value.toString();
            // split
            String[] strs = lineValue.split(",");
            // validate
            if (2 != strs.length) {
                return;
            }
            // set map output key and value
            mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));
            mapOutputValue.set(Integer.valueOf(strs[1]));
            // output
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // step 2: reducer class
    public static class SecondarySortReducer extends
            Reducer<PairWritable, IntWritable, Text, IntWritable> {

        private Text outputKey = new Text();

        @Override
        public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // set output key
            outputKey.set(key.getFirst());
            // iterate
            for (IntWritable value : values) {
                // output
                context.write(outputKey, value);
            }
        }
    }

    // step 3: driver
    public int run(String[] args) throws Exception {
        // 1: get configuration
        Configuration configuration = super.getConf();

        // 2: create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3: set job
        // input -> map -> reduce -> output
        // 3.1: input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2: mapper
        job.setMapperClass(SecondarySortMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // =========================== Shuffle ======================================
        // 1) partitioner
        job.setPartitionerClass(FirstPartitioner.class);
        // 2) sort
        // job.setSortComparatorClass(cls);
        // 3) combine
        // job.setCombinerClass(cls);
        // 4) compress
        // set by configuration
        // 5) group
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
        // =========================== Shuffle ======================================

        // 3.3: reducer
        job.setReducerClass(SecondarySortReducer.class);
        // the reducer emits Text keys, so the output key class must be Text
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // set reducer number
        job.setNumReduceTasks(2);

        // 3.4: output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4: submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] {
                "hdfs://namenode01.hadoop.com:8020/input/sort",
                "hdfs://namenode01.hadoop.com:8020/output"
        };
        // create configuration
        Configuration configuration = new Configuration();
        // run job
        int status = ToolRunner.run(configuration, new SecondarySortMapReduce(), args);
        // exit program
        System.exit(status);
    }
}
```
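Note that main() overrides whatever arguments are passed on the command line with the hard-coded HDFS paths under hdfs://namenode01.hadoop.com:8020, so to run the jar against other input/output paths that assignment would have to be removed.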
PairWritable.java
```
package org.apache.hadoop.studyhadoop.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PairWritable implements WritableComparable<PairWritable> {

    private String first;
    private int second;

    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    public void set(String first, int second) {
        this.first = first;
        this.setSecond(second);
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second - Integer.MAX_VALUE;
    }

    public void setSecond(int second) {
        this.second = second + Integer.MAX_VALUE;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public int compareTo(PairWritable o) {
        // compare first
        int comp = this.first.compareTo(o.getFirst());
        // not equal
        if (0 != comp) {
            return comp;
        }
        // compare second
        return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond()));
    }
}
```
FirstPartitioner.java
```
package org.apache.hadoop.studyhadoop.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner extends Partitioner<PairWritable, IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```
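Since getPartition() hashes only getFirst(), every record that shares the same first field is sent to the same reduce task regardless of its second field; the `& Integer.MAX_VALUE` masks off the sign bit so the modulo result is never negative.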
FirstGroupingComparator.java
```
package org.apache.hadoop.studyhadoop.sort;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class FirstGroupingComparator implements RawComparator<PairWritable> {

    // object compare
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    // bytes compare: start at each key's offset and skip the trailing 4-byte int,
    // so only the serialized "first" field is compared
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
```
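Because PairWritable is serialized as the UTF-encoded first field followed by a 4-byte int, dropping the last 4 bytes means keys are grouped for the reduce call by the first field only, while PairWritable.compareTo has already ordered the records by the second field within each group. Together with FirstPartitioner, this is what produces the secondary sort.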
Upload the data to HDFS:
hdfs dfs -put sort /input
Run output:


There are three kinds of MapReduce join:
3.1 Map-side join
In the map phase a task cannot see all of the fields needed for the join, because records with the same key may end up in different map tasks. A reduce-side join is therefore quite inefficient, since the shuffle phase has to move a large amount of data across the network. Map-side join is an optimization for the case where, of the two tables being joined, one is very large and the other is small enough to fit entirely in memory. The small table can be replicated so that every map task holds a copy in memory (for example in a hash table), and only the large table is scanned: for every key/value record of the large table, look up the key in the hash table and, if it is found, emit the joined record. To distribute the small file to the tasks, Hadoop provides the DistributedCache mechanism. (A minimal sketch of this idea follows after section 3.3.)

3.2 Reduce-side join
In the map phase, the map function reads both input files, File1 and File2. To tell the two sources apart, each record is tagged, e.g. tag=0 for records from File1 and tag=2 for records from File2; in other words, the main job of the map phase is to label the data by source file. In the reduce phase, the reduce function receives, for each key, the value list coming from both File1 and File2 and joins them (a Cartesian product of the File1 and File2 values for that key). That is, the actual join happens in the reduce phase.

3.3 SemiJoin
SemiJoin, or semi-join, is a technique borrowed from distributed databases. Its motivation: in a reduce-side join the amount of data shipped between machines is very large and becomes the bottleneck of the join; if records that cannot possibly participate in the join are filtered out on the map side, a great deal of network I/O is saved. The implementation is simple: take the small table, say File1, extract the keys that take part in the join and store them in a file File3, which is usually small enough to fit in memory. In the map phase, DistributedCache is used to copy File3 to every TaskTracker, and records of File2 whose keys are not in File3 are dropped; the remaining work in the reduce phase is the same as in a reduce-side join.
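The reduce-side join is the one implemented in section 3.4 below. For contrast, here is a minimal map-side join sketch of the idea described in 3.1; it is not part of the original assignment code, and the class name, the assumed line layouts (customer lines as cid,name,phone, order lines as cid,product,price,date) and the cache-file handling are illustrative assumptions only.

```
package org.apache.hadoop.studyhadoop.join;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical map-side join mapper: the small customers file is shipped via the
// distributed cache, loaded into an in-memory map in setup(), and each order
// record of the big file is joined in map(), so no reduce phase is needed.
public class MapSideJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private Map<String, String> customerMap = new HashMap<String, String>();
    private Text outputValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // read the cached small table (assumed line format: cid,name,phone)
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null || cacheFiles.length == 0) {
            return;
        }
        Path cachePath = new Path(cacheFiles[0]);
        FileSystem fs = cachePath.getFileSystem(context.getConfiguration());
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(cachePath)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                if (3 == fields.length) {
                    customerMap.put(fields[0], fields[1] + "," + fields[2]);
                }
            }
        } finally {
            reader.close();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // scan only the big table (assumed order line format: cid,product,price,date)
        String[] fields = value.toString().split(",");
        if (4 != fields.length) {
            return;
        }
        String customerInfo = customerMap.get(fields[0]);
        // emit the joined record only when the key exists in the small table
        if (customerInfo != null) {
            outputValue.set(fields[0] + "," + customerInfo + "," + fields[1]
                    + "," + fields[2] + "," + fields[3]);
            context.write(NullWritable.get(), outputValue);
        }
    }
}
```

In the driver one would ship the small file with something like job.addCacheFile(new URI("hdfs://namenode01.hadoop.com:8020/join/customers.txt")) and call job.setNumReduceTasks(0), since the join is completed on the map side.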
3.4 Code:
DataJoinMapReduce.java
```
package org.apache.hadoop.studyhadoop.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author zhangyy
 */
public class DataJoinMapReduce extends Configured implements Tool {

    // step 1 : mapper
    public static class WordCountMapper extends
            Mapper<LongWritable, Text, LongWritable, DataJoinWritable> {

        private LongWritable mapOutputKey = new LongWritable();
        private DataJoinWritable mapOutputValue = new DataJoinWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split
            String[] strs = value.toString().split(",");
            // validate
            if ((3 != strs.length) && (4 != strs.length)) {
                return;
            }
            // set map output key
            Long cid = Long.valueOf(strs[0]);
            mapOutputKey.set(cid);
            // set name
            String name = strs[1];
            // customer record
            if (3 == strs.length) {
                String phone = strs[2];
                mapOutputValue.set("customer", name + "," + phone);
            }
            // order record
            if (4 == strs.length) {
                String price = strs[2];
                String date = strs[3];
                mapOutputValue.set("order", name + "," + price + "," + date);
            }
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // step 2 : reducer
    public static class WordCountReducer extends
            Reducer<LongWritable, DataJoinWritable, NullWritable, Text> {

        private Text outputValue = new Text();

        @Override
        public void reduce(LongWritable key, Iterable<DataJoinWritable> values, Context context)
                throws IOException, InterruptedException {
            String customerInfo = new String();
            List<String> orderList = new ArrayList<String>();
            for (DataJoinWritable value : values) {
                if ("customer".equals(value.getTag())) {
                    customerInfo = value.getData();
                } else if ("order".equals(value.getTag())) {
                    orderList.add(value.getData());
                }
            }
            for (String order : orderList) {
                outputValue.set(key.toString() + "," + customerInfo + "," + order);
                context.write(NullWritable.get(), outputValue);
            }
        }
    }

    // step 3 : job
    public int run(String[] args) throws Exception {
        // 1 : get configuration
        Configuration configuration = super.getConf();

        // 2 : create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(DataJoinMapReduce.class);
        // job.setNumReduceTasks(tasks);

        // 3 : set job
        // input --> map --> reduce --> output
        // 3.1 : input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2 : mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(DataJoinWritable.class);

        // ==================== shuffle ==========================
        // 1: partition
        // job.setPartitionerClass(cls);
        // 2: sort
        // job.setSortComparatorClass(cls);
        // 3: combine
        // job.setCombinerClass(cls);
        // 4: compress
        // set by configuration
        // 5: group
        // job.setGroupingComparatorClass(cls);
        // ==================== shuffle ==========================

        // 3.3 : reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // 3.4 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4 : submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] {
                "hdfs://namenode01.hadoop.com:8020/join",
                "hdfs://namenode01.hadoop.com:8020/output3/"
        };
        // get configuration
        Configuration configuration = new Configuration();
        // configuration.set(name, value);
        // run job
        int status = ToolRunner.run(configuration, new DataJoinMapReduce(), args);
        // exit program
        System.exit(status);
    }
}
```
DataJoinWritable.java
```
package org.apache.hadoop.studyhadoop.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataJoinWritable implements Writable {

    private String tag;
    private String data;

    public DataJoinWritable() {
    }

    public DataJoinWritable(String tag, String data) {
        this.set(tag, data);
    }

    public void set(String tag, String data) {
        this.setTag(tag);
        this.setData(data);
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((data == null) ? 0 : data.hashCode());
        result = prime * result + ((tag == null) ? 0 : tag.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        DataJoinWritable other = (DataJoinWritable) obj;
        if (data == null) {
            if (other.data != null)
                return false;
        } else if (!data.equals(other.data))
            return false;
        if (tag == null) {
            if (other.tag != null)
                return false;
        } else if (!tag.equals(other.tag))
            return false;
        return true;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.getTag());
        out.writeUTF(this.getData());
    }

    public void readFields(DataInput in) throws IOException {
        this.setTag(in.readUTF());
        this.setData(in.readUTF());
    }

    @Override
    public String toString() {
        return tag + "," + data;
    }
}
```
Upload the files:
hdfs dfs -put customers.txt /join
hdfs dfs -put orders.txt /join
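For reference, the mapper above expects customers.txt lines to carry three comma-separated fields (cid,name,phone) and orders.txt lines to carry four (cid,product,price,date); any concrete sample values would be illustrative only. The reducer then joins each order with its customer's info and writes one output line per order.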
Run results:
