数据异构重器之 Canal 初探

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 数据异构重器之 Canal 初探

1、应用场景


提到 Canal,大家应该都能想到这是一个用于解析 MySQL binlog 日志的工具,并将 MySQL 数据库中数据同步到其他存储介质中,例如 Elasticsearch。


即 Canal 一个非常常用的使用场景:数据异构,一种更高级别的数据读写分离架构设计方法。


随着业务不断的发展,企业发展到一定阶段,发现单体的关系型数据库已无法支撑业务高速发展带来数据不断累积的压力,从而会诞生出一种设计架构:分库分表。分库分表对缓解单库数据库压力确实是一种非常好的解决方案,但又衍生出另外一种困境,关联查询不友好,甚至跨库JOIN就更加如此。


举例说明如下:例如一个订单系统,通常有两类用户需要去查询订单,一类是顾客,一类是商家,在对数据库进行分库分表时,如果以顾客(buy_id)进行分库的话,同一个商家的订单数据会分布在不同的库中,如果以商家(shop_id)进行分库的话,同一个用户购买的所有订单数据将会分布在不同的库中,这样进行关联查询,就必然需要跨库进行join,其成本都会偏高。而且上面的场景只能满足一方的需求,那如何是好呢?


Canal 这个时候就闪亮登场了,在电商设计中,其实商家、顾客会被拆分成两个不同的服务,我们可以为两个不同的服务搭建不同的数据库集群,我们可以用户订单库、商家订单库进行分库,以用户订单库为主库,当用户在订单系统下单后,数据进入到用户订单库中,然后可以通过 canal 监听数据库的binlog日志,然后将数据再同步到商家订单库,而用户订单库以用户ID为维度进行分库,商家订单库以商家ID做分库,完美解决问题。


2、架构设计原理


在了解到 Canal 的基本使用场景后,我们通过 canal 官方文档,去探究一下其核心架构设计理念,以此打开进入 Canal 的神秘世界中。


首先我们简单看一下 MySQL 的主从同步原理:

06f75a37fa9e654784cc9915914c2972.png

从上面的图中可以看成主从复制主要分成三个步骤:


  • master将改变记录到二进制日志(binary log ) 中( 这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看)
  • slave将master的binary log events拷贝到它的中继日志(relay log)
  • slave重做中继日志中的事件,将改变反映它自己的数据。


基于 MySQL 这种数据同步机制,那 Canal 的设计目标主要就是实现数据的同步,即数据的复制,从上面的图自然而然的想到了如下的设计:

0b83a82152bbdc51f1fddc419b9c762b.png

原理相对比较简单:


  • canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave,向 mysql master 发送 dump 协议
  • mysql master 收到 dump 请求,开始推送 binary log 给 slave (canal)
  • canal解析 binary log 对象(原始为byte流)


接下来我们来看一下 Canale 的整体组成部分:

804af27ff55559d40856874dff9a3479.png

说明:


  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1..n个instance)


instance模块:


  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

这些组件我暂时不打算深入去研究,因为在目前这个阶段我自己也不清楚,但这个是我后续需要学习研究的重点。

3、在 IntelliJ IDEA 中运行 Demo


在 Linux 环境中安装 canal 比较简单,大家可以按照官方手册一步一步操作即可,在这里我就不重复介绍,本节主要的目的是希望在开发工具中运行 Canal 的 Demo,以便后续在研究源码的过程中遇到难题时可以进行 Debug。


温馨提示:大家在学习过程中,可以根据官方文档先安装一遍 canal,对理解 Canal 的核心组件有着非常重要的帮助。


首先先从 canal 源码中寻找官方提供的 Demo,其示例代码在 example 包中,如下图所示:

aade1e92eef039bbc592fd65b893ce2a.png


但是另外稍微遗憾的是 canal 提供提供的示例代码中只包含了 client 端相关的代码,并没有包含服务端(server),故我们将目光放到其单元测试中,如下图所示:

