MapReduce的类型与格式
MapReduce基础
MapReduce应用开发
YARN
hadoop
《权威指南》
! All pictures are screenshots from the book 'Hadoop: The Definitive Guide, Fourth Edititon, by Tom White(O'Reilly).Copyright©2015TomWhite, 978-1-491-90163-2'
MapReduce的类型
默认的MR作业
- 默认的mapper是Mapper类,它将输入的键和值原封不动地写到输出中
- 默认的partitioner是HashPartitioner,它对每条记录的键进行哈希操作以决定该记录应该属于哪个分区(每个分区对应于一个reduce任务)
- 默认的reducer是Reducer类,它将所有的输入写到输出中
- map任务的数量等于输入文件被划分成的块数
- reduce任务的个数的选择: 一个经验法则是目标reducer保持在每个运行5分钟左右且产生至少一个HDFS块的输出比较合适
- 默认的输入格式是TexInputFormat,输出是TextOutpFormat
默认的streaming作业
输入格式
输入分片与记录
- 一个输入分片就是由单个map操作来处理的数据块,并且每一个map只处理一个分片、
- 每个输入分片分为若干个记录,每条记录就是 一个键值对,map将一个接一个地处理记录
- 输入分片和记录都是逻辑概念,不一定对应着文件,也可能对应其他数据形式,如对于数据库,输入分片就是对应于一个表上的若干行,一条记录对应着其中的一行
输入分片只是指向数据的引用,不包含数据本身
- InputSpilt接口(Java中的实现),包含
- 以字节为单位的长度,表示分片的大小,用以排序分片,以便优先处理最大的分片,从而最小化作业运行时间
- 一组存储位置,供MR系统使用一边将map任务尽可能放在分片数据附近
- 该接口由InputFormat创建
- InputFormat
- 运行作业的客户端使用getSplits方法计算分片,并将结果告知application master,后者使用其存储信息来调度map任务从而在集群集群上处理这些分片数据
- map任务将输入分片传给createRecordReader方法来获取这个分片的RecordReader(就像是记录上的迭代器),map任务用这个RecordReader来生成记录的键值对,然后再将键值对传递给map函数(参见run方法)
////////////////////////////////////////////////////////////////////
/////InputFormat接口
public abstract class InputFormat<K, V> {
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}
//////////////////////////////////////////////////////////////////////
/////Mapper的run方法
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()/*委托给RecorReader的同名方法,为mapper产生键值对*/) {
map(context.getCurrentKey(), context.getCurrentValue(), context);//从RecordReader中检索出并传递给map方法
}
cleanup(context);
}
- FileInputFormat类提供两个功能:
- 指出作业的输入文件位置
- 实现了为输入文件产生输入分片的功能(把分片切割成记录的功能由其子类完成)
输入路径
public static void addInputPath(Job job, Path path)
public static void addInputPaths(Job job, String commaSeparatedPaths)
public static void setInputPaths(Job job, Path... inputPaths)
public static void setInputPaths(Job job, String commaSeparatedPaths)
- 前两者用于加入一个或多个路径到路径列表中,后两者一次设定完整的路径列表(replacing any paths set on the Job in previous calls)
- 一条路径可以是文件、目录或者glob(文件和目录的结合),但是目录在默认情况下不会进行递归处理,如果目录下存在子目录,则要么采用glob的形式,要么设置过滤器过滤子目录(因为子目录会被当作文件而报错),或者更改属性设置让其可以递归处理
- FileInputFormat有默认过滤器,用以过滤隐藏文件(自定义的过滤器会和这个默认的一起工作)
- 输入分片: FileInputFormat只分割大文件(超过块的大小)
小文件与CombineFileInputFormat
- CFIF类可以把多个文件打包到一个分片中(在决定将哪些块放到同一分片时,会考虑节点和机架的因素)
避免切分
- 设置最小分片大小以避免切分、
- 重写isSplitable方法
mapper中文件信息
- 调用Mapper类中Context对象的getInputSplit方法来获得InputSplit,对于FileInputFormat,它会被转成FileSplit
- 注意此处的getInputSplit方法和InputFormat中的getSplit方法,后者是用于为整个输入计算分片,而前者是为某个mapper获取该输入分片的相关信息
把整个文件作为一条记录处理
文本输入--TextInputFormat
Hadoop非常擅长处理非结构化文本数据
TextInputFormat是默认的InputFormat
- 每条记录是一条输入,键是LongWritable雷巡官,存储该行在整个文件中的字节偏移量,值是该行的内容(不包括任何行终止符)
- 由于此处的逻辑记录是以行为单位的,因而可能出现某一行会跨文件块存放,从未会为‘本地化’的map任务带来远程读操作的开销(这是因为分片是和行对齐的而不是hdfs块,参考图示)
控制一行最大的长度
- 目的是应对损坏的文件,文件的损坏可能对应一个超长行,从而导致内存溢出
- 长度通过属性mapreduce.input.linerecordreader.line.maxlength设置
关于KeyValueTextInputFFormat
- 目的是应对那些每行内容是一个键值对的文件(之所以是键值对,是因为它经过了一些操作,比如TextOutputFormat的输出就会将键值对写入文件,两者之间使用分隔符分开)
- 所以使用时要指定键值对之间的分隔符,默认是制表符(属性mapreduce.input.keyvaluelinere cordreader.key.value.separator),且保持原来的键而不是使用偏移量作为键
关于NLineInputFormat
- 一般每个mapper收到的行数不同(行数取决于分片大小和行长度),通过该类可是使每个mapper收到的行数相同
- 键是文件中行的字节偏移量,值是行本身
- 应用场景
- 仿真
- 用Hadoop引导从多个数据源(如数据库)加载数据,每行一个数据源
- 关于xml
二进制输入
多个输入
- 问题: 一个MapReduce 作业 的输入可能包括多个输入文件,但是用来解释它们的InputFormat和Mapper是同一个,因此如果它们格式不同,会出现问题(注意一个作业虽然有多个map任务,但是这些map任务都是由同一个mapper类来实现的)
- 解决方法: MultipleInputs类,允许为每条输入路径指定InputFormat和Mapper
MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class, MaxTemperatureMapper.class);
MultipleInputs.addInputPath(job, metOfficeInputPath, TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);
- 也有没有Mapper类的重载版本,用于多种输入格式且只需一个Mapper(通过Job的setMapperClass方法设定)的情形
public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass)
数据库输入和输出
--
输出格式