1 案例
1.1 Hello World
first.project
azkaban-flow-version: 2.0
first.flow
nodes:
- name: jobA
type: command
config:
command: echo "hi 大佬"
打包为zip,即可上传
name:job名称
type:job类型。command表示要执行作业的方式为命令。
config:job配置
1.2 作业依赖案例
需求:JobA和Job执行完了,才能执行 JobC
pojo.project
azkaban-flow-version: 2.0
pojo.flow
nodes:
- name: jobC
type: command
# JobC 依赖于 JobA 和 JobB
dependsOn:
- jobA
- jobB
config:
command: echo "hi 大佬"
- name: jobA
type: command
config:
command: echo "开始执行 jobA"
- name: jobB
type: command
config:
command: echo "开始执行 jobB"
dependsOn: 作业依赖
1.3 自动失败重试
需求:如果执行任务失败,需要重试3次,重试时间间隔10000ms
pojo.flow
nodes:
- name: jobA
type: command
config:
command: sh /usr/local/azkaban/test/test.sh
retries: 3
retry.backoff: 10000
retries
: 重试次数
retry.backoff
: 重试的时间间隔
1.4 手动失败重试
需求:JobA->jobF,一次依赖吗,在生产环境,任何job都有可能挂掉,可以根据需求执行想要执行的job。
pojo.flow
nodes:
- name: jobA
type: command
config:
command: echo "执行jobA"
- name: jobB
type: command
dependsOn:
- jobA
config:
command: echo "执行jobB"
- name: jobC
type: command
dependsOn:
- jobB
config:
command: echo "执行jobC"
- name: jobD
type: command
dependsOn:
- jobC
config:
command: echo "执行jobD"
- name: jobE
type: command
dependsOn:
- jobD
config:
command: echo "执行jobE"
- name: jobF
type: command
dependsOn:
- jobE
config:
command: echo "执行jobF"
或者禁止
2 JavaProcess作业案例
JavaProcess类型可以运行一个自定义主类方法,type类型为javaprocess,可用配置为:
Xms: 最小堆
Xmx: 最大堆
classpath: 类路径
java.class: 要运行的java对象,必须包含Main方法
main.args: main方法的参数
案例:
1、新建一个azkaban的maven工程: azkaban_cs
2、创建包名:com.cs
3、创建 AzTest类
package com.cs;
public class AzTest {
public static void main(String[] args) {
System.out.println("java测试AzTest");
}
}
4、然后打包: azkaban_cs-1.0-SNAPSHOT.jar
5、testJava.flow
nodes:
- name: test_java
type: javaprocess
config:
Xms: 96M
Xmx: 200M
java.class: com.cs.AzTest
3 条件工作流
条件工作流功能允许用户自定义执行条件来决定是否运行某些job。条件可以由当前job的父job输出运行时的参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定job执行逻辑时获得更大的灵活性,例如,至于只要父job之一成功,就可以运行当前job。
3.1 运行时参数
1、运行原理
- 父job将参数写入 JOB_OUTPUT_PROP_FILE 环境变量所指向的文件。
- 子job使用 ${jobName:param} 来获取父job输出的参数并定义执行条件。
2、支持的运算符
==、!=、>、<、>=、<=、&& 与、
3、需求案例
- jobA执行一个shell脚本
- jobB执行一个shell脚本,但jobB不需要每天都执行,而只需要每周一执行。
jobA.sh
#!/bin/bash
echo "执行jobA"
wk=`date +%w`
echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
jobB.sh
#!/bin/bash
echo "执行jobB"
condition.flow
nodes:
- name: jobA
type: command
config:
command: sh JobA.sh
- name: jobB
type: command
dependsOn:
- jobA
config:
command: sh jobB.sh
condition: ${jobA:wk} == 1
3.2 预定义宏案例
Azkaban中预置了几个特殊的判断条件,称为预定义宏。
预定义宏会根据所有父job的完成情况进行判断,在决定是否执行。可用的预定义宏入下:
1、all_sucess: 表示父job全部成功才执行(默认)
2、all_done:表示父job全部完成才执行
3、all_failed:表示父job全部失败才执行
4、one_sucess:表示父job至少一个成功才执行
5、one_failed:表示父job至少一个失败才执行
需求:
- jobA执行一个shell脚本
- jobB执行一个shell脚本
- jobC执行一个shell脚本,要求jobA、jobB中有一个成功即可执行。
jobA.sh
#!/bin/bash
echo "执行jobA"
jobB.sh
#!/bin/bash
echo "执行jobB"
jobC.sh
#!/bin/bash
echo "执行jobC"
marco.flow
nodes:
- name: jobA
type: command
config:
command: sh jobA.sh
- name: jobB
type: command
config:
command: sh jobB.sh
- name: jobC
type: command
dependsOn:
- jobA
- jobB
config:
command: sh jobC.sh
condition: one_success
4 定时执行
需求:jobA 每隔1分钟执行一次
使用方法同 cron,具体参考:https://www.couragesteak.com/article/189
5 报警
5.1 邮件
修改配置文件
cd /usr/local/azkaban/azkaban-web-server-3.84.4
vim conf/azkaban.properties
mail.sender=发件人邮箱
mail.host=SMTP服务器地址
mail.user=
mail.password=
重启 web server
cd /usr/local/azkaban/azkaban-web-server-3.84.4
bin/shutdown-web.sh
bin/start-web.sh
5.2 电话告警
通常邮件报警可能会不及时,因此需要电话来及时通知。可与第三方平台进行集成。
5 Azkaban 多 Executor 模式注意事项
Azkaban 多 Executor模式指:在集群中多个节点部署 Executor。在这种模式下, Azkaban web server会根据策略,选取其中一个 Executor 去执行任务。
为确保所选的 Executor 能够准确的执行任务,我们须在一下两种方案任选其一,推荐方案二。
方案一:在特定的 Executor 去执行任务
1、在 MySQL 中 azkaban 数据库 executors 表中,查询 Executor的id。
use azkaban;
select * from executors;
这是任务脚本,可以写为绝对路径。
方案二:在 Executor 所在所有节点部署任务所需的脚本和应用。