使用RocketMQ遇到过问题?细数RocketMQ的5个问题

简介: 前分享一一些关于RocketMQ的源码文章,认识到RocketMQ里面真的非常丰富

前言

大家好,我是小郭,之前分享一一些关于RocketMQ的源码文章,认识到RocketMQ里面真的非常丰富,

在开发的过程中消息中间件已经成为我们常用的技术方案,但是他也给我们带来了很多麻烦。

今天主要和大家分享一下在实际使用中遇到遇到的问题,以及如何是如何解决的。

1. 消息消费积压不处理

【问题】正常发送的消息,消息消费积压不处理,消费端未处理

【业务场景】

在之前的这篇文章中# 以为很熟悉CountDownLatch的使用了,没想到在生产环境翻车了,我提到了因为没有合理的使用导致线程阻塞。

因为这里阻塞导致了RocketMQ的20个线程都被阻塞住了,发送的消息过来之后,没有线程能够去执行。

【解决思路】

  1. 排查 Broker 是否异常,通过查看偏移量来确认是否出现积压
  2. RocketMQ 中每一客户端会单独创建一个线程 PullMessageService 会循环从 Broker 拉取一批消息,

然后提交到消费端的线程池中进行消费,线程池中的线程消费完一条消息后会上服务端上报当前消费端的消费进度,

而且在提交消费进度时是提交当前处理队列中消息消费偏移量最小的消息作为消费组的进度,

即如果消息偏移量为 100 的消息,如果由于某种原因迟迟没有消费成功,那该消费组的进度则无法向前推进。

【解决步骤】

  1. 打印出栈信息,jstack pid > j.log
  2. 先确定是否线程的状态在正常进行
  3. 重点搜索ConsumeMessageThread_开头的日志,来确定是否哪里造成了阻塞
"ConsumeMessageThread_1 #1 prio=5 os_prio=0 tid=0x00007fe51000c000 nid=0x8 waiting on condition [0x00007fe519590000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006c3a00070> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396)
        at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649)
        at cn.DequeUtil.producer(DequeUtil.java:49)
        at cn.bo.impl.TaskBOImpl.lambda$compensateTaskDeque$0(TaskBOImpl.java:42)
        at cn.bo.impl.TaskBOImpl$$Lambda$1110/1194575856.accept(Unknown Source)

2. 设置线程数没用,正确设置消费组线程数

【问题】

  1. 在源码中,他们两个线程数都设置为20,将这两个值设置为相同。
    认为在消费端消息很多的情况下,将最大线程数提高会创建更多的线程来提高消息的处理速度,
  2. 参数设置过大,导致配置检查失败

【问题分析】

我们先来看一下RocketMQ是如何进行监听消息的

网络异常,图片无法展示
|

它主要启动了一个线程池,不间断的拉取消息,由于线程池内部持有的队列为一个无界队列,

导致 consumeThreadMax 大于 consumeThreadMin,线程个数最大也只能 consumeThreadMin 个线程数量

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl(consumeThreadPrefix));

问题二: 在创建Consumer监听消息的时候,会进行配置的校验,。

那区间只能是 [1,1000] 如果超出这个值则会报错

