Canal实现mysql数据与redis同步

本文涉及的产品
云原生内存数据库 Tair,内存型 2GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 Redis 版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Canal实现mysql数据与redis同步

Canal实现mysql数据与redis同步

1.Canal简介

canal [kə'næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

2.项目背景

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

基于日志增量订阅和消费支持的业务包括:

1.数据库镜像

2.数据库实时备份

3.多级索引 (卖家和买家各自分库索引)

4.search build

5.业务cache刷新

6.带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

3.工作原理

3.1.MySQL主从复制原理

1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

2.MySQL slave master binary log events 拷贝到它的中继日志(relay log)

3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

3.2.Canal工作原理

1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

2.MySQL master 收到 dump 请求,开始推送 binary log slave ( canal )

3.canal 解析 binary log 对象(原始为 byte )

实现步骤

MySQL配置

我使用的是mysql5.6.x,修改mysql的配置文件my.ini,加入下面内容

日志存放路径

general_log=ON

general_log_file=G:\mysql-cluster\master\master.log

#default-character-set=utf8

character-set-server=utf8

default-storage-engine=InnoDB

#innodb_force_recovery = 1

# server_id = .....

log_bin=mysql-bin

binlog-format=ROW

server-id=1

需要同步的数据库名称

binlog-do-db=test_canal

忽略的数据库,建议填写

binlog-ignore-db=mysql

配置完后重新启动数据库服务

执行下列授权语句

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT,REPLICATION SLAVE,REPLICATION client ON *.* to 'canal'@'localhost'

FLUSH PRIVILEGES;

 

Canal部署与配置

下载canalhttps://github.com/alibaba/canal/releases/

这里我下载的是1.1.5

下载完成解压即可,修改配置

canal/conf/canal.properties可保持不变,默认的端口51

canal/conf/example/instance.properties需要配置

#################################################

## mysql serverId , v1.0.26+ will autoGen

# canal.instance.mysql.slaveId=0

 

# enable gtid use true/false

这里的id不要跟mysqlserver-id相同

canal.instance.mysql.slaveId=123

canal.instance.gtidon=false

 

# position info

canal.instance.master.address=127.0.0.1:3307

canal.instance.master.journal.name=

canal.instance.master.position=

canal.instance.master.timestamp=

canal.instance.master.gtid=

 

# rds oss binlog

canal.instance.rds.accesskey=

canal.instance.rds.secretkey=

canal.instance.rds.instanceId=

 

# table meta tsdb info

canal.instance.tsdb.enable=false

#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb

#canal.instance.tsdb.dbUsername=canal

#canal.instance.tsdb.dbPassword=canal

 

#canal.instance.standby.address =

#canal.instance.standby.journal.name =

#canal.instance.standby.position =

#canal.instance.standby.timestamp =

#canal.instance.standby.gtid=

 

# username/password

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

canal.instance.connectionCharset = UTF-8

# enable druid Decrypt database password

canal.instance.enableDruid=false

#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

 

# table regex

canal.instance.filter.regex=.*\\..*

# table black regex

canal.instance.filter.black.regex=mysql\\.slave_.*

# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch

# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

 

# mq config

canal.mq.topic=example

# dynamic topic route by schema or table regex

#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*

canal.mq.partition=0

# hash partition config

#canal.mq.partitionsNum=3

#canal.mq.partitionHash=test.table:id^name,.*\\..*

#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6

#################################################

 

配置完成后启动canal,我这里是windows系统,直接双击bin/startup.bat即可

查看是否正常启动,需要看2个日志文件

logs/canal/canal.log文件中有如下内容:the canal server is running now ......

logs/example/example.log文件中有如下内容:start successful....

证明启动成功

Javacanal客户端

创建maven工程

导入pom依赖

<dependency>

    <groupId>com.alibaba.otter</groupId>

    <artifactId>canal.client</artifactId>

    <version>1.1.0</version>

</dependency>

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

Redis工具类

package cn.demo.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.io.IOException;
import java.util.Properties;

/**
 * 获取连接池对象 */
public enum RedisUtils {
    INSTANCE;
    static JedisPool jedisPool = null;

    static {
        //1 创建连接池配置对象        JedisPoolConfig config = new JedisPoolConfig();
        //2 进行配置-四个配置        config.setMaxIdle(1);//最小连接数        config.setMaxTotal(11);//最大连接数        config.setMaxWaitMillis(10 * 1000L);//最长等待时间        config.setTestOnBorrow(true);//测试连接时是否畅通        //3 通过配置对象创建连接池对象        Properties properties = null;
        try {
            properties = new Properties();
            properties.load(RedisUtils.class.getClassLoader().getResourceAsStream("redis.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        String host = properties.getProperty("redis.host");
        String port = properties.getProperty("redis.port");
        String password = properties.getProperty("redis.password");
        String timeout = properties.getProperty("redis.timeout");
        System.out.println(host);
        System.out.println(port);
        System.out.println(password);
        System.out.println(timeout);
        jedisPool = new JedisPool(config, host, Integer.valueOf(port),Integer.valueOf(timeout), password);
    }

    //获取连接    public Jedis getSource() {
        return jedisPool.getResource();
    }

    //关闭资源    public void closeSource(Jedis jedis) {
        if (jedis != null) {
            jedis.close();
        }

    }
    /**
     * 设置字符值     *
     * @param key
     * @param
     */
    public void del(String key) {
        Jedis jedis = getSource();
        jedis.del(key);
        closeSource(jedis);
    }
    /**
     * 设置字符值     *
     * @param key
     * @param value
     */
    public void set(String key, String value) {
        Jedis jedis = getSource();
        jedis.set(key, value);
        closeSource(jedis);
    }

    /**
     * 设置     * @param key
     * @param value
     */
    public void set(byte[] key, byte[] value) {
        Jedis jedis = getSource();
        jedis.set(key, value);
        closeSource(jedis);
    }

    /**
     *
     * @param key
     * @return
     */
    public byte[]  get(byte[] key) {
        Jedis jedis = getSource();
        try {
            return jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeSource(jedis);
        }
        return null;

    }

    /**
     * 设置字符值     *
     * @param key
     */
    public String get(String key) {
        Jedis jedis = getSource();
        try {
            return jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeSource(jedis);
        }

        return null;

    }

    public void set(String key, String value, Integer time) {
        Jedis jedis = getSource();
        jedis.setex(key,time,value);
        closeSource(jedis);
    }
}

Canal客户端测试类

package cn.demo.test;

import cn.demo.util.RedisUtils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * canal测试 */
public class SimpleCanalClientExample {


    public static void main(String args[]) {
        // 创建链接        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认                // connector.rollback(batchId); // 处理失败回滚数据            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    //printColumn(rowData.getBeforeColumnsList());
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    //printColumn(rowData.getAfterColumnsList());
                    redisInsertOrUpdate(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    //printColumn(rowData.getAfterColumnsList());
                    redisInsertOrUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }
    // 修改redis
    private static void redisInsertOrUpdate(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtils.INSTANCE.set("user:" + columns.get(0).getValue(), json.toJSONString());
        }
    }
    // 删除redis
    private static void redisDelete(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtils.INSTANCE.del("user:" + columns.get(0).getValue());
        }
    }



    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

测试效果,触发数据库变更

执行sql语句查看redis相应的变化

经过测试,mysql的数据已经同步到redis,测试成功

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6天前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
66 4
|
4天前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
9天前
|
SQL 存储 缓存
MySQL是如何保证数据不丢失的?
文章详细阐述了InnoDB存储引擎中Buffer Pool与DML操作的关系。在执行插入、更新或删除操作时,InnoDB为了减少磁盘I/O,会在Buffer Pool中缓存数据页进行操作,随后将更新后的“脏页”刷新至磁盘。为防止服务宕机导致数据丢失,InnoDB采用了日志先行(WAL)机制,通过将DML操作记录为Redo Log并异步刷新到磁盘,结合双写机制和合理的日志刷新策略,确保数据的持久性和一致性。尽管如此,仍需合理配置参数以平衡性能与数据安全性。
MySQL是如何保证数据不丢失的?
|
4天前
|
缓存 NoSQL 关系型数据库
MySQL与Redis缓存一致性的实现与挑战
在现代软件开发中,MySQL作为关系型数据库管理系统,广泛应用于数据存储;而Redis则以其高性能的内存数据结构存储特性,常被用作缓存层来提升数据访问速度。然而,当MySQL与Redis结合使用时,确保两者之间的数据一致性成为了一个重要且复杂的挑战。本文将从技术角度分享MySQL与Redis缓存一致性的实现方法及其面临的挑战。
18 2
|
7天前
|
存储 关系型数据库 MySQL
|
7天前
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
63 0
|
16天前
|
SQL 关系型数据库 MySQL
【揭秘】MySQL binlog日志与GTID:如何让数据库备份恢复变得轻松简单?
【8月更文挑战第22天】MySQL的binlog日志记录数据变更,用于恢复、复制和点恢复;GTID为每笔事务分配唯一ID,简化复制和恢复流程。开启binlog和GTID后,可通过`mysqldump`进行逻辑备份,包含binlog位置信息,或用`xtrabackup`做物理备份。恢复时,使用`mysql`命令执行备份文件,或通过`innobackupex`恢复物理备份。GTID模式下的主从复制配置更简便。
69 2
|
11天前
|
弹性计算 关系型数据库 数据库
手把手带你从自建 MySQL 迁移到云数据库,一步就能脱胎换骨
阿里云瑶池数据库来开课啦!自建数据库迁移至云数据库 RDS原来只要一步操作就能搞定!点击阅读原文完成实验就可获得一本日历哦~
|
15天前
|
关系型数据库 MySQL 数据库
RDS MySQL灾备服务协同解决方案构建问题之数据库备份数据的云上云下迁移如何解决
RDS MySQL灾备服务协同解决方案构建问题之数据库备份数据的云上云下迁移如何解决
|
12天前
|
人工智能 小程序 关系型数据库
【MySQL】黑悟空都掌握的技能,数据库隔离级别全攻略
本文以热门游戏《黑神话:悟空》为契机,深入浅出地解析了数据库事务的四种隔离级别:读未提交、读已提交、可重复读和串行化。通过具体示例,展示了不同隔离级别下的事务行为差异及可能遇到的问题,如脏读、不可重复读和幻读等。此外,还介绍了在MySQL中设置隔离级别的方法,包括全局和会话级别的调整,并通过实操演示了各隔离级别下的具体效果。本文旨在帮助开发者更好地理解和运用事务隔离级别,以提升数据库应用的一致性和性能。
78 2
【MySQL】黑悟空都掌握的技能,数据库隔离级别全攻略

热门文章

最新文章

推荐镜像

更多
下一篇
DDNS