RocketMQ的常规运维实践应用

简介: RocketMQ的常规运维实践应用

RocketMQ的常规运维实践应用


1. 实验环境说明

实验环境

  1. 体验手册。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验操作手册。

  1. 云产品资源。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。

  1. 体验报告。

小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。

  1. 实验环境。

实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。

  1. 实验体验时间。

体验时间一般为一个小时。

  1. 实验环境功能栏。

功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。

  1. 编写实验报告。

填写实验报告,帮助大家快速通过实验了解RocketMQ。

实验帮助

如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。

启动一个集群

  1. 启动namesrv。观察到启动成功的日志后, ctrl + c,终止日志输出。
cd /usr/local/services/5-rocketmq/namesrv-01
restart.sh
  1. 启动broker

修改broker配置项brokerIP1为实验公网IP,启动broker。观察到启动成功的日志后, ctrl + c,终止日志输出。

操作命令如下:

cd /usr/local/services/5-rocketmq/broker-01
vim ./conf/broker.conf 
restart.sh


2. 如何编译、部署RocketMQ dashboard

如何编译、部署RocketMQ dashboard。

本教程会以如何利用源码编译并打包RocketMQ为例, 演示如何下载、编译任意版本的RocketMQ.

0. 启动一个集群 (已启动则不用操作)

1. 安装git,jdk, maven等工具(当前实验环境已经安装好。参考或者baidu/google)

  • jdk安装
  • maven安装

2. 下载最新release代码(这里以git为例,如果没有安装git直接从github release页面下载)

在自己电脑上, 进入命令行, 选择一个保存源码的目录, 这里我把源码保存到 /tiger/tmp为例

2.1 创建代码保存目录(已创建则不操作)并进入代码保存目录:

mkdir -p /tiger/tmp
cd /tiger/tmp

2.2 克隆master代码

git clone https://github.com/apache/rocketmq-dashboard.git
or 
git clone https://gitee.com/mirrors_apache/rocketmq-dashboard.git

2.3 进入源码根目录:

cd rocketmq-dashboard

可以看到如下代码信息:

3. 编译和打包源码(这里面包含前后端一起打包,前端yarn, node, npm都已经在实验环境准备好了,及时下载大概率会失败)

  • 修改dashboard配置的namesrv地址
vim src/main/resources/application.yml

  • 执行打包命令
mvn clean package -Dmaven.test.skip=true

看到如下结果就是成功了

编译打包成功后, 我们执行:

cd target
ls -l

查看打包结果

  • 运行dashboard
java -Xms400m -Xmx400m -Xmn400m  -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar


3. 如何查看和扩容Topic

扩容Topic,有多种方式:

  1. 在topic所在broker节点上增加queue数量
  2. 在topic所在broker节点外,已存在的broker节点上增加queue数量。
  3. 新添加broker节点,将增加的queue放到新broker节点。

本节主要讲解第一种, 一般的,增加queue数量可以增大生产和消费吞吐量。

0. 启动一个集群和安装dashboard (已完成则不用操作)

创建topic

通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。创建一个topic, 配置如下

查看topic的队列数

这里显示2个队列都在broker-a上。

增加topic队列数

增加队列和创建一样, 可以选择选择新增加的队列分布在哪些集群或者哪些broker上。 这里我们只有一个broker,

保持集群、broker不变。预期增加的队列都在broker-a上。

增加队列成功后,我们再次查看topic状态, 发现队列已经变化。

一般的, 增加队列数,可以增大消费效率。 同一个消费者组中的消费者个数最大为队列数,一般消费能力随着队列数增大而增加。


4. 如何重置消费位点

重置消费位点,是消费者想重新消费部分消息,此时可以通过社区的dashboard直接操作。

前提是消费者已经做好消费幂等。

下面简单总结了重置消费位点的流程:

通过上图可以看出重置消费位点需要broker,client都支持才能进行,下面我们分享下操作过程。

扩容Topic,这里是狭义的扩容, 指增加topic的queue数量,以此来对生产和消费效率产生影响。

0. 启动一个集群和安装dashboard (已完成则不用操作)

查找到需要重置位点的topic

