RocketMQ的常规运维实践应用

简介: RocketMQ的常规运维实践应用

RocketMQ的常规运维实践应用


1. 实验环境说明

实验环境

  1. 体验手册。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验操作手册。

  1. 云产品资源。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。

  1. 体验报告。

小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。

  1. 实验环境。

实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。

  1. 实验体验时间。

体验时间一般为一个小时。

  1. 实验环境功能栏。

功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。

  1. 编写实验报告。

填写实验报告,帮助大家快速通过实验了解RocketMQ。

实验帮助

如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。

启动一个集群

  1. 启动namesrv。观察到启动成功的日志后, ctrl + c,终止日志输出。
cd /usr/local/services/5-rocketmq/namesrv-01
restart.sh
  1. 启动broker

修改broker配置项brokerIP1为实验公网IP,启动broker。观察到启动成功的日志后, ctrl + c,终止日志输出。

操作命令如下:

cd /usr/local/services/5-rocketmq/broker-01
vim ./conf/broker.conf 
restart.sh


2. 如何编译、部署RocketMQ dashboard

如何编译、部署RocketMQ dashboard。

本教程会以如何利用源码编译并打包RocketMQ为例, 演示如何下载、编译任意版本的RocketMQ.

0. 启动一个集群 (已启动则不用操作)

1. 安装git,jdk, maven等工具(当前实验环境已经安装好。参考或者baidu/google)

  • jdk安装
  • maven安装

2. 下载最新release代码(这里以git为例,如果没有安装git直接从github release页面下载)

在自己电脑上, 进入命令行, 选择一个保存源码的目录, 这里我把源码保存到 /tiger/tmp为例

2.1 创建代码保存目录(已创建则不操作)并进入代码保存目录:

mkdir -p /tiger/tmp
cd /tiger/tmp

2.2 克隆master代码

git clone https://github.com/apache/rocketmq-dashboard.git
or 
git clone https://gitee.com/mirrors_apache/rocketmq-dashboard.git

2.3 进入源码根目录:

cd rocketmq-dashboard

可以看到如下代码信息:

3. 编译和打包源码(这里面包含前后端一起打包,前端yarn, node, npm都已经在实验环境准备好了,及时下载大概率会失败)

  • 修改dashboard配置的namesrv地址
vim src/main/resources/application.yml

  • 执行打包命令
mvn clean package -Dmaven.test.skip=true

看到如下结果就是成功了

编译打包成功后, 我们执行:

cd target
ls -l

查看打包结果

  • 运行dashboard
java -Xms400m -Xmx400m -Xmn400m  -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar


3. 如何查看和扩容Topic

扩容Topic,有多种方式:

  1. 在topic所在broker节点上增加queue数量
  2. 在topic所在broker节点外,已存在的broker节点上增加queue数量。
  3. 新添加broker节点,将增加的queue放到新broker节点。

本节主要讲解第一种, 一般的,增加queue数量可以增大生产和消费吞吐量。

0. 启动一个集群和安装dashboard (已完成则不用操作)

创建topic

通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。创建一个topic, 配置如下

查看topic的队列数

这里显示2个队列都在broker-a上。

增加topic队列数

增加队列和创建一样, 可以选择选择新增加的队列分布在哪些集群或者哪些broker上。 这里我们只有一个broker,

保持集群、broker不变。预期增加的队列都在broker-a上。

增加队列成功后,我们再次查看topic状态, 发现队列已经变化。

一般的, 增加队列数,可以增大消费效率。 同一个消费者组中的消费者个数最大为队列数,一般消费能力随着队列数增大而增加。


4. 如何重置消费位点

重置消费位点,是消费者想重新消费部分消息,此时可以通过社区的dashboard直接操作。

前提是消费者已经做好消费幂等。

下面简单总结了重置消费位点的流程:

通过上图可以看出重置消费位点需要broker,client都支持才能进行,下面我们分享下操作过程。

扩容Topic,这里是狭义的扩容, 指增加topic的queue数量,以此来对生产和消费效率产生影响。

0. 启动一个集群和安装dashboard (已完成则不用操作)

查找到需要重置位点的topic

通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。查找到需要重置位点的topic,点击重置消费位点

选择需要重置消费位点的消费者组和位点起始时间。

注意:

2.1 消费者有消费幂等实现,如果没有的话,位点重置后,可能带来数据重复等问题。

2.2 一个topic可能被多个消费者组订阅,需要准确找到你需要重置的那个消费者组名字

