基于RocketMq解决分布式事务

简介: 基于RocketMq解决分布式事务

正文


一、原理


111.png


1、生产者投递事务消息到Broker中,设置该消息为半消息,不可以被消费 。

2、broker在刷盘成功之后返回ack给生产者。

3、生产者执行本地事务

4、生产者将本地事务执行结果,告知Broker。

5、如果事务执行成功,则将半消息设置成可以消费,然后消费者进行消费,,如果本地事务执行失败,则将半消息删除,进行回滚。

6、如果由于网络原因或者其他原因,Broker一直没有收到本地事务执行的结果,则Broker每隔60s主动获取本地事务执行的结果,若果获取到则设置半消息可以消费,反之继续重试。


二、代码


生产者代码


package com.xiaojie.rocket.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 订单生产者
 * @date 2021/11/14 23:25
 */
@Component
@Slf4j
public class OrderProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    public TransactionSendResult sendSyncMessage(String msg, String destination, String tag) {
        log.info("【发送消息】:{}........", msg);
        Message<String> message = MessageBuilder.withPayload(msg).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(destination,  message,null);
        log.info("【发送状态】:{}", result.getLocalTransactionState());
        return result;
    }
}


生产者监听


package com.xiaojie.rocket.listener;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xiaojie.rocket.mapper.OrderMapper;
import com.xiaojie.rocket.pojo.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import java.io.InputStream;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 回调监听
 * @date 2021/11/14 23:59
 */
@Slf4j
@Component
@RocketMQTransactionListener
public class ProducerListener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderMapper orderMapper;
    /**
     * @description: 执行本地事务
     * @author xiaojie
     * @date 2021/11/15 0:05
     * @version 1.0
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            if (message == null) {
                return null;
            }
            String msg = new String((byte[]) message.getPayload());
            log.info("发送的消息是message>>>>>>>>>",msg);
            Order order = JSONObject.parseObject(msg, Order.class);
            int insert = orderMapper.insert(order);
            if (insert>0){
                //事务执行成功
                return RocketMQLocalTransactionState.COMMIT;
            }else{
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * @description: 检查本地事务
     * @author xiaojie
     * @date 2021/11/15 0:06
     * @version 1.0
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        try {
            if (message == null) {
                //如果为空,有可能是网络原因,不能删除数据,继续重试
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            String msg = new String((byte[]) message.getPayload());
            log.info("发送的消息是message>>>>>>>>>",msg);
            Order order = JSONObject.parseObject(msg, Order.class);
            QueryWrapper queryWrapper=new QueryWrapper(order.getOrderid());
            Order dbOrder = orderMapper.selectOne(queryWrapper);
            if (dbOrder == null) {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}


消费者


package com.xiaojie.rocket.consumer;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.rocket.mapper.DispatchMapper;
import com.xiaojie.rocket.pojo.Dispatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 派单消费者
 * @date 2021/11/15 0:23
 */
@Component
@RocketMQMessageListener(consumerGroup = "order-consumer", topic = "order-topic-test")
@Slf4j
public class DispatchConsumer implements RocketMQListener<String> {
    @Autowired
    private DispatchMapper dispatchMapper;
    @Override
    public void onMessage(String msg) {
        log.info(">>>>>>>>>>>>>>>>>>>>>",msg);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");
        // 计算分配的快递员id
        Dispatch dispatch=new Dispatch();
        dispatch.setOrderid(orderId);
        //经过一系列的算法得到送餐时间为30分钟
        dispatch.setSendtime(30*60L);
        dispatch.setRiderid(1000012L);
        dispatch.setUserid(15672L);
        // 3.插入我们的数据库
        int result = dispatchMapper.insert(dispatch);
    }
}


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 RocketMQ 微服务
RocketMQ 分布式事务消息实战指南
RocketMQ 分布式事务消息实战指南
606 1
|
23天前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
86 2
|
6月前
|
消息中间件 存储 负载均衡
分布式消息传递新时代:深入了解RabbitMQ_sharding插件的精髓【RabbitMQ 八】
分布式消息传递新时代:深入了解RabbitMQ_sharding插件的精髓【RabbitMQ 八】
134 0
|
5月前
|
消息中间件 中间件 程序员
分布式事务大揭秘:使用MQ实现最终一致性
本文由小米分享,介绍分布式事务中的MQ最终一致性实现,以RocketMQ为例。RocketMQ的事务消息机制包括准备消息、本地事务执行、确认/回滚消息及事务状态检查四个步骤。这种机制通过消息队列协调多系统操作,确保数据最终一致。MQ最终一致性具有系统解耦、提高可用性和灵活事务管理等优点,广泛应用于分布式系统中。文章还讨论了RocketMQ的事务消息处理流程和失败情况下的处理策略,帮助读者理解如何在实际应用中解决分布式事务问题。
399 6
|
5月前
|
消息中间件 监控 调度
构建Python中的分布式系统结合Celery与RabbitMQ
在当今的软件开发中,构建高效的分布式系统是至关重要的。Python作为一种流行的编程语言,提供了许多工具和库来帮助开发人员构建分布式系统。其中,Celery和RabbitMQ是两个强大的工具,它们结合在一起可以为你的Python应用程序提供可靠的异步任务队列和消息传递机制。
|
5月前
|
存储 关系型数据库 Java
技术经验解读:三种分布式事务LCN、Seata、MQ
技术经验解读:三种分布式事务LCN、Seata、MQ
177 0
|
6月前
基于MQ实现分布式事务
基于MQ实现分布式事务
51 1
|
6月前
|
消息中间件 存储 NoSQL
【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统
【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统
|
6月前
|
消息中间件 Kafka
消息队列 MQ:构建高效、可扩展的分布式系统
消息队列 MQ:构建高效、可扩展的分布式系统