2023.2.20—–商户查询缓存
什么是缓存
数据交换的缓存区,是存储数据的临时地方,一般读写效率高
web应用中:
缓存在web应用中,缓存可以降低后端的负载、提高读写效率、降低响应时间
成本 : 数据的一致性成本、代码维护成本、运维成本…..
如何添加缓存
业务流程分析 与 模型
- 从redis中查询商铺的缓存
- 判断redis中是否存在该id的商户
- 如果存在 : 返回商户的信息
- 如果不存在: 根据传入的id查询数据库 ,判断数据库中是否存在商户,如果不存在就返回401
- 数据库中 商户如果存在就将商户信息写入redis
- 返回商户信息
接口:
/** * 根据id查询商铺信息 * @param id 商铺id * @return 商铺详情数据 */ @GetMapping("/{id}") public Result queryShopById(@PathVariable("id") Long id) { return shopService.queryById(id); }
Service层
@Service public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService { @Resource private StringRedisTemplate stringRedisTemplate; @Override public Result queryById(Long id) { String Iid = String.valueOf(id); //强转时会出现异常 // 1. 从redis中查询商铺的缓存 String shopJson = stringRedisTemplate.opsForValue().get(Iid); // 2. 判断redis中是否存在该id的商户 if(StrUtil.isNotBlank(shopJson)){ // 3. 如果存在 : 返回商户的信息 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } Shop shopN = getById(id); // 4. 如果不存在: // 4.1根据传入的id查询数据库 ,判断数据库中是否存在商户 String key = CACHE_SHOP_KEY + id; if(shopN == null){ //4.3 如果不存在就返回401 return Result.fail("店铺不存在!"); } // 4.2数据库中 商户如果存在就将商户信息写入redis stringRedisTemplate.opsForValue().set(key,shopN.toString()); // 6. 返回商户信息 return Result.ok(shopN); } }
给商品类型添加redis缓存
service层
@Service public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService { @Resource private StringRedisTemplate stringRedisTemplate; @Override public Result queryTypeList() { ShopType shopType = new ShopType(); //先查看缓存中是否存在 List<String> range = stringRedisTemplate.opsForList().range("shopTypeList", 0, -1); //如果存在,那么就直接返回 if(!range.isEmpty()){ return Result.ok(range); } //如果不存在,先从数据库中查到,然后再交给redis,然后再返回 List<ShopType> sort = query().orderByAsc("sort").list(); for(ShopType item : sort){ stringRedisTemplate.opsForList().rightPush("shopTypeList", JSONUtil.toJsonStr(item)); } List<String> typeList = stringRedisTemplate.opsForList().range("shopTypeList", 0, -1); return Result.ok(typeList); } }
缓存更新策略- —双写一致性问题
解决数据同步的问题
解决策略
场景:
低一致性需求: 使用内存淘汰机制,例如店铺类型的查询缓存
高一致性需求: 主动更新,并且使用超时剔除作为兜底方案。例如店铺的详情查询
主动更新的策略
- 由缓存的调用者,在更新数据库的同时更新缓存(常用!)
- 缓存与数据库整合为一个服务,由服务来维护一致性。,调用者无需关系缓存一致性问题
- 调用者只操作缓存,由其他线程异步的将缓存数据持久化到数据库,保证最终的一致
主动更新策略的考虑问题
删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
如何保证缓存与数据库的操作的同时成功或失败?
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统,利用TCC等
分布式事务方案先操作缓存还是先操作数据库?
- 先操作数据库,再删除缓存 / 反之亦可
缓存更新策略的最佳实践方案:
低一致性需求:
- 使用Redis自带的内存淘汰机制
高一致性需求:
- 主动更新,并以超时剔除作为兜底方案
- 读操作:缓存命中则直接返回缓存未命中则查询数据库,并写入缓存,设定超时时间
- 写操作:先写数据库,然后再删除缓存要确保数据库与缓存操作的原子性
操作:
查询商户时设置超时删除策略
// 4.2数据库中 商户如果存在就将商户信息写入redis ,超时删除30min stringRedisTemplate.opsForValue().set(key,shopN.toString(),30, TimeUnit.MINUTES);
每次更新数据时,就会先删除缓存,然后再从次查询时会先从数据库中查出更新过的数据保存到缓存中去,然后再回显
/** * 更新商铺信息 * @param shop 商铺数据 * @return 无 */ @PutMapping public Result updateShop(@RequestBody Shop shop) { // 写入数据库 return shopService.update(shop); }
//todo 更新数据库删除缓存 @Override @Transactional //添加事务 public Result update(Shop shop) { //1. 更新数据库 updateById(shop); if(shop.getId() == null){ return Result.fail("店铺id不能为空!!!"); } Long id = shop.getId(); String key = CACHE_SHOP_KEY + id; //删除缓存 stringRedisTemplate.delete(key); return Result.ok(); } }
缓存穿透
指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
解决办法
1.缓存空对象 : 如果用户恶意多次查找数据库和缓存中都不存在的对象,我们可以给这种对象赋一个空的对象到redis,这样无论多少次恶意请求,他都不会多次访问数据库,只要一次访问不到,那么就只能到缓存中拿空对象了
- 优点:实现简单,维护方便
- 缺点:额外的内存消耗可能造成短期的不一致
- 控制ttl时间,可以实现短期不一致的降低
2.布隆过滤
- 优点 :内存占用非常小
- 缺点: 实现复杂、存在误判的可能
实现
- 如果用户第一次查询,没有从缓存和数据库中查出数据,那么就创建一个空值(key1, “” ),存入redis
- 后面如果再次查询不存在的用户key1,那么就可以从缓存中查询将空值拿出来,然后直接返回,这样就可以不用操作数据库
- 如果2中拿出来的值为null,那么就说明店铺
String key = CACHE_SHOP_KEY + id; /** * 使用缓存穿透 * 判断是否为null ,因为如果是null的话 * ---------------需要好好理解以下逻辑--------------------- */ /*isNotBlank : 判断某字符串是否不为空且长度不为0且不由空白符""(whitespace)构成 如果缓存中有查询需要的数据且不等于”“,那么就会在上面一步直接返回 如果查询出有需要的数据且值为空”“(不等于null) 那么就会到我们这一步进行返回,因为之前查过数据库,没有这个数据 如果之前没查过这个数据,那么就不会给他赋值为”“ 而至直接查出来的是null,就去数据库中查 */ if(shopJson != null){ return Result.fail("店铺不存在!"); } /** * --------------------------------------------------- */ Shop shopN = getById(id); // 4. 如果不存在: // 4.1根据传入的id查询数据库 ,判断数据库中是否存在商户 if(shopN == null){ //4.3 如果不存在就返回401 /**使用缓存穿透 *查出数据库中也不存在,那么设置一个空值,下次(在规定时间内)再查他就不会再到数据库中查了 */ stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES); return Result.fail("店铺不存在!"); }
整个逻辑的代码实现
@Override public Result queryById(Long id) { String key = CACHE_SHOP_KEY + id; //强转时会出现异常 // 1. 从redis中查询商铺的缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); // 2. 判断redis中是否存在该id的商户 if(StrUtil.isNotBlank(shopJson)){ // 3. 如果存在: 返回商户的信息 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } /** * 使用缓存穿透 * 判断是否为null ,因为如果是null的话 * ---------------需要好好理解以下逻辑--------------------- */ /*isNotBlank : 判断某字符串是否不为空且长度不为0且不由空白符""(whitespace)构成 如果缓存中有查询需要的数据且不等于”“,那么就会在上面一步直接返回 如果查询出有需要的数据且值为空”“(不等于null) 那么就会到我们这一步进行返回,因为之前查过数据库,没有这个数据 如果之前没查过这个数据,那么就不会给他赋值为”“ 而至直接查出来的是null,就去数据库中查 */ if(shopJson != null){ return Result.fail("店铺不存在!"); } /** * --------------------------------------------------- */ Shop shopN = getById(id); // 4. 如果不存在: // 4.1根据传入的id查询数据库 ,判断数据库中是否存在商户 String key = CACHE_SHOP_KEY + id; if(shopN == null){ //4.3 如果不存在就返回401 /**使用缓存穿透 *查出数据库中也不存在,那么设置一个空值,下次(在规定时间内)再查他就不会再到数据库中查了 */ stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES); return Result.fail("店铺不存在!"); } // 4.2数据库中 商户如果存在就将商户信息写入redis ,超时删除30min stringRedisTemplate.opsForValue().set(key,shopN.toString(),30, TimeUnit.MINUTES); // 6. 返回商户信息 return Result.ok(shopN); }
缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
缓存null值、布隆过滤、增强id的复杂度,避免被猜测id规律、做好数据的基础格式校验、加强用户权限校验、做好热点参数的限流
缓存雪崩
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
服务延机最为可怕。
解决方案:
给不同的Key的TTL添加随机值、利用Redis集群提高服务的可用性、给缓存业务添加降级限流策略、给业务添加多级缓存
(后面的三个暂未实现)
String key = CACHE_SHOP_KEY + id + RandomUtil.randomInt(4);
缓存击穿
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
解决方法(需要在一致性和可用性上做出选择)
互斥锁优点 :
- 没有额外的内存消耗
- 保证了一致性
- 实现简单
缺点:
- 性能受影响
- 可能有死锁风险
- 从redis中查询数据,如果没查到,那么就返回null,到数据库中找,然后返回
- 如果找到了
- 判断缓存是否过期,未过期那就返回,并说明找到了
- 如果缓存过期了
1.尝试获取互斥锁,并判断是否获取到了互斥锁
2.如果获取到了,那么就开启独立线程,然后再从数据库中找打,然后写入redis并设置逻辑过期时间
3.释放互斥锁
逻辑过期
优点:
- 线程无需等待
- 性能优秀
缺点:
- 不保证一致性
- 有额外的内存消耗
- 实现复杂
实现
互斥锁的方式:
@Override public Result queryById(Long id) { // 方法一: 缓存空对象解决缓存穿透 //Shop shop = queryWithPassThrough(id); //方法二 : 互斥锁解决 缓存击穿 Shop shop = queryWithMutex(id); if(shop == null){ return Result.fail("店铺不存在!!!"); } // 3. 返回商户信息 return Result.ok(shop); }
//todo 缓存 public Shop queryWithMutex(Long id){ String key = CACHE_SHOP_KEY + id; System.out.println(key); // 1. 从redis中查询商铺的缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); // 2. 判断redis中是否存在该id的商户 if(StrUtil.isNotBlank(shopJson)){ //3. 如果存在: 返回商户的信息 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return shop; } if(shopJson != null){ return null; } Shop shopN = null; //未命中---------尝试获取互斥锁-------- String lockKey = LOCK_SHOP_KEY + id; try { //1. 判断获取互斥锁是否成功 boolean isLock = tryLock(lockKey); //2.失败休眠,成功就获取 if (!isLock){ Thread.sleep(50); queryWithMutex(id); //递归重试获取互斥锁 } //!获取互斥锁成功 shopN = getById(id); if(shopN == null){ stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES); return null; } // 4.2数据库中 商户如果存在就将商户信息写入redis ,超时删除30min stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shopN),30, TimeUnit.MINUTES); }catch (InterruptedException e){ throw new RuntimeException(e); } finally { //释放互斥锁 unLock(lockKey); } //6. 返回商户信息 return shopN; } //todo 获取锁 boolean tryLock(String key){ Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.MINUTES); return BooleanUtil.isTrue(aBoolean); } //todo 释放锁 void unLock(String key){ stringRedisTemplate.delete(key); } //todo 缓存穿透 public Shop queryWithPassThrough(Long id){ String key = CACHE_SHOP_KEY + id; System.out.println(key); // 1. 从redis中查询商铺的缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); // 2. 判断redis中是否存在该id的商户 if(StrUtil.isNotBlank(shopJson)){ //3. 如果存在: 返回商户的信息 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return shop; } if(shopJson != null){ return null; } Shop shopN = getById(id); // 4. 如果不存在: if(shopN == null){ stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES); return null; } // 4.2数据库中 商户如果存在就将商户信息写入redis ,超时删除30min stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shopN),30, TimeUnit.MINUTES); // 6. 返回商户信息 return shopN; }
方法二: 逻辑过期解决缓存击穿
暂未实现
缓存工具封装
方法: java转json
将Java对象序列化为json并存储在String类型的key中,并且能够设置TTL过期时间
public void set(String key, Object value, Long time , TimeUnit unit){ stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit); }
方法: java转json,用于处理缓存击穿
将Java对象序列化为json并存储在String类型的key中,并且能够设置TTL过期时间,用于处理缓存击穿问题
public void setWithLogicalExpire(String key, Object value, Long time , TimeUnit unit){ //设置逻辑过期时间 RedisData redisData = new RedisData(); redisData.setData(value); redisData.setExpireTime(LocalDateTime.now().plusSeconds(time)); //写入redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData)); }
方法: 根据指定key查询缓存,并转为指定类型
根据指定key查询缓存,并转为指定类型,利用缓存空值的方式解决缓存穿透问题
/** * 封装 解决缓存穿透问题 * @param keyPre key的实际前缀 * @param id 需要查询的XXX的id * @param type 查询的信息的类型 * @param dbFallback 函数式编程的方法 * @param time 缓存时间 * @param unit 时间单位(TimeUnit.MINUTES) * @param <R> 返回值类型 * @param <ID> id的类型 * @return 返回查询到的信息 */ public <R,ID> R queryWithPassThrough( String keyPre, ID id , Class<R> type , Function<ID , R> dbFallback, Long time , TimeUnit unit){ String key = keyPre + id; // 1. 从redis中查询商铺的缓存 String Json = stringRedisTemplate.opsForValue().get(key); // 2. 判断redis中是否存在该id的商户 if(StrUtil.isNotBlank(Json)){ //3. 如果存在 : 返回信息 return JSONUtil.toBean(Json, type); } if(Json != null){ return null; } R r = dbFallback.apply(id); // 4. 如果不存在: if(r == null){ stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES); return null; } this.set(key,r,time,unit); return r; }
2023.2.24 —– 优惠劵秒杀
全局唯一ID
问题描述:
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显
- 受单表数据量的限制
- 容易造成数据泄露的问题
全局ID生成器
它是一种在分布式系统下用来生成全局唯一id的工具,(也称分布式唯一id)。
特性: 唯一性、高性能、高可用、安全性、递增性
ID的自增: 不使用redis自增的数值,而是拼接一些其他的信息 :
ID的组成 :
- 符号位: 1bit ,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
实现
/** * id生成器 * 时间戳 * - 符号位: 1bit ,永远为0 * - 时间戳:31bit,以秒为单位,可以使用69年 * - 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID */ @Component public class RedisIdWorker { @Resource private StringRedisTemplate stringRedisTemplate; //起始时间戳 private static final long BEGIN_TIMESTAMP = 1620995200L; //设置序列号的位数 private static final int COUNT_BITS = 32; public long nextId(String keyPre){ //1. 生成时间戳 LocalDateTime now = LocalDateTime.now(); long nowSeconds = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSeconds - BEGIN_TIMESTAMP; //2. 生成序列号 (将日期精确到天) String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); Long aLong = stringRedisTemplate.opsForValue().increment("icr:" + keyPre + ":" + date); //3. 拼接 return timestamp << COUNT_BITS | aLong; } }
全局ID的生成策略 : UUID、Redis自增、snowflake算法、数据库自增
Redis自增ID策略:每天一个key,方便统计订单量
ID构造是 时间戳 + 计数器
实现优惠卷秒杀下单
背景
实现
- 下单时需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
//1. 提交优惠卷id //2. 查询优惠卷信息 //3. 判断秒杀是否开启 //否 返回异常, 结束 //4. 是,判断库存是否充足 //否 返回异常, 结束 //5. 是,扣减库存,创建订单,返回订单信息
@Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private RedisIdWorker redisIdWorker; @Resource private ISeckillVoucherService seckillVoucherService; /** * 实现优惠卷秒杀下单 * @param voucherId * @return */ @Override public Result seckillVoucher(Long voucherId) { //1. 提交优惠卷id SeckillVoucher voucher = seckillVoucherService.getById(voucherId); //2. 查询优惠卷信息 //3. 判断秒杀是否开启 if(voucher.getBeginTime().isAfter(LocalDateTime.now())){ //否 返回异常, 结束 return Result.fail("秒杀未开始!"); } if(voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!"); } //4. 是,判断库存是否充足 //否 返回异常, 结束 Integer stock = voucher.getStock(); if (stock == 0){ return Result.fail("已经被抢完了!"); } //5. 是,扣减库存 boolean update = seckillVoucherService.update().setSql("stock = stock - 1") .eq("voucher_id", voucherId).update(); //6. 创建订单 .返回订单信息 if(!update){ return Result.fail("库存不足!"); } VoucherOrder order = new VoucherOrder(); //创建用户id,代金卷id ,订单id long orderId = redisIdWorker.nextId("order"); order.setId(orderId); Long UserId = UserHolder.getUser().getId(); order.setUserId(UserId); order.setVoucherId(voucherId); save(order); return Result.ok(orderId); } }
超卖问题
问题描述:
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:
实现
乐观锁 和悲观锁!!!
乐观锁
乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种: 版本号法、CAS法…
//5. 是,扣减库存 boolean update = seckillVoucherService.update().setSql("stock = stock - 1") .eq("voucher_id", voucherId).eq("stock",voucher.getStock()) //对乐观锁的判断 .update()
一人一单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
1.提交优惠卷id
2.查询优惠卷信息
3.判断秒杀是否开启
//否 返回异常, 结束
4.是,判断库存是否充足
//否 返回异常, 结束
5.是,扣减库存,创建订单,返回订单信息
6.根据用户id和优惠卷id查询该用户是否下过单,如果下过直接返回异常,反之继续执行之前的操作
@Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private RedisIdWorker redisIdWorker; @Resource private ISeckillVoucherService seckillVoucherService; /** * 实现优惠卷秒杀下单 * @param voucherId * @return */ @Override public Result seckillVoucher(Long voucherId) { //1. 提交优惠卷id SeckillVoucher voucher = seckillVoucherService.getById(voucherId); //2. 查询优惠卷信息 //3. 判断秒杀是否开启 if(voucher.getBeginTime().isAfter(LocalDateTime.now())){ //否 返回异常, 结束 return Result.fail("秒杀未开始!"); } if(voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!"); } //4. 是,判断库存是否充足 //否 返回异常, 结束 Integer stock = voucher.getStock(); if (stock < 1){ return Result.fail("已经被抢完了!"); } Long UserId = UserHolder.getUser().getId(); synchronized (UserId.toString().intern()){ IVoucherOrderService orderService = (IVoucherOrderService) AopContext.currentProxy(); return createVoucherOrder(voucherId); } } /** * 对于一人一单加安全锁 * @param voucherId * @return */ @Transactional public Result createVoucherOrder(Long voucherId){ //判断用户是否购买过 Long UserId = UserHolder.getUser().getId(); /** * 一人一单解决,加锁 */ Integer count = query().eq("user_id", UserId).eq("voucher_id", voucherId).count(); if(count > 0){ return Result.fail("该用户已经购买过了!"); } //5. 库存充足,扣减库存 boolean update = seckillVoucherService.update().setSql("stock = stock - 1") .eq("voucher_id", voucherId).eq("stock",0) //对乐观锁的判断 .update(); if(!update){ return Result.fail("库存不足!"); } VoucherOrder order = new VoucherOrder(); //创建用户id,代金卷id ,订单id long orderId = redisIdWorker.nextId("order"); order.setId(orderId); //6. 创建订单 .返回订单信息 order.setUserId(UserId); order.setVoucherId(voucherId); save(order); return Result.ok(voucherId); } }
2023.2.28—–实现秒杀业务出现问题
优惠卷秒杀持续ing…..
分布式锁
多线程状态下实现同步的锁。
之前我们设置的锁,只是相对于同一jvm下的,如果部署在集群模式下那么这种情况就会出现危险,还是会引起插麦问题,或者说超问题下单。所以这里就需要用到分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
功能
实现分布式锁
分布式锁的核心是多进程之间互斥, 满足这一点,常见的有三种
获取锁
互斥,确保只有一个线程获取锁
//如果获取锁失败,直接返回false //成功则执行下面的业务逻辑 SETNX lock thread1 EXPIRE lock 10
添加过期时间,防止服务延机导致服务挂了。
业务逻辑
1.先尝试获取锁,返回结果。失败返回false
2.成功就执行业务,最后释放锁
public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; //todo 获取分布式锁 @Override public boolean tryLock(long timeOutSec) { //获取线程的表示 long value = Thread.currentThread().getId(); //获取锁 Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, value + "", timeOutSec, TimeUnit.MINUTES); //注意自动拆箱出现的空指针错误 return Boolean.TRUE.equals(aBoolean); } //todo 释放锁 @Override public void unLock() { stringRedisTemplate.delete(KEY_PREFIX + name); } }
释放锁
//手动释放 //超时自动释放:根据上面设置的过期时间 //del lock //todo 释放锁 @Override public void unLock() { stringRedisTemplate.delete(KEY_PREFIX + name); }
实现业务
/** * 获取互斥锁,只允许一个进入 */ //1. 创建锁对象 SimpleRedisLock lock = new SimpleRedisLock("order:" + UserId, stringRedisTemplate); //2. 获取锁 ( 设置超时时间) boolean isLock = lock.tryLock(1200); //2.1 S获取锁不成功 if(!isLock){ return Result.fail("不允许重复下单!"); } //2.2 获取锁成功 try { IVoucherOrderService orderService = (IVoucherOrderService) AopContext.currentProxy(); return orderService.createVoucherOrder(voucherId); } finally { //关闭锁 lock.unLock(); }
/** * 实现优惠卷秒杀下单 * @param voucherId * @return */ @Override public Result seckillVoucher(Long voucherId) { //1. 提交优惠卷id SeckillVoucher voucher = seckillVoucherService.getById(voucherId); //2. 查询优惠卷信息 //3. 判断秒杀是否开启 if(voucher.getBeginTime().isAfter(LocalDateTime.now())){ //否 返回异常, 结束 return Result.fail("秒杀未开始!"); } if(voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!"); } //4. 是,判断库存是否充足 //否 返回异常, 结束 Integer stock = voucher.getStock(); if (stock < 1){ return Result.fail("已经被抢完了!"); } Long UserId = UserHolder.getUser().getId(); /** * 获取互斥锁,只允许一个进入 */ //1. 创建锁对象 SimpleRedisLock lock = new SimpleRedisLock("order:" + UserId, stringRedisTemplate); //2. 获取锁 ( 设置超时时间) boolean isLock = lock.tryLock(1200); //2.1 获取锁不成功 if(!isLock){ return Result.fail("不允许重复下单!"); } //2.2 获取锁成功 try { IVoucherOrderService orderService = (IVoucherOrderService) AopContext.currentProxy(); return orderService.createVoucherOrder(voucherId); } finally { //关闭锁 lock.unLock(); } } /** * 对于一人一单加安全锁 * @param voucherId * @return */ @Transactional public Result createVoucherOrder(Long voucherId){ //判断用户是否购买过 Long UserId = UserHolder.getUser().getId(); /** * 一人一单解决,加锁 */ Integer count = query().eq("user_id", UserId).eq("voucher_id", voucherId).count(); if(count > 0){ return Result.fail("该用户已经购买过了!"); } //5. 库存充足,扣减库存 boolean update = seckillVoucherService.update().setSql("stock = stock - 1") .eq("voucher_id", voucherId).eq("stock",0) //对乐观锁的判断 .update(); if(!update){ return Result.fail("库存不足!"); } VoucherOrder order = new VoucherOrder(); //创建用户id,代金卷id ,订单id long orderId = redisIdWorker.nextId("order"); order.setId(orderId); //6. 创建订单 .返回订单信息 order.setUserId(UserId); order.setVoucherId(voucherId); save(order); return Result.ok(voucherId); }
分布式锁的改进
防止因为业务阻塞而引起的误删除其他人的锁。
改进方法:
在执行释放锁的时候需要再次确认是否是自己的锁
1.在获取锁时存入线程标示(可以用UUID表示)
2.在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
1.如果一致则释放锁 ; b. 如果不一致则不释放锁
public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"="; //todo 获取分布式锁 @Override public boolean tryLock(long timeOutSec) { //获取线程的表示 String value = ID_PREFIX + Thread.currentThread().getId(); //获取锁 Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, value + "", timeOutSec, TimeUnit.MINUTES); //注意自动拆箱出现的空指针错误 return Boolean.TRUE.equals(aBoolean); } //todo 释放锁 @Override public void unLock() { //获取线程标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); //获取锁中的标识 String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); //判断两种标识是否一致 if(id.equals(threadId)){ stringRedisTemplate.delete(KEY_PREFIX + name); } } }
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用set nx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
之前实现的分布式锁存在的问题
总结问题
根据项目实现思路一步步排查,依旧出现无法秒杀的问题,以及一人一单的问题依旧无法准确实现
暂未解决!!!