Dataphin 在 v3.13 版本上线了跨节点参数功能,有不少用户对相关概念和使用流程还不是很清楚,本文将通过示例来详细介绍该功能。
1. 背景与相关概念
Dataphin 的节点(任务)依赖关系组成了一个有向无环图(DAG),在以往,可以在 DAG 的节点间传递的只有状态,即上游节点的运行状态会影响其下游节点,节点本身的运行内容系统不感知,也无法将这些内容通过 DAG 向下传递。下游节点如果需要消费上游节点的产出,需要上游节点将产出写到公共存储,由下游节点去公共存储读取,最常用的公共位置就是表。但通过公共存储中转消息有以下不足:
- 对公共存储的写入和读取操作流程比较复杂,有些类型的节点甚至无法操作。如: 逻辑表节点的计算逻辑完全有字段计算逻辑决定,无法单独去读取公共存储中的消息。
- 写入公共存储再读取出来的效率比较低,如果有多个下游节点需要用到该消息,每一个节点都需要独立去读取消息。如果消息存储位置或格式变更,每一个下游节点均需要同步修改读取方式。
- 中转的消息通常非常短小,通过中转方式传递比较重
为了解决以上问题,Dataphin 引入了跨节点参数来在节点(任务)间传递消息
- 输出节点: 输出参数(向外传递消息)的节点。仅支持 shell,Python 和 SQL (MAXCOMPUTE_SQL,HIVE_SQL,ADBPG_SQL 等)类型的计算任务输出参数
- 输入节点: 接收并使用参数的节点。一个输出节点可以下接多个输入节点,只要是输出节点的直接下游节点,都可以引用输出节点的跨节点参数
2. 应用场景
跨节点参数有两个典型的应用场景:
- 某金融企业每天的周期任务涉及到币种转换,需要将每日的投资收益按照美元进行结算。每日的收盘汇率都会从业务系统抽取同步到一个离线表,有很多周期任务都涉及到币种换算,需要使用业务日期相应的汇率。可以将读取汇率的程序写入一个输出节点任务,将汇率作为跨节点参数输出,需要汇率的节点任务作为输入节点来引用跨节点参数。
- 某经销零售企业,需要每天定时查询供应商的某个服务,以确认产品目录是否有变更,如果有变更,则全量拉取最新目录数据(数据量比较大,拉取一次成本很高),如果无变更则继续沿用上一次拉取的数据。可以将检测产品目录是否变更的程序写入一个输出节点任务,将产品目录更新状态作为跨节点参数输出,拉取同步产品目录的节点作为输入节点,基于跨节点参数的取值来调度(条件调度)。
3. 步骤示例
3.1 汇率
数据结构
存在汇率表 exchange_rate_table,表结构和示例数据如下。
输出节点
- 创建 SQL 任务 get_exchange_rate,定义跨节点输出参数 usd2cny
- 在编辑器中,点击右键,选择“设置跨节点参数”
- 编辑器将自动提示已经声明定义好的跨节点输出参数
- 按照“使用说明”将某一字段的别名设置为跨节点输出参数的名称,系统将自动将查询结果的第一行的相应字段的取值赋给跨节点参数。注意: 对于 SQL 任务,如果有多个语句输出跨节点参数,每一个语句之前的 set 语句不可省略。本例有两种 SQL 写法:
- 第一种是每一个跨节点参数一个独立的 SQL
- 第二种是使用一个语句
- 提交任务
输入节点
- 创建 SQL 任务 exchange_usd_to_cny,测试代码如下。将输出节点 get_exchange_rate 添加为上游。
-- 10000 美金换算为人民币,日元,欧元,澳元和港币 select 10000 * ${usd2cny_rate}, 10000 * ${usd2jpy_rate}, 10000 * ${usd2eur_rate}, 10000 * ${usd2aud_rate}, 10000 * ${usd2hkd_rate};
- 将识别出的变量的参数类型修改为“跨节点变量”。
- 各个跨节点变量的参数值中选择 get_exchange_rate 的相应跨节点输出参数
补数据
- 先确认表中当日的汇率
- 进入运维,在 get_exchange_rate 上点击“补数据-补当前及下游任务”,选中下游 exchange_usd_to_cny 一起补数据。注意: 对输入节点补数据时,必须连带补输出节点,否则输入节点将使用跨节点参数的默认值。
- exchange_usd_to_cny 的运行日志如下,可以看到系统从表中将数据读取出来赋值给跨节点输出参数,并传递给了下游任务。
3.2 确认更新状态
输出节点
- 创建一个模拟的检测更新状态的 Python 任务,添加跨节点输出参数 update_status
- 键入代码,使用随机函数来返回状态。右键菜单可快捷设置跨节点参数。提交任务
from random import randint def check_update(): return randint(0, 1) setv("update_status", check_update())
输入节点
- 创建一个离线管道任务 imp_product_catalog,将 check_update 添加为其上游任务。
- 在 imp_product_catalog 上开启条件调度
- 添加条件 跨节点参数 check_update.update_status = 0 (无更新)时,空跑调度,不符合该条件则命中默认条件(正常调度)
- 输入不同的跨节点参数值,预览调度运行计划
补数据
亲爱的读者可亲自测试下效果,本文不再赘述。
以上就是跨节点参数的全部内容,如有建议或疑问,欢迎联系 Datpahin 技术支持。