正文
一、分布式事务
请参考之前的文章
二、思路原理
订单系统在下单时会先发送派单消息,派单系统消费该消息并生成派单数据。如果派单成功之后订单系统报错,订单事务就会回滚:此时派单数据已经生成,而订单数据却不存在,两个系统的数据不一致,这就是分布式事务问题。解决思路是再增加一个订单消息的消费者,消费同一条订单消息:如果发现订单不存在(即订单生成异常),就把订单补录进数据库,从而保证数据最终一致。我们把这个消费者称为补单系统。
三、代码
订单派单
package com.xiaojie.service.impl;

import com.alibaba.fastjson.JSONObject;
import com.xiaojie.entity.Order;
import com.xiaojie.mapper.OrderMapper;
import com.xiaojie.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

/**
 * Order service that persists an order and publishes it to RabbitMQ so that
 * downstream consumers can process it. Also acts as the publisher-confirm
 * callback for the messages it sends.
 *
 * @author xiaojie
 * @version 1.0
 * @date 2021/10/11 22:09
 */
@Service
@Slf4j
public class OrderServiceImpl implements OrderService, RabbitTemplate.ConfirmCallback {

    /** Exchange the order message is published to. */
    private static final String XIAOJIE_ORDER_EXCHANGE = "xiaojie_order_exchange";

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Creates a demo order, stores it and publishes it as a JSON message.
     *
     * <p>After publishing, the method deliberately divides by zero to simulate
     * a failure of the order system once the message has already been sent.
     *
     * @return the generated order id, or {@code "下单失败"} when the insert did
     *         not affect any row
     * @throws ArithmeticException always, after the message has been sent
     *         (simulated failure)
     */
    @Override
    @Transactional
    public String saveOrder() {
        Order order = new Order();
        String orderId = UUID.randomUUID().toString();
        order.setOrderId(orderId);
        order.setOrderName("小谷姐姐麻辣烫");
        order.setPayMoney(35.68);
        order.setStatus(1); // assume the order has already been paid
        int result = orderMapper.addOrder(order);
        // An insert that affects no rows reports 0, so 0 must count as failure too.
        if (result <= 0) {
            return "下单失败";
        }
        // Publish the order so it can be dispatched.
        // NOTE(review): the message is sent before this transaction commits;
        // consumers may see it before (or without) the order row being visible.
        String orderJson = JSONObject.toJSONString(order);
        sendDispatchMsg(orderJson);
        // Simulated failure after the message has been sent.
        int i = 1 / 0;
        return orderId;
    }

    /**
     * Publishes the serialized order to {@link #XIAOJIE_ORDER_EXCHANGE} with an
     * empty routing key and publisher confirms enabled.
     *
     * <p>NOTE(review): when called from {@link #saveOrder()} this is a plain
     * self-invocation, which normally bypasses Spring's proxy, so
     * {@code @Async} presumably has no effect on that path — confirm.
     *
     * @param jsonMSg the order serialized as JSON; also used as the
     *                correlation id of the message
     */
    @Async
    public void sendDispatchMsg(String jsonMSg) {
        // Enable mandatory delivery and register this bean as confirm callback.
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(jsonMSg);
        // Send the order data.
        rabbitTemplate.convertAndSend(XIAOJIE_ORDER_EXCHANGE, "", jsonMSg, correlationData);
    }

    /**
     * Publisher-confirm callback: logs whether the broker accepted the message.
     *
     * @param correlationData correlation data attached when the message was sent
     * @param ack             {@code true} if the broker acknowledged the message
     * @param s               cause reported for the confirm result, if any
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if (ack) {
            log.info(">>>>>>>>消息发送成功:correlationData:{},ack:{},s:{}", correlationData, ack, s);
        } else {
            // A failed confirm is an error; keep the correlation data and cause
            // so the affected message can be identified.
            log.error(">>>>>>>消息发送失败:correlationData:{},ack:{},s:{}", correlationData, ack, s);
        }
    }
}
补单系统消费端
package com.xiaojie.consumer;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.entity.Order;
import com.xiaojie.mapper.OrderMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Compensating ("order back-fill") consumer.
 *
 * @author xiaojie
 * @version 1.0
 * @date 2021/10/11 22:37
 */
@Component
public class OrderConsumer {

