使用Blink CEP实现差值聚合计算

简介: 本文介绍通过CEP实现实时流上的差值聚合计算。

使用Blink SQL+UDAF实现差值聚合计算介绍了如何使用Blink SQL+UDAF实现实时流上的差值聚合计算,后来在与@付典就业务需求和具体实现方式进行探讨时,付典提出通过CEP实现的思路和方法。
本文介绍通过CEP实现实时流上的差值聚合计算。
感谢@付典在实现过程中的指导。笔者水平有限,若有纰漏,请批评指出。

一、客户需求

电网公司每天采集各个用户的电表数据(格式如下表),其中data_date为电表数据上报时间,cons_id为电表id,r1为电表度数,其他字段与计算逻辑无关,可忽略。为了后续演示方便,仅输入cons_id=100000002的数据。

no(string) data_date(string) cons_id(string) org_no(string) r1(double)
101 20190716 100000002 35401 13.76
101 20190717 100000002 35401 14.12
101 20190718 100000002 35401 16.59
101 20190719 100000002 35401 18.89

表1:输入数据
电网公司希望通过实时计算(Blink)对电表数据处理后,每天得到每个电表最近两天(当天和前一天)的差值数据,结果类似如下表:

cons_id(string) data_date(string) subDegreeR1(double)
100000002 20190717 0.36
100000002 20190718 2.47
100000002 20190719 2.3

表2:期望的输出数据

二、需求分析

根据业务需求以及CEP跨事件模式匹配的特性,定义两个CEP事件e1和e2,输出e2.r1-e1.r1即可得到差值。

三、CEP开发及测试结果

参考复杂事件处理(CEP)语句,CEP代码如下:

CREATE TABLE input_dh_e_mp_read_curve (
    `no`                  VARCHAR,
    data_date             VARCHAR,
    cons_id               VARCHAR,
    org_no                VARCHAR,
    r1                    DOUBLE,
    ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
    ,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
    type = 'datahub',
    endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',
    roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',
    project = 'jszc_datahub',
    topic = 'input_dh_e_mp_read_curve'
);

CREATE TABLE data_out(
    cons_id varchar
    ,data_date varchar
    ,subDegreeR1 DOUBLE
)with(
    type = 'print'
);

insert into data_out
select
    cons_id,
    data_date,
    subDegreeR1
from input_dh_e_mp_read_curve
MATCH_RECOGNIZE(
    PARTITION BY cons_id
    ORDER BY ts
    MEASURES
        e2.data_date as data_date,
        e2.r1 - e1.r1 as subDegreeR1
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN(e1 e2)
    DEFINE
        e1 as TRUE,
        e2 as TRUE
);

由于使用了print connector,从对应的sink的taskmanager.out日志中可以查看到输出如下:

task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

对比期望输出(表2),20190717和20190718两个窗口的数据均正确,表明业务逻辑正确,但此输出与期望输出有少许差异:
(1)20190719的数据没有输出,这是因为我们设置了watermark,测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。

四、其他说明

1、对比使用Blink SQL+UDAF实现差值聚合计算(1),我们可以看出使用CEP开发代码非常简洁,所以在跨事件处理的情况下CEP还是非常的合适。从另外一个方面讲,同样的需求有不同的实现方式,所以融会贯通Blink SQL中的各种语法,利用更合适的语法来实现业务需求,将可能大大提升工作效率和业务性能。
2、在实现本案例时,笔者发现使用CEP时有如下需要注意的地方:
(1)partiton by里的字段(如本案的cons_id),默认会带到输出里,若同时在MEASURES中定义,则可能会报类似如下错误:
13_47_33__08_03_2019.jpg
(2)define及其内容必须定义,否则前端页面提示类似如下错误:
图片.png

