`
student_lp
  • 浏览: 428512 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论
阅读更多

1、什么是Oozie

Oozie是一种java web应用程序,它运行在java servlet容器中,并使用数据库来存储一下内容:

      ①工作流定义

      ②当前运行的工作流实例,包括实例的状态和变量

Oozie工作流失放置在控制依赖DAG(有向无环图)中的一组动作(例如:hadoop的Map/Reduce作业、pig作业等),其中指定了动作执行的顺序。我们会使用hPDL(一种xml流程定义语言)来描述这个图。

hPDL是一种很简洁的语言,只会使用少数流程控制和动作节点。控制节点会定义执行的流程,并包含工作流的起点和终点(start、end和fail节点)以及控制工作流执行路径的机制(decision、fork和join节点)。动作节点是一些机制,通过他们工作流会触发执行计算或者处理任务。Oozie为一下类型的动作提供支持:Hadoop map-reduce、hadoop文件系统、pig、java和Oozie的子工作流(ssh动作已经从Oozie schema0.2之后的版本中移除了)。

所有动作节点处罚的计算和处理任务都不在Oozie之中—他们是由hadoop的map/reduce框架执行的。这种方法让oozie可以支持现在的hadoop用于负载均衡、灾难恢复的机制。这些任务主要是异步执行的(只有文件系统动作例外,它是同步处理的)。这意味着对于大多数工作触发的计算或处理任务的类型来说,在工作流操作转换到工作流的下一个节点之前都需要等待,直到计算或处理任务结束了之后才能够继续。Oozie可以通过两种不同的方式来检测计算或处理任务是否完成,也就是回调和轮询。当Oozie启动了计算或处理任务的时候,它会为任务提供唯一的回调URL,然后任务会在完成的时候发送通知给特定的URL。在任务无法触发回调URL的情况下(可能是因为任何原因,比方说网络闪断),或者当任务的类型无法在完成时触发回调URL的时候,Oozie有一种机制,可以对计算或处理任务进行轮询,从而保证能够完成任务。

Oozie工作流可以参数化(在工作流定义中使用像${inputDir}之类的变量)。在提交工作流操作的时候,我们必须提供参数值。如果经过合适地参数化(比方说,使用不同的输出目录),那么多个同样的工作流操作可以并发。

一些工作流是根据需要触发的,但是大多数情况下,我们有必要基于一定的时间段和(或)数据可用性和(或)外部事件来运行它们。Oozie协调系统(Coordinator system)让用户可以基于这些参数来定义工作流执行计划。Oozie协调程序让我们可以以谓词的方式对工作流执行触发器进行建模,那可以指向数据、事件和(或)外部事件。工作流作业会在谓词得到满足的时候启动。

经常我们还需要连接定时运行、但时间间隔不同的工作流操作。多个随后运行的工作流的输出会成为下一个工作流的输入。把这些工作流连接在一起,会让系统把它作为数据应用的管道来引用。Oozie协调程序支持创建这样的数据应用管道。

2、Oozie工作流程定义详解

Oozie工作流程定义是一个DAG图,它由控制流节点或动作节点组成,各个节点又是通过转移的剑线相互连通。对于工作流一般对应存在流程定义语言,大多数都是基于XML定义的,oozie就是基于xml定义的,称为hPDL(hadoop process definition lanaguage).

2.1、工作流生命周期:

状态

含义说明

PREP

一个工作流Job第一次创建将处于PREP状态,表示工作流Job已经定义,但是没有运行。

RUNNING

当一个已经被创建的工作流Job开始执行的时候,就处于RUNNING状态。它不会达到结束状态,只能因为出错而结束,或者被挂起。

SUSPENDED

一个RUNNING状态的工作流Job会变成SUSPENDED状态,而且它会一直处于该状态,除非这个工作流Job被重新开始执行或者被杀死。

SUCCEEDED

当一个RUNNING状态的工作流Job到达了end节点,它就变成了SUCCEEDED最终完成状态。

KILLED

当一个工作流Job处于被创建后的状态,或者处于RUNNING、SUSPENDED状态时,被杀死,则工作流Job的状态变为KILLED状态。

FAILED

当一个工作流Job不可预期的错误失败而终止,就会变成FAILED状态。

上述各种状态存在相应的转移(工作流程因为某些事件,可能从一个状态跳转到另一个状态),其中合法的状态转移有如下几种,如下表所示:

转移前状态

转移后状态集合

未启动

PREP

PREP

RUNNING、KILLED

RUNNING

SUSPENDED、SUCCEEDED、KILLED、FAILED

SUSPENDED

RUNNING、KILLED

明确上述给出的状态转移空间以后,可以根据实际需要更加灵活地来控制工作流Job的运行。

2.2、控制流节点

工作流程定义中,控制工作流的开始和结束,以及工作流job的执行路径的几点,它定义了流程的开始(start节点)和结束(end节点和kill节点),同时提供了一种控制流程执行路径的机制(decision决策节点、fork分支节点、join会签节点)。通过上面提到的各种节点,我们大概应该能够知道他们的工作流中起着怎样的作用。下面是各节点的语法格式:

2.2.1、start节点

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">

     ...

     <start to="[NODE-NAME]" />

     ...

</workflow-app>

上面start元素的to属性,指向第一个要执行的工作流节点。

2.2.2、end节点

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">

     ...

     <end name="[NODE-NAME]" />

     ...

</workflow-app>

达到该节点,工作流job会变成success状态,表示成功完成。需要注意的是,一个工作流定义必须只能有一个end节点。

2.2.3、kill节点

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">

     ...

     <kill name="[NODE-NAME]">

         <message>[MESSAGE-TO-LOG]</message>

     </kill>

     ...

</workflow-app>

Kill元素的name属性,是要杀死的工作流节点的名称,message元素指定了工作流节点被杀死的备注信息。达到该节点,工作流job会变成状态KILLED。

2.2.4、decision节点

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">

     ...

     <decision name="[NODE-NAME]">

         <switch>

              <case to="[NODE_NAME]">[PREDICATE]</case>

              ...

              <case to="[NODE_NAME]">[PREDICATE]</case>

              <default to="[NODE_NAME]" />

         </switch>

     </decision>

     ...

</workflow-app>

Decision节点通过预定义一组条件,当工作流job执行到该节点时,会根据其中的条件进行判断选择,满足条件的路径将被执行。Decision节点通过switch…case语法来进行路径选择,只要有满足条件的判断,就会执行对应的路径,如果没有可以配置default元素指向的节点。

2.2.5、fork节点和join节点

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">

     ...

     <fork name="[FORK-NODE-NAME]">

         <path start="[NODE-NAME]" />

         ...

         <path start="[NODE-NAME]" />

     </fork>

     ...

     <join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />

     ...

</workflow-app>

Fork元素下面会有多个path元素,指定了可以并发执行的多个执行路径。Fork中多个并发执行路径会在join节点的位置会和,只有所有的路径都到达后,才会继续执行join节点。

2.3、动作节点(Action Nodes)

工作流程定义中,能够触发一个计算任务或者处理任务执行的节点。所有的动作(Action)都有一些基本的特性,如下:

*远程执行:对于Oozie来说动作节点的执行都是远程的,因为Oozie可能部署在一个单独的服务器上,而工作流job实在hadoop集群的节点上执行的。即使Oozie在hadoop集群的某个节点上,它也处于与hadoop进行独立无关的JVM实例中(Oozie部署在Servlet容器当中)

*异步性:动作节点的执行,对于Oozie来说是异步的。Oozie启动一个工作流Job,这个工作流Job便开始执行。Oozie可以通过两种方式来探测工作流Job的执行情况:一种是基于回调机制,对每个任务的执行(可以看成是动作节点的执行)都对应一个唯一的URL,如果任务执行结束或者执行失败,会通过回调这个URL通知Oozie已经完成;另一种就是轮询,Oozie不停地去查询任务执行的完成状态,如果由于网络故障回调机制失败,也会使用轮询的方式来处理。

*执行结果要么成功,要么失败:如果动作节点执行成功,则会转向ok节点;如果失败则会转向error节点。

*可恢复性:如果一个动作节点执行失败,Oozie提供了一些恢复执行的策略,这个要根据失败的特点来进行:如果是状态转移过程中失败,Oozie会根据指定的重试时间间隔去重新执行;如果不是转移性质的失败,则只能通过手工干预来进行恢复;如果重试恢复执行都没有解决问题,则最终会跳转到error节点。

下面详解Oozie内置支持的动作节点类型,如下:

2.3.1、FS动作

FS动作主要是基于HDFS的一些基本操作,如删除路径、创建路径、移动文件、设置文件全乡等等。语法格式:

2.3.2、Map-Reduce动作

map-reduce动作会在工作流Job中启动一个MapReduce Job任务运行,我们可以详细配置这个MapReduce Job。另外,可以通过map-reduce元素的子元素来配置一些其他的任务,如streaming、pipes、file、archive等等。下面给出包含这些内容的语法格式说明:

2.3.3、Hive动作

Hive主要是基于类似SQL的HQL语言的,它能够方便地操作HDFS中数据,实现对海量数据的分析工作。Hive动作的语法格式如下所示:

2.3.4、Sqoop动作

Sqoop是一个能够在Hadoop和结构化存储系统之间进行数据的导入导出的工具,Sqoop动作的语法格式如下:

2.3.5、Pig动作

pig动作可以启动运行pig脚本实现的Job,在工作流定义中配置的语法格式说明如下:

2.3.6、SSH动作

该动作主要是通过ssh登录到一台主机,能够执行一组shell命令,它在Oozie schema 0.2中已经被删除。语法格式:

2.3.7、Java动作

Java动作是执行一个具有main入口方法的应用程序,在Oozie工作流定义中,会作为一个MapReduce Job执行,这个Job只有一个Map任务。我们需要指定NameNode、JobTracker的信息,还有配置一个Java应用程序的JVM选项参数(java-opts),以及传给主函数(arg)。语法格式:

2.3.8、Sub-workflow动作

Sub-workflow动作是一个子流程的动作,主流程执行过程中,遇到子流程节点执行时,会一直等待子流程节点执行完成后,才能继续跳转到下一个要执行的节点。语法格式:

2.3.9、Shell动作

Shell动作可以执行Shell命令,并通过配置命令所需要的参数。它的语法规则:

      2.4、表达式语言函数

Oozie除了可以使用Properties文件定义一些属性之外,还提供了一些内置的EL函数,能够方便地实现流程的定义和控制,下面我们分组列表说明:

2.4.1、基本EL常量

常量名称

含义说明

KB

1KB,类型为long。

MB

1MB,类型为long。

GB

1GB,类型为long。

TB

1TB,类型为long。

PB

1PB,类型为long。

2.4.2、基本EL函数

函数声明

含义说明

String firstNotNull(String value1, String value2)

返回value1和value2中不为null的值,若都为null则返回null

String concat(String s1, String s2)

连接字符串s1和s2,如果s1或s2为null值,则使用空字符串替换null值

String replaceAll(String src, String regex, String replacement)

满足正则表达式regex,则使用replace替换src字符串中匹配上的部分

String appendAll(String src, String append, String delimeter)

将src中的分隔符delimeter替换为append

String trim(String s)

去掉字符串两边的空格,如果s为null则返回空字符串

String urlEncode(String s)

对字符串s使用URL UTF-8进行编码

String timestamp()

返回UTC当前时间字符串,格式为YYYY-MM-DDThh:mm:ss.sZ

String toJsonStr(Map)

Oozie 3.3支持,将Map转转成一个XML编码的JSON表示形式

String toPropertiesStr(Map)

Oozie 3.3支持,将Map转转成一个XML编码的Properties表示形式

String toConfigurationStr(Map)

Oozie 3.3支持,将Map转转成一个XML编码的Configuration表示形式

2.4.3、工作流EL函数

函数声明

含义说明

String wf:id()

返回当前的工作流Job的ID

String wf:name()

返回当前的工作流Job的名称

String wf:appPath()

返回当前的工作流Job的应用路径

String wf:conf(String name)

返回当前的工作流Job的配置属性

String wf:user()

返回启动当前的工作流Job的用户名称

String wf:group()

返回当前的工作流Job的的用户组名称

String wf:callback(String stateVar)

返回当前的工作流Job的当前动作节点的回调URL

String wf:transition(String node)

返回转移节点,该节点是一个工作流动作节点触发的

String wf:lastErrorNode()

返回最后一个以ERROR状态退出的节点名称

String wf:errorCode(String node)

返回指定动作节点执行的错误码,如果没有则返回空

String wf:errorMessage(String message)

返回指定动作节点执行的错误信息,如果没有则返回空

int wf:run()

返回当前工作流Job的运行编号,正常的话返回0,如果执行过re-run则返回非0

Map wf:actionData(String node)

返回当前动作节点完成时输出的信息

int wf:actionExternalId(String node)

返回动作节点的外部ID

int wf:actionTrackerUri(String node)

返回跟踪一个动作节点的URI

int wf:actionExternalStatus(String node)

返回一个动作节点的状态

2.4.4、Hadoop EL常量

常量名称

含义说明

RECORDS

Hadoop Record计数器组名称

MAP_IN

Hadoop Mapper输入Record计数器名称

MAP_OUT

Hadoop Mapper输出Record计数器名称

REDUCE_IN

Hadoop Reducer输入Record计数器名称

REDUCE_OUT

HadoopReducer输出Record计数器名称

GROUPS

1024 * Hadoop Mapper/Reducer输入Record组计数器名称

2.4.5、Hadoop EL函数

函数声明

含义说明

Map < String, Map > hadoop:counters(String node)

返回工作流Job某个动作节点的统计计数器信息,例如,MR的动作统计集合内容:
{
“ACTION_TYPE”: “MAP_REDUCE”,
“org.apache.hadoop.mapred.JobInProgress$Counter”: {
“TOTAL_LAUNCHED_REDUCES”: 1,
“TOTAL_LAUNCHED_MAPS”: 1,
“DATA_LOCAL_MAPS”: 1
},
“FileSystemCounters”: {
“FILE_BYTES_READ”: 1746,
“HDFS_BYTES_READ”: 1409,
“FILE_BYTES_WRITTEN”: 3524,
“HDFS_BYTES_WRITTEN”: 1547
},
“org.apache.hadoop.mapred.Task$Counter”: {
“REDUCE_INPUT_GROUPS”: 33,
“COMBINE_OUTPUT_RECORDS”: 0,
“MAP_INPUT_RECORDS”: 33,
“REDUCE_SHUFFLE_BYTES”: 0,
“REDUCE_OUTPUT_RECORDS”: 33,
“SPILLED_RECORDS”: 66,
“MAP_OUTPUT_BYTES”: 1674,
“MAP_INPUT_BYTES”: 1409,
“MAP_OUTPUT_RECORDS”: 33,
“COMBINE_INPUT_RECORDS”: 0,
“REDUCE_INPUT_RECORDS”: 33
}
}
则${hadoop:counters(“mr-node”)["FileSystemCounters"]["FILE_BYTES_READ"]},得到名称为mr-node的动作节点组的FILE_BYTES_READ计数器的值

2.4.6、HDFS EL函数

选项

含义说明

boolean fs:exists(String path)

path是否存在

boolean fs:isDir(String path)

path是否是目录

long fs:dirSize(String path)

如果path不是目录或者path是一个文件,则返回-1,否则返回该path下所有文件的字节数

long fs:fileSize(String path)

如果path是目录,则返回-1,否则返回该path下所有文件的字节数

long fs:blockSize(String path)

如果path不是文件或者不存在则返回-1,否则返回文件的块大小字节数

3、Oozie hive action项目实例

在我们做一个简单的hive action 的过程中,需要两个文件,一个是workflow.xml和script.q文件。分别如下:

3.1、Workflow.xml文件如下

3.2、script.q文件如下

3.3、注意事项

在我们编写oozie的工作流程中,通过官网实例我们知道,在编写hive action的workflow.xml的过程中需要指定hive-site.xml文件位置。指定方式如下:

但是在实际操作过程中,我们发现即使指定了hive-site.xml的位置,但是配置文件中的配置信息并没有起作用。这个问题还在查找过程中。

当前的解决方案,就是把必要的配置信息,直接写到workflow.xml中,这样在使用的过程中就可以避免配置项不起作用的问题。

3.4、通过http方式提交定时任务

通过http的方式可以向oozie提交工作流。但是需要注意的是,我们的工作流所需要的工作环境(以hive action为例,所需文件有:script.q和workflow.xml)需要提前初始化好,这样通过http方式直接通知oozie服务器端,运行该工作流即可,下面是一个详细的例子说明(以hive action为例):

3.4.1、workspaces环境初始化,即所需要的文件如下

1、workflow.xml文件:

2、script.q文件:

在实际的项目中,虽然要求工作环境需要提前准备好,但是在实际操作中可以非常灵活的处理,比如我们需要执行的hql需要根据上面的程序动态生成。这是我们只需要把hql语句写入到script.q文件中,然后把文件重新上传到hdfs工作目录中就可以正常工作了。

3.4.2、通过http协议,通知oozie执行工作流

1、需要传递的config.xml配置工作流所需的环境

    2、需要执行的shell脚本(run.sh):

注意:在run.sh脚本中,需要传递的config.xml文件需要采用绝对路径,负责run.sh脚本中的指令找不到该文件。

再用程序的实现过程中,我们可以通过个编程语言提供的curl库动态的实现。还有一个简单的方法就是通过各种语言,启动一个管道执行shell指令,完成执行。

3.4.3、通过python重构后的代码

1、workflow.xml文件内容:

2、script.q文件

3、python代码


4、配置信息:


4、Coordinator工作流协调者(定时任务)

         Oozie所支持的工作流为:通过在workflow中定义将多个action,并按照一定的顺序组织起来,然后作为一个整体按照既定的顺序和配置逐一运行。一个工作流一旦定义,通过启动该工作流job,就会执行该工作流中所包含的的多个action,直到完成,这就是工作流job的生命周期。

         如果有一个工作流job,希望每天半夜00:00启动运行,我们能够想到的就是通过编写一个定时任务脚本来调度程序运行。如果有有多个工作流job,使用crontab的方式调用可能需要编写大量的脚本,还要通过脚本来控制好每个工作流job的执行时序问题,不但脚本不好维护,而且监控也不太方便。基于这样的背景,oozie提出了Coordinator的概念,将每一个工作流job作为一个动作(Action)来运行,相当于工作流定义中的一个执行节点,这样就能够将多个工作流job组织起来,称为coordinator job。可以指定触发时间和频率,还可以配置数据集、并发数等。一个coordinator job包含在job外部设置执行周期和频率的语义,类似在工作流外部增加了一个协调器来管理这些工作流的运行。

4.1、通过简例初识oozie coordinator(协调器)

         在官方发行的包中自带了一个简单的例子,它能够实现定时调度一个工作流job运行,这个例子中给出的一个空的工作流job,并通过coordinator系统系统调度起来。例子中3个配置文件,分别如下:

    1、job.properties配置:

    2、workflow.xml配置:

         这是一个空的job,没有做任何修改。

    3、coordinator.xml配置

         通过修改coordinator配置文件,将定时任务调度频率改为2分钟,然后需要将他们上传到hdfs上;需要上传的文件有workflow.xml和coordinator.xml,而job.properties配置可以通过指定config选项类执行。启动一个Coordinator job可以执行如下指令:

         运行上面的命令后,在控制台上会返回这个job id,然后我们可以通过控制台查看该job id工作流执行的状态。

         如果想要杀死一个job,需要指定oozie的job id,可以执行如下命令:

4.2、详解Coordinator应用定义

         一个同步的Coordinator应用定义的语法格式,如下所示:

         基于上面的语法格式,我们分别说明一下对应元素的含义,如下所示:

4.2.1、Control元素

         Control元素定义了一个Coordinator job的控制信息,主要包含如下三个配置元素:

元素名称

含义说明

timeout

超时时间,单位为分钟。当一个Coordinator Job启动的时候,会初始化多个Coordinator动作,timeout用来限制这个初始化过程。默认值为-1,表示永远不超时,如果为0 则总是超时。

concurrency

并发数,指多个Coordinator Job并发执行,默认值为1。

execution

配置多个Coordinator Job并发执行的策略:默认是FIFO。另外还有两种:LIFO(最新的先执行)、LAST_ONLY(只执行最新的Coordinator Job,其它的全部丢弃)。

throttle

一个Coordinator Job初始化时,允许Coordinator动作处于WAITING状态的最大数量。

4.2.2、DataSet元素

         Coordinator job中有一个dataset的概念,他可以为实际计算提供计算的数据,主要是指HDFS上的数据目录和文件,能够配置数据集生成的频率(Frequency)、URI模板、时间等信息,下面看一下dataset的语法格式:

         举例如下:

         上面会每天生成一个用户事件表,可以供hive查询分析,这里指定了这二个数据集的位置,后续计算会使用这部分数据。其中uri-template指定了一个匹配的模板,满足这个模板的路径都会被作为计算的基础数据。

         另外,还有一种定义dataset集合的方式,将多个dataset合并成一个组来定义,语法格式如下:

         注意:通过实践得知,dataset主要是为后面的input-events和output-events服务的;在dataSet中定义了数据模板,通过这个模板,可以将其配置为目录,也可以将其配置为文件路径,有用户自己选择。但是,dataSet配置的模板路径,dataSet是不会自动生成的,使用的前提是,配置的模板路径已经存在,否则,后面的程序不会运行。

4.2.3、input-events和output-events元素

         一个Coordinator应用的数据事件指定了要执行一个Coordinator动作必须满足的输入条件,在Oozie当前版本,只支持使用dataset实例。

         一个Coordinator动作可能会生成一个或者多个dataset实例,在oozie当前版本中,输出事件只支持输出dataset实例。

         注意:配置的这些元素为后面的action中服务,而且实际环境中必须满足这些输入输出条件。

4.2.4、有关Frequently中用到的EL常量

常量表示形式

含义说明

${coord:minutes(int n)}

返回日期时间:从一开始,周期执行n分钟

${coord:hours(int n)}

返回日期时间:从一开始,周期执行n * 60分钟

${coord:days(int n)}

返回日期时间:从一开始,周期执行n * 24 * 60分钟

${coord:months(int n)}

返回日期时间:从一开始,周期执行n * M * 24 * 60分钟(M表示一个月的天数)

${coord:endOfDays(int n)}

返回日期时间:从当天的最晚时间(即下一天)开始,周期执行n * 24 * 60分钟

${coord:endOfMonths(1)}

返回日期时间:从当月的最晚时间开始(即下个月初),周期执行n * 24 * 60分钟

${coord:current(int n)}

返回日期时间:从一个Coordinator动作(Action)创建时开始计算,第n个dataset实例执行时间

${coord:dataIn(String name)}

在输入事件(input-events)中,解析dataset实例包含的所有的URI

${coord:dataOut(String name)}

在输出事件(output-events)中,解析dataset实例包含的所有的URI

${coord:offset(int n, String timeUnit)}

表示时间偏移,如果一个Coordinator动作创建时间为T,n为正数表示向时刻T之后偏移,n为负数向向时刻T之前偏移,timeUnit表示时间单位(选项有MINUTE、HOUR、DAY、MONTH、YEAR)

${coord:hoursInDay(int n)}

指定的第n天的小时数,n>0表示向后数第n天的小时数,n=0表示当天小时数,n<0表示向前数第n天的小时数

${coord:daysInMonth(int n)}

指定的第n个月的天数,n>0表示向后数第n个月的天数,n=0表示当月的天数,n<0表示向前数第n个月的天数

${coord:tzOffset()}

ataset对应的时区与Coordinator Job的时区所差的分钟数

${coord:latest(int n)}

最近以来,当前可以用的第n个dataset实例

${coord:future(int n, int limit)}

当前时间之后的dataset实例,n>=0,当n=0时表示立即可用的dataset实例,limit表示dataset实例的个数

${coord:nominalTime()}

nominal时间等于Coordinator Job启动时间,加上多个Coordinator Job的频率所得到的日期时间。例如:start=”2009-01-01T24:00Z”,end=”2009-12-31T24:00Z”,frequency=”${coord:days(1)}”,frequency=”${coord:days(1)},则nominal时间为:2009-01-02T00:00Z、2009-01-03T00:00Z、2009-01-04T00:00Z、…、2010-01-01T00:00Z

${coord:actualTime()}

Coordinator动作的实际创建时间。例如:start=”2011-05-01T24:00Z”,end=”2011-12-31T24:00Z”,frequency=”${coord:days(1)}”,则实际时间为:2011-05-01,2011-05-02,2011-05-03,…,2011-12-31

${coord:user()}

启动当前Coordinator Job的用户名称

${coord:dateOffset(String baseDate, int instance, String timeUnit)}

计算新的日期时间的公式:newDate = baseDate + instance * timeUnit,如:baseDate=’2009-01-01T00:00Z’,instance=’2′,timeUnit=’MONTH’,则计算得到的新的日期时间为’2009-03-01T00:00Z’。

${coord:formatTime(String timeStamp, String format)}

格式化时间字符串,format指定模式

4.2.5、配置举例

         下面是官网给出的例子,进行说明,配置例子如下:

         名称为logs的dataset实例频率为1天,它配置的初始实例时间为2009-01-07T24:00Z,则在input-events输入事件中开始实例(start-instance)时间为6天前,即2009-01-01T24:00Z,结束实例(end-instance)时间为当天时间。

         后边定义了action,其中${coord:dataln(‘input’)}表示解析名称为input的输入时间所关联的URI(即HDFS上的文件或目录)

4.3、工作实例

4.3.1、workflow.xml配置如下:

4.3.1、workflow.properties配置如下:

4.3.2、script.pig如下:

4.3.3、script.q如下:

4.3.4、coordinator.xml配置如下:

4.3.5、coordinator.properties配置如下:

 

  • 大小: 42 KB
  • 大小: 11.7 KB
  • 大小: 55 KB
  • 大小: 60.3 KB
  • 大小: 8.6 KB
  • 大小: 34.1 KB
  • 大小: 82 KB
  • 大小: 50.5 KB
  • 大小: 62.1 KB
  • 大小: 60.2 KB
  • 大小: 9.4 KB
  • 大小: 927 Bytes
  • 大小: 7.9 KB
  • 大小: 9.7 KB
  • 大小: 5 KB
  • 大小: 1.1 KB
  • 大小: 11.9 KB
  • 大小: 2.6 KB
  • 大小: 32.5 KB
  • 大小: 2.8 KB
  • 大小: 34.1 KB
  • 大小: 5.5 KB
  • 大小: 28.4 KB
  • 大小: 11.9 KB
  • 大小: 55.4 KB
  • 大小: 16.6 KB
  • 大小: 8.5 KB
  • 大小: 131.1 KB
  • 大小: 18 KB
  • 大小: 23.3 KB
  • 大小: 26 KB
  • 大小: 128.2 KB
  • 大小: 169.6 KB
  • 大小: 3.5 KB
  • 大小: 8.6 KB
  • 大小: 2.4 KB
  • 大小: 31 KB
  • 大小: 5.5 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics