一、什么是Canal
官网的介绍(https://github.com/alibaba/canal):
Canal,译意为 “水道 / 管道 / 沟渠”,是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析的工具。它的核心价值在于通过解析 MySQL 的二进制日志(binary log)捕获数据的增量变更,进而为用户提供可靠的增量数据订阅与消费能力,就像一条精准高效的 “数据管道”,让数据库的实时增量变化能够按需流转至下游系统进行处理。
简单来说,Canal 可理解为一款专注于 MySQL 增量数据同步的工具,它围绕 “增量日志解析” 这一核心技术,实现了增量数据从源数据库到消费端的精准传递,支撑各类依赖实时增量数据的业务场景。
二、Canal的工作原理
核心要点:Canal是借助MySQL的主从机制来工作的。
- MySQL master 将数据变更写入二进制日志binary log,简称Binlog。
- MySQL slave 将 master 的 binary log 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 操作,将变更数据同步到最新。
Canal的原理:
- Canal 将自己伪装为 MySQL slave(从库),向 MySQL master (主库)发送dump 协议。
- MySQL master (主库)收到 dump 请求,开始推送 binary log 给 canal
- Canal 接收并解析 Binlog 日志,得到变更数据,再发送到存储目的地,比如MVSQL,Kafka,ElasticSearch等
三、Canal的运用场景
数据同步:Canal 可以帮助用户进行多种数据同步操作,如实时同步 MySQL 数据到 Elasticsearch、Redis 等数据存储介质
数据库实时监控:Canal 可以实时监控 MySQL 的更新操作,对于敏感数据的修改可以及时通知相关人员
数据分析和挖掘:Canal 可以将 MySQL 增量数据投递到 Kafka 等消息队列中,为数据分析和挖掘提供数据来源
数据备份:Canal 可以将 MYSQL 主库上的数据增量日志复制到备库上,实现数据库备份
数据库迁移:Canal 可以协助完成 MYSQL 数据库的版本升级及数据迁移任务
四、Canal的安装与使用
MySQL 配置
- 确保 MySQL 版本 ≥ 5.6
- 开启 binlog 功能,修改 MySQL 配置文件(my.cnf 或 my.ini):
[mysqld] log_bin=mysql-bin # 开启 binlog binlog_format=ROW # binlog 格式必须为 ROW server_id=1
- 重启 MySQL 使配置生效
- 创建 Canal 专用账号并授权
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
JDK 环境
- 安装 JDK 1.8 及以上版本
- 配置 JAVA_HOME 环境变量
安装Canal Server
下载 Canal
- 官方下载地址:https://github.com/alibaba/canal/releases
- 选择合适的版本(如 canal.deployer-$version.tar.gz)
解压安装
# 创建安装目录 mkdir -p /usr/local/canal # 解压 tar -zxvf canal.deployer-$version.tar.gz -C /usr/local/canal
配置 Canal
- 进入配置目录:
cd /usr/local/canal/conf/example - 修改 instance.properties 文件:
# MySQL 服务器地址 canal.instance.master.address=127.0.0.1:3306 # MySQL 账号密码(之前创建的 canal 账号) canal.instance.dbUsername=canal canal.instance.dbPassword=canal # 可选:指定需要同步的数据库和表(默认同步所有) # canal.instance.filter.regex=.*\\..* # 所有库所有表 # canal.instance.filter.regex=test\\..* # 只同步 test 库
启动 Canal
# 进入 bin 目录 cd /usr/local/canal/bin # 启动 ./startup.sh # 查看日志(确认启动成功) tail -f ../logs/canal/canal.log tail -f ../logs/example/example.log
查看 Canal 进程是否启动:
ps-ef|grep canal
Java 客户端示例
添加 Maven 依赖:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>$canal-version</version> </dependency>
简单客户端代码:
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class CanalClientExample { public static void main(String[] args) { // 连接 Canal Server CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "", ""); try { connector.connect(); // 订阅所有库表 connector.subscribe(".*\\..*"); while (true) { // 获取数据(1024 条) Message message = connector.getWithoutAck(1024); long batchId = message.getId(); if (batchId == -1 || message.getEntries().isEmpty()) { Thread.sleep(1000); continue; } // 处理数据 processEntries(message.getEntries()); // 确认处理完成 connector.ack(batchId); } } catch (Exception e) { e.printStackTrace(); } finally { connector.disconnect(); } } private static void processEntries(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { // 处理逻辑(如打印、同步到其他存储等) System.out.println("Entry: " + entry.toString()); } } }
注意事项
- Canal Server 默认端口为 11111,如需修改可编辑
conf/canal.properties文件 - 生产环境建议配置集群模式,避免单点故障
- 定期清理 Canal 日志,防止磁盘空间占用过大
- 对于大数据量同步,建议调整 JVM 参数(
bin/startup.sh中的 JAVA_OPTS)
通过以上步骤,你可以完成 Canal 的基本安装和使用。根据实际需求,还可以配置更复杂的过滤规则和数据同步目标。
每日一道算法题
给定一个整数数组 nums,找出所有和为 0 且不重复的三元组 (nums[i], nums[j], nums[k]),其中 i、j、k 互不相同。要求返回所有满足条件的三元组,且结果中不能包含重复的三元组。
示例 1:
输入:nums = [-1, 0, 1, 2, -1, -4]
输出:[[-1, -1, 2], [-1, 0, 1]]
示例 2:
输入:nums = [0, 1, 1]
输出:[]
示例 3:
输入:nums = [0, 0, 0]
输出:[[0, 0, 0]]
方法思路
核心思想:排序 + 双指针法。
- 排序预处理:将数组排序,使得相同元素相邻,便于后续去重操作。
- 固定第一个元素:遍历数组,固定
nums[i]作为第一个元素,将问题转化为在剩余元素中寻找两数之和等于-nums[i]。 - 双指针查找:使用左右指针
left和right分别指向i+1和数组末尾,根据当前和的大小调整指针位置:
- 若和大于目标值,右指针左移(减少和)。
- 若和小于目标值,左指针右移(增大和)。
- 若和等于目标值,记录三元组并移动指针跳过重复元素。
- 去重逻辑
- 跳过重复的
nums[i],避免重复三元组。 - 跳过重复的
nums[left]和nums[right],确保每个三元组唯一。
from typing import List class Solution: def threeSum(self, nums: List[int]) -> List[List[int]]: nums.sort() # 排序数组,便于去重和双指针操作 n = len(nums) res = [] for i in range(n): # 跳过重复的nums[i] if i > 0 and nums[i] == nums[i-1]: continue # 若当前数大于0,后续不可能有解 if nums[i] > 0: break target = -nums[i] left, right = i + 1, n - 1 while left < right: current_sum = nums[left] + nums[right] if current_sum == target: res.append([nums[i], nums[left], nums[right]]) # 跳过重复的nums[left] while left < right and nums[left] == nums[left + 1]: left += 1 # 跳过重复的nums[right] while left < right and nums[right] == nums[right - 1]: right -= 1 left += 1 right -= 1 elif current_sum < target: left += 1 else: right -= 1 return res
复杂度分析
- 时间复杂度:O (n²),其中
n是数组长度。排序的时间复杂度为 O (n log n),外层循环 O (n),内层双指针遍历 O (n),总体复杂度由双指针主导,为 O (n²)。 - 空间复杂度:O (log n) 或 O (n),取决于排序的实现。若使用原地排序,空间复杂度为 O (log n)(递归栈空间);若复制数组排序,则为 O (n)。
关键细节解析
- 排序的作用:
- 相同元素相邻,便于跳过重复值。
- 双指针移动时,和的变化方向明确(左指针右移增大和,右指针左移减小和)。
- 去重逻辑:
- 固定元素去重
:若 nums[i] 与前一个元素相同,直接跳过,避免重复处理相同的起始点。
- 左指针去重
:找到解后,若 nums[left] 与下一个元素相同,右移指针直到不同。
- 右指针去重
:同理,若 nums[right] 与前一个元素相同,左移指针直到不同。
- 边界优化:
- 若
nums[i] > 0,直接结束循环,因为后续元素均大于 0,无法构成和为 0 的三元组。 - 当
left和right相遇时,退出内层循环,避免无效遍历。
- 测试用例处理:
- 全零数组
:直接返回 [[0, 0, 0]]。
- 无有效解
:如 [0, 1, 1],遍历后无符合条件的三元组。
- 重复元素
:如 [-1, -1, 2],通过去重逻辑确保只记录一次。