RabbitMq实战——外卖派单通过补单系统实现分布式事务

简介: RabbitMq实战——外卖派单通过补单系统实现分布式事务

正文


一、分布式事务


请参考之前的文章


二、思路原理


111.png


当派单系统派单成功之后,订单系统报错,此时将会产生分布式事务的问题,派单数据生成,但此时订单数据异常事务回滚,就发生了分布式事务问题。此时解决分布式事务,生成一个订单的消费者,专门去消费生成订单异常时的一个程序,我们称之为补单系统。


三、代码


订单派单


package com.xiaojie.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.entity.Order;
import com.xiaojie.mapper.OrderMapper;
import com.xiaojie.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
/**
 * @author xiaojie
 * @version 1.0
 * @description:
 * @date 2021/10/11 22:09
 */
@Service
@Slf4j
public class OrderServiceImpl implements OrderService, RabbitTemplate.ConfirmCallback {
    //定义交换机
    private static final String XIAOJIE_ORDER_EXCHANGE = "xiaojie_order_exchange";
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    @Transactional
    public String saveOrder() {
        Order order = new Order();
        String orderId = UUID.randomUUID().toString();
        order.setOrderId(orderId);
        order.setOrderName("小谷姐姐麻辣烫");
        order.setPayMoney(35.68);
        order.setStatus(1);//假设订单支付完成
        int result = orderMapper.addOrder(order);
        if (result < 0) {
            return "下单失败";
        }
        //发送派单
        String orderJson = JSONObject.toJSONString(order);
        sendDispatchMsg(orderJson);
        //模拟报错
        int i = 1 / 0;
        return orderId;
    }
    @Async
    public void sendDispatchMsg(String jsonMSg) {
        // 设置生产者消息确认机制
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(jsonMSg);
        //将订单数据发送
        rabbitTemplate.convertAndSend(XIAOJIE_ORDER_EXCHANGE, "", jsonMSg, correlationData);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if (ack) {
            log.info(">>>>>>>>消息发送成功:correlationData:{},ack:{},s:{}", correlationData, ack, s);
        } else {
            log.info(">>>>>>>消息发送失败{}", ack);
        }
    }
}


补单系统消费端


package com.xiaojie.consumer;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.entity.Order;
import com.xiaojie.mapper.OrderMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 补单消费者
 * @date 2021/10/11 22:37
 */
@Component
public class OrderConsumer {
    @Autowired
    private OrderMapper orderMapper;
    @RabbitListener(queues = {"xiaojie_order_queue"})
    /**
     * @description: 补单消费者,补偿分布式事务解决框架 数据最终一致性
     * @param:
     * @param: message
     * @param: channel
     * @return: void
     * @author xiaojie
     * @date: 2021/10/11 22:41
     */
    public void compensateOrder(Message message, Channel channel) throws IOException {
        // 1.获取消息
        String msg = new String(message.getBody());
        // 2.获取order对象
        Order orderEntity = JSONObject.parseObject(msg, Order.class);
        //根据订单号查询订单是否存在
        Order dbOrder = orderMapper.getOrder(orderEntity.getOrderId());
        if (dbOrder != null) {
            // 手动ack丢弃消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }
        //订单没有生成,开始补单
        int result = orderMapper.addOrder(orderEntity);
        if (result > 0) {
            // 手动ack 删除该消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}


派单消费者


package com.xiaojie.consumer;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.entity.Dispatch;
import com.xiaojie.mapper.DispatchMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 派单消费者
 * @date 2021/10/11 22:58
 */
@Component
public class DispatchConsumer {
    @Autowired
    private DispatchMapper dispatchMapper;
    @RabbitListener(queues = "dispatch_order_queue")
    public void dispatchConsumer(Message message, Channel channel) throws IOException {
        // 1.获取消息
        String msg = new String(message.getBody());
        // 2.转换json
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");
        // 计算分配的快递员id
        Dispatch dispatch=new Dispatch();
        dispatch.setOrderId(orderId);
        //经过一系列的算法得到送餐时间为30分钟
        dispatch.setSendTime(30*60L);
        dispatch.setRiderId(1000012L);
        dispatch.setUserId(15672L);
        // 3.插入我们的数据库
        int result = dispatchMapper.saveDispatch(dispatch);
        if (result > 0) {
            // 手动ack 删除该消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}


完整代码参考:项目中mq-transaction子模块

spring-boot: Springboot整合redis、消息中间件等相关代码

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
数据管理 API 调度
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
255 76
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
|
3天前
|
人工智能 Kubernetes 异构计算
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
本教程演示如何在ACK中多机分布式部署DeepSeek R1满血版。
|
25天前
|
存储 缓存 Java
Java中的分布式缓存与Memcached集成实战
通过在Java项目中集成Memcached,可以显著提升系统的性能和响应速度。合理的缓存策略、分布式架构设计和异常处理机制是实现高效缓存的关键。希望本文提供的实战示例和优化建议能够帮助开发者更好地应用Memcached,实现高性能的分布式缓存解决方案。
38 9
|
30天前
|
存储 运维 安全
盘古分布式存储系统的稳定性实践
本文介绍了阿里云飞天盘古分布式存储系统的稳定性实践。盘古作为阿里云的核心组件,支撑了阿里巴巴集团的众多业务,确保数据高可靠性、系统高可用性和安全生产运维是其关键目标。文章详细探讨了数据不丢不错、系统高可用性的实现方法,以及通过故障演练、自动化发布和健康检查等手段保障生产安全。总结指出,稳定性是一项系统工程,需要持续迭代演进,盘古经过十年以上的线上锤炼,积累了丰富的实践经验。
|
2月前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
236 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
1月前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
50 7
|
2月前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
83 10
|
2月前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
365 7
|
2月前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
118 4
|
3月前
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
226 4
构建高可用性GraphRAG系统:分布式部署与容错机制