【阿里云MVP月度分享】如何基于MYSQL做实时计算?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS SQL Server,独享型 2核4GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 有时候我们会有这样的场景,在某个接口中,数据已经很规范地存入到一张的MYSQL表中,现在想对这样的数据做一些实时或准实时处理,比如数据多模式存储、异步准实时业务流程、业务实时监控等。

有时候我们会有这样的场景,在某个接口中,数据已经很规范地存入到一张的MYSQL表中,现在想对这样的数据做一些实时或准实时处理,比如数据多模式存储、异步准实时业务流程、业务实时监控等。接口中处理流程如下:
111

最原始的方法,是改动业务代码,将这些额外的处理流程作为同步流程,在更新MYSQL数据之后同步执行。如下图:
222

但是这样的处理流程可能会越来越多,如果一直作为同步流程,整个接口会变得越来越庞大、并且耗时越来越长、出问题的风险越来越高。

所以我会考虑异步处理流程。如果可以改动一下代码,将数据额外写一份儿到队列里,再用flink、storm之类的去消费不就好了么。如下图:
333

但实际上,或许由于架构设计的不规范、或许由于业务场景的繁多,导致在代码中加一遍数据埋点,就如同重构一般的工作量。所以我们需要另一种方式,能实时感知到MYSQL中数据的变化。

MYSQL的binlog可以帮我们记录数据的变化,我们还需要一个工具来收集binlog,并转为我们能读懂的数据。阿里有一款叫canal的开源软件正是做这个用的,可以通过修改源码,增加监控、告警、投递队列功能来实现。但现在,阿里云的日志服务为我们集成了这一功能,我们可以用更短的时间、更少的资源来获得更稳定、更放心的服务。如下图:
4444

日志服务收集binlog的功能还在内测中,不久之后将与大家见面。


比如有这样一个场景,我的MYSQL里有一张订单推送记录表,现在有一个需求,需要将这个表中的数据,按照一定格式再写入一份儿到表格存储TableStore中。
传统的实现方式,是在程序有写入到MYSQL的地方,再加一段代码,写入MYSQL成功后再写入到表格存储中。而现在,为完成这个需求,我选用的技术方案是:

日志服务SLS+流计算StreamCompute+表格存储TableStore


首先使用日志服务,配置对mysql中订单推送记录表所在实例的binlog的收集。

日志服务收集binlog的原理,与canal一致。具体配置这里暂不作叙述。

收集到的日志中,包含:数据库名、表名、事件(row_insert、row_update、row_delete)、全局事务ID、各个字段修改前的值、各个字段修改后的值。

根据场景,我们需要捕获到每个row_insert和row_update操作中,各个字段修改后的值,然后写入到表格存储中。所以我们在流计算中,先配置好日志服务的源表、和表格存储的目标表,中间的逻辑这样写:

INSERT INTO ots_result_order_push (pk, id, order_id, master_id, pull_status
    , offer_time, hire_time, push_time, limit_offer)
SELECT concat(REVERSE(order_id),'|',master_id) as pk,id, order_id, master_id, pull_status, offer_time
    , hire_time, push_time, limit_offer
FROM sls_stream_distribute_canal
WHERE _db_ = 'distribute'
    AND (_event_ = 'row_insert'
        OR _event_ = 'row_update')
    AND _table_ = 'order_push';

即可完成此需求。


再比如有这样一个场景,我的MYSQL里有一张用户信息表,现在想要实时统计每日注册用户数,并通过大屏展示出来。
为完成这个需求,我选用的技术方案是:

日志服务SLS+流计算StreamCompute+表格存储TableStore+数据可视化DataV

首先使用日志服务,配置对mysql中user_info表所在实例的binlog的收集。
根据场景,我们需要捕获到每个row_insert操作的时间,并将时间截取到日期。统计每天有多少条往用户信息表中插入的操作记录。所以我们在流计算中,先配置好日志服务的源表、和表格存储的目标表,中间逻辑这样写:

insert into ots_result_user(pk,val)
select concat('RegUser4General|t|',t.dt) as pk,t.val
from (select from_unixtime(cast(create_time as bigint),'yyyyMMdd') as dt,count(_table_) as val 
from sls_stream_user_service_canal
where _event_ = 'row_insert'
and _table_ = 'user_info'
group by from_unixtime(cast(create_time as bigint),'yyyyMMdd')) t
;

即可完成需求。


异步获取MYSQL数据变化,触发异步流程,避免了多个同步流程可能造成的执行时间过长、或者由于网络原因卡住等等导致的风险。同时,异步流程也可以并行,总体上加快了业务流程的速度,使“一份儿数据、多种处理”变得更加方便快捷。

当然,对于上边作为例子的两个场景来说,文中给出的方案并不是唯一的解决办法,还可以使用函数计算代替流计算实现同样的效果

整套流程全部采用阿里云的服务化产品进行,使得本来全部独立开发需要几天的工作量,可以在几分钟之内搞定,方便快捷,且整套流程都有完善的监控、告警机制,安全放心。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
17天前
|
存储 容灾 安全
在阿里云RDS(Relational Database Service)迁移前准备目标区域选择
在阿里云RDS(Relational Database Service)迁移前准备目标区域选择
18 3
|
19天前
|
监控 NoSQL 关系型数据库
在进行RDS(例如阿里云的RDS)数据迁移后,评估数据一致性
在进行RDS(例如阿里云的RDS)数据迁移后,评估数据一致性
26 3
|
22天前
|
监控 安全 关系型数据库
在规划阿里云RDS跨区迁移资源和服务可用性
在规划阿里云RDS跨区迁移资源和服务可用性
207 4
|
23天前
|
SQL 运维 关系型数据库
在阿里云RDS(Relational Database Service)进行跨区域迁移
在阿里云RDS(Relational Database Service)进行跨区域迁移
28 2
|
23天前
|
存储 监控 关系型数据库
在规划和准备阿里云RDS(Relational Database Service)跨区域迁移
在规划和准备阿里云RDS(Relational Database Service)跨区域迁移
12 1
|
21天前
|
SQL 监控 关系型数据库
规划阿里云RDS跨区迁移业务需求迁移方案设计
规划阿里云RDS跨区迁移业务需求迁移方案设计
21 5
|
21天前
|
存储 弹性计算 关系型数据库
规划阿里云RDS跨区迁移业务需求数据量与迁移时间预估
规划阿里云RDS跨区迁移业务需求数据量与迁移时间预估
13 4
|
21天前
|
存储 运维 关系型数据库
规划阿里云RDS跨区迁移业务需求业务影响分析
规划阿里云RDS跨区迁移业务需求业务影响分析
13 4
|
22天前
|
监控 容灾 安全
规划阿里云RDS跨区迁移并构建容灾与备份策略
规划阿里云RDS(Relational Database Service)跨区迁移并构建容灾与备份策略
20 2
|
22天前
|
监控 关系型数据库 测试技术
规划阿里云RDS跨区迁移业务需求
规划阿里云RDS(Relational Database Service)跨区迁移业务需求
13 1

热门文章

最新文章