通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。查找到需要重置位点的topic,点击重置消费位点

选择需要重置消费位点的消费者组和位点起始时间。

注意:

2.1 消费者有消费幂等实现,如果没有的话,位点重置后,可能带来数据重复等问题。

2.2 一个topic可能被多个消费者组订阅,需要准确找到你需要重置的那个消费者组名字

2.3 每一个消息有一个位点对应,每一个位点有一个距离其最近的时间。所以这里的时间并不能完全对应位点值,只是一个预估。

观察重置后旧的消息是否被消费者重新消费。这个过程可能需要一点时间, 耐心等待观察结果

一般的,重置位点一般有2种场景可以用到

  1. 业务消费逻辑有bug,修复后需要重新处理之前业务消息
  2. 流计算中,重放数据。


5. 如何动态修改Broker配置

动态修改Broker配置可以动态生效,并且可以帮你快速了解Broker的全部配置项。rocketmq发布的工具包中有单独的命令可以支持。

下面以maxMsgsNumBatch配置项为例,我们修改下这个配置值。

0. 启动一个集群 (已完成则不用操作)

1. 进入一个broker的根目录,可以看到如下目录结构

2. 查看一个broker全部配置值。

bin/mqadmin getBrokerConfig -b 127.0.0.1:10911

-b : 后面跟broker ip:port

查出的结果会类似:

3. 更新maxMsgsNumBatch的值为64

bin/mqadmin updateBrokerConfig -b 127.0.0.1:10911 -k maxMsgsNumBatch -v 64

更新成功后会打印:

4. 再通过第2步的命令查看,值是否被改变了。

一般的, 只要调用过更新broker配置接口, broker都会持久化一份全部配置到broker.conf(配置文件名以启动命令参数为准),我们登录broker可以看到。

以下是我本地的一份配置demo,大家参考

