【源码】canal和otter的高可靠性分析

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介:

一般来说,我们对于数据库最主要的要求就是:数据不丢。不管是主从复制,还是使用类似otter+canal这样的数据库同步方案,我们最基本的需求是,在数据不丢失的前提下,尽可能的保证系统的高可用,也就是在某个节点挂掉,或者数据库发生主从切换等情况下,我们的数据同步系统依然能够发挥它的作用--数据同步。本文讨论的场景是数据库发生主从切换,本文将从源码的角度,来看看otter和canal是如何保证高可用和高可靠的。

一、EventParser

通过阅读文档和源码,我们可以知道,对于一个canal server,基础的框架包括以下几个部分:MetaManager、EventParser、EventSink和EventStore。其中EventParser的作用就是发送dump命令,从mysql数据库获取binlog文件。发送dump命令,可以指定时间戳或者position,从指定的时间或者位置开始dump。我们来看看过程:

首先是CanalServer启动。otter默认使用的是内置版的canal server,所以我们主要看CanalServerWithEmbedded这个类。来看下他的启动过程:

    public void start(final String destination) {
        final CanalInstance canalInstance = canalInstances.get(destination);
        if (!canalInstance.isStart()) {
            try {
                MDC.put("destination", destination);
                canalInstance.start();//启动实例
                logger.info("start CanalInstances[{}] successfully", destination);
            } finally {
                MDC.remove("destination");
            }
        }
    }

我们看下实例启动那一行,跟到AbstractCanalInstance类中

    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();//源数据管理启动
        }

        if (!alarmHandler.isStart()) {
            alarmHandler.start();//报警处理器启动
        }

        if (!eventStore.isStart()) {
            eventStore.start();//数据存储器启动
        }

        if (!eventSink.isStart()) {
            eventSink.start();//数据过滤器启动
        }

        if (!eventParser.isStart()) {//数据解析器启动
            beforeStartEventParser(eventParser);
            eventParser.start();
            afterStartEventParser(eventParser);
        }
        logger.info("start successful....");
    }

我们主要看下eventParser.start()方法里面的内容。我们主要关注的是EventParser使如何在主从切换的条件下,进行dump节点的确定的。我们跟踪到AbstractEventParser类中的start()方法,重点看下

// 4. 获取最后的位置信息
EntryPosition position = findStartPosition(erosaConnection);

这块有两个实现,但是canal目前使用的是MysqlEventParser,也就是基于Mysql的Binlog文件来进行数据同步。我们看下代码:

protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
    EntryPosition startPosition = findStartPositionInternal(connection);
    if (needTransactionPosition.get()) {
        logger.warn("prepare to find last position : {}", startPosition.toString());
        Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
        if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
            logger.warn("find new start Transaction Position , old : {} , new : {}",
                    startPosition.getPosition(),
                    preTransactionStartPosition);
            startPosition.setPosition(preTransactionStartPosition);
        }
        needTransactionPosition.compareAndSet(true, false);
    }
    return startPosition;
}

对于第一行findStartPositionInternal(connection),我们重点关注的情况是数据库连接地址发生变化,也就是进行了主从切换的情况。

boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
                && logPosition.getPostion().getServerId() != null
                && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
if (case2) {
    long timestamp = logPosition.getPostion().getTimestamp();
    long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
    logger.warn("prepare to find start position by last position {}:{}:{}", new Object[]{"", "",
                logPosition.getPostion().getTimestamp()});
    EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
    // 重新置为一下
    dumpErrorCount = 0;
    return findPosition;
}

我们分析下case2这个条件,其实就是表示的就是配置了主从切换,而且发生了serverId变化的情况,在这种情况下,首先需要获取到事件发生的时间戳,然后将这个事件发生的时间减去60s,也就是向前推一分钟之后,在新的binlog文件中根据新的时间戳来找到当时对应的事件。

这块根据时间戳来寻找事件的过程比较简单,首先根据binglog-index文件找到所有的binlog文件名,然后遍历binlog文件的头,找到binlog文件的写入时间,与新的时间戳进行对比,定位到binlog文件。定位到文件后,直接根据时间戳来进行遍历,找到新的时间戳之前发生的那个事务起始位置。

/**
 * 根据给定的时间戳,在指定的binlog中找到最接近于该时间戳(必须是小于时间戳)的一个事务起始位置。
 * 针对最后一个binlog会给定endPosition,避免无尽的查询
 */
private EntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection,
                                                              final Long startTimestamp,
                                                              final EntryPosition endPosition,
                                                              final String searchBinlogFile) {

    final LogPosition logPosition = new LogPosition();
    try {
        mysqlConnection.reconnect();
        // 开始遍历文件
        mysqlConnection.seek(searchBinlogFile, 4L, new SinkFunction<LogEvent>() {

            private LogPosition lastPosition;

            public boolean sink(LogEvent event) {
                EntryPosition entryPosition = null;
                try {
                    CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);
                    if (entry == null) {
                        return true;
                    }

                    String logfilename = entry.getHeader().getLogfileName();
                    Long logfileoffset = entry.getHeader().getLogfileOffset();
                    Long logposTimestamp = entry.getHeader().getExecuteTime();

                    if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())
                            || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
                        logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[]{
                                logfilename, logfileoffset, logposTimestamp, startTimestamp});
                        // 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出
                        if (logposTimestamp >= startTimestamp) {
                            return false;
                        }
                    }

                    if (StringUtils.equals(endPosition.getJournalName(), logfilename)
                            && endPosition.getPosition() <= (logfileoffset + event.getEventLen())) {
                        return false;
                    }

                    // 记录一下上一个事务结束的位置,即下一个事务的position
                    // position = current +
                    // data.length,代表该事务的下一条offest,避免多余的事务重复
                    if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
                        entryPosition = new EntryPosition(logfilename,
                                logfileoffset + event.getEventLen(),
                                logposTimestamp);
                        logger.debug("set {} to be pending start position before finding another proper one...",
                                entryPosition);
                        logPosition.setPostion(entryPosition);
                    } else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
                        // 当前事务开始位点
                        entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp);
                        logger.debug("set {} to be pending start position before finding another proper one...",
                                entryPosition);
                        logPosition.setPostion(entryPosition);
                    }

                    lastPosition = buildLastPosition(entry);
                } catch (Throwable e) {
                    processSinkError(e, lastPosition, searchBinlogFile, 4L);
                }

                return running;
            }
        });

    } catch (IOException e) {
        logger.error("ERROR ## findAsPerTimestampInSpecificLogFile has an error", e);
    }

    if (logPosition.getPostion() != null) {
        return logPosition.getPostion();
    } else {
        return null;
    }
}

这块的逻辑如下:

  • 发送dump命令,起始位置为4L,也就是跳过了binlog的第一个标志事件。
  • canal收到binlog,开始进行对binlog文件进行解析。
  • 主要我们看的是事务开始和事务提交的事件,判断事务开始或结束的时间,是否小于我们要找的时间戳,如果大于等于,直接遍历下一个事件。
  • 传入了一个endPosition,防止无限扫描。
  • 虽说是从头开始扫描的,但是要想跳出遍历,需要满足一定的条件。在跳出遍历之前,最后一次设置的logPosition才是我们要招的logPosition。
  • 如果是一个事务提交的事件,我们要找的position就是这个事件的position+event.length。如果是事务开始,position就是当前事件的position。其他的事件都忽略。

至此,我们已经找到了我们想要的binlog文件名和对应的事务开始position,我们继续下面的步骤即可。

二、EventStore

这块内容的主要思想如下:

  • 维护一个类似于Disruptor的RingBuffer,同时维护三个序列,put/get/ack。
  • EventSink之后的数据,调用put接口,将数据放入环形队列中。
  • Canal client获取数据,调用get方法。
  • 异步调用ack方法,清除ack之前的数据。
  • 值得注意的是,这块get和ack采用了流式API的模式,get和ack异步进行,可以先get,然后异步调用ack。
  • ack是有序的,不允许跳跃式的提交。

三、Binlog的Row模式

至此,我们基本上知道了canal是如何在发生数据库主从切换时保证高可用和高可靠的,我们可能还有疑惑:为什么要回退60s,来解析binlog,这样不会导致数据重复吗?还有一些自增的update语句(不具备幂等性),不会产生数据错误吗?要想回答这些问题,就需要我们了解Binlog的Row模式了。

Mysql Binlog的Row模式记录的,是数据库中每一行的数据变化,而不仅仅是sql语句。比如我们对数据库中的多行,使用一条sql语句进行了修改。在这种情况下,如果Binlog模式为Statement,只会记录一条sql语句。而Row模式下,会对每一行的数据变化进行记录,以及变化前后每个字段的值。这也就是为什么Row模式的binlog文件如此之大的原因。

对于一些不具备幂等性的sql语句,采用Row语句进行Binlog解析时,也是可以通过重复执行,来保证我们数据的最终一致性的。这也就解释了,为什么要回退60s来进行Binlog位点定位、解析的问题。考虑到Mysql主从的数据复制的延迟性(60s,一般来说的延迟没有这么久),我们可以在主节点挂掉的情况下,回退60s到从节点上继续进行binlog的解析。

