微服务原理篇(Canal-Redis)

简介: 本文介绍了ES索引同步的常见方案,重点讲解Canal+MQ数据同步机制。通过解析MySQL的binlog日志,Canal模拟slave伪装接入主库,实现增量数据捕获,并结合RabbitMQ保证消息顺序性地同步至Elasticsearch。同时探讨了缓存一致性问题,提出使用分布式锁(如Redis)控制并发写操作,避免双写不一致。还涵盖Redis持久化、集群模式、过期淘汰策略及缓存三剑客(穿透、雪崩、击穿)的解决方案,系统梳理了高并发场景下的数据同步与缓存保障技术体系。

学习目标

  1. 能够说出ES索引同步的常用方案
  2. 能够说出Canal+MQ数据同步的方案
  3. 能够说出Canal是怎么伪装成 MySQL slave
  4. 能够测试通过Canal+MQ数据同步流程
  5. 能够说出MySQL和Redis如何保证双写一致性
  6. 能够说出分布式锁Redis原生、Redisson应用场景
  7. 能够说出缓存三剑客问题和解决方案
  8. 能够说出Redis持久化两种方案
  9. 能够说出Redis集群三种模式、哨兵选举流程
  10. 能够说出Redis过期策略、淘汰策略

1 多数据源数据同步方案

1.1. 技术方案分析

1.1.1 同步方式

管理员在商城的后台维护商品信息,数据存储在MySQL。

用户在商城搜索商品信息,从Elasticsearch搜索商品信息。

如果Elasticsearch的索引数据与MySQL的商品数据不一致会导致什么问题?

用户搜索到的商品信息并非商品最新的信息,比如:价格不同,搜索到的商品价格与实际价格不同,商品下架但是用户仍然可以搜索到商品信息,这些问题都会严重影响用户的体验。

我们需要一种方案,当管理员修改商品信息后及时的修改商品索引信息,使MySQL中的商品数据与ES中的商品数据保持一致。

常见的索引数据同步方案有两种:同步方式和异步方式。

首先说同步方式

在修改商品信息的方法中加入操作Elasticsearch索引的代码,即在原有业务方式的基础上添加索引同步的代码,CRUD操作MySQL的同时CRUD操作ES索引。如下代码,是在添加商品信息的时候向ES索引添加文档。

public void insert(Item item){
    //向本地数据库Item表添加记录
    
    //向ES的Item索引添加文档
}

此方式会在很多业务方法中加入操作ES索引的代码,增加代码的复杂度不方便维护,扩展性差。

其次,上边的代码存在分布式事务,操作Item表会访问数据库,向索引添加文档会访问ES,使用数据库本地事务是无法控制整个方法的一致性的,比如:向ES写成功了由于网络超时导致异常,最终写数据库操作回滚了而写ES操作没有回滚,数据库的数据和ES中的索引不一致。

1.1.2 异步方式

异步方式是通过引入MQ实现,修改商品信息时向MQ发送修改的商品信息,然后监听MQ的程序请求ES向索引写入,流程如下:

此方案的好处:

  1. 商品服务不用直接访问ES,通过MQ将商品服务和ES解耦合。

缺点:

  1. 在商品的CRUD方法中仍然需要加入向MQ发送消息的代码,如下:
public void insert(Item item){
    //向Item表添加记录
    
    //向MQ发送添加商品消息
}

此方式仍然增加代码的复杂度不方便维护,扩展性差

这种方案不少公司是有采用的,下述Canal方案较重量级,大家自行取舍不以HM为准,以实际业务为准

有没有一种方法不用对商品的CRUD方法进行侵入,商品的CRUD方法就是对商品的增删改查,不会存在向ES同步数据相关的代码。

