大家好,我是小悟。
第一章:认识 RabbitMQ —— 不是普通的兔子!
RabbitMQ 就像是一个超级邮差兔,不过它不送胡萝卜,专门传递消息!想象一下:
- 🏢 交换机(Exchange):邮局的分拣中心,负责把信件分到正确的路线
- 📮 队列(Queue):你的专属邮箱,消息就在这里等你来取
- 🏷️ 路由键(Routing Key):信封上的地址标签
- 📦 消息(Message):你要传递的包裹(可以是任何数据)
这只“兔子”有四种工作模式:
- Hello World 模式:简单直接,像扔飞盘一样
- Work Queues 模式:多个工人抢活干,卷起来了!
- Publish/Subscribe 模式:广播模式,一人说话全村听见
- Routing & Topics 模式:智能分发,精准投喂
第二章:搭建“胡萝卜农场” —— 环境准备
2.1 安装 RabbitMQ
# 用 Docker 快速召唤兔子 docker run -d \ --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin123 \ rabbitmq:management # 访问管理界面:http://localhost:15672 # 账号:admin / 密码:admin123
2.2 创建 SpringBoot 项目
<!-- pom.xml 添加“兔粮” --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
第三章:配置“兔子窝” —— 连接设置
# application.yml spring: rabbitmq: host: localhost port: 5672 username: admin password: admin123 # 虚拟主机,相当于兔子的公寓号 virtual-host: / # 开启消息确认(确保兔子没偷吃消息) publisher-confirm-type: correlated publisher-returns: true listener: simple: # 手动确认消息(收到要说谢谢哦) acknowledge-mode: manual # 并发消费者数量(几只兔子一起干活) concurrency: 3 max-concurrency: 10
第四章:定义“胡萝卜传输协议” —— 配置类
@Configuration public class RabbitConfig { // 定义一个队列 - 就像给兔子一个收件箱 @Bean public Queue carrotQueue() { // durable: true 表示兔子重启后队列还在(持久化) return new Queue("carrot.queue", true); } // 再定义一个队列,用来放烂掉的胡萝卜 @Bean public Queue deadCarrotQueue() { return new Queue("dead.carrot.queue", true); } // 定义一个直连交换机 - 最简单的邮局 @Bean public DirectExchange carrotExchange() { return new DirectExchange("carrot.exchange", true, false); } // 绑定队列和交换机(告诉兔子哪个邮箱放哪条路) @Bean public Binding bindCarrot() { return BindingBuilder.bind(carrotQueue()) .to(carrotExchange()) .with("carrot.routing.key"); // 路由键 } // JSON 消息转换器(把胡萝卜包装成标准快递盒) @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // 死信队列配置(处理那些没人要的胡萝卜) @Bean public Queue carrotQueueWithDLX() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dead.letter.exchange"); args.put("x-dead-letter-routing-key", "dead.carrot"); return new Queue("carrot.with.dlx", true, false, false, args); } }
第五章:“种胡萝卜” —— 生产者代码
@Component @Slf4j public class CarrotProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送一根普通胡萝卜 */ public void sendFreshCarrot(String message) { log.info("发射一根新鲜胡萝卜: {}", message); // 确保消息持久化(放冰箱保存) MessageProperties props = MessagePropertiesBuilder.newInstance() .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); Message msg = new Message(message.getBytes(), props); rabbitTemplate.convertAndSend( "carrot.exchange", "carrot.routing.key", msg, new CorrelationData(UUID.randomUUID().toString()) ); log.info("胡萝卜已交给兔子快递员"); } /** * 发送一根 JSON 格式的豪华胡萝卜 */ public void sendLuxuryCarrot(CarrotDTO carrot) { log.info("准备发送豪华胡萝卜: {}", carrot); // 设置回调(确认兔子收到没) rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("兔子确认收到豪华胡萝卜,ID: {}", correlationData.getId()); } else { log.error("兔子拒收豪华胡萝卜!原因: {}", cause); } }); rabbitTemplate.convertAndSend( "carrot.exchange", "carrot.routing.key", carrot ); } /** * 发送延迟胡萝卜(10秒后才送达) */ public void sendDelayedCarrot(String message, int delaySeconds) { log.info("设置延迟胡萝卜,{}秒后送达: {}", delaySeconds, message); rabbitTemplate.convertAndSend( "delay.exchange", "delay.key", message, msg -> { msg.getMessageProperties().setDelay(delaySeconds * 1000); return msg; } ); } } // 胡萝卜数据传输对象 @Data @AllArgsConstructor @NoArgsConstructor class CarrotDTO { private String color; private Integer weight; private Date harvestTime; private List<String> vitamins; }
第六章:“吃胡萝卜” —— 消费者代码
@Component @Slf4j public class CarrotConsumer { /** * 处理新鲜胡萝卜 - 自动确认版 */ @RabbitListener(queues = "carrot.queue") public void eatFreshCarrot(String message) { log.info("兔子正在啃胡萝卜: {}", message); // 模拟吃胡萝卜的过程 try { Thread.sleep(1000); log.info("真香!胡萝卜吃完了"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } /** * 处理豪华胡萝卜 - 手动确认版 */ @RabbitListener(queues = "luxury.carrot.queue") public void eatLuxuryCarrot(Message message, Channel channel, @Payload CarrotDTO carrot) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { log.info("收到豪华胡萝卜!颜色: {}, 重量: {}g", carrot.getColor(), carrot.getWeight()); // 模拟复杂的处理逻辑 processLuxuryCarrot(carrot); // 手动确认(告诉兔子:我吃完了,你可以送下一个了) channel.basicAck(deliveryTag, false); log.info("豪华胡萝卜处理完成,已确认"); } catch (Exception e) { log.error("吃豪华胡萝卜时噎住了: {}", e.getMessage()); try { // 处理失败,拒绝消息(可以设置重新入队或丢弃) channel.basicNack(deliveryTag, false, true); // 重新入队 // channel.basicReject(deliveryTag, false); // 直接丢弃 } catch (IOException ioException) { log.error("无法拒绝消息: {}", ioException.getMessage()); } } } /** * 批量吃胡萝卜(提高效率) */ @RabbitListener(queues = "batch.carrot.queue", containerFactory = "batchFactory") public void eatBatchCarrot(List<Message> messages, Channel channel) { log.info("收到一批胡萝卜,共{}根", messages.size()); for (Message message : messages) { try { String carrot = new String(message.getBody()); log.info("正在处理胡萝卜: {}", carrot); // 批量确认 channel.basicAck(message.getMessageProperties() .getDeliveryTag(), false); } catch (Exception e) { log.error("处理失败: {}", e.getMessage()); } } } private void processLuxuryCarrot(CarrotDTO carrot) { // 复杂的业务逻辑 log.info("分析胡萝卜营养成分..."); log.info("维生素含量: {}", carrot.getVitamins()); // 这里可以添加数据库操作、调用其他服务等 } }
第七章:特殊场景处理
7.1 死信队列配置
@Configuration public class DeadLetterConfig { @Bean public DirectExchange deadLetterExchange() { return new DirectExchange("dead.letter.exchange"); } @Bean public Queue deadLetterQueue() { return new Queue("dead.letter.queue"); } @Bean public Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()) .to(deadLetterExchange()) .with("dead.carrot"); } /** * 监听死信队列 */ @Component @Slf4j public class DeadLetterConsumer { @RabbitListener(queues = "dead.letter.queue") public void handleDeadLetter(String message) { log.warn("收到死信胡萝卜,需要人工处理: {}", message); // 可以发送告警邮件、记录日志等 } } }
7.2 延迟队列(定时任务)
@Configuration public class DelayConfig { @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("delay.exchange", "x-delayed-message", true, false, args); } @Bean public Queue delayQueue() { return new Queue("delay.queue"); } @Bean public Binding delayBinding() { return BindingBuilder.bind(delayQueue()) .to(delayExchange()) .with("delay.key") .noargs(); } }
第八章:测试我们的“兔子农场”
@SpringBootTest @Slf4j class RabbitmqTest { @Autowired private CarrotProducer carrotProducer; @Test void testSendCarrot() { // 测试发送普通胡萝卜 carrotProducer.sendFreshCarrot("我是新鲜的胡萝卜!"); // 测试发送豪华胡萝卜 CarrotDTO luxuryCarrot = new CarrotDTO( "橙色", 200, new Date(), Arrays.asList("维生素A", "维生素C", "胡萝卜素") ); carrotProducer.sendLuxuryCarrot(luxuryCarrot); // 测试延迟胡萝卜 carrotProducer.sendDelayedCarrot("我是10秒后的胡萝卜", 10); log.info("所有胡萝卜已发送,请观察兔子们的表现!"); } @Test void testMassProduction() { // 批量生产胡萝卜,测试性能 for (int i = 1; i <= 100; i++) { carrotProducer.sendFreshCarrot("批量胡萝卜#" + i); } log.info("100根胡萝卜已投入生产线!"); } }
第九章:监控和运维
9.1 健康检查
management: endpoints: web: exposure: include: health,metrics,rabbit endpoint: health: show-details: always
9.2 自定义监控
@Component public class RabbitMonitor { @Autowired private RabbitAdmin rabbitAdmin; public void checkRabbitHealth() { Properties queueProperties = rabbitAdmin.getQueueProperties("carrot.queue"); if (queueProperties != null) { int messageCount = Integer.parseInt( queueProperties.get("QUEUE_MESSAGE_COUNT").toString() ); log.info("胡萝卜队列当前有 {} 根胡萝卜等待处理", messageCount); } } /** * 定期清理积压消息 */ @Scheduled(fixedDelay = 60000) // 每分钟检查一次 public void clearBacklog() { // 实现清理逻辑 log.info("正在打扫兔子窝..."); } }
第十章:总结 —— 和兔子相处的秘诀
学到了什么:
- RabbitMQ 是个好邮差:可靠、灵活、功能强大的消息中间件
- SpringBoot 是兔子的好朋友:通过简单的配置就能轻松集成
- 消息确认很重要:别让胡萝卜在半路丢了
- 死信队列是保险箱:处理异常情况的最佳实践
- 延迟消息很实用:实现定时任务的好方法
实践建议:
- 给消息上保险:总是开启消息持久化和确认机制
- 合理设置TTL:别让胡萝卜放太久烂掉了
- 监控不能少:时刻关注兔子的健康状况
- 错误处理要优雅:准备好处理各种异常情况
- 性能要平衡:消费者数量不是越多越好
学习方向:
- 集群部署:养一群兔子,避免单点故障
- 镜像队列:给胡萝卜做备份
- 优先级队列:VIP 胡萝卜优先处理
- 流控机制:别把兔子撑坏了
- 与其他组件集成:让兔子和别的动物(其他中间件)合作
结尾:
记住,RabbitMQ 就像一只训练有素的邮差兔:
- 你发送消息,它接收
- 你发布消息,它路由
- 你消费消息,它确认
- 你搞砸了,它死信
只要遵循 RabbitMQ 的“兔子法则”,你的消息系统就会像兔子繁殖一样——快速、可靠、生生不息!
最后:虽然 RabbitMQ 很可爱,但请不要真的喂它胡萝卜,它只吃 0 和 1(二进制数据)!
代码仓库建议结构:
rabbitmq-demo/ ├── src/main/java/com/example/rabbitmq/ │ ├── config/ # 配置类 │ ├── producer/ # 生产者 │ ├── consumer/ # 消费者 │ ├── dto/ # 数据传输对象 │ └── RabbitmqDemoApplication.java ├── src/main/resources/ │ └── application.yml └── pom.xml
现在就去和你的 RabbitMQ 兔子做好朋友吧!记得每天喂它足够的消息,但别把它撑着了哦~
谢谢你看我的文章,既然看到这里了,如果觉得不错,随手点个赞、转发、在看三连吧,感谢感谢。那我们,下次再见。
您的一键三连,是我更新的最大动力,谢谢
山水有相逢,来日皆可期,谢谢阅读,我们再会
我手中的金箍棒,上能通天,下能探海