基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 前言 在基于 MySQL + Tablestore 分层存储架构的大规模订单系统中,利用 CDC 技术将 MySQL 数据同步到 Tablestore 是不可缺少的一步。前文已经详细讲述了如何使用 DTS 向 Tablestore 同步数据。对于中小规模的数据库,或者个人开发者,还可以使用 Canal 从 MySQL 向 Tablestore 同步数据。Canal 部署简单,易于运维,且相比于 D

前言 

在基于 MySQL + Tablestore 分层存储架构的大规模订单系统中,利用 CDC 技术将 MySQL 数据同步到 Tablestore 是不可缺少的一步。前文已经详细讲述了如何使用 DTS 向 Tablestore 同步数据。对于中小规模的数据库,或者个人开发者,还可以使用 Canal 从 MySQL 向 Tablestore 同步数据。Canal 部署简单,易于运维,且相比于 DTS,它成本更低,因此它更适合小规模的数据同步。

Canal 简介

Canal 是阿里开源 CDC 工具,他可以获取 MySQL binlog 解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库同步工作。其架构如图:

Deployer 负责拉取 Binlog,解析数据,分发,记录位点。而 Client-adapter 负责接收上游数据,通过 Adapter适配器,将数据持久化到目标库。Deployer 和 Client-Adapter 作为 Canal 中的两个模块,分别独立部署。Tablestore 团队已经在 Adapter 中增加了 TablestoreAdapter,可以支持向 Tablestore 中写入数据。

下面,我们将部署 Canal,并将处于 Rds 中的 MySQL 订单数据同步进入 Tablestore,实现数据全量、增量的同步。

Canal 部署

环境准备

准备部署 canal 程序的机器。本文中在阿里云官网申请了一台 8 vCPU,16 GiB内存的 Linux 机器作为部署机器。如果读者同样需要申请 ECS,请参考:ECS入门概述

源表准备

使用下面 SQL 在 MySQL 中新建测试表 order_contract_canal,其表结构与 order_contract 相同。