当然,也需要考虑一些极端的情况,也就是主从复制确实超过了60s的延迟,在这种情况下,就需要otter登场了。基本思路是:反查数据库同步 (以数据库最新版本同步,解决交替性,比如设置一致性反查数据库延迟阀值为60秒,即当同步过程中发现数据延迟超过了60秒,就会基于PK反查一次数据库,拿到当前最新值进行同步,减少交替性的问题)。

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
4月前
|
数据安全/隐私保护 Windows
PDF24 Tools离线版下载,PDF编辑阅读工具
PDF24 Tools是一款免费且易于使用的在线PDF工具合集,支持网页版和Windows客户端。它包含近50个PDF处理工具,如PDF创建、合并、压缩、加密、解密、编辑、提取、转换等,所有功能均可离线使用,无需登录,操作简便。软件界面直观,功能丰富,适合各类PDF文件处理需求,是办公学习的实用工具。下载即用,无限制,完全免费,广受用户好评。
987 6
|
人工智能 自然语言处理 搜索推荐
阿里云百炼产品月刊【2025年2月】
本期⽉刊主要亮点包括推出全新多模态理解生成大模型通义千问Omni系列,支持文本、图像、语音和视频输入,提供流式输出和四种自然对话音色,新增高性价比图生视频模型wanx2.1-i2v-turbo,生成速度快,耗时仅为旧模型的三分之一。此外,qwen-plus采购季资源包上线,享受8.6折优惠;qwen-max模型降价88%,极大降低使用门槛。智能体应用和工作流应用现支持DeepSeek系列模型,增强私有知识库问答和任务型、对话型工作流构建能力。文件交互和批量节点功能进一步提升应用灵活性和实用性。本月还推出了AI实训营和应用开发实训营,提供手把手AI课程和企业级多模态应用构建指导。
1116 0
|
4月前
|
数据可视化 关系型数据库 数据处理
手把手教你选对ETL工具:从理解到实战的完整指南
在数字化时代,ETL(抽取、转换、加载)是数据处理的关键环节。本文详解ETL概念、作用及工具选型技巧,助你高效管理企业数据。
|
数据采集 数据挖掘 大数据
数据处理利器:使用Pandas进行数据清洗与转换
【4月更文挑战第12天】在大数据时代,Pandas是Python数据分析的关键工具,提供高效的数据清洗和转换功能。本文介绍了如何使用Pandas处理缺失值(删除或填充)、异常值(Z-Score法和IQR法)以及重复值(检测和删除)。此外,还涵盖了数据转换,包括数据类型转换、数据标准化(Min-Max和Z-Score)以及类别数据的one-hot编码。通过学习这些方法,可以为数据分析和挖掘奠定坚实基础。
405 0
|
安全 测试技术 网络安全
网络安全中的渗透测试与风险评估:技术深度解析
【7月更文挑战第3天】在网络安全领域,渗透测试和风险评估是两种不可或缺的技术手段。通过模拟黑客的攻击手段来发现系统中的安全漏洞,以及通过系统性的方法来识别和评估潜在的风险和威胁,两者共同为组织提供了全面的网络安全保障。随着技术的不断发展和网络环境的日益复杂,渗透测试和风险评估的重要性将日益凸显。因此,网络安全从业者应不断学习和掌握这两种技术,以应对日益严峻的网络安全挑战。
|
前端开发 UED 开发者
React组件优化全攻略:深度解析让你的前端应用飞速运行的秘诀——从PureComponent到React.memo的彻底性能比较
【8月更文挑战第31天】在构建现代Web应用时,性能是提升用户体验的关键因素。React作为主流前端库,其组件优化尤为重要。本文深入探讨了React组件优化策略,包括使用`PureComponent`、`React.memo`及避免不必要的渲染等方法,帮助开发者显著提升应用性能。通过实践案例对比优化前后效果,不仅提高了页面渲染速度,还增强了用户体验。优化React组件是每个开发者必须关注的重点。
234 0
|
存储 Windows
下载Windows ISO镜像的方法
一、镜像介绍 1、大概介绍 .iso 是电脑上镜像的存储格式之一,所以通常在电脑中以后缀.iso命名,俗称iso镜像文件。 2、详细介绍 ISO镜像文件_百度百科 二、下载Windows 11镜像 1、Windows 11 官方下载网址 https://www.microsoft.com/zh-cn/software-download/windows11 2、步骤 点击官网地址,进入windows11官网,找到下载 Windows 11 磁盘映像 (ISO)
5564 0
|
Java 网络安全 Maven
要在云效中使用JDK 21进行打包
【2月更文挑战第18天】要在云效中使用JDK 21进行打包
513 4
|
SQL 缓存 运维
使用篇丨链路追踪(Tracing)很简单:链路拓扑
使用篇丨链路追踪(Tracing)很简单:链路拓扑
31857 98
|
安全 Java API
验证码短信 API 接入指南:Java 语言示例代码
验证码短信 API 接入指南:Java 语言示例代码
9647 0