基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 前言 在基于 MySQL + Tablestore 分层存储架构的大规模订单系统中,利用 CDC 技术将 MySQL 数据同步到 Tablestore 是不可缺少的一步。前文已经详细讲述了如何使用 DTS 向 Tablestore 同步数据。对于中小规模的数据库,或者个人开发者,还可以使用 Canal 从 MySQL 向 Tablestore 同步数据。Canal 部署简单,易于运维,且相比于 D

前言

在基于 MySQL + Tablestore 分层存储架构的大规模订单系统中,利用 CDC 技术将 MySQL 数据同步到 Tablestore 是不可缺少的一步。前文已经详细讲述了如何使用 DTS 向 Tablestore 同步数据。对于中小规模的数据库,或者个人开发者,还可以使用 Canal 从 MySQL 向 Tablestore 同步数据。Canal 部署简单,易于运维,且相比于 DTS,它成本更低,因此它更适合小规模的数据同步。

Canal 简介

Canal 是阿里开源 CDC 工具,他可以获取 MySQL binlog 解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库同步工作。其架构如图:

Deployer 负责拉取 Binlog,解析数据,分发,记录位点。而 Client-adapter 负责接收上游数据,通过 Adapter适配器,将数据持久化到目标库。Deployer 和 Client-Adapter 作为 Canal 中的两个模块,分别独立部署。Tablestore 团队已经在 Adapter 中增加了 TablestoreAdapter,可以支持向 Tablestore 中写入数据。


下面,我们将部署 Canal,并将处于 Rds 中的 MySQL 订单数据同步进入 Tablestore,实现数据全量、增量的同步。

Canal 部署

环境准备

准备部署 canal 程序的机器。本文中在阿里云官网申请了一台 8 vCPU,16 GiB内存的 Linux 机器作为部署机器。如果读者同样需要申请 ECS,请参考:ECS入门概述

源表准备

使用下面 SQL 在 MySQL 中新建测试表 order_contract_canal,其表结构与 order_contract 相同。

