基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 前言 前文架构篇,可以看到 MySQL + Tablestore 非常适合大规模订单系统这一类需求场景。那么,我们首先要做的是,利用 CDC(Change Data Capture) 技术将订单数据实时从 MySQL 同步到 Tablestore 中。对于订单系统的数据同步,我们需要关注同步的稳定性、实时性。目前,存在多款工具可以实现这一功能,他们有的是开源工具如 Canal,有的是阿里云端服务如

前言 

前文架构篇,可以看到 MySQL + Tablestore 非常适合大规模订单系统这一类需求场景。那么,我们首先要做的是,利用 CDC(Change Data Capture) 技术将订单数据实时从 MySQL 同步到 Tablestore 中。对于订单系统的数据同步,我们需要关注同步的稳定性、实时性。目前,存在多款工具可以实现这一功能,他们有的是开源工具如 Canal,有的是阿里云端服务如 DTS。下面我们将对各种同步工具进行介绍,并以 DTS 为例展示同步操作。

同步工具介绍

DataX

DataX 是异构数据源离线同步的工具,支持多种异构数据源之间高效的数据同步。它使用 SQL 从数据库拉取数据,全内存操作。它具有一下特点:

  • 适合进行离线全量同步,不适合支持增量同步。
  • 可以通过编程来进行增量同步,但有一定延时,源表需要通过字段区分哪些记录为待同步字段,且无法捕获删除操作。
  • 单点执行。

因此,它适合中小用户,同步对实时性无太高要求的数据。其具体使用见:数据同步-从MySQL到Tablestore

Canal

Canal是阿里开源 CDC 工具,他可以获取 MySQL binlog 并解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库向其他数据库的同步工作。Tablestore 团队已经在 Canal 中实现了 Tablestore 适配器,可以支持将 MySQL 数据同步进入 Tablestore,具体细节请参考后续文章。

DTS

数据传输服务(Data Transmission Service,简称DTS)支持关系型数据库、NoSQL、大数据(OLAP)等数据源,集数据迁移、订阅及实时同步功能于一体,能够解决公共云、混合云场景下,远距离、毫秒级异步数据传输难题。其特点为:

  • 基于云部署,只需要简单配置就可以运行
  • 基于 binlog,可以实时同步数据
  • 费用相对于 DataX 高

因此,目前,中小型用户,对实时性要求没有很高的用户,可以使用 DataX 进行 MySQL 到 Tablestore 的同步。而企业级用户,或者对于延迟要求比较高的客户,推荐使用 DTS 进行数据同步。 本文会展示如何完成基于 DTS 从 MySQL 到 Tablestore 的同步系统的搭建。而这套同步系统正是订单数据上Tablestore 的第一步工作。

服务开通

创建 MySQL 并建表

在 RDS 上申请源的 MySQL 数据库,可以参考创建RDS MySQL实例。已经在 RDS 上或者 ECS 上拥有 MySQL 实例的同学可以忽略这一步。

在数据库中创建订单表 order_contract,建表语句如下:

