使用TCP方式拉取Canal数据

简介: 使用TCP方式拉取Canal数据

1 Canal对接Kafka联调

1.1 配置修改

canal.properties

修改 zk:

canal.zkServers = 10.51.50.219:2181
instance.properties

开启配置项

canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项:

  • test_javaedge_01 是kafka 的 topic
  • test_db.users 要监控的数据库、表
  • 当 test_db.users 表发生变化时,Canal 将会把变化的数据推送到名为 test_javaedge_01:test_db.users 的 MQ Topic 中。
canal.mq.dynamicTopic=test_javaedge_01:test_db\\.users

开启一个消费者

[root@javaedge-kafka-dev bin]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_javaedge_01

datagrip 新增数据:

消费到该数据:

2 使用TCP方式拉取Canal数据

现在 serverMode 改回tcp。重启

javaedge@JavaEdgedeMac-mini deployer % jps
71002 CanalLauncher
javaedge@JavaEdgedeMac-mini deployer %

canal 同步程序

package com.javaedge.canal;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.base.CaseFormat;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
public class CanalClientApp {
    public static void main(String[] args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111),
                "example",
                null, null);
        while (true) {
            connector.connect();
            connector.subscribe("test_db.users");
            Message message = connector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size()>0) {
                for (CanalEntry.Entry entry : entries) {
                    String tableName = entry.getHeader().getTableName();
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (eventType == CanalEntry.EventType.INSERT) {
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            HashMap<Object, Object> map = new HashMap<>();
                            for (CanalEntry.Column column : afterColumnsList) {
                                String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                                map.put(key, column.getValue());
                            }
                            System.out.println("tableName=" + tableName + "  map=" + JSON.toJSONString(map));
                        }
                    }
                }
            }
        }
    }
}

运行程序。操作 user 数据表,新增一行数据:

程序输出:

显然,后续不管你想把数据同步到哪儿去,都完全自由!

数据链路

MySQL -》canal server(tcp)-》canal client-》kafka。

目录
相关文章
|
canal 关系型数据库 MySQL
cancal 同步mysql数据到es中
cancal 同步mysql数据到es中
415 1
|
canal SQL JSON
Elastic: canal数据同步到ES配置常见报错
所有报错均为博主在实操过程中遇到的错误和解决办法,如果有其他报错或者不同的解决办法,请留言告诉我 安装canal过程中遇到问题,先在本文中查询是否有相同报错,将会为你节约大量排错时间
1178 0
Elastic: canal数据同步到ES配置常见报错
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之DataWorks运行报错com.alibaba.otter.canal.parse.exception.PositionNotFoundException: can't find start position for XXX如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
257 1
|
canal 存储 NoSQL
mysql进阶:canal搭建主从|集群架构
之前我们讲解过canal的各种应用,但是对于生产环境来讲,服务高可用是必须保证的。因此canal单节点是不能满足我们的需求的。就需要搭建canal集群。
1406 2
mysql进阶:canal搭建主从|集群架构
|
12月前
|
消息中间件 Kafka 测试技术
Kafka常用命令大全及kafka-console-consumer.sh及参数说明
该文章汇总了Kafka常用命令,包括集群管理、Topic操作、生产者与消费者的命令行工具使用方法等,适用于Kafka的日常运维和开发需求。
3172 3
|
SQL 存储 运维
从Citus深度解密如何基于PostgreSQL做分布式数据库
从源码级别揭秘Citus如何基于PostgreSQL做一款分布式数据库,解决分布式场景的数据分片、分布式SQL、分布式事务、数据倾斜、数据迁移等难点问题,理解分布式领域设计的“取”与“舍”。
2285 3
从Citus深度解密如何基于PostgreSQL做分布式数据库
|
自然语言处理 达摩院 索引
Elasticsearch 中文分词器
在使用Elasticsearch 进行搜索中文时,Elasticsearch 内置的分词器会将所有的汉字切分为单个字,对用国内习惯的一些形容词、常见名字等则无法优雅的处理,此时就需要用到一些开源的分词器,以下分别介绍几种常见的中文分词器
10067 2
Elasticsearch 中文分词器
|
SQL 关系型数据库 MySQL
TiDB特有的SQL语法和特性
【2月更文挑战第28天】本章将深入探讨TiDB特有的SQL语法和特性,这些功能和优化是TiDB相较于传统关系型数据库所独有的。通过了解这些特性,读者将能更充分地利用TiDB的优势,优化数据库性能,提升业务处理效率。
|
消息中间件 canal NoSQL
Canal+Kafka实现MySQL与Redis数据同步(一)
Canal+Kafka实现MySQL与Redis数据同步
1034 0