阿里中间件--Canal框架实践

本文涉及的产品
性能测试 PTS,5000VUM额度
云原生网关 MSE Higress,422元/月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: 阿里中间件--Canal框架实践

最近在工作中需要处理一些大数据量同步的场景,正好运用到了canal这款数据库中间件,因此特意花了点时间来进行该中间件的的学习和总结。


背景介绍


早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。


适用版本:


支持mysql5.7及以下版本


传统的主从同步原理:


master将数据记录到了binlog日志里面,然后slave会通过一个io线程去读取master那边指定位置点开始的binlog日志内容,并将相应的信息写会到slave这边的relay日志里面,最后slave会有单独的sql线程来读取这些master那边执行的sql语句记录,达成两端的数据同步。


传统的mysql主从同步实现的原理图如下所示:


网络异常,图片无法展示
|


Canal中间件功能


基于纯java语言开发,可以用于做增量数据订阅和消费功能。


相比于传统的数据同步,我们通常需要进行先搭建主从架构,然后使用binlog日志进行读取,然后指定需要同步的数据库,数据库表等信息。但是随着我们业务的不断复杂,这种传统的数据同步方式以及开始变得较为繁琐,不够灵活。


canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议mysql master收到dump请求,开始推送binary log给slave(也就是canal),canal解析binary log对象(原始为byte流),通过对binlog数据进行解析即可获取需要同步的数据,在进行同步数据的过程中还可以加入开发人员的一些额外逻辑处理,比较开放。


Binlog的三种基本类型分别为


STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况


ROW模式除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但是会占用较多的空间,需要使用mysqlbinlog工具进行查看。


MIX模式比较灵活的记录,例如说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式


Canal环境搭建


需要先登录mysql数据库,检查binlog功能是否有开启。


mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF    |
+---------------+-------+
1 row in set (0.00 sec)
复制代码


如果显示状态为OFF表示该功能未开启,那么这个时候就需要到my.ini里面进行相关配置了,在原来的my.ini配置底部插入以下内容:


server-id=192
log-bin=mysql-bin
binlog_format = ROW
复制代码


当再次通过客户端查看log_bin状态为ON的时候,就表示binlog已经开启:


mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
复制代码


然后在mysql里面添加以下的相关用户和权限:


CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
复制代码


开启之后,我们可以前往canal的官方地址进行相应版本的安装包进行下载:

github.com/alibaba/can…


下载好指定的版本之后,找到里面的bin目录底下的startup脚本,启动。


启动之后会发现黑窗停止在这样一行的内容上,然后就不动了


Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Listening for transport dt_socket at address: 9099
复制代码


这时候需要前往日志文件夹底下\canal\logs\,查看canal日志文件是否已经开启,如果显示以下内容,就表示启动已经成功


2019-05-06 10:41:56.116 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-05-06 10:41:56.144 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-05-06 10:41:56.145 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2019-05-06 10:41:56.233 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.164.1:11111]
2019-05-06 10:41:58.179 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now .....
复制代码


canal server的默认端口号为:11111,如果需要调整的话,可以去到\conf目录底下的canal.properties文件中进行修改。


启动了canal的server之后,便是基于java的客户端搭建了。


首先在\canal\conf目录底下创建一个独立的文件夹(文件命名为idea_user_data),用于做额外的数据源配置:


网络异常,图片无法展示
|


然后创建一份特定的properties文件:(名称最好为:instance.properties),这里面只需要创建properties文件即可,其余几份文件会自动生成,instance.properties可以直接从example文件夹里面进行copy。


网络异常,图片无法展示
|


首先是导入相应的依赖文件:


<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
复制代码


单机版本的canal连接案例:


单机版本的环境比较好搭建,相应的代码如下:


首先是canal客户端的配置类


/**
 * @author idea
 * @date 2019/5/6
 * @Version V1.0
 */
public class CanalConfig {
    public static String CANAL_ADDRESS="127.0.0.1";
    public static int PORT=11111;
    public static String DESTINATION="idea_user_data";
    public static String FILTER=".*\\..*";
}
复制代码


客户端代码:


package com.sise.client;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import static com.sise.config.CanalConfig.*;
/**
 * @author idea
 * @date 2019/5/6
 * @Version V1.0
 */
public class CanalClient {
    private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
    public static void main(String args[]) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_ADDRESS,
                PORT), DESTINATION, "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(FILTER);
            connector.rollback();
            try {
                while (true) {
                    //尝试从master那边拉去数据batchSize条记录,有多少取多少
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);
                    //当队列里面堆积的sql大于一定数值的时候就模拟执行
                    if (SQL_QUEUE.size() >= 10) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }
    /**
     * 模拟执行队列里面的sql语句
     */
    public static void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);
        }
    }
    /**
     * 数据处理
     *
     * @param entrys
     */
    private static void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }
    /**
     * 保存更新语句
     *
     * @param entry
     */
    private static void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        //暂时只支持单一主键
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * 保存删除语句
     *
     * @param entry
     */
    private static void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        //暂时只支持单一主键
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * 保存插入语句
     *
     * @param entry
     */
    private static void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
}
复制代码


