RocketMQ的常规运维实践应用

简介: RocketMQ的常规运维实践应用

RocketMQ的常规运维实践应用


1. 实验环境说明

实验环境

  1. 体验手册。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验操作手册。

  1. 云产品资源。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。

  1. 体验报告。

小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。

  1. 实验环境。

实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。

  1. 实验体验时间。

体验时间一般为一个小时。

  1. 实验环境功能栏。

功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。

  1. 编写实验报告。

填写实验报告,帮助大家快速通过实验了解RocketMQ。

实验帮助

如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。

启动一个集群

  1. 启动namesrv。观察到启动成功的日志后, ctrl + c,终止日志输出。
cd /usr/local/services/5-rocketmq/namesrv-01
restart.sh
  1. 启动broker

修改broker配置项brokerIP1为实验公网IP,启动broker。观察到启动成功的日志后, ctrl + c,终止日志输出。

操作命令如下:

cd /usr/local/services/5-rocketmq/broker-01
vim ./conf/broker.conf 
restart.sh


2. 如何编译、部署RocketMQ dashboard

如何编译、部署RocketMQ dashboard。

本教程会以如何利用源码编译并打包RocketMQ为例, 演示如何下载、编译任意版本的RocketMQ.

0. 启动一个集群 (已启动则不用操作)

1. 安装git,jdk, maven等工具(当前实验环境已经安装好。参考或者baidu/google)

  • jdk安装
  • maven安装

2. 下载最新release代码(这里以git为例,如果没有安装git直接从github release页面下载)

在自己电脑上, 进入命令行, 选择一个保存源码的目录, 这里我把源码保存到 /tiger/tmp为例

2.1 创建代码保存目录(已创建则不操作)并进入代码保存目录:

mkdir -p /tiger/tmp
cd /tiger/tmp

2.2 克隆master代码

git clone https://github.com/apache/rocketmq-dashboard.git
or 
git clone https://gitee.com/mirrors_apache/rocketmq-dashboard.git

2.3 进入源码根目录:

cd rocketmq-dashboard

可以看到如下代码信息:

3. 编译和打包源码(这里面包含前后端一起打包,前端yarn, node, npm都已经在实验环境准备好了,及时下载大概率会失败)

  • 修改dashboard配置的namesrv地址
vim src/main/resources/application.yml

  • 执行打包命令
mvn clean package -Dmaven.test.skip=true

看到如下结果就是成功了

编译打包成功后, 我们执行:

cd target
ls -l

查看打包结果

  • 运行dashboard
java -Xms400m -Xmx400m -Xmn400m  -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar


3. 如何查看和扩容Topic

扩容Topic,有多种方式:

  1. 在topic所在broker节点上增加queue数量
  2. 在topic所在broker节点外,已存在的broker节点上增加queue数量。
  3. 新添加broker节点,将增加的queue放到新broker节点。

本节主要讲解第一种, 一般的,增加queue数量可以增大生产和消费吞吐量。

0. 启动一个集群和安装dashboard (已完成则不用操作)

创建topic

通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。创建一个topic, 配置如下

查看topic的队列数

这里显示2个队列都在broker-a上。

增加topic队列数

增加队列和创建一样, 可以选择选择新增加的队列分布在哪些集群或者哪些broker上。 这里我们只有一个broker,

保持集群、broker不变。预期增加的队列都在broker-a上。

增加队列成功后,我们再次查看topic状态, 发现队列已经变化。

一般的, 增加队列数,可以增大消费效率。 同一个消费者组中的消费者个数最大为队列数,一般消费能力随着队列数增大而增加。


4. 如何重置消费位点

重置消费位点,是消费者想重新消费部分消息,此时可以通过社区的dashboard直接操作。

前提是消费者已经做好消费幂等。

下面简单总结了重置消费位点的流程:

通过上图可以看出重置消费位点需要broker,client都支持才能进行,下面我们分享下操作过程。

扩容Topic,这里是狭义的扩容, 指增加topic的queue数量,以此来对生产和消费效率产生影响。

0. 启动一个集群和安装dashboard (已完成则不用操作)

