快讯~数据推送已上架 DataStudio 数据开发,与工作流完美结合

简介: 数据推送日前已在数据服务页面上提供全托管式的推送服务,基于同样的底层推送架构,我们将推送的能力也搬上了数据开发 (DataStudio),结合数据开发已有的工作流,提供了简单推送、合并推送、脚本推送及条件推送等四大推送能力,用户能在既有的工作流上弹性组装四种方式的推送。

前言

数据推送日前已在数据服务页面上提供全托管式的推送服务 (详情),基于同样的底层推送架构,我们将推送的能力也搬上了数据开发 (DataStudio),结合数据开发已有的工作流,提供了简单推送、合并推送、脚本推送及条件推送等四大推送能力,用户能在既有的工作流上弹性组装四种方式的推送。

文中我们以例子演示四种方式的推送,首先介绍如何在既有的工作流上增加推送能力。

业务流程 DAG 新建节点

用户可双击左侧目录树上的业务流程节点,即可打开业务流程,点击左上新建节点找到 SQL 节点、赋值节点、分支节点及数据推送等节点,点击后即可进入新建节点弹框。


节点上下游连结

用户可于单个节点里的右侧调度配置建立上游依赖,或于业务流程 DAG 上使用拖拽方式建立上游依赖。


上游节点参数输出

上游节点

于单个节点里的右侧调度配置找到节点上下文参数,找到输出参数区块,点击添加赋值参数,系统会将节点最后的结果输出到 outputs 变量里。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。

下游节点

于单个节点里的右侧调度配置找到节点上下文参数,找到输入参数区块,点击添加,找到上游节点的 outputs 参数,并取名为 inputs。


测试

于业务流程 DAG 上,对节点按右键,选择运行节点及下游运行到该节点,会自动跳出日志区块,并可对节点按右键查看单个节点的日志。

提交与发布

工作流中每个节点都有自己的调度,需要根据依赖自动调整调度时间,如上游是 00:00 开始调度,下游节点可以设定 00:05 开始调度,并将每个节点分别提交与发布。


提交后,至发布页面发布节点,并进入运维中心查看运行日志。


场景演示

准备数据

我们以订单销售系统且使用 MySQL 数据库为例,首先建立一个订单表。

CREATE TABLE orders (
     order_id INT NOT NULL AUTO_INCREMENT,
     category VARCHAR(100) NOT NULL, -- 类别
     sales DOUBLE NOT NULL, -- 订单销售额
     datetime DATETIME NOT NULL, -- 订单发生时间
     PRIMARY KEY (order_id),
     INDEX (category)
);


对订单表塞入以下数据。

INSERT INTO
    orders (category, sales, datetime)
VALUES
    ('Baby', 200.0, '2024-06-14 14:00:00'),
    ('Baby', 100.0, '2024-06-14 18:00:00'),
    ('Baby', 150.0, '2024-06-15 13:00:00'),
    ('Baby', 350.0, '2024-06-16 18:00:00'),
    ('Books', 150.0, '2024-05-12 13:00:00'),
    ('Computers', 3000.0, '2024-05-14 10:00:00'),
    ('Electronics', 1500.0, '2024-05-19 12:11:23'),
    ('Games', 399.0, '2024-05-17 19:00:00'),
    ('Garden', 370.0, '2024-05-11 14:01:00'),
    ('Clothing', 500.0, '2024-05-11 12:01:00'),
    ('Grocery', 100.0, '2024-05-01 10:00:00'),
    ('Health', 50.0, '2024-05-30 19:00:00'),
    ('Jewelry', 2200.0, '2024-05-15 15:10:00'),
    ('Kids', 999.0, '2024-05-11 14:00:00'),
    ('Movies', 50.0, '2024-05-19 14:06:00'),
    ('Music', 100.0, '2024-05-20 11:00:00'),
    ('Outdoors', 555.0, '2024-05-22 18:00:00');
-- 可自行添加订单数据


简单推送

下表是简单推送的场景概览。

场景

凌晨0点时,推送昨天销售总额。

工作流

推送结果


首先建立一个 MySQL 节点,并计算昨日销售总额,SQL 如下。

