ActiveMQ笔记(6):消息延时投递

简介: 在开发业务系统时,某些业务场景需要消息定时发送或延时发送(类似:飞信的短信定时发送需求),这时候就需要用到activemq的消息延时投递,详细的文档可参考官网说明,本文只介绍二种常用的用法: 注:本文采用spring的JmsTemplate来发送消息 步骤1、首先要修改activemq.

在开发业务系统时,某些业务场景需要消息定时发送或延时发送(类似:飞信的短信定时发送需求),这时候就需要用到activemq的消息延时投递,详细的文档可参考官网说明,本文只介绍二种常用的用法:

注:本文采用spring的JmsTemplate来发送消息

步骤1、首先要修改activemq.xml配置文件,启用延时投递

1 <broker xmlns="http://activemq.apache.org/schema/core" ... schedulerSupport="true" >
2     ...
3   </broker>

即:在broker节点加上schedulerSupport="true",然后重启activemq即可

 

步骤2、定义一个MessagePostProcessor的实现类

import javax.jms.JMSException;
import javax.jms.Message;

import lombok.Data;
import org.apache.activemq.ScheduledMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jms.core.MessagePostProcessor;

/**
 * MQ延时投递处理器(注:ActiveMQ的配置文件中,要配置schedulerSupport="true",否则不起作用)
 * by: 杨俊明 2016-06-16
 */
@Data
public class ScheduleMessagePostProcessor implements MessagePostProcessor {

    private long delay = 0l;

    private String corn = null;

    public ScheduleMessagePostProcessor(long delay) {
        this.delay = delay;
    }

    public ScheduleMessagePostProcessor(String cron) {
        this.corn = cron;
    }

    public Message postProcessMessage(Message message) throws JMSException {
        if (delay > 0) {
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
        }
        if (!StringUtils.isEmpty(corn)) {
            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, corn);
        }
        return message;
    }

}

 

步骤3、jmsTemplate发送示例

        Object message1 = "corn消息内容:" + DateUtil.formatDate(new Date());
        //分 时 天 月 星期几
        jmsTemplate.convertAndSend(message1, new ScheduleMessagePostProcessor("40 22 * * *"));
        logger.info("消息1:[" + message1 + "] 延时发送成功!");

        jmsTemplate.convertAndSend(message1, new ScheduleMessagePostProcessor("50 22 * * *"));
        logger.info("消息1:[" + message1 + "] 延时发送成功!");


        Object message2 = "message:" + DateUtil.formatDate(new Date());
        jmsTemplate.convertAndSend(message2, new ScheduleMessagePostProcessor(30 * 1000));//延时30秒
        jmsTemplate.convertAndSend(message2, new ScheduleMessagePostProcessor(3600 * 24 * 1000));//延时24小时
        logger.info("消息2:[" + message2 + "] 延时发送成功!");

上面的代码演示了二种延时的用法:延时N毫秒、按corn表达式延时(注:此corn表达式并非Quartz框架中的corn表达式,而是linux中corntab中的表达 式,基本顺序是"分(0-59) 时(0-23) 日(1-31) 月(1-12) 星期几(1-7) ")

 

发送成功后,可以登录activemq的webconsole查看消息的属性:

在scheduled面板中,可以看到延时的消息

点击看大图

注:在开启消息持久化存储的前提下,就算把相应的queue在webconsole面板中删除(即删除队列),只要投递的时间尚未到,该消息也不会删除,仍然能正常延时投递。

此外,在queues面板中,如何查看某条具体的消息,也可以通过属性发现这条消息是延时消息,参考下图:

点击看大图

 

参考文章:
1、Delay and Schedule Message Delivery

2、喂鸡百科上的Corn表达式解释 (中文)

3、喂鸡百科上的Corn表达式解释 (英文)

4、kahaDB官方文档

 

目录
相关文章
|
3月前
|
消息中间件 存储 RocketMQ
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
53 0
|
消息中间件 RocketMQ
RocketMQ极简入门-RocketMQ消息批量发送
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB,如果超过可以有2种处理方案: 1.将消息进行切割成多个小于4M的内容进行发送 2.修改4M的限制改成更大 可以设置Producer的maxMessageSize属性 修改配置文件中的maxMessageSize属性
376 0
|
消息中间件 RocketMQ
六.RocketMQ极简入门-RocketMQ消息批量发送
RocketMQ极简入门-RocketMQ消息批量发送
|
消息中间件 Shell RocketMQ
RocketMQ进阶-延时消息
RocketMQ进阶-延时消息
1094 0
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2631 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 安全 NoSQL
ActiveMQ系列:高级特性之异步投递、延时投递与定时投递
ActiveMQ 支持同步、异步两种发送的模式将消息发送到 broker,模式的选择对发送延时有巨大的影响。producer 能达到怎样的产出率(产出率 = 发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。
400 0
ActiveMQ系列:高级特性之异步投递、延时投递与定时投递
|
存储 消息中间件 固态存储
RocketMQ 消息存储和发送性能保证|学习笔记
快速学习 RocketMQ 消息存储和发送性能保证
110 0
RocketMQ 消息存储和发送性能保证|学习笔记
|
消息中间件
『RabbitMQ』通过死信队列实现延时消费功能
📣读完这篇文章里你能收获到 - 延时队列的应用场景介绍 - RabbitMQ通过TTL+DLX 实现定时任务
228 1
『RabbitMQ』通过死信队列实现延时消费功能
|
消息中间件 存储 Java
MQ系列5:RocketMQ消息的发送模式
MQ系列5:RocketMQ消息的发送模式
380 0
MQ系列5:RocketMQ消息的发送模式
|
消息中间件 存储 Java
RocketMQ延时消息的原理与实现
本文分享了RocketMQ的延时消息的原理和实现,手把手带你从源码角度了解到内部实现机制。
752 13
RocketMQ延时消息的原理与实现