RocketMQ 分布式事务消息实战指南

简介: RocketMQ 分布式事务消息实战指南

在当今的微服务架构中,分布式事务管理是一个核心问题。RocketMQ,作为阿里巴巴开源的一款分布式消息中间件,为解决这一难题提供了强有力的支持。本篇文章将详细介绍如何在实战中使用 RocketMQ 实现分布式事务消息,同时包含相应的代码示例。


一、RocketMQ 分布式事务简介


RocketMQ 支持消息的完全顺序性、消息的幂等性以及高可用性。在分布式系统中,由于网络分区、节点故障等原因,有时会出现部分服务提交、部分服务失败的情况,导致数据不一致。RocketMQ 的分布式事务消息功能可以在这种情况下保证数据的完整性。


二、RocketMQ 分布式事务消息实现


  1. 配置 RocketMQ Broker

首先,需要在 RocketMQ Broker 的配置文件中开启事务消息功能。具体配置如下:

# 开启事务消息功能
transaction.message.Enable=true
  1. 生产者发送事务消息

在生产者端,我们需要使用 RocketMQ 的 TransactionMQProducer 来发送事务消息。以下是一个简单的示例代码:

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876"); // 设置 RocketMQ Broker 地址
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 创建事务消息请求
            TransactionMQProducer producer1 = new TransactionMQProducer("ProducerGroupName");
            producer1.setNamesrvAddr("127.0.0.1:9876"); // 设置 RocketMQ Broker 地址
            producer1.setRetryTimesWhenSendAsyncFailed(3); // 设置重试次数
            producer1.setMessageValidator(new MessageValidatorImpl()); // 设置自定义的消息验证器
            producer1.start();
            TransactionSendResult result = producer1.send(buildTransactionMessage("TopicTest", "TagA", "OrderID" + i), new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 根据业务规则选择一个 MessageQueue 进行发送
                    return mqs.get(0);
                }
            });
            // 等待消息发送结果,如果发送失败则回滚操作,否则提交操作。
            if (result.getSendStatus() == SendStatus.SEND_ERROR) {
                System.out.println("Send Error, roll back action");
                // 进行回滚操作...
                producer1.retryMessageQueue(result.getMsgQueue(), result.getMsg());
            } else {
                System.out.println("Send Success, commit action");
                // 进行提交操作...
                producer1.commitMessageQueue(result.getMsgQueue(), result.getMsg());
            }
            producer1.shutdown(); // 关闭生产者实例
        }
        producer.shutdown(); // 关闭生产者实例
    }
}
  1. 消费者处理事务消息

在消费者端,我们需要使用 RocketMQ 的 TransactionalConsumer 来消费事务消息。以下是一个简单的示例代码:

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例并订阅主题和标签
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.subscribe("TopicTest", "*"); // 订阅主题和标签
        consumer.setNamesrvAddr("127.0.0.1:9876"); // 设置 RocketMQ Broker 地址
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) { // 处理消息列表
                System.out.println("Receive message: " + msg); // 打印收到的消息内容
                try { // 在业务代码块中完成业务逻辑处理,并在最后调用 confirm 方法提交或 reject 方法回滚。若业务处理失败或需要回滚,则抛出异常。当业务代码执行成功,但无法调用 confirm 或 reject 时,可以在 catch 块中抛出业务异常。这样 RocketMQ 会自动将该消息放到死信队列中,供其他消费者处理。死信队列中的消息优先级高于普通队列。默认情况下,死信队列名为原队列名后加一个点,例如,原队列名为“test_queue”,则死信队列名为“test_queue.DLQ”。
```java
            try {
                // 业务逻辑处理...
                System.out.println("Message processing success, commit it.");
                // 提交消息
                msg.setReconsumeTimes(0); // 重试次数清零
                context.getConsumer().confirm(msg);
            } catch (Exception e) {
                System.out.println("Message processing error, roll back it.");
                // 回滚消息
                msg.setReconsumeTimes(5); // 设置重试次数为5
                context.getConsumer().reject(msg);
            }
        });
        consumer.start(); // 启动消费者实例
    }
}


三、注意事项


  1. 在生产者发送事务消息时,需要保证网络连接的稳定性,避免出现网络分区、延迟等问题。
  2. 在消费者处理事务消息时,需要保证业务逻辑的正确性和健壮性,避免出现异常导致消息无法提交或回滚。
  3. 在生产者发送事务消息时,可以根据业务需要自定义消息验证器,对消息内容进行校验,确保消息的正确性和合法性。
  4. 在消费者处理事务消息时,可以根据业务需要设置不同的重试次数和回滚策略,以实现不同场景下的容错处理。
  5. 在使用 RocketMQ 分布式事务消息时,需要注意消息的顺序性和幂等性,避免出现重复处理或遗漏处理的情况。
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
人工智能 Kubernetes 数据可视化
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
本文回顾了一次关键词监测任务在容器集群中失效的全过程,分析了中转IP复用、调度节奏和异常处理等隐性风险,并提出通过解耦架构、动态IP分发和行为模拟优化采集策略,最终实现稳定高效的数据抓取与分析。
133 2
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
632 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
10月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
10月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
281 11
|
5月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
2144 1
|
10月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
7月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
315 1
分布式新闻数据采集系统的同步效率优化实战
|
8月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
2317 7