【阿里云MVP月度分享】如何基于MYSQL做实时计算?

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: 有时候我们会有这样的场景,在某个接口中,数据已经很规范地存入到一张的MYSQL表中,现在想对这样的数据做一些实时或准实时处理,比如数据多模式存储、异步准实时业务流程、业务实时监控等。

有时候我们会有这样的场景,在某个接口中,数据已经很规范地存入到一张的MYSQL表中,现在想对这样的数据做一些实时或准实时处理,比如数据多模式存储、异步准实时业务流程、业务实时监控等。接口中处理流程如下:
111

最原始的方法,是改动业务代码,将这些额外的处理流程作为同步流程,在更新MYSQL数据之后同步执行。如下图:
222

但是这样的处理流程可能会越来越多,如果一直作为同步流程,整个接口会变得越来越庞大、并且耗时越来越长、出问题的风险越来越高。

所以我会考虑异步处理流程。如果可以改动一下代码,将数据额外写一份儿到队列里,再用flink、storm之类的去消费不就好了么。如下图:
333

但实际上,或许由于架构设计的不规范、或许由于业务场景的繁多,导致在代码中加一遍数据埋点,就如同重构一般的工作量。所以我们需要另一种方式,能实时感知到MYSQL中数据的变化。

MYSQL的binlog可以帮我们记录数据的变化,我们还需要一个工具来收集binlog,并转为我们能读懂的数据。阿里有一款叫canal的开源软件正是做这个用的,可以通过修改源码,增加监控、告警、投递队列功能来实现。但现在,阿里云的日志服务为我们集成了这一功能,我们可以用更短的时间、更少的资源来获得更稳定、更放心的服务。如下图:
4444

日志服务收集binlog的功能还在内测中,不久之后将与大家见面。


比如有这样一个场景,我的MYSQL里有一张订单推送记录表,现在有一个需求,需要将这个表中的数据,按照一定格式再写入一份儿到表格存储TableStore中。
传统的实现方式,是在程序有写入到MYSQL的地方,再加一段代码,写入MYSQL成功后再写入到表格存储中。而现在,为完成这个需求,我选用的技术方案是:

日志服务SLS+流计算StreamCompute+表格存储TableStore


首先使用日志服务,配置对mysql中订单推送记录表所在实例的binlog的收集。

日志服务收集binlog的原理,与canal一致。具体配置这里暂不作叙述。

收集到的日志中,包含:数据库名、表名、事件(row_insert、row_update、row_delete)、全局事务ID、各个字段修改前的值、各个字段修改后的值。

根据场景,我们需要捕获到每个row_insert和row_update操作中,各个字段修改后的值,然后写入到表格存储中。所以我们在流计算中,先配置好日志服务的源表、和表格存储的目标表,中间的逻辑这样写:

INSERT INTO ots_result_order_push (pk, id, order_id, master_id, pull_status
    , offer_time, hire_time, push_time, limit_offer)
SELECT concat(REVERSE(order_id),'|',master_id) as pk,id, order_id, master_id, pull_status, offer_time
    , hire_time, push_time, limit_offer
FROM sls_stream_distribute_canal
WHERE _db_ = 'distribute'
    AND (_event_ = 'row_insert'
        OR _event_ = 'row_update')
    AND _table_ = 'order_push';

即可完成此需求。


再比如有这样一个场景,我的MYSQL里有一张用户信息表,现在想要实时统计每日注册用户数,并通过大屏展示出来。
为完成这个需求,我选用的技术方案是:

日志服务SLS+流计算StreamCompute+表格存储TableStore+数据可视化DataV

首先使用日志服务,配置对mysql中user_info表所在实例的binlog的收集。
根据场景,我们需要捕获到每个row_insert操作的时间,并将时间截取到日期。统计每天有多少条往用户信息表中插入的操作记录。所以我们在流计算中,先配置好日志服务的源表、和表格存储的目标表,中间逻辑这样写:

insert into ots_result_user(pk,val)
select concat('RegUser4General|t|',t.dt) as pk,t.val
from (select from_unixtime(cast(create_time as bigint),'yyyyMMdd') as dt,count(_table_) as val 
from sls_stream_user_service_canal
where _event_ = 'row_insert'
and _table_ = 'user_info'
group by from_unixtime(cast(create_time as bigint),'yyyyMMdd')) t
;

即可完成需求。


异步获取MYSQL数据变化,触发异步流程,避免了多个同步流程可能造成的执行时间过长、或者由于网络原因卡住等等导致的风险。同时,异步流程也可以并行,总体上加快了业务流程的速度,使“一份儿数据、多种处理”变得更加方便快捷。

当然,对于上边作为例子的两个场景来说,文中给出的方案并不是唯一的解决办法,还可以使用函数计算代替流计算实现同样的效果

整套流程全部采用阿里云的服务化产品进行,使得本来全部独立开发需要几天的工作量,可以在几分钟之内搞定,方便快捷,且整套流程都有完善的监控、告警机制,安全放心。

相关实践学习
自建数据库迁移到云数据库
本场景将引导您将网站的自建数据库平滑迁移至云数据库RDS。通过使用RDS,您可以获得稳定、可靠和安全的企业级数据库服务,可以更加专注于发展核心业务,无需过多担心数据库的管理和维护。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
目录
相关文章
|
6月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1108 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
6月前
|
存储 运维 分布式计算
零售数据湖的进化之路:滔搏从Lambda架构到阿里云Flink+Paimon统一架构的实战实践
在数字化浪潮席卷全球的今天,传统零售企业面临着前所未有的技术挑战和转型压力。本文整理自 Flink Forward Asia 2025 城市巡回上海站,滔搏技术负责人分享了滔搏从传统 Lambda 架构向阿里云实时计算 Flink 版+Paimon 统一架构转型的完整实战历程。这不仅是一次技术架构的重大升级,更是中国零售企业拥抱实时数据湖仓一体化的典型案例。
461 0
|
7月前
|
关系型数据库 MySQL 分布式数据库
阿里云PolarDB云原生数据库收费价格:MySQL和PostgreSQL详细介绍
阿里云PolarDB兼容MySQL、PostgreSQL及Oracle语法,支持集中式与分布式架构。标准版2核4G年费1116元起,企业版最高性能达4核16G,支持HTAP与多级高可用,广泛应用于金融、政务、互联网等领域,TCO成本降低50%。
|
消息中间件 关系型数据库 Kafka
阿里云基于 Flink CDC 的现代数据栈云上实践
阿里云基于 Flink CDC 的现代数据栈云上实践
258 1
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1292 0
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
1023 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
3267 45
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
568 16
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
589 17

推荐镜像

更多
下一篇
开通oss服务