RocketMQ的常规运维实践应用

简介: RocketMQ的常规运维实践应用

RocketMQ的常规运维实践应用


1. 实验环境说明

实验环境

  1. 体验手册。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验操作手册。

  1. 云产品资源。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。

  1. 体验报告。

小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。

  1. 实验环境。

实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。

  1. 实验体验时间。

体验时间一般为一个小时。

  1. 实验环境功能栏。

功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。

  1. 编写实验报告。

填写实验报告,帮助大家快速通过实验了解RocketMQ。

实验帮助

如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。

启动一个集群

  1. 启动namesrv。观察到启动成功的日志后, ctrl + c,终止日志输出。
cd /usr/local/services/5-rocketmq/namesrv-01
restart.sh
  1. 启动broker

修改broker配置项brokerIP1为实验公网IP,启动broker。观察到启动成功的日志后, ctrl + c,终止日志输出。

操作命令如下:

cd /usr/local/services/5-rocketmq/broker-01
vim ./conf/broker.conf 
restart.sh


2. 如何编译、部署RocketMQ dashboard

如何编译、部署RocketMQ dashboard。

本教程会以如何利用源码编译并打包RocketMQ为例, 演示如何下载、编译任意版本的RocketMQ.

0. 启动一个集群 (已启动则不用操作)

1. 安装git,jdk, maven等工具(当前实验环境已经安装好。参考或者baidu/google)

  • jdk安装
  • maven安装

2. 下载最新release代码(这里以git为例,如果没有安装git直接从github release页面下载)

在自己电脑上, 进入命令行, 选择一个保存源码的目录, 这里我把源码保存到 /tiger/tmp为例

2.1 创建代码保存目录(已创建则不操作)并进入代码保存目录:

mkdir -p /tiger/tmp
cd /tiger/tmp

2.2 克隆master代码

git clone https://github.com/apache/rocketmq-dashboard.git
or 
git clone https://gitee.com/mirrors_apache/rocketmq-dashboard.git

2.3 进入源码根目录:

cd rocketmq-dashboard

可以看到如下代码信息:

3. 编译和打包源码(这里面包含前后端一起打包,前端yarn, node, npm都已经在实验环境准备好了,及时下载大概率会失败)

  • 修改dashboard配置的namesrv地址
vim src/main/resources/application.yml

  • 执行打包命令
mvn clean package -Dmaven.test.skip=true

看到如下结果就是成功了

编译打包成功后, 我们执行:

cd target
ls -l

查看打包结果

  • 运行dashboard
java -Xms400m -Xmx400m -Xmn400m  -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar


3. 如何查看和扩容Topic

扩容Topic,有多种方式:

  1. 在topic所在broker节点上增加queue数量
  2. 在topic所在broker节点外,已存在的broker节点上增加queue数量。
  3. 新添加broker节点,将增加的queue放到新broker节点。

本节主要讲解第一种, 一般的,增加queue数量可以增大生产和消费吞吐量。

0. 启动一个集群和安装dashboard (已完成则不用操作)

创建topic

通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。创建一个topic, 配置如下

查看topic的队列数

这里显示2个队列都在broker-a上。

增加topic队列数

增加队列和创建一样, 可以选择选择新增加的队列分布在哪些集群或者哪些broker上。 这里我们只有一个broker,

保持集群、broker不变。预期增加的队列都在broker-a上。

增加队列成功后,我们再次查看topic状态, 发现队列已经变化。

一般的, 增加队列数,可以增大消费效率。 同一个消费者组中的消费者个数最大为队列数,一般消费能力随着队列数增大而增加。


4. 如何重置消费位点

重置消费位点,是消费者想重新消费部分消息,此时可以通过社区的dashboard直接操作。

前提是消费者已经做好消费幂等。

下面简单总结了重置消费位点的流程:

通过上图可以看出重置消费位点需要broker,client都支持才能进行,下面我们分享下操作过程。

扩容Topic,这里是狭义的扩容, 指增加topic的queue数量,以此来对生产和消费效率产生影响。

0. 启动一个集群和安装dashboard (已完成则不用操作)

查找到需要重置位点的topic

通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。查找到需要重置位点的topic,点击重置消费位点

选择需要重置消费位点的消费者组和位点起始时间。

注意:

2.1 消费者有消费幂等实现,如果没有的话,位点重置后,可能带来数据重复等问题。

2.2 一个topic可能被多个消费者组订阅,需要准确找到你需要重置的那个消费者组名字

2.3 每一个消息有一个位点对应,每一个位点有一个距离其最近的时间。所以这里的时间并不能完全对应位点值,只是一个预估。

观察重置后旧的消息是否被消费者重新消费。这个过程可能需要一点时间, 耐心等待观察结果

一般的,重置位点一般有2种场景可以用到

  1. 业务消费逻辑有bug,修复后需要重新处理之前业务消息
  2. 流计算中,重放数据。


5. 如何动态修改Broker配置

动态修改Broker配置可以动态生效,并且可以帮你快速了解Broker的全部配置项。rocketmq发布的工具包中有单独的命令可以支持。

下面以maxMsgsNumBatch配置项为例,我们修改下这个配置值。

0. 启动一个集群 (已完成则不用操作)

1. 进入一个broker的根目录,可以看到如下目录结构

2. 查看一个broker全部配置值。

bin/mqadmin getBrokerConfig -b 127.0.0.1:10911

-b : 后面跟broker ip:port

查出的结果会类似:

3. 更新maxMsgsNumBatch的值为64

bin/mqadmin updateBrokerConfig -b 127.0.0.1:10911 -k maxMsgsNumBatch -v 64

更新成功后会打印:

4. 再通过第2步的命令查看,值是否被改变了。

一般的, 只要调用过更新broker配置接口, broker都会持久化一份全部配置到broker.conf(配置文件名以启动命令参数为准),我们登录broker可以看到。

以下是我本地的一份配置demo,大家参考