此时我们要借助一个神器就是Canal [kə'næl],先看下Canal在整个流程中的位置,如下图:

从图中可以看出,商品管理的CRUD方式仅仅包括对商品表的CRUD业务操作(下图红色框内部分),不再有操作MQ的相关逻辑。

Canal是和MySQL存在联系,并且Canal负责和MQ交互,这种方案就是借助了Canal和MQ实现的。

1.2 Canal+MQ数据同步

1.2.1. MySQL主从复制

要理解Canal的工作原理需要首先要知道MySQL主从数据同步的原理。

首先我们要知道,平时我们在学习时只用MySQL单机即可,但是生产环境中MySQL部署为主从集群模式,MySQL主从集群由MySQL主服务器(master)和MySQL从服务器(slave)组成,主数据库提供写服务,从数据库提供读服务,主从之间进行数据复制保证数据同步,如下图:

MySQL主从之间是如何同步的呢?

MySQL主从数据同步是一种数据库复制技术,进行写数据会先向主服务器写,写成功后通过binlog日志将数据同步到从数据库。

具体流程如下图:

1、主服务器将所有写操作(INSERT、UPDATE、DELETE)以二进制日志(binlog)的形式记录下来。

2、从服务器连接到主服务器,发送dump 协议,请求获取主服务器上的binlog日志。

MySQL的dump协议是MySQL复制协议中的一部分。

3、MySQL master 收到 dump 请求,开始推送 binary log 给 slave

4、从服务器解析binlog日志,根据日志内容更新从服务器的数据库,完成从服务器的数据保持与主服务器同步。

1.2.1.1 binlog

binlog日志是什么?

MySQL的binlog(二进制日志)是一种记录数据库服务器上所有修改数据的日志文件。它主要用于数据复制和数据恢复。binlog的主要作用是记录数据库的DDL(数据定义语言)操作和DML(数据操作语言)操作,以便在数据库发生故障时进行恢复。

binlog长什么样?

类似下边这样:

binlog的主要特点如下:

  1. 事务级别的记录:
  1. Binlog 以事务为单位记录数据更改,这意味着每个事务的开始和结束都会被记录下来。
  2. 这种记录方式有助于保证数据的一致性和事务的完整性。
  1. 支持多种格式:
  1. STATEMENT:记录每条 SQL 语句,适用于大多数情况,但有些 SQL 语句的结果依赖于会话状态,可能导致复制问题。
  2. ROW:记录每行数据的更改,精确度高,但会增加日志文件的大小。
  3. MIXED:默认模式,结合了 STATEMENT 和 ROW 的优点,大部分情况下采用 STATEMENT 模式,但在 STATEMENT 模式可能引起问题时自动切换到 ROW 模式。
  1. 非阻塞性:
  1. Binlog 的写入操作是非阻塞的,即写入 Binlog 不会阻塞客户端的事务提交
  2. 这意味着应用程序可以在无需等待日志写入完成的情况下继续运行,提高了性能。
  1. 数据恢复:
  1. Binlog 可以用于数据恢复,允许恢复到特定的时间点或事务。
  2. 这对于灾难恢复非常重要,可以减少数据丢失的风险。
  1. 主从复制:
  1. Binlog 是 MySQL 主从复制的基础。
  2. 通过从主服务器读取并重放 Binlog,从服务器可以保持与主服务器相同的数据状态。

在 MySQL 中启用 Binlog 需要在配置文件 (my.cnfmy.ini) 中进行设置。

一些关键的配置选项包括:

  • server-id:用于标识服务器的唯一 ID,这对于多服务器环境非常重要。
  • log_bin:指定是否启用 Binlog 以及 Binlog 文件的保存位置。
  • binlog_format:定义 Binlog 的格式,如 STATEMENT, ROW 或 MIXED。
  • expire_logs_days:定义 Binlog 文件保留的时间,超过这个时间的文件会被自动删除。
  • max_binlog_size:单个 Binlog 文件的最大大小,达到这个大小后会自动创建新的文件。

举例:

注意事项:

  • Binlog 文件会占用磁盘空间,因此需要定期清理不再需要的旧文件。
  • 使用 Binlog 进行数据恢复或复制时,要确保所有相关服务器的时间同步,否则可能会出现问题。

binlog常用命令:查看是否开启binlog日志

show variables like 'log_bin';

使用以下命令查看所有binlog日志列表:

SHOW MASTER LOGS;

要查看MySQL服务器上的binlog状态,可以使用以下命令:

SHOW MASTER STATUS;

要查看所有的binlog文件列表,可以使用以下命令:

SHOW BINARY LOGS;

查看binlog日志保存路径

SHOW VARIABLES LIKE 'datadir';

刷新log日志,立刻产生一个新编号的binlog日志文件,跟重启一个效果,可以执行以下命令:

FLUSH LOGS;

清空所有binlog日志,可以执行以下命令:

RESET MASTER;

1.2.2. Canal+MQ同步流程

Canal是什么呢?

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,对数据进行同步,如下图:

Canal可与很多数据源进行对接,将数据由MySQL同步到ES、MQ、DB等各个数据源。

Canal的意思是水道/管道/沟渠,它相当于一个数据管道,通过解析MySQL的binlog日志完成数据同步工作。

官方文档:https://github.com/alibaba/canal/wiki

Canal数据同步的工作流程如下:

1、Canal模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL的dump协议是MySQL复制协议中的一部分。

2、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

。一旦连接建立成功(长连接),Canal会一直等待并监听来自MySQL主服务器的binlog事件流,当有新的数据库变更发生时MySQL master主服务器发送binlog事件流给Canal。

3、Canal会及时接收并解析这些变更事件并解析 binary log

理解了Canal的工作原理下边再看数据同步流程:

  1. 首先创建一张专门用于向ES同步商品信息的表item_sync,item_sync表的字段内容可能包含item表的字段,一定覆盖所有索引字段。

方法:复制item表到item_sync表。

这里为什么要单独创建一张同步表呢?

因为同步表的字段和索引是对应的,方便进行同步。

  1. 商品服务在对商品进行CRUD时向Item表写数据并且向item_sync写入数据,并产生binlog。
  2. Canal请求MySQL读取binlog,并解析出item_sync表的数据更新日志,并发送至MQ的数据同步队列。
  3. 异步同步程序监听MQ的数据同步队列,收到消息后解析出item_sync表的更新日志。
  4. 异步同步程序根据item_sync表的更新日志请求Elasticsearch添加、更新、删除索引文档。

最终实现了将MySQL中的Item表的数据同步至Elasticsearch

1.2.3. 配置数据同步环境

本节实现将MySQL的变更数据通过Canal写入MQ。

根据Canal+MQ同步流程,进行如下配置:

  1. 配置Mysql主从同步,开启MySQL主服务器的binlog
  2. 安装Canal并配置,保证Canal连接MySQL主服务器成功
  3. 安装RabbitMQ,并配置同步队列。
  4. 在Canal中配置RabbitMQ的连接信息,保证Canal收到binlog消息写入MQ

对于异步程序监听MQ通过Java程序中实现。以上四步配置详细参考“配置搜索及数据同步环境”。

1.2.4. 同步程序

前边我们实现了Canal读取binlog日志并向MQ发送消息的整个流程,下边我们需要编写同步程序监听MQ,解析出更改的数据更新ES索引数据。

在search-service工程添加依赖:

<properties>
  <canal.version>1.1.5</canal.version>
</properties>
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>${canal.version}</version>
</dependency>
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.protocol</artifactId>
  <version>${canal.version}</version>
</dependency>

从课程资料中拷贝"es/canal"目录到search-service工程的com.hmall.search包下。

阅读AbstractCanalRabbitMqMsgListener类parseMsg(Message message) 方法,理解同步程序的执行思路。

parseMsg(Message message) 方法实现了解析canal发送给mq的消息,并调用batchHandle或singleHandle处理数据,在这两个方法中会调用抽象方法void batchSave(List<T> data)和void batchDelete(List<Long> ids)去向数据库保存数据、删除数据。

public void parseMsg(Message message) throws Exception {
    try {
        // 1.数据格式转换
        CanalMqInfo canalMqInfo = JSONUtil.toBean(new String(message.getBody()), CanalMqInfo.class);
        // 2.过滤数据,没有数据或者非插入、修改、删除的操作均不处理
        if (CollUtils.isEmpty(canalMqInfo.getData()) || !(OperateType.canHandle(canalMqInfo.getType()))) {
            return;
        }
        if (canalMqInfo.getData().size() > 1) {
            // 3.多条数据处理
            batchHandle(canalMqInfo);
        } else {
            // 4.单条数据处理
            singleHandle(canalMqInfo);
        }
    } catch (Exception e) {
        //出现错误延迟1秒重试
        Thread.sleep(1000);
        throw new RuntimeException(e);
    }
}

如果我们要实现商品信息同步就需要编写商品信息同步类,同步程序做两件事:

  1. 同步类需要监听MQ,接收canal发送给mq的消息
  2. 同步程序需要继承AbstractCanalRabbitMqMsgListener类,并重写void batchSave(List<T> data)和void batchDelete(List<Long> ids)这两个方法,这样就实现了将canal发送的商品信息保存或删除ES中对应的数据。

代码如下:下边的代码能读懂会用即可

package com.hmall.search.canal.listeners;
import cn.hutool.core.bean.BeanUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import com.hmall.search.domain.po.ItemDoc;
import com.hmall.search.domain.po.ItemSync;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Component
public class ItemCanalDataSyncHandler extends AbstractCanalRabbitMqMsgListener<ItemSync> {
    @Resource
    private ElasticsearchClient esClient;
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "canal-mq-hmall-item"),
        exchange = @Exchange(name = "exchange.canal-hmall", type = ExchangeTypes.TOPIC),
        key = "canal-mq-hmall-item"),
                    concurrency = "1"
                   )
    public void onMessage(Message message) throws Exception {
        parseMsg(message);
    }
    @Override
    public void batchSave(List<ItemSync> data) {
        BulkRequest.Builder br = new BulkRequest.Builder();
        for (ItemSync itemSync : data) {
            br.operations(op -> op
                          .index(idx -> idx
                                 .index("items")
                                 .id(itemSync.getId().toString())
                                 .document(itemSync)
                                )
                         );
        }
        BulkResponse result = null;
        try {
            result = esClient.bulk(br.build());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        Boolean aBoolean = result.errors();
        if(aBoolean) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            throw new RuntimeException("同步失败");
        }
    }
    @Override
    public void batchDelete(List<Long> ids) {
        List<String> idList = ids.stream().map(id -> id.toString()).collect(Collectors.toList());
        DeleteByQueryResponse response = null;
        try {
            response = esClient.deleteByQuery(dq -> dq
                                              .query(t -> t.ids(t1 -> t1.values(idList)))
                                              .index("items"));
            boolean hasFailures = response.failures().size() > 0;
            if(hasFailures) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                throw new RuntimeException("同步失败");
            }
        } catch (IOException e) {
            throw new RuntimeException("同步失败");
        }
    }
}

