一、DelayExchange插件
使用死信队列可以实现延迟消息,但这种方法过于繁琐。为了简化这一过程,RabbitMQ的官方推出了一款插件,该插件原生支持延迟消息功能。该插件的运作原理是设计了一种特殊的交换机,当消息投递到这种交换机时,它能够暂存一段时间,直到达到设定的延迟时间后再将消息投递到相应的队列。这种设计大大简化了延迟消息的处理过程,提高了系统的效率和可靠性。
官方文档:
https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq
1.下载插件
插件下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下载对应版本的插件:
2.安装插件
由于之前是基于Docker安装的RabbitMQ,所以需要查看RabbitMQ插件目录对应的数据卷:
docker volume inspect mq-plugins
运行结果:
[ { "CreatedAt": "2023-12-15T09:57:39+08:00", "Driver": "local", "Labels": null, "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data", "Name": "mq-plugins", "Options": null, "Scope": "local" } ]
切换到指定的路径:
cd /var/lib/docker/volumes/mq-plugins/_data
上传插件到该目录下:
安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
运行结果:
二、延迟消息插件实现延迟消息
1.基于注解方式
在consumer服务基于@RabbitListener注解来声明队列、交换机和绑定队列和交换机,并且设置交换机为延迟交换机:
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "delay.queue", durable = "true"), exchange = @Exchange(value = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayQueue(String msg) { log.info("delay.queue:" + msg); }
delayed = "true" | 设置交换机为延迟交换机 |
2.基于@Bean方式
在consumer服务基于@Bean注解来声明交换机、队列和绑定队列和交换机,并且设置交换机为延迟交换机:
@Configuration public class DirectConfiguration { @Bean public DirectExchange delayExchange() { return ExchangeBuilder .directExchange("delay.direct") .delayed() .durable(true) .build(); } @Bean public Queue delayedQueue() { return new Queue("delay.queue"); } @Bean public Binding delayQueueBinding() { return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay"); } }
.delayed() | 设置交换机为延迟交换机 |
3.声明结果
启动consumer服务,查看结果(注解方式和@Bean方式选一种):
4.延迟消息发送
在publisher服务中的测试类添加一个测试方法,通过消息头x-delay来设置过期时间,实现延迟消息发送:
@Test void testSendDelayMessage() { rabbitTemplate.convertAndSend("delay.direct", "delay", "hello", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(10000); return message; } }); log.info("消息发送成功"); }
.setDelay() | 通过消息头x-delay来设置消息的延迟时间 |
5.运行结果
publisher服务成功发送了一条消息,但consumer服务在10秒后才收到该消息,从而实现了延迟消息传递的目标:
总结
RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容,希望对大家有所帮助。