serverSelectorThreads=3
brokerRole=ASYNC_MASTER
serverSocketRcvBufSize=131072
osPageCacheBusyTimeOutMills=1000
shortPollingTimeMills=1000
clientSocketRcvBufSize=131072
clusterTopicEnable=true
brokerTopicEnable=true
autoCreateTopicEnable=true
maxErrorRateOfBloomFilter=20
maxMsgsNumBatch=64
cleanResourceInterval=10000
commercialBaseCount=1
maxTransferCountOnMessageInMemory=32
brokerFastFailureEnable=true
brokerClusterName=DefaultCluster
flushDiskType=ASYNC_FLUSH
commercialBigCount=1
mappedFileSizeConsumeQueue=6000000
consumerFallbehindThreshold=17179869184
autoCreateSubscriptionGroup=true
transientStorePoolEnable=false
flushConsumerOffsetInterval=5000
waitTimeMillsInHeartbeatQueue=31000
diskMaxUsedSpaceRatio=75
flushCommitLogLeastPages=4
cleanFileForciblyEnable=true
slaveReadEnable=false
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
expectConsumerNumUseFilter=32
traceTopicEnable=false
useEpollNativeSelector=false
enablePropertyFilter=true
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
deleteCommitLogFilesInterval=100
brokerName=broker-a
maxTransferBytesOnMessageInDisk=65536
listenPort=10911
flushConsumeQueueLeastPages=2
pullMessageThreadPoolNums=20
useReentrantLockWhenPutMessage=false
flushIntervalConsumeQueue=1000
sendThreadPoolQueueCapacity=10000
debugLockEnable=false
haHousekeepingInterval=20000
diskFallRecorded=true
messageIndexEnable=true
clientAsyncSemaphoreValue=65535
clientCallbackExecutorThreads=2
putMsgIndexHightWater=600000
sendMessageThreadPoolNums=1
clientManagerThreadPoolQueueCapacity=1000000
serverSocketSndBufSize=131072
maxDelayTime=40
clientSocketSndBufSize=131072
namesrvAddr=localhost:9876
commercialEnable=true
maxHashSlotNum=5000000
heartbeatThreadPoolNums=2
transactionTimeOut=6000
maxMessageSize=4194304
adminBrokerThreadPoolNums=16
defaultQueryMaxNum=32
maxTransferBytesOnMessageInMemory=262144
forceRegister=true
enableConsumeQueueExt=false
longPollingEnable=true
serverWorkerThreads=8
messageIndexSafe=false
deleteConsumeQueueFilesInterval=100
haSlaveFallbehindMax=268435456
serverCallbackExecutorThreads=0
flushCommitLogThoroughInterval=10000
isEnableBatchPush=false
commercialTimerCount=1
enableDLegerCommitLog=false
useTLS=false
redeleteHangedFileInterval=120000
flushIntervalCommitLog=500
rocketmqHome=/opt/rocketmq
queryMessageThreadPoolNums=10
messageStorePlugIn=
serverChannelMaxIdleTimeSeconds=120
maxIndexNum=20000000
filterDataCleanTimeSpan=86400000
filterServerNums=0
commitCommitLogLeastPages=4
waitTimeMillsInPullQueue=5000
haSendHeartbeatInterval=5000
processReplyMessageThreadPoolNums=20
clientChannelMaxIdleTimeSeconds=120
filterSupportRetry=false
flushDelayOffsetInterval=10000
duplicationEnable=false
replyThreadPoolQueueCapacity=10000
offsetCheckInSlave=false
clientCloseSocketIfTimeout=false
transientStorePoolSize=5
waitTimeMillsInSendQueue=200
warmMapedFileEnable=false
endTransactionThreadPoolNums=12
flushCommitLogTimed=false
flushLeastPagesWhenWarmMapedFile=4096
clientWorkerThreads=4
endTransactionPoolQueueCapacity=100000
registerNameServerPeriod=30000
registerBrokerTimeoutMills=6000
accessMessageInMemoryMaxRatio=40
highSpeedMode=false
transactionCheckMax=15
checkCRCOnRecover=true
destroyMapedFileIntervalForcibly=120000
brokerIP2=xxx.xxx.xxx.xxx
brokerIP1=xxx.xxx.xxx.xxx
commitIntervalCommitLog=200
clientOnewaySemaphoreValue=65535
storeReplyMessageEnable=true
traceOn=true
clientManageThreadPoolNums=32
channelNotActiveInterval=60000
mappedFileSizeConsumeQueueExt=50331648
consumerManagerThreadPoolQueueCapacity=1000000
serverOnewaySemaphoreValue=256
haListenPort=10912
enableCalcFilterBitMap=false
clientPooledByteBufAllocatorEnable=false
aclEnable=false
storePathRootDir=/root/store
syncFlushTimeout=5000
rejectTransactionMessage=false
commitCommitLogThoroughInterval=200
connectTimeoutMillis=3000
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
consumerManageThreadPoolNums=32
disableConsumeIfConsumerReadSlowly=false
flushConsumerOffsetHistoryInterval=60000
fetchNamesrvAddrByAddressServer=false
haTransferBatchSize=32768
compressedRegister=false
storePathCommitLog=/root/store/commitlog
commercialTransCount=1
transactionCheckInterval=60000
mappedFileSizeCommitLog=1073741824
startAcceptSendRequestTimeStamp=0
serverPooledByteBufAllocatorEnable=true
serverAsyncSemaphoreValue=64
autoDeleteUnusedStats=false
waitTimeMillsInTransactionQueue=3000
heartbeatThreadPoolQueueCapacity=50000
deleteWhen=05
bitMapLengthConsumeQueueExt=112
fastFailIfNoBufferInStorePool=false
defaultTopicQueueNums=8
flushConsumeQueueThoroughInterval=60000
notifyConsumerIdsChangedEnable=true
brokerPermission=6
fileReservedTime=48
transferMsgByHeap=true
pullThreadPoolQueueCapacity=100000
brokerId=0
maxTransferCountOnMessageInDisk=8