2.3 每一个消息有一个位点对应,每一个位点有一个距离其最近的时间。所以这里的时间并不能完全对应位点值,只是一个预估。

观察重置后旧的消息是否被消费者重新消费。这个过程可能需要一点时间, 耐心等待观察结果

一般的,重置位点一般有2种场景可以用到

  1. 业务消费逻辑有bug,修复后需要重新处理之前业务消息
  2. 流计算中,重放数据。


5. 如何动态修改Broker配置

动态修改Broker配置可以动态生效,并且可以帮你快速了解Broker的全部配置项。rocketmq发布的工具包中有单独的命令可以支持。

下面以maxMsgsNumBatch配置项为例,我们修改下这个配置值。

0. 启动一个集群 (已完成则不用操作)

1. 进入一个broker的根目录,可以看到如下目录结构

2. 查看一个broker全部配置值。

bin/mqadmin getBrokerConfig -b 127.0.0.1:10911

-b : 后面跟broker ip:port

查出的结果会类似:

3. 更新maxMsgsNumBatch的值为64

bin/mqadmin updateBrokerConfig -b 127.0.0.1:10911 -k maxMsgsNumBatch -v 64

更新成功后会打印:

4. 再通过第2步的命令查看,值是否被改变了。

一般的, 只要调用过更新broker配置接口, broker都会持久化一份全部配置到broker.conf(配置文件名以启动命令参数为准),我们登录broker可以看到。

以下是我本地的一份配置demo,大家参考

