@tsing1226 2016-01-08

oozie

Four Action Types in Oozie Workflows, Explained

Workflow Definition

An Oozie workflow application consists of three parts: job.properties, workflow.xml, and a lib directory holding dependency jars. The job.properties file defines parameters such as nameNode, jobTracker, queueName, oozieAppsRoot, oozieDataRoot, oozie.wf.application.path, inputDir, and outputDir; its key job is to point at the HDFS location of the workflow.xml file. (A sketch of the resulting layout follows the list below.)

    • job.properties

      Key point: points to the HDFS location of the workflow.xml file

    • workflow.xml (this file must be stored on HDFS)

      Contains the following elements:

      • start
      • action
        MapReduce, Hive, Sqoop, Shell
        • ok
        • error
      • kill
      • end
    • lib directory (this directory must be stored on HDFS)

      Dependency jars
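
Putting these together, a deployed application looks like the sketch below (using the sqoop app built in the next section as the example; job.properties stays on the local filesystem and is passed to the Oozie CLI at submit time, while workflow.xml and lib/ are uploaded to HDFS):

oozie$:find oozie-apps/sqoop
oozie-apps/sqoop
oozie-apps/sqoop/job.properties
oozie-apps/sqoop/workflow.xml
oozie-apps/sqoop/lib
oozie-apps/sqoop/lib/mysql-connector-java-5.1.27-bin.jar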

Sqoop Action

Create a file containing the table's data

hadoop$:touch my_user.txt
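
The file should hold tab-separated rows whose columns match the my_user table, since the export below uses --input-fields-terminated-by "\t". A hypothetical two-row sample (the values here are made up for illustration):

hadoop$:printf '1\tuser1\tpasswd1\n2\tuser2\tpasswd2\n' > my_user.txt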

Create a directory on HDFS and upload the file

hadoop$:bin/hdfs dfs -mkdir -p oozie-datas/sqoop
hadoop$:bin/hdfs dfs -put my_user.txt oozie-datas/sqoop

Copy the example sqoop project

oozie$:cp -r examples/apps/sqoop oozie-apps/

Create a lib directory under the project root and copy in the MySQL JDBC driver

oozie$:mkdir oozie-apps/sqoop/lib
oozie$:cp /opt/cdh3.5.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar oozie-apps/sqoop/lib/

Configuration files

  • job.properties
nameNode=hdfs://hadoop-senior01.grc.com:8020
jobTracker=hadoop-senior01.grc.com:8032
queueName=default
oozieAppsRoot=user/grc/oozie-apps
oozieDataRoot=user/grc/oozie-datas

oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/sqoop
  • workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="sqoop-wf">
    <start to="sqoop-node"/>
    <action name="sqoop-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.3">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>export --connect jdbc:mysql://hadoop-senior01.grc.com:3306/db_1206 --username root --password 123456 --table my_user --num-mappers 1 --input-fields-terminated-by "\t" --export-dir /user/grc/oozie-datas/sqoop</command>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>




Upload the project to HDFS

/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put oozie-apps/sqoop/ oozie-apps
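
With the app on HDFS, the workflow can be submitted and run; the Oozie server URL below is an assumption, so adjust it to your environment:

oozie$:export OOZIE_URL=http://hadoop-senior01.grc.com:11000/oozie
oozie$:bin/oozie job -config oozie-apps/sqoop/job.properties -run

Once the job succeeds, the export can be checked on the MySQL side (the client invocation is likewise an assumption):

oozie$:mysql -uroot -p123456 -e "select count(*) from db_1206.my_user;"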


A second way to configure workflow.xml

  • workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="sqoop-wf">
    <start to="sqoop-node"/>

    <action name="sqoop-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.3">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <arg>export</arg>
            <arg>--connect</arg>
            <arg>jdbc:mysql://hadoop-senior01.grc.com:3306/db_1206</arg>
            <arg>--username</arg>
            <arg>root</arg>
            <arg>--password</arg>
            <arg>123456</arg>
            <arg>--table</arg>
            <arg>my_user</arg>
            <arg>--num-mappers</arg>
            <arg>1</arg>
            <arg>--input-fields-terminated-by</arg>
            <arg>"\t"</arg>
            <arg>--export-dir</arg>
            <arg>/user/grc/oozie-datas/sqoop</arg>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Note: an <arg> element must not contain spaces inside it. The "\t" value may be written with double quotes or without them, but single quotes must not be used.

Deploying a Shell Action

Create the project directory locally

oozie$:mkdir oozie-apps/shell

Copy the example shell project into the target directory

oozie$:cp -r examples/apps/shell/* oozie-apps/shell/

Modify the configuration files

  • job.properties
nameNode=hdfs://hadoop-senior01.grc.com:8020
jobTracker=hadoop-senior01.grc.com:8032
queueName=default
oozieAppsRoot=user/grc/oozie-apps
oozieDataRoot=user/grc/oozie-datas

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/shell
#Shell Script to run
EXEC=emp-join-demp.sh
  • workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="shell-wf">
    <start to="shell-node"/>
    <action name="shell-node">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>

            <exec>${EXEC}</exec>
            <!-- ship the script to the task's working directory; the
                 #${EXEC} fragment creates a symlink named after the script,
                 so <exec> can call it by its bare name -->
            <file>${nameNode}/${oozieAppsRoot}/shell/${EXEC}#${EXEC}</file>
            <capture-output/>
        </shell>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Scripts

  • Create the emp-join-demp.sh script
#!/bin/sh
## set system environment variables
. /etc/profile
## set HIVE_HOME
HIVE_HOME=/opt/cdh3.5.6/hive-0.13.1-cdh5.3.6
$HIVE_HOME/bin/hive -e "create table db_1206.tb_join as select e.empno, e.ename, e.job, d.deptno, d.dname from db_1206.emp e join db_1206.dept d on e.deptno = d.deptno ;"

Note: in development, though, it is better to put the SQL in its own file, which makes the job easier to manage and debug later. The script then becomes:

  • emp-join-demp.sh script
#!/bin/sh
## set system environment variables
. /etc/profile
## set SCRIPT_PATH
SCRIPT_PATH=/opt/cdh3.5.6/oozie-4.0.0-cdh5.3.6/oozie-apps/shell
## set HIVE_HOME
HIVE_HOME=/opt/cdh3.5.6/hive-0.13.1-cdh5.3.6
$HIVE_HOME/bin/hive -f $SCRIPT_PATH/emp-join-dept.sql
  • emp-join-dept.sql
create table db_1206.tb_join2 as
select
    e.empno, e.ename, e.job, d.deptno, d.dname
from
    db_1206.emp e
join
    db_1206.dept d
on
    e.deptno = d.deptno;

Grant the script execute permission

chmod u+x oozie-apps/shell/emp-join-demp.sh
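
A quick local smoke test of the script (an extra step, not strictly required) can catch path or permission problems before Oozie runs it:

oozie$:oozie-apps/shell/emp-join-demp.sh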

Create the project directory on HDFS

/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -mkdir oozie-apps/shell

Upload the configuration files and scripts to the project directory

/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put oozie-apps/shell/* oozie-apps/shell

Run the workflow

bin/oozie job -config oozie-apps/shell/job.properties -run

Check the results
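
The -run command prints a workflow id; its status can be polled from the CLI, and the table the script created can be queried directly (the job id below is hypothetical):

oozie$:bin/oozie job -info 0000000-160108191747-oozie-grc-W
oozie$:/opt/cdh3.5.6/hive-0.13.1-cdh5.3.6/bin/hive -e "select * from db_1206.tb_join2 limit 5;"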

Deploying a Hive Action

Create the action project

oozie$:mkdir -p oozie-apps/hive/

Copy the hive-site.xml file into the project folder

oozie$:cp /opt/cdh3.5.6/hive-0.13.1-cdh5.3.6/conf/hive-site.xml oozie-apps/hive/

Create the lib directory

oozie$:mkdir oozie-apps/hive/lib

Copy in the MySQL JDBC driver jar

oozie$:cp /opt/cdh3.5.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar oozie-apps/hive/lib

Configuration files

  • job.properties
nameNode=hdfs://hadoop-senior01.grc.com:8020
jobTracker=hadoop-senior01.grc.com:8032
queueName=default
oozieAppsRoot=user/grc/oozie-apps
oozieDataRoot=user/grc/oozie-datas

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/hive
oozie.use.system.libpath=true
outputDir=hive/output
  • workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive-wf">
    <start to="hive-node"/>

    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.5">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <!-- location of the hive-site.xml file -->
            <job-xml>${nameNode}/${oozieAppsRoot}/hive/hive-site.xml</job-xml>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <script>script.q</script>
            <!-- output -->
            <param>OUTPUT=${nameNode}/${oozieDataRoot}/${outputDir}</param>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Note: remember to update the schema version numbers when copying from the examples (here uri:oozie:workflow:0.5 and uri:oozie:hive-action:0.5).

  • script.q
insert overwrite directory '${OUTPUT}'
select * from db_track.track_log limit 5;

Create the project directory on HDFS

oozie$:/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -mkdir oozie-apps/hive/

Create the data directory

oozie$:/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -mkdir -p /user/grc/oozie-datas/hive/input

Upload the project files to HDFS

oozie$:/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put oozie-apps/hive/* oozie-apps/hive/

Run the Hive Action

oozie$:bin/oozie job -config oozie-apps/hive/job.properties -run

View the results in Hue
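
A command-line check works too (the path follows the outputDir set in job.properties above):

oozie$:/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -cat /user/grc/oozie-datas/hive/output/*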

Deploying a MapReduce Action

Create the project folder and copy in the corresponding configuration files

mkdir oozie-apps/
cp -r  examples/apps/map-reduce/ oozie-apps/

Copy the wordcount jar into the project's lib folder

cp /opt/softwares/mr-wc.jar /opt/cdh3.5.6/oozie-4.0.0-cdh5.3.6/oozie-apps/map-reduce/lib/

Note: next we need to configure workflow.xml. To make filling in its properties easier, first run the corresponding program once. This section covers the MapReduce Action, so running the wordcount program itself is not described here. Open the history page of the wordcount job that was just run, select Configuration, and search for the properties of the five MapReduce stages: input, mapper, shuffle, reducer, and output.

修改配置文件

  • job.properties
nameNode=hdfs://hadoop-senior01.grc.com:8020
jobTracker=hadoop-senior01.grc.com:8032
queueName=default
oozieAppsRoot=user/grc/oozie-apps
oozieDataRoot=user/grc/oozie-datas

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/map-reduce/workflow.xml
inputDir=map-reduce/input
outputDir=map-reduce/output
  • workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="mr-wordcount-wf">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <!-- use the new mapper/reducer API -->
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <!-- input / mapper / shuffle / reducer / output -->
                <!-- input -->
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${nameNode}/${oozieDataRoot}/${inputDir}</value>
                </property>
                <!-- mapper -->
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>com.ibeifeng.bigdata.senior.hadoop.mapreduce.WordCountMapReduce$WordCountMapper</value>
                </property>
                <property>
                    <name>mapreduce.map.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.map.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <!-- mapper compression -->
                <property>
                    <name>mapreduce.map.output.compress</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.map.output.compress.codec</name>
                    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
                </property>
                <!-- reducer -->
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>com.ibeifeng.bigdata.senior.hadoop.mapreduce.WordCountMapReduce$WordCountReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <!-- output -->
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${nameNode}/${oozieDataRoot}/${outputDir}</value>
                </property>
                <!-- reducer compression -->
                <property>
                    <name>mapreduce.output.fileoutputformat.compress</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.compress.codec</name>
                    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

NOTE: in workflow.xml use the newer schema version 0.5, and set both mapred.mapper.new-api and mapred.reducer.new-api to true so the new-API mapper and reducer classes take effect; otherwise the new-style property names will be ignored.

Upload the project to HDFS

/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put oozie-apps oozie-apps

Create the data directory on HDFS

/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -mkdir -p  /user/grc/oozie-datas/map-reduce/input

Upload the wordcount input data to the /user/grc/oozie-datas/map-reduce/input folder on HDFS.
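
For example (the input file name here is hypothetical):

/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put wc-input.txt /user/grc/oozie-datas/map-reduce/input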

Run the job

bin/oozie job -config oozie-apps/map-reduce/job.properties -run

Results
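
Because the output is Snappy-compressed, -text (which decompresses via the configured codec) is handier than -cat for a quick look:

/opt/cdh3.5.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -text /user/grc/oozie-datas/map-reduce/output/part*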



References:

1. http://archive.cloudera.com/cdh5/cdh/5/oozie-4.0.0-cdh5.3.6/WorkflowFunctionalSpec.html#a3.2.2_Map-Reduce_Action

2. http://archive.cloudera.com/cdh5/cdh/5/oozie-4.0.0-cdh5.3.6/DG_HiveActionExtension.html

3. http://archive.cloudera.com/cdh5/cdh/5/oozie-4.0.0-cdh5.3.6/DG_SqoopActionExtension.html
