基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 前言 在基于 MySQL + Tablestore 分层存储架构的大规模订单系统中,利用 CDC 技术将 MySQL 数据同步到 Tablestore 是不可缺少的一步。前文已经详细讲述了如何使用 DTS 向 Tablestore 同步数据。对于中小规模的数据库,或者个人开发者,还可以使用 Canal 从 MySQL 向 Tablestore 同步数据。Canal 部署简单,易于运维,且相比于 D

前言

在基于 MySQL + Tablestore 分层存储架构的大规模订单系统中,利用 CDC 技术将 MySQL 数据同步到 Tablestore 是不可缺少的一步。前文已经详细讲述了如何使用 DTS 向 Tablestore 同步数据。对于中小规模的数据库,或者个人开发者,还可以使用 Canal 从 MySQL 向 Tablestore 同步数据。Canal 部署简单,易于运维,且相比于 DTS,它成本更低,因此它更适合小规模的数据同步。

Canal 简介

Canal 是阿里开源 CDC 工具,他可以获取 MySQL binlog 解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库同步工作。其架构如图:

Deployer 负责拉取 Binlog,解析数据,分发,记录位点。而 Client-adapter 负责接收上游数据,通过 Adapter适配器,将数据持久化到目标库。Deployer 和 Client-Adapter 作为 Canal 中的两个模块,分别独立部署。Tablestore 团队已经在 Adapter 中增加了 TablestoreAdapter,可以支持向 Tablestore 中写入数据。


下面,我们将部署 Canal,并将处于 Rds 中的 MySQL 订单数据同步进入 Tablestore,实现数据全量、增量的同步。

Canal 部署

环境准备

准备部署 canal 程序的机器。本文中在阿里云官网申请了一台 8 vCPU,16 GiB内存的 Linux 机器作为部署机器。如果读者同样需要申请 ECS,请参考:ECS入门概述

源表准备

使用下面 SQL 在 MySQL 中新建测试表 order_contract_canal,其表结构与 order_contract 相同。