图片.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
phpenv:PHP多版本安装和管理工具
phpenv:PHP多版本安装和管理工具
1511 0
设置VSCode代码编辑器右侧的Minimap代码缩略图滚动条切换显示、隐藏的快捷键Alt+M
设置VSCode代码编辑器右侧的Minimap代码缩略图滚动条切换显示、隐藏的快捷键Alt+M
|
10月前
|
Java
Java 中 `toList()` 与 `collect(Collectors.toList())` 的微妙差异:别再乱用了!
Java 中 `toList()` 与 `collect(Collectors.toList())` 的微妙差异:别再乱用了!
798 0
|
存储 NoSQL Java
Tablestore集成MCP协议: 标量与向量混合检索的新范式
基于表格存储(Tablestore)实现的MCP(Model Context Protocol)服务,支持文档存储与混合检索工具两大功能。通过Cherry-Studio界面和通义千问qwen-max模型进行演示,展示了文本数据上传、向量嵌入及查询过程。此外,详细说明了Python和Java版本的本地运行步骤、环境配置及二次开发方法,并提供了集成三方工具如Cherry Studio的应用示例。Tablestore凭借混合查询、Serverless低成本、弹性扩展等优势,为MCP场景提供高效解决方案。
1172 3
|
存储 安全 程序员
47.9K star!全平台开源笔记神器,隐私安全首选!
Joplin 是一款开源的笔记记录和待办事项应用,拥有 47.9K star,支持 Windows、macOS、Linux、iOS 和 Android 全平台同步。它采用端到端加密,确保数据隐私安全,支持 Markdown 编辑、数学公式、流程图等丰富功能,并可通过插件扩展实现更多定制化需求。Joplin 完美替代商业笔记软件,适用于程序员知识库、个人事务管理及团队协作等多种场景。
1745 1
|
弹性计算 运维 安全
阿里云操作系统迁移最佳实践|飞天技术沙龙-CentOS 迁移替换专场
本次方案的主题是阿里云操作系统迁移最佳实践,Alibaba Cloud Linux /Anolis OS 兼容 CentOS 生态,因此能够很丝滑的进行迁移替换。无论是对企业的运维人员,还是对企业操作系统的使用者来说,相对简化了它的维护成本。通过 SMC 操作系统迁移实践带用户深入了解,不仅阐述了原地迁移方案的独特优势,还针对不同的迁移场景,逐步剖析了整个迁移流程,力求使复杂的操作变得直观易懂,实现了真正的“白屏化”体验。 1. CentOS 迁移背景 2. 操作系统迁移实践 3. 迁移故障处理
512 2
|
小程序 Android开发 iOS开发
微信小程序-虚拟支付:适用场景 / iPhone调试用支付成功,Android调用失败,提示“小程序支付能力已被限制” / “errMsg“.“requestPayment:fail banned”
微信小程序-虚拟支付:适用场景 / iPhone调试用支付成功,Android调用失败,提示“小程序支付能力已被限制” / “errMsg“.“requestPayment:fail banned”
1953 0
|
消息中间件 Kafka 数据库
深入理解Kafka的数据一致性原理及其与传统数据库的对比
【8月更文挑战第24天】在分布式系统中,确保数据一致性至关重要。传统数据库利用ACID原则保障事务完整性;相比之下,Kafka作为高性能消息队列,采用副本机制与日志结构确保数据一致性。通过同步所有副本上的数据、维护消息顺序以及支持生产者的幂等性操作,Kafka在不牺牲性能的前提下实现了高可用性和数据可靠性。这些特性使Kafka成为处理大规模数据流的理想工具。
457 6
|
弹性计算
阿里云带宽计费模式按固定宽带和按使用流量区别对比及选择方法
阿里云服务器公网IP带宽计费方式分为按固定宽带和按使用流量,按固定宽带和按使用流量有什么区别?阿里云公网带宽计费模式如何选择?需要根据大家实际的应用场景来选择
6377 0
阿里云带宽计费模式按固定宽带和按使用流量区别对比及选择方法
|
网络虚拟化 网络架构
三层交换机对接路由器配置上网实验
三层交换机简介 三层交换机是具有路由功能的交换机,由于路由属于OSI模型中第三层网络层的功能,所以称为三层交换机。 三层交换机既可以工作在二层也可以工作在三层,可以部署在接入层,也可以部署在汇聚层,作为用户的网关。
468 2