使用DTS同步MySQL增量数据到Tablestore

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 摘要 从MySQL到Tablestore的全量数据导出可以参考同系列文章《数据同步-从MySQL到Tablestore》,本文主要介绍将MySQL的增量数据同步到Tablestore的一种方式——使用阿里集团的数据传输服务DTS的数据订阅功能做增量数据的读取以及改写。

摘要

从MySQL到Tablestore的全量数据导出可以参考同系列文章《数据同步-从MySQL到Tablestore》,本文主要介绍将MySQL的增量数据同步到Tablestore的一种方式——使用阿里集团的数据传输服务DTS的数据订阅功能做增量数据的读取以及改写。
注意:DTS数据订阅服务支持多种数据库环境,老版现不支持MySQL8.0,使用sdk进行消费;新版新增了分组消费概念,需要使用Kafka客户端消费订阅数据。本文以RDS(MySQL 5.7)订阅为例,使用sdk完成增量数据订阅与改写。

原理介绍

_2_

导出步骤

1.源、目的数据库资源

源数据库:

RDS(新建实例)/实例[pingsheng]/数据库[pingstest]/表[to_tablestore]
数据表结构如图
_00002

目的数据库:

Tablestore(新建实例)/实例[pingsheng]/表[from_rds]
数据表结构如图
_00010

2.云账号资源

准备具有源、目的数据库读写权限的一组云账号AK

3.DTS数据订阅

创建订阅通道参考,选择上述源数据库实例为数据源配置订阅信息
_001
选择需要订阅的数据表
_003
通过数据源预检查后,数据订阅配置完成,进入初始化阶段大约需要等待十分钟。初始化完成后,数据订阅状态变为“正常”即可以开始消费增量数据。增量数据的消费点从界面可以看到,支持动态调整参考文档
_004
从控制台的“订阅数据”可以看到已经拉取到的部分展示数据
_005
从DTS拉取到的增量数据是经过解析和再封装的,增添了一些解释参数,订阅数据的各字段含义参考

4.订阅数据的解析与改写

从DTS读取MySQL增量数据

下载DTS的SDK,在本地(ECS)进行编译,参考
_006
在数据订阅“更多”中下载示例代码,替换掉AK信息、订阅ID,编译启动程序尝试获取增量数据,测试rds数据表中若无增量,会每隔1s收到一条“heartbeat”心跳记录
_007
尝试在源数据表insert、update数据,会打印出以Opt:begin开头,包含Opt:insert、update,以Opt:commit结尾的多行数据。修改代码仅保留改写数据需要的操作类型“Opt”和行信息的前后镜像“FieldList”

public void notify(List<ClusterMessage> messages) throws Exception {
  for (ClusterMessage message : messages) {
    // debug
    System.out.println(message.getRecord().getOpt());
    System.out.println(message.getRecord().getFieldList());
    //you must call ackAsConsumed when you consume the data
    message.ackAsConsumed();
  }
}

//BEGIN
//[]
//UPDATE
//[Field name: pk1  //依次输出各列的前、后镜像
//Field type: 3
//Field length: 2
//Field value: 83
//,Field name: pk1
//Field type: 3
//Field length: 2
//Field value: 80
//, Field name: pk2
//Field type: 3
//Field length: 1
//Field value: 3
//, Field name: pk2
//Field type: 3
//Field length: 1
//Field value: 3
//, Field name: v1
//Field type: 3
//Field length: 2
//Field value: 47
//, Field name: v1
//Field type: 3
//Field length: 2
//Field value: 50
//]
//COMMIT
//[]

将增量数据写入Tablestore

下载Tablestore的SDK ,本地(ECS)进行编译参考
调用单行数据操作,将增、删、改的行写入Tablestore参考

//PutRow
private static void putRow(SyncClient client, String pkValue, MyColumnValue columnvalue) {
    // 构造主键
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
    PrimaryKey primaryKey = primaryKeyBuilder.build();
    RowPutChange rowPutChange = new RowPutChange(TABLE_NAME, primaryKey);
    //加入属性列
    rowPutChange.addColumn(new Column("v1", columnvalue.getv1()));
    rowPutChange.addColumn(new Column("v2", columnvalue.getv2()));
    client.putRow(new PutRowRequest(rowPutChange));
}
//DeleteRow
private static void deleteRow(SyncClient client, String pkValue) {
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
    PrimaryKey primaryKey = primaryKeyBuilder.build();
    RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, primaryKey);
    client.deleteRow(new DeleteRowRequest(rowDeleteChange));
}

注意:涉及主键的Update,需要查分成Delete+Put两步操作
_010
->
_011

源码参考

下载

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
6月前
|
canal 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行整库同步MySQL数据到StarRocks时,遇到全量数据可以同步,但增量数据无法同步,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
NoSQL 安全 容灾
阿里云DTS踩坑经验分享系列|Redis迁移、同步
阿里云数据传输服务DTS在帮助用户迁移Redis数据、同步数据时,在某些复杂场景下会出现报错,或者源库与目标库数据不一致的问题,给用户带来困扰。本文介绍了DTS Redis到Redis迁移、同步过程中的典型问题,以帮助用户更好地使用DTS。
219 2
|
4月前
|
NoSQL MongoDB 数据库
DTS 的惊天挑战:迁移海量 MongoDB 数据时,捍卫数据准确完整的生死之战!
【8月更文挑战第7天】在数字化时代,大数据量的MongoDB迁移至关重要。DTS(数据传输服务)通过全面的数据评估、可靠的传输机制(如事务保证一致性)、异常处理(如回滚或重试),以及迁移后的数据校验来确保数据准确无损。DTS还处理数据转换与映射,即使面对不同数据库结构也能保持数据完整性,为企业提供可靠的数据迁移解决方案。
66 2
|
5月前
|
关系型数据库 MySQL 数据库
关系型数据库mysql数据增量恢复
【7月更文挑战第3天】
162 2
|
6月前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用问题之如何实现MySQL的实时增量同步
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
105 6
|
6月前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用问题之dts是否支持传输数据到mc主键表2.0
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6月前
|
canal 关系型数据库 MySQL
蓝易云 - 详解canal同步MySQL增量数据到ES
以上就是使用Canal同步MySQL增量数据到Elasticsearch的基本步骤。在实际操作中,可能还需要根据具体的业务需求和环境进行一些额外的配置和优化。
173 2
|
6月前
|
SQL 监控 关系型数据库
实时计算 Flink版产品使用问题之使用mysql cdc配置StartupOptions.initial()全量之后就不增量了,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
存储 数据采集 NoSQL
DTS在迁移大数据量的MongoDB数据库时如何保证数据的准确性和完整性?
【6月更文挑战第4天】DTS在迁移大数据量的MongoDB数据库时如何保证数据的准确性和完整性?
146 1
|
6月前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之当使用DTS(数据传输服务)同步的表在目标库中进行LEFT JOIN查询时遇到异常,是什么导致的
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。