阿里云DTS同步binlog实战

本文涉及的产品
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 阿里云DTS同步binlog实战

阿里云DTS同步binlog实战



首先在阿里云DTS订阅中配置你要订阅的库表,其次创建消费者组,最后就是写代码消费binlog即可。


一、创建的配置信息



从这里得到topic以及broker信息。



从这里得到消费者组ID。


二、写代码消费



  1. 创建消费类
import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.DTSConsumer;
import com.aliyun.dts.subscribe.clients.DefaultDTSConsumer;
import com.aliyun.dts.subscribe.clients.common.RecordListener;
import com.aliyun.dts.subscribe.clients.record.OperationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
@Component
public class DTSConsumerSubscribe {
    // 自己实现binlog解析组件
    @Resource
    MysqlRecordConsumeListener mysqlRecordConsumeListener;
    private static final Logger log = LoggerFactory.getLogger(DTSConsumerSubscribe.class);
    private final DTSConsumer dtsConsumer; // DTS消费实例
    public DTSConsumerSubscribe() {
        // kafka broker url
        String brokerUrl = "xxxxxxx";
        // topic to consume, partition is 0
        String topic = "xxxxxx";
        String sid = "ccc"; // 消费者组ID
        String userName = "xxx"; // 消费者组名称
        String password = "xxx"; // 消费者组密码
        // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019))
        String initCheckpoint = "1693567696";
        // when use subscribe mode, group config is required. kafka consumer group is enabled
        ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
        this.dtsConsumer = initDTSClient(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
    }
    public DTSConsumerSubscribe(String brokerUrl, String topic, String sid, String userName, String password,
                                String checkpoint, ConsumerContext.ConsumerSubscribeMode subscribeMode) {
        this.dtsConsumer = initDTSClient(brokerUrl, topic, sid, userName, password, checkpoint, subscribeMode);
    }
    // 初始化DTS客户端
    private DTSConsumer initDTSClient(String brokerUrl, String topic, String sid, String userName, String password,
                                      String initCheckpoint, ConsumerContext.ConsumerSubscribeMode subscribeMode) {
        ConsumerContext consumerContext = new ConsumerContext(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
        consumerContext.setForceUseCheckpoint(false);
        DTSConsumer dtsConsumer = new DefaultDTSConsumer(consumerContext);
        dtsConsumer.addRecordListeners(buildRecordListener());
        return dtsConsumer;
    }
    // 构建binlog监听器 
    public Map<String, RecordListener> buildRecordListener() {
        // user can impl their own listener
        RecordListener mysqlRecordPrintListener = record -> {
            OperationType operationType = record.getOperationType();
            if(operationType.equals(OperationType.INSERT)
                    || operationType.equals(OperationType.UPDATE)
                    || operationType.equals(OperationType.DELETE)
                    || operationType.equals(OperationType.DDL)) {
                // consume record
                mysqlRecordConsumeListener.consume(record);
                record.commit(""); // 提交消费位点
            }
        };
        return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
    }
    // 开启消费
    public void start() {
        System.out.println("Start DTS subscription client...");
        dtsConsumer.start();
    }
    // 关闭消费
    public void stop() {
        System.out.println("Stop DTS subscription client");
        dtsConsumer.close();
    }


  1. 创建自己的解析器
@Component
public class MysqlRecordConsumeListener implements RecordListener {
    private static final Logger log = LoggerFactory.getLogger(MysqlRecordConsumeListener.class);
    public void consume(DefaultUserRecord record) {
        OperationType operationType = record.getOperationType();
        RecordSchema recordSchema = record.getSchema();
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("\n").append("RecordID [").append(record.getId()).append("]\n").append("RecordTimestamp [").append(record.getSourceTimestamp()).append("] \n").append("Source [").append(recordSchema.getDatabaseInfo()).append("]\n").append("RecordType [").append(record.getOperationType()).append("]\n");
        if (operationType.equals(OperationType.INSERT) || operationType.equals(OperationType.UPDATE) || operationType.equals(OperationType.DELETE) || operationType.equals(OperationType.DDL)) {
            stringBuilder.append("Schema info [").append(recordSchema.toString()).append("]\n").append("Before image {").append(record.getBeforeImage()).append("}\n").append("After image {").append(record.getAfterImage()).append("}\n");
            parseRecord(operationType, record);
        }
        log.info(stringBuilder.toString());
    }
    public void parseRecord(OperationType operationType, DefaultUserRecord record) {
        log.info("解析binlog record:{}", record);
        if (OperationType.INSERT.equals(operationType)) { // 新增只有After 对于After,取出自增字段id查询即可
            log.info("=====INSERT getValues=====");
        } else if (OperationType.UPDATE.equals(operationType)) { // 修改有Before和After 取出自增字段id查询即可
            log.info("=====UPDATE getValues=====");
        } else if (OperationType.DELETE.equals(operationType)) { // 删除只有Before 取出自增字段id查询即可
            log.info("=====DELETE getValues=====");
        }
    }}


拿到binlog之后就可以轻松关联相关表做些操作了。大家可以看下UPDATE的binlog的打印日志:


Before image {[Field [id] [4]
Field [user_id] [1002316]
Field [user_name] [ascacac]
Field [goods_id] [1054]
Field [course_id] [null]
Field [course_name] [null]
Field [current_status] [null]
]}
After image {[Field [id] [4]
Field [user_id] [1002316]
Field [user_name] [jamlee]
Field [goods_id] [1054]
Field [course_id] [null]
Field [course_name] [null]
Field [current_status] [null]
]}


  1. 写task跑起来
@Component
public class UserRightDtsSubscribe implements ApplicationRunner, DisposableBean {
    @Resource
    DTSConsumerSubscribe dtsConsumerSubscribe;
    @Override
    public void destroy() throws Exception {
        log.info("关闭用户订单数据同步...");
        dtsConsumerSubscribe.stop();
    }
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("启动用户订单数据同步...");
        dtsConsumerSubscribe.start();
    }
}


三、总结


阿里云DTS这种订阅消费binlog方式非常方便,尤其在做增量数据同步或者修改等操作的时候。当然也可以通过别的方式同步binlog,比如canal等。但是binlog这种同步方式有缺点吗或者会导致哪些问题呢?

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3月前
|
关系型数据库 MySQL 数据挖掘
阿里云 SelectDB 携手 DTS ,一键实现 TP 数据实时入仓
DTS 作为阿里云核心的数据交互引擎,以其高效的实时数据流处理能力和广泛的数据源兼容性,为用户构建了一个安全可靠、可扩展、高可用的数据架构桥梁。阿里云数据库 SelectDB 通过与 DTS 联合,为用户提供了简单、实时、极速且低成本的事务数据分析方案。用户可以通过 DTS 数据传输服务,一键将自建 MySQL / RDS MySQL / PolarDB for MySQL 数据库,迁移或同步至阿里云数据库 SelectDB 的实例中,帮助企业在短时间内完成数据迁移或同步,并即时获得深度洞察。
阿里云 SelectDB 携手 DTS ,一键实现 TP 数据实时入仓
|
1月前
|
SQL 负载均衡 安全
阿里云DTS踩坑经验分享系列|全量迁移加速方法指南
阿里云数据传输服务DTS是一个便捷、高效的数据迁移和数据同步服务。一般而言,一个完整的DTS数据迁移任务主要包括预检查、结构迁移,全量迁移,增量迁移等阶段,其中全量迁移会将源数据库的存量数据全部迁移到目标数据库。面对各种各样的用户场景, 本文将重点介绍如何使用阿里云DTS实现全量数据迁移加速,以缩短迁移时间,确保数据迁移的效率和稳定性。
195 0
|
3月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用合集之在同步MySQL的时候卡在某个binlog文件处如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用合集之在同步MySQL的时候卡在某个binlog文件处如何解决
|
2月前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之当使用DTS(数据传输服务)同步的表在目标库中进行LEFT JOIN查询时遇到异常,是什么导致的
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。
|
3月前
|
SQL 运维 关系型数据库
阿里云DTS踩坑经验分享系列|数据不一致修复大法
阿里云数据传输服务DTS在帮助用户迁移数据、同步数据时,在某些复杂场景下会出现源库与目标库数据不一致的问题,造成数据错误,给用户带来困扰。由于数据不一致的问题很难完全避免,为了及时修复不一致的数据,DTS产品推出数据订正功能,保障用户在同步\迁移数据时的数据一致性。本文介绍了产生数据不一致的一些典型场景,并重点阐述了如何使用DTS数据订正功能来修复不一致的数据。
413 4
|
3月前
|
存储 缓存 NoSQL
Redis与数据库同步指南:订阅Binlog实现数据一致性
本文由开发者小米分享,探讨分布式系统中的一致性问题,尤其是数据库和Redis一致性。文章介绍了全量缓存策略的优势,如高效读取和稳定性,但也指出其一致性挑战。为解决此问题,提出了通过订阅数据库的Binlog实现数据同步的方法,详细解释了工作原理和步骤,并分析了优缺点。此外,还提到了异步校准方案作为补充,以进一步保证数据一致性。最后,提醒在实际线上环境中需注意日志记录、逐步优化和监控报警。
174 3
|
3月前
|
SQL 算法 关系型数据库
实时计算 Flink版产品使用合集之全量历史数据比较多,全量同步阶段时间长,是否会同时读取binlog进行合并输出
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之使用DTS从RDSMySQL数据库同步数据到云Kafka,增量同步数据延迟时间超过1秒。如何诊断问题并降低延迟
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
NoSQL 关系型数据库 数据库
数据传输服务DTS(Data Transmission Service)是阿里云提供的实时数据流服务
【2月更文挑战第29天】数据传输服务DTS(Data Transmission Service)是阿里云提供的实时数据流服务
42 5
|
3月前
|
关系型数据库 MySQL 数据库
使用阿里云的数据传输服务DTS(Data Transmission Service)进行MySQL 5.6到MySQL 8.0的迁移
【2月更文挑战第29天】使用阿里云的数据传输服务DTS(Data Transmission Service)进行MySQL 5.6到MySQL 8.0的迁移
267 2