[关闭]
@zqbinggong 2018-06-12T20:16:42.000000Z 字数 7073 阅读 1853

MapReduce应用开发

MapReduce基础

hadoop 《权威指南》

! All pictures are screenshots from the book 'Hadoop: The Definitive Guide, Fourth Edititon, by Tom White(O'Reilly).Copyright©2015TomWhite, 978-1-491-90163-2'


用于配置的API

Hadoop中的组件是通过Hadoop自己配置的API来配置的;Configuration通过从资源(即使用简单结构定义名值对的XML文件)中读取其属性值

  1. Configuration conf = new Configuration();
  2. conf.addResource("configuration-1.xml");
  3. assertThat(conf.get("color"), is("yellow"));
  4. assertThat(conf.getInt("size", 0), is(10));
  5. assertThat(conf.get("breadth", "wide"), is("wide"));//定义xml中没有的属性,并指定了默认值为wide
  1. Configuration conf = new Configuration();
  2. conf.addResource("configuration-1.xml");
  3. conf.addResource("configuration-2.xml");//override1中的相同属
  1. <property>
  2. <name>size-weight</name>
  3. <value>${size},${weight}</value>
  4. <description>Size and weight</description>
  5. </property>

配置开发环境

管理配置

问题:开发Hadoop应用时,需要经常在本地运行和集群运行之间切换
解决方法:是Hadoop配置文件包含每个集群的连接设置,并且在运行Hadoop应用或工具时指定使用哪一种连接
最好的做法: 将这些文件放置在Hadoop安装目录树之外,方便切换同时避免重复和丢失设置信息

  1. hadoop fs -conf conf/hadoop-localhost.xml -ls

辅助类GenericOptionsParser,Tool,和ToolRuner

  1. API conf
  2. api中给出了实现Tool接口的范例,和书中的范例格式一致
  3. ToolRunner.run(new ConfigurationPrinter(), args)
    • 等价于run(tool.getConf(), tool, args)
      public static int run(Configuration conf,Tool tool,String[] args)
      Runs the given Tool by Tool.run(String[]), after parsing with the given generic arguments. Uses the given Configuration, or builds one if null. Sets the Tool's configuration with the possibly modified version of the conf.
    • 注意args参数被GenericOptionsParser解析
  4. 使用示例:需要注意的是示例2中-D之后有空格,这与变量扩展中所说的JVM参数不同,后者没有空格;两者的作用也不同,前者是为GOP设置某个别属性,而后者是为JVM来改变系统属性的
  1. 1. hadoop ConfigurationPrinter -conf conf/hadoop-localhost.xml
  2. 2. hadoop ConfigurationPrinter -D color=yellow
  1. public class ConfigurationPrinter extends Configured implements Tool {
  2. static {
  3. // 注意configuration默认载入的是core-default.xml和core-site.xml
  4. Configuration.addDefaultResource("hdfs-default.xml");
  5. }
  6. @Override
  7. public int run(String[] args) throws Exception {
  8. Configuration conf = getConf();//configurable接口中的方法,configured是该接口的一个最简单实现
  9. for (Entry<String, String> entry: conf) {
  10. System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
  11. }
  12. return 0;
  13. }
  14. public static void main(String[] args) throws Exception {
  15. int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
  16. System.exit(exitCode);
  17. }
  18. }

使用MRUnit来写单元测试

  1. MapDriver
  2. ReduceDriver

本地运行测试数据

目的: 写一个作业驱动程序(job driver),然后在开发机器上使用测试数据运行它

在本地作业运行器上运行作业

  1. Hadoop有一个本地作业运行器(job runner),它是在MapReduce执行引擎上运行单个JVM的MR作业的简化版本,为测试而设计。
  2. 如果mapreduce.framework.name被设置成local(默认情况),则运行本地作业运行器
  1. public class MaxTemperatureDriver extends Configured implements Tool {
  2. @Override
  3. public int run(String[] args) throws Exception {
  4. Job job = new Job(getConf(), "Max temperature");
  5. job.setJarByClass(getClass());
  6. FileInputFormat.addInputPath(job, new Path(args[0]));
  7. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  8. job.setMapperClass(MaxTemperatureMapper.class);
  9. job.setCombinerClass(MaxTemperatureReducer.class);
  10. job.setReducerClass(MaxTemperatureReducer.class);
  11. job.setOutputKeyClass(Text.class);
  12. job.setOutputValueClass(IntWritable.class);
  13. return job.waitForCompletion(true) ? 0 : 1;
  14. }
  15. public static void main(String[] args) throws Exception {
  16. int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
  17. System.exit(exitCode);
  18. }
  19. }

测试驱动程序

