前言
前文架构篇,可以看到 MySQL + Tablestore 非常适合大规模订单系统这一类需求场景。那么,我们首先要做的是,利用 CDC(Change Data Capture) 技术将订单数据实时从 MySQL 同步到 Tablestore 中。对于订单系统的数据同步,我们需要关注同步的稳定性、实时性。目前,存在多款工具可以实现这一功能,他们有的是开源工具如 Canal,有的是阿里云端服务如 DTS。下面我们将对各种同步工具进行介绍,并以 DTS 为例展示同步操作。
同步工具介绍
DataX
DataX 是异构数据源离线同步的工具,支持多种异构数据源之间高效的数据同步。它使用 SQL 从数据库拉取数据,全内存操作。它具有一下特点:
-
适合进行离线全量同步,不适合支持增量同步。
-
可以通过编程来进行增量同步,但有一定延时,源表需要通过字段区分哪些记录为待同步字段,且无法捕获删除操作。
-
单点执行。
因此,它适合中小用户,同步对实时性无太高要求的数据。其具体使用见:数据同步-从MySQL到Tablestore。
Canal
Canal是阿里开源 CDC 工具,他可以获取 MySQL binlog 并解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库向其他数据库的同步工作。Tablestore 团队已经在 Canal 中实现了 Tablestore 适配器,可以支持将 MySQL 数据同步进入 Tablestore,具体细节请参考后续文章。
DTS
数据传输服务(Data Transmission Service,简称DTS)支持关系型数据库、NoSQL、大数据(OLAP)等数据源,集数据迁移、订阅及实时同步功能于一体,能够解决公共云、混合云场景下,远距离、毫秒级异步数据传输难题。其特点为:
-
基于云部署,只需要简单配置就可以运行
-
基于 binlog,可以实时同步数据
-
费用相对于 DataX 高
因此,目前,中小型用户,对实时性要求没有很高的用户,可以使用 DataX 进行 MySQL 到 Tablestore 的同步。而企业级用户,或者对于延迟要求比较高的客户,推荐使用 DTS 进行数据同步。 本文会展示如何完成基于 DTS 从 MySQL 到 Tablestore 的同步系统的搭建。而这套同步系统正是订单数据上Tablestore 的第一步工作。
服务开通
创建 MySQL 并建表
在 RDS 上申请源的 MySQL 数据库,可以参考创建RDS MySQL实例。已经在 RDS 上或者 ECS 上拥有 MySQL 实例的同学可以忽略这一步。
在数据库中创建订单表 order_contract,建表语句如下:
CREATE TABLE `order_contract` (
`oId` varchar(50) NOT NULL,
`create_time` datetime NOT NULL COMMENT '下单时间',
`pay_time` datetime DEFAULT NULL COMMENT '支付时间',
`has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
`c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
`c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
`p_brand` tinytext COMMENT '产品品牌',
`p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
`p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
`p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
`p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
`s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
`s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
`total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
PRIMARY KEY (`oId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;
开通 Tablestore 服务
提供了三种方式创建 Tablestore 实例,读者请自由选择一种创建。
阿里云 CLI (POP Client) 创建按量模式实例
通过阿里云官网下载安装并配置阿里云 CLI。具体参考 阿里云 CLI。完毕后,打开命令行,输入
aliyun ots InsertInstance --endpoint ots.cn-hangzhou.aliyuncs.com --InstanceName test-20210609
用于创建实例。其中 endpoint 填入实例所在地域域名。然后在 Tabelstore 控制台即可看到创建出的实例如图。
在 Cloud Shell 中可以同样使用此指令建立 Tablestore 实例,Cloud Shell 地址见 Cloud Shell 地址。
控制台创建按量模式实例
进入Tablestore首页。点击进入管理控制台。
点击创建实例。
选择按量模式。填入实例名称。
点击确定,完成创建。然后可以在控制台首页看到对应实例。
控制台创建预留模式实例
此创建过程与控制台创建按量模式实例基本相同,只是在选择购买方式时选择预留模式。
开通配置 DTS
进入DTS首页。点击立即购买。
商品类型选择数据传输服务 Tablestore(后付费),功能选择数据同步,源实例选择 MySQL ,目标实例选择 Tablestore ,同步拓扑选择单向同步。
若有压测等需要大量同步记录的需求,同步链路规格参数可以适当选大,每种链路的传输数据能力可以在页面上查看。
点击立即购买。弹出如下页面,勾选服务协议,然后点击立即开通,即完成购买。
购买成功后,进入管理控制台,点击同步实例下面的数字“1”,可以看见尚未配置的实例信息。
点击配置同步链路。
配置源库 MySQL 和目标库 Tablestore 的信息。AccessKey 配置参考:获取AccessKey。配置完成后点击授权白名单并进入下一步。
在选择同步对象页,同步初始化这里,结构初始化、全量数据初始化、增量数据初始化这三个选项都要进行勾选。源库对象,选择源表 order_contract,点击中间红框中的按钮,将表传送到右侧“已选中对象”。
点击编辑,
将 pay_time 的数据类型映射为 Integer。这样在 Tablestore 中的 pay_time字段是以微秒为单位的时间戳。点击确定。
配置完成后,向下滑动窗口,点击“下一步”进入高级配置。继续点击预检查并启动,进行启动。在控制台的数据同步页,可以看到刚刚配置的同步任务。
数据同步测试
字段说明
通过前面,我们申请好了充当订单库的 MySQL 数据库,Tablestore 实例,并且搭建了 DTS 任务从 MySQL 向Tablestore 同步数据。下面,我们简要写一个 Java 程序,持续向 MySQL 中写入订单数据,以验证 DTS 的持续同步能力。生成的订单记录,各字段加入一些随机性,以构造更加真实的测试数据。各字段生成逻辑见表。
字段 |
字段含义 |
取值说明 |
oId |
订单号 |
使用当前时间戳 + c_id,例如1623228187366_user2 |
create_time |
下单时间 |
取当前时间 |
pay_time |
支付时间 |
取当前时间,不做更细化仿真,假设每笔订单下单同时支付。 |
has_paid |
是否已经支付 |
设定为true,这里不对此字段进行仿真。 |
c_id |
消费者id |
取一千万以内的随机整数,假设有一千万消费者。消费者id为“user” + id格式,比如“user1” |
c_name |
消费者姓名 |
使用“客户” + 消费者id的格式,比如1号消费者,姓名为“客户1” |
p_brand |
产品品牌 |
格式为“品牌id”,id为5000以内随机整数,仿真5000个品牌 |
p_count |
产品数量 |
1到10取随机整数 |
p_id |
产品id |
格式为"store1_id", id为100以内随机数,假设每个店铺有100个产品,例如store1_1 |
p_name |
产品名 |
格式为“产品” + p_id,比如“产品store4075_25” |
p_price |
产品价格 |
0到1000元随机浮点数 |
s_id |
店铺id |
5000以内随机整数,假设有5000家店铺。店铺id为“store” + id 格式,比如“store1” |
s_name |
店铺名称 |
使用“旗舰店” + id的格式,比如1号店铺,id为“store1”,店铺名称为“旗舰店1” |
total_price |
总价格 |
p_count * p_price |
程序说明
搭建 Springboot 项目,其中创建订单代码如下,代码中包含随机生成参数的逻辑。
// 创建订单
private OrderContract createOrder() {
long now = System.currentTimeMillis();
LocalDateTime nowT = LocalDateTime.now();
int cNumber = r.nextInt(1000 * 10000); // 一千万用户
String userId = "user" + cNumber;
String oId = now + "_" + userId;
OrderContract item = new OrderContract();
item.setoId(oId);
item.setCreateTime(nowT);
item.setPayTime(nowT);
item.setHasPaid(true);
item.setcId(userId);
item.setcName("客户" + cNumber);
int count = r.nextInt(10) + 1;
item.setpCount(count); // 商品数量
double price = r.nextDouble() * 1000d; // 单价1到1000
item.setpPrice(price);
int storeId = r.nextInt(5000); //5000个店铺
item.setsId("store" + storeId);
item.setsName("旗舰店" + storeId);
item.setTotalPrice(item.getpPrice() * item.getpCount());
int brandId = r.nextInt(5000);
item.setpBrand("品牌" + brandId);
int productId = r.nextInt(100);
item.setpId(item.getsId() +"_" + productId);
item.setpName("产品" + item.getpId());
return item;
}
批量获得订单并插入数据库代码如下,根据传入参数,插入数据库。
public void insertIntoOrders(int size) {
System.out.println("start insert orders");
List<OrderContract> list = new ArrayList<>();
for (int i = 0; i < size; i++) {
OrderContract order = createOrder();
list.add(order);
}
userMapper.batchInsert(list);
System.out.println("finish insert orders.");
}
插入数据库代码:
// Mybatis配置sql
<insert id="batchInsert" parameterType="List">
insert into order_contract(oId,create_time,pay_time,has_paid,c_id,c_name,p_brand,
p_count,p_id,p_name,p_price,s_id,s_name,total_price)
values
<foreach collection="list" index="index" item="item" separator=",">
(#{item.oId},#{item.createTime},#{item.payTime},#{item.hasPaid},#{item.cId},#{item.cName},#{item.pBrand},
#{item.pCount},#{item.pId},#{item.pName},#{item.pPrice},#{item.sId},#{item.sName},#{item.totalPrice})
</foreach>
</insert>
循环执行批量插入数据库逻辑,以达到批量生成订单数据的目的。
public void initOrders() {
while (true) {
try {
int size = r.nextInt(1000);
insertIntoOrders(size);
Thread.sleep(4000L);
} catch (InterruptedException e) {
break;
} catch (Exception e) {
e.printStackTrace();
}
}
}
同步结果
启动 Java 程序,可以在 Tablestore 中观察到新的订单记录持续的从 MySQL 库同步到 Tablestore 中。
手动导入 SQL
用户也可以手动执行 SQL 插入测试数据。使用下面的存储过程可以直接通过 SQL 写入测试数据。因为逐条插入,插入性能要比程序里的批量插入慢。
DROP PROCEDURE if EXISTS test;
DELIMITER //
CREATE procedure test()
BEGIN
DECLARE i INT;
DECLARE userId INT;
DECLARE c INT;
DECLARE price DOUBLE;
DECLARE storeId INT;
DECLARE brandId INT;
DECLARE productId INT;
DECLARE c_id VARCHAR(255);
SET i = 0;
WHILE i<1000 DO // 这里的值决定写入记录数
SET userId=CEILING(RAND()*1000*10000);
SET c=CEILING(RAND()*10);
SET price=RAND()*1000;
SET storeId=CEILING(RAND()*5000);
SET brandId=CEILING(RAND()*5000);
SET productId=CEILING(RAND()*100);
SET c_id=CONCAT("user",userId);
INSERT INTO test(oId,create_time,pay_time,
has_paid,c_id,c_name,
p_brand,p_count,p_id,
p_name,p_price,s_id,
s_name,total_price) VALUES
(CONCAT(unix_timestamp(now()),"_",c_id), now(), now(),
true,c_id,CONCAT("客户",userId),
CONCAT("品牌",brandId),c,CONCAT("store",storeId,"_",productId),
CONCAT("产品store",storeId,"_",productId),price,CONCAT("store",storeId),
CONCAT("旗舰店",storeId),p_price * c
);
SET i = i+1;
END WHILE;
END
//
DELIMITER ;
CALL test();
总结
基于 DTS,我们可以实现 MySQL 数据向 Tablestore 的实时同步。数据进入 Tablestore 后,我们可以利用 Tablestore 的特性进行搜索、分析等操作。我们会在后续文章中进行说明。