接下来测试:

  • 手动修改Item_sync表的数据,断点跟踪onMessage(Message message)方法,当插入、修改数据时执行踪onMessage(Message message)方法,当删除数据时执行batchDelete(List<Long> ids)。
  • 手动对Item_sync表增、删、改,观察ES中item索引的数据是否正常增、删、改。

1.2.5. 保证消息的顺序性

如何保证Canal+MQ同步消息的顺序性?场景如下图:

首先明确Canal解析binlog日志信息按顺序发到MQ的队列中,现在是要保证消费端如何按顺序消费队列中的消息。生产中同一个服务会启动多个jvm进程,每个进程作为同一个队列的消费者,如下图:

现在对商品价格先修改为100再修改为200,在MQ中的有两个消息:

修改价格为100

修改价格为200

预期:最终将价格修改为200

此时两条消息会被分发给两个jvm进程,假设“修改价格为100”的消息发给jvm进程1,“修改价格为200”的消息发给jvm进程2,两个进程分别去消费,此时无法控制两个消息的先后顺序,可能导致价格最终并非修改200。

解决方法:

多个jvm进程监听同一个队列保证只有一个消费者活跃,即只有一个消费者接收消息

消费队列中的数据使用单线程。

如何保证只有一个消费者接收消息?

队列需要增加x-single-active-consumer参数,表示否启用单一活动消费者模式。