启动程序之后,我们对数据库表进行10次左右的修改操作之后,便可以从控制台中看到sql的打印信息。


网络异常,图片无法展示
|


关于canal集群搭建的一些坑


在实际开发中,如果只有一台canal机器作为server,当该台机器挂掉之后,服务就会终止,那么这个时候我们便需要引入集群部署的方式了。


搭建canal集群的环境需要先搭建好相应的zk集群模式。zk的集群搭建网上资料很多,这里就不进行讲解了。


canal搭建集群的一些资料可以参考以下链接:

github.com/alibaba/can…


canal在搭建HA模式的时候有几个容易掉坑的步骤:

canal.properties配置里面需要添加zk的地址,同时canal.instance.global.spring.xml需

要修改为classpath:spring/default-instance.xml


网络异常,图片无法展示
|


每台机子的canal里面的具体instance所在目录的名称需要统一,每个实例都有对应的slaveId,他们的id需要保证不重复。


搭建好了canal集群环境之后,

然后代码部分需要在链接的那个模块进行稍微的调整:


CanalConnector connector = CanalConnectors.newClusterConnector(CLUSTER_ADDRESS, DESTINATION, "", "");
复制代码


为了保证master在某些特殊场景下挂掉,mysql需要搭建为双M模式,那么我们这个时候可以在每个canal机器的instance配置文件中加入master的地址和standby的地址:


canal.instance.master.address=******
canal.instance.standby.address = ******
复制代码


同时对于detecing也需要进行配置修改


canal.instance.detecting.enable = true ## 需要开启心跳检查
canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() ##心跳检查sql
canal.instance.detecting.interval.time = 3 ##心跳检查频率
canal.instance.detecting.retry.threshold = 3  ## 心跳检查失败次数阀值,当超过这个次数之后,就会自动切换到standby上边的机器进行binlog的订阅读取
canal.instance.detecting.heartbeatHaEnable = true  ## 是否开启master和standby的主动切换
复制代码


ps: master和standby进行切换机器的时候可能会有时间延迟。


启动2台canal机器,可以在zk里面查看到canal注册的节点信息:


网络异常,图片无法展示
|


通过模拟测试,关闭当前端口为11111的canal机器,节点信息会自动更换为第二台canal进行替换:


网络异常,图片无法展示
|


ClusterCanalConnector和SimpleCanalConnector类发现了username和password的参数,但是似乎具体配置中并没有做具体的设置,这是为什么呢?


后来也在github上边查看到了一些网友的相关讨论:


网络异常,图片无法展示
|


canal结合kafka发送sql数据案例


pom依赖:


<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.1</version>
        </dependency>
复制代码


kafka的配置类:


public class KafkaProperties
{
    public final static String ZK_CONNECTION = "XXX.XXX.XXX.XXX:2181";
    public final static String BROKER_LIST_ADDRESS = "XXX.XXX.XXX.XXX:9092";
    public final static String GROUP_ID = "group1";
    public final static String TOPIC = "USER-DATA";
}
复制代码


关于kafka的环境搭建步骤比较简单,网上有很多的资料,这里就不多一一介绍了。

首先是kafka的producer部分代码:


import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;
import java.util.Properties;
import static com.sise.kafka.KafkaProperties.TOPIC;
/**
 * @author idea
 * @date 2019/5/7
 * @Version V1.0
 */
public class KafkaProducerDemo extends Thread {
    public static Logger log = Logger.getLogger(KafkaProducerDemo.class);
    //kafka的链接地址要使用hostname 默认9092端口
    private static final String BROKER_LIST = BROKER_LIST_ADDRESS;
    private static KafkaProducer<String, String> producer = null;
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }
    /*
    初始化配置
     */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }
    public static void sendMsg(String msg) {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, msg);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (null != e) {
                    log.info("send error" + e.getMessage());
                } else {
                    System.out.println("send success");
                }
            }
        });
    }
}
复制代码


接着是consumer部分的代码:


import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * @author idea
 * @date 2019/5/7
 * @Version V1.0
 */
public class KafkaConsumerDemo extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    public KafkaConsumerDemo(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK_CONNECTION);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }
    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("【receive】" + new String(it.next().message()));
        }
    }
}
复制代码


然后需要在CanalClient 的executeQueueSql函数出进行部分功能的修改:


/**
     * 给kafka发送sql语句
     */
    public static void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            //发送sql给kafka
            KafkaProducerDemo.sendMsg(sql);
        }
    }
复制代码


为了验证程序是否正常,启动canal和kafka之后,对canal监听的数据库里面的表进行数据信息的修改,然后canal会将修改的binlog里面的sql放入队列中,当队列满了之后便向kafka中进行发送:


