开发者社区 > 云原生 > 正文

RocketMQ使用consumerThreadMin、consumerThreadMax的数量进行

当我使用消息时,增加consumerThreadMax的值是不起作用的。

希望当consumerThreadMax的值增加时,我的专家消费速度会更快

现在是即使我增加consumerThreadMax的值,消耗速度也保持稳定

4.3.0,我检查了最后一个版本,它仍然会发生。

我在类ConsumerMessageConcurrentlyService中找到了一些: 公共ConsumerMessageConcurrentlyService(DefaultMQPushConsumerImpl DefaultMQPushConsumerImpl, 消息侦听器并发消息侦听器){ this.defaultMQPushConsumerImpl=defaultMQPush ConsumerImpl; this.messageListener=消息侦听器;

this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl("ConsumeMessageThread_"));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));

}

该构造函数将无界队列(new LinkedBlockingQueue())作为workQueue,它有一个风险,即当新任务到来时,如果数量等于consumeThreadMin,它将被推送到无界队列,consume线程数量将永远不会增加。因此consumerThreadMax值永远不会起作用因此consumerRequestQueue应该被分配为一个有界队列作为workQueue,比如this.consumerRequestQueue=new LinkedBlockingQueue(128);

原提问者GitHub用户zhangdp-taozi

展开
收起
芬奇福贵 2023-05-26 13:58:29 190 0
1 条回答
写回答
取消 提交回答
  • 是的,所以最好将minThread设置为maxThread,但如果您想将有界队列替换为workQueue,则在该队列已满的情况下会导致消息丢失

    原回答者GitHub用户duhenglucky

    2023-05-26 17:42:48
    赞同 展开评论 打赏

阿里云拥有国内全面的云原生产品技术以及大规模的云原生应用实践,通过全面容器化、核心技术互联网化、应用 Serverless 化三大范式,助力制造业企业高效上云,实现系统稳定、应用敏捷智能。拥抱云原生,让创新无处不在。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载