发布秒杀商品
设置库存和最多可以下单的人数到Redis中
/** * @param promoId 秒杀表中活动id */ @Override public void publishPromo(Integer promoId) { //通过活动id获取活动 PromoDO promoDO = promoDOMapper.selectByPrimaryKey(promoId); if(promoDO.getItemId() == null || promoDO.getItemId().intValue() == 0){ return; } //获取活动的商品信息 ItemModel itemModel = itemService.getItemById(promoDO.getItemId()); //将库存同步到redis内 redisTemplate.opsForValue().set("promo_item_stock_"+itemModel.getId(), itemModel.getStock()); //将大闸的限制数字(库存的5倍人可以下单成功,不是每个下单的人都支付)设到redis内 redisTemplate.opsForValue().set("promo_door_count_"+promoId,itemModel.getStock().intValue() * 5); }
流量的过滤
首先对过来的流量做一个验证码的过滤, 下面是生成验证码的逻辑
//生成验证码 @RequestMapping(value = "/generateverifycode",method = {RequestMethod.GET,RequestMethod.POST}) @ResponseBody public void generateverifycode(HttpServletResponse response) throws BusinessException, IOException { String token = httpServletRequest.getParameterMap().get("token")[0]; if(StringUtils.isEmpty(token)){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能生成验证码"); } UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token); if(userModel == null){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能生成验证码"); } Map<String,Object> map = CodeUtil.generateCodeAndPic(); redisTemplate.opsForValue().set("verify_code_"+userModel.getId(),map.get("code")); redisTemplate.expire("verify_code_"+userModel.getId(),10,TimeUnit.MINUTES); ImageIO.write((RenderedImage) map.get("codePic"), "jpeg", response.getOutputStream()); }
然后 秒杀需要获取秒杀令牌才能下单, 下面是获取秒杀令牌的逻辑,( 获取令牌需要验证码正确 )
//生成秒杀令牌 @RequestMapping(value = "/generatetoken",method = {RequestMethod.POST},consumes={CONTENT_TYPE_FORMED}) @ResponseBody public CommonReturnType generatetoken(@RequestParam(name="itemId")Integer itemId, @RequestParam(name="promoId")Integer promoId, @RequestParam(name="verifyCode")String verifyCode) throws BusinessException { //根据token获取用户信息 String token = httpServletRequest.getParameterMap().get("token")[0]; if(StringUtils.isEmpty(token)){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能下单"); } //获取用户的登陆信息 UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token); if(userModel == null){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能下单"); } //通过verifycode验证验证码的有效性 String redisVerifyCode = (String) redisTemplate.opsForValue().get("verify_code_"+userModel.getId()); if(StringUtils.isEmpty(redisVerifyCode)){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"请求非法"); } if(!redisVerifyCode.equalsIgnoreCase(verifyCode)){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"请求非法,验证码错误"); } //获取秒杀访问令牌 String promoToken = promoService.generateSecondKillToken(promoId,itemId,userModel.getId()); if(promoToken == null){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"生成令牌失败"); } //返回对应的结果 return CommonReturnType.create(promoToken); }
@Override public String generateSecondKillToken(Integer promoId,Integer itemId,Integer userId) { //判断是否库存已售罄,若对应的售罄key存在,则直接返回下单失败 if(redisTemplate.hasKey("promo_item_stock_invalid_"+itemId)){ return null; } PromoDO promoDO = promoDOMapper.selectByPrimaryKey(promoId); //dataobject->model PromoModel promoModel = convertFromDataObject(promoDO); if(promoModel == null){ return null; } //判断当前时间是否秒杀活动即将开始或正在进行 if(promoModel.getStartDate().isAfterNow()){ promoModel.setStatus(1); }else if(promoModel.getEndDate().isBeforeNow()){ promoModel.setStatus(3); }else{ promoModel.setStatus(2); } //判断活动是否正在进行 if(promoModel.getStatus().intValue() != 2){ return null; } //判断item信息是否存在 ItemModel itemModel = itemService.getItemByIdInCache(itemId); if(itemModel == null){ return null; } //判断用户信息是否存在 UserModel userModel = userService.getUserByIdInCache(userId); if(userModel == null){ return null; } //获取秒杀大闸的count数量, 发布秒杀的时候设置的是5倍库存量, 没发一个令牌减去1 long result = redisTemplate.opsForValue().increment("promo_door_count_"+promoId,-1); if(result < 0){ return null; } //生成token并且存入redis内并给一个5分钟的有效期 String token = UUID.randomUUID().toString().replace("-",""); redisTemplate.opsForValue().set("promo_token_"+promoId+"_userid_"+userId+"_itemid_"+itemId,token); redisTemplate.expire("promo_token_"+promoId+"_userid_"+userId+"_itemid_"+itemId,5, TimeUnit.MINUTES); return token; }
当你成功拿到秒杀令牌后,才能下单, 下面是下单逻辑, 下单的时候又加了一层RateLimiter 限流
@Controller("order") @RequestMapping("/order") @CrossOrigin(origins = {"*"},allowCredentials = "true") public class OrderController extends BaseController { @Autowired private HttpServletRequest httpServletRequest; @Autowired private RedisTemplate redisTemplate; @Autowired private MqProducer mqProducer; @Autowired private ItemService itemService; private ExecutorService executorService;// 线程池 private RateLimiter orderCreateRateLimiter;// 限流 @PostConstruct public void init(){ executorService = Executors.newFixedThreadPool(20); orderCreateRateLimiter = RateLimiter.create(300.0); } @RequestMapping(value = "/createorder",method = {RequestMethod.POST},consumes={CONTENT_TYPE_FORMED}) @ResponseBody public CommonReturnType createOrder(@RequestParam(name="itemId")Integer itemId, @RequestParam(name="amount")Integer amount, @RequestParam(name="promoId",required = false)Integer promoId, @RequestParam(name="promoToken",required = false)String promoToken) throws BusinessException { // 限流 boolean tryAcquire = orderCreateRateLimiter.tryAcquire(); if(!tryAcquire){ throw new BusinessException(EmBusinessError.RATELIMIT); } //校验登录 String token = httpServletRequest.getParameterMap().get("token")[0]; if(StringUtils.isEmpty(token)){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能下单"); } UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token); if(userModel == null){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能下单"); } //校验秒杀令牌是否正确 if(promoId != null){ String inRedisPromoToken = (String) redisTemplate.opsForValue().get("promo_token_"+promoId+"_userid_"+userModel.getId()+"_itemid_"+itemId); if(inRedisPromoToken == null){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"秒杀令牌校验失败"); } if(!org.apache.commons.lang3.StringUtils.equals(promoToken,inRedisPromoToken)){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"秒杀令牌校验失败"); } } //同步调用线程池的submit方法 //拥塞窗口为20的等待队列,用来队列化泄洪 Future<Object> future = executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { //加入库存流水init状态 String stockLogId = itemService.initStockLog(itemId,amount); //再去完成对应的下单事务型消息机制 if(!mqProducer.transactionAsyncReduceStock(userModel.getId(),itemId,promoId,amount,stockLogId)){ throw new BusinessException(EmBusinessError.UNKNOWN_ERROR,"下单失败"); } return null; } }); try { future.get(); } catch (InterruptedException e) { throw new BusinessException(EmBusinessError.UNKNOWN_ERROR); } catch (ExecutionException e) { throw new BusinessException(EmBusinessError.UNKNOWN_ERROR); } return CommonReturnType.create(null); } }
下单中用到了RocketMQ的事务型消息机制, 其中MqProducer 为:
@Component public class MqProducer { private DefaultMQProducer producer; private TransactionMQProducer transactionMQProducer; @Value("${mq.nameserver.addr}") private String nameAddr; @Value("${mq.topicname}") private String topicName; @Autowired private OrderService orderService; @Autowired private StockLogDOMapper stockLogDOMapper; @PostConstruct public void init() throws MQClientException { //做mq producer的初始化 producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr(nameAddr); producer.start(); transactionMQProducer = new TransactionMQProducer("transaction_producer_group"); transactionMQProducer.setNamesrvAddr(nameAddr); transactionMQProducer.start(); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //真正要做的事 创建订单 Integer itemId = (Integer) ((Map)arg).get("itemId"); Integer promoId = (Integer) ((Map)arg).get("promoId"); Integer userId = (Integer) ((Map)arg).get("userId"); Integer amount = (Integer) ((Map)arg).get("amount"); String stockLogId = (String) ((Map)arg).get("stockLogId"); try { orderService.createOrder(userId,itemId,promoId,amount,stockLogId); } catch (BusinessException e) { e.printStackTrace(); //设置对应的stockLog为回滚状态 StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); stockLogDO.setStatus(3); stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } // 上面的orderService.createOrder 创建订单如果失败就会调这个方法回滚 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { //根据是否扣减库存成功,来判断要返回COMMIT,ROLLBACK还是继续UNKNOWN String jsonString = new String(msg.getBody()); Map<String,Object>map = JSON.parseObject(jsonString, Map.class); Integer itemId = (Integer) map.get("itemId"); Integer amount = (Integer) map.get("amount"); String stockLogId = (String) map.get("stockLogId"); StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); if(stockLogDO == null){ return LocalTransactionState.UNKNOW; } if(stockLogDO.getStatus().intValue() == 2){ return LocalTransactionState.COMMIT_MESSAGE; }else if(stockLogDO.getStatus().intValue() == 1){ return LocalTransactionState.UNKNOW; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); } //事务型同步库存扣减消息 public boolean transactionAsyncReduceStock(Integer userId,Integer itemId,Integer promoId,Integer amount,String stockLogId){ Map<String,Object> bodyMap = new HashMap<>(); bodyMap.put("itemId",itemId); bodyMap.put("amount",amount); bodyMap.put("stockLogId",stockLogId); Map<String,Object> argsMap = new HashMap<>(); argsMap.put("itemId",itemId); argsMap.put("amount",amount); argsMap.put("userId",userId); argsMap.put("promoId",promoId); argsMap.put("stockLogId",stockLogId); Message message = new Message(topicName,"increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8"))); TransactionSendResult sendResult = null; try { // 这里会调上面init()中的executeLocalTransaction(Message msg, Object arg) sendResult = transactionMQProducer.sendMessageInTransaction(message,argsMap); } catch (MQClientException e) { e.printStackTrace(); return false; } if(sendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE){ return false; }else if(sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE){ return true; }else{ return false; } } }
生产者发送的消息, 需要消费者来消费, 下面是消费者MqConsumer
@Component public class MqConsumer { private DefaultMQPushConsumer consumer; @Value("${mq.nameserver.addr}") private String nameAddr; @Value("${mq.topicname}") private String topicName; @Autowired private ItemStockDOMapper itemStockDOMapper; @PostConstruct public void init() throws MQClientException { consumer = new DefaultMQPushConsumer("stock_consumer_group"); consumer.setNamesrvAddr(nameAddr); consumer.subscribe(topicName,"*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //实现库存真正到数据库内扣减的逻辑 Message msg = msgs.get(0); String jsonString = new String(msg.getBody()); Map<String,Object>map = JSON.parseObject(jsonString, Map.class); Integer itemId = (Integer) map.get("itemId"); Integer amount = (Integer) map.get("amount"); itemStockDOMapper.decreaseStock(itemId,amount); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
上面的生产者和消费者需要配置rocketMQ的addr和topicname, 具体如下:
mq.nameserver.addr=192.168.174.128:9876 mq.topicname=stock