Oozie分布式工作流——EL表达式

简介:

oozie支持使用EL(expression language)表达式。

基本的EL常量

  • KB
  • MB
  • GB
  • TB
  • PB

基本EL函数

string firstNotNull(String value1,String value2)

返回第一个不为空的值,如果都为null,则返回null

string concat(String s1,String s2)

拼接两个字符串,如果一个为null,拼接的字符串为空

string replaceAll(String src,String regex,String replacement)

替换正则表达式匹配的位置。如果regex为null,则什么也不做。如果replacement为null,则替换为空串

string appendAll(String src,String append,String delimeter)

把append字符串添加到切分后的字符串中。比如appendAll("a,b,c","123",",")将会返回a123,b123,c123。append为null代表返回空串,delimiter为null,代表什么也不做。

string trim(String s)

给指定的字符串去除空格

String urlEncode(String s)

URL解码

String timestamp()

返回当前的时间戳,并格式化为yyyy-MM-ddTHH:mmZ,到分钟粒度。

String toJsonStr(Map)

把Map返回成json,这在获取前一个action的输出内容时比较有用。比如wf:actionData(String actionName)格式化为json

String toPropertiesStr(Map)

把Map返回成Java Properties

String toConfigurationStr(Map)

把Map返回成Configuration

工作流EL函数

String wf:id()

获取当前工作流节点的id

String wf:name()

获取当前工作流的名称

String wf:appPath()

获取当前工作流workflow.xml所在的目录

String wf:conf(String name)

返回当前工作流的属性值

String wf:user()

返回启动当前工作流的用户

String wf:group()

返回当前工作流的组

String wf:callback(String stateVar)

返回当前工作流的回调,stateVar可以指定成某个状态,也可以传一个参数可以在远程进行替换

String wf:transition(String node)

返回工作流的状态

String wf:lastErrorNode()

返回当前工作流退出的状态

String wf:errorCode(String node)

返回特定node的错误代码

String wf:errorMessage(String message)

返回出错的主要信息

int wf:run()

返回当前工作流任务的标志,0代表正常

Map wf:actionData(String node)

返回指定节点输出的内容,需要配合<capture-output>标签使用

int wf:actionExternalId(String node)

返回节点的外部id

int wf:actionTrakerUri(String node)

返回当前节点的uri

int wf:actionExternalStatus(String node)

返回指定节点的外部状态

Hadoop EL常量

  • RECORDS
  • MAP_IN
  • MAP_OUT
  • REDUCE_IN
  • REDUCE_OUT
  • GROUPS

Hadoop 任务EL函数的例子

<workflow-app xmlns="uri:oozie:workflow:0.2" name="pig-wf">
    <start to="pig-node"/>
    <action name="pig-node">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/pig"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <script>id.pig</script>
            <param>INPUT=/user/${wf:user()}/${examplesRoot}/input-data/text</param>
            <param>OUTPUT=/user/${wf:user()}/${examplesRoot}/output-data/pig</param>
        </pig>
        <ok to="java1"/>
        <error to="fail"/>
    </action>
    <action name="java1">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
               <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>MyTest</main-class>
            <arg> ${wf:actionData("pig-node")["hadoopJobs"]}</arg>
            <capture-output/>
        </java>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Pig failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

hdfs el方法

boolean fs:exists(String path)

判断指定的URI是否存在

boolean fs:isDir(String path)

判断是否是目录

long fs:dirSize(String path)

返回指定目录下的所有文件的大小。如果不是目录,返回-1。它不支持嵌套,只能返回下面一层的文件大小

long fs:fileSize(String path)

返回指定文件的大小,如果不是文件,返回-1

long fs:blockSize(String path)

返回指定文件占用的block大小。如果不是file,返回-1

本文转自博客园xingoo的博客,原文链接:Oozie分布式工作流——EL表达式,如需转载请自行联系原博主。
相关文章
|
1月前
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
7月前
|
存储 Kubernetes Cloud Native
云原生离线工作流编排利器 -- 分布式工作流 Argo 集群
云原生离线工作流编排利器 -- 分布式工作流 Argo 集群
105248 2
|
7月前
|
数据可视化 Linux 调度
DolphinScheduler【部署 01】分布式可视化工作流任务调度工具DolphinScheduler部署使用实例分享(一篇入门学会使用DolphinScheduler)
DolphinScheduler【部署 01】分布式可视化工作流任务调度工具DolphinScheduler部署使用实例分享(一篇入门学会使用DolphinScheduler)
801 0
springcloud vue 微服务分布式 activiti工作流 前后分离 集成代码生成器 shiro权限
springcloud vue 微服务分布式 activiti工作流 前后分离 集成代码生成器 shiro权限
|
消息中间件 数据可视化 Serverless
|
资源调度 Shell 调度
分布式工作流任务调度系统EasyScheduler自定义任务插件开发
分布式工作流任务调度系统EasyScheduler自定义任务插件开发
5701 0
开源分布式工作流任务调度系统EasyScheduler使用详解
开源分布式工作流任务调度系统EasyScheduler使用详解
4619 0
|
Web App开发 SQL 调度
分布式工作流任务调度系统--Easy Scheduler 1.0.3 发布
Easy Scheduler Release 1.0.3 Easy Scheduler 1.0.3是1.x系列中的第四个版本。 新特性: [EasyScheduler-254] 流程定义删除和批量删除 [EasyScheduler-347] 任务依赖增加“今日” [EasySchedule.
2239 0
|
调度 数据可视化 SQL
分布式工作流任务调度系统Easy Scheduler正式开源
Easy Scheduler是一个分布式工作流任务调度系统,主要解决数据研发ETL错综复杂的依赖关系,而不能直观监控任务健康状态等问题。Easy Scheduler以DAG流式的方式将Task组装起来,可实时监控任务的运行状态,同时支持重试、从指定节点恢复失败、暂停及Kill任务等操作。
12640 0