单体应用解决超卖问题
@Service @Slf4j public class OrderService { @Resource private OrderMapper orderMapper; @Resource private OrderItemMapper orderItemMapper; @Resource private ProductMapper productMapper; //购买商品id private int purchaseProductId = 100100; //购买商品数量 private int purchaseProductNum = 1; @Autowired private PlatformTransactionManager platformTransactionManager; @Autowired private TransactionDefinition transactionDefinition; private Lock lock = new ReentrantLock();// 可重入锁 public Integer createOrder() throws Exception{ Product product = null; //上锁 lock.lock(); try { //手动开启事务 TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); product = productMapper.selectByPrimaryKey(purchaseProductId); if (product==null){ //抛异常,回滚事务 platformTransactionManager.rollback(transaction1); throw new Exception("购买商品:"+purchaseProductId+"不存在"); } //商品当前库存 Integer currentCount = product.getCount(); System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount); //校验库存 if (purchaseProductNum > currentCount){ //抛异常,回滚事务 platformTransactionManager.rollback(transaction1); throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买"); } productMapper.updateProductCount(purchaseProductNum,"xxx",new Date(),product.getId()); //正常执行,提交事务 platformTransactionManager.commit(transaction1); }finally { // 释放锁 lock.unlock(); } // 其他创建订单业务逻辑 } }
如果使用声明式事务(例如@Transactional),并且在方法内部的finally里释放锁,那么锁释放时方法还未结束、事务还未提交。高并发下,另一个线程恰好在锁释放后、事务提交前拿到锁,读到的仍是事务提交前的旧库存,从而导致超卖!
解决上述问题,可以使用AOP锁:让加锁/解锁的切面包裹在事务切面之外,保证事务提交之后才释放锁。
1、定义注解:
/**
 * Annotation for methods (or parameters) that should be guarded by the lock aspect.
 * It is retained at runtime so it can be matched by an {@code @annotation(...)} pointcut.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER, ElementType.METHOD})
public @interface ServiceLock {

    /** Optional free-text description; defaults to the empty string. */
    String description() default "";
}
2、定义切面
@Component @Scope @Aspect @Order(1) // 表示在事务之前执行 public class LockAspect { // 定义锁对象 private static Lock lock = new ReentrantLock(true); @Pointcut("@annotation(com.sugo.seckill.aop.lock.ServiceLock)") public void lockAspect(){ } // 增强方法 @Around("lockAspect()") public Object around(ProceedingJoinPoint joinPoint){ Object obj = null; // 加锁 lock.lock(); // 执行业务 try { obj = joinPoint.proceed(); } catch (Throwable throwable) { throwable.printStackTrace(); } finally { // 释放锁 lock.unlock(); } return obj; } }
3、使用注解
@Transactional @ServiceLock @Override public HttpResult startKilledByLocked(Long killId, String userId) { try { // 从数据库查询商品数据 TbSeckillGoods seckillGoods = seckillGoodsMapper.selectByPrimaryKey(killId); //判断 if(seckillGoods == null){ return HttpResult.error(HttpStatus.SEC_GOODS_NOT_EXSISTS,"商品不存在"); } if(seckillGoods.getStatus() != 1){ return HttpResult.error(HttpStatus.SEC_NOT_UP,"商品未审核"); } if(seckillGoods.getStockCount() <= 0){ return HttpResult.error(HttpStatus.SEC_GOODS_END,"商品已售罄"); } if(seckillGoods.getStartTimeDate().getTime() > new Date().getTime()){ return HttpResult.error(HttpStatus.SEC_ACTIVE_NOT_START,"活动未开始"); } if(seckillGoods.getEndTimeDate().getTime() <= new Date().getTime()){ return HttpResult.error(HttpStatus.SEC_ACTIVE_END,"活动结束"); } //库存扣减 seckillGoods.setStockCount(seckillGoods.getStockCount() - 1); //更新库存 seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods); //下单 TbSeckillOrder order = new TbSeckillOrder(); order.setSeckillId(killId); order.setUserId(userId); order.setCreateTime(new Date()); order.setStatus("0"); order.setMoney(seckillGoods.getCostPrice()); seckillOrderMapper.insertSelective(order); return HttpResult.ok("秒杀成功"); } catch (Exception e) { e.printStackTrace(); } return null; }
分布式锁
基于Redis实现分布式锁
依赖
<!-- Spring Boot starter for Spring Data Redis. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
定义RedisLock
@Slf4j public class RedisLock implements AutoCloseable { private RedisTemplate redisTemplate; private String key; private String value; //单位:秒 private int expireTime; public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){ this.redisTemplate = redisTemplate; this.key = key; this.expireTime=expireTime; this.value = UUID.randomUUID().toString(); } /** * 获取分布式锁 * @return */ public boolean getLock(){ RedisCallback<Boolean> redisCallback = connection -> { //设置NX RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent(); //设置过期时间 Expiration expiration = Expiration.seconds(expireTime); //序列化key byte[] redisKey = redisTemplate.getKeySerializer().serialize(key); //序列化value byte[] redisValue = redisTemplate.getValueSerializer().serialize(value); //执行setnx操作 Boolean result = connection.set(redisKey, redisValue, expiration, setOption); return result; }; //获取分布式锁 Boolean lock = (Boolean)redisTemplate.execute(redisCallback); return lock; } /** * 释放分布式锁 * @return */ public boolean unLock() { String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class); List<String> keys = Arrays.asList(key); Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value); log.info("释放锁的结果:"+result); return result; } /** * 实现了AutoCloseable,实现自动释放锁,避免忘记释放锁 * @throws Exception */ @Override public void close() throws Exception { unLock(); } }
使用RedisLock
@RestController @Slf4j public class RedisLockController { @Autowired private RedisTemplate redisTemplate; @RequestMapping("redisLock") public String redisLock(){ log.info("我进入了方法!"); // redisKey是业务key,可以自定义 try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){ if (redisLock.getLock()) { log.info("我进入了锁!!"); // 这里可以写具体并发业务 Thread.sleep(15000); } } catch (Exception e) { e.printStackTrace(); } log.info("方法执行完成"); return "方法执行完成"; } }
使用RedisLock就可以用@Scheduled实现分布式定时任务(启动类上要加@EnableScheduling)
@Service @Slf4j public class SchedulerService { @Autowired private RedisTemplate redisTemplate; @Scheduled(cron = "0/5 * * * * ?") public void sendSms(){ // autoSms 是业务key, 可以自定义 try(RedisLock redisLock = new RedisLock(redisTemplate,"autoSms",30)) { if (redisLock.getLock()){ log.info("向138xxxxxxxx发送短信!"); } } catch (Exception e) { e.printStackTrace(); } } }
上面同样有锁失效的问题,所以同样有Redis aop锁,具体实现如下:
/**
 * Annotation for methods (or parameters) that should be guarded by the Redis lock aspect.
 * It is retained at runtime so it can be matched by an {@code @annotation(...)} pointcut.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER, ElementType.METHOD})
public @interface ServiceRedisLock {

    /** Optional free-text description; defaults to the empty string. */
    String description() default "";
}
@Component @Scope @Aspect @Order(1) // 表示在事务之前执行 public class LockRedisAspect { @Pointcut("@annotation(com.sugo.seckill.aop.redis.ServiceRedisLock)") public void lockAspect(){ } // 增强方法 @Around("lockAspect()") public Object around(ProceedingJoinPoint joinPoint){ Object obj = null; // 加锁 boolean res = RedissLockUtil.tryLock(Constants.DISTRIBUTED_REDIS_LOCK_KEY, TimeUnit.SECONDS, 3, 10); // 执行业务 try { if(res){ obj = joinPoint.proceed(); } } catch (Throwable throwable) { throwable.printStackTrace(); } finally { // 释放锁 if(res){ RedissLockUtil.unlock(Constants.DISTRIBUTED_REDIS_LOCK_KEY); } } return obj; } }
public class RedissLockUtil { private static RedissonClient redissonClient; public void setRedissonClient(RedissonClient locker) { redissonClient = locker; } /** * 加锁 * @param lockKey * @return */ public static RLock lock(String lockKey) { RLock lock = redissonClient.getLock(lockKey); lock.lock(); return lock; } /** * 释放锁 * @param lockKey */ public static void unlock(String lockKey) { RLock lock = redissonClient.getLock(lockKey); lock.unlock(); } /** * 释放锁 * @param lock */ public static void unlock(RLock lock) { lock.unlock(); } /** * 带超时的锁 * @param lockKey * @param timeout 超时时间 单位:秒 */ public static RLock lock(String lockKey, int timeout) { RLock lock = redissonClient.getLock(lockKey); lock.lock(timeout, TimeUnit.SECONDS); return lock; } /** * 带超时的锁 * @param lockKey * @param unit 时间单位 * @param timeout 超时时间 */ public static RLock lock(String lockKey, TimeUnit unit ,int timeout) { RLock lock = redissonClient.getLock(lockKey); lock.lock(timeout, unit); return lock; } /** * 尝试获取锁 * @param lockKey * @param waitTime 最多等待时间 * @param leaseTime 上锁后自动释放锁时间 * @return */ public static boolean tryLock(String lockKey, int waitTime, int leaseTime) { RLock lock = redissonClient.getLock(lockKey); try { return lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS); } catch (InterruptedException e) { return false; } } /** * 尝试获取锁 * @param lockKey * @param unit 时间单位 * @param waitTime 最多等待时间 * @param leaseTime 上锁后自动释放锁时间 * @return */ public static boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) { RLock lock = redissonClient.getLock(lockKey); try { return lock.tryLock(waitTime, leaseTime, unit); } catch (InterruptedException e) { return false; } } /** * 初始红包数量 * @param key * @param count */ public void initCount(String key,int count) { RMapCache<String, Integer> mapCache = redissonClient.getMapCache("skill"); mapCache.putIfAbsent(key,count,3,TimeUnit.DAYS); } /** * 递增 * @param key * @param delta 要增加几(大于0) * @return */ public int incr(String key, int delta) { 
RMapCache<String, Integer> mapCache = redissonClient.getMapCache("skill"); if (delta < 0) { throw new RuntimeException("递增因子必须大于0"); } return mapCache.addAndGet(key, 1);//加1并获取计算后的值 } /** * 递减 * @param key 键 * @param delta 要减少几(小于0) * @return */ public int decr(String key, int delta) { RMapCache<String, Integer> mapCache = redissonClient.getMapCache("skill"); if (delta < 0) { throw new RuntimeException("递减因子必须大于0"); } return mapCache.addAndGet(key, -delta);//加1并获取计算后的值 } }
基于Zookeeper实现分布式锁
依赖
<!-- Apache ZooKeeper client. -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>
定义ZkLock
import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; @Slf4j public class ZkLock implements AutoCloseable, Watcher { private ZooKeeper zooKeeper; private String znode; public ZkLock() throws IOException { this.zooKeeper = new ZooKeeper("localhost:2181", 10000,this); } public boolean getLock(String businessCode) { try { //创建业务 根节点 Stat stat = zooKeeper.exists("/" + businessCode, false); if (stat==null){ zooKeeper.create("/" + businessCode,businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //创建瞬时有序节点 /order/order_00000001 znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //获取业务节点下 所有的子节点 List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false); //子节点排序 Collections.sort(childrenNodes); //获取序号最小的(第一个)子节点 String firstNode = childrenNodes.get(0); //如果创建的节点是第一个子节点,则获得锁 if (znode.endsWith(firstNode)){ return true; } //不是第一个子节点,则监听前一个节点 String lastNode = firstNode; for (String node:childrenNodes){ if (znode.endsWith(node)){ zooKeeper.exists("/"+businessCode+"/"+lastNode,true); break; }else { lastNode = node; } } synchronized (this){ wait(); } return true; } catch (Exception e) { e.printStackTrace(); } return false; } @Override public void close() throws Exception { zooKeeper.delete(znode,-1); zooKeeper.close(); log.info("我已经释放了锁!"); } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted){ synchronized (this){ notify(); } } } }
使用ZkLock
@RestController @Slf4j public class ZookeeperController { @Autowired private CuratorFramework client; @RequestMapping("zkLock") public String zookeeperLock(){ log.info("我进入了方法!"); try (ZkLock zkLock = new ZkLock()) { if (zkLock.getLock("order")){ log.info("我获得了锁"); Thread.sleep(10000); } } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } log.info("方法执行完成!"); return "方法执行完成!"; } }
基于curator实现分布式锁
依赖
<!-- Apache Curator recipes. -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>
启动类注入CuratorFramework
/**
 * Registers the Curator client bean for {@code localhost:2181}.
 * The container calls {@code start} on initialization and {@code close} on destruction.
 *
 * @return the (not yet started) client, using an {@code ExponentialBackoffRetry(1000, 3)} policy
 */
@Bean(initMethod = "start", destroyMethod = "close")
public CuratorFramework getCuratorFramework() {
    return CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
}
使用
@RestController @Slf4j public class ZookeeperController { @Autowired private CuratorFramework client; @RequestMapping("curatorLock") public String curatorLock(){ log.info("我进入了方法!"); // /order表示业务路径 InterProcessMutex lock = new InterProcessMutex(client, "/order"); try{ if (lock.acquire(30, TimeUnit.SECONDS)){ log.info("我获得了锁!!"); Thread.sleep(10000); } } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); }finally { try { log.info("我释放了锁!!"); lock.release(); } catch (Exception e) { e.printStackTrace(); } } log.info("方法执行完成!"); return "方法执行完成!"; } }
基于Redisson实现分布式锁(SpringBoot项目推荐用法)
依赖
<!-- Redisson Spring Boot starter. -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.17.2</version>
</dependency>
配置redis
使用
@RestController @Slf4j public class RedissonLockController { @Autowired private RedissonClient redisson; @RequestMapping("redissonLock") public String redissonLock() { RLock rLock = redisson.getLock("order"); log.info("我进入了方法!!"); try { rLock.lock(30, TimeUnit.SECONDS); log.info("我获得了锁!!!"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }finally { log.info("我释放了锁!!"); rLock.unlock(); } log.info("方法执行完成!!"); return "方法执行完成!!"; } }
基于Redisson实现分布式锁(通过xml配置)
依赖
<!-- Redisson client. -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.11.2</version>
</dependency>
定义redisson.xml
<!-- Spring bean definitions declaring a Redisson client. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:redisson="http://redisson.org/schema/redisson"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://redisson.org/schema/redisson
           http://redisson.org/schema/redisson/redisson.xsd
       ">

    <redisson:client>
        <!-- Single-server mode -->
        <redisson:single-server address="redis://127.0.0.1:6379"/>
    </redisson:client>

</beans>
启动类上添加@ImportResource("classpath*:redisson.xml")
/**
 * Application entry point; additionally imports the bean definitions from
 * {@code redisson.xml} found on the classpath.
 */
@SpringBootApplication
@ImportResource("classpath*:redisson.xml")
public class RedissonLockApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(RedissonLockApplication.class, args);
    }
}
使用
@RestController @Slf4j public class RedissonLockController { @Autowired private RedissonClient redisson; @RequestMapping("redissonLock") public String redissonLock() { RLock rLock = redisson.getLock("order"); log.info("我进入了方法!!"); try { rLock.lock(30, TimeUnit.SECONDS); log.info("我获得了锁!!!"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }finally { log.info("我释放了锁!!"); rLock.unlock(); } log.info("方法执行完成!!"); return "方法执行完成!!"; } }
多种分布式锁,我如何选择
推荐使用Redisson和Curator实现分布式锁,成熟,有大量的实践!