无论是在开发过程中还是在准备跑路的面试过程中,有关redis相关的,难免会涉及到四个特殊场景:缓存穿透、缓存雪崩、缓存击穿以及数据一致性。如果在开发中不注意这些场景的话,在高并发场景下有可能会导致系统崩溃,数据错乱等情况。现在,结合实际的业务场景来复现并解决这些问题。
相关技术:springboot2.2.2+mybatisplus3.1+redis5.0+hutool5.8
缓存穿透
缓存穿透是指查询缓存和数据库中都不存在的数据,导致所有的查询压力全部给到了数据库。
比如查询一篇文章信息并对其进行缓存,一般的逻辑是先查询缓存中是否存在该文章,如果存在则直接返回,否则再查询数据库并将查询结果进行缓存。
@Slf4j
@Service
public class DocumentInfoServiceImpl extends ServiceImpl<DocumentInfoMapper, DocumentInfo> implements DocumentInfoService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public DocumentInfo getDocumentDetail(int docId) {
String redisKey = "doc::info::" + docId;
String obj = stringRedisTemplate.opsForValue().get(redisKey);
DocumentInfo documentInfo = null;
if (StrUtil.isNotEmpty(obj)) { //缓存命中
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
if (ObjectUtil.isNotNull(documentInfo)) { // 缓存结果
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
}
}
return documentInfo;
}
}
复制代码
@GetMapping("/doc/queryById")
public Result queryById(@RequestParam(name = "docId") Integer docId) {
return Result.success(documentInfoService.getDocumentDetail(docId));
}
复制代码
如果项目的并发量不大,这样写的话几乎没啥问题。如果项目的并发量很大,那么这就存在一个隐藏问题,如果在访问了一个不存在的文章(这个文章已经被分享出去,但是在后台可能是被删除或者下线状态),那么就会导致所有的请求全部需要到数据库中进行查询,从而给数据库造成压力,甚至造成宕机。
http://127.0.0.1:8081/doc/queryById?docId=不存在的id
2023-01-05 10:18:57.954 INFO 19692 --- [nio-8081-exec-8] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.121 INFO 19692 --- [nio-8081-exec-5] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.350 INFO 19692 --- [io-8081-exec-10] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.519 INFO 19692 --- [nio-8081-exec-3] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.661 INFO 19692 --- [nio-8081-exec-6] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.859 INFO 19692 --- [nio-8081-exec-4] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:59.012 INFO 19692 --- [nio-8081-exec-9] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:59.154 INFO 19692 --- [nio-8081-exec-7] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
复制代码
解决方案一:缓存空对象
针对缓存穿透问题缓存空对象可以有效避免所产生的影响,当查询一条不存在的数据时,在缓存中存储一个空对象并设置一个过期时间(设置过期时间是为了避免出现数据库中存在了数据但是缓存中仍然是空数据现象),这样可以避免所有请求全部查询数据库的情况。
// 查询对象不存在
if(StrUtil.equals(obj,"")){
log.info("==== select from cache , data not available ====");
return null;
}
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
//如果数据不存在,则缓存一个空对象并设置过期时间
stringRedisTemplate.opsForValue().set(redisKey, ObjectUtil.isNotNull(documentInfo)?JSONUtil.toJsonStr(documentInfo):"", 5L, TimeUnit.SECONDS);
// if (ObjectUtil.isNotNull(documentInfo)) {
// stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
// }
}
复制代码
2023-01-05 13:15:01.057 INFO 16600 --- [nio-8081-exec-3] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 13:15:01.214 INFO 16600 --- [nio-8081-exec-4] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
2023-01-05 13:15:01.384 INFO 16600 --- [nio-8081-exec-5] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
2023-01-05 13:15:01.540 INFO 16600 --- [nio-8081-exec-6] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
2023-01-05 13:15:01.720 INFO 16600 --- [nio-8081-exec-7] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
复制代码
解决方案二:布隆过滤器
缓存空对象的缺点在于无论数据存不存在都需要查询一次数据库,并且redis中存储了大量的空数据,这个时候可以采用布隆过滤器来解决。布隆过滤器可以简单的理解为由一个很长的二进制数组结合n个hash算法计算出n个数组下标,将这些数据下标置为1。在查找数据时,再次通过n个hash算法计算出数组下标,如果这些下标的值为1,表示该值可能存在(存在hash冲突的原因),如果为0,则表示该值一定不存在。
/**
- 布隆过滤器添加元素伪代码
*/
BitArr[] bit = new BitArr[10000]; // 新建一个二进制数组
List insertData = Arrays.asList("A", "B", "C"); // 待添加元素
for (String insertDatum : insertData) {
for (int i=1;i<=3;i++){ // 使用3中hash算法计算出3个数组下标
int bitIdx = hash_i(insertDatum); //hash1(insertDatum),hash2(insertDatum),hash3(insertDatum)
bit[bitIdx]=1; // 将下标元素置为1
}
}
复制代码
/**
- 布隆过滤器查找元素伪代码
*/
BitArr[] bit = new BitArr[10000];
for (int i=1;i<=3;i++){
int bitIdx = hash_i("E"); //计算E的数组下标
if(bit[bitIdx]==0){ //如果对应的元素为0,则一定不存在
return false;
}
}
return true;
复制代码
布隆过滤器的实现
在使用布隆过滤器时有两个核心参数,分别是预估的数据量size以及期望的误判率fpp,这两个参数我们可以根据自己的业务场景和数据量进行自主设置。在实现布隆过滤器时,有两个核心问题,分别是hash函数的选取个数n以及确定bit数组的大小len。
单机版布隆过滤器
目前单机版的布隆过滤器实现方式有很多,比如Guava提供的BloomFilter,Hutool工具包中提供的BitMapBloomFilter等。以Guava为例,需要引入对应的依赖包,在BloomFilter类中提供了create方法来进行布隆过滤器的创建。
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
复制代码
public static BloomFilter localBloomFilter = BloomFilter.create(Funnels.integerFunnel(),10000L,0.01);
复制代码
创建完成后,将需要筛选的数据同步到过滤器中。
/**
- 单机版布隆过滤器数据初始化
*/
@PostConstruct
public void initDocumentDataLocal(){
List<DocumentInfo> documentInfos = documentInfoService.lambdaQuery().select(DocumentInfo::getId).list();
if(CollUtil.isNotEmpty(documentInfos)){
documentInfos.stream().map(DocumentInfo::getId).forEach(e->{
BloomFilterUtil.localBloomFilter.put(e);
});
}
}
复制代码
在业务代码中,可以直接调用BloomFilter提供的mightContain方法,判断目标docId是否可能存在于过滤器中,如果可能存在,那么继续向下执行业务逻辑,否则直接中断执行。
@Override
public DocumentInfo getDocumentDetail(int docId) {
//布隆过滤器拦截
boolean mightContain = BloomFilterUtil.localBloomFilter.mightContain(docId);
if(!mightContain){ //是否有可能存在于布隆过滤器中
log.info("==== select from bloomFilter , data not available ====");
return null;
}
String redisKey = "doc::info::" + docId;
String obj = stringRedisTemplate.opsForValue().get(redisKey);
DocumentInfo documentInfo = null;
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
if(ObjectUtil.isNotNull(documentInfo)){
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
}
}
return documentInfo;
}