使用docker部署canal

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 使用docker部署canal

关于canal

mysql开启binlog

这里使用的是 mysql 5.7.32server-id可以自定义,和后面配置的canal里面的server-id要不一样

# binlog
log-bin=mysql-bin
binlog_format=ROW
server-id=33081

修改完成后,需要重启mysql服务

show variables like 'log_bin';

返回 ON 表示 binlog 启动成功

+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

mysql创建canal用户

grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by "canal";
flush privileges;

启动canal容器

配置canal

mkdir -p /data/canal/conf

配置可以从容器内获取

容器内的路径/home/admin/canal-server/conf/example/instance.properties

vim /data/canal/conf/instance.properties
  • canal.instance.master.address 需要监听的mysql主机ip:端口
  • canal.instance.dbUsername canal订阅binlog使用的用户
  • canal.instance.dbPassword canal订阅binlog使用的用户密码
  • canal.instance.connectionCharset = UTF-8 canal订阅的字符集
  • canal.instance.filter.regex需要监听的mysql库和表
  • 全库: .*\\..*
  • 指定库下的所有表: canal\\..*
  • 指定库下的指定表:canal\\.canal,test\\.test
  • 库名\\.表明 :转义需要用\\,使用逗号分隔多个库
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.1.200:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
# canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=backendorder\\.t_kry_takeout_.*,backendorder\\.t_kry_product_.*,backendshare\\.t_backend_keruyun_bom_.*,backend\\.t_wx_.*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

启动canal容器

docker run -d \
-v /data/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-p 11111:11111 \
--name canal \
canal/canal-server:v1.1.5

相关的canal参数,可以参考[canal配置文件参数解释](

查看docker容器日志

docker logs <容器id>
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Generating SSH1 RSA host key: [  OK  ]
Starting sshd: [  OK  ]
Starting crond: [  OK  ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...

canal-client 验证

pip3 install canal-python
pip3 install protobuf
vim canal_client.py

如果不是全库,需要对指定库同步的话,需要将filter=b'.*\\..*'里面的.*\\..*改成canal.instance.filter.regex参数的值,否则启动客户端就会一直监听全库

import time
from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2
client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')
while True:
    message = client.get(100)
    entries = message['entries']
    for entry in entries:
        entry_type = entry.entryType
        if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
            continue
        row_change = EntryProtocol_pb2.RowChange()
        row_change.MergeFromString(entry.storeValue)
        event_type = row_change.eventType
        header = entry.header
        database = header.schemaName
        table = header.tableName
        event_type = header.eventType
        for row in row_change.rowDatas:
            format_data = dict()
            if event_type == EntryProtocol_pb2.EventType.DELETE:
                for column in row.beforeColumns:
                    format_data = {
                        column.name: column.value
                    }
            elif event_type == EntryProtocol_pb2.EventType.INSERT:
                for column in row.afterColumns:
                    format_data = {
                        column.name: column.value
                    }
            else:
                format_data['before'] = format_data['after'] = dict()
                for column in row.beforeColumns:
                    format_data['before'][column.name] = column.value
                for column in row.afterColumns:
                    format_data['after'][column.name] = column.value
            data = dict(
                db=database,
                table=table,
                event_type=event_type,
                data=format_data,
            )
            print(data)
    time.sleep(1)
client.disconnect()
python3 canal_client.py

只有在数据库进行增删改操作才会有输出,查询操作不会改变binlog日志,毕竟这是一款增量同步数据库的工具

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
3天前
|
测试技术 Linux 网络安全
【Docker项目实战】使用Docker部署RSS阅读器yarr
【6月更文挑战第22天】使用Docker部署RSS阅读器yarr
14 3
|
3天前
|
Docker 容器
如何使用Docker部署WPS Office服务并实现无公网IP远程处理文档表格(二)
使用Docker部署的WPS Office服务可以通过内网穿透工具Cpolar实现远程访问。首先,创建一个名为“wps office”的隧道,选择HTTP协议和3000端口,分配免费的随机域名,并指定中国地区。然后,通过Cpolar的管理界面获取HTTP公网地址,用以远程访问WPS Office。由于随机域名会变化,可以升级Cpolar套餐并保留一个二级子域名,确保长期稳定的远程访问。配置子域名后,更新隧道设置,完成固定公网地址的绑定,从而实现随时随地通过固定地址访问WPS Office。
|
3天前
|
数据采集 JavaScript Go
蓝易云 - crawlab通过docker单节点部署简单爬虫
以上就是通过Docker单节点部署Crawlab并运行简单爬虫的步骤。在实际操作中,你需要根据自己的需求对爬虫代码进行编写。
7 2
|
4天前
|
搜索推荐 测试技术 Linux
【Docker项目实战】使用Docker部署EnBizCard数字名片工具
【6月更文挑战第21天】使用Docker部署EnBizCard数字名片工具
18 3
|
4天前
|
域名解析 网络协议 Ubuntu
docker快速部署DNS,实现快速上线
docker快速部署DNS,实现快速上线
|
5天前
|
Java Docker 容器
使用 Spring Boot 构建 Docker 镜像并进行多模式部署
使用 Spring Boot 构建 Docker 镜像并进行多模式部署
24 2
|
6天前
|
测试技术 Linux 数据库
【Docker项目实战】使用Docker部署Xnote轻量级笔记系统
【6月更文挑战第19天】使用Docker部署Xnote轻量级笔记系统
20 3
|
5天前
|
测试技术 Linux 网络安全
【Docker项目实战】使用Docker部署RSS阅读器yarr
【6月更文挑战第20天】使用Docker部署RSS阅读器yarr
12 1
|
1天前
|
应用服务中间件 Linux 数据安全/隐私保护
Linux+docker部署项目
Linux+docker部署项目
8 0
|
3天前
|
安全 Linux 网络安全
如何使用Docker部署WPS Office服务并实现无公网IP远程处理文档表格(一)
在群晖NAS上使用Docker部署WPS Office并结合Cpolar内网穿透的步骤包括: 1. 通过SSH命令行拉取`linuxserver/wps-office`镜像。 2. 在群晖容器管理界面运行镜像,设置启动选项和端口映射。 3. 本地访问群晖IP:3000端口以使用WPS Office。 4. 安装Cpolar套件,手动添加并安装到群晖,通过9200端口访问其Web管理界面。 5. 使用Cpolar配置内网穿透,实现远程访问WPS Office。 这一过程允许用户即使在没有公网IP的情况下,也能通过Cpolar将内网的WPS Office服务暴露到公网,便于远程办公和文档处理。