开发者社区 > 云原生 > 云消息队列 > 正文

我的rocketMQ生产端可以生产消息,但是消费端会无法顺利消费消息。

  • My rocketMQ producer can send messages to the queue, but the consumer cannot consume messages in a timely manner. Sometimes it will be delayed for about 10 seconds, and most of the time it will not consume messages at all.
  • I checked the rocketMQ control panel, and the unconsumed messages look like this: CleanShot 2023-07-28 at 22.10.45.png
  • My application.yml is:
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: ProducerGroup
  consumer:
    group: ConsumerGroup
    topic: MyTopic
  • My maven dependencies are as follows:
<properties>
    <java.version>11</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.12</version>
</parent>
....
<dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-spring-boot-starter</artifactId>
     <version>2.2.1</version>
</dependency>
  • My producer side code is as follows:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class RocketMQProducer {

    private DefaultMQProducer producer;

    @Autowired
    public RocketMQProducer(@Value("${rocketmq.producer.group}") String producerGroup,
                            @Value("${rocketmq.name-server}") String nameServerAddress) {
        producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(nameServerAddress);
        try {
            producer.start(); 
        } catch (MQClientException e) {
            System.out.println("RocketMQProducer is wrong: ");
            e.printStackTrace();
        }
    }

    public void sendMessages(String topic, String tags, String message) {
        try {
            Message mqMessage = new Message(topic, tags, message.getBytes());
            producer.send(mqMessage);
            System.out.println("Message sent to queue: " + message);
        } catch (Exception e) {
            System.out.println("RocketMQProducer sendMessages() is wrong: ");
            e.printStackTrace();
        }
    }
    public void shutdown() {
        producer.shutdown();
    }
}
  • Next, I called the sendMessages method of the producer in the NettyClientHandler code,redundant code omitted:
@Component
public class NettyClientHandler extends ChannelInboundHandlerAdapter {


    @Autowired
    private RocketMQProducer rocketMQProducer;

    @Value("${rocketmq.consumer.topic}")
    private String topic;


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Client,channelActive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log("Client,Received a message from the server");

        if (msg instanceof ByteBuf) { 
            ByteBuf byteBuf = (ByteBuf) msg;

            String message = byteBuf.toString(StandardCharsets.UTF_8);

            rocketMQProducer.sendMessages(topic, "", message);

            System.out.println("Received message: " + message);
        }
    }


}
  • My consumer side code is as follows:
@Service
public class RocketMQCommonConsumerListener implements CommandLineRunner {

    @Autowired
    private subway.service.Service service;

    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;

    @Value("${rocketmq.name-server}")
    private String nameServerAddress;

    @Value("${rocketmq.consumer.topic}")
    private String topic;


    public void consumeMessages() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameServerAddress);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        System.out.println("consume_step1");
        try {
            consumer.subscribe(topic, "*");
            System.out.println("consume_step2");
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messages, ConsumeOrderlyContext context) {
                    StringBuilder sb = new StringBuilder();
                    boolean needMerge = true;
                    System.out.print("consume_step3, ");
                    long threadId = Thread.currentThread().getId();
                    System.out.println("Current Thread ID: " + threadId);
                    for (MessageExt message : messages) {
                        String str = new String(message.getBody());
                        System.out.println("RocketMQ received message:" + str);

                     }

                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }


    private void handleJson(String json) {

        System.out.println("json data is :");
        System.out.println(json);
        System.out.println("\t\t\t============\t\t\t\n");
    }

    @Async("taskExecutor")
    @Override
    public void run(String... args){
        consumeMessages();
    }
}
  • I have modified it according to chatGPT3.5, and searched for answers on the Internet, but none of them can be solved. This question has troubled me for a long time, I am looking forward to the answer.