serverSelectorThreads=3
brokerRole=ASYNC_MASTER
serverSocketRcvBufSize=131072
osPageCacheBusyTimeOutMills=1000
shortPollingTimeMills=1000
clientSocketRcvBufSize=131072
clusterTopicEnable=true
brokerTopicEnable=true
autoCreateTopicEnable=true
maxErrorRateOfBloomFilter=20
maxMsgsNumBatch=64
cleanResourceInterval=10000
commercialBaseCount=1
maxTransferCountOnMessageInMemory=32
brokerFastFailureEnable=true
brokerClusterName=DefaultCluster
flushDiskType=ASYNC_FLUSH
commercialBigCount=1
mappedFileSizeConsumeQueue=6000000
consumerFallbehindThreshold=17179869184
autoCreateSubscriptionGroup=true
transientStorePoolEnable=false
flushConsumerOffsetInterval=5000
waitTimeMillsInHeartbeatQueue=31000
diskMaxUsedSpaceRatio=75
flushCommitLogLeastPages=4
cleanFileForciblyEnable=true
slaveReadEnable=false
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
expectConsumerNumUseFilter=32
traceTopicEnable=false
useEpollNativeSelector=false
enablePropertyFilter=true
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
deleteCommitLogFilesInterval=100
brokerName=broker-a
maxTransferBytesOnMessageInDisk=65536
listenPort=10911
flushConsumeQueueLeastPages=2
pullMessageThreadPoolNums=20
useReentrantLockWhenPutMessage=false
flushIntervalConsumeQueue=1000
sendThreadPoolQueueCapacity=10000
debugLockEnable=false
haHousekeepingInterval=20000
diskFallRecorded=true
messageIndexEnable=true
clientAsyncSemaphoreValue=65535
clientCallbackExecutorThreads=2
putMsgIndexHightWater=600000
sendMessageThreadPoolNums=1
clientManagerThreadPoolQueueCapacity=1000000
serverSocketSndBufSize=131072
maxDelayTime=40
clientSocketSndBufSize=131072
namesrvAddr=localhost:9876
commercialEnable=true
maxHashSlotNum=5000000
heartbeatThreadPoolNums=2
transactionTimeOut=6000
maxMessageSize=4194304
adminBrokerThreadPoolNums=16
defaultQueryMaxNum=32
maxTransferBytesOnMessageInMemory=262144
forceRegister=true
enableConsumeQueueExt=false
longPollingEnable=true
serverWorkerThreads=8
messageIndexSafe=false
deleteConsumeQueueFilesInterval=100
haSlaveFallbehindMax=268435456
serverCallbackExecutorThreads=0
flushCommitLogThoroughInterval=10000
isEnableBatchPush=false
commercialTimerCount=1
enableDLegerCommitLog=false
useTLS=false
redeleteHangedFileInterval=120000
flushIntervalCommitLog=500
rocketmqHome=/opt/rocketmq
queryMessageThreadPoolNums=10
messageStorePlugIn=
serverChannelMaxIdleTimeSeconds=120
maxIndexNum=20000000
filterDataCleanTimeSpan=86400000
filterServerNums=0
commitCommitLogLeastPages=4
waitTimeMillsInPullQueue=5000
haSendHeartbeatInterval=5000
processReplyMessageThreadPoolNums=20
clientChannelMaxIdleTimeSeconds=120
filterSupportRetry=false
flushDelayOffsetInterval=10000
duplicationEnable=false
replyThreadPoolQueueCapacity=10000
offsetCheckInSlave=false
clientCloseSocketIfTimeout=false
transientStorePoolSize=5
waitTimeMillsInSendQueue=200
warmMapedFileEnable=false
endTransactionThreadPoolNums=12
flushCommitLogTimed=false
flushLeastPagesWhenWarmMapedFile=4096
clientWorkerThreads=4
endTransactionPoolQueueCapacity=100000
registerNameServerPeriod=30000
registerBrokerTimeoutMills=6000
accessMessageInMemoryMaxRatio=40
highSpeedMode=false
transactionCheckMax=15
checkCRCOnRecover=true
destroyMapedFileIntervalForcibly=120000
brokerIP2=xxx.xxx.xxx.xxx
brokerIP1=xxx.xxx.xxx.xxx
commitIntervalCommitLog=200
clientOnewaySemaphoreValue=65535
storeReplyMessageEnable=true
traceOn=true
clientManageThreadPoolNums=32
channelNotActiveInterval=60000
mappedFileSizeConsumeQueueExt=50331648
consumerManagerThreadPoolQueueCapacity=1000000
serverOnewaySemaphoreValue=256
haListenPort=10912
enableCalcFilterBitMap=false
clientPooledByteBufAllocatorEnable=false
aclEnable=false
storePathRootDir=/root/store
syncFlushTimeout=5000
rejectTransactionMessage=false
commitCommitLogThoroughInterval=200
connectTimeoutMillis=3000
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
consumerManageThreadPoolNums=32
disableConsumeIfConsumerReadSlowly=false
flushConsumerOffsetHistoryInterval=60000
fetchNamesrvAddrByAddressServer=false
haTransferBatchSize=32768
compressedRegister=false
storePathCommitLog=/root/store/commitlog
commercialTransCount=1
transactionCheckInterval=60000
mappedFileSizeCommitLog=1073741824
startAcceptSendRequestTimeStamp=0
serverPooledByteBufAllocatorEnable=true
serverAsyncSemaphoreValue=64
autoDeleteUnusedStats=false
waitTimeMillsInTransactionQueue=3000
heartbeatThreadPoolQueueCapacity=50000
deleteWhen=05
bitMapLengthConsumeQueueExt=112
fastFailIfNoBufferInStorePool=false
defaultTopicQueueNums=8
flushConsumeQueueThoroughInterval=60000
notifyConsumerIdsChangedEnable=true
brokerPermission=6
fileReservedTime=48
transferMsgByHeap=true
pullThreadPoolQueueCapacity=100000
brokerId=0
maxTransferCountOnMessageInDisk=8