CREATE TABLE `order_contract_canal` (
  `oId` varchar(50) NOT NULL,
  `create_time` datetime NOT NULL COMMENT '下单时间',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
  `c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
  `c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
  `p_brand` tinytext COMMENT '产品品牌',
  `p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
  `p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
  `p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
  `p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
  `s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
  `s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
  `total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
  PRIMARY KEY (`oId`),
  KEY `idx_sid` (`s_id`),
  KEY `idx_paytime_sid` (`pay_time`,`s_id`) USING BTREE,
  KEY `idx_cid` (`c_id`),
  KEY `idx_paytime_cid_totalprice` (`pay_time`,`c_id`,`total_price`) USING BTREE,
  KEY `idx_sid_paytime` (`s_id`,`pay_time`),
  KEY `idx_sid_paytime_totalprice` (`s_id`,`pay_time`,`total_price`),
  KEY `idx_paytime_totalprice_pbrand` (`p_price`,`total_price`,`pay_time`) USING BTREE,
  KEY `idx_paytime` (`pay_time`),
  KEY `idx_pbrand_paytime` (`p_brand`(10),`pay_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

目标表准

在 Tablestore 中创建表 canal_target_order作为测试表,使其表结构与订单表 order_contract 相同。其表结构如图:

Deployer部署

在 Canal 官方 release页,下载 canal.deployer 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。配置 MySQL Binlog 模式以及部署 Canal Deployer 的具体步骤可以参考 Canal 官方文档的 QuickStart


本文创建新的实例,在 Deployer 的 conf 目录下创建文件夹 test_ots。将 conf/example/instance.properties 复制到 test_ots 路径下。然后修改 test_ots 路径下的 instance.properties 配置文件。需要关注如下配置项:

参数

说明

canal.instance.master.address

rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306

数据库域名端口

canal.instance.rds.accesskey

***

本文 MySQL 为阿里云产品 RDS,需填入对应accessKey。若非 RDS 库,此项不用填写。

canal.instance.rds.secretkey

***

本文 MySQL 为阿里云产品 RDS,需填入对应secretkey。若非 RDS 库,此项不用填写。

canal.instance.rds.instanceId

rm-bp15p07134rkvf7z6

本文 MySQL 为阿里云产品 RDS,需填入对应示例 id。若非 RDS 库,此项不用填写。

canal.instance.dbUsername

***

数据库账号用户名

canal.instance.dbPassword

***

数据库账号密码

canal.instance.filter.regex

test_ots\\.[test|order_contract_canal].*

Canal 实例关注的表。通过正则表达式匹配。

这里匹配 test_ots 库下表名以 test 开头或者以 order_contract_canal 开头的表

canal.destinations

test_ots

canal 的实例名称,需要配置文件所在上层路径相同,本例路径为 conf/test_ots /instance.properties,那么实例名为test_ots

ClientAdapter部署

在 Canal 官方 release页,下载 canal.adapter 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。包解压后,若 plugin 路径下不存在以 client-adapter.tablestore 开头的 jar 包,说明此安装包不包含 Tablestore 对接部分代码


conf 路径下 application.yml中需要额外关注配置见表。

参数

说明

是否必填

canal.conf:

canalAdapters:instance

test_ots

与depolyer中的destinations保持一致

canal.conf:

canalAdapters:outerAdapters:

-name:

tablestore

定义适配器类型,填入 tablestore 说明此适配器下游写入 Tablestore

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.endpoint


ablestore endpoint

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretId

****

AccessSecretId

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretKey

****

AccessSecretKey

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.instanceName

test-20210609

tablestore 中的 InstanceName

canal.conf: terminateOnException

true

默认为false。若配置为true,则若数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理

完整配置 application.yml 配置如下

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  terminateOnException: true
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306/test_ots?useUnicode=true
      username: ****
      password: ****
  canalAdapters:
  - instance: test_ots # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: tablestore
        key: ts
        properties:
          tablestore.endpoint: https://test-20210609.cn-hangzhou.ots.aliyuncs.com
          tablestore.accessSecretId: ****
          tablestore.accessSecretKey: ****
          tablestore.instanceName: test-20210609

在 conf/tablestore 路径下,创建 order.yml 文件,填入以下内容。配置表示从源库表 test_ots. order_contract_canal 向目标表 canal_target_order 同步数据。

dataSourceKey: defaultDS
destination: test_ots
groupId: g1
outerAdapterKey: ts
threads: 8
updateChangeColumns: false
dbMapping:
  database: test_ots
  table: order_contract_canal
  targetTable: canal_target_order
  targetPk:
    oId: oId
  targetColumns:
    oId:
    create_time:
    pay_time:
    has_paid:
    c_id:
    c_name:
    p_brand:
    p_count:
    p_id:
    p_name:
    p_price:
    s_id:
    s_name:
    total_price:
  etlCondition: 
  commitBatch: 200 # 批量提交的大小

其中各参数含义见表。

参数

说明

是否必填

dataSourceKey

该任务的源数据库标识,在application.yml中可以找到该标识对应的数据库

destination

canal实例名,与application.yml下的instance相同

groupId

分组id,MQ模式下使用,这里不关心,配置成application.yml中canalAdapters中相同即可

outerAdapterKey

使用的Adapter标识,应与application.yml中outerAdapters下的key值相同

threads

筒数量,默认为1,对应tablestorewriter中的bucket数量

dbMapping.database

源库名

dbMapping.table

源表名

dbMapping.targetTable

目标表

dbMapping.targetPk

主键配置

id: target_id 源表主键:目标表主键。多主键可以配置多个

dbMapping.targetColumns

配置需要同步的列名,以及列映射,可以配置类型转换。

id: target_id$string,表示id字段同步后为target_id字段,且类型映射为string。

id: target_id,表示id字段同步后为target_id字段

id: ,表示id字段同步前后字段名不变,字段类型采用默认映射。

id: $string 功能等同于id: id$string

dbMapping.etlCondition

全量抽取数据时的过滤条件

dbMapping.commitBatch

一次批量RPC请求导入的行数,对应tablestorewriter中的maxBatchRowsCount,默认取writerConfig中的默认值200

updateChangeColumns

行覆盖或行更新。

默认为false,为行覆盖,即记录更新时,使用该记录最新整行值覆盖 Tablestore 中的老记录。若为true,为行更新,即记录更新时,只对变化的字段进行操作。

全量同步数据

使用程序向原始表 order_contract_canal 中插入 1 千万行记录。

同步数据

调用 client-adapter 服务的方法触发同步任务。指令格式为

curl "localhost:8081/etl/{type}/{key}/{task}" -X POST

type 为下游数据库类型;key 是 adapter key;task 为任务配置文件的名称。在本文中,指令为:

curl "localhost:8081/etl/tablestore/ts/order.yml" -X POST

程序会首先中止增量数据传输,然后同步全量历史数据。同步开始后,可以在日志中看到 Adapter 中 TablestoreWriter 的传输日志变化。

性能测试

在 Tablestore 监控页面查看数据写入速率,首先进入 Tablestore控制台。点击对应实例进入实例详情页。

点击表 canal_target_order。

点击监控指标进入监控页面。

同步任务开始后,在监控页面可以看到数据如图。此时 Canal 所在机器配置为 8 核 16G,order.yml 中 threads 配置为 8。源库记录数在 1千万,每行数据大小约 0.5KB。可以看到在任务开始的时候,并发写入 Tablestore 速率很高,在 2w行/s 左右。而随着任务的进行,写入速率开始下降,这是由于全量导入数据时从源库获取数据时使用 limit offset 导致的,受限于上游数据的获取。

1千万数据完成写入共耗时 28 分钟。耗时统计见下表。可以看到,在数据导入初期,导入速率相对较快,而数据导入后期,导入效率明显降低。时间统计和控制台中监控数据吻合。

从程序开始到完成导入

使用时间

完成 300w 行导入

3m

完成 400w 行导入

5m

完成 500w 行导入

8m

完成 600w 行导入

11m

完成 700w 行导入

14m

完成 800w 行导入

18m

完成 900w 行导入

22.5m

完成 1000w 行导入

28m


增量同步数据

使用附录中的程序中的接口("/canal/press"),向原始表持续写入数据。

性能测试

输入如下指令调用接口,使用 3 线程写入数据,每个线程每秒写入 4000 行记录。

curl "localhost:8082/canal/press?rps=4000&threads=3"  -X POST

在控制台可以看到数据持续写入,速率约在 1.2w 行/s。

在当前 8 核 16G 的机器配置下,继续增加并发写入量,写入 1.6w 到 2w 行每秒,测试出的增量同步上限约在 1.5w 行/s,每行记录约 0.5KB。

异常处理

ClientAdapter 配置文件application.yml 中 terminateOnException 若不配置或配置为 false,同步程序同步后仍报错,则程序会记录日志,跳过报错数据,继续同步任务。而若 terminateOnException 配置为 true,则同步报错后,程序会中止增量数据同步任务,等待用户介入处理报错。此时,用户可以通过下面接口查看任务的开启、中断状态。命令格式如下:

curl "localhost:8081/syncSwitch/{destination}"

在本文中命令为

curl "localhost:8081/syncSwitch/test_ots"

处理异常后,可以调用如下接口重新启动增量同步任务。

curl "localhost:8081/syncSwitch/test_ots/on" -X PUT

总结

本文简要介绍了 Canal,并且详细的展示了如何使用 Canal 从 MySQL 库向 Tablestore 中同步全量、增量数据。

附录

Canal 测试程序git地址:

https://github.com/aliyun/tablestore-examples

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1月前
|
监控 持续交付 API
深入理解微服务架构:构建高效、可扩展的系统
【10月更文挑战第14天】深入理解微服务架构:构建高效、可扩展的系统
84 0
|
8天前
|
传感器 算法 物联网
智能停车解决方案之停车场室内导航系统(二):核心技术与系统架构构建
随着城市化进程的加速,停车难问题日益凸显。本文深入剖析智能停车系统的关键技术,包括停车场电子地图编辑绘制、物联网与传感器技术、大数据与云计算的应用、定位技术及车辆导航路径规划,为读者提供全面的技术解决方案。系统架构分为应用层、业务层、数据层和运行环境,涵盖停车场室内导航、车位占用检测、动态更新、精准导航和路径规划等方面。
44 4
|
18天前
|
前端开发 安全 关系型数据库
秒合约系统/开发模式规则/技术架构实现
秒合约系统是一种高频交易平台,支持快速交易、双向持仓和高杠杆。系统涵盖用户注册登录、合约创建与编辑、自动执行、状态记录、提醒通知、搜索筛选、安全权限管理等功能。交易规则明确,设有价格限制和强平机制,确保风险可控。技术架构采用高并发后端语言、关系型数据库和前端框架,通过智能合约实现自动化交易,确保安全性和用户体验。
|
26天前
|
存储 数据管理 调度
HarmonyOS架构理解:揭开鸿蒙系统的神秘面纱
【10月更文挑战第21天】华为的鸿蒙系统(HarmonyOS)以其独特的分布式架构备受关注。该架构包括分布式软总线、分布式数据管理和分布式任务调度。分布式软总线实现设备间的无缝连接;分布式数据管理支持跨设备数据共享;分布式任务调度则实现跨设备任务协同。这些特性为开发者提供了强大的工具,助力智能设备的未来发展。
79 1
|
1月前
|
存储 关系型数据库 MySQL
PACS系统 中 dicom 文件在mysql 8.0 数据库中的 存储和读取(pydicom 库使用)
PACS系统 中 dicom 文件在mysql 8.0 数据库中的 存储和读取(pydicom 库使用)
32 2
|
1月前
|
存储 监控 负载均衡
|
1月前
|
存储 关系型数据库 MySQL
Key_Value 形式 存储_5级省市城乡划分代码 (mysql 8.0 实例)
本文介绍了如何使用MySQL8.0数据库中的Key_Value形式存储全国统计用区划代码和城乡划分代码(5级),包括导入数据、通过数学函数提取省市区信息,以及查询5级行政区划的详细数据。
32 0
|
11天前
|
SQL 关系型数据库 MySQL
12 PHP配置数据库MySQL
路老师分享了PHP操作MySQL数据库的方法,包括安装并连接MySQL服务器、选择数据库、执行SQL语句(如插入、更新、删除和查询),以及将结果集返回到数组。通过具体示例代码,详细介绍了每一步的操作流程,帮助读者快速入门PHP与MySQL的交互。
26 1
|
13天前
|
SQL 关系型数据库 MySQL
go语言数据库中mysql驱动安装
【11月更文挑战第2天】
29 4
|
1月前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
63 3
Mysql(4)—数据库索引

热门文章

最新文章

下一篇
无影云桌面