CREATE TABLE `order_contract_canal` (
  `oId` varchar(50) NOT NULL,
  `create_time` datetime NOT NULL COMMENT '下单时间',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
  `c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
  `c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
  `p_brand` tinytext COMMENT '产品品牌',
  `p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
  `p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
  `p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
  `p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
  `s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
  `s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
  `total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
  PRIMARY KEY (`oId`),
  KEY `idx_sid` (`s_id`),
  KEY `idx_paytime_sid` (`pay_time`,`s_id`) USING BTREE,
  KEY `idx_cid` (`c_id`),
  KEY `idx_paytime_cid_totalprice` (`pay_time`,`c_id`,`total_price`) USING BTREE,
  KEY `idx_sid_paytime` (`s_id`,`pay_time`),
  KEY `idx_sid_paytime_totalprice` (`s_id`,`pay_time`,`total_price`),
  KEY `idx_paytime_totalprice_pbrand` (`p_price`,`total_price`,`pay_time`) USING BTREE,
  KEY `idx_paytime` (`pay_time`),
  KEY `idx_pbrand_paytime` (`p_brand`(10),`pay_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

目标表准

在 Tablestore 中创建表 canal_target_order作为测试表,使其表结构与订单表 order_contract 相同。其表结构如图:

Deployer部署

在 Canal 官方 release页,下载 canal.deployer 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。配置 MySQL Binlog 模式以及部署 Canal Deployer 的具体步骤可以参考 Canal 官方文档的 QuickStart

本文创建新的实例,在 Deployer 的 conf 目录下创建文件夹 test_ots。将 conf/example/instance.properties 复制到 test_ots 路径下。然后修改 test_ots 路径下的 instance.properties 配置文件。需要关注如下配置项:

参数

说明

canal.instance.master.address

rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306

数据库域名端口

canal.instance.rds.accesskey

***

本文 MySQL 为阿里云产品 RDS,需填入对应accessKey。若非 RDS 库,此项不用填写。

canal.instance.rds.secretkey

***

本文 MySQL 为阿里云产品 RDS,需填入对应secretkey。若非 RDS 库,此项不用填写。

canal.instance.rds.instanceId

rm-bp15p07134rkvf7z6

本文 MySQL 为阿里云产品 RDS,需填入对应示例 id。若非 RDS 库,此项不用填写。

canal.instance.dbUsername

***

数据库账号用户名

canal.instance.dbPassword

***

数据库账号密码

canal.instance.filter.regex

test_ots\\.[test|order_contract_canal].*

Canal 实例关注的表。通过正则表达式匹配。

这里匹配 test_ots 库下表名以 test 开头或者以 order_contract_canal 开头的表

canal.destinations

test_ots

canal 的实例名称,需要配置文件所在上层路径相同,本例路径为 conf/test_ots /instance.properties,那么实例名为test_ots

ClientAdapter部署

在 Canal 官方 release页,下载 canal.adapter 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。包解压后,若 plugin 路径下不存在以 client-adapter.tablestore 开头的 jar 包,说明此安装包不包含 Tablestore 对接部分代码,需要使用 canal-adapter下的 zip 包作为安装包,该安装包与官网安装包部署路径相同,配置、启动方式相同。详细的部署步骤见 ClientAdapter

conf 路径下 application.yml中需要额外关注配置见表。

参数

说明

是否必填

canal.conf:

canalAdapters:instance

test_ots

与depolyer中的destinations保持一致

canal.conf:

canalAdapters:outerAdapters: 

-name:

tablestore

定义适配器类型,填入 tablestore 说明此适配器下游写入 Tablestore

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.endpoint

https://test-20210609.cn-hangzhou.ots.aliyuncs.com

tablestore endpoint

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretId

****

AccessSecretId

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretKey

****

AccessSecretKey

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.instanceName

test-20210609

tablestore 中的 InstanceName

canal.conf: terminateOnException

true

默认为false。若配置为true,则若数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理

完整配置 application.yml 配置如下

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  terminateOnException: true
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306/test_ots?useUnicode=true
      username: ****
      password: ****
  canalAdapters:
  - instance: test_ots # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: tablestore
        key: ts
        properties:
          tablestore.endpoint: https://test-20210609.cn-hangzhou.ots.aliyuncs.com
          tablestore.accessSecretId: ****
          tablestore.accessSecretKey: ****
          tablestore.instanceName: test-20210609

在 conf/tablestore 路径下,创建 order.yml 文件,填入以下内容。配置表示从源库表 test_ots. order_contract_canal 向目标表 canal_target_order 同步数据。

dataSourceKey: defaultDS
destination: test_ots
groupId: g1
outerAdapterKey: ts
threads: 8
updateChangeColumns: false
dbMapping:
  database: test_ots
  table: order_contract_canal
  targetTable: canal_target_order
  targetPk:
    oId: oId
  targetColumns:
    oId:
    create_time:
    pay_time:
    has_paid:
    c_id:
    c_name:
    p_brand:
    p_count:
    p_id:
    p_name:
    p_price:
    s_id:
    s_name:
    total_price:
  etlCondition: 
  commitBatch: 200 # 批量提交的大小

其中各参数含义见表。

参数

说明

是否必填

dataSourceKey

该任务的源数据库标识,在application.yml中可以找到该标识对应的数据库

destination

canal实例名,与application.yml下的instance相同

groupId

分组id,MQ模式下使用,这里不关心,配置成application.yml中canalAdapters中相同即可

outerAdapterKey

使用的Adapter标识,应与application.yml中outerAdapters下的key值相同

threads

筒数量,默认为1,对应tablestorewriter中的bucket数量

dbMapping.database

源库名

dbMapping.table

源表名

dbMapping.targetTable

目标表

dbMapping.targetPk

主键配置

id: target_id 源表主键:目标表主键。多主键可以配置多个

dbMapping.targetColumns

配置需要同步的列名,以及列映射,可以配置类型转换。

id: target_id$string,表示id字段同步后为target_id字段,且类型映射为string。

id: target_id,表示id字段同步后为target_id字段

id: ,表示id字段同步前后字段名不变,字段类型采用默认映射。

id: $string 功能等同于id: id$string

dbMapping.etlCondition

全量抽取数据时的过滤条件

dbMapping.commitBatch

一次批量RPC请求导入的行数,对应tablestorewriter中的maxBatchRowsCount,默认取writerConfig中的默认值200

updateChangeColumns

行覆盖或行更新。

默认为false,为行覆盖,即记录更新时,使用该记录最新整行值覆盖 Tablestore 中的老记录。若为true,为行更新,即记录更新时,只对变化的字段进行操作。

全量同步数据

使用程序向原始表 order_contract_canal 中插入 1 千万行记录。

同步数据

调用 client-adapter 服务的方法触发同步任务。指令格式为

curl "localhost:8081/etl/{type}/{key}/{task}" -X POST

type 为下游数据库类型;key 是 adapter key;task 为任务配置文件的名称。在本文中,指令为:

curl "localhost:8081/etl/tablestore/ts/order.yml" -X POST

程序会首先中止增量数据传输,然后同步全量历史数据。同步开始后,可以在日志中看到 Adapter 中 TablestoreWriter 的传输日志变化。

性能测试

在 Tablestore 监控页面查看数据写入速率,首先进入 Tablestore控制台。点击对应实例进入实例详情页。

点击表 canal_target_order。

点击监控指标进入监控页面。

同步任务开始后,在监控页面可以看到数据如图。此时 Canal 所在机器配置为 8 核 16G,order.yml 中 threads 配置为 8。源库记录数在 1千万,每行数据大小约 0.5KB。可以看到在任务开始的时候,并发写入 Tablestore 速率很高,在 2w行/s 左右。而随着任务的进行,写入速率开始下降,这是由于全量导入数据时从源库获取数据时使用 limit offset 导致的,受限于上游数据的获取。

1千万数据完成写入共耗时 28 分钟。耗时统计见下表。可以看到,在数据导入初期,导入速率相对较快,而数据导入后期,导入效率明显降低。时间统计和控制台中监控数据吻合。

从程序开始到完成导入

使用时间

完成 300w 行导入

3m

完成 400w 行导入

5m

完成 500w 行导入

8m

完成 600w 行导入

11m

完成 700w 行导入

14m

完成 800w 行导入

18m

完成 900w 行导入

22.5m

完成 1000w 行导入

28m

增量同步数据

使用附录中的程序中的接口("/canal/press"),向原始表持续写入数据。

性能测试

输入如下指令调用接口,使用 3 线程写入数据,每个线程每秒写入 4000 行记录。

curl "localhost:8082/canal/press?rps=4000&threads=3"  -X POST

在控制台可以看到数据持续写入,速率约在 1.2w 行/s。

在当前 8 核 16G 的机器配置下,继续增加并发写入量,写入 1.6w 到 2w 行每秒,测试出的增量同步上限约在 1.5w 行/s,每行记录约 0.5KB。

异常处理

ClientAdapter 配置文件application.yml 中 terminateOnException 若不配置或配置为 false,同步程序同步后仍报错,则程序会记录日志,跳过报错数据,继续同步任务。而若 terminateOnException 配置为 true,则同步报错后,程序会中止增量数据同步任务,等待用户介入处理报错。此时,用户可以通过下面接口查看任务的开启、中断状态。命令格式如下:

curl "localhost:8081/syncSwitch/{destination}"

在本文中命令为

curl "localhost:8081/syncSwitch/test_ots"

处理异常后,可以调用如下接口重新启动增量同步任务。

curl "localhost:8081/syncSwitch/test_ots/on" -X PUT

总结

本文简要介绍了 Canal,并且详细的展示了如何使用 Canal 从 MySQL 库向 Tablestore 中同步全量、增量数据。

附录

Canal 测试程序git地址:

https://github.com/aliyun/tablestore-examples

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1天前
|
设计模式 监控 持续交付
深入理解微服务架构:从理论到实践
【6月更文挑战第22天】微服务架构作为现代软件开发的基石,其理念和实践已经深入人心。本文将通过一个实际案例,探讨如何将微服务架构的理论应用到实践中去,包括设计原则、技术选型、以及实施过程中可能遇到的挑战和相应的解决策略。我们将看到,尽管微服务带来了许多优势,但在实际应用中也不可避免地会遇到一系列问题,需要开发者具备深厚的技术功底和丰富的实践经验才能妥善应对。
|
3天前
|
存储 弹性计算 安全
构建高效企业应用架构:阿里云产品组合实践深度解析
该方案展现了阿里云产品组合的强大能力和灵活性,不仅满足了当前业务需求,也为未来的扩展打下了坚实的基础。希望本文的分享能为读者在设计自己的IT解决方案时提供一定的参考和启发。
20 1
|
3天前
|
监控 Cloud Native 安全
云原生架构下的微服务治理实践
本文旨在深入探讨在云原生环境下,如何有效实施微服务治理。通过分析微服务架构的核心价值与挑战,结合具体的云平台工具和最佳实践,文章详细阐述了服务发现、配置管理、弹性设计等关键治理策略。此外,文章还提供了关于如何在保障系统可观测性的同时,确保安全性和合规性的实用建议。读者将获得一套完整的微服务治理框架,以及在云原生旅程中应对复杂问题的能力提升。
|
3天前
|
Kubernetes 负载均衡 Cloud Native
云原生架构下的微服务治理实践
在云计算的浪潮中,云原生架构以其弹性、可扩展性及高效的资源利用成为企业数字化转型的首选。本文将深入探讨云原生环境下微服务的治理策略,包括服务发现、配置管理、流量控制等关键组件的应用,并通过案例分析展示如何构建健壮、灵活的微服务系统。
13 0
|
3天前
|
监控 持续交付 开发者
探索后端开发中的微服务架构实践
在软件工程的广阔天地中,微服务架构以其独特的魅力和优势逐渐成为后端开发的新宠。本文将深入探讨微服务的核心概念、设计原则以及在实际项目中的实施策略和面临的挑战,旨在为后端开发者提供一套清晰的微服务实践指南。
|
2天前
|
存储 关系型数据库 MySQL
|
2天前
|
SQL 关系型数据库 MySQL
|
2天前
|
存储 关系型数据库 MySQL
|
3天前
|
存储 关系型数据库 MySQL
关系型数据库mysql日志和临时文件
【6月更文挑战第15天】
21 4
|
3天前
|
存储 关系型数据库 MySQL
关系型数据库mysql数据文件存储
【6月更文挑战第15天】
10 4

热门文章

最新文章