下面是一个Canal订阅阿里云RDS(MySQL)的案例。
- 安装Canal
首先需要安装Canal,可以通过官网下载最新的Canal版本,然后解压到任意目录即可。
- 配置Canal
Canal的配置分为两个部分,一个是Canal Server的配置,一个是Canal Client(即Canal Adapter)的配置。下面我们来分别配置这两个部分。
(1)Canal Server的配置
进入Canal目录,找到conf目录下的example目录,复制一份example目录,重命名为自己的配置文件名,例如:alibaba.properties。
然后打开该配置文件,修改以下参数:
canal.instance.master.address = {阿里云RDS的主库地址}
canal.instance.master.journal.name = mysql-bin.000001
canal.instance.master.position = 4
canal.instance.dbUsername = {阿里云RDS的用户名}
canal.instance.dbPassword = {阿里云RDS的密码}
canal.instance.connectionCharset = UTF-8
其中,canal.instance.master.address是阿里云RDS的主库地址,canal.instance.master.journal.name是binlog的名称,canal.instance.master.position是binlog的偏移量。可以通过执行SHOW MASTER STATUS命令获取到这些信息。
(2)Canal Client的配置
Canal Client是指Canal Adapter,它负责从Canal Server中拉取binlog数据,并将其转换为目标数据源(例如Kafka、RocketMQ等)的格式。下面我们以Kafka为例,来配置Canal Client。
进入Canal目录,找到conf目录下的example目录,复制一份example目录,重命名为自己的配置文件名,例如:kafka.properties。
然后打开该配置文件,修改以下参数:
canal.instance.filter.regex = .\..
canal.instance.kafka.bootstrap.servers = {Kafka的地址}
canal.instance.kafka.topic = {Kafka的Topic}
canal.instance.kafka.producer.batch.size = 16384
canal.instance.kafka.producer.linger.ms = 10
canal.instance.kafka.producer.buffer.memory = 33554432
canal.instance.kafka.producer.retries = 3
canal.instance.kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer
canal.instance.kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer
其中,canal.instance.kafka.bootstrap.servers是Kafka的地址,canal.instance.kafka.topic是Kafka的Topic。
- 启动Canal
配置完成后,进入Canal目录,执行如下命令启动Canal:
./bin/startup.sh
- 验证数据
Canal启动后,就可以在Kafka的Topic中看到订阅到的数据了。如果没有看到数据,可以检查Canal和Kafka的日志,看是否有报错信息。