CREATE TABLE `order_contract` (
  `oId` varchar(50) NOT NULL,
  `create_time` datetime NOT NULL COMMENT '下单时间',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
  `c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
  `c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
  `p_brand` tinytext COMMENT '产品品牌',
  `p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
  `p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
  `p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
  `p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
  `s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
  `s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
  `total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
  PRIMARY KEY (`oId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;

开通 Tablestore 服务

提供了三种方式创建 Tablestore 实例,读者请自由选择一种创建。

阿里云 CLI (POP Client) 创建按量模式实例

通过阿里云官网下载安装并配置阿里云 CLI。具体参考 阿里云 CLI。完毕后,打开命令行,输入

aliyun ots InsertInstance --endpoint ots.cn-hangzhou.aliyuncs.com  --InstanceName test-20210609

用于创建实例。其中 endpoint 填入实例所在地域域名。然后在 Tabelstore 控制台即可看到创建出的实例如图。

在 Cloud Shell 中可以同样使用此指令建立 Tablestore 实例,Cloud Shell 地址见 Cloud Shell 地址

控制台创建按量模式实例

进入Tablestore首页。点击进入管理控制台

点击创建实例

选择按量模式。填入实例名称

点击确定,完成创建。然后可以在控制台首页看到对应实例。

控制台创建预留模式实例

此创建过程与控制台创建按量模式实例基本相同,只是在选择购买方式时选择预留模式。

开通配置 DTS

进入DTS首页。点击立即购买

商品类型选择数据传输服务 Tablestore(后付费),功能选择数据同步,源实例选择 MySQL ,目标实例选择 Tablestore ,同步拓扑选择单向同步

若有压测等需要大量同步记录的需求,同步链路规格参数可以适当选大,每种链路的传输数据能力可以在页面上查看。

点击立即购买。弹出如下页面,勾选服务协议,然后点击立即开通,即完成购买。

购买成功后,进入管理控制台,点击同步实例下面的数字“1”,可以看见尚未配置的实例信息。

点击配置同步链路

配置源库 MySQL 和目标库 Tablestore 的信息。AccessKey 配置参考:获取AccessKey。配置完成后点击授权白名单并进入下一步

选择同步对象页,同步初始化这里,结构初始化全量数据初始化增量数据初始化这三个选项都要进行勾选。源库对象,选择源表 order_contract,点击中间红框中的按钮,将表传送到右侧“已选中对象”。

点击编辑

将 pay_time 的数据类型映射为 Integer。这样在 Tablestore 中的 pay_time字段是以微秒为单位的时间戳。点击确定。

 

配置完成后,向下滑动窗口,点击“下一步”进入高级配置。继续点击预检查并启动,进行启动。在控制台的数据同步页,可以看到刚刚配置的同步任务。

数据同步测试

字段说明

通过前面,我们申请好了充当订单库的 MySQL 数据库,Tablestore 实例,并且搭建了 DTS 任务从 MySQL 向Tablestore 同步数据。下面,我们简要写一个 Java 程序,持续向 MySQL 中写入订单数据,以验证 DTS 的持续同步能力。生成的订单记录,各字段加入一些随机性,以构造更加真实的测试数据。各字段生成逻辑见表。

字段

字段含义

取值说明

oId

订单号

使用当前时间戳 + c_id,例如1623228187366_user2

create_time

下单时间

取当前时间

pay_time

支付时间

取当前时间,不做更细化仿真,假设每笔订单下单同时支付。

has_paid

是否已经支付

设定为true,这里不对此字段进行仿真。

c_id

消费者id

取一千万以内的随机整数,假设有一千万消费者。消费者id为“user” + id格式,比如“user1”

c_name

消费者姓名

使用“客户” + 消费者id的格式,比如1号消费者,姓名为“客户1”

p_brand

产品品牌

格式为“品牌id”,id为5000以内随机整数,仿真5000个品牌

p_count

产品数量

1到10取随机整数

p_id

产品id

格式为"store1_id", id为100以内随机数,假设每个店铺有100个产品,例如store1_1

p_name

产品名

格式为“产品” + p_id,比如“产品store4075_25”

p_price

产品价格

0到1000元随机浮点数

s_id

店铺id

5000以内随机整数,假设有5000家店铺。店铺id为“store” + id 格式,比如“store1”

s_name

店铺名称

使用“旗舰店” + id的格式,比如1号店铺,id为“store1”,店铺名称为“旗舰店1”

total_price

总价格

p_count * p_price

程序说明

搭建 Springboot 项目,其中创建订单代码如下,代码中包含随机生成参数的逻辑。

    // 创建订单
    private OrderContract createOrder() {
        long now = System.currentTimeMillis();
        LocalDateTime nowT = LocalDateTime.now();
        int cNumber = r.nextInt(1000 * 10000); // 一千万用户
        String userId = "user" + cNumber;
        String oId = now + "_" + userId;

        OrderContract item = new OrderContract();
        item.setoId(oId);
        item.setCreateTime(nowT);
        item.setPayTime(nowT);
        item.setHasPaid(true);
        item.setcId(userId);
        item.setcName("客户" + cNumber);

        int count = r.nextInt(10) + 1;
        item.setpCount(count);   // 商品数量

        double price = r.nextDouble() * 1000d;   // 单价1到1000
        item.setpPrice(price);

        int storeId = r.nextInt(5000); //5000个店铺
        item.setsId("store" + storeId);
        item.setsName("旗舰店" + storeId);
        item.setTotalPrice(item.getpPrice() * item.getpCount());

        int brandId = r.nextInt(5000);
        item.setpBrand("品牌" + brandId);

        int productId = r.nextInt(100);
        item.setpId(item.getsId() +"_" + productId);
        item.setpName("产品" + item.getpId());

        return item;
    }

批量获得订单并插入数据库代码如下,根据传入参数,插入数据库。

public void insertIntoOrders(int size) {
        System.out.println("start insert orders");
        List<OrderContract> list = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            OrderContract order = createOrder();
            list.add(order);
        }

        userMapper.batchInsert(list);
        System.out.println("finish insert orders.");
    }

插入数据库代码:

// Mybatis配置sql
<insert id="batchInsert" parameterType="List">
    insert into order_contract(oId,create_time,pay_time,has_paid,c_id,c_name,p_brand,
    p_count,p_id,p_name,p_price,s_id,s_name,total_price)
    values
    <foreach collection="list" index="index" item="item" separator=",">
        (#{item.oId},#{item.createTime},#{item.payTime},#{item.hasPaid},#{item.cId},#{item.cName},#{item.pBrand},
        #{item.pCount},#{item.pId},#{item.pName},#{item.pPrice},#{item.sId},#{item.sName},#{item.totalPrice})
    </foreach>
</insert>

循环执行批量插入数据库逻辑,以达到批量生成订单数据的目的。

   public void initOrders() {
        while (true) {
            try {
                int size = r.nextInt(1000);
                insertIntoOrders(size);

                Thread.sleep(4000L);
            } catch (InterruptedException e) {
               break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

同步结果

启动 Java 程序,可以在 Tablestore 中观察到新的订单记录持续的从 MySQL 库同步到 Tablestore 中。

手动导入 SQL

用户也可以手动执行 SQL 插入测试数据。使用下面的存储过程可以直接通过 SQL 写入测试数据。因为逐条插入,插入性能要比程序里的批量插入慢。

DROP PROCEDURE if EXISTS test;
DELIMITER //
CREATE procedure test()
BEGIN
DECLARE i INT;
DECLARE userId INT;
DECLARE c INT;
DECLARE price DOUBLE;
DECLARE storeId INT;
DECLARE brandId INT;
DECLARE productId INT;
DECLARE c_id VARCHAR(255);
SET i = 0;
WHILE i<1000 DO    // 这里的值决定写入记录数
SET userId=CEILING(RAND()*1000*10000);
SET c=CEILING(RAND()*10);
SET price=RAND()*1000;
SET storeId=CEILING(RAND()*5000);
SET brandId=CEILING(RAND()*5000);
SET productId=CEILING(RAND()*100);
SET c_id=CONCAT("user",userId);

INSERT INTO test(oId,create_time,pay_time,
has_paid,c_id,c_name,
p_brand,p_count,p_id,
p_name,p_price,s_id,
s_name,total_price) VALUES
(CONCAT(unix_timestamp(now()),"_",c_id), now(), now(),
true,c_id,CONCAT("客户",userId),
CONCAT("品牌",brandId),c,CONCAT("store",storeId,"_",productId),
CONCAT("产品store",storeId,"_",productId),price,CONCAT("store",storeId),
CONCAT("旗舰店",storeId),p_price * c
);
SET i = i+1;
END WHILE;
END
//
DELIMITER ;
CALL test();

总结

基于 DTS,我们可以实现 MySQL 数据向 Tablestore 的实时同步。数据进入 Tablestore 后,我们可以利用 Tablestore 的特性进行搜索、分析等操作。我们会在后续文章中进行说明。

附录

代码 git 地址:https://github.com/aliyun/tablestore-examples

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
6天前
|
存储 SQL NoSQL
mysql存储过程和存储函数
mysql存储过程和存储函数
|
6天前
|
存储 JSON 关系型数据库
轻松入门MySQL:MySQL字段类型精解,优化存储结构,助力系统高效运行(2)
轻松入门MySQL:MySQL字段类型精解,优化存储结构,助力系统高效运行(2)
|
6天前
|
SQL 存储 关系型数据库
轻松入门MySQL:简明教程解析数据存储与管理(1)
轻松入门MySQL:简明教程解析数据存储与管理(1)
|
4天前
|
消息中间件 关系型数据库 MySQL
MySQL 到 Kafka 实时数据同步实操分享(1),字节面试官职级
MySQL 到 Kafka 实时数据同步实操分享(1),字节面试官职级
|
4天前
|
机器学习/深度学习 关系型数据库 MySQL
MySQL 到 Greenplum 实时数据同步实操分享,2024年最新【Python面试题
MySQL 到 Greenplum 实时数据同步实操分享,2024年最新【Python面试题
|
4天前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之从MySQL到Flink 1.16.2 Flink-SQL的数据同步工作出现了一个异常如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
19 0
|
6天前
|
存储 关系型数据库 MySQL
MySQL是怎样存储数据的?
MySQL是怎样存储数据的?
|
6天前
|
DataWorks Shell 对象存储
DataWorks产品使用合集之在 DataWorks 中,有一个 MySQL 数据表,数据量非常大且数据会不断更新将这些数据同步到 DataWorks如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
33 3
|
6天前
|
存储 Kubernetes 关系型数据库
使用Helm管理创建MYSQL并使用外置存储
使用Helm管理创建MYSQL并使用外置存储
47 0
|
6天前
|
canal 消息中间件 关系型数据库
【分布式技术专题】「分布式技术架构」MySQL数据同步到Elasticsearch之N种方案解析,实现高效数据同步
【分布式技术专题】「分布式技术架构」MySQL数据同步到Elasticsearch之N种方案解析,实现高效数据同步
103 0