数据同步神器-Canal

简介: Canal是阿里巴巴开源的MySQL增量日志解析工具,通过模拟MySQL主从复制机制,实时捕获数据库变更,实现数据同步至Kafka、Elasticsearch等系统,广泛应用于数据同步、监控、备份与迁移场景。

一、什么是Canal

官网的介绍(https://github.com/alibaba/canal):

Canal,译意为 “水道 / 管道 / 沟渠”,是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析的工具。它的核心价值在于通过解析 MySQL 的二进制日志(binary log)捕获数据的增量变更,进而为用户提供可靠的增量数据订阅与消费能力,就像一条精准高效的 “数据管道”,让数据库的实时增量变化能够按需流转至下游系统进行处理。

简单来说,Canal 可理解为一款专注于 MySQL 增量数据同步的工具,它围绕 “增量日志解析” 这一核心技术,实现了增量数据从源数据库到消费端的精准传递,支撑各类依赖实时增量数据的业务场景。

二、Canal的工作原理

核心要点:Canal是借助MySQL的主从机制来工作的。

  • MySQL master 将数据变更写入二进制日志binary log,简称Binlog。
  • MySQL slave 将 master 的 binary log 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 操作,将变更数据同步到最新。

Canal的原理:

  • Canal 将自己伪装为 MySQL slave(从库),向 MySQL master (主库)发送dump 协议。
  • MySQL master (主库)收到 dump 请求,开始推送 binary log 给 canal
  • Canal 接收并解析 Binlog 日志,得到变更数据,再发送到存储目的地,比如MVSQL,Kafka,ElasticSearch等

三、Canal的运用场景

数据同步:Canal 可以帮助用户进行多种数据同步操作,如实时同步 MySQL 数据到 Elasticsearch、Redis 等数据存储介质

数据库实时监控:Canal 可以实时监控 MySQL 的更新操作,对于敏感数据的修改可以及时通知相关人员

数据分析和挖掘:Canal 可以将 MySQL 增量数据投递到 Kafka 等消息队列中,为数据分析和挖掘提供数据来源


数据备份:Canal 可以将 MYSQL 主库上的数据增量日志复制到备库上,实现数据库备份


数据库迁移:Canal 可以协助完成 MYSQL 数据库的版本升级及数据迁移任务

四、Canal的安装与使用

MySQL 配置

  • 确保 MySQL 版本 ≥ 5.6
  • 开启 binlog 功能,修改 MySQL 配置文件(my.cnf 或 my.ini):
[mysqld]
log_bin=mysql-bin       # 开启 binlog
binlog_format=ROW       # binlog 格式必须为 ROW
server_id=1
  • 重启 MySQL 使配置生效
  • 创建 Canal 专用账号并授权
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

JDK 环境

  • 安装 JDK 1.8 及以上版本
  • 配置 JAVA_HOME 环境变量

安装Canal Server

下载 Canal

解压安装

# 创建安装目录
mkdir -p /usr/local/canal
# 解压
tar -zxvf canal.deployer-$version.tar.gz -C /usr/local/canal

配置 Canal

  • 进入配置目录:cd /usr/local/canal/conf/example
  • 修改 instance.properties 文件:
# MySQL 服务器地址
canal.instance.master.address=127.0.0.1:3306
# MySQL 账号密码(之前创建的 canal 账号)
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 可选:指定需要同步的数据库和表(默认同步所有)
# canal.instance.filter.regex=.*\\..*  # 所有库所有表
# canal.instance.filter.regex=test\\..*  # 只同步 test 库

启动 Canal

# 进入 bin 目录
cd /usr/local/canal/bin
# 启动
./startup.sh
# 查看日志(确认启动成功)
tail -f ../logs/canal/canal.log
tail -f ../logs/example/example.log

查看 Canal 进程是否启动:

ps-ef|grep canal

Java 客户端示例

添加 Maven 依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>$canal-version</version>
</dependency>

简单客户端代码:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClientExample {
    public static void main(String[] args) {
        // 连接 Canal Server
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111), 
            "example", "", "");
        try {
            connector.connect();
            // 订阅所有库表
            connector.subscribe(".*\\..*");
            while (true) {
                // 获取数据(1024 条)
                Message message = connector.getWithoutAck(1024);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);
                    continue;
                }
                // 处理数据
                processEntries(message.getEntries());
                // 确认处理完成
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
    private static void processEntries(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // 处理逻辑(如打印、同步到其他存储等)
            System.out.println("Entry: " + entry.toString());
        }
    }
}

注意事项

  1. Canal Server 默认端口为 11111,如需修改可编辑 conf/canal.properties 文件
  2. 生产环境建议配置集群模式,避免单点故障
  3. 定期清理 Canal 日志,防止磁盘空间占用过大
  4. 对于大数据量同步,建议调整 JVM 参数(bin/startup.sh 中的 JAVA_OPTS)

通过以上步骤,你可以完成 Canal 的基本安装和使用。根据实际需求,还可以配置更复杂的过滤规则和数据同步目标。


每日一道算法题

给定一个整数数组 nums,找出所有和为 0 且不重复的三元组 (nums[i], nums[j], nums[k]),其中 ijk 互不相同。要求返回所有满足条件的三元组,且结果中不能包含重复的三元组。

示例 1:
输入:nums = [-1, 0, 1, 2, -1, -4]
输出:[[-1, -1, 2], [-1, 0, 1]]

示例 2
输入:nums = [0, 1, 1]
输出:[]

示例 3
输入:nums = [0, 0, 0]
输出:[[0, 0, 0]]

方法思路

