融合数据库生态:利用EventBridge构建CDC应用

简介: CDC(Change Data Capture)指的是监听上游数据变更,并将变更信息同步到下游业务以供进一步处理的一种应用场景。近年来事件驱动架构(EDA)热度逐步上升,日渐成为项目架构设计者的第一选择。EDA天然契合CDC的底层基础架构,其将数据变更作为事件,各个服务通过监听自己感兴趣的事件来完成...

CDC(Change Data Capture)指的是监听上游数据变更,并将变更信息同步到下游业务以供进一步处理的一种应用场景。近年来事件驱动架构(EDA)热度逐步上升,日渐成为项目架构设计者的第一选择。EDA天然契合CDC的底层基础架构,其将数据变更作为事件,各个服务通过监听自己感兴趣的事件来完成一些列业务驱动。阿里云EventBridge是阿里云推出的一款无服务器事件总线服务,能够帮助用户轻松快捷地搭建基于EDA架构的应用。近期,EventBridge 事件流已经支持了基于阿里云DTS服务的CDC能力。本文将从CDC、CDC在EventBridge上的应用以及若干最佳实践场景等方面,为大家介绍如何利用EventBridge轻松构建CDC应用。

1. CDC概述

基本原理与应用场景

CDC从源数据库捕获增量的数据以及数据模式变更,以高可靠、低延时的数据传输将这些变更有序地同步到目标数据库、数据湖或者其他数据分析服务。目前业界主流的开源CDC工具包括DebeziumCanal以及Maxwell

cdc-1.png

图片来源:https://dbconvert.com

目前业界主要有以下几类CDC的实现:

1. 基于时间戳或版本号

数据库表有一列代表时间戳,当存在插入或更新时,对应时间戳就会随之更新。周期性检索更新时间大于上次同步时间的数据记录,即可捕获本周期内数据的变更。基于版本号跟踪和基于时间戳跟踪原理基本一致,要求开发者变更数据时必须更新数据的版本号。

2.基于快照

基于快照的CDC实现在存储层面使用到了数据源3份副本,分别是原始数据、先前快照和当前快照。通过对比2次快照之间的差异来获取这之间的数据变更内容。

3.基于触发器

基于触发器的CDC实现方式事实上是在源表上建立触发器将对数据的变更操作(INSERT、UPDATE、DELETE)记录存储下来。例如专门建立一张表记录用户的变更操作,随后创建INSERT、UPDATE、DELETE三种类型的触发器将用户变更同步到此表。

4.基于日志

以上三种方式都对源数据库存在一定侵入性,而基于日志的方式则是一种非侵入性的CDC方式。数据库利用事务日志实现灾备,例如MySQL的binlog就记录了用户对数据库的所有变更操作。基于日志的CDC通过持续监听事务日志来实时获取数据库的变化情况。

CDC的应用场景广泛,包括但不限于这些方面:如异地机房数据库同步、异构数据库数据同步、微服务解耦、缓存更新与CQRS等。

基于阿里云的CDC解决方案:DTS

数据传输服务DTS(Data Transmission Service)是阿里云提供的实时数据流服务,支持关系型数据库(RDBMS)、非关系型的数据库(NoSQL)、数据多维分析(OLAP)等数据源间的数据交互,集数据同步、迁移、订阅、集成、加工于一体。其中,DTS数据订阅功能可以帮助用户获取自建MySQL、RDS MySQL、Oracle等数据库的实时增量数据。

dts数据订阅.png

2. CDC在EventBridge上的应用

阿里云EventBridge提供了事件总线事件流2款不同应用场景的事件路由服务。

事件总线底层拥有事件的持久化能力,可以按照需要将事件路由到多个事件目标中。

事件流适用于端到端的流式数据处理场景,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便。

为了更好地支持用户在CDC场景下的需求,EventBridge在事件流源端支持了阿里云DTS的数据订阅功能,用户仅需简单配置,即可将数据库变更信息同步到EventBridge事件流。

EventBridge定制了基于DTS sdk的DTS Source Connector。当用户配置事件提供方为DTS的事件流时,source connector会实时地从DTS服务端拉取DTS record数据。数据拉取到本地后,会进行一定的结构封装,保留id、operationType、topicPartition、beforeImage、afterImage等数据,同时增加streaming event所需要的一些系统属性。

DTS Event样例可参考(部分字段省略,详情可参考EventBridge官方文档)

{
  "data": {
    "id": 3218099,
    "topicPartition": {},
    "offset": 3218099,
    "sourceTimestamp": 1654847757,
    "operationType": "UPDATE",
    "schema": {},
    "beforeImage": {},
    "afterImage": {} 
  },
  "id": "12f701a43741d404fa9a7be89d9acae0-3218099",
  "source": "DTSstreamDemo",
  "specversion": "1.0",
  "type": "dts:ConsumeMessage",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2022-06-10T07:55:57Z",
  "subject": "acs:dts:cn-hangzhou:123456789:abcd123456/dtsabcdet1ro"
}

EventBridge Streaming保证了DTS事件的顺序性,但可能存在事件重复投递的可能性,EventId在保证了和每条DTS record的一一映射关系,用户可依据此字段来对事件做幂等处理。

创建源为DTS的EventBridge事件流

下面展示如何在EventBridge控制台创建源为DTS的事件流

前期准备

  • 开通EventBridge服务;

  • 创建DTS数据订阅任务;

  • 创建用于消费订阅数据的消费组账号信息。

