学习目标
- 能够说出ES索引同步的常用方案
- 能够说出Canal+MQ数据同步的方案
- 能够说出Canal是怎么伪装成 MySQL slave
- 能够测试通过Canal+MQ数据同步流程
- 能够说出MySQL和Redis如何保证双写一致性
- 能够说出分布式锁Redis原生、Redisson应用场景
- 能够说出缓存三剑客问题和解决方案
- 能够说出Redis持久化两种方案
- 能够说出Redis集群三种模式、哨兵选举流程
- 能够说出Redis过期策略、淘汰策略
1 多数据源数据同步方案
1.1. 技术方案分析
1.1.1 同步方式
管理员在商城的后台维护商品信息,数据存储在MySQL。
用户在商城搜索商品信息,从Elasticsearch搜索商品信息。
如果Elasticsearch的索引数据与MySQL的商品数据不一致会导致什么问题?
用户搜索到的商品信息并非商品最新的信息,比如:价格不同,搜索到的商品价格与实际价格不同,商品下架但是用户仍然可以搜索到商品信息,这些问题都会严重影响用户的体验。
我们需要一种方案,当管理员修改商品信息后及时的修改商品索引信息,使MySQL中的商品数据与ES中的商品数据保持一致。
常见的索引数据同步方案有两种:同步方式和异步方式。
首先说同步方式
在修改商品信息的方法中加入操作Elasticsearch索引的代码,即在原有业务方式的基础上添加索引同步的代码,CRUD操作MySQL的同时CRUD操作ES索引。如下代码,是在添加商品信息的时候向ES索引添加文档。
public void insert(Item item){ //向本地数据库Item表添加记录 //向ES的Item索引添加文档 }
此方式会在很多业务方法中加入操作ES索引的代码,增加代码的复杂度不方便维护,扩展性差。
其次,上边的代码存在分布式事务,操作Item表会访问数据库,向索引添加文档会访问ES,使用数据库本地事务是无法控制整个方法的一致性的,比如:向ES写成功了由于网络超时导致异常,最终写数据库操作回滚了而写ES操作没有回滚,数据库的数据和ES中的索引不一致。
1.1.2 异步方式
异步方式是通过引入MQ实现,修改商品信息时向MQ发送修改的商品信息,然后监听MQ的程序请求ES向索引写入,流程如下:
此方案的好处:
- 商品服务不用直接访问ES,通过MQ将商品服务和ES解耦合。
缺点:
- 在商品的CRUD方法中仍然需要加入向MQ发送消息的代码,如下:
public void insert(Item item){ //向Item表添加记录 //向MQ发送添加商品消息 }
此方式仍然增加代码的复杂度不方便维护,扩展性差
这种方案不少公司是有采用的,下述Canal方案较重量级,大家自行取舍不以HM为准,以实际业务为准
有没有一种方法不用对商品的CRUD方法进行侵入,商品的CRUD方法就是对商品的增删改查,不会存在向ES同步数据相关的代码。
此时我们要借助一个神器就是Canal [kə'næl],先看下Canal在整个流程中的位置,如下图:
从图中可以看出,商品管理的CRUD方式仅仅包括对商品表的CRUD业务操作(下图红色框内部分),不再有操作MQ的相关逻辑。
Canal是和MySQL存在联系,并且Canal负责和MQ交互,这种方案就是借助了Canal和MQ实现的。
1.2 Canal+MQ数据同步
1.2.1. MySQL主从复制
要理解Canal的工作原理需要首先要知道MySQL主从数据同步的原理。
首先我们要知道,平时我们在学习时只用MySQL单机即可,但是生产环境中MySQL部署为主从集群模式,MySQL主从集群由MySQL主服务器(master)和MySQL从服务器(slave)组成,主数据库提供写服务,从数据库提供读服务,主从之间进行数据复制保证数据同步,如下图:
MySQL主从之间是如何同步的呢?
MySQL主从数据同步是一种数据库复制技术,进行写数据会先向主服务器写,写成功后通过binlog日志将数据同步到从数据库。
具体流程如下图:
1、主服务器将所有写操作(INSERT、UPDATE、DELETE)以二进制日志(binlog)的形式记录下来。
2、从服务器连接到主服务器,发送dump 协议,请求获取主服务器上的binlog日志。
MySQL的dump协议是MySQL复制协议中的一部分。
3、MySQL master 收到 dump 请求,开始推送 binary log 给 slave
4、从服务器解析binlog日志,根据日志内容更新从服务器的数据库,完成从服务器的数据保持与主服务器同步。
1.2.1.1 binlog
binlog日志是什么?
MySQL的binlog(二进制日志)是一种记录数据库服务器上所有修改数据的日志文件。它主要用于数据复制和数据恢复。binlog的主要作用是记录数据库的DDL(数据定义语言)操作和DML(数据操作语言)操作,以便在数据库发生故障时进行恢复。
binlog长什么样?
类似下边这样:
binlog的主要特点如下:
- 事务级别的记录:
- Binlog 以事务为单位记录数据更改,这意味着每个事务的开始和结束都会被记录下来。
- 这种记录方式有助于保证数据的一致性和事务的完整性。
- 支持多种格式:
- STATEMENT:记录每条 SQL 语句,适用于大多数情况,但有些 SQL 语句的结果依赖于会话状态,可能导致复制问题。
- ROW:记录每行数据的更改,精确度高,但会增加日志文件的大小。
- MIXED:默认模式,结合了 STATEMENT 和 ROW 的优点,大部分情况下采用 STATEMENT 模式,但在 STATEMENT 模式可能引起问题时自动切换到 ROW 模式。
- 非阻塞性:
- Binlog 的写入操作是非阻塞的,即写入 Binlog 不会阻塞客户端的事务提交。
- 这意味着应用程序可以在无需等待日志写入完成的情况下继续运行,提高了性能。
- 数据恢复:
- Binlog 可以用于数据恢复,允许恢复到特定的时间点或事务。
- 这对于灾难恢复非常重要,可以减少数据丢失的风险。
- 主从复制:
- Binlog 是 MySQL 主从复制的基础。
- 通过从主服务器读取并重放 Binlog,从服务器可以保持与主服务器相同的数据状态。
在 MySQL 中启用 Binlog 需要在配置文件 (my.cnf 或 my.ini) 中进行设置。
一些关键的配置选项包括:
server-id:用于标识服务器的唯一 ID,这对于多服务器环境非常重要。log_bin:指定是否启用 Binlog 以及 Binlog 文件的保存位置。binlog_format:定义 Binlog 的格式,如 STATEMENT, ROW 或 MIXED。expire_logs_days:定义 Binlog 文件保留的时间,超过这个时间的文件会被自动删除。max_binlog_size:单个 Binlog 文件的最大大小,达到这个大小后会自动创建新的文件。
举例:
注意事项:
- Binlog 文件会占用磁盘空间,因此需要定期清理不再需要的旧文件。
- 使用 Binlog 进行数据恢复或复制时,要确保所有相关服务器的时间同步,否则可能会出现问题。
binlog常用命令:查看是否开启binlog日志
show variables like 'log_bin';
使用以下命令查看所有binlog日志列表:
SHOW MASTER LOGS;
要查看MySQL服务器上的binlog状态,可以使用以下命令:
SHOW MASTER STATUS;
要查看所有的binlog文件列表,可以使用以下命令:
SHOW BINARY LOGS;
查看binlog日志保存路径
SHOW VARIABLES LIKE 'datadir';
刷新log日志,立刻产生一个新编号的binlog日志文件,跟重启一个效果,可以执行以下命令:
FLUSH LOGS;
清空所有binlog日志,可以执行以下命令:
RESET MASTER;
1.2.2. Canal+MQ同步流程
Canal是什么呢?
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,对数据进行同步,如下图:
Canal可与很多数据源进行对接,将数据由MySQL同步到ES、MQ、DB等各个数据源。
Canal的意思是水道/管道/沟渠,它相当于一个数据管道,通过解析MySQL的binlog日志完成数据同步工作。
官方文档:https://github.com/alibaba/canal/wiki
Canal数据同步的工作流程如下:
1、Canal模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL的dump协议是MySQL复制协议中的一部分。
2、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
。一旦连接建立成功(长连接),Canal会一直等待并监听来自MySQL主服务器的binlog事件流,当有新的数据库变更发生时MySQL master主服务器发送binlog事件流给Canal。
3、Canal会及时接收并解析这些变更事件并解析 binary log
理解了Canal的工作原理下边再看数据同步流程:
- 首先创建一张专门用于向ES同步商品信息的表item_sync,item_sync表的字段内容可能包含item表的字段,一定覆盖所有索引字段。
方法:复制item表到item_sync表。
这里为什么要单独创建一张同步表呢?
因为同步表的字段和索引是对应的,方便进行同步。
- 商品服务在对商品进行CRUD时向Item表写数据并且向item_sync写入数据,并产生binlog。
- Canal请求MySQL读取binlog,并解析出item_sync表的数据更新日志,并发送至MQ的数据同步队列。
- 异步同步程序监听MQ的数据同步队列,收到消息后解析出item_sync表的更新日志。
- 异步同步程序根据item_sync表的更新日志请求Elasticsearch添加、更新、删除索引文档。
最终实现了将MySQL中的Item表的数据同步至Elasticsearch
1.2.3. 配置数据同步环境
本节实现将MySQL的变更数据通过Canal写入MQ。
根据Canal+MQ同步流程,进行如下配置:
- 配置Mysql主从同步,开启MySQL主服务器的binlog
- 安装Canal并配置,保证Canal连接MySQL主服务器成功
- 安装RabbitMQ,并配置同步队列。
- 在Canal中配置RabbitMQ的连接信息,保证Canal收到binlog消息写入MQ
对于异步程序监听MQ通过Java程序中实现。以上四步配置详细参考“配置搜索及数据同步环境”。
1.2.4. 同步程序
前边我们实现了Canal读取binlog日志并向MQ发送消息的整个流程,下边我们需要编写同步程序监听MQ,解析出更改的数据更新ES索引数据。
在search-service工程添加依赖:
<properties> <canal.version>1.1.5</canal.version> </properties> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>${canal.version}</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>${canal.version}</version> </dependency>
从课程资料中拷贝"es/canal"目录到search-service工程的com.hmall.search包下。
阅读AbstractCanalRabbitMqMsgListener类parseMsg(Message message) 方法,理解同步程序的执行思路。
parseMsg(Message message) 方法实现了解析canal发送给mq的消息,并调用batchHandle或singleHandle处理数据,在这两个方法中会调用抽象方法void batchSave(List<T> data)和void batchDelete(List<Long> ids)去向数据库保存数据、删除数据。
public void parseMsg(Message message) throws Exception { try { // 1.数据格式转换 CanalMqInfo canalMqInfo = JSONUtil.toBean(new String(message.getBody()), CanalMqInfo.class); // 2.过滤数据,没有数据或者非插入、修改、删除的操作均不处理 if (CollUtils.isEmpty(canalMqInfo.getData()) || !(OperateType.canHandle(canalMqInfo.getType()))) { return; } if (canalMqInfo.getData().size() > 1) { // 3.多条数据处理 batchHandle(canalMqInfo); } else { // 4.单条数据处理 singleHandle(canalMqInfo); } } catch (Exception e) { //出现错误延迟1秒重试 Thread.sleep(1000); throw new RuntimeException(e); } }
如果我们要实现商品信息同步就需要编写商品信息同步类,同步程序做两件事:
- 同步类需要监听MQ,接收canal发送给mq的消息
- 同步程序需要继承AbstractCanalRabbitMqMsgListener类,并重写void batchSave(List<T> data)和void batchDelete(List<Long> ids)这两个方法,这样就实现了将canal发送的商品信息保存或删除ES中对应的数据。
代码如下:下边的代码能读懂会用即可。
package com.hmall.search.canal.listeners; import cn.hutool.core.bean.BeanUtil; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse; import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import com.hmall.search.domain.po.ItemDoc; import com.hmall.search.domain.po.ItemSync; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @Component public class ItemCanalDataSyncHandler extends AbstractCanalRabbitMqMsgListener<ItemSync> { @Resource private ElasticsearchClient esClient; @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "canal-mq-hmall-item"), exchange = @Exchange(name = "exchange.canal-hmall", type = ExchangeTypes.TOPIC), key = "canal-mq-hmall-item"), concurrency = "1" ) public void onMessage(Message message) throws Exception { parseMsg(message); } @Override public void batchSave(List<ItemSync> data) { BulkRequest.Builder br = new BulkRequest.Builder(); for (ItemSync itemSync : data) { br.operations(op -> op .index(idx -> idx .index("items") .id(itemSync.getId().toString()) .document(itemSync) ) ); } BulkResponse result = null; try { result = esClient.bulk(br.build()); } catch (IOException e) { throw new RuntimeException(e); } Boolean aBoolean = result.errors(); if(aBoolean) { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } throw new RuntimeException("同步失败"); } } @Override public void batchDelete(List<Long> ids) { List<String> idList = ids.stream().map(id -> id.toString()).collect(Collectors.toList()); DeleteByQueryResponse response = null; try { response = esClient.deleteByQuery(dq -> dq .query(t -> t.ids(t1 -> t1.values(idList))) .index("items")); boolean hasFailures = response.failures().size() > 0; if(hasFailures) { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } throw new RuntimeException("同步失败"); } } catch (IOException e) { throw new RuntimeException("同步失败"); } } }
接下来测试:
- 手动修改Item_sync表的数据,断点跟踪onMessage(Message message)方法,当插入、修改数据时执行踪onMessage(Message message)方法,当删除数据时执行batchDelete(List<Long> ids)。
- 手动对Item_sync表增、删、改,观察ES中item索引的数据是否正常增、删、改。
1.2.5. 保证消息的顺序性
如何保证Canal+MQ同步消息的顺序性?场景如下图:
首先明确Canal解析binlog日志信息按顺序发到MQ的队列中,现在是要保证消费端如何按顺序消费队列中的消息。生产中同一个服务会启动多个jvm进程,每个进程作为同一个队列的消费者,如下图:
现在对商品价格先修改为100再修改为200,在MQ中的有两个消息:
修改价格为100
修改价格为200
预期:最终将价格修改为200
此时两条消息会被分发给两个jvm进程,假设“修改价格为100”的消息发给jvm进程1,“修改价格为200”的消息发给jvm进程2,两个进程分别去消费,此时无法控制两个消息的先后顺序,可能导致价格最终并非修改200。
解决方法:
多个jvm进程监听同一个队列保证只有一个消费者活跃,即只有一个消费者接收消息。
消费队列中的数据使用单线程。
如何保证只有一个消费者接收消息?
队列需要增加x-single-active-consumer参数,表示否启用单一活动消费者模式。
配置完成查保证队列上存在SAC标识,如下图:
当有多个jvm进程都去监听该队列时,只有一个为活跃状态
如果使用x-single-active-consumer参数需要修改为如下代码:
在Queue中添加:arguments={@Argument(name="x-single-active-consumer", value = "true", type = "java.lang.Boolean") }
如下所示:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "canal-mq-hmall-item",arguments={@Argument(name="x-single-active-consumer", value = "true", type = "java.lang.Boolean") }), exchange = @Exchange(name = "exchange.canal-hmall", type = ExchangeTypes.TOPIC), key = "canal-mq-hmall-item"), concurrency = "1" ) public void onMessage(Message message) throws Exception { parseMsg(message); }
concurrency=”1“表示 指定消费线程为1。
1.2.6 面试题
Canal是怎么伪装成 MySQL slave?
Canal数据同步异常了怎么处理?
项目中如何进行索引同步的?
如何保证Canal+MQ同步消息的顺序性?
2 缓存常见问题
2.1 面试题
你的项目是怎么保证缓存一致性的?或 说一下双写不一致的解决方案。
考察项目中对缓存的应用,对缓存一致性方案的理解。
分布式锁你们用什么实现的?
考察对分布式锁的理解和应用。
说一下缓存穿透、缓存雪崩、缓存击穿?
考察高并发下对缓存的常见问题及解决方案的理解。
2.2. 缓存应用入门
2.2.1 缓存概念
缓存(Cache)是一种用于提高数据访问速度的技术,它通过暂时存储频繁访问或计算成本较高的数据,使得后续的访问可以直接从缓存中获取数据,而不需要重新计算或从较慢的数据源(如数据库或磁盘)中读取数据。它可以显著提升应用程序的性能和响应速度。
缓存通常位于应用程序和底层数据存储之间,充当一个快速访问的中间层。
如下图所示:
我们的业务数据通常存储在数据库中,用户请求应用程序接口,应用程序查询数据库响应结果。
当访问量增大为了减轻对数据库的访问,提高响应速度,会在应用程序和数据库之间增加缓存层,用户请求应用程序接口,应用程序先查询缓存,如果查询到数据则直接响应用户,如果未在缓存命中则查询数据库并响应用户,并且将数据库查询到的数据存储到缓存中。
缓存可以存在于多个层次,从CPU缓存、内存缓存到分布式缓存系统。
上边我们用redis作为缓存层,redis可独立部署,这种称为分布式缓存。
我们也可以在应用程序内存中增加缓存结构,这种称为内存缓存,简单理解就是缓存存在于应用程序的内存中,如下图:在应用程序内存中创建一个HashMap对象,应用程序接口先从HashMap中查询数据,如果查询到则直接响应给用户,如果查询不到则查询数据库响应给用户并且从数据库查询到的数据存储到HashMap中。
关于内存缓存的例子还有很多,比如我们使用的MyBatis的一级缓存、二级缓存默认就是使用内存缓存实现。
MyBatis的一级缓存:
- 当同一个 SqlSession 中执行相同的 SQL(查询语句+条件一模一样才可以) 查询时,第一次执行查询的结果会被缓存起来,如果后续的查询条件相同,则直接从缓存中获取结果,而不再执行 SQL 语句。
- 一级缓存在 SqlSession 关闭或提交后就会被清空。
- 一级缓存就是SqlSession级别的缓存。
MyBatis的二级缓存:
- 二级缓存是在命名空间级别上的缓存,它作用于多个 SqlSession 之间。
- 与一级缓存相比,二级缓存可以跨越多个 SqlSession。
- 要启用二级缓存,需要在 Mapper 的 XML 文件的根节点
<mapper>上添加cache元素,或者自定义一个实现Cache接口的类。
关于MyBatis一级缓存和二级缓存的内容请大家课下自行复习。
2.2.2 缓存案例
2.2.2.1 需求
下图是购物车的界面,购物车中的商品信息来源于商品服务,为了提高访问速度减少对数据库的访问,购物车请求商品服务查询商品信息,商品服务的商品查询接口正是使用缓存去实现。
商品服务的商品信息查询接口使用缓存实现流程,如下图:
2.2.2.2 集成 redis
下边在商品服务中使用Redis在商品查询接口中使用缓存。
- 首先在商品服务添加redis依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
- 配置RedisConfig.java
从课程资料的“redis”目录拷贝RedisConfig.java到商品服务的com.hmall.item.config包下。
在RedisConfig.java中定义了RedisTemplate,使用RedisTemplate去操作Redis。
记住:
常用的有Jedis和Lettuce两个访问redis的客户端库,其中Lettuce的性能和并发性要好一些,Spring Boot 默认使用的是 Lettuce 作为 Redis 的客户端。
本项目集成了Spring data redis框架,在项目中可以通过RedisTemplate访问Redis,RedisTemplate提供了方便访问redis的模板方法。
RedisTemplate和Lettuce 是什么关系?
RedisTemplate 进行 Redis 操作时,实际上是通过 Lettuce 客户端与 Redis 服务器进行通信。
- 配置redis地址
在application.yaml中配置
spring: redis: host: 192.168.101.68 port: 6379 password: redis
2.2.2.3 编写缓存方法
在IItemService.java类中增加缓存方法
@Override public List<ItemDTO> queryItemByIdsCache(Collection<Long> ids) { //定义空列表 List<ItemDTO> itemDTOList = new ArrayList<>(); ids.stream().forEach(id->{ //先查询缓存 Item itemCache = (Item) redisTemplate.opsForValue().get("hmall:item:" + id); //如果缓存有则转为ItemDto if(Objects.nonNull(itemCache)){ ItemDTO itemDTO = BeanUtils.copyBean(itemCache, ItemDTO.class); itemDTOList.add(itemDTO); }else{ //查询数据库 Item itemDB = getById(id); if(Objects.nonNull(itemDB)){ //将数据库查询到的数据放入缓存 redisTemplate.opsForValue().set("hmall:item:" + id,itemDB); //转为ItemDto ItemDTO itemDTO = BeanUtils.copyBean(itemDB, ItemDTO.class); itemDTOList.add(itemDTO); } } }); return itemDTOList; }
修改“根据id批量查询商品” 接口,调用queryItemByIdsCache 方法查询商品信息
2.2.2.4 测试
通过swagger文档进行测试,请求查询商品信息
查询成功观察redis是否存储了商品信息
再次请求查询观察代码是否还查询数据库。
2.3 缓存一致性问题
2.3.1 问题描述
缓存一致性是指缓存和数据库两者的数据保持一致。
如果管理员在后台修改了商品信息保存在数据库,用户在购物车从缓存查询商品信息,数据库和缓存两者的数据如果不能保持一致将影响用户的体验性,比如:商品调价,原价是90元,调为100元,如果用户在购物车仍然查询到调价前的价格等用户去支付时却是调价后的价格,这个体验是非常差的。如下图:
为什么会出现缓存不一致呢?
我们在修改商品信息时去修改缓存信息不就保持一致了吗,如下代码:
public void updateItem(ItemDTO item){ //修改数据库 //更新缓存 }
这样的代码仍然会存在缓存不一致的问题。下边我们分析造成缓存不一致的原因:
造成缓存不一致的原因可能是在写数据库和写缓存两步存在异常,也可能是并发所导致。
写数据库和写缓存导致不一致称为双写不一致,比如:先更新数据库成功了,更新缓存时失败了,最终导致不一致。并发导致缓存不一致举例如下:
执行流程:
- 线程1先写入数据库X,当去写入缓存X时网络卡顿
- 线程2先写入数据库Y
- 线程2再写入缓存Y
- 线程1 写入缓存旧值X覆盖了新值Y
即使先写入缓存再写数据在并发环境也可能存在问题,如下图:
流程:
- 线程1先写入缓存X,当去写入数据库X时网络卡顿
- 线程2先写入缓存Y
- 线程2再写入数据库Y
- 线程1 写入数据库旧值X覆盖了新值Y
2.3.2 解决方案
如何解决并发环境下双写不一致的问题?
出现并发环境下双写不一致的主要原因就是多线程并发导致,只要把并行操作改为串行操作即可解决。
2.3.2.1 使用分布式锁
什么是分布式锁?
syncronized是jvm内存锁,要执行syncronized代码块需要先获取锁,获取锁的线程执行完成并释放锁后其它线程才可以获取锁,同步代码块的代码是串行执行。
这里是用synchronzed引出分布式锁,synchronzed本身只是单JVM的锁,并非分布式锁
syncronized{ //同步代码块 }
为什么会有分布式锁,为什么不使用syncronized内存锁?
如果是同一个进程内的线程去争抢资源可以用syncronized内存锁,因为都是同一个jvm中的线程去争抢同一个锁。
如果是多个进程的线程去争抢一个资源此时用syncronized是无法控制的,此时就需要用分布式锁,也就是独立于jvm进程部署一个分布式锁服务。
因为我们的微服务实例都会部署多个jvm进程,至少是两个进程去保证高可用,所以不同的进程去修改同一份数据就相当于去争抢同一个资源,此时就需要用分布式锁。
所以,分布式锁区别于内存锁,分布式锁就是由第三方软件单独提供获取锁释放锁的服务,独立部署,应用程序通过网络接口请求分布式锁服务获取锁释放锁,比如:Redis可以作为分布式锁服务,如下图:
上图中SETNX 是 Redis 中的一个命令,它的全称是 "SET if not exists",即只有当键不存在时才设置键值对成功。
多线程执行SETNX命令,同时只会有一个线程执行SETNX成功,执行成功的线程表示获取锁成功。
如下图:
当mykey不存在时执行SETNX mykey 1 返回1表示执行成功,执行成功的线程表示获取锁成功。
获取锁的线程执行业务逻辑完成后删除mykey表示释放锁,删除mykey后其它线程再次执行SETNX才会成功。
测试(自行测试):
开两个ssh窗口,并用redis-cli程序连接上redis
docker exec -it redis redis-cli
虚拟机中redis的密码为:redis,通过下边的命令进行认证:
auth redis
同时向两个窗口发送:setnx mykey 1命令
发现只有一个窗口执行成功,这说明只会有一个线程执行SETNX成功
下边我们理解使用分布式锁解决双写不一致的方案:
流程:
线程1申请分布式锁,拿到锁。此时其它线程无法获取同一把锁。
线程1写数据库,写缓存,操作完成释放锁。
伪代码如下:
public void updateItem(ItemDTO item){ //获取分布式锁 //修改数据库 //更新缓存 //释放锁 }
线程2申请分布锁成功,写数据库,写缓存。
对双写的操作每个线程顺序执行。
对操作异常问题仍需要解决:写数据库成功写缓存失败了,数据库需要回滚,此时就需要使用分布式事务组件。
使用分布式锁解决双写一致性不仅性能低下,复杂度增加。
2.3.2.2 延迟双删
既然双写操作存在不一致,我们把写缓存改为删除缓存呢?
先写数据库再删除缓存,如果删除缓存失败了缓存也就不一致了,那我们改为:先删除缓存再写数据库,如下图:
执行流程:
- 线程1删除缓存
- 线程2读缓存发现没有数据此时查询数据库拿到旧数据写入缓存【线程1的删除变成无效的了】
- 线程1写入数据库
- 最终数据库和缓存数据不一致。
即使线程1删除缓存、写数据库操作后线程2再去查询缓存也可能存在问题,如下图:
线程1向主数据库写,线程2向从数据库查询,流程如下:
- 线程1删除缓存
- 线程1向主数据库写,数据向从数据库同步
- 线程2查询缓存没有数据,查询从数据库,得到旧数据
- 线程2将旧数据写入缓存
解决上边的问题采用延迟双删:
线程1先删除缓存,再写入主数据库,延迟一定时间再删除缓存。
上图线程1的动作简化为下图:
延迟多长时间呢?
延迟主数据向从数据库同步的时间间隔,如果延迟时间设置不合理也会导致数据不一致。
2.3.2.3 异步同步
延迟双删的目的也是为了保证最终一致性,即允许缓存短暂不一致,最终保证一致性。
保证最终一致性的方案有很多,比如:通过MQ、定时任务都可以实现。
- 对实时性要求较高可以采用MQ异步同步:
流程如下:
线程1写数据库
- canal读取binlog日志,将数据变化日志写入mq
- 同步程序监听mq接收到数据变化的消息
- 同步程序解析消息内容写入redis,写入redis成功正常消费完成,消息从mq删除。
- 对实时性要求不高可以采用定时任务方式:
专门启动一个数据同步任务定时读取数据同步到redis,此方式适用于对数据实时性要求不强更新不频繁的数据。
- 线程1写入数据库(业务数据表,变化日志表)
- 同步程序读取数据库(变化日志表),根据变化日志内容写入redis,同步完成删除变化日志。
2.3.2.4 总结
通过上边的解决方案分析可知:
保存双写一致性的方案包括两个方向:
- 保证强一致性
使用分布式锁对缓存的读和写加锁控制,这样可以保证强一致性,但是这样影响了使用缓存的性能,可以设想当前如果线程01正在修改缓存,此时线程02去读缓存就需要等线程01修改完成释放锁后才能读到最新数据,这肯定会影响读数据的性能。
伪代码如下:
public void updateItem(ItemDTO item){ //获取写锁 //修改数据库 //更新缓存 //释放锁 } public ItemDTO getItemById(Long id){ //获取读锁 //从缓存读取商品信息 //释放锁 }
- 保证最终一致性
允许缓存和数据库的数据存在一段时间不一致,但最终数据会一致。
可以使用延迟双删、定时任务异步同步的方式。
延迟双删给了我们启发,那就是:要想保证缓存最终于数据库一致就是key必须加过期时间,即使一旦发生缓存不一致,当缓存过期后会重新加载,数据最终还是能保证一致。
下边尝试回答面试题:
你的项目是怎么保证缓存一致性的?或 说一下双写不一致的解决方案。
2.4 分布式锁入门
在缓存一致性方案中用到了分布式锁,这里对分布式锁进行介绍。
2.4.1 面试题
面试题:分布式锁与syncronized锁的区别
- Synchronized锁
Synchronized 是 Java 语言内置的关键字,提供了一种简单的方式来实现线程之间的互斥。它可以在方法或代码块级别上使用,保证了在同一时刻只有一个线程可以执行被 synchronized 修饰的方法或代码块。
- 作用范围:
synchronized只能在单个 JVM 中起作用,适用于多线程环境中的同步问题。 - 实现方式:通过 JVM 实现,利用了底层的操作系统互斥锁(mutex)。
- 使用简便:直接在代码层面使用,不需要额外的配置或依赖。
- 应用场景:
synchronized主要用于解决同一 JVM 内多线程间的同步问题。
- 分布式锁
分布式锁是一种在分布式系统中实现同步的机制。当应用程序分布在不同的机器上时,需要一种协调机制来确保多个节点之间的一致性。分布式锁已不属于某个虚拟机,而是分布式部署,由多个虚拟机所共享。
- 用范围:分布式锁跨越多个 JVM 或者多个服务实例,适用于分布式系统中的同步问题。
- 实现方式:通常依赖于外部的服务或中间件,如 Redis、Zookeeper、Etcd 等,通过一定的协议(如两阶段锁、心跳检测)来实现。
- 复杂性:实现相对复杂,需要考虑网络延迟、故障恢复等问题
- 应用场景:分布式锁则用于解决跨多个 JVM 或者跨多个服务实例的同步问题。
2.4.2 技术方案
实现分布式锁的方案有很多,常用的如下:
1、基于数据库实现分布锁
利用数据库主键唯一性的特点,或利用数据库唯一索引、行级锁的特点,比如:多个线程同时向数据库插入主键相同的同一条记录,谁插入成功谁就获取锁,多个线程同时去更新相同的记录,谁更新成功谁就抢到锁。
2、基于redis实现锁
redis提供了分布式锁的实现方案,比如:SETNX、redisson等。
3、使用zookeeper实现
zookeeper是一个分布式协调服务,主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,利用此特点可以实现分布式锁,谁创建该结点成功谁就获得锁。
2.4.3 SETNX实现分布式锁
AI: 使用redisTemplate写一个例子用SETNX实现分布式锁
修改更新商品接口:
@ApiOperation("更新商品") @PutMapping public void updateItem(@RequestBody ItemDTO item) { Long id = item.getId(); //锁id String lockKey = "hmall:item:lock:"+item.getId(); //通过SETNX获取锁【关键代码】 if (!redisTemplate.opsForValue().setIfAbsent(lockKey, "lock", 20, TimeUnit.SECONDS)) { //获取锁失败 throw new BizIllegalException("操作过于频繁,请稍后再试!"); } try { // 不允许修改商品状态,所以强制设置为null,更新时,就会忽略该字段 item.setStatus(null); // 更新 itemService.updateById(BeanUtils.copyBean(item, Item.class)); Item item1 = itemService.getBaseMapper().selectById(item.getId()); item = BeanUtils.copyBean(item1, ItemDTO.class); //更新缓存 redisTemplate.opsForValue().set("hmall:item:"+id,item,1, TimeUnit.HOURS); }catch (Exception e){ e.printStackTrace(); }finally { // 【关键代码】 redisTemplate.delete(lockKey); } }
测试:
同时开两个测试窗口,每个窗口的请求都会开一个新线程,开两个窗口来模拟两个线程请求接口。
第一个窗口更新商品信息获取锁成功,更新商品信息未完成时锁未释放,另一个窗口无法获取锁导致更新失败。
使用SETNX实现分布式锁有以下问题:
- 锁过期被强占
当线程01执行业务逻辑的时间过长,锁到达过期时间则自动释放锁,此时线程02将获取锁成功,仍然存在并发问题。
- 锁被强删
当线程01执行业务逻辑的时间过长,锁到达过期时间则自动释放锁,此时线程02将获取锁成功,线程01执行完业务逻辑后删除了线程02的锁。
- 获取锁失败没有重试机制
当线程获取锁失败没有重试机制导致程序直接失败,一些场景需要加入重试机制提高获取锁的成功率。
在生产中建议使用成熟的分布式锁库,如 Redisson,以确保锁的安全性和健壮性。
2.4.4 Redisson实现分布式锁
2.4.4.1 基本使用
在SETNX实现分布式的缺陷都可以用Redisson去解决。
AI:写一个redisson实现分布式锁的例子
Redisson 是一个用于 Java 开发的 Redis 客户端和分布式锁框架,它不仅可以实现分布式锁,还可以实现分布式集合(如 List、Set、Map 等)和分布式对象(如 AtomicInteger、AtomicLong、CountDownLatch 等),简单理解就是将JVM中内存存储的List、Set、AtomicInteger这些对象使用Redis去存储和管理。
使用Redisson的基本用法如下:
// 创建Redisson客户端 RedissonClient redissonClient = Redisson.create(); // 获取名为myLock的分布式锁实例,通过此实例进行加锁、解锁 RLock lock = redissonClient.getLock("myLock"); try { // 尝试获取锁,最多等待3秒,持锁时间为5秒【关键代码】 boolean isLockAcquired = lock.tryLock(3, 5, TimeUnit.SECONDS); if (isLockAcquired ) { try { // 获取锁成功,执行业务逻辑 Thread.sleep(5000); } finally { // 释放锁【关键代码】 lock.unlock(); } } else { // 获取锁失败,处理相应逻辑 } } catch (InterruptedException e) { // 处理中断异常 }
说明:
lock.tryLock方法是一种非阻塞获取锁的方式,没有获取锁可以直接返回,而lock.lock()是一种阻塞获取锁的方法,多个线程通过lock()方法获取锁,只有一个线程获取到锁,其它线程将阻塞等待。
通常lock.tryLock方法使用的更广泛。
- 使用tryLock方法获取锁时传3个参数:
- waitTime:尝试获取锁的最大等待时间,在这个时间范围内会不断地尝试获取锁,如果在
waitTime时间内未能获取到锁,则返回false。waitTime默认为-1,表示获取锁失败后立刻返回不重试。 - leaseTime:表示持锁的时间,即锁的自动释放时间。在获取锁成功后,锁会在
leaseTime时间后自动释放。如果在持锁的时间内未手动释放锁,锁也会在leaseTime时间后自动释放。 - TimeUnit:表示时间单位,可以是秒、毫秒等。
- tryLock方法返回值:
true:获取到了锁
false:未获取到锁
- 注意释放锁
获取到锁后的代码放在try中,在finally 中释放锁。
下边用Redisson替换SETNX方式。
- 首先在商品服务加入 Redisson依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.17.7</version> </dependency>
- 将资料目录“redis”下的RedissonConfiguration.java 拷贝到商品服务的com.hmall.item.config包下
- 编写service方法,使用Redisson替换SETNX
@PutMapping public void updateItem(@RequestBody ItemDTO item) { Long id = item.getId(); //锁id String lockKey = "hmall:item:lock:"+item.getId(); RLock lock = redissonClient.getLock(lockKey); try { //尝试获取锁 boolean b = lock.tryLock(3,15,TimeUnit.SECONDS); if(!b){ throw new BizIllegalException("操作过于频繁,请稍后再试!"); } try { // 不允许修改商品状态,所以强制设置为null,更新时,就会忽略该字段 item.setStatus(null); // 更新 itemService.updateById(BeanUtils.copyBean(item, Item.class)); Item item1 = itemService.getBaseMapper().selectById(item.getId()); item = BeanUtils.copyBean(item1, ItemDTO.class); //更新缓存 redisTemplate.opsForValue().set("hmall:item:"+id,item,1, TimeUnit.HOURS); }catch (Exception e){ e.printStackTrace(); }finally { //释放锁 lock.unlock(); } }catch (Exception e){ e.printStackTrace(); } }
- 测试
同SETNX测试过程。
2.4.4.2 看门狗机制
再回顾下使用SETNX的前两个问题如下:
- 锁过期被强占
当线程01执行业务逻辑的时间过长,锁到达过期时间则自动释放锁,此时线程02将获取锁成功,仍然存在并发问题。
- 锁被强删
当线程01执行业务逻辑的时间过长,锁到达过期时间则自动释放锁,此时线程02将获取锁成功,线程01执行完业务逻辑后删除了线程02的锁。
- 获取锁失败没有重试机制
当线程获取锁失败没有重试机制导致程序直接失败,一些场景需要加入重试机制提高获取锁的成功率。
通过Redisson的测试可知Redisson具有重试功能,解决SETNX的第三个问题。
导致前两个问题的主要原因是任务执行时间过长,超过了锁的有效期,锁失效后被其它线程抢占,有同学可能会说不要设置锁的过期时间,如果不设置锁的过期时间当程序断电结束会导致死锁发生,所以锁一定要设置过期时间。
但是设置锁的过期时间为多少合适呢?
设置多少都不合适,这个问题Redisson提供了锁自动续期的功能,默认锁的过期时间为30秒,当任务没有执行完成时每隔10秒自动续期一次,这个机制就是Redisson的看门狗机制。
"看门狗机制"(Watchdog)是一种用于监测和维护锁的超时时间的机制,它可以确保在任务没有完成时对锁的过期时间进行自动续期,以避免任务没有完成时锁自动释放的问题。开启看门狗后针对当前锁创建一个线程执行延迟任务,默认每隔10秒将锁的过期时间重新续期为30秒。当任务结束,程序执行unlock()方法释放锁时会结束看门狗线程。
注意:任务结束一定要执行unlock()方法释放锁,否则看门狗线程一直进行续期,导致锁无法释放。
下边测试看门狗:
调用下边的方法都可以开启看门狗:
tryLock(long waitTime, -1,TimeUnit unit)
传入leaseTime参数为-1可以开启看门狗。
下边进行测试:
修改tryLock的方法,leaseTime参数传入-1,开启看门狗。
boolean isLock = lock.tryLock(3, -1, TimeUnit.SECONDS);//开启看门狗
在代码中添加Thread.sleep(50000);休眠50秒。
重启商品服务,调用更新商品信息接口,获取锁成功,通过redis客户端观察分布式锁的自动续期功能
锁的过期时间到20秒时会自动续期到30秒。
2.5 缓存三剑客问题
这部分在中州讲过,大家自己复习
- 缓存击穿:热点数据失效
- 数据预热
- 永不过期
- 缓存穿透:访问一个Redis和DB都不存在的数据
- 缓存null值
- 布隆过滤器
- 缓存雪崩:大量key同一时间过期
- 过期时间+随机值
- 分片集群
2.6 Redis持久化
2.6.1 面试题
面试题:说一下Redid的持久化机制。
该面试题考察你对Redis持久化机制的理解程度。
Redis使用两种主要的持久化机制来保证数据的安全性:RDB(Redis Database Backup)和AOF(Append Only File),需要去理解这两种持久化机制的工作原理及应用场景才能回答本问题。
2.6.2 RDB
1 RDB是什么?
RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为RDB文件。
2 执行方式
RDB 的触发方式包括:手动触发、自动触发。
- 手动触发:
- 使用
SAVE命令,这会导致Redis在执行完该命令之前阻塞客户端请求。 - 使用
BGSAVE命令,Redis会在后台异步进行快照操作,不会阻塞客户端请求。
- 自动触发:
- 通过配置
save选项,可以设置在满足特定条件时自动触发RDB持久化。例如,可以在一定时间内发生了一定数量的写操作后自动进行一次快照。 - Redis停机时,Redis停机时会执行一次save命令,实现RDB持久化。
在redis.conf文件中配置了rdb的文件名称和保存路径:
是否压缩 ,建议不开启,压缩也会消耗cpu,磁盘的话不值钱 rdbcompression yes RDB文件名称 dbfilename dump.rdb 文件保存的路径目录 dir ./
1)save命令
执行save命令可以立即执行一次RDB,在执行完该命令之前阻塞客户端请求.
进入redis容器:docker exec -it redis /bin/bash
在执行命令前我查看rdb文件
执行save命令后从文件更新时间上看发现rdb文件有更新
2)bgsave命令
下面的命令可以异步执行RDB:
这个命令执行后会开启独立进程完成RDB,主进程可以持续处理用户请求,不受影响。
3)触发RDB条件
Redis内部有触发RDB的机制,可以在redis.conf文件中找到,格式如下:
900秒内,如果至少有1个key被修改,则执行bgsave , 如果是save "" 则表示禁用RDB save 900 1 save 300 10 save 60 10000
3 执行原理
RDB 的工作流程如下图所示:
- 启动子进程:当Redis服务器接收到
BGSAVE命令时,它会先检查是否有正在运行的子进程。如果没有,则创建一个新的子进程(fork操作)。 - 复制父进程内存映像:此时父进程和子进程共享相同的内存页。当任一进程对某个内存页进行修改时,系统才会为这个进程分配新的内存页并复制内存过去。
- 子进程生成RDB文件:子进程开始读取内存中的数据并生成RDB文件。在此过程中,父进程继续处理客户端请求。
- 替换旧的RDB文件:一旦子进程完成RDB文件的生成,它会用新生成的文件替换旧的RDB文件。
- 父进程继续工作:子进程结束后,父进程继续正常工作,此时新的RDB文件已经包含了最新的数据状态。
4 RDB 的优缺点
- 优点:
- 性能影响小:因为RDB文件的生成是在子进程中进行的,不会阻塞主进程。
- 启动速度快:在Redis重启时,可以通过加载RDB文件快速恢复数据。
- 文件紧凑:RDB文件是一个紧凑的二进制文件,占用的空间较小。
- 缺点:
- 数据丢失风险:如果Redis服务器在最后一次成功生成RDB文件后宕机,那么从上次快照以来的所有更改都将丢失。
- 快照生成频率:需要平衡性能影响与数据丢失的风险。
5 总结
RDB方式bgsave的基本流程?
- fork主进程得到一个子进程,共享内存空间
- 子进程读取内存数据并写入新的RDB文件
- 用新RDB文件替换旧的RDB文件
RDB会在什么时候执行?save 60 1000代表什么含义?
- 默认是服务停止时
- 代表60秒内至少执行1000次修改则触发RDB
RDB的缺点?
- RDB执行间隔时间长,两次RDB之间写入数据有丢失的风险
- fork子进程、压缩、写入RDB文件都比较耗时
2.6.3 AOF
1.AOF原理
AOF全称为Append Only File(追加文件)是Redis提供的另一种持久化机制,与RDB相比,AOF持久化通过记录每次写操作的命令到一个单独的日志文件中,使得即使Redis服务重启,也可以通过重放这些命令来恢复数据。
2.AOF配置
AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:
#是否开启AOF功能,默认是no appendonly yes #AOF文件的名称 appendfilename "appendonly.aof"
AOF的命令记录的频率也可以通过redis.conf文件来配:
#表示每执行一次写命令,立即记录到AOF文件 appendfsync always #写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案 appendfsync everysec #写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘 appendfsync no
三种策略对比:
打开aof重启redis,写入数据查看aof文件的内容,aof内容是若干redis命令的集合。
3.AOF文件重写
因为是记录命令,AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作,但只有最后一次写操作才有意义。通过执行bgrewriteaof命令,可以让AOF文件执行重写功能,用最少的命令达到相同效果。
如图,AOF原本有三个命令,但是set num 123 和 set num 666都是对num的操作,第二次会覆盖第一次的值,因此第一个命令记录下来没有意义。
所以重写命令后,AOF文件内容就是:mset name jack num 666
Redis也会在触发阈值时自动去重写AOF文件。阈值也可以在redis.conf中配置:
# AOF文件比上次文件 增长超过多少百分比则触发重写 auto-aof-rewrite-percentage 100 # AOF文件体积最小多大以上才触发重写 auto-aof-rewrite-min-size 64mb
4 总结
RDB与AOF对比,RDB和AOF各有自己的优缺点,如下图:
在实际应用中,RDB(Redis Database dump)和AOF(Append Only File)这两种持久化方式各有优势,通常的选择取决于具体的应用场景和需求。
如果对数据完整性有极高要求,那么AOF通常是首选;
如果对性能要求较高并且可以接受少量的数据丢失,那么RDB是更好的选择。
而在大多数情况下,结合使用RDB和AOF能够提供最佳的数据保护和性能表现。
RDB与AOF详细总结如下:
- RDB (Redis Database dump)
- 原理
- 是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为RDB文件。
- 执行方式:手动触发,自动触发
- 异步方式同步,当Redis服务器接收到
BGSAVE命令时,它会先检查是否有正在运行的子进程。如果没有,则创建一个新的子进程(fork操作)。子进程开始读取内存中的数据并生成RDB文件。在此过程中,父进程继续处理客户端请求。一旦子进程完成RDB文件的生成,它会用新生成的文件替换旧的RDB文件。
- 主要用途:
- 数据备份:由于RDB文件是某个时间点的快照,因此非常适合用于数据备份和灾难恢复。
- 快速恢复:RDB文件相对较小,可以在服务重启时快速加载到内存中,适合需要快速恢复服务的情况。
- 适用场景:
- 当数据的完整性不是绝对重要,可以接受一定程度的数据丢失时。
- 对于读多写少的应用,RDB可以提供较好的性能。
- 需要定期做全量备份时。
- 配置示例:
- 使用
save命令手动触发快照生成。 - 设置自动快照生成规则,例如:
save 900 1表示在900秒内如果有1个key发生变化,则生成快照。
- AOF (Append Only File)
- 原理
- AOF持久化通过记录每次写操作的命令到一个单独的日志文件中,使得即使Redis服务重启,也可以通过重放这些命令来恢复数据。
- 主要用途:
- 数据持久化:AOF通过记录每个写命令来保证数据的完整性,非常适合需要强一致性的场景。
- 数据恢复:即使发生故障,也可以通过重放AOF文件中的命令来恢复数据。
- 适用场景:
- 当数据不能有任何丢失时。
- 对于写密集型应用,AOF可以提供更好的数据保护。
- 需要频繁的写操作,并且要求数据尽可能不丢失的情况下。
- 配置示例:
- 设置
appendfsync选项来控制文件同步的频率,例如:appendfsync everysec表示每秒同步一次。 - 定期进行AOF重写以减少文件大小。
- 组合使用 RDB 和 AOF
在很多情况下会选择同时使用RDB和AOF两种持久化方式,以结合两者的优点:
- 使用RDB进行定期的全量备份,这样即使发生灾难性的故障也能快速恢复。
- 使用AOF来确保数据的完整性和连续性,在服务重启时可以通过重放AOF文件来恢复最新的数据状态。
这种组合方式能够提供较好的性能和数据保护,通常被认为是最佳实践。例如:
- 配置RDB定期生成快照文件,用于灾难恢复。
- 配置AOF以
everysec同步策略运行,确保数据的完整性。 - 定期进行AOF重写以保持文件大小在可管理范围内。
3 Redis集群
3.1 面试题
面试题:你们用的是Redis单机还是Redis集群?Redis集群具体怎么做的?
一般在自己学习时用Redis单机,或者一些个人项目中用Redis单机,在生产项目中会使用Redis集群。
Redis单机就是部署一个Redis实例,它不具有高可用,当这个Redis实例挂了将直接影响系统的运行,并且当一台Redis实例不足以承担请求压力时将会影响系统的情况。
Redis集群就是多个Redis实例组成一个集群共同对外提供Redis服务,首先具有高可用性,一个Redis实例挂了还有其它Redis实例对外提供服务,不影响整个集群对外提供服务。还有就是可扩展性,当系统压力比较大时通过扩展Redis实例节点数即可增加集群的服务能力。
所以在生产中正规的项目一般都会使用Redis集群。
那Redis集群具体怎么做的?这个是考察你对Redis集群掌握多少,首先你得知道生产中你的项目用的Redis集群是什么模式,是主从结构、还是分片集群,因为模式不同对于应用程序配置连接Redis的方式也可能不同,其次是对集群之间数据同步、故障转移等基本特性的掌握。至于你的集群有多少Redis节点组成这个一般由运维人员进行部署,作为Java程序员并不清楚生产环境具体节点数是正常的。
所以下一步我们需要搞清楚Redis集群的结构是什么。
3.2 主从结构
3.2.1 介绍
首先说说Redis 的主从结构,下图就是一个简单的Redis主从集群结构:
如图所示,主从集群中有一个master节点、多个slave节点(现在叫replica)组成。
特点:
- 写操作访问master节点,master会自动将数据同步给两个slave节点
- 读操作访问各个slave节点,从而分担并发压力
具体主从集群的搭建请参考“Redis集群搭建”文档,有兴趣的同学在课下可以自已搭建。
3.2.2 主从同步原理
主从集群的master节点是如何把数据同步到slave节点呢?
主从之间通过全量同步、增量同步的方式完成数据同步。
3.2.2.1 全量同步
主从第一次建立连接时,会执行全量同步,将master节点的所有数据都拷贝给slave节点,流程如下:
完整流程描述:
slave节点请求数据同步master节点判断replid,发现不一致表示是第一次同步,进行全量同步
master通过replid判断是否是第一次同步,Replication Id简称replid,是数据集的标记,replid一致则是同一数据集。每个master都有唯一的replid,slave则会继承master节点的replid
master将完整内存数据生成RDB,发送RDB到slaveslave清空本地数据,加载master的RDB- 全量同步完成,后边进行增量同步,命令记录在
repl_baklog中,通过offset偏移量master持续将log中的命令发送给slave。
3.2.2.2 增量同步
全量同步需要先做RDB,然后将RDB文件通过网络传输给slave,成本太高了。因此除了第一次做全量同步,其它大多数时候slave与master都是做增量同步。
什么是增量同步?就是只更新slave与master存在差异的部分数据。如图:
那么master怎么知道slave与自己的数据差异在哪里呢?
这就要说到全量同步时的repl_baklog文件了。这个文件是一个固定大小的数组,只不过数组是环形,也就是说角标到达数组末尾后,会再次从0开始读写,这样数组头部的数据就会被覆盖。
repl_baklog中会记录Redis处理过的命令及offset,包括master当前的offset,和slave已经拷贝到的offset:
slave与master的offset之间的差异,就是salve需要增量拷贝的数据了。
随着不断有数据写入,master的offset逐渐变大,slave也不断的拷贝,追赶master的offset:
直到数组被填满:
此时,如果有新的数据写入,就会覆盖数组中的旧数据。不过,旧的数据只要是绿色的,说明是已经被同步到slave的数据,即便被覆盖了也没什么影响。因为未同步的仅仅是红色部分:
但是,如果slave出现网络阻塞,导致master的offset远远超过了slave的offset:
如果master继续写入新数据,master的offset就会覆盖repl_baklog中旧的数据,直到将slave现在的offset也覆盖:
棕色框中的红色部分,就是尚未同步,但是却已经被覆盖的数据。此时如果slave恢复,需要同步,却发现自己的offset都没有了,无法完成增量同步了。只能做全量同步。
repl_baklog大小有上限,写满后会覆盖最早的数据。如果slave断开时间过久,导致尚未备份的数据被覆盖,则无法基于repl_baklog做增量同步,只能再次全量同步。
3.2.3 总结
Redis全量同步和增量同步区别?
- 全量同步:
主从第一次建立连接时,会执行全量同步.
从节点的offset被主的offset覆盖后需要全量同步。
master将完整内存数据生成RDB,发送RDB到slave。
- 增量同步:
slave节点断开又恢复,并且在repl_baklog中能找到offset时执行增量同步。
slave提交自己的offset到master,master获取repl_baklog中从offset之后的命令给slave。
3.3 Redis哨兵
3.3.1 哨兵工作原理
主从结构中master节点的作用非常重要,一旦故障就会导致集群不可用。那么有什么办法能保证主从集群的高可用性呢?
Redis提供了哨兵(Sentinel)机制来监控主从集群监控状态,确保集群的高可用性。
下图是哨兵集群作用原理图:
哨兵的作用如下:
- 状态监控:
Sentinel会不断检查您的master和slave是否按预期
Sentinel 通过定时向master和slave发送ping判断是否下线,如果超过半数的Sentinel 判断master下线则认为是客观下线,按照一定的规则在salve中选择一个作为新的master。
- 故障恢复(failover):如果
master故障,Sentinel会将一个slave提升为master。当故障实例恢复后会成为slave
在所有Sentinel中找一个Leader,由该Leader向新master发送slaveof no one命令,让该节点成为master。
Leader给所有其它slave发送slaveof命令让slave成为新master的slave。
- 状态通知:
Sentinel充当Redis客户端的服务发现来源,当集群发生failover时,会将最新集群信息推送给Redis的客户端
3.3.1.1 状态监控
Sentinel基于心跳机制监测服务状态,每隔1秒向集群的每个节点发送ping命令,并通过实例的响应结果来做出判断:
- 主观下线(sdown):如果某sentinel节点发现某Redis节点未在规定时间响应,则认为该节点主观下线。
- 客观下线(odown):若超过指定数量(通过
quorum设置)的sentinel都认为该节点主观下线,则该节点客观下线。quorum值最好超过Sentinel节点数量的一半,Sentinel节点数量至少3台。
如图:
一旦发现master故障,sentinel需要在salve中选择一个作为新的master,选择依据是这样的:
- 首先会判断slave节点与master节点断开时间长短,如果超过
down-after-milliseconds * 10则会排除该slave节点 - 然后判断slave节点的
slave-priority值,越小优先级越高,如果是0则永不参与选举(默认都是1)。 - 如果
slave-prority一样,则判断slave节点的offset值,越大说明数据越新,优先级越高 - 最后是判断slave节点的
run_id大小,越小优先级越高(通过info server可以查看run_id)。
对应的官方文档如下:
https://redis.io/docs/management/sentinel/#replica-selection-and-priority
问题来了,当选出一个新的master后,该如何实现身份切换呢?
大概分为两步:
- 在多个
sentinel中选举一个leader - 由
leader执行failover(故障转移)
3.3.1.2 选举leader
首先,Sentinel集群要选出一个执行failover的Sentinel节点,可以成为leader。要成为leader要满足两个条件:
- 最先获得超过半数的投票
- 获得的投票数不小于
quorum值
而sentinel投票的原则有两条:
- 优先投票给目前得票最多的
- 如果目前没有任何节点的票,就投给自己
比如有3个sentinel节点,s1、s2、s3,假如s2先投票:
- 此时发现没有任何人在投票,那就投给自己。
s2得1票 - 接着
s1和s3开始投票,发现目前s2票最多,于是也投给s2,s2得3票 s2称为leader,开始故障转移
不难看出,谁先投票,谁就会称为leader,那什么时候会触发投票呢?
答案是第一个确认master客观下线的人会立刻发起投票,一定会成为leader。
OK,sentinel找到leader以后,该如何完成failover呢?
3.3.1.3 failover
我们举个例子,有一个集群,初始状态下7001为master,7002和7003为slave:
假如master发生故障,slave1当选。则故障转移的流程如下:
1)sentinel给备选的slave1节点发送slaveof no one命令,让该节点成为master
2)sentinel给所有其它slave发送slaveof 192.168.150.101 7002 命令,让这些节点隶属于新master,也就是7002的slave节点,开始从新的master上同步数据。
3)最后,当故障节点恢复后会接收到哨兵信号,执行slaveof 192.168.150.101 7002命令,成为slave:
3.3.2 总结
Redis哨兵的三个作用是什么?
- 集群监控
Redis哨兵每隔1秒向主从节点发送一次ping命令,如果超过一定时间没有相向则认为是主观下线(sdown)
如果大多数Redis哨兵都认为实例主观下线,则判定服务客观下线(odown)
- 故障恢复
首先要在Redis哨兵中选出一个leader,由leader执行failover
- 状态通知
Redis哨兵从slave中选取master后会通过给redis客户端。
Redis哨兵如何判断一个redis实例是否健康?
- Redis哨兵每隔1秒向主从节点发送一次ping命令,如果超过一定时间没有相向则认为是主观下线(
sdown) - 如果大多数Redis哨兵都认为实例主观下线,则判定服务客观下线(
odown)
故障转移步骤有哪些?
- 首先要在Redis哨兵中选出一个
leader,由leader执行failover - 选定一个
slave作为新的master,执行slaveof no one,切换到master模式 - 然后让所有节点都执行
slaveof新master - 修改故障节点配置,添加
slaveof新master
Redis哨兵选举leader的依据是什么?
- 票数超过Redis哨兵节点数量1半
- 票数超过设定数量(quorum)
- 一般情况下最先发起failover的节点会当选
Redis哨兵从slave中选取master的依据是什么?
- 首先会判断slave节点与master节点断开时间长短,如果超过
down-after-milliseconds * 10则会排除该slave节点 - 然后判断slave节点的
slave-priority值,越小优先级越高,如果是0则永不参与选举(默认都是1)。 - 如果
slave-prority一样,则判断slave节点的offset值,越大说明数据越新,优先级越高 - 最后是判断slave节点的
run_id大小,越小优先级越高(通过info server可以查看run_id)。
3.4 Redis分片集群
3.4.1 介绍
主从模式可以解决高可用、高并发读的问题。但依然有两个问题没有解决:
- 海量数据存储
- 高并发写
要解决这两个问题就需要用到分片集群了。分片的意思,就是把数据拆分存储到不同节点,这样整个集群的存储数据量就更大了。
Redis分片集群的结构如图:
分片集群特征:
- 集群中有多个master,每个master保存不同分片数据 ,解决海量数据存储问题
- 每个master都可以有多个slave节点 ,确保高可用
- master之间通过ping监测彼此健康状态 ,类似哨兵作用
- 客户端请求可以访问集群任意节点,最终都会被转发到数据所在节点
3.4.2 散列插槽
数据要分片存储到不同的Redis节点,肯定需要有分片的依据,这样下次查询的时候才能知道去哪个节点查询。很多数据分片都会采用一致性hash算法。而Redis则是利用散列插槽(hash slot)的方式实现数据分片。
详见官方文档:
https://redis.io/docs/management/scaling/#redis-cluster-101
在Redis集群中,共有16384个hash slots,集群中的每一个master节点都会分配一定数量的hash slots。具体的分配在集群创建时就已经指定了:
如图中所示:
- Master[0],本例中就是7001节点,分配到的插槽是0~5460
- Master[1],本例中就是7002节点,分配到的插槽是5461~10922
- Master[2],本例中就是7003节点,分配到的插槽是10923~16383
当我们读写数据时,Redis基于CRC16 算法对key做hash运算,得到的结果与16384取余,就计算出了这个key的slot值。然后到slot所在的Redis节点执行读写操作。
不过hash slot的计算也分两种情况:
- 当
key中包含{}时,根据{}之间的字符串计算hash slot - 当
key中不包含{}时,则根据整个key字符串计算hash slot
例如:
- key是
user,则根据user来计算hash slot - key是
user:{age},则根据age来计算hash slot
我们来测试一下,先于7001建立连接:
# 进入容器 docker exec -it r1 bash # 进入redis-cli,这里要加-c表示连接集群 redis-cli -c -p 7001 # 测试 set user jack
结果如下:
可以看到,客户端自动跳转到了5474这个slot所在的7002节点。
现在,我们添加一个新的key,这次加上{}:
# 试一下key中带{} set user:{age} 21 # 再试一下key中不带{} set age 20
结果如下:
可以看到user:{age}和age计算出的slot都是741。
3.4.3 故障转移
分片集群的节点之间会互相通过ping的方式做心跳检测,超时未回应的节点会被标记为下线状态。当发现master下线时,会将这个master的某个slave提升为master。
3.4.4 总结
Redis分片集群如何判断某个key应该在哪个实例?
- 将16384个插槽分配到不同的实例
- 根据key计算哈希值,对16384取余
- 余数作为插槽,寻找插槽所在实例即可
如何将同一类数据固定的保存在同一个Redis实例?
- Redis计算key的插槽值时会判断key中是否包含
{},如果有则基于{}内的字符计算插槽 - 数据的key中可以加入
{类型},例如key都以{typeId}为前缀,这样同类型数据计算的插槽一定相同
3.5 总结
面试题:你们用的是Redis单机还是Redis集群?Redis集群具体怎么做的?
面试题:Java程序如何访问Redis集群?
参考:"Redis集群搭建"中的“Java客户端连接分片集群”章节。
4.Redis内存回收
4.1 面试题
说一下Redis内存回收机制。
考察对Redis内存过期策略和内存淘汰策略的理解。
Redis之所以性能强,最主要的原因就是基于内存存储。然而单节点的Redis其内存大小不宜过大,会影响持久化或主从同步性能。
我们可以通过修改redis.conf文件,添加下面的配置来配置Redis的最大内存:
maxmemory 1gb
当内存达到上限,就无法存储更多数据了。因此,Redis内部会有两套内存回收的策略:
- 内存过期策略
- 内存淘汰策略
4.2.内存过期处理
存入Redis中的数据可以配置过期时间,到期后再次访问会发现这些数据都不存在了,也就是被过期清理了。
4.2.1.过期命令
Redis中通过expire命令可以给KEY设置TTL(过期时间),例如:
# 写入一条数据 set num 123 # 设置20秒过期时间 expire num 20
不过set命令本身也可以支持过期时间的设置:
# 写入一条数据并设置20s过期时间 set num EX 20
当过期时间到了以后,再去查询数据,会发现数据已经不存在。
4.2.2.过期策略
那么问题来了:
- Redis如何判断一个KEY是否过期呢?
- Redis又是何时删除过期KEY的呢?
Redis是何时删除过期KEY的呢?
Redis并不会在KEY过期时立刻删除KEY,因为要实现这样的效果就必须给每一个过期的KEY设置时钟,并监控这些KEY的过期状态。无论对CPU还是内存都会带来极大的负担。
Redis的过期KEY删除策略有两种:
- 惰性删除
- 周期删除
惰性删除:
当一个客户端尝试访问一个键时,Redis 会检查该键是否已经过期。如果过期,Redis 将删除该键,并返回一个表示键不存在的响应给客户端。这种方式确保了对内存的有效管理,但可能在高并发访问过期键的情况下导致性能下降。
定期删除:
Redis 还有一个后台线程,以一定的频率检查过期的键并删除它们。这个频率可以通过 server.hz 参数配置。server.hz 定义了 Redis 服务器每秒运行维护任务的次数,包括但不限于过期键的清理。默认情况下,server.hz 的值为 10,意味着每秒进行 10 次检查。
如何调整 server.hz?
- 提高频率:如果您的应用中有很多短生命周期的键,并且希望更快地回收这些键所占用的内存,可以考虑增加
server.hz的值。但是请注意,更高的server.hz会增加 CPU 的使用率,因为它会导致更频繁的后台任务执行。 - 降低频率:如果您发现 Redis 服务器的 CPU 使用率较高,而您的应用对过期键的清理速度要求不是特别严格,可以考虑降低
server.hz的值以减少 CPU 负载。
配置方法:
要修改 server.hz 的值,您可以在 Redis 的配置文件 redis.conf 中找到相应的设置,并根据需要更改它。
4.3.内存淘汰策略
对于某些特别依赖于Redis的项目而言,仅仅依靠过期KEY清理是不够的,内存可能很快就达到上限。因此Redis允许设置内存告警阈值,当内存使用达到阈值时就会主动挑选部分KEY删除以释放更多内存。这叫做内存淘汰机制。
Redis支持8种不同的内存淘汰策略:
1)noeviction: 不删除,直接返回报错信息。
2)volatile-lfu:在设置了过期时间的key中,移除最近最少(最少频率使用)使用的key。
4)volatile-lru:在设置了过期时间的key中,移除最近最久未使用的key。
3)volatile-ttl: 在设置了过期时间的key中,移除准备过期的key。
5)volatile-random:在设置了过期时间的key中,随机移除某个key。
6)allkeys-random:随机移除某个key。
7)allkeys-lru:移除最久未使用的key。
8)allkeys-lfu:移除最近最少使用的key。
比较容易混淆的有两个算法:
- LFU(
LeastFrequentlyUsed),最少频率使用。会统计每个key的访问频率,值越小淘汰优先级越高。 - LRU(
LeastRecentlyUsed),最近最久未使用。用当前时间减去最后一次访问时间,这个值越大则淘汰优先级越高。
不过这里大家要注意一下:Redis中的KEY可能有数百万甚至更多,每个KEY都有自己访问时间或者逻辑访问次数。我们要找出时间最早的或者访问次数最小的,难道要把Redis中所有数据排序?
要知道Redis的内存淘汰是在每次执行命令时处理的。如果每次执行命令都先对全量数据做内存排序,那命令的执行时长肯定会非常长,这是不现实的。
所以Redis采取的是抽样法,即每次抽样一定数量(maxmemory_smples)的key,然后基于内存策略做排序,找出淘汰优先级最高的,删除这个key。这就导致Redis的算法并不是真正的LRU,而是一种基于抽样的近似LRU算法。
4.4.总结
Redis何时删除过期KEY?如何删除?/删除策略?
Redis淘汰策略有哪几种?