基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
表格存储 Tablestore,50G 2个月
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 前言 前文架构篇,可以看到 MySQL + Tablestore 非常适合大规模订单系统这一类需求场景。那么,我们首先要做的是,利用 CDC(Change Data Capture) 技术将订单数据实时从 MySQL 同步到 Tablestore 中。对于订单系统的数据同步,我们需要关注同步的稳定性、实时性。目前,存在多款工具可以实现这一功能,他们有的是开源工具如 Canal,有的是阿里云端服务如

前言 

前文架构篇,可以看到 MySQL + Tablestore 非常适合大规模订单系统这一类需求场景。那么,我们首先要做的是,利用 CDC(Change Data Capture) 技术将订单数据实时从 MySQL 同步到 Tablestore 中。对于订单系统的数据同步,我们需要关注同步的稳定性、实时性。目前,存在多款工具可以实现这一功能,他们有的是开源工具如 Canal,有的是阿里云端服务如 DTS。下面我们将对各种同步工具进行介绍,并以 DTS 为例展示同步操作。

同步工具介绍

DataX

DataX 是异构数据源离线同步的工具,支持多种异构数据源之间高效的数据同步。它使用 SQL 从数据库拉取数据,全内存操作。它具有一下特点:

  • 适合进行离线全量同步,不适合支持增量同步。
  • 可以通过编程来进行增量同步,但有一定延时,源表需要通过字段区分哪些记录为待同步字段,且无法捕获删除操作。
  • 单点执行。

因此,它适合中小用户,同步对实时性无太高要求的数据。其具体使用见:数据同步-从MySQL到Tablestore

Canal

Canal是阿里开源 CDC 工具,他可以获取 MySQL binlog 并解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库向其他数据库的同步工作。Tablestore 团队已经在 Canal 中实现了 Tablestore 适配器,可以支持将 MySQL 数据同步进入 Tablestore,具体细节请参考后续文章。

DTS

数据传输服务(Data Transmission Service,简称DTS)支持关系型数据库、NoSQL、大数据(OLAP)等数据源,集数据迁移、订阅及实时同步功能于一体,能够解决公共云、混合云场景下,远距离、毫秒级异步数据传输难题。其特点为:

  • 基于云部署,只需要简单配置就可以运行
  • 基于 binlog,可以实时同步数据
  • 费用相对于 DataX 高

因此,目前,中小型用户,对实时性要求没有很高的用户,可以使用 DataX 进行 MySQL 到 Tablestore 的同步。而企业级用户,或者对于延迟要求比较高的客户,推荐使用 DTS 进行数据同步。 本文会展示如何完成基于 DTS 从 MySQL 到 Tablestore 的同步系统的搭建。而这套同步系统正是订单数据上Tablestore 的第一步工作。

服务开通

创建 MySQL 并建表

在 RDS 上申请源的 MySQL 数据库,可以参考创建RDS MySQL实例。已经在 RDS 上或者 ECS 上拥有 MySQL 实例的同学可以忽略这一步。

在数据库中创建订单表 order_contract,建表语句如下:

CREATE TABLE `order_contract` (
  `oId` varchar(50) NOT NULL,
  `create_time` datetime NOT NULL COMMENT '下单时间',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
  `c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
  `c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
  `p_brand` tinytext COMMENT '产品品牌',
  `p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
  `p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
  `p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
  `p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
  `s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
  `s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
  `total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
  PRIMARY KEY (`oId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;

开通 Tablestore 服务

提供了三种方式创建 Tablestore 实例,读者请自由选择一种创建。

阿里云 CLI (POP Client) 创建按量模式实例

通过阿里云官网下载安装并配置阿里云 CLI。具体参考 阿里云 CLI。完毕后,打开命令行,输入

aliyun ots InsertInstance --endpoint ots.cn-hangzhou.aliyuncs.com  --InstanceName test-20210609

用于创建实例。其中 endpoint 填入实例所在地域域名。然后在 Tabelstore 控制台即可看到创建出的实例如图。

在 Cloud Shell 中可以同样使用此指令建立 Tablestore 实例,Cloud Shell 地址见 Cloud Shell 地址

控制台创建按量模式实例

进入Tablestore首页。点击进入管理控制台

点击创建实例

选择按量模式。填入实例名称

点击确定,完成创建。然后可以在控制台首页看到对应实例。

控制台创建预留模式实例

此创建过程与控制台创建按量模式实例基本相同,只是在选择购买方式时选择预留模式。

开通配置 DTS

进入DTS首页。点击立即购买

商品类型选择数据传输服务 Tablestore(后付费),功能选择数据同步,源实例选择 MySQL ,目标实例选择 Tablestore ,同步拓扑选择单向同步

若有压测等需要大量同步记录的需求,同步链路规格参数可以适当选大,每种链路的传输数据能力可以在页面上查看。

点击立即购买。弹出如下页面,勾选服务协议,然后点击立即开通,即完成购买。

购买成功后,进入管理控制台,点击同步实例下面的数字“1”,可以看见尚未配置的实例信息。

点击配置同步链路

配置源库 MySQL 和目标库 Tablestore 的信息。AccessKey 配置参考:获取AccessKey。配置完成后点击授权白名单并进入下一步

选择同步对象页,同步初始化这里,结构初始化全量数据初始化增量数据初始化这三个选项都要进行勾选。源库对象,选择源表 order_contract,点击中间红框中的按钮,将表传送到右侧“已选中对象”。

点击编辑

将 pay_time 的数据类型映射为 Integer。这样在 Tablestore 中的 pay_time字段是以微秒为单位的时间戳。点击确定。

 

配置完成后,向下滑动窗口,点击“下一步”进入高级配置。继续点击预检查并启动,进行启动。在控制台的数据同步页,可以看到刚刚配置的同步任务。

数据同步测试

字段说明

通过前面,我们申请好了充当订单库的 MySQL 数据库,Tablestore 实例,并且搭建了 DTS 任务从 MySQL 向Tablestore 同步数据。下面,我们简要写一个 Java 程序,持续向 MySQL 中写入订单数据,以验证 DTS 的持续同步能力。生成的订单记录,各字段加入一些随机性,以构造更加真实的测试数据。各字段生成逻辑见表。

字段

字段含义

取值说明

oId

订单号

使用当前时间戳 + c_id,例如1623228187366_user2

create_time

下单时间

取当前时间

pay_time

支付时间

取当前时间,不做更细化仿真,假设每笔订单下单同时支付。

has_paid

是否已经支付

设定为true,这里不对此字段进行仿真。

c_id

消费者id

取一千万以内的随机整数,假设有一千万消费者。消费者id为“user” + id格式,比如“user1”

c_name

消费者姓名

使用“客户” + 消费者id的格式,比如1号消费者,姓名为“客户1”

p_brand

产品品牌

格式为“品牌id”,id为5000以内随机整数,仿真5000个品牌

p_count

产品数量

1到10取随机整数

p_id

产品id

格式为"store1_id", id为100以内随机数,假设每个店铺有100个产品,例如store1_1

p_name

产品名

格式为“产品” + p_id,比如“产品store4075_25”

p_price

产品价格

0到1000元随机浮点数

s_id

店铺id

5000以内随机整数,假设有5000家店铺。店铺id为“store” + id 格式,比如“store1”

s_name

店铺名称

使用“旗舰店” + id的格式,比如1号店铺,id为“store1”,店铺名称为“旗舰店1”

total_price

总价格

p_count * p_price

程序说明

搭建 Springboot 项目,其中创建订单代码如下,代码中包含随机生成参数的逻辑。

    // 创建订单
    private OrderContract createOrder() {
        long now = System.currentTimeMillis();
        LocalDateTime nowT = LocalDateTime.now();
        int cNumber = r.nextInt(1000 * 10000); // 一千万用户
        String userId = "user" + cNumber;
        String oId = now + "_" + userId;

        OrderContract item = new OrderContract();
        item.setoId(oId);
        item.setCreateTime(nowT);
        item.setPayTime(nowT);
        item.setHasPaid(true);
        item.setcId(userId);
        item.setcName("客户" + cNumber);

        int count = r.nextInt(10) + 1;
        item.setpCount(count);   // 商品数量

        double price = r.nextDouble() * 1000d;   // 单价1到1000
        item.setpPrice(price);

        int storeId = r.nextInt(5000); //5000个店铺
        item.setsId("store" + storeId);
        item.setsName("旗舰店" + storeId);
        item.setTotalPrice(item.getpPrice() * item.getpCount());

        int brandId = r.nextInt(5000);
        item.setpBrand("品牌" + brandId);

        int productId = r.nextInt(100);
        item.setpId(item.getsId() +"_" + productId);
        item.setpName("产品" + item.getpId());

        return item;
    }

批量获得订单并插入数据库代码如下,根据传入参数,插入数据库。

public void insertIntoOrders(int size) {
        System.out.println("start insert orders");
        List<OrderContract> list = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            OrderContract order = createOrder();
            list.add(order);
        }

        userMapper.batchInsert(list);
        System.out.println("finish insert orders.");
    }