配置完成查保证队列上存在SAC标识,如下图:

当有多个jvm进程都去监听该队列时,只有一个为活跃状态

如果使用x-single-active-consumer参数需要修改为如下代码:

在Queue中添加:arguments={@Argument(name="x-single-active-consumer", value = "true", type = "java.lang.Boolean") }

如下所示:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "canal-mq-hmall-item",arguments={@Argument(name="x-single-active-consumer", value = "true", type = "java.lang.Boolean") }),
    exchange = @Exchange(name = "exchange.canal-hmall", type = ExchangeTypes.TOPIC),
    key = "canal-mq-hmall-item"),
                concurrency = "1"
               )
public void onMessage(Message message) throws Exception {
    parseMsg(message);
}

concurrency=”1“表示 指定消费线程为1。

1.2.6 面试题

Canal是怎么伪装成 MySQL slave?

Canal数据同步异常了怎么处理?

项目中如何进行索引同步的?

如何保证Canal+MQ同步消息的顺序性?

2 缓存常见问题

2.1 面试题

你的项目是怎么保证缓存一致性的?或 说一下双写不一致的解决方案。

考察项目中对缓存的应用,对缓存一致性方案的理解。

分布式锁你们用什么实现的?

考察对分布式锁的理解和应用。

说一下缓存穿透、缓存雪崩、缓存击穿?

