1 Integrating Canal with Kafka: end-to-end test
1.1 Configuration changes
canal.properties
Set the ZooKeeper address:
canal.zkServers = 10.51.50.219:2181
instance.properties
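Enable the following option.
canal.mq.dynamicTopic is Canal's dynamic MQ topic option. Its value has the form topic:schema.table, where:
- test_javaedge_01 is the Kafka topic
- test_db.users is the database and table to monitor
When the test_db.users table changes, Canal pushes the changed data to the Kafka topic test_javaedge_01.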
canal.mq.dynamicTopic=test_javaedge_01:test_db\\.users
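To make the topic:schema\.table rule concrete, here is a minimal sketch of how such a rule could map a changed table to a topic. It is a simplified illustration only, not Canal's actual implementation: the class DynamicTopicRule and its resolve method are hypothetical names, and only the single "topic:regex" form used above is handled.

```java
import java.util.regex.Pattern;

// Simplified illustration of a "topic:schema\.table" routing rule.
// NOT Canal's implementation; class and method names are hypothetical.
class DynamicTopicRule {
    private final String topic;
    private final Pattern tablePattern;

    DynamicTopicRule(String rule) {
        int idx = rule.indexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("expected topic:regex, got " + rule);
        }
        this.topic = rule.substring(0, idx);
        // The part after ':' is treated as a regex over "schema.table"
        this.tablePattern = Pattern.compile(rule.substring(idx + 1));
    }

    // Returns the topic if "schema.table" matches the rule, otherwise null.
    String resolve(String schema, String table) {
        return tablePattern.matcher(schema + "." + table).matches() ? topic : null;
    }

    public static void main(String[] args) {
        // In Java source, "\\." is the regex "\.", i.e. a literal dot.
        DynamicTopicRule rule = new DynamicTopicRule("test_javaedge_01:test_db\\.users");
        System.out.println(rule.resolve("test_db", "users"));  // test_javaedge_01
        System.out.println(rule.resolve("test_db", "orders")); // null
    }
}
```

The double backslash in the properties file plays the same role as in the Java string literal: after unescaping, the pattern is test_db\.users, so the dot matches only a literal dot between schema and table.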
Start a consumer:
[root@javaedge-kafka-dev bin]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_javaedge_01
Insert a row with DataGrip:
The consumer receives the change:
2 Pulling Canal data over TCP
Now switch serverMode (canal.serverMode in canal.properties) back to tcp and restart Canal:
javaedge@JavaEdgedeMac-mini deployer % jps
71002 CanalLauncher
javaedge@JavaEdgedeMac-mini deployer %
Canal sync program
package com.javaedge.canal;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.base.CaseFormat;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;

public class CanalClientApp {

    public static void main(String[] args) throws Exception {
        // Connect to the Canal server's TCP port for the "example" instance
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111), "example", null, null);

        // Connect and subscribe once, not on every poll
        connector.connect();
        connector.subscribe("test_db\\.users");

        while (true) {
            // Fetch up to 100 entries; get() acknowledges them automatically
            Message message = connector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.isEmpty()) {
                Thread.sleep(1000); // nothing new, avoid a busy loop
                continue;
            }
            for (CanalEntry.Entry entry : entries) {
                // Only ROWDATA entries carry a RowChange; skip transaction begin/end markers
                if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                    continue;
                }
                String tableName = entry.getHeader().getTableName();
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                // This demo only handles INSERT events
                if (rowChange.getEventType() != CanalEntry.EventType.INSERT) {
                    continue;
                }
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    HashMap<String, String> map = new HashMap<>();
                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                        // Convert snake_case column names to camelCase keys
                        String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                        map.put(key, column.getValue());
                    }
                    System.out.println("tableName=" + tableName + " map=" + JSON.toJSONString(map));
                }
            }
        }
    }
}
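The key conversion in CanalClientApp relies on Guava's CaseFormat. As a dependency-free sketch of what that step does for ordinary lowercase snake_case column names, the helper below rewrites the keys of one row. ColumnNames, toCamel and toCamelKeys are hypothetical names, and the columns id and user_name are made up for illustration, since the users table schema is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: approximates LOWER_UNDERSCORE -> LOWER_CAMEL
// for plain lowercase snake_case names such as "user_name".
class ColumnNames {

    static String toCamel(String snake) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : snake.toCharArray()) {
            if (c == '_') {
                upperNext = true;          // drop the underscore, capitalize what follows
            } else if (upperNext) {
                sb.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Returns a copy of the row with camelCase keys, preserving column order.
    static Map<String, String> toCamelKeys(Map<String, String> row) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : row.entrySet()) {
            out.put(toCamel(e.getKey()), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "1");                // assumed column
        row.put("user_name", "JavaEdge");  // assumed column
        System.out.println(toCamelKeys(row)); // {id=1, userName=JavaEdge}
    }
}
```

This helper is separate from CanalClientApp and only illustrates the naming step; the sync program itself keeps using CaseFormat.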
Run CanalClientApp, then insert a row into the users table:
Program output:
From here, you are free to sync the data to whatever destination you like.
Data pipeline
MySQL -> Canal server (TCP) -> Canal client -> Kafka.
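The last hop of this pipeline, Canal client to Kafka, is not implemented in the program above. One possible way to keep the destination pluggable is to hide it behind a small interface, sketched below under assumptions: RowSink, InMemorySink and SinkDemo are hypothetical names, and the in-memory sink stands in for a real one. A Kafka-backed sink would, in its accept method, hand the record to a Kafka producer instead of appending to a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical destination abstraction for rows read by the Canal client.
interface RowSink {
    void accept(String tableName, Map<String, String> row);
}

// Stand-in sink that just remembers what it was given.
class InMemorySink implements RowSink {
    final List<String> records = new ArrayList<>();

    @Override
    public void accept(String tableName, Map<String, String> row) {
        records.add(tableName + ":" + row);
    }
}

class SinkDemo {
    // Forwards every row of one table to the chosen sink.
    static void forward(RowSink sink, String tableName, List<Map<String, String>> rows) {
        for (Map<String, String> row : rows) {
            sink.accept(tableName, row);
        }
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "1");               // assumed column
        row.put("userName", "JavaEdge");  // assumed column

        InMemorySink sink = new InMemorySink();
        forward(sink, "users", Collections.singletonList(row));
        System.out.println(sink.records.get(0)); // users:{id=1, userName=JavaEdge}
    }
}
```

With this shape, the loop in the client only calls sink.accept(tableName, map), and swapping Kafka for another target means swapping the RowSink implementation.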