插入数据库代码:

// Mybatis配置sql
<insert id="batchInsert" parameterType="List">
    insert into order_contract(oId,create_time,pay_time,has_paid,c_id,c_name,p_brand,
    p_count,p_id,p_name,p_price,s_id,s_name,total_price)
    values
    <foreach collection="list" index="index" item="item" separator=",">
        (#{item.oId},#{item.createTime},#{item.payTime},#{item.hasPaid},#{item.cId},#{item.cName},#{item.pBrand},
        #{item.pCount},#{item.pId},#{item.pName},#{item.pPrice},#{item.sId},#{item.sName},#{item.totalPrice})
    </foreach>
</insert>

循环执行批量插入数据库逻辑,以达到批量生成订单数据的目的。

   public void initOrders() {
        while (true) {
            try {
                int size = r.nextInt(1000);
                insertIntoOrders(size);

                Thread.sleep(4000L);
            } catch (InterruptedException e) {
               break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

同步结果

启动 Java 程序,可以在 Tablestore 中观察到新的订单记录持续的从 MySQL 库同步到 Tablestore 中。

手动导入 SQL

用户也可以手动执行 SQL 插入测试数据。使用下面的存储过程可以直接通过 SQL 写入测试数据。因为逐条插入,插入性能要比程序里的批量插入慢。

DROP PROCEDURE if EXISTS test;
DELIMITER //
CREATE procedure test()
BEGIN
DECLARE i INT;
DECLARE userId INT;
DECLARE c INT;
DECLARE price DOUBLE;
DECLARE storeId INT;
DECLARE brandId INT;
DECLARE productId INT;
DECLARE c_id VARCHAR(255);
SET i = 0;
WHILE i<1000 DO    // 这里的值决定写入记录数
SET userId=CEILING(RAND()*1000*10000);
SET c=CEILING(RAND()*10);
SET price=RAND()*1000;
SET storeId=CEILING(RAND()*5000);
SET brandId=CEILING(RAND()*5000);
SET productId=CEILING(RAND()*100);
SET c_id=CONCAT("user",userId);

INSERT INTO test(oId,create_time,pay_time,
has_paid,c_id,c_name,
p_brand,p_count,p_id,
p_name,p_price,s_id,
s_name,total_price) VALUES
(CONCAT(unix_timestamp(now()),"_",c_id), now(), now(),
true,c_id,CONCAT("客户",userId),
CONCAT("品牌",brandId),c,CONCAT("store",storeId,"_",productId),
CONCAT("产品store",storeId,"_",productId),price,CONCAT("store",storeId),
CONCAT("旗舰店",storeId),p_price * c
);
SET i = i+1;
END WHILE;
END
//
DELIMITER ;
CALL test();

总结

基于 DTS,我们可以实现 MySQL 数据向 Tablestore 的实时同步。数据进入 Tablestore 后,我们可以利用 Tablestore 的特性进行搜索、分析等操作。我们会在后续文章中进行说明。

附录

代码 git 地址:https://github.com/aliyun/tablestore-examples

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
1月前
|
存储 人工智能 NoSQL
阿里云表格存储 Tablestore 全面升级 AI 能力,存储成本直降 30%
近日,阿里云表格存储 Tablestore 宣布全面升级 AI 场景支持能力,正式推出 AI Agent 记忆存储功能,在保障高性能与高可用的同时,整体存储成本降低 30%,标志着 Tablestore 在构建 AI 数据处理和存储的技术内核能力上,迈出关键一步。
183 5
|
3月前
|
存储 机器学习/深度学习 缓存
软考软件评测师——计算机组成与体系结构(分级存储架构)
本内容全面解析了计算机存储系统的四大核心领域:虚拟存储技术、局部性原理、分级存储体系架构及存储器类型。虚拟存储通过软硬件协同扩展内存,支持动态加载与地址转换;局部性原理揭示程序运行特性,指导缓存设计优化;分级存储架构从寄存器到外存逐级扩展,平衡速度、容量与成本;存储器类型按寻址和访问方式分类,并介绍新型存储技术。最后探讨了存储系统未来优化趋势,如异构集成、智能预取和近存储计算等,为突破性能瓶颈提供了新方向。
|
29天前
|
存储 人工智能 NoSQL
阿里云表格存储 Tablestore 全面升级 AI 能力,存储成本直降 30%
让 AI 记得久、找得快、用得上,表格存储加速智能体记忆进化。
|
3月前
|
存储 关系型数据库 MySQL
成本直降30%!RDS MySQL存储自动分层实战:OSS冷热分离架构设计指南
在日均订单量超500万的场景下,MySQL数据年增200%,但访问集中在近7天(85%)。通过冷热数据分离,将历史数据迁移至OSS,实现存储成本下降48%,年省72万元。结合RDS、OSS与Redis构建分层架构,自动化管理数据生命周期,优化查询性能与资源利用率,支撑PB级数据扩展。
211 3
|
3月前
|
存储 关系型数据库 数据库
高性能云盘:一文解析RDS数据库存储架构升级
性能、成本、弹性,是客户实际使用数据库过程中关注的三个重要方面。RDS业界率先推出的高性能云盘(原通用云盘),是PaaS层和IaaS层的深度融合的技术最佳实践,通过使用不同的存储介质,为客户提供同时满足低成本、低延迟、高持久性的体验。
|
4月前
|
开发框架 Java 关系型数据库
在Linux系统中安装JDK、Tomcat、MySQL以及部署J2EE后端接口
校验时,浏览器输入:http://[your_server_IP]:8080/myapp。如果你看到你的应用的欢迎页面,恭喜你,一切都已就绪。
385 17
|
5月前
|
关系型数据库 MySQL Linux
CentOS 7系统下详细安装MySQL 5.7的步骤:包括密码配置、字符集配置、远程连接配置
以上就是在CentOS 7系统下安装MySQL 5.7的详细步骤。希望这个指南能帮助你顺利完成安装。
1401 26
|
3月前
|
人工智能 运维 关系型数据库
数据库运维:mysql 数据库迁移方法-mysqldump
本文介绍了MySQL数据库迁移的方法与技巧,重点探讨了数据量大小对迁移方式的影响。对于10GB以下的小型数据库,推荐使用mysqldump进行逻辑导出和source导入;10GB以上可考虑mydumper与myloader工具;100GB以上则建议物理迁移。文中还提供了统计数据库及表空间大小的SQL语句,并讲解了如何使用mysqldump导出存储过程、函数和数据结构。通过结合实际应用场景选择合适的工具与方法,可实现高效的数据迁移。
665 1
|
4月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!

热门文章

最新文章

推荐镜像

更多