RabbitMq实战——外卖派单通过补单系统实现分布式事务

简介: RabbitMq实战——外卖派单通过补单系统实现分布式事务

正文


一、分布式事务


请参考之前的文章


二、思路原理


111.png


当派单系统派单成功之后,订单系统报错,此时将会产生分布式事务的问题,派单数据生成,但此时订单数据异常事务回滚,就发生了分布式事务问题。此时解决分布式事务,生成一个订单的消费者,专门去消费生成订单异常时的一个程序,我们称之为补单系统。


三、代码


订单派单


package com.xiaojie.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.entity.Order;
import com.xiaojie.mapper.OrderMapper;
import com.xiaojie.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
/**
 * @author xiaojie
 * @version 1.0
 * @description:
 * @date 2021/10/11 22:09
 */
@Service
@Slf4j
public class OrderServiceImpl implements OrderService, RabbitTemplate.ConfirmCallback {
    //定义交换机
    private static final String XIAOJIE_ORDER_EXCHANGE = "xiaojie_order_exchange";
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    @Transactional
    public String saveOrder() {
        Order order = new Order();
        String orderId = UUID.randomUUID().toString();
        order.setOrderId(orderId);
        order.setOrderName("小谷姐姐麻辣烫");
        order.setPayMoney(35.68);
        order.setStatus(1);//假设订单支付完成
        int result = orderMapper.addOrder(order);
        if (result < 0) {
            return "下单失败";
        }
        //发送派单
        String orderJson = JSONObject.toJSONString(order);
        sendDispatchMsg(orderJson);
        //模拟报错
        int i = 1 / 0;
        return orderId;
    }
    @Async
    public void sendDispatchMsg(String jsonMSg) {
        // 设置生产者消息确认机制
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(jsonMSg);
        //将订单数据发送
        rabbitTemplate.convertAndSend(XIAOJIE_ORDER_EXCHANGE, "", jsonMSg, correlationData);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if (ack) {
            log.info(">>>>>>>>消息发送成功:correlationData:{},ack:{},s:{}", correlationData, ack, s);
        } else {
            log.info(">>>>>>>消息发送失败{}", ack);
        }
    }
}


补单系统消费端


package com.xiaojie.consumer;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.entity.Order;
import com.xiaojie.mapper.OrderMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 补单消费者
 * @date 2021/10/11 22:37
 */
@Component
public class OrderConsumer {
    @Autowired
    private OrderMapper orderMapper;
    @RabbitListener(queues = {"xiaojie_order_queue"})
    /**
     * @description: 补单消费者,补偿分布式事务解决框架 数据最终一致性
     * @param:
     * @param: message
     * @param: channel
     * @return: void
     * @author xiaojie
     * @date: 2021/10/11 22:41
     */
    public void compensateOrder(Message message, Channel channel) throws IOException {
        // 1.获取消息
        String msg = new String(message.getBody());
        // 2.获取order对象
        Order orderEntity = JSONObject.parseObject(msg, Order.class);
        //根据订单号查询订单是否存在
        Order dbOrder = orderMapper.getOrder(orderEntity.getOrderId());
        if (dbOrder != null) {
            // 手动ack丢弃消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }
        //订单没有生成,开始补单
        int result = orderMapper.addOrder(orderEntity);
        if (result > 0) {
            // 手动ack 删除该消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}


派单消费者


package com.xiaojie.consumer;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.entity.Dispatch;
import com.xiaojie.mapper.DispatchMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 派单消费者
 * @date 2021/10/11 22:58
 */
@Component
public class DispatchConsumer {
    @Autowired
    private DispatchMapper dispatchMapper;
    @RabbitListener(queues = "dispatch_order_queue")
    public void dispatchConsumer(Message message, Channel channel) throws IOException {
        // 1.获取消息
        String msg = new String(message.getBody());
        // 2.转换json
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");
        // 计算分配的快递员id
        Dispatch dispatch=new Dispatch();
        dispatch.setOrderId(orderId);
        //经过一系列的算法得到送餐时间为30分钟
        dispatch.setSendTime(30*60L);
        dispatch.setRiderId(1000012L);
        dispatch.setUserId(15672L);
        // 3.插入我们的数据库
        int result = dispatchMapper.saveDispatch(dispatch);
        if (result > 0) {
            // 手动ack 删除该消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}


完整代码参考:项目中mq-transaction子模块

spring-boot: Springboot整合redis、消息中间件等相关代码

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
人工智能 Kubernetes 数据可视化
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
本文回顾了一次关键词监测任务在容器集群中失效的全过程,分析了中转IP复用、调度节奏和异常处理等隐性风险,并提出通过解耦架构、动态IP分发和行为模拟优化采集策略,最终实现稳定高效的数据抓取与分析。
132 2
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
632 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
9月前
|
Kubernetes 大数据 调度
Airflow vs Argo Workflows:分布式任务调度系统的“华山论剑”
本文对比了Apache Airflow与Argo Workflows两大分布式任务调度系统。两者均支持复杂的DAG任务编排、社区支持及任务调度功能,且具备优秀的用户界面。Airflow以Python为核心语言,适合数据科学家使用,拥有丰富的Operator库和云服务集成能力;而Argo Workflows基于Kubernetes设计,支持YAML和Python双语定义工作流,具备轻量化、高性能并发调度的优势,并通过Kubernetes的RBAC机制实现多用户隔离。在大数据和AI场景中,Airflow擅长结合云厂商服务,Argo则更适配Kubernetes生态下的深度集成。
1123 35
|
10月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
10月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
281 11
|
5月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
2143 1
|
5月前
|
存储 算法 安全
“卧槽,系统又崩了!”——别慌,这也许是你看过最通俗易懂的分布式入门
本文深入解析分布式系统核心机制:数据分片与冗余副本实现扩展与高可用,租约、多数派及Gossip协议保障一致性与容错。探讨节点故障、网络延迟等挑战,揭示CFT/BFT容错原理,剖析规模与性能关系,为构建可靠分布式系统提供理论支撑。
298 2
|
5月前
|
机器学习/深度学习 算法 安全
新型电力系统下多分布式电源接入配电网承载力评估方法研究(Matlab代码实现)
新型电力系统下多分布式电源接入配电网承载力评估方法研究(Matlab代码实现)
196 3
|
7月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
314 1
分布式新闻数据采集系统的同步效率优化实战