3e3ec722b702786c6277f40767727257.png

下来我根据官方的一些提示,结合自己的理解,编写出如下测试代码,在 IDEA 开发工具中实现运行 Canal 相关的 Demo。下面的代码已通过测试,可直接使用。


1、Canal Server Demo


package com.alibaba.otter.canal.server;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class CanalServerTestMain {
    protected static final String ZK_CLUSTER_ADDRESS      = "127.0.0.1:2181";
    protected static final String DESTINATION   = "example";
    protected static final String DETECTING_SQL = "select 1";
    protected static final String MYSQL_ADDRESS = "127.0.0.1";
    protected static final String USERNAME      = "canal";
    protected static final String PASSWORD      = "canal";
    protected static final String FILTER        = ".\\*\\\\\\\\..\\*";
    /** 默认 500s 后关闭 */
    protected static final long RUN_TIME = 120 * 1000;
    private final ByteBuffer header        = ByteBuffer.allocate(4);
    private CanalServerWithNetty nettyServer;
    public static void main(String[] args) {
        CanalServerTestMain test = new CanalServerTestMain();
        try {
            test.setUp();
            System.out.println("start");
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            System.out.println("sleep");
            try {
                Thread.sleep(RUN_TIME);
            } catch (Throwable ee) {
            }
            test.tearDown();
            System.out.println("end");
        }
    }
    public void setUp() {
        CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
        embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
            public CanalInstance generate(String destination) {
                Canal canal = buildCanal();
                return new CanalInstanceWithManager(canal, FILTER);
            }
        });
        nettyServer = CanalServerWithNetty.instance();
        nettyServer.setEmbeddedServer(embeddedServer);
        nettyServer.setPort(11111);
        nettyServer.start();
        // 启动 instance
        embeddedServer.start("example");
    }
    public void tearDown() {
        nettyServer.stop();
    }
    private Canal buildCanal() {
        Canal canal = new Canal();
        canal.setId(1L);
        canal.setName(DESTINATION);
        canal.setDesc("test");
        CanalParameter parameter = new CanalParameter();
        //parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS));
        parameter.setMetaMode(CanalParameter.MetaMode.MEMORY);
        parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);
        parameter.setIndexMode(CanalParameter.IndexMode.MEMORY);
        parameter.setStorageMode(CanalParameter.StorageMode.MEMORY);
        parameter.setMemoryStorageBufferSize(32 * 1024);
        parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
        parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),
                new InetSocketAddress(MYSQL_ADDRESS, 3306)));
        parameter.setDbUsername(USERNAME);
        parameter.setDbPassword(PASSWORD);
        parameter.setSlaveId(1234L);
        parameter.setDefaultConnectionTimeoutInSeconds(30);
        parameter.setConnectionCharset("UTF-8");
        parameter.setConnectionCharsetNumber((byte) 33);
        parameter.setReceiveBufferSize(8 * 1024);
        parameter.setSendBufferSize(8 * 1024);
        parameter.setDetectingEnable(false);
        parameter.setDetectingIntervalInSeconds(10);
        parameter.setDetectingRetryTimes(3);
        parameter.setDetectingSQL(DETECTING_SQL);
        canal.setCanalParameter(parameter);
        return canal;
    }
}

2、Canal Client Demo

