使用Blink CEP实现差值聚合计算

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 本文介绍通过CEP实现实时流上的差值聚合计算。

使用Blink SQL+UDAF实现差值聚合计算介绍了如何使用Blink SQL+UDAF实现实时流上的差值聚合计算,后来在与@付典就业务需求和具体实现方式进行探讨时,付典提出通过CEP实现的思路和方法。
本文介绍通过CEP实现实时流上的差值聚合计算。
感谢@付典在实现过程中的指导。笔者水平有限,若有纰漏,请批评指出。

一、客户需求

电网公司每天采集各个用户的电表数据(格式如下表),其中data_date为电表数据上报时间,cons_id为电表id,r1为电表度数,其他字段与计算逻辑无关,可忽略。为了后续演示方便,仅输入cons_id=100000002的数据。

no(string) data_date(string) cons_id(string) org_no(string) r1(double)
101 20190716 100000002 35401 13.76
101 20190717 100000002 35401 14.12
101 20190718 100000002 35401 16.59
101 20190719 100000002 35401 18.89

表1:输入数据
电网公司希望通过实时计算(Blink)对电表数据处理后,每天得到每个电表最近两天(当天和前一天)的差值数据,结果类似如下表:

cons_id(string) data_date(string) subDegreeR1(double)
100000002 20190717 0.36
100000002 20190718 2.47
100000002 20190719 2.3

表2:期望的输出数据

二、需求分析

根据业务需求以及CEP跨事件模式匹配的特性,定义两个CEP事件e1和e2,输出e2.r1-e1.r1即可得到差值。

三、CEP开发及测试结果

参考复杂事件处理(CEP)语句,CEP代码如下:

CREATE TABLE input_dh_e_mp_read_curve (
    `no`                  VARCHAR,
    data_date             VARCHAR,
    cons_id               VARCHAR,
    org_no                VARCHAR,
    r1                    DOUBLE,
    ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
    ,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
    type = 'datahub',
    endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',
    roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',
    project = 'jszc_datahub',
    topic = 'input_dh_e_mp_read_curve'
);

CREATE TABLE data_out(
    cons_id varchar
    ,data_date varchar
    ,subDegreeR1 DOUBLE
)with(
    type = 'print'
);

insert into data_out
select
    cons_id,
    data_date,
    subDegreeR1
from input_dh_e_mp_read_curve
MATCH_RECOGNIZE(
    PARTITION BY cons_id
    ORDER BY ts
    MEASURES
        e2.data_date as data_date,
        e2.r1 - e1.r1 as subDegreeR1
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN(e1 e2)
    DEFINE
        e1 as TRUE,
        e2 as TRUE
);

由于使用了print connector,从对应的sink的taskmanager.out日志中可以查看到输出如下:

task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

对比期望输出(表2),20190717和20190718两个窗口的数据均正确,表明业务逻辑正确,但此输出与期望输出有少许差异:
(1)20190719的数据没有输出,这是因为我们设置了watermark,测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。

四、其他说明

1、对比使用Blink SQL+UDAF实现差值聚合计算(1),我们可以看出使用CEP开发代码非常简洁,所以在跨事件处理的情况下CEP还是非常的合适。从另外一个方面讲,同样的需求有不同的实现方式,所以融会贯通Blink SQL中的各种语法,利用更合适的语法来实现业务需求,将可能大大提升工作效率和业务性能。
2、在实现本案例时,笔者发现使用CEP时有如下需要注意的地方:
(1)partiton by里的字段(如本案的cons_id),默认会带到输出里,若同时在MEASURES中定义,则可能会报类似如下错误:
13_47_33__08_03_2019.jpg
(2)define及其内容必须定义,否则前端页面提示类似如下错误:
图片.png

图片.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
设置VSCode代码编辑器右侧的Minimap代码缩略图滚动条切换显示、隐藏的快捷键Alt+M
设置VSCode代码编辑器右侧的Minimap代码缩略图滚动条切换显示、隐藏的快捷键Alt+M
|
10月前
|
算法
基于遗传优化算法的风力机位置布局matlab仿真
本项目基于遗传优化算法(GA)进行风力机位置布局的MATLAB仿真,旨在最大化风场发电效率。使用MATLAB2022A版本运行,核心代码通过迭代选择、交叉、变异等操作优化风力机布局。输出包括优化收敛曲线和最佳布局图。遗传算法模拟生物进化机制,通过初始化、选择、交叉、变异和精英保留等步骤,在复杂约束条件下找到最优布局方案,提升风场整体能源产出效率。
187 28
|
9月前
|
存储 NoSQL Java
Tablestore集成MCP协议: 标量与向量混合检索的新范式
基于表格存储(Tablestore)实现的MCP(Model Context Protocol)服务,支持文档存储与混合检索工具两大功能。通过Cherry-Studio界面和通义千问qwen-max模型进行演示,展示了文本数据上传、向量嵌入及查询过程。此外,详细说明了Python和Java版本的本地运行步骤、环境配置及二次开发方法,并提供了集成三方工具如Cherry Studio的应用示例。Tablestore凭借混合查询、Serverless低成本、弹性扩展等优势,为MCP场景提供高效解决方案。
861 3
|
12月前
|
搜索推荐 小程序 物联网
基于HarmonyOS 5.0的元服务:技术架构、应用场景与未来发展【探讨】
鸿蒙OS 5.0推出的元服务(Super Service)是一种创新的服务架构,旨在提供无缝的跨设备体验。它具备无感知启动、跨设备共享和智能推送等特点,适用于智能家居、车载系统、即时通讯等场景。与传统应用及微信小程序相比,元服务更轻量、跨平台能力强,且无需下载安装。未来,元服务将通过AI增强智能化,并扩展到更多行业,如智慧医疗、智能零售等,推动物联网和智慧城市的发展。然而,其发展仍面临平台依赖、隐私安全等挑战。
基于HarmonyOS 5.0的元服务:技术架构、应用场景与未来发展【探讨】
|
10月前
|
人工智能 监控 开发者
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
227 0
|
消息中间件 Kafka 数据库
深入理解Kafka的数据一致性原理及其与传统数据库的对比
【8月更文挑战第24天】在分布式系统中,确保数据一致性至关重要。传统数据库利用ACID原则保障事务完整性;相比之下,Kafka作为高性能消息队列,采用副本机制与日志结构确保数据一致性。通过同步所有副本上的数据、维护消息顺序以及支持生产者的幂等性操作,Kafka在不牺牲性能的前提下实现了高可用性和数据可靠性。这些特性使Kafka成为处理大规模数据流的理想工具。
323 6
|
机器学习/深度学习 数据采集 算法
深入理解VGG网络,清晰易懂
深入理解VGG网络,清晰易懂
|
网络虚拟化 网络架构
三层交换机对接路由器配置上网实验
三层交换机简介 三层交换机是具有路由功能的交换机,由于路由属于OSI模型中第三层网络层的功能,所以称为三层交换机。 三层交换机既可以工作在二层也可以工作在三层,可以部署在接入层,也可以部署在汇聚层,作为用户的网关。
299 2
|
Python
Python中的and or not
Python中的and or not
739 1
|
SQL 数据处理 API
实时计算 Flink版产品使用问题之怎么新建自建的doris catalog
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。