MapReduce的工作机制
MapReduce基础
MapReduce应用开发
YARN
hadoop
《权威指南》
! All pictures are screenshots from the book 'Hadoop: The Definitive Guide, Fourth Edititon, by Tom White(O'Reilly).Copyright©2015TomWhite, 978-1-491-90163-2'
MR作业运行机制
运行MR作业的两种简单方式:
- Job对象的submitfang方法,该方法封装了大量的细节
- waitForCompletion, 用于提交以前没有提交过的作业,并等待它的完成
运行MR作业涉及到的5大实体:
1. 客户端, 提交MR作业
2. YARN资源管理器, 负责协调集群上计算机资源的分配
3. YARN节点管理器, 负责启动和监视集群中机器上的计算容器(container),注意容器的分配是由YARN的调度器负责的
4. MR的application master, 负责协调运行MR作业的任务。它和MR作任务在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理
5. 分布式文件系统(一般为hdfs), 用来与其他实体共享作业文件
作业的提交
- Job的summit方法创建一个JobSubmitter实例,并且调用其submitJobInternal方法
- 提交作业后,waitForCompletion每秒轮询作业的进度,如有改变,将进度报告给控制台
- 作业完成后,成功则显示作业计数器,失败则将错误报告给控制台
JobSubmitter所实现的作业提交过程
- 向资源管理器请求一个新应用ID,用于MR作业ID,参见图示步骤2
- 检查作业的输出说明,例如没有指定输出目录或者目录已经存在,则不提交作业,并将错误抛回给MR程序
- 计算作业的输入分片,如果分片无法计算(比如由输入路径不存在所致),则不提交作业,并将错误抛回给MR程序
- 将运行作业需要的资源(包括作业JAR文件、配置文件、和计算所得的输入分片)复制到一个以作业ID命名的共享文件系统中,参见图示步骤4。由于JAR文件的副本较多(由mapreduce.client.submit.file.replication属性控制,默认为10),因而在运行作业的任务时,集群中由很多个副本可供节点管理器访问
- 通过调用资源管理器的submitApplication方法提交作业,参见图示步骤4
作业的初始化
- 资源管理器收到要调用它的submiApplication方法后,便将请求传递给YARN调度器,然后调度器将分配一个容器,最后资源管理器在节点管理器的管理下启动application master(参见图示步骤5a和5b)
- application对作业的初始化是通过创建多个簿记对象以保持对作业进度的追踪来完成的(as it will receive the progress and completion reports from the tasks)。参见图示步骤6
- 接下来,application master将接受来自共享文件系统的由客户端计算的输入分片,然后为每一个输入分片创建一个map任务对象,以及由mapreduce.job.reduces属性(由setNumReducerTasks方法设置)确定的多个reducer对象,任务ID将在此时分配
- application master会决定如何运行构成MR作业的各个任务,如果作业很小,就选择和自己同在一个JVM上运行任务,这样的作业称为uberized,被作为uber任务运行
- 小作业默认情况下指的是小于10个mapper且只有一个reducer且输入大小小于一个HDFS块的作业(通过mapreduce.job.ubertask.*的相关值可以改变具体的值)
- 最后在任务运行之前,application master调用setupJob方法设置OutputCommiter,默认值为FileOutputtCommiter
任务的分配
如果作业不适合作为uber任务运行,那么application master就会为该作业中的所有map任务和reduce任务先资源管理器请求容器,参见图示步骤8
1. 首先为map任务请求,因为所有的map任务必须在reduce的排序阶段启动之前完成,直到5%的map任务已经完成时,为reduce任务的请求才会发出
2. reduce任务能够在集群中的任意位置运行,但是map任务的请求必须有着数据本地化的限制,这也是调度器所关注的(数据本地化时理想选择,机架本地化时可选的选择,有些任务既不是数据本地化也不是机架本地化)
3. 请求也为任务制定了内存需求和CPUs
任务的执行
- 一旦调度器为任务分配了一个特点节点上的容器之后,application master将通过与节点管理器通信来启动容器
- 该任务将由YarnChild运行(参见图示步骤11),由于YarnChild在指定的JVM中运行,因此用户定义的map和reduce函数(甚至时YarnChild自身)的任何缺陷都不会影响到节点管理器
- 在YarnChild运行任务之前,它会将所需资源本地化(从共享文件系统中,参见图示步骤10)
- 每个任务都能执行setup和commit动作,这两个动作和任务本身在同一个JVM中运行,并由作业的OutputCommiter决定
streaming
streamin运行特殊的mao任务和reduce任务,目的是运行用户提供的可执行程序,并与之通信
进度和状态的更新
一个作业和它的每个任务都有一个状态,包括:
- 作业或任务的状态(运行中,成功完成,失败)
- map和reduce的进度
- 作业计数器的值
- 状态信息或描述
任务在运行时,对其进度保持追踪
- map任务的进度就是已处理数据的比例
- reduce的整个过程分成3部分,与shuffle的3个阶段相对应(reduce 复制 排序各占1/3)
任务也有一组计数器,负责对任务运行过程中各个事件进行计数
- map和reduce任务运行时,子进程和父application master通过umbilical接口进行通信
- 在作业期间,客户端每秒钟轮询一次application master以接受最新状态,客户端也可以通过Job的getStatus方法得到一个JobStatus实例,后会包含作业的所有状态信息
作业的完成
- 当application master接受到最后一个任务已经完成的通知后,便把作业的装填设置为successful
- 然后,但Job轮询状态时,便知道任务已经完成,于是Job打印消息告知用户,然后从waitForCompletion方法返回
- 如果application master有相关的设置,也会发送一个HTTP作业通知
- 最后,作业完成时,application master和任务容器清理它们的工作状态(中间输出会被删除),OutputCommiter的commitJob方法会被调用,作业信息由作业历史服务器存档,以便日后用户需要时可以查询
失败
我们需要考虑以下实体的失败:
- 任务
- application master
- 节点管理器
- 资源管理器
任务运行失败
任务失败的模式
- 最常见的是map任务或reduce任务中的用户代码抛出运行异常
- 此时任务JVM会在退出之前向其父application master发送错误报告,错误报告最后会被计入用户日志
- application master将此任务尝试标记为failed,并释放容器以便资源可以为其他任务使用
- 对于streaming任务,如果streaming进程以非零推出代码推出,则被标记为faile
- JVM突然退出(可能由于JVM软件的缺陷所导致),此时节点管理器会注意到进程已经退出,并通知application master将此次任务尝试标记为failed
- 任务挂起: 一旦application master超过一定的时间(由mapreduce.task.timeout属性指定,为0则表示关闭)没有收到进度的更新,便会将任务标记为失败
处理
application master被告知一个任务尝试失败后,将重新调度该任务的执行
- application master会尽量避免在以前失过的节点管理器上重新调度该任务
- 一个任务尝试失败过4次(mapreduce.map/reduce.maxattempts属性指定),将不会再重试
对于一些应用程序,我们不希望一旦由少数几个任务失败就中止运行整个作业,因为即使由任务失败,作业的一些结果可能还是有用的,此时可以为作业设置不触发作业失败的情况下允许任务失败的最大百分比(mapreduce.map/reduce.failures.maxpercent属性)
- a task attemp may also be killed, which is different from it failing
- 任务尝试可以被终止是因为它是一个推测副本(推测副本在一个相同的任务完成后被中止)或因为它所处的节点管理器失败,导致application master将它上面运行的所有任务尝试标记为killed
- 被终止的任务尝试不会被计入任务尝试次数(即上面的控制重试的次数)
application master运行失效
- MapReduce application master最大尝试次数由属性mapreduce.am.max-attempts设定,默认为2,超出后将不再尝试,作业讲失败
- Yarn对集群上运行的YARN application master的最大尝试次数由属性yarn.resourcemanager.am.max-attempts设定,默认为2(因而想要增加MR application master的尝试次数,也必须增加Yarn application master的尝试次数)
- 恢复过程:
- application master向资源管理器发送周期性心跳,当其失败时,资源管理器会检查到并在一个新的容器里(由 节点管理器管理)开始一个新的master实例
- 对于MR application master,它将使用作业历史来恢复已被失败的应用程序所运行的任务的状态,使其(任务)不必重新运行,恢复功能由属性yarn.app.mapreduce.am.job.recovery.enable控制,默认开启
- MR客户端向application master轮询进度报告,若application master运行失败,那么客户端就需要定位新的实例即折回向资源管理器请求新的application master的地址(注意初始化时客户端便从资源管理器那得到了地址,因而轮询时不必重载资源管理器)
节点管理器运行失败
- 如果节点管理器由于崩溃或者运行缓慢而失败,就会停止向资源管理器发送心跳信息(或发送频率很低),10分钟(由属性yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms设定)内都没有收到心跳信息,那么资源管理器就会notice停止发送心跳信息的节点管理器,并将其从自己的节点池中移除以调度启用容器
- 在失败的节点管理器上运行的所有任务或application master恢复机制如前所述,需要注意的是作业中已经完成的任务也需要重新运行,因为此时你没法保证reduce能够访问到驻留在失败节点上的中间输出
- 拉黑机制(暂时处于application master的层面上,也就是对于特定作业),对于MR,如果一个节点管理器上由超过3个(由属性mapreduce.job.maxtaskfailures.per.tracker job property设定)任务失败,那么application master讲尽量将任务调度到其他节点上
资源管理器运行失败
- 为了获得高可性,在双机热备配置下,运行一对资源管理器是必要的
- 所有运行中的应用程序的信息存储在一个高可用的状态存储区中(由ZooKeeper或HDFS备份),这样备机可以恢复出失败的主资源管理器的关键状态
- 注意节点管理器的信息没有存储在状态存储区中,因为当节点管理器第一次发送心跳信息时,新的资源管理器就能快速重构出节点管理器的信息;此外任务是由application master管理的,因此它不是资源管理器状态的一部分
- 当新的资源管理器启动后,它状态存储区中读取应用程序的信息,然后为集群中运行的所有应用程序重启application master
- 资源管理器从备机到主机的切换由故障转移控制器处理,为应对资源管理器的故障转移,必须对客户和节点管理器进行配置,因为他们可能是在和两个资源管理器打交道。解决发方法就是客户和节点管理器以轮询方式试图连接每一个资源管理器直达找到主资源管理器为止。如果主资源管理器故障,他们将再次尝试直到备份资源管理器变成主机
shuffle
- MR必须确保每个reducer的输入都是按键排序的
- 系统执行排序、将map输出作为输入传递给reducer的过程称为shuffle
map端
map函数开始产生输出时,利用缓存的方式写到内存并处于高效的方式进行预排序
1. 每个map任务都有一个环形缓冲区(先进先出)用于存储任务输出(属性mapreduce.task.io.sort.mb控制大小,默认100mb),一旦缓冲内容达到阈值(默认为0.8),一个后台线程便会将内容溢出(spill)磁盘;在此过程中,map输出继续写到缓冲,如果此时缓冲区便填满,则map会被阻塞直到spill完成
- Spills are written in round-robin fashion to the directories specified by the mapreduce.cluster.local.dir property, in a job-specific subdirectory.
- 在写入磁盘之前,需要根据数据最终要传入的reducer对数据进行分区,在每个分区内,按键进行内存中排序(in-memory sort),如果由combiner函数,则在排序后的输出上运行(会使数据更加紧凑)
- 每次内存缓存区达到溢出值,就会新建一个溢出文件,在任务完成之前,溢出文件会被合并成一个已分区且已排序的输出文件一次最多合并多少流由属性mapreduce.task.io.sort.factor控制)
- 如果至少存在3个溢出文件(mapreduce.map.combine.minspills属性控制)时,则combiner就会在输出文件写到磁盘之前再次运行
- 在将map输出写进磁盘的过程中对其进行压缩可以加快磁盘的写入速度,节约磁盘空间以及减少传给reducer的数据量(属性mapreduce.map.output.compress为true时启用)
- reducer通过HTTP得到输出文件的分区
reduce端
- 复制阶段: 在每个map任务完成时复制其输出
- 输出较小时直接复制到reduce任务JVM的内存中,否则复制到磁盘上。复制到内存的过程和map端的缓冲区过程相似,当超过阈值时会合并溢出到磁盘中(如果指定了combiner可以运行其来降低写入磁盘的数据量)
- 随着磁盘上的副本数量增多,后台线程会将它们合并成更大的排好序的文件,为后面的合并节约时间(注意为了合并,压缩的map输出需要在内存中解压缩)
- 排序阶段(合并阶段): 合并map输出,并维持其之前的排序
- reduce阶段: 直接将数据输入reduce函数 ,从而省去了一次磁盘往返过程(这里的直接指的是将合并阶段产生的几个文件直接合并成一个作为输入而不是先存储在磁盘上在作为输入);此阶段的输出直接写入数据文件系统,一般为HDFS(此时由于节点管理器也运行着一个datanode,因而第一个块副本将写在本地磁盘)
配置调优
总的原则是尽可能多地给shuffle过程提供内存,不过要确保map和reduce函数能够得到足够的内存来运行(这也是在写这两个函数时尽可能减少内存使用的原因,<避免在map中堆积数据>)
1. map端: 避免spill过程多次发生就可获得最佳性能
2. reduce端: 中间数据全部驻留在内存时就可以获得最佳性能(默认情况下不可能实现,因为所有内存一般都留给了reduce函数)
任务的执行
任务执行环境
推测执行
推测执行: Hadoop不会尝试诊断或修复执行慢的任务,而是在一个任务运行地比预期慢时,它会尽量检测并启动另一个相同的任内务作为备份
1. 推测执行的属性
- 推测执行是以集群执行效率为代价的,因而可以考虑关闭
- 尤其对于reduce任务,因为任意重复的reduce任务都必须将取得map任务的输出,而这可能大幅度增加集群上的网络传输
- 非幂等任务
OutputCommitters
- Hadoop MapReduce通过使用一个提交协议来确保作业和任务都完全失败或成功,这通过对作业使用OutputCommitters来实现
- OutputCommiter由OutputFormat通过它的getOutputCommitter方法来确定,默认为FileOutpCommiter
- API如下
public abstract class OutputCommitter {
public abstract void setupJob(JobContext jobContext) throws IOException;
public void commitJob(JobContext jobContext) throws IOException { }
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { }
public abstract void setupTask(TaskAttemptContext taskContext) throws IOException;
public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException;
public abstract void commitTask(TaskAttemptContext taskContext) throws IOException;
public abstract void abortTask(TaskAttemptContext taskContext) throws IOException;
}
- setupJob方法在作业运行前调用,通常用来执行初始化操作),对于FileOutputCommite,该方法将创建最终的输出目录${mapreduce.output.fileoutputformat.outputdir},并且为任务输出创建一个临时的工作空间_temporarry,作为最终输出目录的子目录
- 作业成功则调用commitJob方法,默认的实现中,它用于删除临时额工作空间并在输出目录中创建一个名为_success的隐藏标志文件,以告知文件系统给的客户端该作业成功完成;反之调用absorbJob方法,默认是想中,它将删除作业的临时工作空间
- 任务级别上的操作与Job上类似,不同之处在于setupTask默认实现中是不做任何事的,因为针对任务输出命名的临时目录在写任务输出时创建
- 任务的提交时可选的,通过needTaskCommit来开启或关闭,关闭后执行框架将不会为任务运行分布提交协议
- 指定框架保证特定任务在有多次任务尝试的情况下,只有一个任务会被提交,其他的则被取消
任务附属文件