RocketMQ的常规运维实践应用
1. 实验环境说明
实验环境
- 体验手册。
a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。
b.实验操作手册。
- 云产品资源。
a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。
b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。
- 体验报告。
小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。
- 实验环境。
实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。
- 实验体验时间。
体验时间一般为一个小时。
- 实验环境功能栏。
功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。
- 编写实验报告。
填写实验报告,帮助大家快速通过实验了解RocketMQ。
实验帮助
如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。
启动一个集群
- 启动namesrv。观察到启动成功的日志后, ctrl + c,终止日志输出。
cd /usr/local/services/5-rocketmq/namesrv-01 restart.sh
- 启动broker
修改broker配置项brokerIP1为实验公网IP,启动broker。观察到启动成功的日志后, ctrl + c,终止日志输出。
操作命令如下:
cd /usr/local/services/5-rocketmq/broker-01 vim ./conf/broker.conf restart.sh
2. 如何编译、部署RocketMQ dashboard
如何编译、部署RocketMQ dashboard。
本教程会以如何利用源码编译并打包RocketMQ为例, 演示如何下载、编译任意版本的RocketMQ.
0. 启动一个集群 (已启动则不用操作)
1. 安装git,jdk, maven等工具(当前实验环境已经安装好。参考或者baidu/google)
- jdk安装
- macos: https://juejin.cn/post/6844903878694010893
- windows: https://www.runoob.com/w3cnote/windows10-java-setup.html
- maven安装
- maven 下载二进制: https://dist.apache.org/repos/dist/release/maven/maven-3/
- maven 安装(windows + macos): https://www.runoob.com/maven/maven-setup.html
- 如果对于国外网站访问慢, 可以配置maven国内镜像:https://cloud.tencent.com/developer/article/1452479
- git安装 ( 可选 , 不安装的话直接下载4.9.3源代码:https://github.com/apache/rocketmq/releases)
2. 下载最新release代码(这里以git为例,如果没有安装git直接从github release页面下载)
在自己电脑上, 进入命令行, 选择一个保存源码的目录, 这里我把源码保存到 /tiger/tmp为例
2.1 创建代码保存目录(已创建则不操作)并进入代码保存目录:
mkdir -p /tiger/tmp cd /tiger/tmp
2.2 克隆master代码
git clone https://github.com/apache/rocketmq-dashboard.git or git clone https://gitee.com/mirrors_apache/rocketmq-dashboard.git
2.3 进入源码根目录:
cd rocketmq-dashboard
可以看到如下代码信息:
3. 编译和打包源码(这里面包含前后端一起打包,前端yarn, node, npm都已经在实验环境准备好了,及时下载大概率会失败)
- 修改dashboard配置的namesrv地址
vim src/main/resources/application.yml
- 执行打包命令
mvn clean package -Dmaven.test.skip=true
看到如下结果就是成功了
编译打包成功后, 我们执行:
cd target ls -l
查看打包结果
- 运行dashboard
java -Xms400m -Xmx400m -Xmn400m -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
3. 如何查看和扩容Topic
扩容Topic,有多种方式:
- 在topic所在broker节点上增加queue数量
- 在topic所在broker节点外,已存在的broker节点上增加queue数量。
- 新添加broker节点,将增加的queue放到新broker节点。
本节主要讲解第一种, 一般的,增加queue数量可以增大生产和消费吞吐量。
0. 启动一个集群和安装dashboard (已完成则不用操作)
创建topic
通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。创建一个topic, 配置如下
查看topic的队列数
这里显示2个队列都在broker-a上。
增加topic队列数
增加队列和创建一样, 可以选择选择新增加的队列分布在哪些集群或者哪些broker上。 这里我们只有一个broker,
保持集群、broker不变。预期增加的队列都在broker-a上。
增加队列成功后,我们再次查看topic状态, 发现队列已经变化。
一般的, 增加队列数,可以增大消费效率。 同一个消费者组中的消费者个数最大为队列数,一般消费能力随着队列数增大而增加。
4. 如何重置消费位点
重置消费位点,是消费者想重新消费部分消息,此时可以通过社区的dashboard直接操作。
前提是消费者已经做好消费幂等。
下面简单总结了重置消费位点的流程:
通过上图可以看出重置消费位点需要broker,client都支持才能进行,下面我们分享下操作过程。
扩容Topic,这里是狭义的扩容, 指增加topic的queue数量,以此来对生产和消费效率产生影响。
0. 启动一个集群和安装dashboard (已完成则不用操作)
查找到需要重置位点的topic
通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。查找到需要重置位点的topic,点击重置消费位点
选择需要重置消费位点的消费者组和位点起始时间。
注意:
2.1 消费者有消费幂等实现,如果没有的话,位点重置后,可能带来数据重复等问题。
2.2 一个topic可能被多个消费者组订阅,需要准确找到你需要重置的那个消费者组名字
2.3 每一个消息有一个位点对应,每一个位点有一个距离其最近的时间。所以这里的时间并不能完全对应位点值,只是一个预估。
观察重置后旧的消息是否被消费者重新消费。这个过程可能需要一点时间, 耐心等待观察结果
一般的,重置位点一般有2种场景可以用到
- 业务消费逻辑有bug,修复后需要重新处理之前业务消息
- 流计算中,重放数据。
5. 如何动态修改Broker配置
动态修改Broker配置可以动态生效,并且可以帮你快速了解Broker的全部配置项。rocketmq发布的工具包中有单独的命令可以支持。
下面以maxMsgsNumBatch配置项为例,我们修改下这个配置值。
0. 启动一个集群 (已完成则不用操作)
1. 进入一个broker的根目录,可以看到如下目录结构
2. 查看一个broker全部配置值。
bin/mqadmin getBrokerConfig -b 127.0.0.1:10911
-b : 后面跟broker ip:port
查出的结果会类似:
3. 更新maxMsgsNumBatch的值为64
bin/mqadmin updateBrokerConfig -b 127.0.0.1:10911 -k maxMsgsNumBatch -v 64
更新成功后会打印:
4. 再通过第2步的命令查看,值是否被改变了。
一般的, 只要调用过更新broker配置接口, broker都会持久化一份全部配置到broker.conf(配置文件名以启动命令参数为准),我们登录broker可以看到。
以下是我本地的一份配置demo,大家参考
serverSelectorThreads=3 brokerRole=ASYNC_MASTER serverSocketRcvBufSize=131072 osPageCacheBusyTimeOutMills=1000 shortPollingTimeMills=1000 clientSocketRcvBufSize=131072 clusterTopicEnable=true brokerTopicEnable=true autoCreateTopicEnable=true maxErrorRateOfBloomFilter=20 maxMsgsNumBatch=64 cleanResourceInterval=10000 commercialBaseCount=1 maxTransferCountOnMessageInMemory=32 brokerFastFailureEnable=true brokerClusterName=DefaultCluster flushDiskType=ASYNC_FLUSH commercialBigCount=1 mappedFileSizeConsumeQueue=6000000 consumerFallbehindThreshold=17179869184 autoCreateSubscriptionGroup=true transientStorePoolEnable=false flushConsumerOffsetInterval=5000 waitTimeMillsInHeartbeatQueue=31000 diskMaxUsedSpaceRatio=75 flushCommitLogLeastPages=4 cleanFileForciblyEnable=true slaveReadEnable=false msgTraceTopicName=RMQ_SYS_TRACE_TOPIC expectConsumerNumUseFilter=32 traceTopicEnable=false useEpollNativeSelector=false enablePropertyFilter=true messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h deleteCommitLogFilesInterval=100 brokerName=broker-a maxTransferBytesOnMessageInDisk=65536 listenPort=10911 flushConsumeQueueLeastPages=2 pullMessageThreadPoolNums=20 useReentrantLockWhenPutMessage=false flushIntervalConsumeQueue=1000 sendThreadPoolQueueCapacity=10000 debugLockEnable=false haHousekeepingInterval=20000 diskFallRecorded=true messageIndexEnable=true clientAsyncSemaphoreValue=65535 clientCallbackExecutorThreads=2 putMsgIndexHightWater=600000 sendMessageThreadPoolNums=1 clientManagerThreadPoolQueueCapacity=1000000 serverSocketSndBufSize=131072 maxDelayTime=40 clientSocketSndBufSize=131072 namesrvAddr=localhost:9876 commercialEnable=true maxHashSlotNum=5000000 heartbeatThreadPoolNums=2 transactionTimeOut=6000 maxMessageSize=4194304 adminBrokerThreadPoolNums=16 defaultQueryMaxNum=32 maxTransferBytesOnMessageInMemory=262144 forceRegister=true enableConsumeQueueExt=false longPollingEnable=true serverWorkerThreads=8 messageIndexSafe=false deleteConsumeQueueFilesInterval=100 haSlaveFallbehindMax=268435456 serverCallbackExecutorThreads=0 flushCommitLogThoroughInterval=10000 isEnableBatchPush=false commercialTimerCount=1 enableDLegerCommitLog=false useTLS=false redeleteHangedFileInterval=120000 flushIntervalCommitLog=500 rocketmqHome=/opt/rocketmq queryMessageThreadPoolNums=10 messageStorePlugIn= serverChannelMaxIdleTimeSeconds=120 maxIndexNum=20000000 filterDataCleanTimeSpan=86400000 filterServerNums=0 commitCommitLogLeastPages=4 waitTimeMillsInPullQueue=5000 haSendHeartbeatInterval=5000 processReplyMessageThreadPoolNums=20 clientChannelMaxIdleTimeSeconds=120 filterSupportRetry=false flushDelayOffsetInterval=10000 duplicationEnable=false replyThreadPoolQueueCapacity=10000 offsetCheckInSlave=false clientCloseSocketIfTimeout=false transientStorePoolSize=5 waitTimeMillsInSendQueue=200 warmMapedFileEnable=false endTransactionThreadPoolNums=12 flushCommitLogTimed=false flushLeastPagesWhenWarmMapedFile=4096 clientWorkerThreads=4 endTransactionPoolQueueCapacity=100000 registerNameServerPeriod=30000 registerBrokerTimeoutMills=6000 accessMessageInMemoryMaxRatio=40 highSpeedMode=false transactionCheckMax=15 checkCRCOnRecover=true destroyMapedFileIntervalForcibly=120000 brokerIP2=xxx.xxx.xxx.xxx brokerIP1=xxx.xxx.xxx.xxx commitIntervalCommitLog=200 clientOnewaySemaphoreValue=65535 storeReplyMessageEnable=true traceOn=true clientManageThreadPoolNums=32 channelNotActiveInterval=60000 mappedFileSizeConsumeQueueExt=50331648 consumerManagerThreadPoolQueueCapacity=1000000 serverOnewaySemaphoreValue=256 haListenPort=10912 enableCalcFilterBitMap=false clientPooledByteBufAllocatorEnable=false aclEnable=false storePathRootDir=/root/store syncFlushTimeout=5000 rejectTransactionMessage=false commitCommitLogThoroughInterval=200 connectTimeoutMillis=3000 queryThreadPoolQueueCapacity=20000 regionId=DefaultRegion consumerManageThreadPoolNums=32 disableConsumeIfConsumerReadSlowly=false flushConsumerOffsetHistoryInterval=60000 fetchNamesrvAddrByAddressServer=false haTransferBatchSize=32768 compressedRegister=false storePathCommitLog=/root/store/commitlog commercialTransCount=1 transactionCheckInterval=60000 mappedFileSizeCommitLog=1073741824 startAcceptSendRequestTimeStamp=0 serverPooledByteBufAllocatorEnable=true serverAsyncSemaphoreValue=64 autoDeleteUnusedStats=false waitTimeMillsInTransactionQueue=3000 heartbeatThreadPoolQueueCapacity=50000 deleteWhen=05 bitMapLengthConsumeQueueExt=112 fastFailIfNoBufferInStorePool=false defaultTopicQueueNums=8 flushConsumeQueueThoroughInterval=60000 notifyConsumerIdsChangedEnable=true brokerPermission=6 fileReservedTime=48 transferMsgByHeap=true pullThreadPoolQueueCapacity=100000 brokerId=0 maxTransferCountOnMessageInDisk=8
实验链接:https://developer.aliyun.com/adc/scenario/14828d336c1547b0ab3f7514ca229ea2