展开
收起
游客7zy5u6363dy54 2023-07-28 22:20:45 382 0
5 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    可能是以下几个原因之一:

    消费者组未启动或未正常运行:消费者组是RocketMQ中的一个重要概念,用于标识一组消费者。如果消费者组未启动或未正常运行,可能会导致消费者无法消费消息。因此,可以检查消费者组的配置和运行状态,确保消费者组正常运行。

    消费者未订阅消息主题或标签:在RocketMQ中,消费者需要订阅消息主题或标签,才能接收到相应的消息。如果消费者未订阅消息主题或标签,可能会导致消费者无法消费消息。因此,可以检查消费者的订阅配置,确保消费者已订阅相应的消息主题或标签。

    消费者消费速度过慢:如果消费者的消费速度过慢,可能会导致消息累积过多,从而导致消费者无法正常消费消息。因此,可以检查消费者的消费速度,是否适合当前的消息生产速度。

    消息消费失败或处理异常:如果消息消费失败或处理异常,可能会导致消费者无法正常消费消息。因此,可以检查消费者消费的消息内容和消费逻辑,确保消息消费和处理逻辑正确无误。

    RocketMQ服务异常或故障:如果RocketMQ服务出现异常或故障,可能会导致消息无法正常传输和消费。因此,可以检查RocketMQ服务的健康状态和日志,以及相关的网络配置和安全设置,确保RocketMQ服务正常运行。

    2023-07-31 23:26:39
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    您好,请按照下面步骤排查:

    1. 检查接入点是否设置正确。接入点请从控制台实例详情页获取。
    2. 检查网络连通性,telnet 接入点域名和端口。
    3. 设置正确的Topic名,不要有空格等。
    4. 设置正确的AccessKey、SecretKey。4.0实例请使用RAM的AccessKey、SecretKey并正确授权。5.0实例请使用MQ控制台实例详情页的实例用户和密码。
    2023-07-31 15:25:40
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    如果你的 RocketMQ 生产端可以成功生产消息,但是消费端无法顺利消费消息,可能有以下几个可能的原因:

    1. 消费者配置错误:请确保消费者的配置正确并与生产者保持一致。例如,检查消费者的主题(Topic)和消费者组(Consumer Group)是否正确设置。

    2. 网络或防火墙问题:检查网络连接是否正常,并确保消费者的消息监听端口没有被防火墙或其他网络安全机制拦截。

    3. 消费者订阅规则错误:请确保消费者订阅的主题与生产者发送的消息主题匹配。消费者可以通过订阅表达式(如"TagA || TagB")或者模糊匹配(如"TopicA.*")来过滤和选择特定的消息。

    4. 消费者偏移量设置不当:在 RocketMQ 中,每个消费者组都有一个消费进度偏移量(Offset),用于记录消费者消费消息的位置。如果消费者组的偏移量设置不当,可能导致消费者无法消费新的消息。可以尝试重置偏移量或者让消费者从最早的消息开始消费。

    5. 消费者异常处理不完整:当消费者遇到异常情况时,需要进行异常处理。例如,记录日志、重试等。如果消费者没有良好的异常处理机制,可能导致消费端无法正常消费消息。

    2023-07-29 23:48:03
    赞同 展开评论 打赏
  • 十分耕耘,一定会有一分收获!

    楼主你好,根据你的描述,这个问题可能是由于以下几个原因之一引起的:

    1. 消费者的消费能力不足

    消费者的消费能力不足可能会导致消息消费延迟或者无法及时消费消息。您可以尝试增加消费者的数量或者优化消费者的代码,以提高消费者的消费能力。

    1. 消费者的网络延迟或者阻塞

    消费者的网络延迟或者阻塞也会导致消息消费延迟或者无法及时消费消息。您可以检查消费者的网络状态和防火墙设置,以确保消费者能够顺畅地消费消息。

    1. 消息队列的负载过重

    消息队列的负载过重也会导致消息消费延迟或者无法及时消费消息。您可以尝试增加消息队列的数量或者优化消息队列的代码,以提高消息队列的处理能力。

    1. 消息发送端的问题

    消息发送端的问题也可能会导致消息消费延迟或者无法及时消费消息。您可以检查消息发送端的代码和配置,以确保消息能够正常发送到消息队列中。

    根据您提供的信息,您可以先尝试检查消费者的消费能力和网络状态,以及消息队列的负载情况。如果问题仍然存在,您可以进一步检查消息发送端的代码和配置,以确定是否存在问题。同时,您也可以在RocketMQ控制面板中查看未消费的消息,以帮助您确定问题的具体原因。

    2023-07-29 09:00:29
    赞同 展开评论 打赏
  • 根据您提供的描述,您的RocketMQ生产者能够将消息发送到队列,但是消费者无法及时消费消息。有时消息会延迟约10秒钟,大部分情况下消息根本无法被消费。

    这种情况可能有多种潜在原因导致,以下是一些可能的原因和对应的解决方法:

    1. 消费者配置:请检查消费者的配置,确保与生产者设置相匹配。确保消费者订阅了正确的主题和标签,并且具有适当的消息模式(广播或集群)。

    2. 消费者组和实例管理:确保为消费者实例设置了唯一的消费者组。如果同一个消费者组的多个实例同时运行,它们会共享消费负载。验证消费者实例是否正常运行,并避免它们之间的冲突。

    3. 消息消费速率:检查消费者处理消息的速度。如果消费者无法跟上消息的到达速度,可能会导致延迟。检查消费者代码,确保其足够高效以处理工作负载。必要时考虑增加消费者实例的数量。

    4. 网络或服务器问题:调查消费者和RocketMQ Broker之间是否存在任何网络连接问题。另外,请检查服务器状态和资源使用情况,确保它能够有效地处理消息负载。

    5. 消息确认和偏移提交:确认消费者代码正确地对消息进行消费状态的确认,并提交偏移量。否则,消息可能被认为是未消费的,不会被转发给下一个消费者。

    6. RocketMQ版本兼容性:确保生产者和消费者都使用兼容的RocketMQ客户端库和Broker版本。版本不兼容可能导致意外行为。

    建议记录和监控您的消费者应用程序,以捕获在消息消费过程中可能出现的任何错误或异常。启用RocketMQ日志记录,并检查日志以确定任何潜在问题或错误消息。

    如果问题仍然存在,请考虑联系RocketMQ社区或支持团队寻求进一步的帮助。他们可以根据您的配置帮助诊断具体问题,并提供更针对性的解决方案。

    2023-07-28 23:19:40
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 热门讨论

    热门文章

    相关电子书

    更多
    RocketMQ Client-GO 介绍 立即下载
    RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
    基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载