网络异常,图片无法展示
|


consumer端接受到数据之后控制台便打印出相应内容:


网络异常,图片无法展示
|

目录
相关文章
|
1月前
|
数据采集 中间件 开发者
Scrapy爬虫框架-自定义中间件
Scrapy爬虫框架-自定义中间件
|
4月前
|
中间件 数据处理 Apache
|
18天前
|
消息中间件 NoSQL Java
springboot整合常用中间件框架案例
该项目是Spring Boot集成整合案例,涵盖多种中间件的使用示例,每个案例项目使用最小依赖,便于直接应用到自己的项目中。包括MyBatis、Redis、MongoDB、MQ、ES等的整合示例。
74 1
|
3月前
|
存储 中间件 PHP
Python编程入门:从零到一的代码实践深入理解 PHP 中的中间件模式
【8月更文挑战第28天】本文旨在通过浅显易懂的方式,向初学者介绍Python编程的基础知识,并结合具体代码示例,带领读者一步步实现从零基础到能够独立编写简单程序的转变。文章将围绕Python语言的核心概念进行讲解,并通过实例展示如何应用这些概念解决实际问题。无论你是编程新手还是希望扩展技能的专业人士,这篇文章都将为你打开编程世界的大门。 【8月更文挑战第28天】在PHP的世界中,设计模式是构建可维护和可扩展软件的重要工具。本文将通过浅显易懂的语言和生动的比喻,带领读者深入理解中间件模式如何在PHP应用中发挥魔力,实现请求处理的高效管理。我们将一步步揭开中间件的神秘面纱,从它的定义、工作原理到
|
4月前
|
JSON 中间件 数据处理
实践出真知:通过项目学习Python Web框架的路由与中间件设计
【7月更文挑战第19天】探索Python Web开发,掌握Flask或Django的关键在于理解路由和中间件。路由连接URL与功能,如Flask中@app.route()定义请求响应路径。中间件在请求处理前后执行,提供扩展功能,如日志、认证。通过实践项目,不仅学习理论,还能提升构建高效Web应用的能力。示例代码展示路由定义及模拟中间件行为,强调动手实践的重要性。
58 1
|
4月前
|
设计模式 中间件 测试技术
PHP中的中间件模式解析与实践
【7月更文挑战第11天】在现代Web开发中,中间件模式已成为设计高效、可维护应用程序的关键。本文深入探讨了PHP环境下中间件模式的实现方法,并提供了一个实际示例来演示如何利用中间件优化请求处理流程。
40 1
|
4月前
|
运维 中间件 PHP
深入理解PHP中的中间件模式自动化运维之脚本编程实践##
【7月更文挑战第31天】在PHP开发中,中间件模式是一种强大的设计模式,它允许开发者在请求处理流程中注入自定义的处理逻辑。本文将通过实际代码示例来探讨如何在PHP项目中实现和使用中间件,以及这种模式如何提升应用程序的可维护性和扩展性。 【7月更文挑战第31天】 在现代IT运维管理中,自动化不再是可选项,而是提高生产效率、确保服务质量的必需品。本文将通过Python脚本编程的角度,探讨如何利用代码简化日常运维任务,提升工作效率。我们将从实际案例出发,逐步剖析自动化脚本的设计思路、实现过程及其带来的益处。 ##
29 0
|
4月前
|
缓存 监控 安全
中间件在Python Web框架中的角色与应用场景
【7月更文挑战第21天】中间件在Python Web开发中作为服务器与应用间的软件层,拦截、处理请求和响应,无需改动应用代码。它扩展框架功能,复用跨应用逻辑,加强安全,优化性能。如Django中间件处理请求/响应,Flask通过WSGI中间件实现类似功能,两者均在不触及核心代码前提下,灵活增强应用行为,是现代Web开发关键组件。
55 0
|
4月前
|
中间件 数据库 开发者
解析Python Web框架的四大支柱:模板、ORM、中间件与路由
【7月更文挑战第20天】Python Web框架如Django、Flask、FastAPI的核心包括模板(如Django的DTL和Flask的Jinja2)、ORM(Django的内置ORM与Flask的SQLAlchemy)、中间件(Django的全局中间件与Flask的装饰器实现)和路由(Django的urls.py配置与Flask的@app.route()装饰器)。这些组件提升了代码组织和数据库操作的便捷性,确保了Web应用的稳定性和可扩展性。
65 0
|
4月前
|
中间件 API 开发者
深入理解Python Web框架:中间件的工作原理与应用策略
【7月更文挑战第19天】Python Web中间件摘要:**中间件是扩展框架功能的关键组件,它拦截并处理请求与响应。在Flask中,通过`before_request`和`after_request`装饰器模拟中间件行为;Django则有官方中间件系统,需实现如`process_request`和`process_response`等方法。中间件用于日志、验证等场景,但应考虑性能、执行顺序、错误处理和代码可维护性。
80 0