CREATE TABLE `order_contract_canal` (
  `oId` varchar(50) NOT NULL,
  `create_time` datetime NOT NULL COMMENT '下单时间',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
  `c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
  `c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
  `p_brand` tinytext COMMENT '产品品牌',
  `p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
  `p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
  `p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
  `p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
  `s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
  `s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
  `total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
  PRIMARY KEY (`oId`),
  KEY `idx_sid` (`s_id`),
  KEY `idx_paytime_sid` (`pay_time`,`s_id`) USING BTREE,
  KEY `idx_cid` (`c_id`),
  KEY `idx_paytime_cid_totalprice` (`pay_time`,`c_id`,`total_price`) USING BTREE,
  KEY `idx_sid_paytime` (`s_id`,`pay_time`),
  KEY `idx_sid_paytime_totalprice` (`s_id`,`pay_time`,`total_price`),
  KEY `idx_paytime_totalprice_pbrand` (`p_price`,`total_price`,`pay_time`) USING BTREE,
  KEY `idx_paytime` (`pay_time`),
  KEY `idx_pbrand_paytime` (`p_brand`(10),`pay_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

目标表准

在 Tablestore 中创建表 canal_target_order作为测试表,使其表结构与订单表 order_contract 相同。其表结构如图:

Deployer部署

在 Canal 官方 release页,下载 canal.deployer 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。配置 MySQL Binlog 模式以及部署 Canal Deployer 的具体步骤可以参考 Canal 官方文档的 QuickStart


本文创建新的实例,在 Deployer 的 conf 目录下创建文件夹 test_ots。将 conf/example/instance.properties 复制到 test_ots 路径下。然后修改 test_ots 路径下的 instance.properties 配置文件。需要关注如下配置项:

参数

说明

canal.instance.master.address

rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306

数据库域名端口

canal.instance.rds.accesskey

***

本文 MySQL 为阿里云产品 RDS,需填入对应accessKey。若非 RDS 库,此项不用填写。

canal.instance.rds.secretkey

***

本文 MySQL 为阿里云产品 RDS,需填入对应secretkey。若非 RDS 库,此项不用填写。

canal.instance.rds.instanceId

rm-bp15p07134rkvf7z6

本文 MySQL 为阿里云产品 RDS,需填入对应示例 id。若非 RDS 库,此项不用填写。

canal.instance.dbUsername

***

数据库账号用户名

canal.instance.dbPassword

***

数据库账号密码

canal.instance.filter.regex

test_ots\\.[test|order_contract_canal].*

Canal 实例关注的表。通过正则表达式匹配。

这里匹配 test_ots 库下表名以 test 开头或者以 order_contract_canal 开头的表

canal.destinations

test_ots

canal 的实例名称,需要配置文件所在上层路径相同,本例路径为 conf/test_ots /instance.properties,那么实例名为test_ots

ClientAdapter部署

在 Canal 官方 release页,下载 canal.adapter 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。包解压后,若 plugin 路径下不存在以 client-adapter.tablestore 开头的 jar 包,说明此安装包不包含 Tablestore 对接部分代码


conf 路径下 application.yml中需要额外关注配置见表。

参数

说明

是否必填

canal.conf:

canalAdapters:instance

test_ots

与depolyer中的destinations保持一致

canal.conf:

canalAdapters:outerAdapters:

-name:

tablestore

定义适配器类型,填入 tablestore 说明此适配器下游写入 Tablestore

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.endpoint


ablestore endpoint

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretId

****

AccessSecretId

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretKey

****

AccessSecretKey

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.instanceName

test-20210609

tablestore 中的 InstanceName

canal.conf: terminateOnException

true

默认为false。若配置为true,则若数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理

完整配置 application.yml 配置如下

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  terminateOnException: true
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306/test_ots?useUnicode=true
      username: ****
      password: ****
  canalAdapters:
  - instance: test_ots # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: tablestore
        key: ts
        properties:
          tablestore.endpoint: https://test-20210609.cn-hangzhou.ots.aliyuncs.com
          tablestore.accessSecretId: ****
          tablestore.accessSecretKey: ****
          tablestore.instanceName: test-20210609

在 conf/tablestore 路径下,创建 order.yml 文件,填入以下内容。配置表示从源库表 test_ots. order_contract_canal 向目标表 canal_target_order 同步数据。

dataSourceKey: defaultDS
destination: test_ots
groupId: g1
outerAdapterKey: ts
threads: 8
updateChangeColumns: false
dbMapping:
  database: test_ots
  table: order_contract_canal
  targetTable: canal_target_order
  targetPk:
    oId: oId
  targetColumns:
    oId:
    create_time:
    pay_time:
    has_paid:
    c_id:
    c_name:
    p_brand:
    p_count:
    p_id:
    p_name:
    p_price:
    s_id:
    s_name:
    total_price:
  etlCondition: 
  commitBatch: 200 # 批量提交的大小

其中各参数含义见表。

参数

说明

是否必填

dataSourceKey

该任务的源数据库标识,在application.yml中可以找到该标识对应的数据库

destination

canal实例名,与application.yml下的instance相同

groupId

分组id,MQ模式下使用,这里不关心,配置成application.yml中canalAdapters中相同即可

outerAdapterKey

使用的Adapter标识,应与application.yml中outerAdapters下的key值相同

threads

筒数量,默认为1,对应tablestorewriter中的bucket数量

dbMapping.database

源库名

dbMapping.table

源表名

dbMapping.targetTable

目标表

dbMapping.targetPk

主键配置

id: target_id 源表主键:目标表主键。多主键可以配置多个

dbMapping.targetColumns

配置需要同步的列名,以及列映射,可以配置类型转换。

id: target_id$string,表示id字段同步后为target_id字段,且类型映射为string。

id: target_id,表示id字段同步后为target_id字段

id: ,表示id字段同步前后字段名不变,字段类型采用默认映射。

id: $string 功能等同于id: id$string

dbMapping.etlCondition

全量抽取数据时的过滤条件

dbMapping.commitBatch

一次批量RPC请求导入的行数,对应tablestorewriter中的maxBatchRowsCount,默认取writerConfig中的默认值200

updateChangeColumns

行覆盖或行更新。

默认为false,为行覆盖,即记录更新时,使用该记录最新整行值覆盖 Tablestore 中的老记录。若为true,为行更新,即记录更新时,只对变化的字段进行操作。

全量同步数据

使用程序向原始表 order_contract_canal 中插入 1 千万行记录。

同步数据

调用 client-adapter 服务的方法触发同步任务。指令格式为

curl "localhost:8081/etl/{type}/{key}/{task}" -X POST

type 为下游数据库类型;key 是 adapter key;task 为任务配置文件的名称。在本文中,指令为:

curl "localhost:8081/etl/tablestore/ts/order.yml" -X POST

程序会首先中止增量数据传输,然后同步全量历史数据。同步开始后,可以在日志中看到 Adapter 中 TablestoreWriter 的传输日志变化。

性能测试

在 Tablestore 监控页面查看数据写入速率,首先进入 Tablestore控制台。点击对应实例进入实例详情页。

点击表 canal_target_order。

点击监控指标进入监控页面。

同步任务开始后,在监控页面可以看到数据如图。此时 Canal 所在机器配置为 8 核 16G,order.yml 中 threads 配置为 8。源库记录数在 1千万,每行数据大小约 0.5KB。可以看到在任务开始的时候,并发写入 Tablestore 速率很高,在 2w行/s 左右。而随着任务的进行,写入速率开始下降,这是由于全量导入数据时从源库获取数据时使用 limit offset 导致的,受限于上游数据的获取。

1千万数据完成写入共耗时 28 分钟。耗时统计见下表。可以看到,在数据导入初期,导入速率相对较快,而数据导入后期,导入效率明显降低。时间统计和控制台中监控数据吻合。

从程序开始到完成导入

使用时间

完成 300w 行导入

3m

完成 400w 行导入

5m

完成 500w 行导入

8m

完成 600w 行导入

11m

完成 700w 行导入

14m

完成 800w 行导入

18m

完成 900w 行导入

22.5m

完成 1000w 行导入

28m


增量同步数据

使用附录中的程序中的接口("/canal/press"),向原始表持续写入数据。

性能测试

输入如下指令调用接口,使用 3 线程写入数据,每个线程每秒写入 4000 行记录。

curl "localhost:8082/canal/press?rps=4000&threads=3"  -X POST

在控制台可以看到数据持续写入,速率约在 1.2w 行/s。

在当前 8 核 16G 的机器配置下,继续增加并发写入量,写入 1.6w 到 2w 行每秒,测试出的增量同步上限约在 1.5w 行/s,每行记录约 0.5KB。

异常处理

ClientAdapter 配置文件application.yml 中 terminateOnException 若不配置或配置为 false,同步程序同步后仍报错,则程序会记录日志,跳过报错数据,继续同步任务。而若 terminateOnException 配置为 true,则同步报错后,程序会中止增量数据同步任务,等待用户介入处理报错。此时,用户可以通过下面接口查看任务的开启、中断状态。命令格式如下:

curl "localhost:8081/syncSwitch/{destination}"

在本文中命令为

curl "localhost:8081/syncSwitch/test_ots"

处理异常后,可以调用如下接口重新启动增量同步任务。

curl "localhost:8081/syncSwitch/test_ots/on" -X PUT

总结

本文简要介绍了 Canal,并且详细的展示了如何使用 Canal 从 MySQL 库向 Tablestore 中同步全量、增量数据。

附录

Canal 测试程序git地址:

https://github.com/aliyun/tablestore-examples

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
24天前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
11天前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
90 4
|
23天前
|
关系型数据库 MySQL 数据库
【MySQL】手把手教你MySQL数据同步
【MySQL】手把手教你MySQL数据同步
|
19天前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
1月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
87 1
|
1月前
|
XML 分布式数据库 数据库
【计算机三级数据库技术】第13章 大规模数据库架构--附思维导图
文章概述了分布式数据库、并行数据库、云计算数据库架构和XML数据库的基本概念、目标、体系结构以及与传统数据库的比较,旨在提供对这些数据库技术的全面理解。
21 1
|
1月前
|
SQL canal 关系型数据库
(二十四)全解MySQL之主从篇:死磕主从复制中数据同步原理与优化
兜兜转转,经过《全解MySQL专栏》前面二十多篇的内容讲解后,基本对MySQL单机模式下的各方面进阶知识做了详细阐述,同时在前面的《分库分表概念篇》、《分库分表隐患篇》两章中也首次提到了数据库的一些高可用方案,但前两章大多属于方法论,并未涵盖真正的实操过程。接下来的内容,会以目前这章作为分割点,开启MySQL高可用方案的落地实践分享的新章程!
535 1
|
22天前
|
Java API 微服务
Java微服务架构应对互联网应用的大规模访问与快速迭代挑战
Java微服务架构应对互联网应用的大规模访问与快速迭代挑战,通过将应用分解为小型、自治的服务,增强系统灵活性与可扩展性。本文概览微服务定义及特点,深入剖析服务拆分、注册发现、API网关等核心原理,并介绍Spring Boot、Spring Cloud、Docker与Kubernetes等关键技术实践,助力高效构建稳定、高性能的企业级应用。
26 0
|
24天前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
198 0
|
2月前
|
分布式计算 关系型数据库 MySQL
MySQL超时参数优化与DataX高效数据同步实践
通过合理设置MySQL的超时参数,可以有效地提升数据库的稳定性和性能。而DataX作为一种高效的数据同步工具,可以帮助企业轻松实现不同数据源之间的数据迁移。无论是优化MySQL参数还是使用DataX进行数据同步,都需要根据具体的应用场景来进行细致的配置和测试,以达到最佳效果。