serverSelectorThreads=3
brokerRole=ASYNC_MASTER
serverSocketRcvBufSize=131072
osPageCacheBusyTimeOutMills=1000
shortPollingTimeMills=1000
clientSocketRcvBufSize=131072
clusterTopicEnable=true
brokerTopicEnable=true
autoCreateTopicEnable=true
maxErrorRateOfBloomFilter=20
maxMsgsNumBatch=64
cleanResourceInterval=10000
commercialBaseCount=1
maxTransferCountOnMessageInMemory=32
brokerFastFailureEnable=true
brokerClusterName=DefaultCluster
flushDiskType=ASYNC_FLUSH
commercialBigCount=1
mappedFileSizeConsumeQueue=6000000
consumerFallbehindThreshold=17179869184
autoCreateSubscriptionGroup=true
transientStorePoolEnable=false
flushConsumerOffsetInterval=5000
waitTimeMillsInHeartbeatQueue=31000
diskMaxUsedSpaceRatio=75
flushCommitLogLeastPages=4
cleanFileForciblyEnable=true
slaveReadEnable=false
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
expectConsumerNumUseFilter=32
traceTopicEnable=false
useEpollNativeSelector=false
enablePropertyFilter=true
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
deleteCommitLogFilesInterval=100
brokerName=broker-a
maxTransferBytesOnMessageInDisk=65536
listenPort=10911
flushConsumeQueueLeastPages=2
pullMessageThreadPoolNums=20
useReentrantLockWhenPutMessage=false
flushIntervalConsumeQueue=1000
sendThreadPoolQueueCapacity=10000
debugLockEnable=false
haHousekeepingInterval=20000
diskFallRecorded=true
messageIndexEnable=true
clientAsyncSemaphoreValue=65535
clientCallbackExecutorThreads=2
putMsgIndexHightWater=600000
sendMessageThreadPoolNums=1
clientManagerThreadPoolQueueCapacity=1000000
serverSocketSndBufSize=131072
maxDelayTime=40
clientSocketSndBufSize=131072
namesrvAddr=localhost:9876
commercialEnable=true
maxHashSlotNum=5000000
heartbeatThreadPoolNums=2
transactionTimeOut=6000
maxMessageSize=4194304
adminBrokerThreadPoolNums=16
defaultQueryMaxNum=32
maxTransferBytesOnMessageInMemory=262144
forceRegister=true
enableConsumeQueueExt=false
longPollingEnable=true
serverWorkerThreads=8
messageIndexSafe=false
deleteConsumeQueueFilesInterval=100
haSlaveFallbehindMax=268435456
serverCallbackExecutorThreads=0
flushCommitLogThoroughInterval=10000
isEnableBatchPush=false
commercialTimerCount=1
enableDLegerCommitLog=false
useTLS=false
redeleteHangedFileInterval=120000
flushIntervalCommitLog=500
rocketmqHome=/opt/rocketmq
queryMessageThreadPoolNums=10
messageStorePlugIn=
serverChannelMaxIdleTimeSeconds=120
maxIndexNum=20000000
filterDataCleanTimeSpan=86400000
filterServerNums=0
commitCommitLogLeastPages=4
waitTimeMillsInPullQueue=5000
haSendHeartbeatInterval=5000
processReplyMessageThreadPoolNums=20
clientChannelMaxIdleTimeSeconds=120
filterSupportRetry=false
flushDelayOffsetInterval=10000
duplicationEnable=false
replyThreadPoolQueueCapacity=10000
offsetCheckInSlave=false
clientCloseSocketIfTimeout=false
transientStorePoolSize=5
waitTimeMillsInSendQueue=200
warmMapedFileEnable=false
endTransactionThreadPoolNums=12
flushCommitLogTimed=false
flushLeastPagesWhenWarmMapedFile=4096
clientWorkerThreads=4
endTransactionPoolQueueCapacity=100000
registerNameServerPeriod=30000
registerBrokerTimeoutMills=6000
accessMessageInMemoryMaxRatio=40
highSpeedMode=false
transactionCheckMax=15
checkCRCOnRecover=true
destroyMapedFileIntervalForcibly=120000
brokerIP2=xxx.xxx.xxx.xxx
brokerIP1=xxx.xxx.xxx.xxx
commitIntervalCommitLog=200
clientOnewaySemaphoreValue=65535
storeReplyMessageEnable=true
traceOn=true
clientManageThreadPoolNums=32
channelNotActiveInterval=60000
mappedFileSizeConsumeQueueExt=50331648
consumerManagerThreadPoolQueueCapacity=1000000
serverOnewaySemaphoreValue=256
haListenPort=10912
enableCalcFilterBitMap=false
clientPooledByteBufAllocatorEnable=false
aclEnable=false
storePathRootDir=/root/store
syncFlushTimeout=5000
rejectTransactionMessage=false
commitCommitLogThoroughInterval=200
connectTimeoutMillis=3000
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
consumerManageThreadPoolNums=32
disableConsumeIfConsumerReadSlowly=false
flushConsumerOffsetHistoryInterval=60000
fetchNamesrvAddrByAddressServer=false
haTransferBatchSize=32768
compressedRegister=false
storePathCommitLog=/root/store/commitlog
commercialTransCount=1
transactionCheckInterval=60000
mappedFileSizeCommitLog=1073741824
startAcceptSendRequestTimeStamp=0
serverPooledByteBufAllocatorEnable=true
serverAsyncSemaphoreValue=64
autoDeleteUnusedStats=false
waitTimeMillsInTransactionQueue=3000
heartbeatThreadPoolQueueCapacity=50000
deleteWhen=05
bitMapLengthConsumeQueueExt=112
fastFailIfNoBufferInStorePool=false
defaultTopicQueueNums=8
flushConsumeQueueThoroughInterval=60000
notifyConsumerIdsChangedEnable=true
brokerPermission=6
fileReservedTime=48
transferMsgByHeap=true
pullThreadPoolQueueCapacity=100000
brokerId=0
maxTransferCountOnMessageInDisk=8

