RocketMQ 平滑升级到主从切换(实战篇)

简介: RocketMQ 平滑升级到主从切换(实战篇)

本文主要介绍如何将 RocketMQ 集群从原先的主从同步升级到主从切换。


本文首先介绍与 DLedger 多副本即 RocketMQ 主从切换相关的核心配置属性,然后尝试搭建一个主从同步集群,最后将原先的 RocketMQ 集群平滑升级到 DLedger 集群的示例,并简单测试一下主从切换功能。


1、RocketMQ 主从切换核心配置参数详解


其主要的配置参数如下所示:


  • enableDLegerCommitLog
    是否启用 DLedger,即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 true 。
  • dLegerGroup
    节点所属的 raft 组,建议与 brokerName 保持一致,例如 broker-a。
  • dLegerPeers
    集群节点信息,示例配置如下:n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913,多个节点用英文冒号隔开,单个条目遵循   legerSlefId-ip:端口,这里的端口用作 dledger 内部通信。
  • dLegerSelfId
    当前节点id。取自 legerPeers 中条目的开头,即上述示例中的 n0,并且特别需要强调,只能第一个字符为英文,其他字符需要配置成数字。
  • storePathRootDir
    DLedger 日志文件的存储根目录,为了能够支持平滑升级,该值与 storePathCommitLog 设置为不同的目录。


2、搭建主从同步环境


首先先搭建一个传统意义上的主从同步架构,往集群中灌一定量的数据,然后升级到 DLedger 集群。


在 Linux 服务器上搭建一个 rocketmq 主从同步集群我想不是一件很难的事情,故本文就不会详细介绍按照过程,只贴出相关配置。


实验环境的部署结构采取 一主一次,其部署图如下:

e4b1d1a5e5c669e1059fe61214d54e1a.png

下面我就重点贴一下 broker 的配置文件。


220 上的 broker 配置文件如下:

1brokerClusterName = DefaultCluster
 2brokerName = broker-a
 3brokerId = 0
 4deleteWhen = 04
 5fileReservedTime = 48
 6brokerRole = ASYNC_MASTER
 7flushDiskType = ASYNC_FLUSH
 8brokerIP1=192.168.0.220
 9brokerIP2=192.168.0.220
10namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
11storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
12storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
13autoCreateTopicEnable=false
14autoCreateSubscriptionGroup=false

221 上 broker 的配置文件如下:

1brokerClusterName = DefaultCluster
 2brokerName = broker-a
 3brokerId = 1
 4deleteWhen = 04
 5fileReservedTime = 48
 6brokerRole = SLAVE
 7flushDiskType = ASYNC_FLUSH
 8brokerIP1=192.168.0.221
 9brokerIP2=192.168.0.221
10namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
11storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
12storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
13autoCreateTopicEnable=false
14autoCreateSubscriptionGroup=false

相关的启动命令如下:

1nohup bin/mqnamesrv  /dev/null  2>&1 &
2nohup bin/mqbroker -c conf/broker.conf  /dev/null  2>&1 &

安装后的集群信息如图所示:


932e7cebe8f36eaa6c0a13a41ff91a7b.png


3、主从同步集群升级到DLedger


3.1 部署架构


DLedger 集群至少需要3台机器,故搭建 DLedger 还需要再引入一台机器,其部署结构图如下:

4fa223419b677ef30a9a3e6e9215291b.png

从主从同步集群升级到 DLedger 集群,用户最关心的还是升级后的集群是否能够兼容原先的数据,即原先存储在消息能否能被消息消费者消费端,甚至于能否查询到。


为了方便后续验证,首先我使用下述程序向 mq 集群中添加了一篇方便查询的消息(设置消息的key)。


1public class Producer {
 2    public static void main(String[] args) throws MQClientException, InterruptedException {
 3        DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test");
 4        producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876");
 5        producer.start();
 6        for(int i =600000; i < 600100; i ++) {
 7            try {
 8                Message msg = new Message("topic_dw_test_by_order_01",null , "m" + i,("Hello RocketMQ" + i ).getBytes(RemotingHelper.DEFAULT_CHARSET));
 9                SendResult sendResult = producer.send(msg);
10               //System.out.printf("%s%n", sendResult);
11            } catch (Exception e) {
12                e.printStackTrace();
13                Thread.sleep(1000);
14            }
15        }
16        producer.shutdown();
17        System.out.println("end");
18    }
19}

消息的查询结果示例如下:

5bceb27fd9e6afa22338ea302c968d37.png


3.2 升级步骤


Step1:将 192.168.0.220 的 rocketmq 拷贝到 192.168.0.222,可以使用如下命令进行操作。在 192.168.0.220 上敲如下命令:

1 scp -r rocketmq-all-4.5.2-bin-release/ root@192.168.0.222:/opt/application/rocketmq-all-4.5.2-bin-release

温馨提示:示例中由于版本是一样,实际过程中,版本需要升级,故需先下载最新的版本,然后将老集群中的 store 目录完整的拷贝到新集群的 store 目录。


Step2:依次在三台服务器的 broker.conf 配置文件中添加与 dledger 相关的配置属性。

192.168.0.220 broker配置文件如下:


1brokerClusterName = DefaultCluster
 2brokerId = 0
 3deleteWhen = 04
 4fileReservedTime = 48
 5brokerRole = ASYNC_MASTER
 6flushDiskType = ASYNC_FLUSH
 7brokerIP1=192.168.0.220
 8brokerIP2=192.168.0.220
 9namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
10storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
11storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
12autoCreateTopicEnable=false
13autoCreateSubscriptionGroup=false
14# 与 dledger 相关的属性
15enableDLegerCommitLog=true
16storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
17dLegerGroup=broker-a
18dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
19dLegerSelfId=n0

192.168.0.221 broker配置文件如下:

1brokerClusterName = DefaultCluster
 2brokerName = broker-a
 3brokerId = 1
 4deleteWhen = 04
 5fileReservedTime = 48
 6brokerRole = SLAVE
 7flushDiskType = ASYNC_FLUSH
 8brokerIP1=192.168.0.221
 9brokerIP2=192.168.0.221
10namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
11storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
12storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
13autoCreateTopicEnable=false
14autoCreateSubscriptionGroup=false
15# 与dledger 相关的配置属性
16enableDLegerCommitLog=true
17storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
18dLegerGroup=broker-a
19dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
20dLegerSelfId=n1

192.168.0.222 broker配置文件如下:

1brokerClusterName = DefaultCluster
 2brokerName = broker-a
 3brokerId = 0
 4deleteWhen = 04
 5fileReservedTime = 48
 6brokerRole = ASYNC_MASTER
 7flushDiskType = ASYNC_FLUSH
 8brokerIP1=192.168.0.222
 9brokerIP2=192.168.0.222
10namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
11storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
12storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
13autoCreateTopicEnable=false
14autoCreateSubscriptionGroup=false
15# 与 dledger 相关的配置
16enableDLegerCommitLog=true
17storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
18dLegerGroup=broker-a
19dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
20dLegerSelfId=n2

温馨提示:legerSelfId 分别为 n0、n1、n2。在真实的生产环境中,broker配置文件中的 storePathRootDir、storePathCommitLog 尽量使用单独的根目录,这样判断其磁盘使用率时才不会相互影响。


Step3:将 store/config 下的 所有文件拷贝到 dledger store 的 congfig 目录下。


1cd /opt/application/rocketmq-all-4.5.2-bin-release/store/
2cp config/* dledger_store/config/
3

温馨提示:该步骤按照各自按照时配置的目录进行复制即可。

Step4:依次启动三台 broker。

1nohup bin/mqbroker -c conf/broker.conf  /dev/null  2>&1 &

如果启动成功,则在 rocketmq-console 中看到的集群信息如下

5dc7b67a90d64bfc2d2ae34ea37b7495.jpg


3.3 验证消息发送与消息查找


首先我们先验证升级之前的消息是否能查询到,那我们还是查找key 为 m600000 的消息,查找结果如图所示:

5450c3d776ad4d78e95e2b9f799d9a52.png

然后我们来测试一下消息发送。测试代码如下:

1public class Producer {
 2    public static void main(String[] args) throws MQClientException, InterruptedException {
 3        DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test");
 4        producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876");
 5        producer.start();
 6        for(int i =600200; i < 600300; i ++) {
 7            try {
 8                Message msg = new Message("topic_dw_test_by_order_01",null , "m" + i,("Hello RocketMQ" + i ).getBytes(RemotingHelper.DEFAULT_CHARSET));
 9                SendResult sendResult = producer.send(msg);
10                System.out.printf("%s%n", sendResult);
11            } catch (Exception e) {
12                e.printStackTrace();
13                Thread.sleep(1000);
14            }
15        }
16        producer.shutdown();
17        System.out.println("end");
18    }
19}

执行结果如下:

58c696ae3f50ae4eb95c829251c6c68b.jpg

再去控制台查询一下消息,其结果也表明新的消息也能查询到。

816c3e791426c2778ef2750f437bd815.png

最后我们再来验证一下主节点宕机,消息发送是否会受影响。


在消息发送的过程中,去关闭主节点,其截图如下:


0f269791d39c37e5b726cd3d2916e4ec.jpg

51cc81610a4c59d95a3fbc9797455abf.jpg

b9e64dfc23d4d7c50f85f29dcfdce60b.jpg

再来看一下集群的状态:

4cedd883980642565caac13ae1d4daca.png

等待该复制组重新完成主服务器选举后,即可继续处理消息发送。


温馨提示:由于本示例是一主一从,故在选举期间,消息不可用,但在真实的生产环境上,其部署架构是多主主从,即一个复制组在 leader 选举期间,其他复制组可以接替该复制组完成消息的发送,实现消息服务的高可用。


与 DLedger 相关的日志,默认存储在 broker_default.log 文件中。


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
543 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
7月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
2月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
1085 1
|
7月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
7月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
5月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
4137 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
7月前
|
消息中间件 Java 中间件
RocketMQ实战—2.RocketMQ集群生产部署
本文主要介绍了大纲什么是消息中间件、消息中间件的技术选型、RocketMQ的架构原理和使用方式、消息中间件路由中心的架构原理、Broker的主从架构原理、高可用的消息中间件生产部署架构、部署一个小规模的RocketMQ集群进行压测、如何对RocketMQ集群进行可视化的监控和管理、进行OS内核参数和JVM参数的调整、如何对小规模RocketMQ集群进行压测、消息中间件集群生产部署规划梳理。
RocketMQ实战—2.RocketMQ集群生产部署
|
7月前
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
7月前
|
消息中间件 Java 测试技术
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
|
7月前
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。

热门文章

最新文章