在企业的数据处理流程中,可能会有以下这样的场景: 存在多个不同的数据平台,比如使用 Dataphin 作为核心的数据仓库处理平台,另外有一个上游数据采集系统,使用 unix 的 crontab 定时调度。采集系统每天完成某个数据的采集完成时间,会由于上游数据量的原因而不稳定。当采集完成时,需要有一种手段可以告知 Dataphin,以便于 Dataphin 的集成系统开始拉取数据。
比较常见的解决方案是,上游采集系统在数据准备好之后,在某一个公共的位置写下一个完成标记,Dataphin 通过轮询的方式检测该完成标记是否已生成。这种方案存在以下问题:
- 存放标记的公共位置比较难管理,常用的主要有某个数据库或文件系统,有时不一定存在一个双方都容易访问的公共位置服务。
- Dataphin 的轮询任务有时会长时间运行,占用系统资源。
Dataphin v4.0 引入了全新的计算任务类型--触发式节点,可以作为另一种跨系统调度的选择。
1. 触发式节点基本特点
触发式节点任务与其他计算任务比较相似,特别的地方是:
- 触发式节点任务的周期实例进入运行状态需要满足
- 当前系统时间大于或等于实例的定时运行时间,与其他计算任务一致
- 所有上游节点已成功,与其他计算任务一致
- 已收到外部触发请求,此为触发式节点独有的运行条件
- 触发式节点任务的补数据实例,不依赖外部触发请求,与普通计算任务一致
2. 触发式节点使用条件
使用外部触发节点,需要满足以下条件:
- 已开通 Dataphin OpenAPI 功能。
- 发起触发请求的环境与 Dataphin 必须网络可连通。
3. 触发式节点使用示例
- 创建触发式节点,与普通虚拟节点流程一致。提交发布到生产环境。
- 在生产运维-周期实例中,可以看到触发式节点除了运行时间和内部上游依赖外,还会等待外部触发。
- 在外部触发,本例使用本地的程序模拟外部触发。
- 下载 OpenAPI 的 SDK
- 将 SDK 加入到 classpath,编写触发程序,示例如下
package org.sample; import com.aliyuncs.dataphin.DataphinAcsClient; import com.aliyuncs.dataphin.model.v20200830.sop.node.RunTriggerNodeRequest; import com.aliyuncs.dataphin.model.v20200830.sop.node.RunTriggerNodeResponse; import com.aliyuncs.exceptions.ClientException; import java.util.HashMap; public class RunTriggerNodeTest { static class ArgumentParser { public HashMap<String, String> named_arguments = new HashMap<>(); public ArgumentParser(String[] args) { for (String s : args) { String[] split = s.split("="); this.named_arguments.put(split[0], split[1]); } } public HashMap<String, String> getNamedArguments() { return this.named_arguments; } } public static void main(String[] args) throws ClientException { ArgumentParser arg = new ArgumentParser(args); HashMap<String, String> named_arguments = arg.getNamedArguments(); DataphinAcsClient client = DataphinAcsClient.create(named_arguments.get("region"), named_arguments.get("endpoint"), named_arguments.get("access_key"), named_arguments.get("access_secret")); RunTriggerNodeRequest runTriggerNodeRequest = new RunTriggerNodeRequest(); // 设置需要触发的节点的节点ID runTriggerNodeRequest.setNodeId(named_arguments.get("node_id")); // 设置需要触发的周期实例对应的业务日期,格式为yyyy-MM-dd runTriggerNodeRequest.setBizDate(named_arguments.get("bizdate")); // 设置需要触发的节点所在项目的项目ID runTriggerNodeRequest.setProjectId(Long.parseLong(named_arguments.get("project_id"))); // 设置环境类型,建议保持 PROD (生产环境) 不变 runTriggerNodeRequest.setEnv("PROD"); // 设置需要触发的节点所在租户的租户ID runTriggerNodeRequest.setOpTenantId(Long.parseLong(named_arguments.get("tenant_id"))); System.out.println("Start to trigger node:" + named_arguments.get("node_id") + " on bizdate:" + named_arguments.get("bizdate")); RunTriggerNodeResponse runTriggerNodeResponse = client.getAcsResponse(runTriggerNodeRequest); System.out.println(runTriggerNodeResponse.getCode()); } }
- 编译打包为 trigger.jar
- 从浏览器 URL 获取 租户 ID(tenant_id),项目 ID(project_id),从属性获取节点 ID(node_id)
- 执行触发命令,可以将命令作为外部其他系统的任务代码,替换 “bizdate” 的取值为相应系统的日期变量
执行命令:
java -cp trigger.jar org.sample.RunTriggerNodeTest region=cn-hangzhou endpoint=<你的openapi endpoint> access_key=<你的ak> access_secret=<你的sk> node_id=n_5657258388970864640 bizdate=2024-03-19 project_id=6904554516377664 tenant_id=300523310
输出如下:
Start to trigger node:n_5657258388970864640 on bizdate:2024-03-19 OK
- 在生产运维,可以看到任务已被触发。
以上就是本期触发式节点的全部内容,欢迎升级后体验使用。