创建事件流

  1. 登录EventBridge控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”;

  2. “基本信息”中“事件流名称”与“描述”按照需要填写即可;

  3. 在创建事件流,选择事件提供方时,下拉框选择“数据库DTS”;

  4. 在“数据订阅任务”一栏中选择已创建的DTS数据订阅任务。在消费组一栏,选择要使用哪个消费组消费订阅数据,同时填写消费组密码与初始消费时间;

创建事件流.jpg

  1. 事件流规则与目标按照需要填写,保存启动即可创建以DTS数据订阅为事件源的事件流。

事件流列表页.jpg

注意事项

使用时有以下几点需要注意

  1. EventBridge使用的是SUBSCRIBE消费模式,所以请保证当前DTS消费组没有其他客户端实例在运行。如果设置的消费组在之前有运行,则传入的位点失效,会基于此消费组上次消费过的位点继续消费;

  2. 创建DTS事件源时传入的位点仅在新消费组第一次运行时起效,后续任务重启后会基于上次消费位点继续消费

  3. EventBridge事件流订阅OperationType为INSERT、DELETE、UPDATE、DDL类型的DTS数据;

  4. 使用DTS事件源可能会有消息重复,即保证消息不丢,但无法保证仅投递一次,建议用户做好幂等处理;

  5. 用户如果需要保证顺序消费,则需要将异常容忍策略设置为“NONE”,即不容忍异常。在这种情况下,如果事件流目标端消费消息异常,整个事件流将暂停,直至恢复目标端正常。

最佳实践示例

1. 基于EventBridge实现CQRS

在CQRS(Command Query Responsibility Segregation)模型中,命令模型用于执行写以及更新操作,查询模型用于支持高效的读操作。读操作和写操作使用的数据模型存在一定区别,需要使用一定方式保证数据的同步,基于EventBridge事件流的CDC刚好可以满足这样的需求。

基于云上服务,用户可以使用如下方式轻松构建基于EventBridge的CQRS

  1. 命令模型最终操作数据库进行变更,查询模型读取elasticsearch获取数据;

  2. 开启DTS数据订阅任务,捕获DB变更内容;

  3. 配置EventBridge事件流,事件提供方为DTS数据订阅任务,事件接收方为函数计算FC;

  4. FC中的服务即为更新elasticsearch数据操作。

2. 微服务解耦

CDC也可以用于微服务解耦。例如下文是一个电商平台的订单处理系统,当有新建的未付款订单产生时,数据库会有一条INSERT操作,而当某笔订单状态由“未付款”变为“已付款”时,数据库会有一条UPDATE操作。根据订单状态变化的不同,后端会有不同的微服务来对此进行处理。

  1. 用户下单/付款,订单系统进行业务处理,将数据变更写入DB;

  2. 新建DTS订阅任务捕获DB数据变更;

  3. 搭建EventBridge 事件流。事件提供方为DTS数据订阅任务,事件接收方为RocketMQ;

  4. 在消费RocketMQ数据时,同一个topic下启用3个group代表不同的业务消费逻辑;

  1. GroupA 将捕获到的DB变更用户缓存更新,便于用户查询订单状态;

  2. GroupB 下游关联财务系统,仅处理新建订单,即处理DB 操作类型为INSERT的事件,丢弃其余类型事件;

  3. GroupC 仅关心订单状态由“未付款”变为“已付款”的事件,当有符合条件事件到达时,调用下游物流、仓储系统,对订单进行进一步处理。

如果采用接口调用方式,那么用户在下单之后订单系统将分别需要调用缓存更新接口、新建订单接口以及订单付款接口,业务耦合性过高。除此之外,这种模式使得数据消费端不用担心上游订单处理接口返回内容的语义信息,在存储模型不变的情况下,直接从数据层面判断此次数据变更是否需要处理已经需要怎样的处理。同时,RocketMQ天然的消息堆积能力也可以帮助用户在订单峰值到来时实现业务削峰填谷。

事实上,目前EventBridge Streaming支持的消息产品还包括RabbitMQ、Kafka、MNS,在实际操作中用户可以根据自己的需要进行选择。

3. 数据库备份&异构数据库同步

数据库备份容灾和异构数据库数据同步也是CDC重要的应用场景。使用阿里云EventBridge可以快速搭建此类应用。

  1. 新建DTS数据订阅任务,捕获用户MySQL数据库变更;

  2. 搭建EventBridge 事件流。事件提供方为DTS数据订阅任务;

  3. 使用EventBridge在目的数据库执行指定sql,以实现数据库备份;

  4. 事件投递到函数计算,用户业务根据数据变化内容更新对应异构数据库。

4.自建SQL审计

对于用户有自建SQL审计的需求,使用EventBridge也可以轻松实现。

  1. 用户新建DTS数据订阅任务,捕获数据库变更;

  2. 搭建EventBridge 事件流,事件提供方为DTS,事件接收方为日志服务SLS;

  3. 用户需要对SQL进行审计时,通过查询SLS进行。

3. 总结

本文介绍了CDC的一些概念、CDC在EventBridge上的应用以及若干最佳实践场景。随着支持产品的不断增加,EventBridge所承载的生态版图也不断扩大,从消息生态到数据库生态,从日志生态到大数据生态,EventBridge不断扩大其适用领域,巩固云上事件枢纽的地位,此后也将按照这个方向继续发展,技术做深,生态做广。

作者介绍
目录