前言
数据推送日前已在数据服务页面上提供全托管式的推送服务 (详情),基于同样的底层推送架构,我们将推送的能力也搬上了数据开发 (DataStudio),结合数据开发已有的工作流,提供了简单推送、合并推送、脚本推送及条件推送等四大推送能力,用户能在既有的工作流上弹性组装四种方式的推送。
文中我们以例子演示四种方式的推送,首先介绍如何在既有的工作流上增加推送能力。
业务流程 DAG 新建节点
用户可双击左侧目录树上的业务流程节点,即可打开业务流程,点击左上新建节点找到 SQL 节点、赋值节点、分支节点及数据推送等节点,点击后即可进入新建节点弹框。
节点上下游连结
用户可于单个节点里的右侧调度配置建立上游依赖,或于业务流程 DAG 上使用拖拽方式建立上游依赖。
上游节点参数输出
上游节点
于单个节点里的右侧调度配置找到节点上下文参数,找到输出参数区块,点击添加赋值参数,系统会将节点最后的结果输出到 outputs 变量里。
ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。
下游节点
于单个节点里的右侧调度配置找到节点上下文参数,找到输入参数区块,点击添加,找到上游节点的 outputs 参数,并取名为 inputs。
测试
于业务流程 DAG 上,对节点按右键,选择运行节点及下游或运行到该节点,会自动跳出日志区块,并可对节点按右键查看单个节点的日志。
提交与发布
工作流中每个节点都有自己的调度,需要根据依赖自动调整调度时间,如上游是 00:00 开始调度,下游节点可以设定 00:05 开始调度,并将每个节点分别提交与发布。
场景演示
准备数据
我们以订单销售系统且使用 MySQL 数据库为例,首先建立一个订单表。
CREATE TABLE orders ( order_id INT NOT NULL AUTO_INCREMENT, category VARCHAR(100) NOT NULL, -- 类别 sales DOUBLE NOT NULL, -- 订单销售额 datetime DATETIME NOT NULL, -- 订单发生时间 PRIMARY KEY (order_id), INDEX (category) );
对订单表塞入以下数据。
INSERT INTO orders (category, sales, datetime) VALUES ('Baby', 200.0, '2024-06-14 14:00:00'), ('Baby', 100.0, '2024-06-14 18:00:00'), ('Baby', 150.0, '2024-06-15 13:00:00'), ('Baby', 350.0, '2024-06-16 18:00:00'), ('Books', 150.0, '2024-05-12 13:00:00'), ('Computers', 3000.0, '2024-05-14 10:00:00'), ('Electronics', 1500.0, '2024-05-19 12:11:23'), ('Games', 399.0, '2024-05-17 19:00:00'), ('Garden', 370.0, '2024-05-11 14:01:00'), ('Clothing', 500.0, '2024-05-11 12:01:00'), ('Grocery', 100.0, '2024-05-01 10:00:00'), ('Health', 50.0, '2024-05-30 19:00:00'), ('Jewelry', 2200.0, '2024-05-15 15:10:00'), ('Kids', 999.0, '2024-05-11 14:00:00'), ('Movies', 50.0, '2024-05-19 14:06:00'), ('Music', 100.0, '2024-05-20 11:00:00'), ('Outdoors', 555.0, '2024-05-22 18:00:00'); -- 可自行添加订单数据
简单推送
下表是简单推送的场景概览。
场景 |
凌晨0点时,推送昨天销售总额。 |
工作流 |
|
推送结果 |
|
首先建立一个 MySQL 节点,并计算昨日销售总额,SQL 如下。
-- 建立一个临时表 CREATE TEMPORARY TABLE IF NOT EXISTS temp_array ( total_amount DOUBLE ); -- 统计前日销售总额并把总额数据插入临时表 INSERT INTO temp_array (total_amount) SELECT SUM(sales) FROM orders WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59'); -- 从临时表上把总额数据输出下游 select total_amount from temp_array;
于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。
ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。
新增一数据推送节点,于右侧调度配置设定上游节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增上游输入的 outputs 参数。
数据推送节点内容部份,设定推送目标、标题及正文,正文能使用 outputs 输出的数据,如 total_amount。
若要在推送内文中使用调度参数,也可于调度配置中新增参数。
于业务流程 DAG 中进行测试,查看推送目标是否显示正确内容,并提交与发布。
合并推送
下表是合并推送的场景概览。
场景 |
凌晨0点时,推送昨天销售额总额及各类别销售额增长情况。 |
工作流 |
|
推送结果 |
|
在简单推送的场景上,添加一个查询昨日销售额增长的 MySQL 节点,SQL 如下。
-- 建立一个大前天的数据临时表 CREATE TEMPORARY TABLE IF NOT EXISTS temp_array1 ( category VARCHAR(255), sales DOUBLE ); -- 将大前天的数据插入到临时表 INSERT INTO temp_array1 (category, sales) SELECT category, SUM(sales) FROM orders WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 23:59:59') GROUP BY category; -- 建立一个前天的数据临时表 CREATE TEMPORARY TABLE IF NOT EXISTS temp_array2 ( category VARCHAR(255), sales DOUBLE ); -- 将前天的数据插入到临时表 INSERT INTO temp_array2 (category, sales) SELECT category, SUM(sales) FROM orders WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59') GROUP BY category; -- 建立一个输出的临时表 CREATE TEMPORARY TABLE IF NOT EXISTS result ( category VARCHAR(255), diff DOUBLE ); -- 将前天的数据减去大前天的数据,并将结果放到输出的临时表 INSERT INTO result (category, diff) select temp_array2.category as category, temp_array2.sales - temp_array1.sales as diff from temp_array1 left join temp_array2 on temp_array1.category = temp_array2.category; -- 将结果输出到下游节点 select category, diff from result;
在原简单场景的数据推送节点上,于右侧调度配置设定新增的上游节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增此上游节点的 outputs 参数,如下图取名 inputs_2 的行。
于正文新增表格区块,增加上游新增输出的 category 与 diff 数据,点击 diff 列上的设定图标,设定数值小于 0 则显示红色的条件。
至业务流程 DAG 上对数据推送节点右键点击运行至该节点,查看推送目标是否显示正确内容,并提交与发布。
脚本推送
下表是脚本推送的场景概览。
场景 |
周日凌晨0点时,推送上周销售额前三名的类别。 |
工作流 |
|
推送结果 |
|
首先新增一个 MySQL 节点,其 SQL 如下,查询上一周销售前三名的类别与各自的销售总额。
-- 统计上一周销售额 SELECT category, SUM(sales) as amount FROM orders WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 WEEK), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59') GROUP BY category ORDER BY amount DESC limit 3;
在右侧调度配置的节点上下文参数的输出参数区块添加赋值参数 outputs。
ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。
新增一个赋值节点,语言选择 Python,其代码如下,把上游节点的输出组合成 Markdown 并输出到下游,赋值节点从上游取值的说明可以参考此篇文档。
def main(): from datetime import date today = date.today() formatted_date = today.strftime('%Y-%m-%d') msg = 'Stat date: ' + formatted_date + ' \\n\\n ' \ '- 1: ${inputs[0][0]}, sales: ${inputs[0][1]} \\n\\n ' \ '- 2: ${inputs[1][0]}, sales: ${inputs[1][1]} \\n\\n ' \ '- 3: ${inputs[2][0]}, sales: ${inputs[2][1]} \\n\\n ' print(msg) if __name__ == "__main__": import sys main()
Python 代码里的 intputs 由右侧面板,调度配置的节点上下文参数的输入参数区块添加上游的输出参数 outputs,并取名为 intputs。
新增一数据推送节点,于右侧面板调度配置的节点上下文参数的输入参数区块添加上游的输出参数 outputs,并取名为 intputs。
至业务流程 DAG 上对上游节点右键点击运行该节点及下游,查看推送目标是否显示正确内容,并提交与发布。
条件推送
下表是条件推送的场景概览。
场景 |
凌晨0点时,推送上月销售额总额情况,达标的话,推送前三名销售类别,不达标的话,推送倒数三名销售类别。 |
工作流 |
|
推送结果 |
|
首先新增一个查询上月销售额的 MySQL 节点,SQL 如下。
-- 统计上个月销售额 SELECT SUM(sales) AS sales_amount FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。
ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。
新增一分支节点,于右侧调度配置面板的节点上下文参数的输入参数区块,增加上游 MySQL 节点的 outputs 参数,这里命名为 inputs,并可于分支节点的条件判断中使用,可参考此篇文档。
上游输出的 outputs 为二维数组,在分支节点可于 inputs 里取值,如上游 MySQL 节点输出的 sales_amount 数据会放到 ${inputs[0][0]} 里,这里设定了 10000 为阈值,并分为达标与不达标两个节点输出。
设定好条件后,右侧调度配置面板即会自动出现输出名。
达标
新增一 MySQL 节点,其 SQL 内容如下。
-- 统计上月总销售额 SET @all_cat_sales_volume_month := 0.0; SELECT SUM(sales) INTO @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59'); -- 新增一临时表统计前三名销售类别及各别的总销售额,并带上全体总销售额 CREATE TEMPORARY TABLE IF NOT EXISTS temp_array ( category VARCHAR(255), sales DOUBLE, all_cat_sales_volume_month DOUBLE ); -- 将值塞进临时表 INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) as amount, @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category order by amount desc limit 3; -- 将值输出到下游 SELECT category, sales, all_cat_sales_volume_month from temp_array;
将此 MySQL 节点与上游分支节点建立连线,可于右侧调度配置面板的上游节点找到达标的输出名,或于业务流程 DAG 拖拽建立连线。
于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。
ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。
新增一个销售达标的推送节点,于右侧调度配置设定上游的 MySQL 节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增此上游节点的 outputs 参数,并使用上游透出的 category, sales, all_cat_sales_volume_month 数据,如下图。
不达标
新增一 MySQL 节点,其 SQL 内容如下。
-- 统计上月总销售额 SET @all_cat_sales_volume_month := 0.0; SELECT SUM(sales) INTO @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59'); -- 新增一临时表统计到数三名销售类别及各别的总销售额,并带上全体总销售额 CREATE TEMPORARY TABLE IF NOT EXISTS temp_array ( category VARCHAR(255), sales DOUBLE, all_cat_sales_volume_month DOUBLE ); -- 将值塞进临时表 INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) as amount, @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category order by amount asc limit 3; -- 将值输出到下游 SELECT category, sales, all_cat_sales_volume_month from temp_array;
将此 MySQL 节点与上游分支节点建立连线,可于右侧调度配置面板的上游节点找到不达标的输出名,或于业务流程 DAG 拖拽建立连线。
于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。
ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。
新增一个销售不达标的推送节点,于右侧调度配置设定上游的 MySQL 节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增此上游节点的 outputs 参数,并使用上游透出的 category, sales, all_cat_sales_volume_month 数据,如下图。
建立好达标与不达标两个分支后,至业务流程 DAG 上对上游节点右键点击运行该节点及下游,查看推送目标是否显示正确内容,并提交与发布。
小结
目前用户可以在数据开发已有的工作流用上数据推送,接下来我们还会再增进数据推送的能力,如支持更多类型的推送目标、支持自定义 Webhook 推送结构、更好的推送预览画面、支持更丰富的推送内容 (如数据分析卡片)等,让数据开发有更多的表现能力。