7、topic类型:主题模式
消费者订阅主题,获取符合条件的队列中的消息
- 匹配一个单词
匹配一个或多个单词
例:
cat.*.pop cat.temp.pop
cat.# cat.end.yield
*.keep.believe queen.keep.believe
代码示例:
生产者:
public class Producer { private static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); System.out.println("生产者发送消息:" + message); channel.basicPublish(EXCHANGE_NAME, "start.sss.so", null, message.getBytes("UTF-8")); } } }
消费者C1 :
public class Consumer1 { private static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = "Queue1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.middle.*"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("C1接收到的消息:" + message); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
消费者C2 :
public class Consumer2 { private static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = "Queue2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "#.end"); channel.queueBind(queueName, EXCHANGE_NAME, "start.*.*"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("C2接收到的消息:" + message); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
在生产者控制台输入:
hello
生产者发送消息:hello
nihao
生产者发送消息:nihao
zaima
生产者发送消息:zaima
消费者C1控制台没有显示内容。
消费者C2控制台显示:
C2接收到的消息:hello
C2接收到的消息:nihao
C2接收到的消息:zaima
六、死信队列:
1、死信:不能被接受的消息
2、造成死信的原因:
消息TTL过期,
队列达到最大长度,
消息被拒绝,
3、消息TTL过期
代码:
生产者:
public class Producer { private static final String EXCHANGE_NAME = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); AMQP.BasicProperties pro = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 0; i < 10; i++) { String message = "消息" + i; System.out.println("生产者发送消息:" + message); channel.basicPublish(EXCHANGE_NAME, "zhangsan", pro, message.getBytes("UTF-8")); } } }
消费者1:
public class Consumer1 { private static final String NORMAL_EXCHANGE = "normal_exchange"; private static final String NORMAL_QUEUE = "normal_queue"; private static final String DEAD_EXCHANGE = "dead_exchange"; private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); channel.exchangeDeclare(NORMAL_EXCHANGE, "direct"); channel.exchangeDeclare(DEAD_EXCHANGE, "direct"); Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange", DEAD_EXCHANGE); map.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, map); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); DeliverCallback deliverCallback = (consumerTag, message) -> { String mes = new String(message.getBody(), "UTF-8"); System.out.println("C1接收到的消息:" + mes); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback); channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback); } }
消费者2:
public class Consumer2 { private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("C2接收到的消息:" + message); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback); } }
操作流程:
(1)先运行消费者1,声明 普通交换机、普通队列、死信交换机、死信队列
(2)然后关闭消费者1,运行生产者,发送消息
生产者控制台:
生产者发送消息:消息0
生产者发送消息:消息1
生产者发送消息:消息2
生产者发送消息:消息3
生产者发送消息:消息4
生产者发送消息:消息5
生产者发送消息:消息6
生产者发送消息:消息7
生产者发送消息:消息8
(3)然后运行消费者2,接收消息
消费者2控制台:
C2接收到的消息:消息0
C2接收到的消息:消息1
C2接收到的消息:消息2
C2接收到的消息:消息3
C2接收到的消息:消息4
C2接收到的消息:消息5
C2接收到的消息:消息6
C2接收到的消息:消息7
C2接收到的消息:消息8
C2接收到的消息:消息9
4、队列达到最大长度:
C1消费者添加代码:
map.put(“x-max-length”, 6);
5、消息被拒绝:
public class Consumer1 { private static final String NORMAL_EXCHANGE = "normal_exchange"; private static final String NORMAL_QUEUE = "normal_queue"; private static final String DEAD_EXCHANGE = "dead_exchange"; private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); channel.exchangeDeclare(NORMAL_EXCHANGE, "direct"); channel.exchangeDeclare(DEAD_EXCHANGE, "direct"); Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange", DEAD_EXCHANGE); map.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, map); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); DeliverCallback deliverCallback = (consumerTag, message) -> { String mes = new String(message.getBody(), "UTF-8"); if (mes.equals("消息6")) { System.out.println("C1拒绝接收的消息:" + mes); channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("C1接收到的消息:" + mes); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback); channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback); } }
操作步骤:
先打开C1、C2消费者等待接收消息,然后打开生产者
生产者控制台:
生产者发送消息:消息0
生产者发送消息:消息1
生产者发送消息:消息2
生产者发送消息:消息3
生产者发送消息:消息4
生产者发送消息:消息5
生产者发送消息:消息6
生产者发送消息:消息7
生产者发送消息:消息8
生产者发送消息:消息9
C1消费者控制台:
C1接收到的消息:消息0
C1接收到的消息:消息1
C1接收到的消息:消息2
C1接收到的消息:消息3
C1接收到的消息:消息4
C1接收到的消息:消息5
C1拒绝接收的消息:消息6
C1接收到的消息:消息7
C1接收到的消息:消息8
C1接收到的消息:消息9
C2消费者控制台:
C2接收到的消息:消息6
七、Springboot整合RabbitMQ:
pom.xml
org.springframework.boot
spring-boot-starter-web
2.4.6
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-starter-amqp
2.2.2.RELEASE
org.springframework.boot
spring-boot-starter
2.4.6
com.alibaba
fastjson
1.2.76
application.properties spring.rabbitmq.host=192.168.1.100 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
主启动类;
@SpringBootApplication
public class RabbitApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}
}
八、延迟队列:
优化版:添加QC队列
声明队列、交换机的配置类
@Configuration
public class RabbitmqConfig {
public static final String A_QUEUE = "aQueue"; public static final String B_QUEUE = "bQueue"; public static final String C_QUEUE = "cQueue"; public static final String D_QUEUE = "dQueue"; public static final String X_EXCHANGE = "xExchange"; public static final String Y_EXCHANGE = "yExchange"; @Bean("aQueue") public Queue aQueue() { HashMap<String, Object> map = new HashMap<>(3); map.put("x-dead-letter-exchange", Y_EXCHANGE); map.put("x-dead-letter-routing-key", "YD"); map.put("x-message-ttl", 10000); return QueueBuilder.durable(A_QUEUE).withArguments(map).build(); } @Bean("bQueue") public Queue bQueue() { HashMap<String, Object> map = new HashMap<>(3); map.put("x-dead-letter-exchange", Y_EXCHANGE); map.put("x-dead-letter-routing-key", "YD"); map.put("x-message-ttl", 40000); return QueueBuilder.durable(B_QUEUE).withArguments(map).build(); } @Bean("cQueue") public Queue cQueue() { HashMap<String, Object> map = new HashMap<>(3); map.put("x-dead-letter-exchange", Y_EXCHANGE); map.put("x-dead-letter-routing-key", "YD"); return QueueBuilder.durable(C_QUEUE).withArguments(map).build(); } @Bean("dQueue") public Queue dQueue() { return QueueBuilder.durable(D_QUEUE).build(); } @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_EXCHANGE); } @Bean public Binding aQueueBindX(@Qualifier("aQueue") Queue aQueue, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(aQueue).to(xExchange).with("XA"); } @Bean public Binding bQueueBindX(@Qualifier("bQueue") Queue bQueue, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(bQueue).to(xExchange).with("XB"); } @Bean public Binding cQueueBindX(@Qualifier("cQueue") Queue cQueue, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(cQueue).to(xExchange).with("XC"); } @Bean public Binding dQueueBindY(@Qualifier("dQueue") Queue dQueue, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(dQueue).to(yExchange).with("YD"); }
}
消息生产者:
@RestController
@RequestMapping(“/ttl”)
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable("message") String message) { System.out.println("当前时间:" + new Date().toString() +",发送的消息是:" + message); rabbitTemplate.convertAndSend("xExchange", "XA", "消息来自ttl为10秒的队列:" + message); rabbitTemplate.convertAndSend("xExchange", "XB", "消息来自ttl为40秒的队列:" + message); } @GetMapping("/sendMsg/{message}/{ttl}") public void sendMsg(@PathVariable("message") String message, @PathVariable("ttl") String ttl) { System.out.println("当前时间:" + new Date().toString() + ",发送的消息是:" + message); rabbitTemplate.convertAndSend("xExchange", "XC", "消息来自队列C:" + message, msg -> { msg.getMessageProperties().setExpiration(ttl); return msg; }); }
}
消息消费者;
@Component
public class MsgConsumer {
@RabbitListener(queues = “dQueue”)
public void deliverMsg(Message message, Channel channel) {
String s = new String(message.getBody());
System.out.println(“当前时间:” + new Date().toString() + “,接收到的队列消息—>” + s);
}
}
九、发布确认高级篇:
消息发布者发布消息之后,在消息队列服务器遇到中断,消息会丢失。
消息队列服务器中断的情形有两种:
(1)在交换机环节中断
(2)在队列环节中断
代码示例:
application.properties
开启发布确认回调方法
spring.rabbitmq.publisher-confirm-type=correlated
开启发布返回回调方法
spring.rabbitmq.publisher-returns=true
@Configuration
public class ConfirmConfig {
private static final String CONFIRM_EXCHANGE_NAME = “confirm_exchange”;
private static final String CONFIRM_QUEUE_NAME = “confirm_queue”;
private static final String ROUTING_KEY_NAME = “key1”;
@Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE_NAME); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBindExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY_NAME); }
}
@RestController
@RequestMapping(“/confirm”)
public class ConfirmController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable("message") String message) {
// Ⅰ、正确的发送消息
CorrelationData cor = new CorrelationData(“10001”);
System.out.println(“发送的消息是:” + message + “001”);
rabbitTemplate.convertAndSend(“confirm_exchange”, “key1”, “消息:” + message + “001”, cor);
// Ⅱ、交换机错误
CorrelationData cor2 = new CorrelationData(“10002”);
System.out.println(“发送的消息是:” + message + “002”);
rabbitTemplate.convertAndSend(“confirm_exchange123”, “key1”, “消息:” + message, cor2);
// Ⅲ、队列错误
CorrelationData cor3 = new CorrelationData(“10003”);
System.out.println(“发送的消息是:” + message + “003”);
rabbitTemplate.convertAndSend(“confirm_exchange”, “key3”, “消息:” + message + “003”, cor3);
}
}
@Component
public class ConfirmConsumer {
@RabbitListener(queues = "confirm_queue") public void deliverMsg2(Message message, Channel channel) { String s = new String(message.getBody()); System.out.println("接收到的队列消息--->" + s); }
}
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { System.out.println("已经接收id为" + id + "的消息"); } else { System.out.println("没有接收到id为" + id + "的消息,原因为" + cause); } } @Override public void returnedMessage(ReturnedMessage returnedMessage) { String message = new String(returnedMessage.getMessage().getBody()); String exchange = returnedMessage.getExchange(); String replyText = returnedMessage.getReplyText(); String routingKey = returnedMessage.getRoutingKey(); System.out.println("消息" + message + "被交换机" + exchange + "退回,原因是" + replyText + ",路由key是" + routingKey); }
}
浏览器访问: http://localhost:8080/confirm/sendMsg/chrome
ConfirmController 只放开 Ⅰ ,控制台显示
发送的消息是:chrome001
接收到的队列消息—>消息:chrome001
已经接收id为10001的消息
ConfirmController 只放开 Ⅱ ,控制台显示
发送的消息是:chrome002
2022-06-06 19:34:47.360 ERROR 18088 — [.168.1.100:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm_exchange123’ in vhost ‘/’, class-id=60, method-id=40)
没有接收到id为10002的消息,原因为channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm_exchange123’ in vhost ‘/’, class-id=60, method-id=40)
ConfirmController 只放开 Ⅲ,控制台显示
发送的消息是:chrome003
消息消息:chrome003被交换机confirm_exchange退回,原因是NO_ROUTE,路由key是key3
已经接收id为10003的消息
备份交换机:
代码:
ConfirmConfig :添加备份交换机、备份队列、报警队列,绑定队列和交换机
@Configuration
public class ConfirmConfig {
private static final String CONFIRM_EXCHANGE_NAME = “confirm_exchange”;
private static final String CONFIRM_QUEUE_NAME = “confirm_queue”;
private static final String ROUTING_KEY_NAME = “key1”;
private static final String BACKUP_EXCHANGE_NAME = “backup_exchange”;
private static final String BACKUP_QUEUE_NAME = “backup_queue”;
private static final String WARNING_QUEUE_NAME = “warning_queue”;
@Bean("confirmExchange") public DirectExchange confirmExchange() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME) .durable(true).alternate(BACKUP_EXCHANGE_NAME).build(); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding confirmQueueBindConfirmExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY_NAME); } @Bean("backupExchange") public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE_NAME); } @Bean("backupQueue") public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } @Bean("warningQueue") public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } @Bean public Binding backupQueueBindBackupExchange(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding warningQueueBindBackupExchange(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(warningQueue).to(backupExchange); }
}
ConfirmController 放开Ⅲ。
MsgConsumer: 添加备份消费者和报警消费者
@Component
public class MsgConsumer {
@RabbitListener(queues = "dQueue") public void deliverMsg(Message message, Channel channel) { String s = new String(message.getBody()); System.out.println("当前时间:" + new Date().toString() + ",接收到的队列消息--->" + s); } @RabbitListener(queues = "confirm_queue") public void deliverMsg2(Message message, Channel channel) { String s = new String(message.getBody()); System.out.println("接收到的队列消息--->" + s); } @RabbitListener(queues = "backup_queue") public void deliverMsg3(Message message, Channel channel) { String s = new String(message.getBody()); System.out.println("备份队列接收到的消息--->" + s); } @RabbitListener(queues = "warning_queue") public void deliverMsg4(Message message, Channel channel) { String s = new String(message.getBody()); System.out.println("WARNING!WARNING!WARNING!报警队列接收到的消息--->" + s); }
}
浏览器访问: http://localhost:8080/confirm/sendMsg/chrome
控制台显示:
发送的消息是:chrome003
已经接收id为10003的消息
WARNING!WARNING!WARNING!报警队列接收到的消息—>消息:chrome003
备份队列接收到的消息—>消息:chrome003
配置了备份交换机,则不走消息回退,而是走备份交换机。
表明:备份交换机比消息回退优先级高。
十、其他知识点
1、幂等性
对一个操作发起多次请求。
解决思路:使用全局ID或者唯一标识
解决办法:
(1)唯一ID+指纹锁机制
(2)redis的原子性
利用redis执行setx命令,天然具有幂等性,从而实现不重复消费
2、优先级队列:
生产者发送消息时,给消息设置优先级数,在队列中按照优先级对消息重新排序,优先级高的先发送到消费者,优先级低的后发送。
public class Producer {
private static final String QUEUE_NAME = “priority_queue”;
public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); HashMap<String, Object> map = new HashMap<>(); map.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, false, false, false, map); for (int i = 0; i < 10; i++) { String msg = "hello" + i; if (i % 3 == 0) { AMQP.BasicProperties build = new AMQP.BasicProperties().builder().priority(6).build(); channel.basicPublish("", QUEUE_NAME, build, msg.getBytes()); } else if (i == 8) { AMQP.BasicProperties build = new AMQP.BasicProperties().builder().priority(3).build(); channel.basicPublish("", QUEUE_NAME, build, msg.getBytes()); } else { channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } } System.out.println("消息发送完毕"); }
}
public class Consumer {
private static final String QUEUE_NAME = “priority_queue”;
public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); DeliverCallback d = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; CancelCallback c = consumerTag -> { System.out.println("消息被中断"); }; channel.basicConsume(QUEUE_NAME, true, d, c); }
}
控制台显示:
hello0
hello3
hello6
hello9
hello8
hello1
hello2
hello4
hello5
hello7
3、惰性队列:
队列的模式分为默认(default)和惰性(lazy)
默认队列消息保存在内存中,惰性队列消息保存在磁盘中。
在声明队列时 ,添加 “x-queue-mode” 属性,设置值为 “lazy”