快讯~数据推送已上架 DataStudio 数据开发,与工作流完美结合

本文涉及的产品
大数据开发治理平台 DataWorks,不限时长
简介: 数据推送日前已在数据服务页面上提供全托管式的推送服务,基于同样的底层推送架构,我们将推送的能力也搬上了数据开发 (DataStudio),结合数据开发已有的工作流,提供了简单推送、合并推送、脚本推送及条件推送等四大推送能力,用户能在既有的工作流上弹性组装四种方式的推送。

前言

数据推送日前已在数据服务页面上提供全托管式的推送服务 (详情),基于同样的底层推送架构,我们将推送的能力也搬上了数据开发 (DataStudio),结合数据开发已有的工作流,提供了简单推送、合并推送、脚本推送及条件推送等四大推送能力,用户能在既有的工作流上弹性组装四种方式的推送。

文中我们以例子演示四种方式的推送,首先介绍如何在既有的工作流上增加推送能力。

业务流程 DAG 新建节点

用户可双击左侧目录树上的业务流程节点,即可打开业务流程,点击左上新建节点找到 SQL 节点、赋值节点、分支节点及数据推送等节点,点击后即可进入新建节点弹框。


节点上下游连结

用户可于单个节点里的右侧调度配置建立上游依赖,或于业务流程 DAG 上使用拖拽方式建立上游依赖。


上游节点参数输出

上游节点

于单个节点里的右侧调度配置找到节点上下文参数,找到输出参数区块,点击添加赋值参数,系统会将节点最后的结果输出到 outputs 变量里。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。

下游节点

于单个节点里的右侧调度配置找到节点上下文参数,找到输入参数区块,点击添加,找到上游节点的 outputs 参数,并取名为 inputs。


测试

于业务流程 DAG 上,对节点按右键,选择运行节点及下游运行到该节点,会自动跳出日志区块,并可对节点按右键查看单个节点的日志。

提交与发布

工作流中每个节点都有自己的调度,需要根据依赖自动调整调度时间,如上游是 00:00 开始调度,下游节点可以设定 00:05 开始调度,并将每个节点分别提交与发布。


提交后,至发布页面发布节点,并进入运维中心查看运行日志。


场景演示

准备数据

我们以订单销售系统且使用 MySQL 数据库为例,首先建立一个订单表。

CREATE TABLE orders (
     order_id INT NOT NULL AUTO_INCREMENT,
     category VARCHAR(100) NOT NULL, -- 类别
     sales DOUBLE NOT NULL, -- 订单销售额
     datetime DATETIME NOT NULL, -- 订单发生时间
     PRIMARY KEY (order_id),
     INDEX (category)
);


对订单表塞入以下数据。

INSERT INTO
    orders (category, sales, datetime)
VALUES
    ('Baby', 200.0, '2024-06-14 14:00:00'),
    ('Baby', 100.0, '2024-06-14 18:00:00'),
    ('Baby', 150.0, '2024-06-15 13:00:00'),
    ('Baby', 350.0, '2024-06-16 18:00:00'),
    ('Books', 150.0, '2024-05-12 13:00:00'),
    ('Computers', 3000.0, '2024-05-14 10:00:00'),
    ('Electronics', 1500.0, '2024-05-19 12:11:23'),
    ('Games', 399.0, '2024-05-17 19:00:00'),
    ('Garden', 370.0, '2024-05-11 14:01:00'),
    ('Clothing', 500.0, '2024-05-11 12:01:00'),
    ('Grocery', 100.0, '2024-05-01 10:00:00'),
    ('Health', 50.0, '2024-05-30 19:00:00'),
    ('Jewelry', 2200.0, '2024-05-15 15:10:00'),
    ('Kids', 999.0, '2024-05-11 14:00:00'),
    ('Movies', 50.0, '2024-05-19 14:06:00'),
    ('Music', 100.0, '2024-05-20 11:00:00'),
    ('Outdoors', 555.0, '2024-05-22 18:00:00');
-- 可自行添加订单数据


简单推送

下表是简单推送的场景概览。

场景

凌晨0点时,推送昨天销售总额。

工作流

推送结果


首先建立一个 MySQL 节点,并计算昨日销售总额,SQL 如下。

-- 建立一个临时表
CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
  total_amount DOUBLE
);
-- 统计前日销售总额并把总额数据插入临时表
INSERT INTO temp_array (total_amount) 
SELECT SUM(sales)
FROM orders
WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59');
-- 从临时表上把总额数据输出下游
select total_amount from temp_array;


于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。


新增一数据推送节点,于右侧调度配置设定上游节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增上游输入的 outputs 参数。

数据推送节点内容部份,设定推送目标、标题及正文,正文能使用 outputs 输出的数据,如 total_amount。


若要在推送内文中使用调度参数,也可于调度配置中新增参数。


于业务流程 DAG 中进行测试,查看推送目标是否显示正确内容,并提交与发布。


合并推送

下表是合并推送的场景概览。

