参数传递是调度字体工作流运行时非常重要的一部分,工作流的执行,单个作业的执行,多个工作流之间的依赖执行,历史任务重算,都涉及到参数传递和同步。
1 参数类型综述
azkaban的工作流中的参数可以分为如下几个类型:
- Azkaban UI 页面输入参数
- 环境变量参数
- job作业文件中定义的参数
- 工作流的用户定义的属性文件,上游作业传递给下游的参数
- 工作流运行时产生的系统参数
- job的common参数
参数类型与其对应的参数范围如下:
参数类型 | 作用域 |
UI 页面输入参数 ,即工作流参数 | flow全局有效 |
工作流ZIP压缩包中的属性文件(.properties结尾) | flow全局有效,zip文件目录以及子目录有效 |
工作流运行时参数 | flow全局有效 |
环境变量参数 | flow全局有效 |
job的common参数 | job内局部有效 |
JOB文件中定义的参数 | job内局部有效 |
上游作业传递给下游的参数 | job内局部有效 |
2. job 参数简介commom参数
除了type,command,dependencies三个参数外,还有如下一些保留参数可以为每个job配置
参数 | 说明 |
retries | 失败的job的自动重试的次数 |
retry.backoff | 重试的间隔(毫秒) |
working.dir | 指定命令被调用的目录。默认的working目录是executions/${execution_ID}目录 |
env.property | 指定在命令执行前需设置的环境变量。Property定义环境变量的名称,因此 env.VAR_NAME=VALUE就创建了一个$VAR_NAME环境变量,并且指定了它的VALUE |
failure.emails | job失败时发送的邮箱,用逗号隔开 |
success.emails | job成功时发送的邮箱,用逗号隔开 |
notify.emails | job成功或失败都发送的邮箱,用逗号隔开 |
一个flow的email属性,只会取最后一个job的配置,其他的job的email配置将会被忽略。
3. job之间的参数传递
先看官网的描述:
Parameter Passing
There is often a desire to pass these parameters to the executing job code. The method of passing these parameters is dependent on the jobtype that is run, but usually Azkaban writes these parameters to a temporary file that is readable by the job.
The path of the file is set in JOB_PROP_FILE environment variable. The format is the same key value pair property files. Certain built-in job types do this automatically for you. The java type, for instance, will invoke your Runnable and given a proper constructor, Azkaban can pass parameters to your code automatically.
Parameter Output
Properties can be exported to be passed to its dependencies. A second environment variable JOB_OUTPUT_PROP_FILE is set by Azkaban. If a job writes a file to that path, Azkaban will read this file and then pass the output to the next jobs in the flow.
The output file should be in json format. Certain built-in job types can handle this automatically, such as the java type.
意思是:JOB_OUTPUT_PROP_FILE和JOB_PROP_FILE都是一个环境变量,指向文件路径。
参数传入:
上游节点把需要输出的值以json的格式写入JOB_OUTPUT_PROP_FILE文件,azkaban以job执行过程中,上游job传递进来的临时参数,运行时参数,项目中配置文件的参数,job定义中参数等 都保存在 ${JOB_PROP_FILE}文件中,保存格式为key=value。执行job的中shell命令时,可以作为参数传递。
参数传出:
一个azkaban job执行结束,可以将一些参数写入到${JOB_OUTPUT_PROP_FILE}文件 中,azkaban会将这些参数传递到下游依赖的的job的参数文件${JOB_PROP_FILE}文件中,供下游job引用。写入到${JOB_OUTPUT_PROP_FILE}文件中参数需要是json格式的,否则会报json解析错。下游节点就可以在JOB_PROP_FILE中看到key-value形式的输出,用${key}的方式使用变量。
举例:
baseflow.flow
#baseflow.flow nodes: - name: jobB type: command dependsOn: - jobA config: command: sh commandB.sh "${firstName}" - name: jobA type: command config: command: sh commandA.sh
commandA.sh
#!/bin/bash echo '{ "firstName":"John" , "lastName":"Doe" }' >> ${JOB_OUTPUT_PROP_FILE}
commandB.sh
#!/bin/bash cat ${JOB_PROP_FILE} >> /root/azkaban.txt echo $1 >> /root/azkaban.txt
jobB依赖JobA,jobA执行完成后,会一串json内容到${JOB_OUTPUT_PROP_FILE}指向的文件中,JobA执行完成后,jobB才可以执行,等job执行时,会将jobA输出的内容写入到/root/azkaban.txt,并追加参数中的firstName写入到文件中,注意第一个参数只能通过shell调用的方式来传递。
4 job参数之runtime属性
runtime属性是在job运行期间自动被添加的
参数 |
说明 |
azkaban.job.attempt |
job重试次数,从0开始增加 |
azkaban.job.id |
运行的job name |
azkaban.flow.flowid |
运行的job的flow name |
azkaban.flow.execid |
flow的执行id |
azkaban.flow.projectid |
工程id |
azkaban.flow.projectversion |
project上传的版本 |
azkaban.flow.uuid |
flow uuid |
azkaban.flow.start.timestamp |
flow start的时间戳 |
azkaban.flow.start.year |
flow start的年份 |
azkaban.flow.start.month |
flow start 的月份 |
azkaban.flow.start.day |
flow start 的天 |
azkaban.flow.start.hour |
flow start的小时 |
azkaban.flow.start.minute |
start 分钟 |
azkaban.flow.start.second |
start 秒 |
azkaban.flow.start.millseconds |
start的毫秒 |
azkaban.flow.start.timezone |
start 的时区 |
5 job参数之参数继承
后缀为.properties的文件将会作为参数文件加载,并且为flow中每个job所共享,属性文件通过目录分层结构继承。
比如,在zip包中有以下结构
system.properties baz.job myflow/myflow.properties myflow/myflow2.properties myflow/foo.job myflow/bar.job
system.properties是全局的属性,将会被baz.job和myflow目录下的foo.job和bar.job使用,但是baz.job不会继承myflow.properties和myflow2.properties的属性,因为是它的下层.
6 job参数之参数替换
azkaban支持参数替换;替换参数样式: azkaban会替换{}中的参数。无论${parameterName} 在job file中或者在参数文件中或者运行时参数发现,都可以被替换为对应的值。
shared.properties
# shared.properties replaceparameter=bar
myjob.job
# myjob.job param1=mytest foo=${replaceparameter} #${replaceparameter}会替换为bar param2=${param1} # ${param1} 会被替换成mytest。
前面这个例子,在myjob 作业运行前,foo 会被赋值为bar , param2会被赋值为mytest.
注意:参数名不能有空格,标点符号等。
7 shell动态传参
azkaban中的shell 作业,如何接收从webUI传递的参数?
7.1 UI页面输入参数定义
ui_test=test111111111
7.2 在job文件myjob.job指定
##作业定义文件UI输入参数接收: job_param4=${ui_test} ##作业定义文件脚本命令行引用UI输入参数: sh test_azkaban_job.sh "${job_param4}"
7.3 shell test_azkaban_job.sh 的内容
vim test_azkaban_job.sh
echo "inputparamter:$1" #接收job文件中传递的参数。
FAQ1:在页面手动执行前面的job时,如果UI参数ui_test在job执行没有输入,会执行失败。异常信息如下:
hello ERROR - Failed to build job executor for job hello Could not find variable substitution for variable(s) [param4->ui_test ]
在定时调度任务指定时,需要指定工作流参数flowParameters :ui_test,避免该错误。
7.4 shell中使用参数的注意事项
在UI页面重新输入运行时参数时,可以覆盖系统默认生成的参数值。运行时参数,和UI输入的参数,都可以认为是全局参数,在整个工作流的作业配置中,都可以通过 ${参数名} 的方式引用使用。
- 在shell 中直接引用 公共参数,运行时系统参数,UI输入参数,是无效的。
- 在shell中只能直接使用环境变量;
- 公共参数,运行时系统参数,UI输入参数能只通过shell的脚本参数的方式传递进来。
- job文件中定义的环境变量参数,可以在shell脚本中直接引用,但只对当前job有效。
8 reference
1. https://www.cnblogs.com/chenmingjun/p/10506488.html