考察高并发下对缓存的常见问题及解决方案的理解。

2.2. 缓存应用入门

2.2.1 缓存概念

缓存(Cache)是一种用于提高数据访问速度的技术,它通过暂时存储频繁访问或计算成本较高的数据,使得后续的访问可以直接从缓存中获取数据,而不需要重新计算或从较慢的数据源(如数据库或磁盘)中读取数据。它可以显著提升应用程序的性能和响应速度。

缓存通常位于应用程序和底层数据存储之间,充当一个快速访问的中间层。

如下图所示:

我们的业务数据通常存储在数据库中,用户请求应用程序接口,应用程序查询数据库响应结果。

当访问量增大为了减轻对数据库的访问,提高响应速度,会在应用程序和数据库之间增加缓存层,用户请求应用程序接口,应用程序先查询缓存,如果查询到数据则直接响应用户,如果未在缓存命中则查询数据库并响应用户,并且将数据库查询到的数据存储到缓存中。

缓存可以存在于多个层次,从CPU缓存、内存缓存到分布式缓存系统。

上边我们用redis作为缓存层,redis可独立部署,这种称为分布式缓存

我们也可以在应用程序内存中增加缓存结构,这种称为内存缓存,简单理解就是缓存存在于应用程序的内存中,如下图:在应用程序内存中创建一个HashMap对象,应用程序接口先从HashMap中查询数据,如果查询到则直接响应给用户,如果查询不到则查询数据库响应给用户并且从数据库查询到的数据存储到HashMap中。

关于内存缓存的例子还有很多,比如我们使用的MyBatis的一级缓存、二级缓存默认就是使用内存缓存实现。

MyBatis的一级缓存

  • 当同一个 SqlSession 中执行相同的 SQL(查询语句+条件一模一样才可以) 查询时,第一次执行查询的结果会被缓存起来,如果后续的查询条件相同,则直接从缓存中获取结果,而不再执行 SQL 语句。
  • 一级缓存在 SqlSession 关闭或提交后就会被清空。
  • 一级缓存就是SqlSession级别的缓存。

MyBatis的二级缓存

  • 二级缓存是在命名空间级别上的缓存,它作用于多个 SqlSession 之间。
  • 与一级缓存相比,二级缓存可以跨越多个 SqlSession。
  • 要启用二级缓存,需要在 Mapper 的 XML 文件的根节点 <mapper> 上添加 cache 元素,或者自定义一个实现 Cache 接口的类。

关于MyBatis一级缓存和二级缓存的内容请大家课下自行复习。

2.2.2 缓存案例

2.2.2.1 需求

下图是购物车的界面,购物车中的商品信息来源于商品服务,为了提高访问速度减少对数据库的访问,购物车请求商品服务查询商品信息,商品服务的商品查询接口正是使用缓存去实现。

商品服务的商品信息查询接口使用缓存实现流程,如下图:

2.2.2.2 集成 redis

下边在商品服务中使用Redis在商品查询接口中使用缓存。

  1. 首先在商品服务添加redis依赖
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  1. 配置RedisConfig.java

从课程资料的“redis”目录拷贝RedisConfig.java到商品服务的com.hmall.item.config包下。

在RedisConfig.java中定义了RedisTemplate,使用RedisTemplate去操作Redis。