除了灵活的配置选项可以使应用程序实现Tool还可以插入任意Configuration来增加可测试性

  1. @Test
  2. public void test() throws Exception {
  3. Configuration conf = new Configuration();
  4. conf.set("fs.defaultFS", "file:///");
  5. conf.set("mapreduce.framework.name", "local");
  6. conf.setInt("mapreduce.task.io.sort.mb", 1);
  7. Path input = new Path("input/ncdc/micro");
  8. Path output = new Path("output");
  9. FileSystem fs = FileSystem.getLocal(conf);
  10. fs.delete(output, true); // delete old output
  11. MaxTemperatureDriver driver = new MaxTemperatureDriver();
  12. driver.setConf(conf);
  13. int exitCode = driver.run(new String[] {
  14. input.toString(), output.toString() });
  15. assertThat(exitCode, is(0));
  16. checkOutput(conf, output);//该方法会对实际输出和预计输出进行比较(预计输出从何而来呢?)
  17. }

在集群上运行

打包作业

  1. 本地作业运行器使用单JVM运行一个作业,只需要所有的类(比如编写的额map类)都在类路径(classpath)上,那么作业就可以正常执行
  2. 对于分布式环境:
    • 首先将作业的类打包成一个作业jar文件发送给集群
    • Hadoop will find the job JAR automatically by searching for the JAR on the driver’s classpath that contains the class set in the setJarByClass() method (on JobConf or Job ). Alternatively, if you want to set an explicit JAR file by its file path, you can use the setJar() method. (The JAR file path may be local or an HDFS file path.)
  3. if you have single job per JAR, you can specify the main class to run in the JAR file's manifest
  4. 任何有以来关系的JAR文件应该打包到作业的JAR文件的lib子目录中

客户端的类路径

Hadoop jar <jar>设置的用户客户端类路径包括:

任务的类路径

On a cluster (and this includes pseudodistributed mode), map and reduce tasks run in separate JVMs, and their classpaths are not controlled by HADOOP_CLASSPATH . HADOOP_CLASSPATH is a client-side setting and only sets the classpath for the driver JVM, which submits the job.

打包依赖(Packaging dependencies)

用以处理作业的库依赖的操作(corresponding options for including library dependencies for a
job):

任务类路径的优先权

用户的JAR文件被添加到客户端和任务类路径的后面,这意味着,如果Hadoop使用的库版本和你的代码使用的不同或不相容,则可能会发生冲突,此时需要调整任务类路径的次序以让你的类被先提取出来

启动作业

We unset the HADOOP_CLASSPATH environment variable because we don’t have any third-party dependencies for this job. If it were left set to target/classes/ (from earlier in the chapter), Hadoop wouldn’t be able to find the job JAR; it would load the MaxTempera tureDriver class from target/classes rather than the JAR, and the job would fail.

  1. unset HADOOP_CLASSPATH
  2. hadoop jar hadoop-examples.jar v2.MaxTemperatureDriver \
  3. -conf conf/hadoop-cluster.xml input/ncdc/all max-temp

MR的web界面

获取结果

作业调试

Hadoop日志

远程调试


作业调优

此处输入图片的描述


MR的工作流

问题: 如何将数据处理问题转化为MR模型

将问题分解成MR作业

ChainReducer

Hadoop自带的ChainReducer可以将很多mapper连接成一个mapper

JobControl

问题: 当一个工作流中的作业不止一个时,如何管理这些作业按顺序进行
主要解决途径是考虑是否存在一个线性的作业链或一个更复杂的作业有向无环图
1. 对于线性链表, 最简单的方法就是一个接一个地运行作业:

  1. JobClient.runJob(conf1)
  2. JobClient.runJob(conf2)
  1. 对于更复杂的结构,需要使用JpbControl类(同样适用于线性链表),JobControl的实例表示一个作业的运行图,可以加入作业配置,然后告知JobControl实例作业之间的依赖关系;如果一个作业失败,那么后面的作业将不会执行

Apache Oozie

  1. Apache Oozie是一个运行由相互依赖的作业组成的工作流。由两部分组成:

    • 工作流引擎,负责存储和运行不同Hadoop作业(MR,pig,hive等)组成的工作流
    • coordinate引擎,负责基于预定义的调度策略以及数据可用性运行工作流作业
  2. 不同于在客户端运行并提交作业的JobControl,Oozie是作为集群中的服务器运行的,客户端提交有个立即或稍后执行的工作流定义到服务器
  3. 在Oozie中,工作流是一个由动作(action)节点和控制流节点组成的DAG:
    • 动作流节点执行工作流任务,例如在Hadoop中移动文件,运行MR、streaming,pig或hive作业等
    • 控制流节点通过构建条件逻辑(so different execution branches may be followed depending on the result of an earlier action node)或并行执行来管理actions之间的工作流执行情况

定义Oozie工作流

打包和配置工作流应用

运行工作流作业

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注