前言
大家好,我是小郭,之前分享一一些关于RocketMQ的源码文章,认识到RocketMQ里面真的非常丰富,
在开发的过程中消息中间件已经成为我们常用的技术方案,但是他也给我们带来了很多麻烦。
今天主要和大家分享一下在实际使用中遇到遇到的问题,以及如何是如何解决的。
1. 消息消费积压不处理
【问题】正常发送的消息,消息消费积压不处理,消费端未处理
【业务场景】
在之前的这篇文章中# 以为很熟悉CountDownLatch的使用了,没想到在生产环境翻车了,我提到了因为没有合理的使用导致线程阻塞。
因为这里阻塞导致了RocketMQ的20个线程都被阻塞住了,发送的消息过来之后,没有线程能够去执行。
【解决思路】
- 排查 Broker 是否异常,通过查看偏移量来确认是否出现积压
- RocketMQ 中每一客户端会单独创建一个线程 PullMessageService 会循环从 Broker 拉取一批消息,
然后提交到消费端的线程池中进行消费,线程池中的线程消费完一条消息后会上服务端上报当前消费端的消费进度,
而且在提交消费进度时是提交当前处理队列中消息消费偏移量最小的消息作为消费组的进度,
即如果消息偏移量为 100 的消息,如果由于某种原因迟迟没有消费成功,那该消费组的进度则无法向前推进。
【解决步骤】
- 打印出栈信息,jstack pid > j.log
- 先确定是否线程的状态在正常进行
- 重点搜索ConsumeMessageThread_开头的日志,来确定是否哪里造成了阻塞
"ConsumeMessageThread_1 #1 prio=5 os_prio=0 tid=0x00007fe51000c000 nid=0x8 waiting on condition [0x00007fe519590000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000006c3a00070> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396) at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649) at cn.DequeUtil.producer(DequeUtil.java:49) at cn.bo.impl.TaskBOImpl.lambda$compensateTaskDeque$0(TaskBOImpl.java:42) at cn.bo.impl.TaskBOImpl$$Lambda$1110/1194575856.accept(Unknown Source)
2. 设置线程数没用,正确设置消费组线程数
【问题】
- 在源码中,他们两个线程数都设置为20,将这两个值设置为相同。
认为在消费端消息很多的情况下,将最大线程数提高会创建更多的线程来提高消息的处理速度, - 参数设置过大,导致配置检查失败
【问题分析】
我们先来看一下RocketMQ是如何进行监听消息的
它主要启动了一个线程池,不间断的拉取消息,由于线程池内部持有的队列为一个无界队列,
导致 consumeThreadMax 大于 consumeThreadMin,线程个数最大也只能 consumeThreadMin 个线程数量
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl(consumeThreadPrefix));
问题二: 在创建Consumer监听消息的时候,会进行配置的校验,。
那区间只能是 [1,1000] 如果超出这个值则会报错
// consumeThreadMin if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1 || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) { throw new MQClientException( "consumeThreadMin Out of range [1, 1000]" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } // consumeThreadMax if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) { throw new MQClientException( "consumeThreadMax Out of range [1, 1000]" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); }
【解决方案】
- 在设置最大和最小线程数量的时候并不会因为最大线程数提高而提高消息的处理速率,所以在设置参数的时候需要注意设置的范围。
在 RocketMQ 中,每一个消费组都会启动一个线程池用来实现消费端在消费组的隔离, RocketMQ 也提供了 consumeThreadMin、consumeThreadMax 两个参数来设置线程池中的线程个数 // 消费者最小线程数 consumer.setConsumeThreadMin(20); // 消费者最大线程数 consumer.setConsumeThreadMax(20);
- 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。
- 可以通过加机器,或者在已有机器启动多个进程的方式。
3. 批量拉取数据解决默认32的限制
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setPullBatchSize(100); consumer.setConsumeMessageBatchMaxSize(200); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /* * Subscribe one more topic to consume. * 设置监听主题以及过滤条件 */ consumer.subscribe("TopicTest999", "*"); /* * Register callback to execute on arrival of messages fetched from brokers. * 注册消息监听器 */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //System.out.println("待消费条数:"+ msgs.size()); LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName()); /*try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }*/ LOGGER.info("success"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the consumer instance. */ consumer.start(); System.out.printf("Consumer Started.%n"); }
【问题】
通过设置PullBatchSize、ConsumeMessageBatchMaxSize
consumer.setPullBatchSize(100); consumer.setConsumeMessageBatchMaxSize(200);
来修改批量拉取消息的值,发现默认情况下一次消息会拉取 32 条消息,但业务监听器收到的消息默认一条
【问题分析】
因为RocketMQ采取了保护机制,需要修改Broker配置的参数才能够允许一次拉取的最大条数调整
- pullBatchSize:消息客户端一次向 Broker 发送拉取消息每批返回最大的消息条数,默认为 32。
- consumeMessageBatchMaxSize:提交到消息消费监听器中的消息条数,默认为 1。
【解决方案】
通过修改Broker配置的参数来解决,通常建议修只修改命中内存相关的
参数的含义:
int maxTransferCountOnMessageInMemory 如果此次消息拉取能全部命中,内存允许一次消息拉取的最大条数,默认值为 32 条。 int maxTransferBytesOnMessageInMemory 如果此次消息拉取能全部命中,内存允许一次消息拉取的最大消息大小,默认为 256K。
如果使用场景是大数据领域,建议的配置如下:
maxTransferCountOnMessageInMemory=5000 maxTransferBytesOnMessageInMemory = 5000 * 1024
如果是业务类场景,建议配置如下:
maxTransferCountOnMessageInMemory=2000 maxTransferBytesOnMessageInMemory = 2000 * 1024
通过修改完配置,我们再次启动就可以看到能够拉取到代销费的数量超过默认的32条。
Consumer Started. 待消费条数:100 待消费条数:100 待消费条数:100 待消费条数:100 待消费条数:38 待消费条数:38 待消费条数:100 待消费条数:100 待消费条数:100 待消费条数:100 待消费条数:38 待消费条数:39
4. 对当前版本的业务进行修改,业务希望从最新的消息开始消费
【问题】
对当前版本的业务进行修改,业务希望从最新的消息开始消费
【解决方案】
- 重置点位,sh ./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g CID_CONSUMER_TEST -t TopicTest -s now
- 设置ConsumeFromWhere,从最新的点位开始读取
ConsumeFromWhere 这个参数的含义是,初次启动从何处开始消费。更准确的表述是,如果查询不到消息消费进度时,从什么地方开始消费。
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
5. 基于多机房队列负载算法,实现优先消费本机房中的消息
【问题】
从消费者的角度来看,如果采取平均分配,特别是采取 AllocateMessageQueueAveragelyByCircle 方案,
会出现消费者跨消费这种情况,如果能实现本机房的消费者优先消费本机房中的消息,可有效避免消息跨机房消费。
RocketMQ 设计者已经为我们了提供了解决方案——AllocateMachineRoomNearby。
【解决方案】
AllocateMachineRoomNearby 核心属性
1. AllocateMessageQueueStrategy allocateMessageQueueStrategy
内部分配算法,可以看成机房就近分配算法,其实是一个代理,内部还是需要持有一种分配算法,例如平均分配算法。
2. MachineRoomResolver machineRoomResolver
多机房解析器,即从 brokerName、客户端 clientId 中识别出所在的机房。
- 修改 broker.conf 配置文件,机房信息加上broker名称,这样做是为了识别出哪个 Broker 属于哪个机房
brokerName = MachineRoom1-broker-a
- 修改消费者的clientIp
consumer.setClientIP("MachineRoom1-" + RemotingUtil.getLocalAddress());
- 修改规则
AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() { // Broker部署 @Override public String brokerDeployIn(MessageQueue messageQueue) { System.out.println(messageQueue.getBrokerName().split("-")[0]); return messageQueue.getBrokerName().split("-")[0]; } // 消费端部署 @Override public String consumerDeployIn(String clientID) { System.out.println(clientID.split("-")[0]); return clientID.split("-")[0]; } }; consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver));
总结
主要介绍了RocketMQ在使用中遇到的一些问题,从中认识到使用中间件的时候,需要特别注意它所带来的一些意想不到的影响,不低估每一个功能的实现,进而避免故障的产生。