记住:

常用的有Jedis和Lettuce两个访问redis的客户端库,其中Lettuce的性能和并发性要好一些,Spring Boot 默认使用的是 Lettuce 作为 Redis 的客户端。

本项目集成了Spring data redis框架,在项目中可以通过RedisTemplate访问Redis,RedisTemplate提供了方便访问redis的模板方法。

RedisTemplate和Lettuce 是什么关系?

RedisTemplate 进行 Redis 操作时,实际上是通过 Lettuce 客户端与 Redis 服务器进行通信。

  1. 配置redis地址

在application.yaml中配置

spring:
  redis:
    host: 192.168.101.68
    port: 6379
    password: redis

2.2.2.3 编写缓存方法

在IItemService.java类中增加缓存方法

@Override
public List<ItemDTO> queryItemByIdsCache(Collection<Long> ids) {
    //定义空列表
    List<ItemDTO> itemDTOList = new ArrayList<>();
    ids.stream().forEach(id->{
        //先查询缓存
        Item itemCache = (Item) redisTemplate.opsForValue().get("hmall:item:" + id);
        //如果缓存有则转为ItemDto
        if(Objects.nonNull(itemCache)){
            ItemDTO itemDTO = BeanUtils.copyBean(itemCache, ItemDTO.class);
            itemDTOList.add(itemDTO);
        }else{
            //查询数据库
            Item itemDB = getById(id);
            if(Objects.nonNull(itemDB)){
                //将数据库查询到的数据放入缓存
                redisTemplate.opsForValue().set("hmall:item:" + id,itemDB);
                //转为ItemDto
                ItemDTO itemDTO = BeanUtils.copyBean(itemDB, ItemDTO.class);
                itemDTOList.add(itemDTO);
            }
        }
    });
    return itemDTOList;
}

修改“根据id批量查询商品” 接口,调用queryItemByIdsCache 方法查询商品信息

2.2.2.4 测试

通过swagger文档进行测试,请求查询商品信息

查询成功观察redis是否存储了商品信息

再次请求查询观察代码是否还查询数据库。

2.3 缓存一致性问题

2.3.1 问题描述

缓存一致性是指缓存和数据库两者的数据保持一致。

如果管理员在后台修改了商品信息保存在数据库,用户在购物车从缓存查询商品信息,数据库和缓存两者的数据如果不能保持一致将影响用户的体验性,比如:商品调价,原价是90元,调为100元,如果用户在购物车仍然查询到调价前的价格等用户去支付时却是调价后的价格,这个体验是非常差的。如下图:

为什么会出现缓存不一致呢?

我们在修改商品信息时去修改缓存信息不就保持一致了吗,如下代码:

public void updateItem(ItemDTO item){
//修改数据库
//更新缓存
}

这样的代码仍然会存在缓存不一致的问题。下边我们分析造成缓存不一致的原因:

造成缓存不一致的原因可能是在写数据库和写缓存两步存在异常,也可能是并发所导致。

写数据库和写缓存导致不一致称为双写不一致,比如:先更新数据库成功了,更新缓存时失败了,最终导致不一致。并发导致缓存不一致举例如下:

执行流程:

  • 线程1先写入数据库X,当去写入缓存X时网络卡顿
  • 线程2先写入数据库Y
  • 线程2再写入缓存Y
  • 线程1 写入缓存旧值X覆盖了新值Y

即使先写入缓存再写数据在并发环境也可能存在问题,如下图:

流程:

  • 线程1先写入缓存X,当去写入数据库X时网络卡顿
  • 线程2先写入缓存Y
  • 线程2再写入数据库Y
  • 线程1 写入数据库旧值X覆盖了新值Y

2.3.2 解决方案

如何解决并发环境下双写不一致的问题?

出现并发环境下双写不一致的主要原因就是多线程并发导致,只要把并行操作改为串行操作即可解决。

2.3.2.1 使用分布式锁

什么是分布式锁?

syncronized是jvm内存锁,要执行syncronized代码块需要先获取锁,获取锁的线程执行完成并释放锁后其它线程才可以获取锁,同步代码块的代码是串行执行。

