跨节点参数的缘起与今生

简介: Dataphin v3.13引入了跨节点参数功能,允许任务间传递消息。输出节点(如SQL、Shell、Python任务)能输出参数,输入节点可以接收并使用这些参数。此功能解决了通过公共存储中转消息的复杂性和低效问题。应用场景包括:金融企业的币种转换,其中汇率任务(输出节点)提供汇率,转换任务(输入节点)使用该汇率;以及产品目录更新检查,通过跨节点参数控制是否需要执行数据导入任务。用户可以通过任务编辑器设置和传递跨节点参数,并在运维中进行补数据操作。

Dataphin 在 v3.13 版本上线了跨节点参数功能,有不少用户对相关概念和使用流程还不是很清楚,本文将通过示例来详细介绍该功能。


1. 背景与相关概念

Dataphin 的节点(任务)依赖关系组成了一个有向无环图(DAG),在以往,可以在 DAG 的节点间传递的只有状态,即上游节点的运行状态会影响其下游节点,节点本身的运行内容系统不感知,也无法将这些内容通过 DAG 向下传递。下游节点如果需要消费上游节点的产出,需要上游节点将产出写到公共存储,由下游节点去公共存储读取,最常用的公共位置就是表。但通过公共存储中转消息有以下不足:

  1. 对公共存储的写入和读取操作流程比较复杂,有些类型的节点甚至无法操作。如: 逻辑表节点的计算逻辑完全有字段计算逻辑决定,无法单独去读取公共存储中的消息。
  2. 写入公共存储再读取出来的效率比较低,如果有多个下游节点需要用到该消息,每一个节点都需要独立去读取消息。如果消息存储位置或格式变更,每一个下游节点均需要同步修改读取方式。
  3. 中转的消息通常非常短小,通过中转方式传递比较重



为了解决以上问题,Dataphin 引入了跨节点参数来在节点(任务)间传递消息

  1. 输出节点: 输出参数(向外传递消息)的节点。仅支持 shell,Python 和 SQL (MAXCOMPUTE_SQL,HIVE_SQL,ADBPG_SQL 等)类型的计算任务输出参数
  2. 输入节点: 接收并使用参数的节点。一个输出节点可以下接多个输入节点,只要是输出节点的直接下游节点,都可以引用输出节点的跨节点参数

image.png

2. 应用场景

跨节点参数有两个典型的应用场景:

  1. 某金融企业每天的周期任务涉及到币种转换,需要将每日的投资收益按照美元进行结算。每日的收盘汇率都会从业务系统抽取同步到一个离线表,有很多周期任务都涉及到币种换算,需要使用业务日期相应的汇率。可以将读取汇率的程序写入一个输出节点任务,将汇率作为跨节点参数输出,需要汇率的节点任务作为输入节点来引用跨节点参数。
  2. 某经销零售企业,需要每天定时查询供应商的某个服务,以确认产品目录是否有变更,如果有变更,则全量拉取最新目录数据(数据量比较大,拉取一次成本很高),如果无变更则继续沿用上一次拉取的数据。可以将检测产品目录是否变更的程序写入一个输出节点任务,将产品目录更新状态作为跨节点参数输出,拉取同步产品目录的节点作为输入节点,基于跨节点参数的取值来调度(条件调度)。


3. 步骤示例

3.1 汇率

数据结构

存在汇率表 exchange_rate_table,表结构和示例数据如下。


输出节点

  1. 创建 SQL 任务 get_exchange_rate,定义跨节点输出参数 usd2cny


  1. 在编辑器中,点击右键,选择“设置跨节点参数”


  1. 编辑器将自动提示已经声明定义好的跨节点输出参数


  1. 按照“使用说明”将某一字段的别名设置为跨节点输出参数的名称,系统将自动将查询结果的第一行的相应字段的取值赋给跨节点参数。注意: 对于 SQL 任务,如果有多个语句输出跨节点参数,每一个语句之前的 set 语句不可省略。本例有两种 SQL 写法:
  1. 第一种是每一个跨节点参数一个独立的 SQL

  1. 第二种是使用一个语句


  1. 提交任务

输入节点

  1. 创建 SQL 任务 exchange_usd_to_cny,测试代码如下。将输出节点 get_exchange_rate 添加为上游。
-- 10000 美金换算为人民币,日元,欧元,澳元和港币
select 10000 * ${usd2cny_rate}, 
       10000 * ${usd2jpy_rate}, 
       10000 * ${usd2eur_rate}, 
       10000 * ${usd2aud_rate}, 
       10000 * ${usd2hkd_rate};


  1. 将识别出的变量的参数类型修改为“跨节点变量”。


  1. 各个跨节点变量的参数值中选择 get_exchange_rate 的相应跨节点输出参数


补数据

  1. 先确认表中当日的汇率


  1. 进入运维,在 get_exchange_rate 上点击“补数据-补当前及下游任务”,选中下游 exchange_usd_to_cny 一起补数据。注意: 对输入节点补数据时,必须连带补输出节点,否则输入节点将使用跨节点参数的默认值。


  1. exchange_usd_to_cny 的运行日志如下,可以看到系统从表中将数据读取出来赋值给跨节点输出参数,并传递给了下游任务。




3.2 确认更新状态

输出节点

  1. 创建一个模拟的检测更新状态的 Python 任务,添加跨节点输出参数 update_status


  1. 键入代码,使用随机函数来返回状态。右键菜单可快捷设置跨节点参数。提交任务

from random import randint
def check_update():
    return randint(0, 1)
setv("update_status", check_update())


输入节点

  1. 创建一个离线管道任务 imp_product_catalog,将 check_update 添加为其上游任务。


  1. 在 imp_product_catalog 上开启条件调度


  1. 添加条件 跨节点参数 check_update.update_status = 0 (无更新)时,空跑调度,不符合该条件则命中默认条件(正常调度)


  1. 输入不同的跨节点参数值,预览调度运行计划


补数据

亲爱的读者可亲自测试下效果,本文不再赘述。


以上就是跨节点参数的全部内容,如有建议或疑问,欢迎联系 Datpahin 技术支持。

作者介绍
目录