-- 建立一个临时表
CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
  total_amount DOUBLE
);
-- 统计前日销售总额并把总额数据插入临时表
INSERT INTO temp_array (total_amount) 
SELECT SUM(sales)
FROM orders
WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59');
-- 从临时表上把总额数据输出下游
select total_amount from temp_array;


于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。


新增一数据推送节点,于右侧调度配置设定上游节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增上游输入的 outputs 参数。

数据推送节点内容部份,设定推送目标、标题及正文,正文能使用 outputs 输出的数据,如 total_amount。


若要在推送内文中使用调度参数,也可于调度配置中新增参数。


于业务流程 DAG 中进行测试,查看推送目标是否显示正确内容,并提交与发布。


合并推送

下表是合并推送的场景概览。

场景

凌晨0点时,推送昨天销售额总额及各类别销售额增长情况。

工作流

推送结果


在简单推送的场景上,添加一个查询昨日销售额增长的 MySQL 节点,SQL 如下。

-- 建立一个大前天的数据临时表
CREATE TEMPORARY TABLE IF NOT EXISTS temp_array1 (
  category VARCHAR(255),
  sales DOUBLE
);
-- 将大前天的数据插入到临时表
INSERT INTO temp_array1 (category, sales) SELECT category, SUM(sales)
FROM orders
WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 23:59:59')
GROUP BY category;
-- 建立一个前天的数据临时表
CREATE TEMPORARY TABLE IF NOT EXISTS temp_array2 (
  category VARCHAR(255),
  sales DOUBLE
);
-- 将前天的数据插入到临时表
INSERT INTO temp_array2 (category, sales) SELECT category, SUM(sales)
FROM orders
WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
GROUP BY category;
-- 建立一个输出的临时表
CREATE TEMPORARY TABLE IF NOT EXISTS result (
  category VARCHAR(255),
  diff DOUBLE
);
-- 将前天的数据减去大前天的数据,并将结果放到输出的临时表
INSERT INTO result (category, diff) select temp_array2.category as category, temp_array2.sales - temp_array1.sales as diff from temp_array1 left join temp_array2 on temp_array1.category = temp_array2.category;
-- 将结果输出到下游节点
select category, diff from result;


在原简单场景的数据推送节点上,于右侧调度配置设定新增的上游节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增此上游节点的 outputs 参数,如下图取名 inputs_2 的行。


于正文新增表格区块,增加上游新增输出的 category 与 diff 数据,点击 diff 列上的设定图标,设定数值小于 0 则显示红色的条件。


至业务流程 DAG 上对数据推送节点右键点击运行至该节点,查看推送目标是否显示正确内容,并提交与发布。

脚本推送

下表是脚本推送的场景概览。

场景

周日凌晨0点时,推送上周销售额前三名的类别。

工作流

推送结果


首先新增一个 MySQL 节点,其 SQL 如下,查询上一周销售前三名的类别与各自的销售总额。

-- 统计上一周销售额
SELECT category, SUM(sales) as amount
FROM orders
WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 WEEK), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
GROUP BY category ORDER BY amount DESC limit 3;


在右侧调度配置的节点上下文参数的输出参数区块添加赋值参数 outputs。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。


新增一个赋值节点,语言选择 Python,其代码如下,把上游节点的输出组合成 Markdown 并输出到下游,赋值节点从上游取值的说明可以参考此篇文档

def main():
    from datetime import date
    today = date.today()
    formatted_date = today.strftime('%Y-%m-%d')
    
    msg = 'Stat date: ' + formatted_date + ' \\n\\n ' \
    '- 1: ${inputs[0][0]}, sales: ${inputs[0][1]} \\n\\n ' \
    '- 2: ${inputs[1][0]}, sales: ${inputs[1][1]} \\n\\n ' \
    '- 3: ${inputs[2][0]}, sales: ${inputs[2][1]} \\n\\n '
    
    print(msg)
if __name__ == "__main__":
    import sys
    main()


Python 代码里的 intputs 由右侧面板,调度配置的节点上下文参数的输入参数区块添加上游的输出参数 outputs,并取名为 intputs。


新增一数据推送节点,于右侧面板调度配置的节点上下文参数的输入参数区块添加上游的输出参数 outputs,并取名为 intputs。


