Canal监听MySQL Binarylog消费实践

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 在MySQL作为如今最为主流使用的数据库背景下,除了常规的数据存储使用场景,还存在大量的使用需求,如:数据自动同步,数据更新监听等场景。由于数据库层面的增量数据变动无法依靠应用服务层面进行有效感知,因此,还是需要从数据库自身提供的机制入手进行实现处理。下面为将展示关于如何借助Canal实践解决场景的几个业务场景问题

背景

在MySQL作为如今最为主流使用的数据库背景下,除了常规的数据存储使用场景,还存在大量的使用需求,如:数据自动同步,数据更新监听等场景。由于数据库层面的增量数据变动无法依靠应用服务层面进行有效感知,因此,还是需要从数据库自身提供的机制入手进行实现处理。下面为将展示关于如何借助Canal实践解决场景的几个业务场景问题。

Canal简述

Github开源地址:https://github.com/alibaba/canal
image.png

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)。
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)。
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据。

Canal工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )。
  • canal 解析 binary log 对象(原始为 byte 流)。

Canal服务范围

  • 当前Canal支持源端MYSQL的版本包括:5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。
  • Canal直接支持的写入目标类型包括:MYSQL、Kafka、elasticsearch、Hbase、RocketMQ等。由于Datahub直接支持Kafka协议的写入,所以Canal服务也可以支持往Datahub中写入Binary Log数据。

Canal消费方式

Canal在伪装成为目标MySQL的一个Slave节点后,获取到来自主节点的BinaryLog日志内容。那么作为BinaryLog消费者该如何使用canal监听得到的内容呢。Canal为我们提供了两种类型的方式,直接消费和投递。直接消费即使用Canal配套提供的客户端程序,即时消费Canal的监听内容。投递是指配置指定的MQ类型以及对应信息,Canal将会按照BinaryLog的条目投递到指定的MQ下,再交由MQ为各种消费形式提供数据消费。

Canal客户端消费

Canal官方SDK提供地址:https://github.com/alibaba/canal/wiki/ClientExample

  • 消费使用代码摘要与简单说明:
// List<Entry>为一次消费包含的多条BinaryLog数据内容
private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            // 行变动数据存放对象,其中rowDatasList为更新前后数据内容
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }
        // 当前BinaryLog动作类型
        EventType eventType = rowChage.getEventType();
        
        // entry.getHeader()存放当前变动数据对应的Schema、Table信息
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

Canal投递MQ

Canal暂时提供的投递MQ类型包括:

投递配置

以投递到RocketMQ为样例提供Canal版本为1.1.4的配置样例(对比发现1.1.5开始配置内容有所改动,官网文档说明内容有所存疑,因为为大家提供可用1.1.4版本为配置样例参考),MQ使用大禹平台RocketMQ为样例。
  • canal.properties
# 改动部分一
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =${大禹账号AccessKey}
canal.aliyun.secretKey = ${大禹账号SecretKey}

#改动部分二
##################################################
#########              MQ              #############
##################################################
canal.mq.servers = ${大禹MQ访问地址}
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = ${大禹生产者Group}
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
  • instance.properties
# position info
canal.instance.master.address=${canal服务监听的MySQL地址}

canal.instance.dbUsername=${数据库访问用户名}
canal.instance.dbPassword=${数据库访问密码}

# table regex
canal.instance.filter.regex=${监听数据表匹配表达式}
# mq config
canal.mq.topic=${大禹MQ投递Topic名称}

投递内容数据结构展示