查找到需要重置位点的topic

通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。查找到需要重置位点的topic,点击重置消费位点

选择需要重置消费位点的消费者组和位点起始时间。

注意:

2.1 消费者有消费幂等实现,如果没有的话,位点重置后,可能带来数据重复等问题。

2.2 一个topic可能被多个消费者组订阅,需要准确找到你需要重置的那个消费者组名字

2.3 每一个消息有一个位点对应,每一个位点有一个距离其最近的时间。所以这里的时间并不能完全对应位点值,只是一个预估。

观察重置后旧的消息是否被消费者重新消费。这个过程可能需要一点时间, 耐心等待观察结果

一般的,重置位点一般有2种场景可以用到

  1. 业务消费逻辑有bug,修复后需要重新处理之前业务消息
  2. 流计算中,重放数据。


5. 如何动态修改Broker配置

动态修改Broker配置可以动态生效,并且可以帮你快速了解Broker的全部配置项。rocketmq发布的工具包中有单独的命令可以支持。

下面以maxMsgsNumBatch配置项为例,我们修改下这个配置值。

0. 启动一个集群 (已完成则不用操作)

1. 进入一个broker的根目录,可以看到如下目录结构

2. 查看一个broker全部配置值。

bin/mqadmin getBrokerConfig -b 127.0.0.1:10911

-b : 后面跟broker ip:port

查出的结果会类似:

3. 更新maxMsgsNumBatch的值为64

bin/mqadmin updateBrokerConfig -b 127.0.0.1:10911 -k maxMsgsNumBatch -v 64

更新成功后会打印:

4. 再通过第2步的命令查看,值是否被改变了。

一般的, 只要调用过更新broker配置接口, broker都会持久化一份全部配置到broker.conf(配置文件名以启动命令参数为准),我们登录broker可以看到。

以下是我本地的一份配置demo,大家参考

