rabbitmq_delayed_message_exchange
机制
安装插件后会生成新的Exchange类型x-delayed-message
,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia
(一个分布式数据系统)表中,并且当前节点是磁盘节点,那么节点重启后,消息还能保留。检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type
类型标记的交换机类型投递至目标队列。但是要注意的是,如果集群中只有一个磁盘节点,如果说磁盘节点丢失,或者节点上的插件失效。意味着消息将会丢失。
特性
- 可通过 x-delayed-type 指定类型 为 direct fanout topic 等
- 检测消息延迟时间,如果达到投递时间,通过x-delayed-type 标记的交换机类型进行投递。
- 消息存储在声明交换机的那个节点上,消息发送到延时交换机上,消息还未到发送时间,此时停机该节点,消费者不能消费此延时消息,后启动该节点,消息会被重新投递,消费者能够消费此延时消息。
- 磁盘节点,消息持久化,交换机持久化,消息不会丢失。
延时队列申明
Map<String,Object> map =new HashMap<>(); map.put("x-delayed-type", "direct"); channel.exchangeDeclare(exchangeName, "x-delayed-message", true, false, map);
插件安装
下载
https://www.rabbitmq.com/community-plugins.html
wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
解压
下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中(windows安装目录\rabbitmq_server-version\plugins )
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins
unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
启用插件
使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchang
输出结果
The following plugins have been enabled: rabbitmq_delayed_message_exchange
代码示例
生产者
import java.util.Date; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setHost("172.31.1.135"); connectionFactory.setUsername("xx"); connectionFactory.setPassword("xx"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "delay-exchange"; String routingkey = "delay.delay"; String queueName = "delay_queueName"; //x-delayed-message 声明 Map<String,Object> map =new HashMap<>(); map.put("x-delayed-type", "direct"); channel.exchangeDeclare(exchangeName, "x-delayed-message", true, false, map); //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。 channel.queueDeclare(queueName, true, false, false, map); channel.queueBind(queueName,exchangeName,routingkey); for (int i = 0; i < 3; i++) { // deliveryMode=2 持久化,expiration 消息有效时间 String msg = "delayed payload".getBytes("UTF-8") +" "+new Date().getTime(); byte[] messageBodyBytes = msg.getBytes(); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 50000); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish(exchangeName, routingkey, props.build(), messageBodyBytes); } } }
消费者
import java.io.IOException; import java.util.Date; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setHost("172.31.1.135"); connectionFactory.setPort(5672); connectionFactory.setUsername("xxx"); connectionFactory.setPassword("xxx"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String queueName = "delay_queueName"; channel.queueDeclare(queueName,true,false,false,null); channel.basicConsume(queueName, false, "myConsumer Tag", new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String convernType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body)+ " "+new Date().getTime()); channel.basicAck(deliveryTag, false); } }); } }