与客户端直接消费不同,canal投递上MQ的消息内容为文本内容,下面为大家展示MQ中一条BinaryLog的格式内容,直观地感受可以使用的相关信息。

  • 样例数据
{
    "data": [
        {
            "id": "4971874",
            "code": "45ffb86",
            "name": "组织1",
            "display_name": "组织1",
            "simple_name": "组织1",
            "parent_code": "da43c609",
            "type": "R",
            "sub_type": null,
            "comment": null,
            "level": "2",
            "region_id": null,
            "dutyuser_id": null,
            "ou_code": null,
            "cry": null,
            "line": null,
            "manager": null,
            "address": null,
            "create_by": null,
            "create_on": null,
            "last_modify_by": "20443",
            "last_modify_on": "2021-12-09 06:00:03"
        }
    ],
    "database": "test",
    "es": 1617179083000,
    "id": 7,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(20)",
        "code": "varchar(255)",
        "name": "varchar(255)",
        "display_name": "varchar(255)",
        "simple_name": "varchar(128)",
        "parent_code": "varchar(255)",
        "type": "varchar(32)",
        "sub_type": "varchar(32)",
        "comment": "varchar(2040)",
        "level": "int(11)",
        "region_id": "varchar(128)",
        "dutyuser_id": "varchar(32)",
        "ou_code": "varchar(32)",
        "cry": "varchar(32)",
        "line": "varchar(32)",
        "manager": "varchar(255)",
        "address": "varchar(255)",
        "create_by": "bigint(20)",
        "create_on": "datetime",
        "last_modify_by": "bigint(20)",
        "last_modify_on": "datetime"
    },
    "old": [
        {
            "last_modify_by": null
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "code": 12,
        "name": 12,
        "display_name": 12,
        "simple_name": 12,
        "parent_code": 12,
        "type": 12,
        "sub_type": 12,
        "comment": 12,
        "level": 4,
        "region_id": 12,
        "dutyuser_id": 12,
        "ou_code": 12,
        "cry": 12,
        "line": 12,
        "manager": 12,
        "address": 12,
        "create_by": -5,
        "create_on": 93,
        "last_modify_by": -5,
        "last_modify_on": 93
    },
    "table": "test2",
    "ts": 1617179084459,
    "type": "UPDATE"
}

上述展示的内容为更新其中一条数据中的"last_modify_by"字段的内容。借助上面的BinaryLog样例,可以关注到一下内容:

  • 库表信息:字段"database","table"分别展示了数据库名称以及库表名称。
  • 数据表结构:mysqlType中囊括了当前BinaryLog中操作的数据表的具体字段结构,包括字段名称、字段类型;
  • 操作类型:字段"type"显示了该BinaryLog内容对应的操作类型。主要包括:INSERT、UPDATE、DELETE。
  • 更新前后的数据内容:"data"中可以看到组成为数组List,说明一条BinaryLog数据可能包括多条更新后的数据内容。同样地,从"old"字段中也可以看到更新前的数据内容,也同为数据List结构。

应用实践

数据增量同步

  • 实践背景

    在某项目交付过程中,客户希望解决一个数据同步问题,要求实时性较高,性能影响尽可能少地减少。同时,由于来源数据库为一个外部维护的数据库,且无法直接使用到新建设的业务系统中作为业务数据库。旧的解决方案进行每日同步的频次执行,在业务量较少的时间段进行一次大规模的数据查询再插入的动作。旧的方案会导致数据同步滞后时间为一天,且全量的查询动作对外部数据库产生较大的使用影响,遭到外部维护方的查询限制,同步执行时间也比较长。

  • 数据同步实时化

    为了使数据能够进行实时同步,决定使用Canal接入到外部数据库,然后把Canal监听的BinaryLog接入到新建设的MySQL库中,使得两边的数据库数据同步延迟仅有秒级差异。Canal的接入也使得每日执行的同步任务得以取消,减少了额外的系统维护工作。而且BinaryLog的监听推送对外部数据库性能来说影响较少。

  • 增量数据投递消费

    此外,Canal投递消费能力能够拓展数据增量改动的体现形式。Canal把感知到的数据库变动内容投递到指定的MQ Topic,为后续的消费途径提供多样性。如:Canal订阅指定数据表的变动数据投递到Datahub中,投递的内容就如上面的数据结构展示。允许借助Blink计算平台对数据进行感知整合,实现业务场景的下聚合统计等实时计算诉求;也能够开放Datahub的Topic订阅权限,把增量数据的变动开发到指定使用者,提供实时数据变动推送。

image.png