至业务流程 DAG 上对上游节点右键点击运行该节点及下游,查看推送目标是否显示正确内容,并提交与发布。

条件推送

下表是条件推送的场景概览。

场景

凌晨0点时,推送上月销售额总额情况,达标的话,推送前三名销售类别,不达标的话,推送倒数三名销售类别。

工作流

推送结果


首先新增一个查询上月销售额的 MySQL 节点,SQL 如下。

-- 统计上个月销售额
SELECT SUM(sales) AS sales_amount
FROM orders
WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');


于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。


新增一分支节点,于右侧调度配置面板的节点上下文参数的输入参数区块,增加上游 MySQL 节点的 outputs 参数,这里命名为 inputs,并可于分支节点的条件判断中使用,可参考此篇文档


上游输出的 outputs 为二维数组,在分支节点可于 inputs 里取值,如上游 MySQL 节点输出的 sales_amount 数据会放到 ${inputs[0][0]} 里,这里设定了 10000 为阈值,并分为达标与不达标两个节点输出。


设定好条件后,右侧调度配置面板即会自动出现输出名。

达标

新增一 MySQL 节点,其 SQL 内容如下。

-- 统计上月总销售额
SET @all_cat_sales_volume_month := 0.0;
SELECT SUM(sales) INTO @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
-- 新增一临时表统计前三名销售类别及各别的总销售额,并带上全体总销售额
CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
  category VARCHAR(255),
  sales DOUBLE,
  all_cat_sales_volume_month DOUBLE
);
-- 将值塞进临时表
INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) as amount, @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category order by amount desc limit 3;
-- 将值输出到下游
SELECT category, sales, all_cat_sales_volume_month from temp_array;


将此 MySQL 节点与上游分支节点建立连线,可于右侧调度配置面板的上游节点找到达标的输出名,或于业务流程 DAG 拖拽建立连线。


于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。


新增一个销售达标的推送节点,于右侧调度配置设定上游的 MySQL 节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增此上游节点的 outputs 参数,并使用上游透出的 category, sales, all_cat_sales_volume_month 数据,如下图。


不达标

新增一 MySQL 节点,其 SQL 内容如下。

-- 统计上月总销售额
SET @all_cat_sales_volume_month := 0.0;
SELECT SUM(sales) INTO @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
-- 新增一临时表统计到数三名销售类别及各别的总销售额,并带上全体总销售额
CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
  category VARCHAR(255),
  sales DOUBLE,
  all_cat_sales_volume_month DOUBLE
);
-- 将值塞进临时表
INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) as amount, @all_cat_sales_volume_month from orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category order by amount asc limit 3;
-- 将值输出到下游
SELECT category, sales, all_cat_sales_volume_month from temp_array;


将此 MySQL 节点与上游分支节点建立连线,可于右侧调度配置面板的上游节点找到不达标的输出名,或于业务流程 DAG 拖拽建立连线。


于右侧调度配置的节点上下文参数,输出参数区块点击添加赋值参数,可以看到新增输出 outputs 参数。

ps: 如果为 ODPS 节点,直接使用赋值节点,选用 ODPS 语言,此类型节点已自动增加 outputs 参数。


新增一个销售不达标的推送节点,于右侧调度配置设定上游的 MySQL 节点 (或于业务流程 DAG 上拖拽节点连线),并于节点上下文参数的输入参数区块新增此上游节点的 outputs 参数,并使用上游透出的 category, sales, all_cat_sales_volume_month 数据,如下图。


建立好达标与不达标两个分支后,至业务流程 DAG 上对上游节点右键点击运行该节点及下游,查看推送目标是否显示正确内容,并提交与发布。


小结

目前用户可以在数据开发已有的工作流用上数据推送,接下来我们还会再增进数据推送的能力,如支持更多类型的推送目标、支持自定义 Webhook 推送结构、更好的推送预览画面、支持更丰富的推送内容 (如数据分析卡片)等,让数据开发有更多的表现能力。