package com.alibaba.otter.canal.example;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
public class SimpleCanalClientExample {
    public static void main(String[] args) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 3000;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

运行 client 的效果如下图所示:


4ae3e6c55049f8960555f6d1808c3b18.png

在数据库中变更一条数据,以便产生新的binlog日志,其输出结果如下:

530fe710e7d0f0a37e632685c2fb0449.png

能在 IDEA 中搭建并运行 Demo,是我们踏入 canal 的第一步,后续将根据官方文档中的内容为提纲,尝试逐步解开 canal 的实现原理,以便更好的指导实践。

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
8月前
|
消息中间件 存储 负载均衡
【亿级数据专题】「分布式消息引擎」 盘点本年度我们探索服务的HA高可用解决方案
昔之善战者,先为不可胜,以待敌之可胜。不可胜在己,可胜在敌。故善战者,能为不可胜,不能使敌之必可胜。故曰:胜可知,而不可为。
265 2
【亿级数据专题】「分布式消息引擎」 盘点本年度我们探索服务的HA高可用解决方案
|
存储 Cloud Native NoSQL
分布式数据库新秀TIDB初探
背景随着社会数字化程度的加深,网络逐渐成为了社会的基础设施。随着互联网渗透程度的不断深入和互联网的进一步下沉,人们会在互联网上面 花费更多的时间,产生更多的数据。作为数据存储基石的DB面临着新的挑战和发展空间,由于数量的增长,之前的单机DB将面临越来越多的挑战,此时就出现数据库扩展的多种方案以满足海量数据的存储。目前主流的应对方案主要是分库分表,但是也存在着分布式事务,跨节点 join,扩容复杂等
993 0
分布式数据库新秀TIDB初探
|
3天前
|
存储 安全 数据管理
时序数据库TDengine 与中移软件达成兼容性互认证,推动虚拟化云平台与时序数据库的深度融合
在数字化转型和智能化升级的浪潮下,企业对数据的需求日益增长,尤其是在物联网、大数据和实时分析等领域。随着设备数量的激增,时序数据的管理和处理变得愈发复杂,企业亟需高效、稳定的数据解决方案来应对这一挑战。时序数据库作为专门处理时间序列数据的工具,正逐渐成为各行业数字化转型的重要支撑。
21 4
|
5月前
|
监控 NoSQL 大数据
【MongoDB复制集瓶颈】高频大数据写入引发的灾难,如何破局?
【8月更文挑战第24天】在MongoDB复制集中,主节点处理所有写请求,从节点通过复制保持数据一致性。但在大量高频数据插入场景中,会出现数据延迟增加、系统资源过度消耗、复制队列积压及从节点性能不足等问题,影响集群性能与稳定性。本文分析这些问题,并提出包括优化写入操作、调整写入关注级别、采用分片技术、提升从节点性能以及持续监控调优在内的解决方案,以确保MongoDB复制集高效稳定运行。
122 2
|
5月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
898 0
|
5月前
|
SQL 存储 Cloud Native
揭秘TDengine:这个数据库如何以光速处理时间序列数据,颠覆你的世界观!
【8月更文挑战第7天】随着物联网的发展,数据生成呈爆炸式增长,催生了如TDengine这样的高性能时序数据库。TDengine采用优化的列式存储和标签索引,实现高速写入与高效压缩,减少存储空间的同时保持高性能。内置丰富的分析函数支持复杂的数据聚合操作,并通过数据复制保障高可靠性。其SQL接口易于使用,分布式架构便于扩展,且支持多种云环境部署,成为处理物联网、车联网等场景下时间序列数据的理想选择。
147 0
|
存储 监控 物联网
从实时数据库转战时序数据库,他陪伴 TDengine 从 1.0 走到 3.0
他与 TDengine 的六年故事,始于一个“无奈之举”。
269 1
|
存储 消息中间件 NoSQL
中原银行 OLAP 架构实时化演进
中原银行数据信息部杜威科,在 Flink Forward Asia 2022 行业案例专场的分享。
503 2
中原银行 OLAP 架构实时化演进
|
消息中间件 存储 Kafka
谈一谈Kafka在高性能和数据一致性之间做的妥协与改进
CAP定理是分布式系统的基本定理,描述了一致性、可用性和分区容错性三大特性,只能满足两种,开发者必须在此做出取舍。而 Kafka 作为一款高性能的消息队列与分布式存储系统,必然要在高性能和数据一致性之间做出取舍,本文在这方面做了一番探索。
|
存储 XML NoSQL
金融业务的数据存储选型
为什么用关系型数据库?最常见的理由是别人在用,所以我也得用,但是这个并不是理由,而是借口。
341 0