实验链接:https://developer.aliyun.com/adc/scenario/14828d336c1547b0ab3f7514ca229ea2


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5天前
|
运维 应用服务中间件 Linux
自动化运维的利器:Ansible在配置管理中的应用
【10月更文挑战第39天】本文旨在通过深入浅出的方式,向读者展示如何利用Ansible这一强大的自动化工具来优化日常的运维工作。我们将从基础概念讲起,逐步深入到实战操作,不仅涵盖Ansible的核心功能,还会分享一些高级技巧和最佳实践。无论你是初学者还是有经验的运维人员,这篇文章都会为你提供有价值的信息,帮助你提升工作效率。
|
9天前
|
机器学习/深度学习 数据采集 人工智能
智能化运维在现代IT基础设施中的应用与价值####
本文探讨了智能化运维(AIOps)在现代IT基础设施管理中的实际应用、面临的挑战及其带来的深远影响。通过引入先进的算法和机器学习模型,智能化运维不仅提高了故障检测与响应的速度,还显著优化了资源配置,降低了运营成本,为企业数字化转型提供了强有力的技术支撑。 ####
|
7天前
|
机器学习/深度学习 人工智能 运维
智能运维在现代IT系统中的应用与挑战####
本文深入探讨了智能运维(AIOps)在现代IT系统中的关键作用,通过具体案例分析,揭示了其在提升系统稳定性、优化资源配置及自动化故障处理方面的显著优势。同时,文章也指出了实施智能运维过程中面临的数据安全、技术整合及人员技能转型等挑战,并提出了相应的解决策略,为读者提供了全面而深刻的见解。 ####
31 6
|
6天前
|
运维 监控
构建高效运维体系:从理论到实践
在当今快速发展的信息化时代,高效的运维体系是保障企业信息系统稳定运行的关键。本文旨在探讨如何构建一个高效、可靠的运维体系,通过分析当前运维面临的挑战,提出相应的解决策略,并结合实际案例,展示这些策略的实施效果。文章首先介绍了高效运维的重要性,接着分析了运维过程中常见的问题,然后详细阐述了构建高效运维体系的策略和步骤,最后通过一个实际案例来验证这些策略的有效性。
|
7天前
|
人工智能 运维 监控
智能运维在现代数据中心的应用与挑战
随着云计算和大数据技术的迅猛发展,现代数据中心的运维管理面临着前所未有的挑战。本文探讨了智能运维技术在数据中心中的应用,包括自动化监控、故障预测与诊断、资源优化等方面,并分析了当前面临的主要挑战,如数据安全、系统集成复杂性等。通过实际案例分析,展示了智能运维如何帮助数据中心提高效率、降低成本,并提出了未来发展趋势和建议。
|
8天前
|
运维 Ubuntu 应用服务中间件
自动化运维工具Ansible的实战应用
【10月更文挑战第36天】在现代IT基础设施管理中,自动化运维已成为提升效率、减少人为错误的关键手段。本文通过介绍Ansible这一流行的自动化工具,旨在揭示其在简化日常运维任务中的实际应用价值。文章将围绕Ansible的核心概念、安装配置以及具体使用案例展开,帮助读者构建起自动化运维的初步认识,并激发对更深入内容的学习兴趣。
30 4
|
6天前
|
机器学习/深度学习 数据采集 人工智能
智能运维:从自动化到AIOps的演进与实践####
本文探讨了智能运维(AIOps)的兴起背景、核心组件及其在现代IT运维中的应用。通过对比传统运维模式,阐述了AIOps如何利用机器学习、大数据分析等技术,实现故障预测、根因分析、自动化修复等功能,从而提升系统稳定性和运维效率。文章还深入分析了实施AIOps面临的挑战与解决方案,并展望了其未来发展趋势。 ####
|
7天前
|
运维 安全 应用服务中间件
自动化运维的利剑:Ansible在配置管理中的应用
【10月更文挑战第37天】本文将深入探讨如何利用Ansible简化和自动化复杂的IT基础设施管理任务。我们将通过实际案例,展示如何用Ansible编写可重用的配置代码,以及这些代码如何帮助运维团队提高效率和减少人为错误。文章还将讨论如何构建Ansible playbook来自动部署应用、管理系统更新和执行常规维护任务。准备好深入了解这个强大的工具,让你的运维工作更加轻松吧!
23 2
|
14天前
|
数据采集 机器学习/深度学习 运维
智能化运维在现代IT系统中的应用与挑战####
【10月更文挑战第29天】 本文探讨了智能化运维(AIOps)在现代IT系统中的重要作用及其面临的主要挑战。通过引入机器学习和大数据分析,智能化运维能显著提高系统稳定性、降低运营成本,并增强故障预测能力。然而,数据质量、技术整合及安全性等问题仍是其广泛应用的主要障碍。本文详细分析了这些挑战,并提出了相应的解决方案和未来发展趋势。 ####
36 5
|
14天前
|
人工智能 运维 监控
构建高效运维体系:理论与实践的深度融合####
本文旨在探讨高效IT运维体系的构建策略,通过理论框架与实际案例并重的方式,深入剖析了现代企业面临的运维挑战。文章开篇概述了当前运维领域的新趋势,包括自动化、智能化及DevOps文化的兴起,随后详细阐述了如何将这些先进理念融入日常运维管理中,形成一套既灵活又稳定的运维机制。特别地,文中强调了数据驱动决策的重要性,以及在快速迭代的技术环境中保持持续学习与适应的必要性。最终,通过对比分析几个典型企业的运维转型实例,提炼出可复制的成功模式,为读者提供具有实操性的指导建议。 ####

相关产品

  • 云消息队列 MQ