核心思想:排序 + 双指针法。

  1. 排序预处理:将数组排序,使得相同元素相邻,便于后续去重操作。
  2. 固定第一个元素:遍历数组,固定 nums[i] 作为第一个元素,将问题转化为在剩余元素中寻找两数之和等于 -nums[i]
  3. 双指针查找:使用左右指针 leftright 分别指向 i+1 和数组末尾,根据当前和的大小调整指针位置:
  • 若和大于目标值,右指针左移(减少和)。
  • 若和小于目标值,左指针右移(增大和)。
  • 若和等于目标值,记录三元组并移动指针跳过重复元素。
  1. 去重逻辑
  • 跳过重复的 nums[i],避免重复三元组。
  • 跳过重复的 nums[left]nums[right],确保每个三元组唯一。
from typing import List
class Solution:
    def threeSum(self, nums: List[int]) -> List[List[int]]:
        nums.sort()  # 排序数组,便于去重和双指针操作
        n = len(nums)
        res = []
        for i in range(n):
            # 跳过重复的nums[i]
            if i > 0 and nums[i] == nums[i-1]:
                continue
            # 若当前数大于0,后续不可能有解
            if nums[i] > 0:
                break
            target = -nums[i]
            left, right = i + 1, n - 1
            while left < right:
                current_sum = nums[left] + nums[right]
                if current_sum == target:
                    res.append([nums[i], nums[left], nums[right]])
                    # 跳过重复的nums[left]
                    while left < right and nums[left] == nums[left + 1]:
                        left += 1
                    # 跳过重复的nums[right]
                    while left < right and nums[right] == nums[right - 1]:
                        right -= 1
                    left += 1
                    right -= 1
                elif current_sum < target:
                    left += 1
                else:
                    right -= 1
        return res

复杂度分析

  • 时间复杂度:O (n²),其中 n 是数组长度。排序的时间复杂度为 O (n log n),外层循环 O (n),内层双指针遍历 O (n),总体复杂度由双指针主导,为 O (n²)。
  • 空间复杂度:O (log n) 或 O (n),取决于排序的实现。若使用原地排序,空间复杂度为 O (log n)(递归栈空间);若复制数组排序,则为 O (n)。

关键细节解析

  1. 排序的作用
  • 相同元素相邻,便于跳过重复值。
  • 双指针移动时,和的变化方向明确(左指针右移增大和,右指针左移减小和)。
  1. 去重逻辑
  • 固定元素去重

:若 nums[i] 与前一个元素相同,直接跳过,避免重复处理相同的起始点。

  • 左指针去重

:找到解后,若 nums[left] 与下一个元素相同,右移指针直到不同。

  • 右指针去重

:同理,若 nums[right] 与前一个元素相同,左移指针直到不同。

  1. 边界优化
  • nums[i] > 0,直接结束循环,因为后续元素均大于 0,无法构成和为 0 的三元组。
  • leftright 相遇时,退出内层循环,避免无效遍历。
  1. 测试用例处理
  • 全零数组

:直接返回 [[0, 0, 0]]

  • 无有效解

:如 [0, 1, 1],遍历后无符合条件的三元组。

  • 重复元素

:如 [-1, -1, 2],通过去重逻辑确保只记录一次。

相关文章
|
canal 关系型数据库 MySQL
cancal 同步mysql数据到es中
cancal 同步mysql数据到es中
460 1
|
2月前
|
安全 NoSQL Java
SpringBoot接口安全:限流、重放攻击、签名机制分析
本文介绍如何在Spring Boot中实现API安全机制,涵盖签名验证、防重放攻击和限流三大核心。通过自定义注解与拦截器,结合Redis,构建轻量级、可扩展的安全防护方案,适用于B2B接口与系统集成。
486 3
|
2月前
|
人工智能 安全 中间件
阿里云 AI 中间件重磅发布,打通 AI 应用落地“最后一公里”
9 月 26 日,2025 云栖大会 AI 中间件:AI 时代的中间件技术演进与创新实践论坛上,阿里云智能集团资深技术专家林清山发表主题演讲《未来已来:下一代 AI 中间件重磅发布,解锁 AI 应用架构新范式》,重磅发布阿里云 AI 中间件,提供面向分布式多 Agent 架构的基座,包括:AgentScope-Java(兼容 Spring AI Alibaba 生态),AI MQ(基于Apache RocketMQ 的 AI 能力升级),AI 网关 Higress,AI 注册与配置中心 Nacos,以及覆盖模型与算力的 AI 可观测体系。
745 39
|
2月前
|
存储 分布式计算 数据库
数据湖技术选型指南:Iceberg vs Delta Lake vs Paimon
对比当前最主流的三种开源湖格式:Iceberg、Delta Lake 和 Paimon,深入分析它们的差异,帮助大家更好地进行技术选型。
648 4
|
1月前
|
人工智能 运维 Java
Spring AI Alibaba Admin 开源!以数据为中心的 Agent 开发平台
Spring AI Alibaba Admin 正式发布!一站式实现 Prompt 管理、动态热更新、评测集构建、自动化评估与全链路可观测,助力企业高效构建可信赖的 AI Agent 应用。开源共建,现已上线!
2808 42
|
canal 缓存 关系型数据库
Spring Boot整合canal实现数据一致性解决方案解析-部署+实战
Spring Boot整合canal实现数据一致性解决方案解析-部署+实战
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
4140 3
Flink CDC:新一代实时数据集成框架
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
20733 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
canal 监控 Java
Canal 结合spring boot项目开发
Canal 结合spring boot项目开发
433 3

热门文章

最新文章