ActiveMQ笔记(7):如何清理无效的延时消息?

简介: ActiveMQ的延时消息是一个让人又爱又恨的功能,具体使用可参考上篇ActiveMQ笔记(6):消息延时投递,在很多需要消息延时投递的业务场景十分有用,但是也有一个缺陷,在一些大访问量的场景,如果瞬间向MQ发送海量的延时消息,超过MQ的调度能力,就会造成很多消息到了该投递的时刻,却没有投递出去,形成积压,一直停留在ActiveMQ web控制台的Scheduled面板中。

ActiveMQ的延时消息是一个让人又爱又恨的功能,具体使用可参考上篇ActiveMQ笔记(6):消息延时投递,在很多需要消息延时投递的业务场景十分有用,但是也有一个缺陷,在一些大访问量的场景,如果瞬间向MQ发送海量的延时消息,超过MQ的调度能力,就会造成很多消息到了该投递的时刻,却没有投递出去,形成积压,一直停留在ActiveMQ web控制台的Scheduled面板中。

下面的代码演示了,如何清理activemq中的延时消息(包括:全部清空及清空指定时间段的延时消息),这也是目前唯一可行的办法。

为了演示方便,先封装一个小工具类:

package cn.mwee.utils.mq;

import cn.mwee.utils.list.ListUtil;
import cn.mwee.utils.log4j2.MwLogger;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Created by yangjunming on 6/20/16.
 */
public final class MessageUtil {

    private Logger logger = new MwLogger(MessageUtil.class);//这里就是一个Log4j2的实例,大家可以换成原生的log4j2或类似工具

    private ConnectionFactory connectionFactory;
    private long receiveTimeout;//接收超时时间
    private JmsTemplate jmsTemplate;
    private List<String> destinationQueueNames;
    private final static String BACKUP_QUEUE_SUFFIX = "_B";
    private boolean autoBackup = false;//是否自动将消息备份到_b的队列,方便调试


    public MessageUtil(final ConnectionFactory connectionFactory, final long receiveTimeout, final List<String> destinationQueueNames) {
        this.connectionFactory = connectionFactory;
        this.receiveTimeout = receiveTimeout;
        this.destinationQueueNames = new ArrayList<>();
        this.destinationQueueNames.addAll(destinationQueueNames.stream().collect(Collectors.toList()));
        jmsTemplate = new JmsTemplate(this.connectionFactory);
        jmsTemplate.setReceiveTimeout(this.receiveTimeout);
    }

    public MessageUtil(ConnectionFactory connectionFactory, List<String> destinationQueueNames) {
        this(connectionFactory, 10000, destinationQueueNames);
    }


    public void convertAndSend(Object message) {
        if (ListUtil.isEmpty(destinationQueueNames)) {
            logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
            return;
        }
        for (String dest : destinationQueueNames) {
            jmsTemplate.convertAndSend(dest, message);
            if (autoBackup) {
                jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message);
            }
        }
    }

    public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) {
        if (ListUtil.isEmpty(destinationQueueNames)) {
            logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
            return;
        }
        for (String dest : destinationQueueNames) {
            jmsTemplate.convertAndSend(dest, message, messagePostProcessor);
            if (autoBackup) {
                jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor);
            }
        }
    }

    public void convertAndSend(String destinationName, Object message) {
        if (StringUtils.isBlank(destinationName)) {
            logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
            return;
        }
        jmsTemplate.convertAndSend(destinationName, message);
        if (autoBackup) {
            jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message);
        }
    }


    public void convertAndSend(String destinationName, Object message, MessagePostProcessor messagePostProcessor) {
        if (StringUtils.isBlank(destinationName)) {
            logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
            return;
        }
        jmsTemplate.convertAndSend(destinationName, message, messagePostProcessor);
        if (autoBackup) {
            jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor);
        }
    }

    public static String getText(javax.jms.Message message) {
        if (message instanceof TextMessage) {
            try {
                return ((TextMessage) message).getText();
            } catch (JMSException e) {
                return message.toString();
            }
        }
        return message.toString();
    }

    public String getFirstDestination() {
        if (ListUtil.isEmpty(destinationQueueNames)) {
            return null;
        }
        return destinationQueueNames.get(0);
    }


    public boolean isAutoBackup() {
        return autoBackup;
    }

    public void setAutoBackup(boolean autoBackup) {
        this.autoBackup = autoBackup;
    }
}

其中主要就用到了convertAndSend(Object message, MessagePostProcessor messagePostProcessor) 这个方法,其它代码可以无视。

先来模拟瞬间向MQ发送大量延时消息:

    /**
     * 发送延时消息
     *
     * @param messageUtil
     */
    private static void sendScheduleMessage(MessageUtil messageUtil) {
        for (int i = 0; i < 10000; i++) {
            Object obj = "test:" + i;
            messageUtil.convertAndSend(obj, new ScheduleMessagePostProcessor(1000 + i * 1000));
        }
    }

这里向MQ发送了1w条延时消息,每条消息延时1秒*i,上面代码中的ScheduleMessagePostProcessor类可在上篇中找到。

运行完之后,MQ中应该堆积着了很多消息了:

下面的代码可以清空所有延时消息:

    /**
     * 删除所有延时消息
     *
     * @param connectionFactory
     * @throws JMSException
     */
    private static void deleteAllScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {
        Connection conn = connectionFactory.createConnection();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        MessageProducer producer = session.createProducer(management);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        producer.send(request);
    }

