上一篇:RabbitMQ基础知识
1、相关依赖
这里无需指定版本号,让其跟着SpringBoot版本走。本示例使用SpringBoot版本号为2.7.10。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
2、生产者、消费者
创建两个SpringBoot应用,模拟消息生产者与消费者【publisher、consumer】。
2-1生产者
编写配置文件,用户名和密码等自行修改 这里虚拟机的名称是上一篇文章中新建的。
server.port=8082 #rabbitmq服务器ip spring.rabbitmq.host=localhost #rabbitmq的端口 spring.rabbitmq.port=5672 #用户名 spring.rabbitmq.username=用户名 #密码 spring.rabbitmq.password=密码 #配置虚拟机 spring.rabbitmq.virtual-host=demo
声明交换机、队列并绑定:
publicclassRabbitMqConfig { publicRabbitTemplaterabbitTemplate(CachingConnectionFactoryconnectionFactory) { RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); returnrabbitTemplate; } publicMessageConverterjackson2JsonMessageConverter() { returnnewJackson2JsonMessageConverter(); } publicDirectExchangegetExchange(){ returnnewDirectExchange("directExchange",false,false); } publicQueuegetQueue(){ returnnewQueue("publisher.addUser",true,false,false); } publicBindinggetBinding(DirectExchangeexchange,Queuequeue){ returnBindingBuilder.bind(queue).to(exchange).with("publisher.addUser"); } }
新建User实体类
publicclassUser { privateLongid; privateStringname; privateStringdesc; }
在方法中使用RabbitTemplate来发送消息:
publicinterfacePublisherService { /*** 添加用户* @param user 用户信息*/voidaddUser(Useruser); }
publicclassPublisherServiceImplimplementsPublisherService{ privatefinalRabbitTemplaterabbitTemplate; publicvoidaddUser(Useruser) { rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user); } }
以上需要注意的就是交换机的名称、队列名、routingKey。示例中使用的是直连交换机,routingKey需要和队列名保持一致。不懂的可以查看上一篇文章。
controller:
"/user") (publicclassUserController { privatefinalPublisherServicepublisherService; "/add") (publicvoidadd(){ Useruser=newUser(); user.setId(1000L); user.setName("黄忠"); user.setDesc("老兵不死,只是逐渐凋零"); publisherService.addUser(user); } }
2-2消费者
消费者的配置和生产者一样,不赘述了,直接看代码:
publicclassConsumerService { queues="publisher.addUser") (publicvoidaddUser(StringuserStr){ Useruser=JSONObject.parseObject(userStr,User.class); log.info(user.toString()); } }
@RabbitListener 注解是指定某方法作为消息消费的方法,指定队列名称。@RabbitListener 如果标注在类上,需配合 @RabbitHandler 注解一起使用,根据接受的参数类型进入具体的方法中。
2-3测试
消费端在启动时可能会报找不到交换机或队列,只需要让生产者发送一次消息,从控制台就可以看到相关的交换机和队列等信息了。
可以看到消费者成功消费了消息:
3、消费流程
通过上述操作,我们已经会简单地使用RabbitMQ了,接下来了解一下它的整个流程。如此可以让我们掌握的更牢固。
生产者:
- 生产者连接到Message Broker【也就是RabbitMQ服务】,建立一个连接( Connection)开启一个信道(Channel)。
- 生产者声明一个交换机,并设置相关属性,比如交换机类型、是否持久化等。
- 生产者声明一个队列并设置相关属性。
- 生产者通过路由键【Routing Key】将交换机和队列绑定。
- 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
- 相应的交换机根据接收到的路由键查找相匹配的队列。
- 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道。
- 关闭连接。
消费者:
- 消费者连接到RabbitMQ Broker ,建立一个连接(Connection),开启一个信道(Channel) 。
- 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
- 等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
- 消费者确认(ack) 接收到的消息。
- RabbitMQ 从队列中删除相应己经被确认的消息。
- 关闭信道。
- 关闭连接。