大技术
线程池来实现异步任务(亮点1)
/** * 去结算确认页时封装订单确认页返回需要用的数据 * @return * @throws ExecutionException * @throws InterruptedException */ @Override public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException { //构建OrderConfirmVo OrderConfirmVo confirmVo = new OrderConfirmVo(); //获取当前用户登录的信息(直接获取) MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get(); //获取当前线程请求头信息(用于解决Feign异步调用丢失上下文的问题) RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); /**开启第一个异步任务来远程查询所有的收获地址列表**/ CompletableFuture<Void> addressFuture = CompletableFuture.runAsync(() -> { //每一个线程都来共享之前的请求数据(用于解决Feign异步调用丢失上下文的问题) RequestContextHolder.setRequestAttributes(requestAttributes); //1、远程查询所有的收获地址列表 List<MemberAddressVo> address = memberFeignService.getAddress(memberResponseVo.getId()); confirmVo.setMemberAddressVos(address); }, threadPoolExecutor); /**开启第二个异步任务来远程查询购物车所有选中的购物项**/ CompletableFuture<Void> cartInfoFuture = CompletableFuture.runAsync(() -> { //每一个线程都来共享之前的请求数据(用于解决Feign异步调用丢失上下文的问题) RequestContextHolder.setRequestAttributes(requestAttributes); //2、远程查询购物车所有选中的购物项 List<OrderItemVo> currentCartItems = cartFeignService.getCurrentCartItems(); confirmVo.setItems(currentCartItems); //feign在远程调用之前要构造请求,调用很多的拦截器 }, threadPoolExecutor).thenRunAsync(() -> { /** 开启第三个异步任务来远程批量查询所有商品的库存是否有货**/ List<OrderItemVo> items = confirmVo.getItems(); //获取全部商品的id List<Long> skuIds = items.stream().map((itemVo -> itemVo.getSkuId())).collect(Collectors.toList()); //远程查询商品库存信息 R skuHasStock = wmsFeignService.getSkuHasStock(skuIds); //SkuStockVo就是下面的SkuHasStockVo类 List<SkuStockVo> skuStockVos = skuHasStock.getData("data", new TypeReference<List<SkuStockVo>>() {}); if (skuStockVos != null && skuStockVos.size() > 0) { //将skuStockVos集合转换为map Map<Long, Boolean> skuHasStockMap = skuStockVos.stream().collect(Collectors.toMap(SkuStockVo::getSkuId, SkuStockVo::getHasStock)); confirmVo.setStocks(skuHasStockMap); } },threadPoolExecutor); //3、查询用户积分 Integer integration = memberResponseVo.getIntegration(); confirmVo.setIntegration(integration); //4、价格数据由OrderConfirmVo的getTotal方法自动计算 //TODO 5、防重令牌(防止表单重复提交) //为用户设置一个token,三十分钟过期时间(存在redis) String token = UUID.randomUUID().toString().replace("-", ""); //防重令牌一个放到redis里 USER_ORDER_TOKEN_PREFIX = "order:token" redisTemplate.opsForValue().set(USER_ORDER_TOKEN_PREFIX+memberResponseVo.getId(),token,30, TimeUnit.MINUTES); //防重令牌一个放到前台页面(不过隐藏了) confirmVo.setOrderToken(token); //阻塞异步线程,只有两个异步都完成了才可以进行下一步 CompletableFuture.allOf(addressFuture,cartInfoFuture).get(); return confirmVo; }
@Autowired
private ThreadPoolExecutor executor; //线程池
CompletableFuture<Void> addressFuture = CompletableFuture.runAsync(() -> {
/**开启第一个异步任务来远程查询所有的收获地址列表**/
}, executor);
CompletableFuture<Void> cartInfoFuture = CompletableFuture.runAsync(() -> {
/**开启第二个异步任务来远程查询购物车所有选中的购物项**/
}, executor).thenRunAsync(() -> {
/** 开启第三个异步任务来远程批量查询所有商品的库存是否有货**/
},executor);
//阻塞异步线程,只有两个异步都完成了才可以进行下一步
CompletableFuture.allOf(addressFuture,cartInfoFuture).get();
注意:addressFuture和cartInfoFuture是两个同时执行的异步任务,而cartInfoFuture里面又开了一个thenRunAsync异步任务,这个异步任务是必须在cartInfoFuture执行完后才可以执行,有先后顺序
防重令牌存到Redis中并且通过lure脚本保证令牌的原子性(亮点2)
防重令牌的格式为key:order:token:+用户id,value:防重令牌的值
//防重令牌(防止表单重复提交) //为用户设置一个token,三十分钟过期时间(存在redis) String token = UUID.randomUUID().toString().replace("-", ""); //防重令牌一个放到redis里 USER_ORDER_TOKEN_PREFIX = "order:token" redisTemplate.opsForValue().set(USER_ORDER_TOKEN_PREFIX+memberResponseVo.getId(),token,30, TimeUnit.MINUTES);
在点击去结算按钮后生成一个防重令牌,然后放到redis中和当到订单页面(隐藏起来,用户看不到),这个防重令牌是为了防止用户因为网络延迟等原因连续多次点击提交订单按钮,如果不做这个功能那就会生成多个一样的订单
/**1、验证令牌是否合法【令牌的对比和删除必须保证原子性】**/ String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; String orderToken = vo.getOrderToken(); //通过lure脚本原子验证令牌和删除令牌(执行脚本后返回long类型的0或1,0表示令牌验证失败,1表示成功) //Arrays.asList(USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId())表示存到redis里的值 // orderToken是页面传过来的令牌(表示当前需要验证的值) Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()), orderToken); if (result == 0L) { //令牌验证失败 } else { //令牌验证成功
这里的防重令牌删除的时候需要考虑下面的问题
先执行业务后删除令牌
先删除令牌后执行业务
如果是先执行业务的话那是不行的,因为在并发的情况下,会有多个请求同时来校验令牌,然后这个令牌还没有删除,那么多个请求就都执行了业务代码,这不合理
如果先删除令牌在执行业务更合理一些,但是也会有问题,加入有两个请求同时校验令牌,同时删除令牌,同时执行业务代码,那还是有问题,所以要保证令牌的校验和删除是原子性操作,也就是只能有一个线程来执行令牌的校验和删除,其他线程只能等待,这样就可以解决上面的问题,这也是为什么要用lure脚本来保证令牌的校验和删除是原子性的原因
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";这句是lure脚本用来保证在redis中校验和删除令牌的原子性
含义:如果在redis中get的第一个key和argv值(也就是要和redis中比较的值)相同那就删除key,删除key成功返回1,如果不相同就返回0
使用RabbitMQ来解库存的分布式事务问题
问题就是提交订单submitOrder这个方法的所有代码实现的业务都要保持原子性,如果出现任何异常都要进行数据回滚,例如这个方法中是先保存订单和订单项的信息到数据库对应的表格中,然后再调用远程服务去锁库存,如果锁库存成功后再执行下面的代码出现异常或者电脑嗝屁了这类情况那前面保存好的订单和订单项就应该数据回滚(也就是删除刚才录入的数据),同时库存服务也应该解库存。一般情况下只需要在submitOrder方法上加一个@Transactional注解就可以了,但是这个注解只能回滚本地的服务,不能回滚库存服务中的锁库存(也就是这个注解可以实现出现异常后删除刚才录入数据库的订单和订单项的数据,但是没办法把锁库存的数量给改回原来的数量,因为锁库存是库存服务的,不是订单服务的,所以没法靠@Transactional注解来实现回滚)
解决办法
这里采用的是通过RabbitMQ的延时队列来实现保持数据的最终一致性,也就是保持数据库的数据最后的一致性,而不是像@Transactional注解一样立马就回滚,立马保持数据的一致性(这里可以理解成@Transactional 是出现异常后飞快的把数据恢复到原来的样子,而保持数据的最终一致性是过了一段时间后才恢复到原来的样子)
那RabbitMQ的延时队列是怎样来实现数据的最终一致性的呢?
简单说下,其实就是用户下订单后就给延时队列发送消息,如果这个订单的状态在指定的时间过后还是待付款就自动取消这个订单,取消了这个订单后就立马发消息给队列去解库存,可以看出这里就不在乎你到底是异常导致的没支付成功还是用户没付款导致的,再或者是电脑嗝屁了,无论是哪种情况都不管,我只管这个订单有没有在指定的时间内把订单的状态改成已支付状态,只要是过了指定的时间订单的状态还是待付款的话那我就给你回滚 数据,不但之前已经存到数据库的订单和订单项数据我都给你删了,而且库存也给你解了,就是这么霸气!从而这样来保持数据库的最终一致性!
图解:用户点击提交订单按钮后会创建一个订单,把这个订单作为一个消息发给topic交换机order-event-exchange,交换机根据路由键order.create.order路由到延时队列order.delay.queue,然后消息在延时队列里等待指定的时间,当时间过期后把这个消息通过交换机order-event-exchange识别他的新的路由键order.release.order路由到新的队列order.release.order.queue(取消订单的队列),这个队列是用于处理用户提交订单但是没有支付订单的情况,然后订单就会被自动取消,但是这里要考虑一个情况(这个特殊情况是需要额外的处理的),那就是取消订单操作因为网络卡顿导致解库存操作先执行的话,这样就导致导致卡顿的订单永远都不能解锁库存,所以订单取消后立马发消息给order-event-exchange交换机,交换机把这个消息通过路由键order.release.other发到stock.release.stock.queue队列(解库存的队列),这个队列其中有个监听方法就是来监听这个消息的,只要监听到这个消息就会立马执行解库存,还有一种就是正常解库存的操作,当订单成功创建后就进行锁库存,锁库存成功后就会发消息给stock-event-exchange交换机,交换机根据路由键stock.locked把消息路由到stock.delay.queue延时队列,时间过期后就把消息根据路由键stock.release路由到stock.release.stock.queue队列(解库存的队列),然后这个消息队列的消息是被一个专门解库存的监听器来监听(注意这里有两种解库存的监听方法,一个是自动解库存的监听,一个是订单服务的订单取消后立马解库存的监听)
具体实现
1、在MyRabbitMQConfig配置类中创建队列、交换机、队列和交换机的绑定关系
订单服务和库存服务都是创建一个主题交换机,两个队列,一个队列是用来存放消息的,另外一个队列是用来存放死信的(也就是死了的消息,这里并没有直接处理掉,而是放到这个队列里)
整体思路
首先生产者发送一个消息给topic交换机order-event-exchange,交换机根据路由键order.create.order路由到延时队列order.delay.queue,然后消息在延时队列里等待指定的时间,当时间过期后还没有被消费就会被当成死信,然后把这个消息通过交换机order-event-exchange识别他的新的路由键order.release.order路由到新的队列order.release.order.queue
package com.saodai.saodaimall.order.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; /** * RabbitMQ配置类 * 整体思路 * 首先生产者发送一个消息给topic交换机order-event-exchange,交换机根据路由键order.create.order路由到延时队列order.delay.queue * 然后消息在延时队列里等待指定的时间,当时间过期后还没有被消费就会被当成死信,然后把这个消息通过交换机order-event-exchange识别他的新的 * 路由键order.release.order路由到新的队列order.release.order.queue * * 这里只有一个交换机,两个队列,一个队列是用来存放消息的,另外一个队列是用来存放死信的(也就是死了的消息,这里并没有直接处理掉,而是放到这个队列里) **/ @Configuration public class MyRabbitMQConfig { /** * *创建延时队列 *延时队列是通过参数来设置的 * arguments.put("x-dead-letter-exchange", "order-event-exchange");前面的固定的前缀,表示这个队列延时后的消息 * @return */ @Bean public Queue orderDelayQueue() { //用map构造参数 HashMap<String, Object> arguments = new HashMap<>(); //指定延时后的消息的交换机(x-dead-letter-exchange是固定的前缀,order-event-exchange是自定义的交换机) arguments.put("x-dead-letter-exchange", "order-event-exchange"); //死信路由(也就是消息如果超时了就会被当作死信,然后通过路由键order.release.order路由到指定的消息队列) arguments.put("x-dead-letter-routing-key", "order.release.order"); //设置消息过期时间 arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 /* Queue(String name, 队列名字 boolean durable, 是否持久化 boolean exclusive, 是否排他 boolean autoDelete, 是否自动删除 Map<String, Object> arguments) 参数 */ Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; } /** * 死信队列(也就是到了这个队列的都是要死的消息) * * @return */ @Bean public Queue orderReleaseQueue() { Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; } /** * TopicExchange *创建主题类型的交换机 * @return */ @Bean public Exchange orderEventExchange() { /* * String name, * boolean durable, * boolean autoDelete, * Map<String, Object> arguments * */ return new TopicExchange("order-event-exchange", true, false); } /** * 创建交换机和队列的捆绑关系(延时队列捆绑) * @return */ @Bean public Binding orderCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map<String, Object> arguments * */ return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null); } /** * 创建交换机和队列的捆绑关系(死信队列捆绑) * @return */ @Bean public Binding orderReleaseBinding() { return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); } /** * 订单释放直接和库存释放进行绑定 * @return */ @Bean public Binding orderReleaseOtherBinding() { return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null); } /** * 商品秒杀队列 * @return */ @Bean public Queue orderSecKillOrrderQueue() { Queue queue = new Queue("order.seckill.order.queue", true, false, false); return queue; } @Bean public Binding orderSecKillOrrderQueueBinding() { //String destination, DestinationType destinationType, String exchange, String routingKey, // Map<String, Object> arguments Binding binding = new Binding( "order.seckill.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.seckill.order", null); return binding; } } /** * *创建延时队列 * @return */ @Bean public Queue orderDelayQueue() { //用map构造参数 HashMap<String, Object> arguments = new HashMap<>(); //指定延时后的消息的交换机(x-dead-letter-exchange是固定的前缀,order-event-exchange是自定义的交换机) arguments.put("x-dead-letter-exchange", "order-event-exchange"); //死信路由(也就是消息如果超时了就会被当作死信,然后通过路由键order.release.order路由到指定的消息队列) arguments.put("x-dead-letter-routing-key", "order.release.order"); //设置消息过期时间 arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 /* Queue(String name, 队列名字 boolean durable, 是否持久化 boolean exclusive, 是否排他 boolean autoDelete, 是否自动删除 Map<String, Object> arguments) 参数 */ Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; }
创建特殊的延时队列只需要传入一个map类型的参数进去就可以让普通队列成为一个延时队列
//指定延时后的消息的交换机(x-dead-letter-exchange是固定的前缀,order-event-exchange是自定义的交换机) arguments.put("x-dead-letter-exchange", "order-event-exchange"); //死信路由(也就是消息如果超时了就会被当作死信,然后通过路由键order.release.order路由到指定的消息队列) arguments.put("x-dead-letter-routing-key", "order.release.order"); //设置消息过期时间 arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
x-dead-letter-exchange这个key是rabbitMQ封装好了的固定的前缀,表示这个队列延时后的消息指定使用order-event-exchange这个交换机,这个交换机来把延时的消息进行传输,x-dead-letter-routing-key也是rabbitMQ封装好了的固定的前缀,表示这个队列延时后的消息使用的新路由键是order.release.order,x-message-ttl也是rabbitMQ封装好了的固定的前缀,表示设置延时队列的延时时间是多少,也就是上面的三个key值都是封装好的固定前缀,后面的值才是自定义的
package com.saodai.saodaimall.ware.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; /** * RabbitMQ配置类(一个交换机,两个队列,两个绑定,跟订单服务的基本一样,详细介绍看订单服务的队列) * */ @Configuration public class MyRabbitMQConfig { /** * 使用JSON序列化机制,进行消息转换 * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } /** * RabbitMQ要第一次连接上发现没有队列或者交换机才会创建,所以如果没有下面的代码运行会发现官网中并没有创建交换机和队列 * 下面代码就是模拟监听,这样就可以连接上RabbitMQ,然后可以创建交换机和队列 * 但是后面要注释掉(自动解锁库存时这里也会监听队列导致多一个消费者,所以要注释掉) */ // @RabbitListener(queues = "stock.release.stock.queue") // public void handle(Message message) { // // } /** * 库存服务默认的交换机 * @return */ @Bean public Exchange stockEventExchange() { //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false); return topicExchange; } /** * 普通队列 * @return */ @Bean public Queue stockReleaseStockQueue() { //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments Queue queue = new Queue("stock.release.stock.queue", true, false, false); return queue; } /** * 延迟队列 * @return */ @Bean public Queue stockDelay() { HashMap<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "stock-event-exchange"); arguments.put("x-dead-letter-routing-key", "stock.release"); // 消息过期时间 2分钟 arguments.put("x-message-ttl", 120000); Queue queue = new Queue("stock.delay.queue", true, false, false,arguments); return queue; } /** * 交换机与普通队列绑定 * @return */ @Bean public Binding stockLocked() { //String destination, DestinationType destinationType, String exchange, String routingKey, // Map<String, Object> arguments Binding binding = new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null); return binding; } /** * 交换机与延迟队列绑定 * @return */ @Bean public Binding stockLockedBinding() { return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null); } }
2、理解订单服务和库存服务的RabbitMQ队列的图
(1)订单服务使用RabbitMQ的整个过程:
1、订单创建成功后发送消息给topic主题交换机order-event-exchange,交换机根据order.create.order路由键把消息路由到order.delay.queue延时队列(订单创建成功是指OrderServiceImpl类中的submitOrder方法执行成功后给order.delay.queue队列发送消息)
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());
2、消息在order.delay.queue延时队列里等待指定的时间,当时间过期后还没有被消费就会被当成死信,然后把这个消息通过order-event-exchange交换机的新的路由键order.release.order路由到order.release.order.queue队列(过期后的路由键和交换机设置是由订单服务的MyRabbitMQConfig配置的)
注意:这里order.delay.queue队列只是作为延时队列来使用的(正常情况是会有队列的监听器来监听这个队列的消息然后消费掉,但是在这个场景中是没有消费者来消费这个队列的消息的,因为这个队列只需要延时就可以了,并不需要消费者,这个队列的消息等待指定的时间后就会被送到order.release.order.queue队列里,从而达到延时队列的效果)
//交换机(x-dead-letter-exchange是固定的,order-event-exchange是自定义的交换机) arguments.put("x-dead-letter-exchange", "order-event-exchange"); //死信路由(也就是消息如果超时了就会被当作死信,然后通过路由键order.release.order路由到指定的消息队列) arguments.put("x-dead-letter-routing-key", "order.release.order"); //设置消息过期时间 arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
3、设置一个监听器用来消费order.release.order.queue队列的消息(这个队列的消息都是已经超时了的消息,也就是模拟用户生成订单后没有支付的订单,所以要写个监听器来取消之前生成的订单)
package com.saodai.saodaimall.order.listener; import com.rabbitmq.client.Channel; import com.saodai.saodaimall.order.entity.OrderEntity; import com.saodai.saodaimall.order.service.OrderService; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; /** * 订单监听器,监听的是队列order.release.order.queue(定时关闭订单) * 但凡被这个监听器监听到的消息都是过期的死信 **/ @RabbitListener(queues = "order.release.order.queue") @Service public class OrderCloseListener { @Autowired private OrderService orderService; @RabbitHandler public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn()); try { //关闭订单 orderService.closeOrder(orderEntity); //消费者的手动ack确认这条消息被成功消费了 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } } /** * 关闭订单(这个方法由OrderCloseListener监听器调用) * 这个方法被调用说明这个订单已经过了指定的时间还没有付款 * 所谓的关闭订单其实就是修改订单的状态,修改成已取消就行了 * @param orderEntity 前面生成订单时发送给RabbitMQ队列的消息orderEntity */ @Override public void closeOrder(OrderEntity orderEntity) { //关闭订单之前先查询一下数据库,判断此订单状态是否已支付 OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>(). eq("order_sn",orderEntity.getOrderSn())); // CREATE_NEW(0,"待付款")(说明这个订单已经过了指定的时间还没有付款) if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) { //如果是待付款状态就可以进行关单 OrderEntity orderUpdate = new OrderEntity(); orderUpdate.setId(orderInfo.getId()); //把待付款修改成已取消的状态即可CANCLED(4,"已取消") orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode()); this.updateById(orderUpdate); /** 这里要考虑一个情况(这个特殊情况是需要下面的额外处理) * 防止订单服务卡顿,导致订单状态消息一直改不了,也就是上面的代码因为卡顿导致没有执行 解库存服务先执行,查订单状态发现不是取消状态,然后什么都不处理 * 导致卡顿的订单,永远都不能解锁库存 * 所以订单释放直接和库存释放进行绑定 */ // 发送消息给MQ OrderTo orderTo = new OrderTo(); BeanUtils.copyProperties(orderInfo, orderTo); try { //订单释放直接和库存释放进行绑定 /** * 订单取消后立马发消息给交换机,交换机把这个消息通过路由键order.release.other发到队列stock.release.stock.queue * 这个路由设置是由MyRabbitMQConfig中的orderReleaseOtherBinding方法进行绑定的 */ rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo); } catch (Exception e) { //TODO 定期扫描数据库,重新发送失败的消息 } } }
关闭订单之前先查询一下数据库,判断此订单状态是否已支付
关闭订单其实就是修改订单的状态,修改成已取消
这里要考虑一个情况(这个特殊情况是需要额外的处理的)
按理来说是订单服务的取消订单操作是在解库存操作的前面的,也就是一般先会取消订单操作后再去解库存操作,但是如果取消订单操作因为网络卡顿导致解库存操作先执行的话就会出现下面的情况:
解库存的实现逻辑又是先来看看订单的状态是不是已取消,如果是已取消才会去解库存,否则就不会执行解库存操作了,上面的情况就会出现解库存操作来看订单状态的时候发现订单状态是待支付,不是已取消状态, 所以就不执行解库存操作,由于解库存操作只会来查看一次,所以就会导致卡顿的订单,永远都不能解锁库存
解决办法:订单取消后立马发消息给order-event-exchange交换机,交换机把这个消息通过路由键order.release.other发到stock.release.stock.queue队列,这个队列其中有个监听方法就是来监听这个消息的,只要监听到这个消息就会立马执行解库存
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
package com.saodai.common.to; import lombok.Data; import java.math.BigDecimal; import java.util.Date; /** *订单类 */ @Data public class OrderTo { private Long id; /** * member_id */ private Long memberId; /** * 订单号 */ private String orderSn; /** * 使用的优惠券 */ private Long couponId; /** * create_time */ private Date createTime; /** * 用户名 */ private String memberUsername; /** * 订单总额 */ private BigDecimal totalAmount; /** * 应付总额 */ private BigDecimal payAmount; /** * 运费金额 */ private BigDecimal freightAmount; /** * 促销优化金额(促销价、满减、阶梯价) */ private BigDecimal promotionAmount; /** * 积分抵扣金额 */ private BigDecimal integrationAmount; /** * 优惠券抵扣金额 */ private BigDecimal couponAmount; /** * 后台调整订单使用的折扣金额 */ private BigDecimal discountAmount; /** * 支付方式【1->支付宝;2->微信;3->银联; 4->货到付款;】 */ private Integer payType; /** * 订单来源[0->PC订单;1->app订单] */ private Integer sourceType; /** * 订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】 */ private Integer status; /** * 物流公司(配送方式) */ private String deliveryCompany; /** * 物流单号 */ private String deliverySn; /** * 自动确认时间(天) */ private Integer autoConfirmDay; /** * 可以获得的积分 */ private Integer integration; /** * 可以获得的成长值 */ private Integer growth; /** * 发票类型[0->不开发票;1->电子发票;2->纸质发票] */ private Integer billType; /** * 发票抬头 */ private String billHeader; /** * 发票内容 */ private String billContent; /** * 收票人电话 */ private String billReceiverPhone; /** * 收票人邮箱 */ private String billReceiverEmail; /** * 收货人姓名 */ private String receiverName; /** * 收货人电话 */ private String receiverPhone; /** * 收货人邮编 */ private String receiverPostCode; /** * 省份/直辖市 */ private String receiverProvince; /** * 城市 */ private String receiverCity; /** * 区 */ private String receiverRegion; /** * 详细地址 */ private String receiverDetailAddress; /** * 订单备注 */ private String note; /** * 确认收货状态[0->未确认;1->已确认] */ private Integer confirmStatus; /** * 删除状态【0->未删除;1->已删除】 */ private Integer deleteStatus; /** * 下单时使用的积分 */ private Integer useIntegration; /** * 支付时间 */ private Date paymentTime; /** * 发货时间 */ private Date deliveryTime; /** * 确认收货时间 */ private Date receiveTime; /** * 评价时间 */ private Date commentTime; /** * 修改时间 */ private Date modifyTime; }
(2)库存服务使用RabbitMQ的整个过程
锁库存成功后就会发消息给stock-event-exchange交换机,交换机根据路由键stock.locked把消息路由到stock.delay.queue延时队列(跟上面一样,这个延时队列的消息不会被消费掉),时间过期后就把消息根据路由键stock.release路由到stock.release.stock.queue队列,然后这个消息队列的消息是被一个专门解库存的监听器来监听(注意这里有两种解库存的监听方法,一个是自动解库存的监听,一个是订单服务的订单取消后立马解库存的监听)