Canal实现mysql数据与redis同步

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Canal实现mysql数据与redis同步

Canal实现mysql数据与redis同步

1.Canal简介

canal [kə'næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

2.项目背景

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

基于日志增量订阅和消费支持的业务包括:

1.数据库镜像

2.数据库实时备份

3.多级索引 (卖家和买家各自分库索引)

4.search build

5.业务cache刷新

6.带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

3.工作原理

3.1.MySQL主从复制原理

1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

2.MySQL slave master binary log events 拷贝到它的中继日志(relay log)

3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

3.2.Canal工作原理

1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

2.MySQL master 收到 dump 请求,开始推送 binary log slave ( canal )

3.canal 解析 binary log 对象(原始为 byte )

实现步骤

MySQL配置

我使用的是mysql5.6.x,修改mysql的配置文件my.ini,加入下面内容

日志存放路径

general_log=ON

general_log_file=G:\mysql-cluster\master\master.log

#default-character-set=utf8

character-set-server=utf8

default-storage-engine=InnoDB

#innodb_force_recovery = 1

# server_id = .....

log_bin=mysql-bin

binlog-format=ROW

server-id=1

需要同步的数据库名称

binlog-do-db=test_canal

忽略的数据库,建议填写

binlog-ignore-db=mysql

配置完后重新启动数据库服务

执行下列授权语句

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT,REPLICATION SLAVE,REPLICATION client ON *.* to 'canal'@'localhost'

FLUSH PRIVILEGES;

 

Canal部署与配置

下载canalhttps://github.com/alibaba/canal/releases/

这里我下载的是1.1.5

下载完成解压即可,修改配置

canal/conf/canal.properties可保持不变,默认的端口51

canal/conf/example/instance.properties需要配置

#################################################

## mysql serverId , v1.0.26+ will autoGen

# canal.instance.mysql.slaveId=0

 

# enable gtid use true/false

这里的id不要跟mysqlserver-id相同

canal.instance.mysql.slaveId=123

canal.instance.gtidon=false

 

# position info

canal.instance.master.address=127.0.0.1:3307

canal.instance.master.journal.name=

canal.instance.master.position=

canal.instance.master.timestamp=

canal.instance.master.gtid=

 

# rds oss binlog

canal.instance.rds.accesskey=

canal.instance.rds.secretkey=

canal.instance.rds.instanceId=

 

# table meta tsdb info

canal.instance.tsdb.enable=false

#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb

#canal.instance.tsdb.dbUsername=canal

#canal.instance.tsdb.dbPassword=canal

 

#canal.instance.standby.address =

#canal.instance.standby.journal.name =

#canal.instance.standby.position =

#canal.instance.standby.timestamp =

#canal.instance.standby.gtid=

 

# username/password

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

canal.instance.connectionCharset = UTF-8

# enable druid Decrypt database password

canal.instance.enableDruid=false

#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

 

# table regex

canal.instance.filter.regex=.*\\..*

# table black regex

canal.instance.filter.black.regex=mysql\\.slave_.*

# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch

# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

 

# mq config

canal.mq.topic=example

# dynamic topic route by schema or table regex

#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*

canal.mq.partition=0

# hash partition config

#canal.mq.partitionsNum=3

#canal.mq.partitionHash=test.table:id^name,.*\\..*

#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6

#################################################

 

配置完成后启动canal,我这里是windows系统,直接双击bin/startup.bat即可

查看是否正常启动,需要看2个日志文件

logs/canal/canal.log文件中有如下内容:the canal server is running now ......

logs/example/example.log文件中有如下内容:start successful....

证明启动成功

Javacanal客户端

创建maven工程

导入pom依赖

<dependency>

    <groupId>com.alibaba.otter</groupId>

    <artifactId>canal.client</artifactId>

    <version>1.1.0</version>

</dependency>

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

Redis工具类

package cn.demo.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.io.IOException;
import java.util.Properties;

/**
 * 获取连接池对象 */
public enum RedisUtils {
    INSTANCE;
    static JedisPool jedisPool = null;

    static {
        //1 创建连接池配置对象        JedisPoolConfig config = new JedisPoolConfig();
        //2 进行配置-四个配置        config.setMaxIdle(1);//最小连接数        config.setMaxTotal(11);//最大连接数        config.setMaxWaitMillis(10 * 1000L);//最长等待时间        config.setTestOnBorrow(true);//测试连接时是否畅通        //3 通过配置对象创建连接池对象        Properties properties = null;
        try {
            properties = new Properties();
            properties.load(RedisUtils.class.getClassLoader().getResourceAsStream("redis.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        String host = properties.getProperty("redis.host");
        String port = properties.getProperty("redis.port");
        String password = properties.getProperty("redis.password");
        String timeout = properties.getProperty("redis.timeout");
        System.out.println(host);
        System.out.println(port);
        System.out.println(password);
        System.out.println(timeout);
        jedisPool = new JedisPool(config, host, Integer.valueOf(port),Integer.valueOf(timeout), password);
    }

    //获取连接    public Jedis getSource() {
        return jedisPool.getResource();
    }

    //关闭资源    public void closeSource(Jedis jedis) {
        if (jedis != null) {
            jedis.close();
        }

    }
    /**
     * 设置字符值     *
     * @param key
     * @param
     */
    public void del(String key) {
        Jedis jedis = getSource();
        jedis.del(key);
        closeSource(jedis);
    }
    /**
     * 设置字符值     *
     * @param key
     * @param value
     */
    public void set(String key, String value) {
        Jedis jedis = getSource();
        jedis.set(key, value);
        closeSource(jedis);
    }

    /**
     * 设置     * @param key
     * @param value
     */
    public void set(byte[] key, byte[] value) {
        Jedis jedis = getSource();
        jedis.set(key, value);
        closeSource(jedis);
    }

    /**
     *
     * @param key
     * @return
     */
    public byte[]  get(byte[] key) {
        Jedis jedis = getSource();
        try {
            return jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeSource(jedis);
        }
        return null;

    }

    /**
     * 设置字符值     *
     * @param key
     */
    public String get(String key) {
        Jedis jedis = getSource();
        try {
            return jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeSource(jedis);
        }

        return null;

    }

    public void set(String key, String value, Integer time) {
        Jedis jedis = getSource();
        jedis.setex(key,time,value);
        closeSource(jedis);
    }
}

Canal客户端测试类

package cn.demo.test;

import cn.demo.util.RedisUtils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * canal测试 */
public class SimpleCanalClientExample {


    public static void main(String args[]) {
        // 创建链接        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认                // connector.rollback(batchId); // 处理失败回滚数据            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    //printColumn(rowData.getBeforeColumnsList());
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    //printColumn(rowData.getAfterColumnsList());
                    redisInsertOrUpdate(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    //printColumn(rowData.getAfterColumnsList());
                    redisInsertOrUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }
    // 修改redis
    private static void redisInsertOrUpdate(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtils.INSTANCE.set("user:" + columns.get(0).getValue(), json.toJSONString());
        }
    }
    // 删除redis
    private static void redisDelete(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtils.INSTANCE.del("user:" + columns.get(0).getValue());
        }
    }



    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

测试效果,触发数据库变更

执行sql语句查看redis相应的变化

经过测试,mysql的数据已经同步到redis,测试成功

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
9天前
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
65 0
|
5天前
|
SQL Java 关系型数据库
java连接mysql查询数据(基础版,无框架)
【10月更文挑战第12天】该示例展示了如何使用Java通过JDBC连接MySQL数据库并查询数据。首先在项目中引入`mysql-connector-java`依赖,然后通过`JdbcUtil`类中的`main`方法实现数据库连接、执行SQL查询及结果处理,最后关闭相关资源。
|
2天前
|
SQL 关系型数据库 MySQL
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
10 1
|
3天前
|
SQL 关系型数据库 MySQL
mysql数据误删后的数据回滚
【11月更文挑战第1天】本文介绍了四种恢复误删数据的方法:1. 使用事务回滚,通过 `pymysql` 库在 Python 中实现;2. 使用备份恢复,通过 `mysqldump` 命令备份和恢复数据;3. 使用二进制日志恢复,通过 `mysqlbinlog` 工具恢复特定位置的事件;4. 使用延迟复制从副本恢复,通过停止和重启从库复制来恢复数据。每种方法都有详细的步骤和示例代码。
|
15天前
|
存储 关系型数据库 MySQL
面试官:MySQL一次到底插入多少条数据合适啊?
本文探讨了数据库插入操作的基础知识、批量插入的优势与挑战,以及如何确定合适的插入数据量。通过面试对话的形式,详细解析了单条插入与批量插入的区别,磁盘I/O、内存使用、事务大小和锁策略等关键因素。最后,结合MyBatis框架,提供了实际应用中的批量插入策略和优化建议。希望读者不仅能掌握技术细节,还能理解背后的原理,从而更好地优化数据库性能。
|
6月前
|
存储 NoSQL Redis
redis存储原理和数据模型
redis存储原理和数据模型
62 1
|
3月前
|
存储 NoSQL Redis
Redis存储原理与数据模型
Redis存储原理与数据模型
|
5月前
|
存储 缓存 NoSQL
了解Redis,第一弹,什么是RedisRedis主要适用于分布式系统,用来用缓存,存储数据,在内存中存储那么为什么说是分布式呢?什么叫分布式什么是单机架构微服务架构微服务的本质
了解Redis,第一弹,什么是RedisRedis主要适用于分布式系统,用来用缓存,存储数据,在内存中存储那么为什么说是分布式呢?什么叫分布式什么是单机架构微服务架构微服务的本质
|
6月前
|
存储 缓存 NoSQL
为什么要在 Redis 中存储两次同一份数据?
为什么要在 Redis 中存储两次同一份数据?
74 0
为什么要在 Redis 中存储两次同一份数据?
|
6月前
|
存储 NoSQL 算法
redis存储什么类型的数据?redis分布式锁怎么实现的?
redis存储什么类型的数据?redis分布式锁怎么实现的?

推荐镜像

更多