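Enable binlog in MySQL
This guide uses MySQL 5.7.32. The server-id can be any value you choose, but it must differ from the server-id configured for canal later (canal.instance.mysql.slaveId). Add the following to the [mysqld] section of the MySQL configuration file: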
# binlog
log-bin=mysql-bin
binlog_format=ROW
server-id=33081
After making the change, restart the MySQL service, then check the result:
show variables like 'log_bin';
A value of ON means binlog is enabled.
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
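The binlog settings from this section can also be sanity-checked offline before restarting MySQL. The following is a minimal sketch, assuming the settings sit under a [mysqld] section and the file contains no directives such as !includedir (which Python's configparser cannot read). The function name check_binlog_config and the canal_slave_id parameter are hypothetical helpers, not part of MySQL or canal.

```python
import configparser


def check_binlog_config(text, canal_slave_id=None):
    """Return a list of problems found in a my.cnf fragment (empty list = looks fine)."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section"]

    # MySQL accepts '-' and '_' interchangeably in option names, so normalize.
    opts = {
        key.replace("-", "_"): (value or "").strip()
        for key, value in parser.items("mysqld")
    }

    problems = []
    if "log_bin" not in opts:
        problems.append("log-bin is not set")
    # ROW is the format used in this guide.
    if opts.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog_format must be ROW")

    server_id = opts.get("server_id", "")
    if not server_id.isdigit() or int(server_id) == 0:
        problems.append("server-id must be a positive integer")
    elif canal_slave_id is not None and int(server_id) == canal_slave_id:
        problems.append("server-id must differ from canal's slaveId")
    return problems
```

Passing the fragment from this section (prefixed with a [mysqld] line) should return an empty list. Passing canal_slave_id with the same value as server-id reports the conflict described above.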
Create the canal user in MySQL
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by "canal";
flush privileges;
Start the canal container
Configure canal
mkdir -p /data/canal/conf
The default configuration file can be copied out of the container. Its path inside the container is:
/home/admin/canal-server/conf/example/instance.properties
vim /data/canal/conf/instance.properties
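canal.instance.master.address: the IP and port of the MySQL host to monitor
canal.instance.dbUsername: the user canal uses to subscribe to the binlog
canal.instance.dbPassword: the password of that user
canal.instance.connectionCharset = UTF-8: the character set canal uses for the subscription
canal.instance.filter.regex: the MySQL databases and tables to monitor
- All databases: .*\\..*
- All tables in one database: canal\\..*
- Specific tables in specific databases: canal\\.canal,test\\.test
The format is database\\.table. The dot must be escaped with \\, and multiple entries are separated by commas.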
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.1.200:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
# canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=backendorder\\.t_kry_takeout_.*,backendorder\\.t_kry_product_.*,backendshare\\.t_backend_keruyun_bom_.*,backend\\.t_wx_.*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
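The filter rules used in canal.instance.filter.regex can be tried out offline before canal is started. The sketch below is only a rough Python approximation: canal's real matcher is implemented in Java and may behave differently (for example in case sensitivity), and splitting on commas assumes no pattern contains a comma of its own. The helper names parse_filter and is_monitored are hypothetical.

```python
import re


def parse_filter(prop_value):
    """Split a filter value (as written in instance.properties) into compiled patterns."""
    patterns = []
    for item in prop_value.split(","):
        # In a .properties file the backslash is itself escaped, so the
        # doubled backslash on disk stands for a single regex backslash.
        item = item.strip().replace("\\\\", "\\")
        if item:
            patterns.append(re.compile(item))
    return patterns


def is_monitored(patterns, schema, table):
    """Return True if 'schema.table' fully matches at least one pattern."""
    name = f"{schema}.{table}"
    return any(p.fullmatch(name) for p in patterns)


# Raw strings keep the value exactly as it appears in the config file.
patterns = parse_filter(r"canal\\.canal,test\\.test")
print(is_monitored(patterns, "canal", "canal"))
print(is_monitored(patterns, "canal", "other"))
```

With the two-table filter above, canal.canal matches while canal.other does not, because each pattern has to match the whole database.table name.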
Start the canal container
docker run -d \
  -v /data/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
  -p 11111:11111 \
  --name canal \
  canal/canal-server:v1.1.5
For the related canal parameters, see [canal configuration file parameters explained](
View the Docker container logs
docker logs <container-id>
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Generating SSH1 RSA host key: [ OK ]
Starting sshd: [ OK ]
Starting crond: [ OK ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...
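Besides reading the logs, it can be useful to confirm that the published port 11111 actually accepts connections before starting a client. The following is a minimal sketch using only the Python standard library. The helper name wait_for_port and its default timings are assumptions for illustration. It checks TCP reachability only, not that canal itself is healthy.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling wait_for_port('127.0.0.1', 11111) right after docker run blocks until the port opens or the timeout expires.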
Verify with canal-client
pip3 install canal-python
pip3 install protobuf
vim canal_client.py
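If you are syncing specific databases rather than all of them, replace the .*\\..* in filter=b'.*\\..*' with the value of the canal.instance.filter.regex parameter. Otherwise the client keeps listening to every database.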
import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2

client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
# Subscribe to the "example" instance; this filter matches every database and table.
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

try:
    while True:
        message = client.get(100)
        entries = message['entries']
        for entry in entries:
            entry_type = entry.entryType
            # Skip transaction boundary markers; only row changes carry data.
            if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
                              EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                continue
            row_change = EntryProtocol_pb2.RowChange()
            row_change.MergeFromString(entry.storeValue)
            header = entry.header
            database = header.schemaName
            table = header.tableName
            event_type = header.eventType
            for row in row_change.rowDatas:
                format_data = dict()
                if event_type == EntryProtocol_pb2.EventType.DELETE:
                    # Collect every column of the deleted row, not just the last one.
                    for column in row.beforeColumns:
                        format_data[column.name] = column.value
                elif event_type == EntryProtocol_pb2.EventType.INSERT:
                    for column in row.afterColumns:
                        format_data[column.name] = column.value
                else:
                    # Use two separate dicts so "before" and "after" do not alias each other.
                    format_data['before'] = dict()
                    format_data['after'] = dict()
                    for column in row.beforeColumns:
                        format_data['before'][column.name] = column.value
                    for column in row.afterColumns:
                        format_data['after'][column.name] = column.value
                data = dict(
                    db=database,
                    table=table,
                    event_type=event_type,
                    data=format_data,
                )
                print(data)
        time.sleep(1)
finally:
    # Reached when the loop is interrupted, e.g. with Ctrl+C.
    client.disconnect()
python3 canal_client.py
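Output appears only when rows are inserted, updated, or deleted. Queries do not write to the binlog, and canal is, after all, a tool for incremental database synchronization.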
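To see the intended shape of the printed records without a running canal server, the per-row formatting can be tried with stand-in objects. This is a sketch under stated assumptions: the SimpleNamespace objects only imitate the name/value and beforeColumns/afterColumns attributes used in the client script, the event types are plain strings rather than the protobuf enum values, and format_row and col are hypothetical helper names.

```python
from types import SimpleNamespace


def col(name, value):
    """Stand-in for a canal column (only the name and value attributes)."""
    return SimpleNamespace(name=name, value=value)


def columns_to_dict(columns):
    """Collect all columns of one row image into a {name: value} dict."""
    return {c.name: c.value for c in columns}


def format_row(event_type, row):
    """Format one row change: a flat dict for INSERT/DELETE, before/after otherwise."""
    if event_type == "DELETE":
        return columns_to_dict(row.beforeColumns)
    if event_type == "INSERT":
        return columns_to_dict(row.afterColumns)
    return {
        "before": columns_to_dict(row.beforeColumns),
        "after": columns_to_dict(row.afterColumns),
    }


# A fake row whose "name" column changed from "a" to "b".
row = SimpleNamespace(
    beforeColumns=[col("id", "1"), col("name", "a")],
    afterColumns=[col("id", "1"), col("name", "b")],
)
print(format_row("UPDATE", row))
```

An update yields separate before and after dicts, while an insert or delete yields a single dict holding every column of the row.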