使用TCP方式拉取Canal数据

简介: 使用TCP方式拉取Canal数据

1 Canal对接Kafka联调

1.1 配置修改

canal.properties

修改 zk:

canal.zkServers = 10.51.50.219:2181
instance.properties

开启配置项

canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项:

  • test_javaedge_01 是kafka 的 topic
  • test_db.users 要监控的数据库、表
  • 当 test_db.users 表发生变化时,Canal 将会把变化的数据推送到名为 test_javaedge_01:test_db.users 的 MQ Topic 中。
canal.mq.dynamicTopic=test_javaedge_01:test_db\\.users

开启一个消费者

[root@javaedge-kafka-dev bin]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_javaedge_01

datagrip 新增数据:

消费到该数据:

2 使用TCP方式拉取Canal数据

现在 serverMode 改回tcp。重启

javaedge@JavaEdgedeMac-mini deployer % jps
71002 CanalLauncher
javaedge@JavaEdgedeMac-mini deployer %

canal 同步程序

package com.javaedge.canal;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.base.CaseFormat;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
public class CanalClientApp {
    public static void main(String[] args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111),
                "example",
                null, null);
        while (true) {
            connector.connect();
            connector.subscribe("test_db.users");
            Message message = connector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size()>0) {
                for (CanalEntry.Entry entry : entries) {
                    String tableName = entry.getHeader().getTableName();
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (eventType == CanalEntry.EventType.INSERT) {
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            HashMap<Object, Object> map = new HashMap<>();
                            for (CanalEntry.Column column : afterColumnsList) {
                                String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                                map.put(key, column.getValue());
                            }
                            System.out.println("tableName=" + tableName + "  map=" + JSON.toJSONString(map));
                        }
                    }
                }
            }
        }
    }
}

运行程序。操作 user 数据表,新增一行数据:

程序输出:

显然,后续不管你想把数据同步到哪儿去,都完全自由!

数据链路

MySQL -》canal server(tcp)-》canal client-》kafka。

目录
相关文章
|
消息中间件 网络协议 网络性能优化
为什么TCP要做成流式协议,而非包呢?
为什么TCP要做成流式协议,而非包呢?
100 1
|
6月前
|
时序数据库
客户端连接InfluxDB
客户端连接InfluxDB
150 0
|
5月前
|
安全 C++
gRPC 四模式之 客户端流RPC模式
gRPC 四模式之 客户端流RPC模式
51 0
|
网络协议 算法 JavaScript
实现 memcached 客户端:TCP、连接池、一致性哈希、自定义协议
实现 memcached 客户端:TCP、连接池、一致性哈希、自定义协议
94 0
|
Java 开发者
JavaHTTP心跳:服务器与客户端实时连接的实现方式
JavaHTTP心跳:服务器与客户端实时连接的实现方式 在网络通信中,实时连接是一种至关重要的功能。它允许服务器与客户端之间保持持久的通信信道,实现快速、高效的数据传输。对于Java开发者来说,实现服务器与客户端之间的实时连接可以通过JavaHTTP心跳技术来实现。本文将介绍如何利用JavaHTTP心跳来实现服务器与客户端之间的实时连接。
340 0
|
设计模式 程序员 Nacos
Nacos 客户端/服务端同步集群数据源码分析(五)
Nacos 客户端/服务端同步集群数据源码分析(五)
162 0
|
缓存 网络协议 Java
在项目中使用Curator的Java 客户端搭建后进行长TCP连接和TCP权限配置【Zookeeper】
在项目中使用Curator的Java 客户端搭建后进行长TCP连接和TCP权限配置【Zookeeper】
288 0
在项目中使用Curator的Java 客户端搭建后进行长TCP连接和TCP权限配置【Zookeeper】
|
消息中间件 RocketMQ 开发者
消息拉取客户端处理服务端相应|学习笔记
快速学习消息拉取客户端处理服务端相应
消息拉取客户端处理服务端相应|学习笔记
|
网络协议
客户端与服务端--(基于TCP/IP的群聊系统的设计与实现)
接上期 *Swing组件组合使用--登录界面源码(仿QQ)*,这期分享客户端与服务端源码