RocketMQ 分布式事务消息实战指南

简介: RocketMQ 分布式事务消息实战指南

在当今的微服务架构中,分布式事务管理是一个核心问题。RocketMQ,作为阿里巴巴开源的一款分布式消息中间件,为解决这一难题提供了强有力的支持。本篇文章将详细介绍如何在实战中使用 RocketMQ 实现分布式事务消息,同时包含相应的代码示例。


一、RocketMQ 分布式事务简介


RocketMQ 支持消息的完全顺序性、消息的幂等性以及高可用性。在分布式系统中,由于网络分区、节点故障等原因,有时会出现部分服务提交、部分服务失败的情况,导致数据不一致。RocketMQ 的分布式事务消息功能可以在这种情况下保证数据的完整性。


二、RocketMQ 分布式事务消息实现


  1. 配置 RocketMQ Broker

首先,需要在 RocketMQ Broker 的配置文件中开启事务消息功能。具体配置如下:

# 开启事务消息功能
transaction.message.Enable=true
  1. 生产者发送事务消息

在生产者端,我们需要使用 RocketMQ 的 TransactionMQProducer 来发送事务消息。以下是一个简单的示例代码:

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876"); // 设置 RocketMQ Broker 地址
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 创建事务消息请求
            TransactionMQProducer producer1 = new TransactionMQProducer("ProducerGroupName");
            producer1.setNamesrvAddr("127.0.0.1:9876"); // 设置 RocketMQ Broker 地址
            producer1.setRetryTimesWhenSendAsyncFailed(3); // 设置重试次数
            producer1.setMessageValidator(new MessageValidatorImpl()); // 设置自定义的消息验证器
            producer1.start();
            TransactionSendResult result = producer1.send(buildTransactionMessage("TopicTest", "TagA", "OrderID" + i), new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 根据业务规则选择一个 MessageQueue 进行发送
                    return mqs.get(0);
                }
            });
            // 等待消息发送结果,如果发送失败则回滚操作,否则提交操作。
            if (result.getSendStatus() == SendStatus.SEND_ERROR) {
                System.out.println("Send Error, roll back action");
                // 进行回滚操作...
                producer1.retryMessageQueue(result.getMsgQueue(), result.getMsg());
            } else {
                System.out.println("Send Success, commit action");
                // 进行提交操作...
                producer1.commitMessageQueue(result.getMsgQueue(), result.getMsg());
            }
            producer1.shutdown(); // 关闭生产者实例
        }
        producer.shutdown(); // 关闭生产者实例
    }
}
  1. 消费者处理事务消息

在消费者端,我们需要使用 RocketMQ 的 TransactionalConsumer 来消费事务消息。以下是一个简单的示例代码:

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例并订阅主题和标签
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.subscribe("TopicTest", "*"); // 订阅主题和标签
        consumer.setNamesrvAddr("127.0.0.1:9876"); // 设置 RocketMQ Broker 地址
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) { // 处理消息列表
                System.out.println("Receive message: " + msg); // 打印收到的消息内容
                try { // 在业务代码块中完成业务逻辑处理,并在最后调用 confirm 方法提交或 reject 方法回滚。若业务处理失败或需要回滚,则抛出异常。当业务代码执行成功,但无法调用 confirm 或 reject 时,可以在 catch 块中抛出业务异常。这样 RocketMQ 会自动将该消息放到死信队列中,供其他消费者处理。死信队列中的消息优先级高于普通队列。默认情况下,死信队列名为原队列名后加一个点,例如,原队列名为“test_queue”,则死信队列名为“test_queue.DLQ”。
```java
            try {
                // 业务逻辑处理...
                System.out.println("Message processing success, commit it.");
                // 提交消息
                msg.setReconsumeTimes(0); // 重试次数清零
                context.getConsumer().confirm(msg);
            } catch (Exception e) {
                System.out.println("Message processing error, roll back it.");
                // 回滚消息
                msg.setReconsumeTimes(5); // 设置重试次数为5
                context.getConsumer().reject(msg);
            }
        });
        consumer.start(); // 启动消费者实例
    }
}


三、注意事项


  1. 在生产者发送事务消息时,需要保证网络连接的稳定性,避免出现网络分区、延迟等问题。
  2. 在消费者处理事务消息时,需要保证业务逻辑的正确性和健壮性,避免出现异常导致消息无法提交或回滚。
  3. 在生产者发送事务消息时,可以根据业务需要自定义消息验证器,对消息内容进行校验,确保消息的正确性和合法性。
  4. 在消费者处理事务消息时,可以根据业务需要设置不同的重试次数和回滚策略,以实现不同场景下的容错处理。
  5. 在使用 RocketMQ 分布式事务消息时,需要注意消息的顺序性和幂等性,避免出现重复处理或遗漏处理的情况。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
数据管理 API 调度
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
251 76
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
|
1天前
|
人工智能 Kubernetes 异构计算
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
本教程演示如何在ACK中多机分布式部署DeepSeek R1满血版。
|
24天前
|
存储 缓存 Java
Java中的分布式缓存与Memcached集成实战
通过在Java项目中集成Memcached,可以显著提升系统的性能和响应速度。合理的缓存策略、分布式架构设计和异常处理机制是实现高效缓存的关键。希望本文提供的实战示例和优化建议能够帮助开发者更好地应用Memcached,实现高性能的分布式缓存解决方案。
38 9
|
2月前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
236 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
2月前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
80 10
|
2月前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
351 7
|
4月前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
4月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
4月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
142 0
|
7月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
190 23