serverSelectorThreads=3
brokerRole=ASYNC_MASTER
serverSocketRcvBufSize=131072
osPageCacheBusyTimeOutMills=1000
shortPollingTimeMills=1000
clientSocketRcvBufSize=131072
clusterTopicEnable=true
brokerTopicEnable=true
autoCreateTopicEnable=true
maxErrorRateOfBloomFilter=20
maxMsgsNumBatch=64
cleanResourceInterval=10000
commercialBaseCount=1
maxTransferCountOnMessageInMemory=32
brokerFastFailureEnable=true
brokerClusterName=DefaultCluster
flushDiskType=ASYNC_FLUSH
commercialBigCount=1
mappedFileSizeConsumeQueue=6000000
consumerFallbehindThreshold=17179869184
autoCreateSubscriptionGroup=true
transientStorePoolEnable=false
flushConsumerOffsetInterval=5000
waitTimeMillsInHeartbeatQueue=31000
diskMaxUsedSpaceRatio=75
flushCommitLogLeastPages=4
cleanFileForciblyEnable=true
slaveReadEnable=false
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
expectConsumerNumUseFilter=32
traceTopicEnable=false
useEpollNativeSelector=false
enablePropertyFilter=true
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
deleteCommitLogFilesInterval=100
brokerName=broker-a
maxTransferBytesOnMessageInDisk=65536
listenPort=10911
flushConsumeQueueLeastPages=2
pullMessageThreadPoolNums=20
useReentrantLockWhenPutMessage=false
flushIntervalConsumeQueue=1000
sendThreadPoolQueueCapacity=10000
debugLockEnable=false
haHousekeepingInterval=20000
diskFallRecorded=true
messageIndexEnable=true
clientAsyncSemaphoreValue=65535
clientCallbackExecutorThreads=2
putMsgIndexHightWater=600000
sendMessageThreadPoolNums=1
clientManagerThreadPoolQueueCapacity=1000000
serverSocketSndBufSize=131072
maxDelayTime=40
clientSocketSndBufSize=131072
namesrvAddr=localhost:9876
commercialEnable=true
maxHashSlotNum=5000000
heartbeatThreadPoolNums=2
transactionTimeOut=6000
maxMessageSize=4194304
adminBrokerThreadPoolNums=16
defaultQueryMaxNum=32
maxTransferBytesOnMessageInMemory=262144
forceRegister=true
enableConsumeQueueExt=false
longPollingEnable=true
serverWorkerThreads=8
messageIndexSafe=false
deleteConsumeQueueFilesInterval=100
haSlaveFallbehindMax=268435456
serverCallbackExecutorThreads=0
flushCommitLogThoroughInterval=10000
isEnableBatchPush=false
commercialTimerCount=1
enableDLegerCommitLog=false
useTLS=false
redeleteHangedFileInterval=120000
flushIntervalCommitLog=500
rocketmqHome=/opt/rocketmq
queryMessageThreadPoolNums=10
messageStorePlugIn=
serverChannelMaxIdleTimeSeconds=120
maxIndexNum=20000000
filterDataCleanTimeSpan=86400000
filterServerNums=0
commitCommitLogLeastPages=4
waitTimeMillsInPullQueue=5000
haSendHeartbeatInterval=5000
processReplyMessageThreadPoolNums=20
clientChannelMaxIdleTimeSeconds=120
filterSupportRetry=false
flushDelayOffsetInterval=10000
duplicationEnable=false
replyThreadPoolQueueCapacity=10000
offsetCheckInSlave=false
clientCloseSocketIfTimeout=false
transientStorePoolSize=5
waitTimeMillsInSendQueue=200
warmMapedFileEnable=false
endTransactionThreadPoolNums=12
flushCommitLogTimed=false
flushLeastPagesWhenWarmMapedFile=4096
clientWorkerThreads=4
endTransactionPoolQueueCapacity=100000
registerNameServerPeriod=30000
registerBrokerTimeoutMills=6000
accessMessageInMemoryMaxRatio=40
highSpeedMode=false
transactionCheckMax=15
checkCRCOnRecover=true
destroyMapedFileIntervalForcibly=120000
brokerIP2=xxx.xxx.xxx.xxx
brokerIP1=xxx.xxx.xxx.xxx
commitIntervalCommitLog=200
clientOnewaySemaphoreValue=65535
storeReplyMessageEnable=true
traceOn=true
clientManageThreadPoolNums=32
channelNotActiveInterval=60000
mappedFileSizeConsumeQueueExt=50331648
consumerManagerThreadPoolQueueCapacity=1000000
serverOnewaySemaphoreValue=256
haListenPort=10912
enableCalcFilterBitMap=false
clientPooledByteBufAllocatorEnable=false
aclEnable=false
storePathRootDir=/root/store
syncFlushTimeout=5000
rejectTransactionMessage=false
commitCommitLogThoroughInterval=200
connectTimeoutMillis=3000
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
consumerManageThreadPoolNums=32
disableConsumeIfConsumerReadSlowly=false
flushConsumerOffsetHistoryInterval=60000
fetchNamesrvAddrByAddressServer=false
haTransferBatchSize=32768
compressedRegister=false
storePathCommitLog=/root/store/commitlog
commercialTransCount=1
transactionCheckInterval=60000
mappedFileSizeCommitLog=1073741824
startAcceptSendRequestTimeStamp=0
serverPooledByteBufAllocatorEnable=true
serverAsyncSemaphoreValue=64
autoDeleteUnusedStats=false
waitTimeMillsInTransactionQueue=3000
heartbeatThreadPoolQueueCapacity=50000
deleteWhen=05
bitMapLengthConsumeQueueExt=112
fastFailIfNoBufferInStorePool=false
defaultTopicQueueNums=8
flushConsumeQueueThoroughInterval=60000
notifyConsumerIdsChangedEnable=true
brokerPermission=6
fileReservedTime=48
transferMsgByHeap=true
pullThreadPoolQueueCapacity=100000
brokerId=0
maxTransferCountOnMessageInDisk=8

