Oozie

Table of Contents

1 Overview

#note: 我只是粗略地使用了一下,直觉上觉得这个东西很烂。推荐大家可以考虑一下azkaban

oozie工作方式也是启动一个mapper任务(这个任务启动真正任务)放在cluster上面运行。大致上来看的话需要几个文件:

  • workflow.xml oozie读取它来知道每个任务DAG如何并且失败以及成功之后如何处理。需要提交到hdfs上面。
  • coordinator.xml 如果是coordinator模式的话,还需要这个文件。需要提交到hdfs上面。
  • job.properties 这个用来存放一些任务相关的参数等。这个部分其实可以和workflow.xml放在一起,但是分离出来的话可以方便分离。本地使用。
  • lib 目录下面存放启动启动需要的库文件比如jar或者是so等。需要提交到hdfs上面。

然后我们需要将这写文件提交到hdfs上面,然后使用oozie启动。oozie会提供一个回调url,启动的任务应该会定时向这些url进行汇报状态(回调),或者是 oozie去查询这些任务状态(这些任务应该也会内置httpserver)。(应该是通过回调方式完成 http://localhost/utils/oozie-2.3.2-cdh3u3/docs/WorkflowFunctionalSpec.html#a5_Oozie_Notifications , 并且这里注意到oozie只是使用best-effort方式来做notification)。值得一提的就是oozie也有local-mode方便调试和测试。

oozie执行分为三种模式:

  • workflow. 这种方式非常简单,就是定义DAG来执行。
  • coordinator. workflow缺点非常明显,就是没有办法定时触发或者是条件触发。coordinator可以完成这个需求。coordinator构建在workflow工作方式上面,可以定时运行也可以触发运行。

触发条件非常巧妙,提供一个叫做synchronous dataset的数据集。这个数据集其实就是一个url(文件系统url),不过这个url通过时间来进行区分。比如hdfs://foo:9000/usr/logs/2009/04/15/23/30.

  • bundle. bundle的作用就是将多个coordinator管理起来。这样我们只需要提供一个bundle提交即可。然后可以start/stop/suspend/resume任何coordinator.

下面是在使用时候总结的一些问题:

  • 时区似乎只是支持UTC。对于我来说处在Asia/Shanghai这个timezone,时间是提前8个小时的。这个在使用的时候需要转换一下。
  • oozie调度时间似乎比较怪异(不像是分钟级别,也不像是小时级别,而且现在也没有找到参数配置)。即使对于上面这个小时任务,可以看到create time间隔是5min左右。oozie-coord-create-interval.png
  • oozie支持故障恢复。直接restart之后启动oozie,之前提交的coord job还能够正常执行。

2 Deployment

关于部署的话可以参考这个链接 http://localhost/utils/oozie-2.3.2-cdh3u3/docs/DG_QuickStart.html

  • Expand the Oozie distribution tar.gz
  • Expand the Hadoop distribution tar.gz # 不过如果oozie是从cloudera down下来的话,里面应该是包含了hadoop的,不需要这个步骤
  • Download ExtJS library (it must be version 2.2)
  • bin/oozie-setup.sh -hadoop 0.20.200 ${HADOOP_HOME} -extjs /tmp/ext-2.2.zip # 同样如果是cloudera的话,那么不需要-hadoop这个选项
  • bin/oozie-start.sh # 启动后台程序 bin/oozie-run.sh可以用来作为前台进程启动
  • bin/oozie-stop.sh # 停止后台程序
  • 可以通过bin/oozie admin -oozie http://localhost:11000/oozie -status察看状态。如果安装了extjs的话,也可以通过直接访问 http://localhost:11000/oozie 来看当前状态
  • Expand the oozie-sharelib TAR.GZ file bundled with the distribution. 并且将share目录上传 hadoop fs -put share share
  • Expand the oozie-examples.tar.gz 并且 hadoop fs -put examples examples 可以用来进行一些example测试

提交任务之前先需要将hadoop cluster启动。关于如何搭建single node cluster可以参考 http://localhost/utils/hadoop-0.20.2-cdh3u3/docs/single_node_setup.html 。但是启动之后,默认的nameNode是hdfs://localhost:9000,而默认的jobTracker是localhost:9001。而oozie自带的examples里面不是这样的,所以需要修改过来。


另外一个问题就是oozie提交任务涉及到hadoop权限问题,需要在hadoop的core-site.xml里面添加如下内容:

<property>
  <name>hadoop.proxyuser.dirlt.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.dirlt.groups</name>
  <value>*</value>
</property>

其中dirlt更换成为用户名称。


有时候我们会碰到下面这个错误信息导致HDFS不能够修改文件(删除文件),因为HDFS处于safe mode

rmr: org.apache.hadoop.dfs.SafeModeException: Cannot delete /user/hadoop/input. Name node is in safe mode

我们可以使用命令强制离开hadoop dfsadmin -safemode leave

3 Workflow

对于workflow来说,最主要关注下面几个部分:

  • node
    • control flow node // 控制流节点,决定这个DAG。
    • action node // 动作节点。#todo: 这里不是很明白streaming和pipe方式之间的差别。
  • parameterization // 参数化,可以获得很多外部状态变量并且进行计算判断。

下面是一些具体细节:

这里给出一个例子配置文件作为说明(这个例子就是oozie homepage里面run example使用的例子 examples/app/map-reduce)。首先是workflow.xml

<!--
  Copyright (c) 2010 Yahoo! Inc. All rights reserved.
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<workflow-app xmlns="uri:oozie:workflow:0.1" name="map-reduce-wf">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.oozie.example.SampleMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.oozie.example.SampleReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

这是一个map-reduce的action,在prepare阶段将原来的输出文件删除掉,在configuration部分配置了一些参数。${}部分就是parameterization, 这些参数内容都是从job.properties里面得到的。下面看看这个job.properties是如何定义的

nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
queueName=default
examplesRoot=examples

oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
outputDir=map-reduce

接着使用 oozie job -oozie http://localhost:11000/oozie/ -config job.properties -run 就可以进行提交。提交完成之后就可以得到一个jobID。 接着使用 oozie job -oozie http://localhost:11000/oozie/ -kill ${jobID} 就可以用来将这个job kill掉。 http://localhost:11000/oozie/ 也提供了webconsole的方式来察看所有提交的job运行状况(在workflow jobs这个tab里面),下面是screenshot

oozie-workflow.png

可以察看某个workflow详细信息,以及对应的actions的信息(状态以及失败原因等)

oozie-workflow-action.png

为了方便我编写了下面几个脚本:

  • submit
  • cancel
  • update
#!/bin/bash
oozie job -oozie http://localhost:11000/oozie/ -config job.properties -run

#!/bin/bash
oozie job -oozie http://localhost:11000/oozie/ -kill $@

#!/bin/bash
hadoop fs -rm /user/dirlt/examples/apps/map-reduce/workflow.xml
hadoop fs -put workflow.xml /user/dirlt/examples/apps/map-reduce/

4 Coordinator

对于coordinator来说,有几个比较重要的概念:

sync dataset通常是一个hdfs uri,你可以让uri里面指定date以及time来对应到每一个具体的任务。一旦某个任务完成的话,那么这个hdfs uri就会建立, 并且在先面会存在一个_SUCCESS的文件(当然你也可以指定其他文件名,如果没有指定的话那么就以目录是否存在作为依据),来表示任务完成。各个任务之间可以通过这种方式来做数据流之间的依赖。

#note: 关于coordinator的文档非常少,而且使用起来有诸多不便。比如时区设置难以设置正确,以及在webconsole下面不方便察看killed掉workflow的原因。 另外如果想做一些定制化执行策略的话,还需要通过使用一些workaround的方法才可以OK。

下面看一个使用coordinator的例子(这个例子在 examples/apps/aggregator/ 目录下面)。我们现在需要三个文件

  • coordinator.xml
  • workflow.xml
  • job.properties

和workflow工作方式非常类似,需要.xmlf放到hdfs上面,而job.properties在本地提供一些参数。先看看coordinator.xml

<coordinator-app name="coord" frequency="${coord:hours(1)}"
                 start="${startTime}" end="${endTime}" timezone="${tz}"
                 xmlns="uri:oozie:coordinator:0.1">
  <controls>
    <timeout>-1</timeout>
    <concurrency>2</concurrency>
    <execution>FIFO</execution>
  </controls>

  <datasets>
    <dataset name="ds" frequency="${coord:hours(1)}"
             initial-instance="${dsStartTime}" timezone="${tz}">
      <uri-template>${appPath}/data/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
    </dataset>
  </datasets>

  <input-events>
    <data-in name="input" dataset="ds">
      <instance>${coord:current(-1)}</instance>
    </data-in>
  </input-events>
  <output-events>
    <data-out name="output" dataset="ds">
      <instance>${coord:current(0)}</instance>
    </data-out>
  </output-events>

  <action>
    <workflow>
      <app-path>${appPath}</app-path>
      <configuration>
        <property>
          <name>jobTracker</name>
          <value>${jobTracker}</value>
        </property>
        <property>
          <name>nameNode</name>
          <value>${nameNode}</value>
        </property>
        <property>
          <name>queueName</name>
          <value>${queueName}</value>
        </property>
        <property>
          <name>inputData</name>
          <value>${coord:dataIn('input')}</value>
          <!-- <value>${inputData}</value> -->
        </property>
        <property>
          <name>outputData</name>
          <value>${coord:dataOut('output')}</value>
          <!-- <value>${outputData}</value> -->
        </property>
      </configuration>
    </workflow>
  </action>
</coordinator-app>

其中appPath就是我们之前提到的workflow目录。所以可见coordinator是架在workflow上面的。至于workflow.xml不需要做任何修改。 同样job.properties里面定义也是参数化的内容。不过需要注意的一点就是,这里必须指定oozie.coord.application.path而不是 oozie.wf.application.path.

nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
queueName=default
examplesRoot=examples
tz=UTC
appPath=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
startTime=2012-07-02T02:24Z
dsStartTime=2012-07-02T01:24Z
endTime=2014-07-02T02:24Z

inputData=${nameNode}/user/${user.name}/${examplesRoot}/input-data/text
oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce

运行和取消方式都和之前的workflow方式没有任何差别。至于察看webconsole内容在coordinator jobs这个tab里面。 比较郁闷的就是,不能够察看每一个action具体的情况,这点是非常不利于调试的。

#note: 不过后来测试了一下之后,发现其实是可以观察coordinator jobs里面对应的workflow的。在coord job info里面对应每个action都有一个ext id

oozie-coord-job-info.png

好比第一个ext id的就是 0001521-120801131630722-oozie-dirl-W。我们可以通过这个ext i的在workflow里面察看

oozie-coord-job-action-info.png

5 Alternatives

azkaban一些说明:

  • azkaban是twitter出的一个任务调度系统。从Quick Start以及Documentation来看的话,确实比oozie要简单很多而且非常直观。
  • 任务之间的依赖,不能够指定部分完成(比如我们希望任务A依赖于B,但是并不是B完全执行完成A才可以启动,而是B的某个阶段完成的话就可以启动A)。
  • 从job描述上面看,系统本身并没有来做cron这见事情,可能是通过外部任务自己cron方式提交来完成cron功能。
  • azkaban对于每一个command单独fork出一个单元来监视这个command完成情况(猜想应该是判断是否返回值为0确定是否成功执行,但是对于MR这样的任务来说的话,如果forker挂掉的话任务本身也依然是在执行的,这样下次重启的话就会重复启动多个任务)。
comments powered by Disqus