有了 Dataphin v4.0,跨系统调度依赖再也不是难题

简介: Dataphin v4.0引入了新的触发式节点,用于解决多数据平台间的调度问题。当上游系统(如Unix的crontab)完成数据采集后,可通过触发式节点通知Dataphin开始拉取数据,避免传统轮询方式的效率低和资源占用。触发式节点需满足Dataphin OpenAPI开通和网络连通条件,并通过SDK进行外部触发。示例展示了如何创建和使用触发式节点,以及使用Java SDK模拟触发请求。

在企业的数据处理流程中,可能会有以下这样的场景: 存在多个不同的数据平台,比如使用 Dataphin 作为核心的数据仓库处理平台,另外有一个上游数据采集系统,使用 unix 的 crontab 定时调度。采集系统每天完成某个数据的采集完成时间,会由于上游数据量的原因而不稳定。当采集完成时,需要有一种手段可以告知 Dataphin,以便于 Dataphin 的集成系统开始拉取数据。


比较常见的解决方案是,上游采集系统在数据准备好之后,在某一个公共的位置写下一个完成标记,Dataphin 通过轮询的方式检测该完成标记是否已生成。这种方案存在以下问题:

  1. 存放标记的公共位置比较难管理,常用的主要有某个数据库或文件系统,有时不一定存在一个双方都容易访问的公共位置服务。
  2. Dataphin 的轮询任务有时会长时间运行,占用系统资源。


Dataphin v4.0 引入了全新的计算任务类型--触发式节点,可以作为另一种跨系统调度的选择。


1. 触发式节点基本特点

触发式节点任务与其他计算任务比较相似,特别的地方是:

  1. 触发式节点任务的周期实例进入运行状态需要满足
  1. 当前系统时间大于或等于实例的定时运行时间,与其他计算任务一致
  2. 所有上游节点已成功,与其他计算任务一致
  3. 已收到外部触发请求,此为触发式节点独有的运行条件
  1. 触发式节点任务的补数据实例,不依赖外部触发请求,与普通计算任务一致


2. 触发式节点使用条件

使用外部触发节点,需要满足以下条件:

  1. 已开通 Dataphin OpenAPI 功能。
  2. 发起触发请求的环境与 Dataphin 必须网络可连通。


3. 触发式节点使用示例

  1. 创建触发式节点,与普通虚拟节点流程一致。提交发布到生产环境。


  1. 在生产运维-周期实例中,可以看到触发式节点除了运行时间和内部上游依赖外,还会等待外部触发。


  1. 在外部触发,本例使用本地的程序模拟外部触发。
  • 下载 OpenAPI 的 SDK


  • 将 SDK 加入到 classpath,编写触发程序,示例如下
package org.sample;

import com.aliyuncs.dataphin.DataphinAcsClient;
import com.aliyuncs.dataphin.model.v20200830.sop.node.RunTriggerNodeRequest;
import com.aliyuncs.dataphin.model.v20200830.sop.node.RunTriggerNodeResponse;
import com.aliyuncs.exceptions.ClientException;
import java.util.HashMap;

public class RunTriggerNodeTest {
    static class ArgumentParser {
        public HashMap<String, String> named_arguments = new HashMap<>();
        
        public ArgumentParser(String[] args) {
            for (String s : args) {
                String[] split = s.split("=");
                this.named_arguments.put(split[0], split[1]);
            }
        }
        
        public HashMap<String, String> getNamedArguments() { return this.named_arguments; }
    }
    
    public static void main(String[] args) throws ClientException {
        ArgumentParser arg = new ArgumentParser(args);
        HashMap<String, String> named_arguments = arg.getNamedArguments();
        
        DataphinAcsClient client = DataphinAcsClient.create(named_arguments.get("region"), named_arguments.get("endpoint"), named_arguments.get("access_key"), named_arguments.get("access_secret"));
        RunTriggerNodeRequest runTriggerNodeRequest = new RunTriggerNodeRequest();
        
        // 设置需要触发的节点的节点ID
        runTriggerNodeRequest.setNodeId(named_arguments.get("node_id"));
        
        // 设置需要触发的周期实例对应的业务日期,格式为yyyy-MM-dd
        runTriggerNodeRequest.setBizDate(named_arguments.get("bizdate"));
        
        // 设置需要触发的节点所在项目的项目ID
        runTriggerNodeRequest.setProjectId(Long.parseLong(named_arguments.get("project_id")));
        
        // 设置环境类型,建议保持 PROD (生产环境) 不变
        runTriggerNodeRequest.setEnv("PROD");
        
        // 设置需要触发的节点所在租户的租户ID
        runTriggerNodeRequest.setOpTenantId(Long.parseLong(named_arguments.get("tenant_id")));
        
        System.out.println("Start to trigger node:" + named_arguments.get("node_id") + " on bizdate:" + named_arguments.get("bizdate"));
        RunTriggerNodeResponse runTriggerNodeResponse = client.getAcsResponse(runTriggerNodeRequest);
        System.out.println(runTriggerNodeResponse.getCode());
    }
}


  • 编译打包为 trigger.jar


  • 从浏览器 URL 获取 租户 ID(tenant_id),项目 ID(project_id),从属性获取节点 ID(node_id)


  • 执行触发命令,可以将命令作为外部其他系统的任务代码,替换 “bizdate” 的取值为相应系统的日期变量


执行命令:

java -cp trigger.jar org.sample.RunTriggerNodeTest region=cn-hangzhou endpoint=<你的openapi endpoint> access_key=<你的ak> access_secret=<你的sk> node_id=n_5657258388970864640 bizdate=2024-03-19 project_id=6904554516377664 tenant_id=300523310


输出如下:

Start to trigger node:n_5657258388970864640 on bizdate:2024-03-19
OK


  1. 在生产运维,可以看到任务已被触发。


以上就是本期触发式节点的全部内容,欢迎升级后体验使用。

作者介绍
目录