实验链接:https://developer.aliyun.com/adc/scenario/14828d336c1547b0ab3f7514ca229ea2


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
17天前
|
机器学习/深度学习 人工智能 运维
智能运维加速交付:应用上线别再慢吞吞
智能运维加速交付:应用上线别再慢吞吞
65 2
|
13天前
|
数据采集 运维 数据可视化
AR 运维系统与 MES、EMA、IoT 系统的融合架构与实践
AR运维系统融合IoT、EMA、MES数据,构建“感知-分析-决策-执行”闭环。通过AR终端实现设备数据可视化,实时呈现温度、工单等信息,提升运维效率与生产可靠性。(238字)
|
2月前
|
存储 运维 安全
运维知识沉淀工具深度解析:从结构设计到落地实践全拆解
运维知识沉淀工具助力团队将零散经验结构化存储,实现问题处理路径标准化、知识复用化。通过标签、模板与自动化调取机制,让每次处理都留下可复用资产,提升团队协同效率与系统稳定性。
|
18天前
|
运维 Kubernetes 测试技术
应用多、交付快,研发运维怎么管?看云效+SAE 如何一站式破局
通过在云效中创建 SAE 服务连接并关联集群,团队可将应用环境直接部署到 SAE,实现从代码提交、镜像构建到 SAE 部署的自动化流水线。该集成打通了研发与运维的壁垒,特别适用于应用数量多、团队规模大、交付节奏快的组织,助力企业实现敏捷、可靠的持续交付。
|
1月前
|
机器学习/深度学习 人工智能 运维
三重Reward驱动的运维智能体进化:多智能体、上下文工程与强化学习的融合实践
这篇文章系统性地阐述了 AI 原生时代下,面向技术风险领域的智能体系统(DeRisk)的架构设计、核心理念、关键技术演进路径与实践落地案例。
三重Reward驱动的运维智能体进化:多智能体、上下文工程与强化学习的融合实践
|
3月前
|
运维 监控 应用服务中间件
运维打铁: Ruby 脚本在运维自动化中的应用探索
Ruby 是一种简洁、动态类型的编程语言,适合运维自动化任务。本文介绍了其在服务器配置管理、定时任务执行和日志分析处理中的应用,并提供了代码示例,展示了 Ruby 在运维自动化中的实际价值。
105 2
|
2月前
|
运维 数据可视化 vr&ar
AR远程协作在发电领域的运维应用方案
发电厂面临设备故障频发、运维人员经验不足、远程支持困难及维护成本高昂等挑战。为提升运维效率与设备可靠性,越来越多电厂开始采用增强现实(AR)远程协作技术。通过AR设备,现场人员可与远程专家实时协作,实现快速故障诊断与修复、可视化操作指导和精准培训支持。AR技术不仅缩短停机时间,降低运维成本,还提升了应急响应能力与决策效率,助力发电行业向智能化、高效化方向发展。
|
2月前
|
人工智能 运维 监控
云+应用一体化可观测:破局“云上困境”,让运维驱动业务增长
当云计算迈入深入上云新阶段,数智化升级的关键课题已从“简单上云”演进至“精细治云”。随着企业对云计算的依赖日益加深,如何高效管理云端资源及其稳定性成为新的挑战。为此,阿里云推出云+应用一体化可观测方案,通过阿里云应用运维平台(Application Operation Platform,简称“AOP”)构建覆盖应用全生命周期一体化可观测产品体系,推动运维模式由被动响应向主动预防转变,实现故障的快速发现、定界与恢复,保障云上业务稳定运行。 目前,该方案已成功服务超过50家行业头部客户,为政务云平台、金融核心系统、能源调度中枢等关键基础设施提供全天候安全运维保障。
|
3月前
|
运维 监控 负载均衡
高效运维实践:常见问题的应对策略与实践经验
本文探讨了运维工作中的五大核心挑战及应对策略,涵盖负载均衡优化、数据库性能提升、系统监控预警、容器化与微服务运维等方面,旨在帮助企业提升系统稳定性与运维效率。
|
3月前
|
运维 监控 安全
从实践到自动化:现代运维管理的转型与挑战
本文探讨了现代运维管理从传统人工模式向自动化转型的必要性与路径,分析了传统运维的痛点,如效率低、响应慢、依赖经验等问题,并介绍了自动化运维在提升效率、降低成本、增强系统稳定性与安全性方面的优势。结合技术工具与实践案例,文章展示了企业如何通过自动化实现运维升级,推动数字化转型,提升业务竞争力。

相关产品

  • 云消息队列 MQ