一、缓存更新策略
1、三种策略
- 内存淘汰:redis自带的内存淘汰机制
- 过期淘汰:利用expire命令给数据设置过期时间
- 主动更新:主动完成数据库和缓存的同时更新
2、策略选择
- 低一致性需求:内存淘汰或过期淘汰
- 高一致性需求:主动更新为主,过期淘汰兜底
3、主动更新的方案
- Cache Aside:缓存调用者在更新数据库的同时完成对缓存的更新
- 一致性良好
- 实现难度一般
- Read/Write Through:缓存与数据库成为一个服务,服务保证两者的一致性,对外暴露的API接口。调用者调用API,无需知道自己操作的数据库还是缓存,不关心一致性
- 一致性优秀
- 实现复杂
- 性能一般
- Write Back:缓存调用者的CRUD都针对缓存完成。由独立线程异步的将缓存写到数据库,实现最终一致
- 一致性差
- 性能好
- 实现复杂
二、缓存存在的问题
1、缓存穿透
产生原因:客户端请求的数据在缓存和数据库中都不存在。当这种情况大量出现或被恶意攻击时,接口的访问全部透过Redis访问数据库,而数据库中也没有这些数据,我们称这种现象为"缓存穿透"。
解决方案:
- 缓存空对象:对于不存在的数据也在Redis建立缓存,值为空,设置一个较短的TTL时间
- 优点:实现简单,维护方便
- 缺点:额外消耗内存,短期的数据不一致
2.布隆过滤:利用布隆过滤算法,在请求Redis之前先判断是否存在,如果不存在则直接拒绝访问
- 优点:内存占用少
- 缺点:实现复杂,存在误判的可能性
3.其他方法:
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点数据的限流
布隆过滤器:
一种数据结构,由一串很长的二进制向量组成,可以将其看成一个二进制数组。
当要向布隆过滤器中添加一个元素key时,我们通过多个hash函数,算出一个值,然后将这个值所在的方格置为1。
因为多个不同的数据通过hash函数算出来的结果是会有重复的,所以布隆过滤器可以判断某个数据一定不存在,但是无法判断一定存在。
优点:优点很明显,二进制组成的数组,占用内存极少,并且插入和查询速度都足够快。
缺点:随着数据的增加,误判率会增加;还有无法判断数据一定存在;另外还有一个重要缺点,无法删除数据。
2、缓存雪崩
产生原因:在同一时间段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力
解决方案:
- 给不同的Key的TTL设置随机值
- 利用Redis集群提高服务的可用性
- 诶缓存业务添加降级限流策略
- 给业务添加多级缓存
3、缓存击穿
产生原因:热点Key在某一个时间段被高并发访问,而此时Key正好过期,如果重建缓存时间耗时长,在这段时间内大量请求剾数据库,带来巨大冲击
解决方案:
解决方案:
1.设置value永不过期:通过定时任务进行数据库查询更新缓存,当然前提时不会给数据库造成压力过大
- 优点:最可靠,性能好
- 缺点:占空间,内存消耗大,一致性差
2.互斥锁:给缓存重建过程加锁,确保重建过程只有一个线程执行,其他线程等待
- 优点:实现简单,没有额外内存消耗,一致性好
- 缺点:等待导致性能下降,有死锁风险
3.逻辑过期:热点Key缓存永不过期,认识设置一个逻辑过期时间,查询到数据时通过对逻辑时间判断,来决定是否需要进行缓存重建。重建过程也通过互斥锁来保证单线程执行。利用独立线程异步执行,其他线程无需等待,直接查询到旧的数据即可。
- 优点:线程无需等待,性能较好
- 缺点:不保证一致性,有额外内存消耗,实现复杂
private final RedisTemplate<String, String> redisTemplate; private static final ExecutorService CACHE_REBUILD_EXECUTOR = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(20), r -> new Thread(r, "cache_rebuild")); public CacheClient(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } public void setWithLogicalExpire(String key, Object value, Long expireTime, TimeUnit unit) { // 设置逻辑过期时间 RedisData redisData = new RedisData(); redisData.setValue(value); redisData.setExpireTime(LocalDateTime.now().plusNanos(unit.toNanos(expireTime))); redisTemplate.opsForValue().set(key, JSON.toJSONString(redisData)); } /** * 逻辑过期,互斥锁获取值,用于避免热点数据出现缓存击穿 */ public <R, V> R getMutex(String keyPrefix, V id, Class<R> clazz, Function<V, R> dbFallback, Long expireTime, TimeUnit unit) { String key = keyPrefix + id; String value = redisTemplate.opsForValue().get(key); if (StringUtils.isBlank(value)) { return null; } RedisData redisData = JSON.parseObject(value, RedisData.class); R result = JSONUtil.toBean((JSONObject) redisData.getValue(), clazz); if (redisData.getExpireTime().isAfter(LocalDateTime.now())) { return result; } // 如果缓存已过期,则尝试更新 String localKey = RedisConstant.LOCK + id; // 获取锁成功 if (getLock(localKey)) { // 异步更新缓存 CACHE_REBUILD_EXECUTOR.submit( () -> { try { R res = dbFallback.apply(id); this.setWithLogicalExpire(key, res, expireTime, unit); } catch (Exception e) { throw new RuntimeException(e); } finally { unLock(localKey); } } ); } return result; } private boolean getLock(String key) { // 直接返回会进行自动拆箱,可能会出现空指针异常 return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1")); } private void unLock(String key) { redisTemplate.delete(key); }
三、解决缓存问题
1、自定义分布式锁
/** * <pre> * 简易实现的Redis分布式锁 * </pre> * * @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a> * @date 2023/2/26 21:18 */ public class SimpleRedisLock { private final RedisTemplate<String, String> redisTemplate; /** 锁的名字,根据业务设置 */ private final String lockName; /** * key前缀 */ private static final String KEY_PREFIX = "lock:"; /** * value中线程标识的前缀(为每个节点提供一个随机的前缀,避免集群部署下线程id出现重复而导致value出现相同的情况) */ private static final String ID_PREFIX = UUID.fastUUID().toString(true); /** * 释放锁逻辑的lua脚本 */ private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } public SimpleRedisLock(String lockName, RedisTemplate<String, String> redisTemplate) { this.lockName = lockName; this.redisTemplate = redisTemplate; } public boolean tryLock(long timeoutSec) { long threadId = Thread.currentThread().getId(); // 返回的是Boolean类型,直接return会进行自动拆箱,可能会出现空指针异常 // 需要为锁设置过期时间,防止因服务宕机而导致锁无法释放 return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + lockName, ID_PREFIX + threadId, timeoutSec, TimeUnit.SECONDS)); } public void unlock() { redisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + lockName), ID_PREFIX + Thread.currentThread().getId() ); } }
Lua脚本——unlock.lua
--- 比较线程标识与锁中的标识是否一致 if(redis.call('get', KEYS[1]) == ARGS[1]) then --- 释放锁 return redis.call('del', KEYS[1]) end return 0
使得释放锁的操作具有原子性
Redis是单线程处理,本身不会存在并发问题,但是由于可能有多个客户端访问,每个客户端会有一个线程,之间存在竞争,所以服务端收到的指令有可能出现多个客户端的指令穿插,而lua脚本可以保证多条指令的原子性从而解决并发问题
2、解决缓存穿透问题
/** * 避免缓存穿透的获取 */ public <R, V> R get(String keyPrefix, V id, Class<R> clazz, Function<V, R> dbFallback, Long expireTime, TimeUnit unit) { String key = keyPrefix + id; // 查询缓存 String value = redisTemplate.opsForValue().get(key); // 缓存存在则直接返回 if (StringUtils.isNotBlank(value)) { return JSON.parseObject(value, clazz); } // 缓存不存在(到此处说明value要么是空,要么是null) if (value != null) { // 不为null则说明为“”,代表数据不存在,直接返回null,不用查询数据库(解决缓存穿透问题) return null; } // value为null则查询数据库获取数据进行更新 R result = dbFallback.apply(id); if (result == null) { // 数据库查询不到结果,则存入空串避免缓存穿透 redisTemplate.opsForValue().set(key, "", RedisConstant.CACHE_NULL_TTL, TimeUnit.MINUTES); return null; } // 查询到结果,写回缓存 this.set(key, result, expireTime, unit); return result; }
3、解决缓存击穿问题
/** * 逻辑过期,互斥锁获取值,用于避免热点数据出现缓存击穿 */ public <R, V> R getMutex(String keyPrefix, V id, Class<R> clazz, Function<V, R> dbFallback, Long expireTime, TimeUnit unit) { String key = keyPrefix + id; String value = redisTemplate.opsForValue().get(key); if (StringUtils.isBlank(value)) { return null; } RedisData redisData = JSON.parseObject(value, RedisData.class); R result = JSONUtil.toBean((JSONObject) redisData.getValue(), clazz); if (redisData.getExpireTime().isAfter(LocalDateTime.now())) { return result; } // 如果缓存已过期,则获取锁尝试更新 SimpleRedisLock lock = new SimpleRedisLock(key, redisTemplate); // 获取锁成功 if (lock.tryLock(5)) { // 异步更新缓存 CACHE_REBUILD_EXECUTOR.submit( () -> { try { R res = dbFallback.apply(id); this.setWithLogicalExpire(key, res, expireTime, unit); } catch (Exception e) { throw new RuntimeException(e); } finally { lock.unlock(); } } ); } return result; }