实验链接:https://developer.aliyun.com/adc/scenario/14828d336c1547b0ab3f7514ca229ea2


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
13天前
|
消息中间件 存储 监控
活动实践 | 快速体验云消息队列RocketMQ版
本方案介绍如何使用阿里云消息队列RocketMQ版Serverless实例进行消息管理。主要步骤包括获取接入点、创建Topic和订阅组、收发消息、查看消息轨迹及仪表盘监控。通过这些操作,用户可以轻松实现消息的全生命周期管理,确保消息收发的高效与可靠。此外,还提供了消费验证、下载消息等功能,方便用户进行详细的消息处理与调试。
|
9天前
|
人工智能 运维 负载均衡
智能运维新时代:AI在云资源管理中的应用与实践
智能运维新时代:AI在云资源管理中的应用与实践
90 23
|
4天前
|
运维 Cloud Native 开发工具
智能运维:云原生大规模集群GitOps实践
智能运维:云原生大规模集群GitOps实践,由阿里云运维专家钟炯恩分享。内容涵盖云原生运维挑战、管理实践、GitOps实践及智能运维体系。通过OAM模型和GitOps优化方案,解决大规模集群的发布效率与稳定性问题,推动智能运维工程演进。适用于云原生环境下的高效运维管理。
|
11天前
|
Kubernetes Java 持续交付
小团队 CI/CD 实践:无需运维,Java Web应用的自动化部署
本文介绍如何使用GitHub Actions和阿里云Kubernetes(ACK)实现Java Web应用的自动化部署。通过CI/CD流程,开发人员无需手动处理复杂的运维任务,从而提高效率并减少错误。文中详细讲解了Docker与Kubernetes的概念,并演示了从创建Kubernetes集群、配置容器镜像服务到设置GitHub仓库Secrets及编写GitHub Actions工作流的具体步骤。最终实现了代码提交后自动构建、推送镜像并部署到Kubernetes集群的功能。整个过程不仅简化了部署流程,还确保了应用在不同环境中的稳定运行。
49 9
|
18天前
|
存储 弹性计算 运维
云端问道 7 期实践教学-使用操作系统智能助手 OS Copilot 轻松运维与编程
使用操作系统智能助手 OS Copilot 轻松运维与编程
44 14
|
1月前
|
消息中间件 Java 开发工具
【实践】快速学会使用云消息队列RabbitMQ版
本次分享的主题是快速学会使用云消息队列RabbitMQ版的实践。内容包括:如何创建和配置RabbitMQ实例,如Vhost、Exchange、Queue等;如何通过阿里云控制台管理静态用户名密码和AccessKey;以及如何使用RabbitMQ开源客户端进行消息生产和消费测试。最后介绍了实验资源的回收步骤,确保资源合理利用。通过详细的操作指南,帮助用户快速上手并掌握RabbitMQ的使用方法。
106 10
|
1月前
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
73 6
|
21天前
|
运维 监控 Cloud Native
云原生之运维监控实践:使用 taosKeeper 与 TDinsight 实现对 时序数据库TDengine 服务的监测告警
在数字化转型的过程中,监控与告警功能的优化对保障系统的稳定运行至关重要。本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品之一,详细介绍了如何利用 TDengine、taosKeeper 和 TDinsight 实现对 TDengine 服务的状态监控与告警功能。作者通过容器化安装 TDengine 和 Grafana,演示了如何配置 Grafana 数据源、导入 TDinsight 仪表板、以及如何设置告警规则和通知策略。欢迎大家阅读。
47 0
|
3月前
|
运维 Linux Apache
,自动化运维成为现代IT基础设施的关键部分。Puppet是一款强大的自动化运维工具
【10月更文挑战第7天】随着云计算和容器化技术的发展,自动化运维成为现代IT基础设施的关键部分。Puppet是一款强大的自动化运维工具,通过定义资源状态和关系,确保系统始终处于期望配置状态。本文介绍Puppet的基本概念、安装配置及使用示例,帮助读者快速掌握Puppet,实现高效自动化运维。
84 4
|
18天前
|
人工智能 运维 监控
AI辅助的运维流程自动化:实现智能化管理的新篇章
AI辅助的运维流程自动化:实现智能化管理的新篇章
354 22

相关产品

  • 云消息队列 MQ