// consumeThreadMin
        if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
            || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
            throw new MQClientException(
                "consumeThreadMin Out of range [1, 1000]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }
        // consumeThreadMax
        if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
            throw new MQClientException(
                "consumeThreadMax Out of range [1, 1000]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

【解决方案】

  1. 在设置最大和最小线程数量的时候并不会因为最大线程数提高而提高消息的处理速率,所以在设置参数的时候需要注意设置的范围。
在 RocketMQ 中,每一个消费组都会启动一个线程池用来实现消费端在消费组的隔离,
RocketMQ 也提供了 consumeThreadMin、consumeThreadMax 两个参数来设置线程池中的线程个数
// 消费者最小线程数
consumer.setConsumeThreadMin(20);
// 消费者最大线程数
consumer.setConsumeThreadMax(20);
  1. 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。
  2. 可以通过加机器,或者在已有机器启动多个进程的方式。

3. 批量拉取数据解决默认32的限制

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setPullBatchSize(100);
    consumer.setConsumeMessageBatchMaxSize(200);
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    /*
     * Subscribe one more topic to consume.
     * 设置监听主题以及过滤条件
     */
    consumer.subscribe("TopicTest999", "*");
    /*
     *  Register callback to execute on arrival of messages fetched from brokers.
     *  注册消息监听器
     */
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            //System.out.println("待消费条数:"+ msgs.size());
            LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName());
            /*try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
            LOGGER.info("success");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    /*
     *  Launch the consumer instance.
     */
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

【问题】

通过设置PullBatchSize、ConsumeMessageBatchMaxSize

consumer.setPullBatchSize(100); 
consumer.setConsumeMessageBatchMaxSize(200);

来修改批量拉取消息的值,发现默认情况下一次消息会拉取 32 条消息,但业务监听器收到的消息默认一条

【问题分析】

因为RocketMQ采取了保护机制,需要修改Broker配置的参数才能够允许一次拉取的最大条数调整

  • pullBatchSize:消息客户端一次向 Broker 发送拉取消息每批返回最大的消息条数,默认为 32。
  • consumeMessageBatchMaxSize:提交到消息消费监听器中的消息条数,默认为 1。

【解决方案】

通过修改Broker配置的参数来解决,通常建议修只修改命中内存相关的

参数的含义:

int maxTransferCountOnMessageInMemory
如果此次消息拉取能全部命中,内存允许一次消息拉取的最大条数,默认值为 32 条。
int maxTransferBytesOnMessageInMemory
如果此次消息拉取能全部命中,内存允许一次消息拉取的最大消息大小,默认为 256K。

如果使用场景是大数据领域,建议的配置如下:

maxTransferCountOnMessageInMemory=5000
maxTransferBytesOnMessageInMemory = 5000 * 1024

如果是业务类场景,建议配置如下:

maxTransferCountOnMessageInMemory=2000
maxTransferBytesOnMessageInMemory = 2000 * 1024

通过修改完配置,我们再次启动就可以看到能够拉取到代销费的数量超过默认的32条。

Consumer Started.
待消费条数:100
待消费条数:100
待消费条数:100
待消费条数:100
待消费条数:38
待消费条数:38
待消费条数:100
待消费条数:100
待消费条数:100
待消费条数:100
待消费条数:38
待消费条数:39

4. 对当前版本的业务进行修改,业务希望从最新的消息开始消费

【问题】

对当前版本的业务进行修改,业务希望从最新的消息开始消费

【解决方案】

  1. 重置点位,sh ./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g CID_CONSUMER_TEST -t TopicTest -s now
  2. 设置ConsumeFromWhere,从最新的点位开始读取

ConsumeFromWhere 这个参数的含义是,初次启动从何处开始消费。更准确的表述是,如果查询不到消息消费进度时,从什么地方开始消费

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

5. 基于多机房队列负载算法,实现优先消费本机房中的消息

【问题】

从消费者的角度来看,如果采取平均分配,特别是采取 AllocateMessageQueueAveragelyByCircle 方案,

会出现消费者跨消费这种情况,如果能实现本机房的消费者优先消费本机房中的消息,可有效避免消息跨机房消费

RocketMQ 设计者已经为我们了提供了解决方案——AllocateMachineRoomNearby。

【解决方案】

AllocateMachineRoomNearby 核心属性

1. AllocateMessageQueueStrategy allocateMessageQueueStrategy

内部分配算法,可以看成机房就近分配算法,其实是一个代理,内部还是需要持有一种分配算法,例如平均分配算法。

2. MachineRoomResolver machineRoomResolver

多机房解析器,即从 brokerName、客户端 clientId 中识别出所在的机房。

  1. 修改 broker.conf 配置文件,机房信息加上broker名称,这样做是为了识别出哪个 Broker 属于哪个机房
brokerName = MachineRoom1-broker-a
  1. 修改消费者的clientIp
consumer.setClientIP("MachineRoom1-" + RemotingUtil.getLocalAddress());
  1. 修改规则
AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() {
            // Broker部署
            @Override
            public String brokerDeployIn(MessageQueue messageQueue) {
                System.out.println(messageQueue.getBrokerName().split("-")[0]);
                return messageQueue.getBrokerName().split("-")[0];
            }
            // 消费端部署
            @Override
            public String consumerDeployIn(String clientID) {
                System.out.println(clientID.split("-")[0]);
                return clientID.split("-")[0];
            }
        };
        consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver));

总结

主要介绍了RocketMQ在使用中遇到的一些问题,从中认识到使用中间件的时候,需要特别注意它所带来的一些意想不到的影响,不低估每一个功能的实现,进而避免故障的产生。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 JSON
RocketMQ-初体验RocketMQ(05)_RocketMQ架构解读
RocketMQ-初体验RocketMQ(05)_RocketMQ架构解读
74 0
|
2月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
5月前
|
消息中间件 弹性计算 运维
对比阿里云的SofaMQ与RocketMQ
对比阿里云的SofaMQ与RocketMQ
678 2
|
5月前
|
消息中间件 存储 缓存
技术好文:RocketMQ之一:RocketMQ整体介绍
技术好文:RocketMQ之一:RocketMQ整体介绍
64 0
|
消息中间件 Kafka RocketMQ
消息中间件(RocketMQ)笔记
分布式消息中间件,主要是实现分布式系统中解耦、异步消息、流量销锋、日志处理等场景。生产中用的最多的消息队列有Activemq,rabbitmq,kafka,rocketmq等。 以 Jms 规范和 rocketmq 为主来分享。版本基于 3.2.6 。 主要分享:JMS规范、Rocketmq的介绍、部署方式、特性的一些使用。
消息中间件(RocketMQ)笔记
|
消息中间件 RocketMQ
rocketmq学习2
前面我们已经通过quickstrat可以看到nameServer的启动:从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。 NamesrvController的属性信息、构造函数:
103 2
rocketmq学习2
|
消息中间件 RocketMQ 索引
Rocketmq学习一
首先从github中拉取Rocketmq的代码,进行运行。 1.由于rocketmq需要依赖nameServer,类似于zookeeper。首先启动时,配置好NamesrvStartup的环境变量信息,也即rocketmq的ROCKEMQ_HOME与你的项目对应。接着就可以启动了。
292 2
Rocketmq学习一
|
消息中间件 存储 SQL
RocketMQ实战
消息中间件的入门和导引
642 1
|
消息中间件 存储 SQL
RocketMQ技术分享
RocketMQ基础入门技术分享
下一篇
无影云桌面