实验链接:https://developer.aliyun.com/adc/scenario/14828d336c1547b0ab3f7514ca229ea2


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4天前
|
机器学习/深度学习 数据采集 人工智能
智能化运维:AI在故障预测中的应用
【5月更文挑战第31天】本文探讨了人工智能(AI)技术在运维领域的应用,特别是如何通过机器学习和数据分析实现故障预测。文章首先介绍了智能化运维的概念,然后详细阐述了AI技术在故障预测中的具体应用,最后讨论了实施AI故障预测的挑战和未来发展趋势。
|
20小时前
|
机器学习/深度学习 人工智能 运维
智能化运维的崛起:AI在系统管理中的应用
【6月更文挑战第3天】随着人工智能技术的不断进步,传统的IT运维模式正面临一场革命。本文将探讨智能化运维的概念、优势以及如何利用AI技术提升系统管理的效率和准确性。我们将通过实际案例分析,展示智能化运维在故障预测、自动化处理和性能优化等方面的应用,并讨论其对IT专业人员技能要求的影响。
|
4天前
|
数据采集 人工智能 运维
智能化运维:AI在系统管理中的应用与挑战
【5月更文挑战第32天】随着人工智能技术的飞速发展,其在IT运维领域的应用正逐步改变传统的系统管理模式。本文将探讨AI技术如何优化运维流程、提高效率,并分析在实践中遇到的挑战和解决方案。
|
4天前
|
机器学习/深度学习 人工智能 运维
智能运维:AI在IT管理中的应用与挑战
【5月更文挑战第31天】本文将探讨人工智能(AI)在IT运维领域的应用,分析其带来的效率提升和面临的挑战。文章将详细介绍AI技术如何优化传统的运维流程,提高故障预测的准确性,并自动化常规任务。同时,也将讨论在实施AI解决方案时可能遇到的技术和伦理问题。
|
4天前
|
机器学习/深度学习 人工智能 运维
智能化运维:AI在系统管理中的应用与挑战
【5月更文挑战第31天】随着人工智能技术的不断进步,其在运维领域的应用已成为提升效率、预测故障和自动化处理任务的关键。本文将探讨AI如何改变传统运维模式,实现智能化监控、故障预测及自愈能力,同时分析面临的技术挑战与未来发展趋势。
|
6天前
|
运维 Kubernetes 持续交付
构建高效自动化运维体系:基于容器技术的持续集成与持续部署实践
【5月更文挑战第30天】随着云计算和微服务架构的兴起,传统的运维模式已难以满足快速迭代和高可用性的需求。本文探讨了如何利用容器技术构建一个高效、可靠的自动化运维体系,重点分析了Docker和Kubernetes在这一过程中的关键作用,并提出了一套基于这些技术的持续集成(CI)与持续部署(CD)解决方案。通过实际案例和操作步骤的详细阐述,文章为读者提供了一种实现自动化运维的有效途径,同时对未来运维技术的发展趋势进行了展望。
|
6天前
|
运维 Kubernetes 持续交付
构建高效自动化运维体系:基于Docker和Kubernetes的实践
【5月更文挑战第30天】 在当今的快速迭代和持续部署的软件发布环境中,自动化运维的重要性愈发凸显。本文旨在探讨如何利用容器化技术与微服务架构,特别是Docker和Kubernetes,来构建一个高效、可伸缩且自愈的自动化运维体系。通过详细分析容器化的优势及Kubernetes的集群管理机制,文章将提供一个清晰的指南,帮助读者理解并实现现代软件部署的最佳实践。
|
6天前
|
机器学习/深度学习 人工智能 运维
智能化运维:AI在现代IT管理中的应用
随着人工智能(AI)技术的飞速发展,其在IT运维领域的应用正逐步改变着传统的运维模式。本文将探讨AI技术如何赋能运维工作,提升效率和准确性,并分析其在实际场景中的运用案例。
|
6天前
|
运维 监控 Devops
构建高效自动化运维系统:DevOps在企业级应用的实践
【5月更文挑战第30天】 随着信息技术的飞速发展,企业对软件交付速度和稳定性的要求越来越高。传统的运维模式已无法满足快速迭代和高效稳定的需求,因此,本文将探讨如何通过实施DevOps文化、流程和工具,构建一个高效的自动化运维系统。文章将详细描述DevOps的核心理念、关键技术组件以及如何在组织中落地实施策略,旨在帮助企业提升运维效率,加速产品的上市时间,同时保证系统的高可用性和稳定性。
|
6天前
|
运维 Kubernetes 持续交付
构建高效自动化运维体系:基于容器技术的持续集成与持续部署(CI/CD)实践
【5月更文挑战第30天】 在当今快速迭代的软件开发周期中,自动化运维成为确保交付速度和质量的关键因素。本文聚焦于如何利用容器技术实现高效自动化运维体系,特别是持续集成(CI)与持续部署(CD)的实践。通过深入分析容器化工具如Docker和Kubernetes在自动化流程中的应用,以及它们如何帮助实现环境的一致性、降低部署风险并提高生产效率,本文旨在为运维专业人员提供一套切实可行的参考方案。

热门文章

最新文章

相关产品

  • 云消息队列 MQ