基于RocketMq解决分布式事务

简介: 基于RocketMq解决分布式事务

正文


一、原理


111.png


1、生产者投递事务消息到Broker中,设置该消息为半消息,不可以被消费 。

2、broker在刷盘成功之后返回ack给生产者。

3、生产者执行本地事务

4、生产者将本地事务执行结果,告知Broker。

5、如果事务执行成功,则将半消息设置成可以消费,然后消费者进行消费,,如果本地事务执行失败,则将半消息删除,进行回滚。

6、如果由于网络原因或者其他原因,Broker一直没有收到本地事务执行的结果,则Broker每隔60s主动获取本地事务执行的结果,若果获取到则设置半消息可以消费,反之继续重试。


二、代码


生产者代码


package com.xiaojie.rocket.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 订单生产者
 * @date 2021/11/14 23:25
 */
@Component
@Slf4j
public class OrderProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    public TransactionSendResult sendSyncMessage(String msg, String destination, String tag) {
        log.info("【发送消息】:{}........", msg);
        Message<String> message = MessageBuilder.withPayload(msg).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(destination,  message,null);
        log.info("【发送状态】:{}", result.getLocalTransactionState());
        return result;
    }
}


生产者监听


package com.xiaojie.rocket.listener;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xiaojie.rocket.mapper.OrderMapper;
import com.xiaojie.rocket.pojo.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import java.io.InputStream;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 回调监听
 * @date 2021/11/14 23:59
 */
@Slf4j
@Component
@RocketMQTransactionListener
public class ProducerListener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderMapper orderMapper;
    /**
     * @description: 执行本地事务
     * @author xiaojie
     * @date 2021/11/15 0:05
     * @version 1.0
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            if (message == null) {
                return null;
            }
            String msg = new String((byte[]) message.getPayload());
            log.info("发送的消息是message>>>>>>>>>",msg);
            Order order = JSONObject.parseObject(msg, Order.class);
            int insert = orderMapper.insert(order);
            if (insert>0){
                //事务执行成功
                return RocketMQLocalTransactionState.COMMIT;
            }else{
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * @description: 检查本地事务
     * @author xiaojie
     * @date 2021/11/15 0:06
     * @version 1.0
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        try {
            if (message == null) {
                //如果为空,有可能是网络原因,不能删除数据,继续重试
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            String msg = new String((byte[]) message.getPayload());
            log.info("发送的消息是message>>>>>>>>>",msg);
            Order order = JSONObject.parseObject(msg, Order.class);
            QueryWrapper queryWrapper=new QueryWrapper(order.getOrderid());
            Order dbOrder = orderMapper.selectOne(queryWrapper);
            if (dbOrder == null) {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}


消费者


package com.xiaojie.rocket.consumer;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.rocket.mapper.DispatchMapper;
import com.xiaojie.rocket.pojo.Dispatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 派单消费者
 * @date 2021/11/15 0:23
 */
@Component
@RocketMQMessageListener(consumerGroup = "order-consumer", topic = "order-topic-test")
@Slf4j
public class DispatchConsumer implements RocketMQListener<String> {
    @Autowired
    private DispatchMapper dispatchMapper;
    @Override
    public void onMessage(String msg) {
        log.info(">>>>>>>>>>>>>>>>>>>>>",msg);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");
        // 计算分配的快递员id
        Dispatch dispatch=new Dispatch();
        dispatch.setOrderid(orderId);
        //经过一系列的算法得到送餐时间为30分钟
        dispatch.setSendtime(30*60L);
        dispatch.setRiderid(1000012L);
        dispatch.setUserid(15672L);
        // 3.插入我们的数据库
        int result = dispatchMapper.insert(dispatch);
    }
}


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码

相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
1
0
9
分享
相关文章
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
分布式消息传递新时代:深入了解RabbitMQ_sharding插件的精髓【RabbitMQ 八】
分布式消息传递新时代:深入了解RabbitMQ_sharding插件的精髓【RabbitMQ 八】
289 0
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
516 2
分布式事务大揭秘:使用MQ实现最终一致性
本文由小米分享,介绍分布式事务中的MQ最终一致性实现,以RocketMQ为例。RocketMQ的事务消息机制包括准备消息、本地事务执行、确认/回滚消息及事务状态检查四个步骤。这种机制通过消息队列协调多系统操作,确保数据最终一致。MQ最终一致性具有系统解耦、提高可用性和灵活事务管理等优点,广泛应用于分布式系统中。文章还讨论了RocketMQ的事务消息处理流程和失败情况下的处理策略,帮助读者理解如何在实际应用中解决分布式事务问题。
823 6
构建Python中的分布式系统结合Celery与RabbitMQ
在当今的软件开发中,构建高效的分布式系统是至关重要的。Python作为一种流行的编程语言,提供了许多工具和库来帮助开发人员构建分布式系统。其中,Celery和RabbitMQ是两个强大的工具,它们结合在一起可以为你的Python应用程序提供可靠的异步任务队列和消息传递机制。
基于MQ实现分布式事务
基于MQ实现分布式事务
86 1

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问