场景

凌晨0点时,推送昨天销售额总额及各类别销售额增长情况。

工作流

推送结果


在简单推送的场景上,添加一个查询昨日销售额增长的 MySQL 节点,SQL 如下。

-- 建立一个大前天的数据临时表
CREATE TEMPORARY TABLE IF NOT EXISTS temp_array1 (
  category VARCHAR(255),
  sales DOUBLE
);
-- 将大前天的数据插入到临时表
INSERT INTO temp_array1 (category, sales) SELECT category, SUM(sales)
FROM orders
WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 23:59:59')
GROUP BY category;
-- 建立一个前天的数据临时表
CREATE TEMPORARY TABLE IF NOT EXISTS temp_array2 (
  category VARCHAR(255),
  sales DOUBLE
);
-- 将前天的数据插入到临时表
INSERT INTO temp_array2 (category, sales) SELECT category, SUM(sales)
FROM orders
WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
GROUP BY category;
-- 建立一个输出的临时表
CREATE TEMPORARY TABLE IF NOT EXISTS result (
  category VARCHAR(255),
  diff DOUBLE
);
-- 将前天的数据减去大前天的数据,并将结果放到输出的临时表
INSERT INTO result (category, diff) select temp_array2.category as category, temp_array2.sales - temp_array1.sales as diff from temp_array1 left join temp_array2 on temp_array1.category = temp_array2.category;
-- 将结果输出到下游节点
select category, diff from result;


在原简单场景的数据推送节点上,于右侧调度配置设定新增的上游节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增此上游节点的 outputs 参数,如下图取名 inputs_2 的行。


于正文新增表格区块,增加上游新增输出的 category 与 diff 数据,点击 diff 列上的设定图标,设定数值小于 0 则显示红色的条件。


至业务流程 DAG 上对数据推送节点右键点击运行至该节点,查看推送目标是否显示正确内容,并提交与发布。

脚本推送

下表是脚本推送的场景概览。

场景

周日凌晨0点时,推送上周销售额前三名的类别。

工作流

推送结果


首先新增一个 MySQL 节点,其 SQL 如下,查询上一周销售前三名的类别与各自的销售总额。

-- 统计上一周销售额
SELECT category, SUM(sales) as amount
FROM orders
WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 WEEK), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
GROUP BY category ORDER BY amount DESC limit 3;


在右侧调度配置的节点上下文参数的输出参数区块添加赋值参数 outputs。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。


新增一个赋值节点,语言选择 Python,其代码如下,把上游节点的输出组合成 Markdown 并输出到下游,赋值节点从上游取值的说明可以参考此篇文档

def main():
    from datetime import date
    today = date.today()
    formatted_date = today.strftime('%Y-%m-%d')
    
    msg = 'Stat date: ' + formatted_date + ' \\n\\n ' \
    '- 1: ${inputs[0][0]}, sales: ${inputs[0][1]} \\n\\n ' \
    '- 2: ${inputs[1][0]}, sales: ${inputs[1][1]} \\n\\n ' \
    '- 3: ${inputs[2][0]}, sales: ${inputs[2][1]} \\n\\n '
    
    print(msg)
if __name__ == "__main__":
    import sys
    main()


Python 代码里的 intputs 由右侧面板,调度配置的节点上下文参数的输入参数区块添加上游的输出参数 outputs,并取名为 intputs。


新增一数据推送节点,于右侧面板调度配置的节点上下文参数的输入参数区块添加上游的输出参数 outputs,并取名为 intputs。


至业务流程 DAG 上对上游节点右键点击运行该节点及下游,查看推送目标是否显示正确内容,并提交与发布。

条件推送

下表是条件推送的场景概览。

场景

凌晨0点时,推送上月销售额总额情况,达标的话,推送前三名销售类别,不达标的话,推送倒数三名销售类别。

工作流

推送结果


首先新增一个查询上月销售额的 MySQL 节点,SQL 如下。

-- 统计上个月销售额
SELECT SUM(sales) AS sales_amount
FROM orders
WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');


于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。


新增一分支节点,于右侧调度配置面板的节点上下文参数的输入参数区块,增加上游 MySQL 节点的 outputs 参数,这里命名为 inputs,并可于分支节点的条件判断中使用,可参考此篇文档


上游输出的 outputs 为二维数组,在分支节点可于 inputs 里取值,如上游 MySQL 节点输出的 sales_amount 数据会放到 ${inputs[0][0]} 里,这里设定了 10000 为阈值,并分为达标与不达标两个节点输出。


设定好条件后,右侧调度配置面板即会自动出现输出名。

达标

新增一 MySQL 节点,其 SQL 内容如下。

-- 统计上月总销售额
SET @all_cat_sales_volume_month := 0.0;
SELECT SUM(sales) INTO @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
-- 新增一临时表统计前三名销售类别及各别的总销售额,并带上全体总销售额
CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
  category VARCHAR(255),
  sales DOUBLE,
  all_cat_sales_volume_month DOUBLE
);
-- 将值塞进临时表
INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) as amount, @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category order by amount desc limit 3;
-- 将值输出到下游
SELECT category, sales, all_cat_sales_volume_month from temp_array;


