dubbo2.5-spring4-mybastis3.2-springmvc4-mongodb3.4-redis3.2整合(十二)之 spring中RabbitMQ延迟队列的实现

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010046908/article/details/57079566

在前面写过一篇dubbo2.5-spring4-mybastis3.2-springmvc4-mongodb3.4-redis3.2整合(七)RabbitMQ工作原理和Spring的集成
,今天在进一步使用一下RabbitMQ的延迟队列的实现。

1. 简介

RabbitMQ如何实现延迟队列:延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

2. RabbitMQ的延迟队列使用场景

场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延迟队列将订单信息发送到延迟队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到只能设备。

3.RabbitMQ实现延迟队列

AMQP协议,以及RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过TTL和DLX模拟出延迟队列的功能。

3.1 TTL(Time To Live)

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

  1. 通过队列属性设置,队列中所有消息都有相同的过期时间。
  2. 对消息进行单独设置,每条消息TTL可以不同。

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

详细可以参考:RabbitMQ之TTL(Time-To-Live 过期时间)

3.2 DLX (Dead-Letter-Exchange)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。

x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange x-dead-letter-routing-key:指定routing-key发送队列出现dead letter的情况有:消息或者队列的TTL过期 队列达到最大长度 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false,利DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

4.案例的实现

4.1 rabbit.properties

rabbit_username=lidong1665
rabbit_password=123456
rabbit_host=192.168.0.107
rabbit_port=5672

4.2 spring-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">


    <!--配置connection-factory,指定连接rabbit server参数 -->
    <rabbit:connection-factory id="rabbitConnectionFactory"
                               username="${rabbit_username}"
                               password="${rabbit_password}"
                               host="${rabbit_host}"
                               port="${rabbit_port}"/>

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin id="connectAdmin" connection-factory="rabbitConnectionFactory" />

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="3"/>
        <property name="maxPoolSize" value="5"/>
        <property name="queueCapacity" value="15"/>
    </bean>
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <rabbit:topic-exchange name="delayChangeTest"
declared-by="connectAdmin" delayed="true">
    <rabbit:bindings>
            <rabbit:binding queue="delay_queue"
                    pattern="order.delay.notify"
            />
        </rabbit:bindings>
    </rabbit:topic-exchange>


    <!--定义queue  配置延迟队列的信息-->
    <rabbit:queue name="delay_queue"
                  durable="true"
                  auto-declare="true"
                  auto-delete="false"
                  declared-by="connectAdmin">
    </rabbit:queue>


    <rabbit:template id="rabbitTemplate2" connection-factory="rabbitConnectionFactory"
                     exchange="delayChangeTest"/>


    <bean id="orderConsumer" class="com.lidong.dubbo.core.util.customer.OrderConsumer"></bean>


    <rabbit:listener-container
            connection-factory="rabbitConnectionFactory"
            acknowledge="manual"
            channel-transacted="false"
            message-converter="jsonMessageConverter">
        <rabbit:listener queues="queueTest"
        ref="messageReceiver"  method="onMessage"/>
    </rabbit:listener-container>
</beans>

4.3 创建生产者

package com.lidong.dubbo.core.spittle.service;

import com.lidong.dubbo.api.spittle.service.IMessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @项目名称:lidong-dubbo
 * @类名:MessageProducerImp
 * @类的描述:
 * @作者:lidong
 * @创建时间:2017/2/4 上午10:01
 * @公司:chni
 * @QQ:1561281670
 * @邮箱:lidong1665@163.com
 */
@Service
public class MessageProducerServiceImp implements IMessageProducer {


    private Logger logger = LoggerFactory.getLogger(MessageProducerServiceImp.class);
    @Resource
    private RabbitTemplate rabbitTemplate2;

    @Override
    public void sendMessage(Object message) {
        logger.info("to send message:{}",message);
        final int xdelay= 300*1000;
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        //发送延迟消息
        rabbitTemplate2.convertAndSend("order.delay.notify", message,
                new MessagePostProcessor() {

                    @Override
                    public Message postProcessMessage(Message message)
                            throws AmqpException {
                        //设置消息持久化
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        //设置延迟时间(5分钟后执行)
                        message.getMessageProperties().setDelay(xdelay);
                        logger.info("----"+sf.format(new Date()) + " Delay sent.");

                        return message;
                    }
                });
    }
}

4.4 创建消费者

package com.lidong.dubbo.core.util.customer;

import com.rabbitmq.client.Channel;
import org.activiti.engine.impl.util.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

/**
 * @项目名称:lidong-dubbo
 * @类名:OrderConsumer
 * @类的描述:
 * @作者:lidong
 * @创建时间:2017/2/25 下午12:59
 * @公司:chni
 * @QQ:1561281670
 * @邮箱:lidong1665@163.com
 */
public class OrderConsumer implements ChannelAwareMessageListener {
    private Logger logger = LoggerFactory.getLogger(OrderConsumer.class);
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        logger.info("[延时消息]" + message.getMessageProperties());
        if (message != null) {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            logger.debug("deliveryTag= "+deliveryTag);
            //手动确认
            channel.basicAck(deliveryTag,false);

        }

    }
}

发送消息之后。消费5分钟之后接受到消息,开始处理。

代码地址

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
编解码 NoSQL Java
使用Spring Boot + Redis 队列实现视频文件上传及FFmpeg转码的技术分享
【8月更文挑战第30天】在当前的互联网应用中,视频内容的处理与分发已成为不可或缺的一部分。对于视频平台而言,高效、稳定地处理用户上传的视频文件,并对其进行转码以适应不同设备的播放需求,是提升用户体验的关键。本文将围绕使用Spring Boot结合Redis队列技术来实现视频文件上传及FFmpeg转码的过程,分享一系列技术干货。
105 3
|
4天前
|
消息中间件 存储 NoSQL
如何用Redis实现延迟队列?
综上所述,通过Redis的有序集合和一些基本命令,我们可以轻松地构建出功能完善的延迟队列系统。根据具体需求,可以进一步优化和扩展,以满足高性能和高可靠性的业务需求。
12 1
|
1月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
1月前
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。
|
3月前
|
NoSQL Linux Redis
Redis性能优化问题之想确认Redis延迟变大是否因为fork耗时导致的,如何解决
Redis性能优化问题之想确认Redis延迟变大是否因为fork耗时导致的,如何解决
|
3月前
|
NoSQL Redis
Redis性能优化问题之为什么配置为 appendfsync everysec 的 AOF 也可能导致 Redis 延迟变大
Redis性能优化问题之为什么配置为 appendfsync everysec 的 AOF 也可能导致 Redis 延迟变大
|
3月前
|
监控 NoSQL Redis
Redis性能优化问题之配置 Redis 的自动碎片整理功能,如何解决
Redis性能优化问题之配置 Redis 的自动碎片整理功能,如何解决
|
4月前
|
监控 NoSQL Java
在Spring Boot中集成Redisson实现延迟队列
在Spring Boot中集成Redisson实现延迟队列
242 6
|
4月前
|
设计模式 前端开发 Java
【Spring MVC】快速学习使用Spring MVC的注解及三层架构
【Spring MVC】快速学习使用Spring MVC的注解及三层架构
57 1
|
3月前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别