开发者社区> 问答> 正文

RocketMq收不到延时消息

RocketMq收不到延时消息?

本问题来自阿里云开发者社区的【11大垂直技术领域开发者社群】。 点击链接欢迎加入感兴趣的技术领域群。

展开
收起
游客pklijor6gytpx 2019-10-24 15:45:35 3297 0
2 条回答
写回答
取消 提交回答
  • 可以考虑用阿里云RocketMQ,支持任意精度的定时消息

    2020-03-12 19:34:06
    赞同 展开评论 打赏
  • RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推

    如何配置 1、可以直接在服务器端的broker.conf中进行配置, 在服务器端(rocketmq-broker端)的属性配置文件中加入以下行: messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 当然这种方式不够灵活,不推荐

    2、第二种方式就是在程序中进行指定,这个会在代码中展示,上述的时间配置述了各级别与延时时间的对应映射关系,

    这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间; 时间单位支持:s、m、h、d,分别表示秒、分、时、天; 默认值就是上面声明的,可手工调整; 默认值已够用,不建议修改这个值。 了解了这些基本的概念后,下面通过一段简单的程序演示一下效果,相对于rabbitmq的延迟消息的使用,rocketmq的延迟消息使用起来简单了很多,

    3、我们使用一个controller模拟浏览器调用接口发送一个延迟的消息,这里为了演示方便发送消息的操作直接放在了controller里面了,实际开发中不要这样做,

    @RestController @RequestMapping("/api/order") public class OrderController {

    //http://localhost:8088/api/v1/order?msg=hello&tag=testtag
    
    @Autowired
    private MsgProducer msgProducer;
    
    @Autowired
    private PayProducer payProducer;
    
    /**
     * @param msg 支付信息
     * @param tag 消息二级分类
     * @return
     */
    @GetMapping("/order")
    public Object order(String msg, String tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
       Message message = new Message("testTopic",tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
       SendResult result = msgProducer.getProducer().send(message);
       System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
       return JsonData.buildSuccess();
    }
    
    //http://localhost:8082/api/order/delay?text=hello order
    /**
     * 发送延迟消息
     * @param text
     * @return
     */
    @GetMapping("/delay")
    public Object sendDelayMsg(String text) throws MQClientException, RemotingException, InterruptedException{
    	Message message = new Message(JmsConfig.TOPIC, "delay_order",("this is a delay message:" + text).getBytes());
    	
        //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
    	message.setDelayTimeLevel(3);
    	payProducer.getProducer().send(message, new SendCallback() {
    		
    		//消息发送成功回调
    		@Override
    		public void onSuccess(SendResult sendResult) {
    			System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());
    		}
    		
    		//消息异常回调
    		@Override
    		public void onException(Throwable e) {
    			e.printStackTrace();
                //补偿机制,根据业务情况进行使用,看是否进行重试
    		}
    	});
    	return "send ok";
    }
    

    }

    2、payProducer类,

    @Component public class PayProducer {

    private String producerGroup = "pay_producer_group";
    
    private DefaultMQProducer producer;
    
    public  PayProducer(){
        producer = new DefaultMQProducer(producerGroup);
    
        //生产者投递消息重试次数
        producer.setRetryTimesWhenSendFailed(3);
    
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
    
        start();
    }
    
    public DefaultMQProducer getProducer(){
        return this.producer;
    }
    
    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }
    

    }

    对了,我的rocketmq得服务器地址放在配置类里面了,如下,

    public class JmsConfig {

    public static final String NAME_SERVER = "192.168.111.132:9876";
    
    public static final String TOPIC = "DELAY_TOPIC";
    

    }

    基本上就可以了,然后我们启动一下程序,浏览器调用,然后看一下后台打印的日志, http://localhost:8082/api/order/delay?text=hello delayorder

    2019-11-06 17:06:43
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
RocketMQ Client-GO 介绍 立即下载
RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载