场景介绍
传统的数据治理厂商(没有采用云计算技术),往往使用 Oracle 作为数仓的存储,使用 Kettle 作为ETL和流程调度工具。依托于 Oracle 的稳定、高效,以及 Kettle 的灵活,传统架构可以胜任各种复杂的场景。他们数据治理的架构简单来说是这样的:
随着云计算技术的不断发展和推广,传统架构正在慢慢淡出市场,但是交付过程中,不可避免的会遇到把云计算和传统厂商相集成的场景。例如,我们使用了阿里云的产品DataWorks,它集成了云数仓ODPS和离线同步工具DataX,整体架构会变成这样:
本文主要介绍的是,在上述的场景下,如何保证 DataX 作业和 Kettle 作业的同步问题。
方案介绍
1、在DataWorks中创建一个虚拟节点(vn_root
),并把该节点设置为“暂停”状态(暂停状态的实例,会在设定的执行时间到达时,转换为“运行失败”状态),然后把所有DataX数据集成任务的上游配置成该节点;
2、使用 Java 封装 DataWorks API,封装类名为ResumeTask
,并输出为 jar 包(active_vn_root.jar
),代码中实现以下流程用来激活DataX作业:
3、把active_vn_root.jar
放入 data-integration\lib 目录后重启 Kettle
4、在原业务流程中增加一个 java 代码节点,源代码如下:
/* 引用 jar 包中的方法 */
import dataworks.ResumeTask;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
}
/* 只处理第一行 */
if (first) {
first = false;
} else {
setOutputDone();
return false;
}
Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());
/* 捕获上游信号 */
String signal = get(Fields.In, "signal").getString(r);
/* 处理信号 */
if (signal.compareTo("1") == 0) {
logBasic("signal is 1");
/* 节点号 1234 是固定值,不会变*/
int ret = ResumeTask.doResumeTask(1234);
/** ret 返回数字的含义
* 0 成功;
* -1 其他异常(网络异常);
* 1 查询异常,未查询到相关实例;
* 2 查询异常,查询到多个相关实例;
* 3 恢复失败;
* 4 恢复失败,任务为一次性任务;
* 5 恢复失败,任务为空跑任务;
* 6 恢复失败,接口调用返回值异常;
* 7 实例恢复成功,但重跑失败;
* 8 实例恢复成功,但重跑失败,任务并非失败任务,不需要重跑
* 9 实例恢复成功,但重跑失败,接口调用返回值异常;
*/
logBasic("return code is " + ret);
if (ret == 0) {
logBasic("active root node successfully: 调用成功");
get(Fields.Out, "result").setValue(outputRow, "success");
} else {
String err_msg = "UNKOWN ERROR";
switch(ret) {
case -1: err_msg="其他异常(网络异常);";break;
case 1: err_msg="查询异常,未查询到相关实例;";break;
case 2: err_msg="查询异常,查询到多个相关实例;";break;
case 3: err_msg="恢复失败;";break;
case 4: err_msg="恢复失败,任务为一次性任务;";break;
case 5: err_msg="恢复失败,任务为空跑任务;";break;
case 6: err_msg="恢复失败,接口调用返回值异常;";break;
case 7: err_msg="实例恢复成功,但重跑失败;";break;
case 8: err_msg="实例恢复成功,但重跑失败,任务并非失败任务,不需要重跑;";break;
case 9: err_msg="实例恢复成功,但重跑失败,接口调用返回值异常;";break;
}
logBasic("active root node failed: 调用失败," + err_msg);
get(Fields.Out, "result").setValue(outputRow, "fail");
}
} else {
logBasic("signal is not 1, will do nothing: 接收到的信号错误");
get(Fields.Out, "result").setValue(outputRow, "fail");
}
putRow(data.outputRowMeta, outputRow);
setOutputDone();
return false;
}
5、由于DataWorks API属于异步操作(也就是说,调用后即返回,不会等待任务执行完毕),所以不用担心该节点的执行会阻塞整体流程的执行。