监听字段更新

  • 实践背景

    在大屏项目建设过程中,大量的指标数据维护同一指标表中是常见的处理手段。但指标数据的更新来源比较复杂,有数据开发同学进行写入,有外部系统进行推送。客户要求对大屏指标的实时性与有效性进行保证,但大量的指标更新情况无法有效监控。

  • 解决方案

    使用Canal对指定的指标表进行监听,对指标表的更新数据BinaryLog进行解析,然后以日志形式记录。针对每一条数据内容能够识别到具体的指标,把当前更新的数据信息记录到库表中。再按照对应的指标更新要求,感知更新日志表的数据库,就能够确保及时知道指标的更新频次是否符合预期,指标数据每次更新的数据内容,做到更新频次可监控,更新数据变动可追溯。

image.png

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
19天前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
115 4
|
19天前
|
API C# 开发框架
WPF与Web服务集成大揭秘:手把手教你调用RESTful API,客户端与服务器端优劣对比全解析!
【8月更文挑战第31天】在现代软件开发中,WPF 和 Web 服务各具特色。WPF 以其出色的界面展示能力受到欢迎,而 Web 服务则凭借跨平台和易维护性在互联网应用中占有一席之地。本文探讨了 WPF 如何通过 HttpClient 类调用 RESTful API,并展示了基于 ASP.NET Core 的 Web 服务如何实现同样的功能。通过对比分析,揭示了两者各自的优缺点:WPF 客户端直接处理数据,减轻服务器负担,但需处理网络异常;Web 服务则能利用服务器端功能如缓存和权限验证,但可能增加服务器负载。希望本文能帮助开发者根据具体需求选择合适的技术方案。
53 0
|
1月前
|
SQL 关系型数据库 MySQL
(二十五)MySQL主从实践篇:超详细版读写分离、双主热备架构搭建教学
在上篇《主从原理篇》中,基本上把主从复制原理、主从架构模式、数据同步方式、复制技术优化.....等各类细枝末节讲清楚了,本章则准备真正对聊到的几种主从模式落地实践,但实践的内容通常比较枯燥乏味,因为就是调整各种配置、设置各种参数等步骤。
202 2
|
26天前
|
存储 关系型数据库 MySQL
深入MySQL:事务日志redo log详解与实践
【8月更文挑战第24天】在MySQL的InnoDB存储引擎中,为确保事务的持久性和数据一致性,采用了redo log(重做日志)机制。redo log记录了所有数据修改,在系统崩溃后可通过它恢复未完成的事务。它由内存中的redo log buffer和磁盘上的redo log file组成。事务修改先写入buffer,再异步刷新至磁盘,最后提交事务。若系统崩溃,InnoDB通过redo log重放已提交事务并利用undo log回滚未提交事务,确保数据完整。理解redo log工作流程有助于优化数据库性能和确保数据安全。
106 0
|
1月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
260 0
|
2月前
|
关系型数据库 MySQL Java
|
2月前
|
分布式计算 关系型数据库 MySQL
MySQL超时参数优化与DataX高效数据同步实践
通过合理设置MySQL的超时参数,可以有效地提升数据库的稳定性和性能。而DataX作为一种高效的数据同步工具,可以帮助企业轻松实现不同数据源之间的数据迁移。无论是优化MySQL参数还是使用DataX进行数据同步,都需要根据具体的应用场景来进行细致的配置和测试,以达到最佳效果。
|
3月前
|
canal 关系型数据库 MySQL
蓝易云 - 详解canal同步MySQL增量数据到ES
以上就是使用Canal同步MySQL增量数据到Elasticsearch的基本步骤。在实际操作中,可能还需要根据具体的业务需求和环境进行一些额外的配置和优化。
90 2
|
2月前
|
关系型数据库 MySQL 数据挖掘
MySQL 聚合函数案例解析:深入实践与应用
MySQL 聚合函数案例解析:深入实践与应用
|
2月前
|
SQL 关系型数据库 MySQL
MySQL DCL(数据控制语言)详解与实践
MySQL DCL(数据控制语言)详解与实践

热门文章

最新文章