将此 MySQL 节点与上游分支节点建立连线,可于右侧调度配置面板的上游节点找到达标的输出名,或于业务流程 DAG 拖拽建立连线。


于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。


新增一个销售达标的推送节点,于右侧调度配置设定上游的 MySQL 节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增此上游节点的 outputs 参数,并使用上游透出的 category, sales, all_cat_sales_volume_month 数据,如下图。


不达标

新增一 MySQL 节点,其 SQL 内容如下。

-- 统计上月总销售额
SET @all_cat_sales_volume_month := 0.0;
SELECT SUM(sales) INTO @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
-- 新增一临时表统计到数三名销售类别及各别的总销售额,并带上全体总销售额
CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
  category VARCHAR(255),
  sales DOUBLE,
  all_cat_sales_volume_month DOUBLE
);
-- 将值塞进临时表
INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) as amount, @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category order by amount asc limit 3;
-- 将值输出到下游
SELECT category, sales, all_cat_sales_volume_month from temp_array;


将此 MySQL 节点与上游分支节点建立连线,可于右侧调度配置面板的上游节点找到不达标的输出名,或于业务流程 DAG 拖拽建立连线。


于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。


新增一个销售不达标的推送节点,于右侧调度配置设定上游的 MySQL 节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增此上游节点的 outputs 参数,并使用上游透出的 category, sales, all_cat_sales_volume_month 数据,如下图。


建立好达标与不达标两个分支后,至业务流程 DAG 上对上游节点右键点击运行该节点及下游,查看推送目标是否显示正确内容,并提交与发布。


小结

目前用户可以在数据开发已有的工作流用上数据推送,接下来我们还会再增进数据推送的能力,如支持更多类型的推送目标、支持自定义 Webhook 推送结构、更好的推送预览画面、支持更丰富的推送内容 (如数据分析卡片)等,让数据开发有更多的表现能力。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标  通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群  企业数据仓库开发人员  大数据平台开发人员  数据分析师  大数据运维人员  对于大数据平台、数据中台产品感兴趣的开发者
目录
相关文章
|
2月前
|
数据管理 机器人 BI
数据管理DMS产品使用合集之如何让报表自动更新推送到钉钉机器人
阿里云数据管理DMS提供了全面的数据管理、数据库运维、数据安全、数据迁移与同步等功能,助力企业高效、安全地进行数据库管理和运维工作。以下是DMS产品使用合集的详细介绍。
50 3
|
2月前
|
数据采集 分布式计算 DataWorks
DataWorks产品使用合集之如何把所有的历史记录留存下来
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
10 1
|
2月前
|
运维 DataWorks 安全
DataWorks产品使用合集之有待发布的,我取消发布了之后,还是删不掉,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之监控告警是否支持发飞书群消息
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
15 0
|
3月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之DataWorks中,我想要转交自己负责的表给其他人如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
33 1
DataWorks产品使用合集之DataWorks中,我想要转交自己负责的表给其他人如何解决
|
3月前
|
缓存 运维 监控
应用研发平台EMAS 常见问题之用华为的推送界面阿里云收不到如何解决
应用研发平台EMAS(Enterprise Mobile Application Service)是阿里云提供的一个全栈移动应用开发平台,集成了应用开发、测试、部署、监控和运营服务;本合集旨在总结EMAS产品在应用开发和运维过程中的常见问题及解决方案,助力开发者和企业高效解决技术难题,加速移动应用的上线和稳定运行。
556 2
|
3月前
|
设计模式 小程序 安全
【社区每周】商家分账接入指南更新;基础库新增抽象节点功能及上周问题反馈(2月第二期)
【社区每周】商家分账接入指南更新;基础库新增抽象节点功能及上周问题反馈(2月第二期)
167 11
|
3月前
|
监控 安全 BI
宜搭报表中,如何实现将报表定时下载并推送到群里
宜搭报表中,如何实现将报表定时下载并推送到群里
|
3月前
|
SQL 运维 分布式计算
DataWorks发现通过可视化界面创建表,没有提交生产的按钮啦?现在开发不能提交生产了码?
DataWorks发现通过可视化界面创建表,没有提交生产的按钮啦?现在开发不能提交生产了码?
44 1
|
SQL JSON NoSQL
ChatGPT工作提效之小鹅通二次开发批量API对接解决方案(学习记录同步、用户注册同步、权益订购同步、开发文档)
ChatGPT工作提效之小鹅通二次开发批量API对接解决方案(学习记录同步、用户注册同步、权益订购同步、开发文档)
341 0
ChatGPT工作提效之小鹅通二次开发批量API对接解决方案(学习记录同步、用户注册同步、权益订购同步、开发文档)

热门文章

最新文章

下一篇
云函数使用