这里是用synchronzed引出分布式锁,synchronzed本身只是单JVM的锁,并非分布式锁

syncronized{
    //同步代码块
}

为什么会有分布式锁,为什么不使用syncronized内存锁?

如果是同一个进程内的线程去争抢资源可以用syncronized内存锁,因为都是同一个jvm中的线程去争抢同一个锁。

如果是多个进程的线程去争抢一个资源此时用syncronized是无法控制的,此时就需要用分布式锁,也就是独立于jvm进程部署一个分布式锁服务。

因为我们的微服务实例都会部署多个jvm进程,至少是两个进程去保证高可用,所以不同的进程去修改同一份数据就相当于去争抢同一个资源,此时就需要用分布式锁。

所以,分布式锁区别于内存锁,分布式锁就是由第三方软件单独提供获取锁释放锁的服务,独立部署,应用程序通过网络接口请求分布式锁服务获取锁释放锁,比如:Redis可以作为分布式锁服务,如下图:

上图中SETNX 是 Redis 中的一个命令,它的全称是 "SET if not exists",即只有当键不存在时才设置键值对成功。

多线程执行SETNX命令,同时只会有一个线程执行SETNX成功,执行成功的线程表示获取锁成功。

如下图:

当mykey不存在时执行SETNX mykey 1 返回1表示执行成功,执行成功的线程表示获取锁成功。

获取锁的线程执行业务逻辑完成后删除mykey表示释放锁,删除mykey后其它线程再次执行SETNX才会成功。

测试(自行测试):

开两个ssh窗口,并用redis-cli程序连接上redis

docker exec -it redis redis-cli

虚拟机中redis的密码为:redis,通过下边的命令进行认证:

auth redis

同时向两个窗口发送:setnx mykey 1命令

发现只有一个窗口执行成功,这说明只会有一个线程执行SETNX成功

相关文章
|
12天前
|
数据采集 人工智能 安全
|
7天前
|
机器学习/深度学习 人工智能 前端开发
构建AI智能体:七十、小树成林,聚沙成塔:随机森林与大模型的协同进化
随机森林是一种基于决策树的集成学习算法,通过构建多棵决策树并结合它们的预测结果来提高准确性和稳定性。其核心思想包括两个随机性:Bootstrap采样(每棵树使用不同的训练子集)和特征随机选择(每棵树分裂时只考虑部分特征)。这种方法能有效处理大规模高维数据,避免过拟合,并评估特征重要性。随机森林的超参数如树的数量、最大深度等可通过网格搜索优化。该算法兼具强大预测能力和工程化优势,是机器学习中的常用基础模型。
344 164
|
6天前
|
机器学习/深度学习 自然语言处理 机器人
阿里云百炼大模型赋能|打造企业级电话智能体与智能呼叫中心完整方案
畅信达基于阿里云百炼大模型推出MVB2000V5智能呼叫中心方案,融合LLM与MRCP+WebSocket技术,实现语音识别率超95%、低延迟交互。通过电话智能体与座席助手协同,自动化处理80%咨询,降本增效显著,适配金融、电商、医疗等多行业场景。
345 155
|
7天前
|
编解码 人工智能 自然语言处理
⚽阿里云百炼通义万相 2.6 视频生成玩法手册
通义万相Wan 2.6是全球首个支持角色扮演的AI视频生成模型,可基于参考视频形象与音色生成多角色合拍、多镜头叙事的15秒长视频,实现声画同步、智能分镜,适用于影视创作、营销展示等场景。
581 4
|
15天前
|
SQL 自然语言处理 调度
Agent Skills 的一次工程实践
**本文采用 Agent Skills 实现整体智能体**,开发框架采用 AgentScope,模型使用 **qwen3-max**。Agent Skills 是 Anthropic 新推出的一种有别于mcp server的一种开发方式,用于为 AI **引入可共享的专业技能**。经验封装到**可发现、可复用的能力单元**中,每个技能以文件夹形式存在,包含特定任务的指导性说明(SKILL.md 文件)、脚本代码和资源等 。大模型可以根据需要动态加载这些技能,从而扩展自身的功能。目前不少国内外的一些框架也开始支持此种的开发方式,详细介绍如下。
1018 7