Redis系列-6.Redis缓存双写一致性问题(下)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Redis系列-6.Redis缓存双写一致性问题

Redis系列-6.Redis缓存双写一致性问题(上):https://developer.aliyun.com/article/1414658


Redis与Mysql数据双写一致性工程落地案例


多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。


其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。


后面的线程进来发现已经有缓存了,就直接走缓存。


canal


canal [kə’næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;


历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;


主要用来做数据库镜像、数据库实时备份、索引构建和实时维护、业务cache刷新、带业务逻辑的增量数据处理


工作原理


传统Mysql主从复制工作原理


MySQL的主从复制将经过如下步骤:


1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;


2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,


如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;


3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;


4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;


5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;


6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;


canal工作原理



mysql-canal-redis双写一致性Coding


mysql


查看mysql版本


select version() mysql5.7.28


当前的主机二进制日志


show master status


查看 SHOW VARIABLES LIKE ‘log_bin’;


SHOW VARIABLES LIKE ‘log_bin’;


开启Mysql的binlog写入功能


在mysql5.7.28的目录下打开,最好提前备份

log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1    #配置MySQL replaction需要定义,不要和canal的 slaveId重复
ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。
STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

window my.ini


linux my.cnf


重启mysql


再次查看SHOW VARIABLES LIKE ‘log_bin’;



授权canal连接Mysql账号


mysql默认的用户在mysql库的user表里

默认没有canal账户,此处新建 + 授权

DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';  
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';  
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;


canal服务端


下载


注意发布时间+版本,2022.8.11后发布的才用


解压


解压后整体放入/mycanal路径下


配置


修改/mycanal/conf/example路径下 instance.properties 文件

instance.properties


换成自己的mysql主机master的ip地址

换成自己在mysql新建的canal账户


启动


/opt/mycanal/bin 路径下执行


./startup.sh


查看


判断canal是否启动成功


查看server日志

查看样例 example的日志


canal客户端(Java编写业务程序)


SQL脚本


1 随便选个数据库,以你自己为主,本例bigdata,按照下面建表
CREATE TABLE `t_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `userName` varchar(100) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4


修改POM


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.atguigu.canal</groupId>
    <artifactId>canal_demo02</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.14</version>
        <relativePath/>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <junit.version>4.12</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <lombok.version>1.16.18</lombok.version>
        <mysql.version>5.1.47</mysql.version>
        <druid.version>1.1.16</druid.version>
        <mapper.version>4.1.5</mapper.version>
        <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
    </properties>
    <dependencies>
        <!--canal-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!--SpringBoot通用依赖模块-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--swagger2-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--SpringBoot与Redis整合依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <!--SpringBoot与AOP-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
        <!--Mysql数据库驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!--SpringBoot集成druid连接池-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <!--mybatis和springboot整合-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis.spring.boot.version}</version>
        </dependency>
        <!--通用基础配置junit/devtools/test/log4j/lombok/hutool-->
        <!--hutool-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.2.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <optional>true</optional>
        </dependency>
        <!--persistence-->
        <dependency>
            <groupId>javax.persistence</groupId>
            <artifactId>persistence-api</artifactId>
            <version>1.0.2</version>
        </dependency>
        <!--通用Mapper-->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.8.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>


配置文件


server.port=5555
# ========================alibaba.druid=====================
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.druid.test-while-idle=false


业务类


RedisUtils

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtils
{
    public static final String  REDIS_IP_ADDR = "192.168.111.185";
    public static final String  REDIS_pwd = "111111";
    public static JedisPool jedisPool;
    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setMaxIdle(10);
        jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);
    }
    public static Jedis getJedis() throws Exception {
        if(null!=jedisPool){
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool is not ok");
    }
}

RedisCanalClientExample

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.canal.util.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class RedisCanalClientExample
{
    public static final Integer _60SECONDS = 60;
    public static final String  REDIS_IP_ADDR = "192.168.111.185";
    private static void redisInsert(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    private static void redisDelete(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.del(columns.get(0).getValue());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    private static void redisUpdate(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
  // 打印mysql的变更
    public static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
            }
            //获取变动类型
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }
    public static void main(String[] args)
    {
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,
                11111), "example", "", "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            connector.connect();
            //connector.subscribe(".*\\..*");
            connector.subscribe("bigdata.t_user");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            // 监听时间
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }
}


题外话


java程序下connector.subscribe 配置的过滤正则

关闭资源代码简写


try-with-resource释放资源

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
15小时前
|
存储 缓存 NoSQL
【技术分享】求取列表需求的redis缓存方案
【技术分享】求取列表需求的redis缓存方案
5 0
|
15小时前
|
canal 缓存 NoSQL
【后端面经】【缓存】33|缓存模式:缓存模式能不能解决缓存一致性问题?-03 Refresh Ahead + SingleFlight + 删除缓存 + 延迟双删
【5月更文挑战第11天】Refresh Ahead模式通过CDC异步刷新缓存,但面临缓存一致性问题,可借鉴Write Back策略解决。SingleFlight限制并发加载,减少数据库压力,适合热点数据。删除缓存模式在更新数据库后删除缓存,一致性问题源于读写线程冲突。延迟双删模式两次删除,理论上减少不一致,但可能降低缓存命中率。选用模式需权衡优劣,延迟双删在低并发下较优。装饰器模式可用于实现多种缓存模式,无侵入地增强现有缓存系统。
20 2
|
15小时前
|
缓存 数据库 NoSQL
【后端面经】【缓存】33|缓存模式:缓存模式能不能解决缓存一致性问题?-02 Write Through + Write Back
【5月更文挑战第10天】`Write Through`是一种缓存策略,写操作仅需写入缓存,缓存负责更新数据库。异步版本可能丢失数据,而同步变种先写数据库再异步刷新缓存,减少丢数据风险。`Write Back`模式数据先写入缓存,过期时才写入数据库,可能导致数据丢失,但若使用Redis并确保高可用,可部分解决一致性问题。在特定条件下,如使用SETNX命令,能缓解一致性挑战。
16 0
【后端面经】【缓存】33|缓存模式:缓存模式能不能解决缓存一致性问题?-02 Write Through + Write Back
|
15小时前
|
缓存 NoSQL 安全
Redis经典问题:缓存击穿
本文探讨了高并发系统中Redis缓存击穿的问题及其解决方案。缓存击穿指大量请求同一未缓存数据,导致数据库压力过大。为解决此问题,可以采取以下策略:1) 热点数据永不过期,启动时加载并定期异步刷新;2) 写操作加互斥锁,保证并发安全并设置查询失败返回默认值;3) 预期热点数据直接加缓存,系统启动时加载并设定合理过期时间;4) 手动操作热点数据上下线,通过界面控制缓存刷新。这些方法能有效增强系统稳定性和响应速度。
63 0
|
15小时前
|
消息中间件 缓存 中间件
中间件缓存一致性
【5月更文挑战第6天】中间件缓存一致性
12 1
中间件缓存一致性
|
15小时前
|
存储 缓存 数据库
【后端面经】【缓存】33|缓存模式:缓存模式能不能解决缓存一致性问题?
【5月更文挑战第9天】面试准备中,熟悉缓存模式如Cache Aside、Read Through、Write Through、Write Back、Singleflight,以及删除缓存和延迟双删策略,能解决缓存一致性、穿透、击穿和雪崩问题。在自我介绍时展示对缓存模式的理解,例如Cache Aside模式,它是基础模式,读写由业务控制,先写数据库以保证数据准确性,但无法解决所有一致性问题。Read Through模式在缓存未命中时自动从数据库加载数据,可异步加载优化响应时间,但也存在一致性挑战。
16 0
|
16小时前
|
缓存 NoSQL 应用服务中间件
Redis多级缓存
Redis多级缓存
9 0
|
16小时前
|
缓存 NoSQL 关系型数据库
Redis 缓存 一致性
Redis 缓存 一致性
8 0
|
15小时前
|
消息中间件 缓存 NoSQL
Redis经典问题:缓存雪崩
本文介绍了Redis缓存雪崩问题及其解决方案。缓存雪崩是指大量缓存同一时间失效,导致请求涌入数据库,可能造成系统崩溃。解决方法包括:1) 使用Redis主从复制和哨兵机制提高高可用性;2) 结合本地ehcache缓存和Hystrix限流降级策略;3) 设置随机过期时间避免同一时刻大量缓存失效;4) 使用缓存标记策略,在标记失效时更新数据缓存;5) 实施多级缓存策略,如一级缓存失效时由二级缓存更新;6) 通过第三方插件如RocketMQ自动更新缓存。这些策略有助于保障系统的稳定运行。
141 1
|
15小时前
|
存储 消息中间件 缓存
Redis缓存技术详解
【5月更文挑战第6天】Redis是一款高性能内存数据结构存储系统,常用于缓存、消息队列、分布式锁等场景。其特点包括速度快(全内存存储)、丰富数据类型、持久化、发布/订阅、主从复制和分布式锁。优化策略包括选择合适数据类型、设置过期时间、使用Pipeline、开启持久化、监控调优及使用集群。通过这些手段,Redis能为系统提供高效稳定的服务。