交换机
在RabbitMQ中,生产者发送信息不会直接将消息投递到队列中,而是将消息投递到交换机中,再由交换机转发到具体的队列中,队列再将消息以推送或者拉取方式给消费进行消费。
交换机类型:
- direct直接类型;
- topic主题类型;
- headers标题类型;不常用
- fanout扇出类型。
无名类型:使用空字符串来指定
fanout交换机
生产者
public class Producer { public static final String QUEUE="hello"; public static void main(String args[]) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); String message="hello"; channel.basicPublish("exchange2","",null,message.getBytes()); System.out.println("发送成功"); } }
消费者
public class Consumer { public static void main(String args[]) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare("exchange2","fanout"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"exchange2",""); DeliverCallback deliverCallback=(consumerTag,message)->{ System.out.println(new String(message.getBody())); }; channel.basicConsume(queue,true,deliverCallback,consumerTag->{}); } }
Direct Exchange 直连交换其
这里整合Springboot代码如下:
application.yml文件
spring: rabbitmq: username: admin password: admin host: 182.92.167.13 port: 5672
Configuration配置类
@Configuration public class DirectExchange1 { @Bean public Queue queue(){ return new Queue("queue2",true); } @Bean public DirectExchange directExchange(){ return new DirectExchange("exchange3",true,false); } @Bean Binding binding(){ Binding binding= BindingBuilder.bind(queue()).to(directExchange()).with("123"); return binding; } }
生产者
rabbitTemplate.convertAndSend("exchange3","123","message");
消费者
@Component @RabbitListener(queues = "queue2") public class rabbitmq { @RabbitHandler public void consume(String message){ System.out.println(message); } }
Topic Exchange代码省略
死信队列
死信,就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
死信的来源
消息 TTL(存活时间) 过期
队列达到最大长度(队列满了,无法再添加数据到队列中)
消息被拒绝并且(不放回队列中: requeue=false)
生产者
MessageProperties messageProperties=new MessageProperties(); messageProperties.setExpiration("20000"); Message message1=new Message("hello".getBytes(),messageProperties); rabbitTemplate.convertAndSend("exchange4","2.1",message1);
Configuration配置
@Configuration public class RabbitmqConfig { @Bean public TopicExchange topicExchange(){ return new TopicExchange("exchange4"); } @Bean public Queue queue(){ Map<String,Object> map=new HashMap<>(); map.put("x-dead-letter-exchange","exchange4_1"); map.put("x-dead-letter-routing-key","error.er"); return new Queue("queue3",true,false,false,map); } @Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(topicExchange()).with("2.*"); } @Bean public TopicExchange topicExchange1(){ return new TopicExchange("exchange4_1"); } @Bean public Queue queue1(){ return new Queue("queue3_1"); } @Bean public Binding binding1(){ return BindingBuilder.bind(queue1()).to(topicExchange1()).with("error.#"); } }
消费者
@Component public class rabbitmq { @RabbitListener(queues = "queue3_1") public void consume(String msg, Message message, Channel channel){ System.out.println(msg); } }