开发者社区 > 云原生 > 云消息队列 > 正文

rocketmq5 生产消息的定时时间和实际消费时间不匹配,有提前几分钟有延迟几分钟后消费的情况,?

rocketmq5 生产消息的定时时间和实际消费时间不匹配,有提前几分钟有延迟几分钟后消费的情况,rocketmq开启的代理模式,pom包引入rocketmq-client-java 5.0.5,具体的投递消息代码如下,请各位大佬帮忙看下是不是我的调用方式有问题?image.png

展开
收起
真的很搞笑 2023-06-06 13:37:28 363 0
3 条回答
写回答
取消 提交回答
  • 提供的代码来看,没有明显的问题。有可能是 RocketMQ 服务端的时间同步不够准确,或者是消费者端的消费线程数不够多导致延迟。

    您可以尝试以下方案:

    1. 确认 RocketMQ 服务端和客户端的时间同步是否准确,建议使用 NTP 协议进行时间同步。

    2. 调整消费者端的线程数,增加消费能力,以减少消息延迟。

    3. 如果您使用的是 RocketMQ 的定时消息功能,可以尝试使用延迟消息功能,以确保消息在指定时间点被消费。

    2023-06-11 15:49:12
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    RocketMQ的消息投递是不保证精确到毫秒级别的,存在一定程度的时间误差,这是由于RocketMQ在处理消息时需要考虑网络传输和负载均衡等因素所导致的。因此,在使用RocketMQ时,建议您将消息投递的时间与实际消费时间相差几秒钟或几分钟均视为正常情况。

    另外,根据您提供的代码,您是使用了延迟发送的方式来投递消息。如果您要确保消息在指定的时间点被消费者接收到,可以考虑使用可靠消息投递机制,例如同步发送、异步发送或者单向发送等方式。具体的示例如下:

    1. 同步发送
    DefaultMQProducer producer = new DefaultMQProducer("your_group_name");
    producer.setNamesrvAddr("your_namesrv_addr:9876");
    producer.start();
    
    Message message = new Message("your_topic_name", "your_tag_name", "Hello, RocketMQ!".getBytes());
    SendResult sendResult = producer.send(message, 3000);
    System.out.printf("Message sent: %s\n", sendResult);
    
    producer.shutdown();
    
    1. 异步发送
    DefaultMQProducer producer = new DefaultMQProducer("your_group_name");
    producer.setNamesrvAddr("your_namesrv_addr:9876");
    producer.start();
    
    Message message = new Message("your_topic_name", "your_tag_name", "Hello, RocketMQ!".getBytes());
    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.printf("Message sent: %s\n", sendResult);
        }
    
        @Override
        public void onException(Throwable throwable) {
            throwable.printStackTrace();
        }
    });
    
    producer.shutdown();
    
    1. 单向发送
    DefaultMQProducer producer = new DefaultMQProducer("your_group_name");
    producer.setNamesrvAddr("your_namesrv_addr:9876");
    producer.start();
    
    Message message = new Message("your_topic_name", "your_tag_name", "Hello, RocketMQ!".getBytes());
    producer.sendOneway(message);
    
    producer.shutdown();
    

    以上示例中的所有发送方式都是可靠的,可以保证消息一定会被送达。如果您需要确保消息在指定的时间点被消费者接收到,可以将消息的投递时间设置为离实际消费时间不远的时刻(例如提前1-2秒钟),从而减少时间误差的影响。

    2023-06-06 16:57:59
    赞同 展开评论 打赏
  • 从事java行业9年至今,热爱技术,热爱以博文记录日常工作,csdn博主,座右铭是:让技术不再枯燥,让每一位技术人爱上技术

    您好,云消息队列 RocketMQ 版的定时消息和延时消息的定义如下: 定时消息:Producer将消息发送到云消息队列 RocketMQ 版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。 延时消息:Producer将消息发送到云消息队列 RocketMQ 版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。 关于定时消息与延时消息的使用场景及使用方式可以参考文档:文档 文档下面有各种语言的代码示例连接,可以自行对比一下您当前的代码

    2023-06-06 14:16:24
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 热门讨论

    热门文章

    相关电子书

    更多
    RocketMQ Client-GO 介绍 立即下载
    RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
    基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载