- 场景
某经销零售企业,需要每天定时查询供应商的某个服务,以确认产品目录是否有变更,如果有变更,则全量拉取最新目录数据(数据量比较大,拉取一次成本很高),如果无变更则继续沿用上一次拉取的数据,在dataphin如何实现?
- 解决方案及功能
可以将检测产品目录是否变更的程序写入一个输出节点任务,将产品目录更新状态作为跨节点参数输出,拉取同步产品目录的节点作为输入节点,基于跨节点参数的取值来调度(条件调度)。
输出节点
- 创建一个模拟的检测产品目录是否变更的 Python 任务,添加跨节点输出参数 update_status
- 键入代码,使用随机函数来返回状态(仅作演示,实际根据产品目录是否变更来返回状态)。右键菜单可快捷设置跨节点参数。提交任务
输入节点
- 创建一个离线管道任务 imp_product_catalog,将 check_update 添加为其上游任务。
- 在 imp_product_catalog 上开启条件调度
- 添加条件:跨节点参数 check_update.update_status = 0 (无更新)时,空跑调度,不符合该条件则命中默认条件(正常调度)
- 输入不同的跨节点参数值,预览调度运行计划
综上,我们就可以根据上游节点提供的跨节点参数的状态结合条件调度,动态调整下游节点的调度运行计划