    @Autowired
    private OrderMapper orderMapper;

    /**
     * Consumes an order message and inserts the order if it is not yet stored,
     * so that order data eventually matches what was published.
     *
     * <p>The message is acknowledged manually: acked when the order already
     * exists or was inserted, rejected without requeue when the payload cannot
     * be used, and nacked with requeue when the insert affected no rows.
     *
     * @param message the received message whose body is the order as JSON
     * @param channel the channel used for manual acknowledgement
     * @throws IOException if acknowledging or rejecting the message fails
     * @author xiaojie
     * @date 2021/10/11 22:41
     */
    @RabbitListener(queues = {"xiaojie_order_queue"})
    public void compensateOrder(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        // 1. Read the payload. Decode explicitly as UTF-8 instead of relying on
        //    the platform default charset (the order name is non-ASCII).
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);

        // 2. Convert the payload to an Order.
        Order orderEntity = JSONObject.parseObject(msg, Order.class);
        if (orderEntity == null || orderEntity.getOrderId() == null) {
            // Unusable payload: requeueing would only redeliver the same bad message.
            channel.basicReject(deliveryTag, false);
            return;
        }

        // Look the order up by id to see whether it was already stored.
        Order dbOrder = orderMapper.getOrder(orderEntity.getOrderId());
        if (dbOrder != null) {
            // Order exists, nothing to compensate: ack and drop the message.
            channel.basicAck(deliveryTag, false);
            return;
        }

        // Order is missing: insert it now.
        int result = orderMapper.addOrder(orderEntity);
        if (result > 0) {
            // Ack so the broker removes the message.
            channel.basicAck(deliveryTag, false);
        } else {
            // Insert did not take effect: return the message to the queue so the
            // compensation is retried instead of leaving the delivery unacked.
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
派单消费者
package com.xiaojie.consumer;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.entity.Dispatch;
import com.xiaojie.mapper.DispatchMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Dispatch consumer: turns an order message into a dispatch record.
 *
 * @author xiaojie
 * @version 1.0
 * @date 2021/10/11 22:58
 */
@Component
public class DispatchConsumer {

    @Autowired
    private DispatchMapper dispatchMapper;

    /**
     * Consumes an order message, builds a dispatch record for its order id and
     * stores it.
     *
     * <p>The message is acknowledged manually: acked when the record was
     * inserted, rejected without requeue when the payload has no order id, and
     * nacked with requeue when the insert affected no rows.
     *
     * <p>NOTE(review): no duplicate check is done before inserting, so a
     * redelivered message presumably creates a second dispatch row unless the
     * table enforces uniqueness — confirm.
     *
     * @param message the received message whose body is the order as JSON
     * @param channel the channel used for manual acknowledgement
     * @throws IOException if acknowledging or rejecting the message fails
     */
    @RabbitListener(queues = "dispatch_order_queue")
    public void dispatchConsumer(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        // 1. Read the payload. Decode explicitly as UTF-8 instead of relying on
        //    the platform default charset.
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);

        // 2. Parse the JSON and extract the order id.
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject == null ? null : jsonObject.getString("orderId");
        if (orderId == null) {
            // Without an order id there is nothing to dispatch; requeueing would
            // only redeliver the same bad message.
            channel.basicReject(deliveryTag, false);
            return;
        }

        // Build the dispatch record (rider/user ids and time are demo constants).
        Dispatch dispatch = new Dispatch();
        dispatch.setOrderId(orderId);
        // Delivery time: 30 minutes, expressed as 30 * 60.
        dispatch.setSendTime(30 * 60L);
        dispatch.setRiderId(1000012L);
        dispatch.setUserId(15672L);

        // 3. Store the record.
        int result = dispatchMapper.saveDispatch(dispatch);
        if (result > 0) {
            // Ack so the broker removes the message.
            channel.basicAck(deliveryTag, false);
        } else {
            // Insert did not take effect: return the message to the queue so it
            // is retried instead of leaving the delivery unacked.
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
完整代码参考:项目中mq-transaction子模块