清空所有延时消息,有些用力过猛了,很多时候,我们只需要清理掉过期的延时消息(即:本来计划是8:00投递出去的消息,结果过了8点还没投递出去) 

    /**
     * 删除过期的延时消息
     *
     * @param connectionFactory
     * @throws JMSException
     */
    private static void deleteExpiredScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {
        long start = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12);//删除:当前时间前12小时范围的延时消息
        long end = System.currentTimeMillis();
        Connection conn = connectionFactory.createConnection();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        MessageProducer producer = session.createProducer(management);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
        producer.send(request);
    }

与上一段代码基本相似,只是多指定了删除消息的起止时间段。  

最后贴一段spring的配置文件及main函数入口

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
 5 
 6     <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
 7         <property name="connectionFactory">
 8             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
 9                 <property name="brokerURL"
10                           value="failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false&amp;backup=true"/>
11                 <property name="maxThreadPoolSize" value="100"/>
12             </bean>
13         </property>
14     </bean>
15 
16     <bean id="messageUtil" class="cn.mwee.utils.mq.MessageUtil">
17         <constructor-arg index="0" ref="jmsFactory"/>
18         <constructor-arg index="1" value="10000"/>
19         <constructor-arg index="2">
20             <list>
21                 <value>dest1</value>
22                 <value>dest2</value>
23             </list>
24         </constructor-arg>
25         <property name="autoBackup" value="true"/>
26     </bean>
27 
28 </beans>
View Code

main函数:

    public static void main(String[] args) throws InterruptedException, JMSException {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-sender.xml");
        ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class, "jmsFactory");
        MessageUtil messageUtil = context.getBean(MessageUtil.class);
//        sendScheduleMessage(messageUtil);
//        deleteAllScheduleMessage(connectionFactory);
        deleteExpiredScheduleMessage(connectionFactory);
    }

参考文章:

Enhanced JMS Scheduler in ActiveMQ

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Java 数据库连接
JMS 消息队列接口基本使用指南
JMS 消息队列接口基本使用指南
935 0
|
消息中间件 存储 开发者
死信是什么,如何运用RabbitMQ的死信机制?
死信是什么,如何运用RabbitMQ的死信机制?
803 0
|
存储 人工智能 前端开发
AI 网关零代码解决 AI 幻觉问题
本文主要介绍了 AI Agent 的背景,概念,探讨了 AI Agent 网关插件的使用方法,效果以及实现原理。
19161 127
|
存储 关系型数据库 MySQL
"深入探索MySQL临时表:性能优化利器,数据处理的灵活之选"
【8月更文挑战第9天】MySQL临时表专为存储临时数据设计,自动创建与删除,仅在当前会话中存在,有助于性能优化。它分为本地临时表和全局临时表(通过特定逻辑模拟)。创建语法类似于普通表,但加TEMPORARY或TEMP关键字。适用于性能优化、数据预处理和复杂查询,需注意内存占用和事务支持问题。合理使用可大幅提升查询效率。
762 2
WK
|
安全 Java 编译器
C++和Java哪个更适合开发web网站
在Web开发领域,C++和Java各具优势。C++以其高性能、低级控制和跨平台性著称,适用于需要高吞吐量和低延迟的场景,如实时交易系统和在线游戏服务器。Java则凭借其跨平台性、丰富的生态系统和强大的安全性,广泛应用于企业级Web开发,如企业管理系统和电子商务平台。选择时需根据项目需求和技术储备综合考虑。
WK
417 0
|
存储 JSON 前端开发
SpringBoot 如何实现无感刷新Token
【8月更文挑战第30天】在Web开发中,Token(尤其是JWT)作为一种常见的认证方式,被广泛应用于身份验证和信息加密。然而,Token的有效期问题常常导致用户需要重新登录,从而影响用户体验。为了实现更好的用户体验,SpringBoot可以通过无感刷新Token的机制来解决这一问题。以下将详细介绍SpringBoot如何做到无感刷新Token。
801 2
|
人工智能 自然语言处理
AI ppt生成器 Tome(二)
Tome 是一个AI PPT生成器,能根据用户输入自动生成内容和图片。用户可通过工具栏与AI对话来调整PPT,支持样式定制。优点包括:AI生成内容(支持中英文)、图片生成、多媒体嵌入及多语言输入。缺点:不支持导出下载和模板有限。
|
SQL 数据可视化 atlas
用DataV Atlas探索杭州美食
DataV 可视分析地图 Atlas 作为一款面向时空地理数据的可视分析工具,支持海量时空数据的快显渲染和实时分析,能够通过 SQL 分析方式对用户的海量时空数据进行实时渲染和多维分析,帮助用户快速构建自己的地理分析地图,挖掘时空数据价值。 下面通过一份杭州的美食娱乐兴趣点数据在 DataV Atlas 产品上为大家探索一下所谓的美食荒漠城市到底有没有美食?
52160 10
|
机器学习/深度学习 人工智能 NoSQL
【AIGC】深入浅出理解检索增强技术(RAG)
【5月更文挑战第10天】本文介绍了检索增强生成(RAG)技术,这是一种将AI模型与内部数据结合,提升处理和理解能力的方法。通过实时从大型文档库检索信息,扩展预训练语言模型的知识。文章通过示例说明了当模型需要回答未公开来源的内容时,RAG如何通过添加上下文信息来增强模型的回答能力。讨论了实际应用中令牌限制和文本分块的问题,以及使用文本嵌入技术解决相关性匹配的挑战。最后,概述了实现RAG的步骤,并预告后续将分享构建检索增强服务的详情。
861 3
|
存储 运维 算法
课时1:微服务系统中的异常检测与根因定位分析
课时1:微服务系统中的异常检测与根因定位分析
下一篇
oss云网关配置