相关实践学习
基于Hologres轻量实时的高性能OLAP分析
本教程基于GitHub Archive公开数据集,通过DataWorks将GitHub中的项⽬、行为等20多种事件类型数据实时采集至Hologres进行分析,同时使用DataV内置模板,快速搭建实时可视化数据大屏,从开发者、项⽬、编程语⾔等多个维度了解GitHub实时数据变化情况。
目录
相关文章
|
存储 城市大脑 运维
中国信通院&沙利文最新报告:阿里云混合云全面领先
中国信息通信研究院与国际权威分析机构沙利文(Frost & Sullivan)联合发布《2023 混合云价值影响力矩阵》,报告显示,阿里云是唯一一家全域领导者,在技术表现域、战略布局域以及市场表现域三个维度均排名领先于97%的企业。
1067 1
|
存储 缓存 监控
在Linux中,如何优化虚拟机和容器的性能和资源使用?
在Linux中,如何优化虚拟机和容器的性能和资源使用?
|
Kubernetes Cloud Native 安全
阿里云原生容器服务产品体系-ACK Pro 托管集群
阿里云原生容器服务产品体系-ACK Pro 托管集群
阿里云原生容器服务产品体系-ACK Pro 托管集群
|
数据采集 存储 移动开发
关于数据埋点的认识以及在流量分析系统中的实际使用
关于数据埋点的认识以及在流量分析系统中的实际使用
1673 0
关于数据埋点的认识以及在流量分析系统中的实际使用
|
存储 人工智能 运维
无影分支机构场景解决方案,助力新康众实现门店安全高效管理
无影分支机构场景解决方案助力新康众解决门店PC运维管理难度大、安全风险高等问题,同时让客户门店的工作环境更加整洁,能耗更低。
2934 1
无影分支机构场景解决方案,助力新康众实现门店安全高效管理
|
云安全 运维 供应链
首评 | 阿里云顺利完成国内首个云原生安全成熟度评估
从互联网到零售、金融、制造、交通等,越来越多的行业在利用云原生技术解决实际业务问题。阿里云丰富的云原生安全产品家族保障了阿里巴巴自身的大规模云原生化实践,确保应用全生命周期的云原生安全。同时这些云原生安全能力也支撑了云上百万企业,从基础设施、云原生基础架构、云原生应用、云原生研发运营到云原生安全运维,提升了全链路的安全性及企业安全治理的效率,加速企业的云原生化架构升级,助力企业打造更安全可控、更先进智能的业务体系。
首评 | 阿里云顺利完成国内首个云原生安全成熟度评估
|
存储 传感器 运维
从开源社区到商业核心,EMQ打造物联网数据的全生命周期管理平台
随着5G和物联网技术在各行各业的深度融合,全球物联网应用和设备正处于爆发式增长阶段,或在不久的未来,将真正迎来亿级万物互联的新时代。 “在我们物联网从业者看来,例如汽车、工业领域由于本身就有很多的设备,这些设备会有一个天然的互联需求,形成车联网、工业互联网。”EMQ映云科技(以下简称EMQ)联合创始人兼CPO金发华认为,“物联网一方面可以帮助客户对外把用户体验做得更好,另一方面可以对内实时监控设备的运行状况,提前预警来降低业务中断的风险,从而降低运营成本。因此物联网的需求是一个长远的需求,是一片蓝海市场。”
889 0
|
监控 NoSQL 数据可视化
Redis 官方可视化工具,功能真心强大!高颜值
Redis 官方可视化工具,功能真心强大!高颜值
Redis 官方可视化工具,功能真心强大!高颜值
|
安全 应用服务中间件 网络安全
密码学系列之:在线证书状态协议OCSP详解
我们在进行网页访问的时候会跟各种各样的证书打交道,比如在访问https网页的时候,需要检测https网站的证书有效性。 OCSP就是一种校验协议,用于获取X.509数字证书的撤销状态。它是为了替换CRL而出现的。 本文将会详细介绍OCSP的实现和优点。
密码学系列之:在线证书状态协议OCSP详解
|
云安全 数据采集 弹性计算
阿里云ACP认证考什么?考试费用是多少?
最近几年越来越多的人想进入IT行业,在这之前,他们都选择先考取阿里云ACP认证,这个认证是阿里云旗下的人才认证系统,可以说是打开大厂的敲门砖,是获得良好待遇、丰厚报酬的谈判筹码。
949 0
阿里云ACP认证考什么?考试费用是多少?