RabbitMQ整合SpringBoot(七)

简介: RabbitMQ整合SpringBoot(七)

自动配置

1、RabbitAutoConfiguration

2、有自动配置了连接工厂ConnectionFactory;

3、RabbitProperties 封装了 RabbitMQ的配置

4、RabbitTemplate :给RabbitMQ发送和接受消息;

5、@EnableRabbit + @RabbitListener 监听消息队列的内容

1.引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.bfxy</groupId>
  <artifactId>rabbitmq-springboot-producer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>rabbitmq-springboot-producer</name>
  <description>rabbitmq-springboot-producer</description>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

2.yml配置

publisher-confirms,实现一个监听器用于 监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback

publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:Rabbit Template. ReturnCalllback


注意一点, 在发送消息的时候对template进行配置mandatory=true保证监听有效

生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等

spring:
  rabbitmq:
    addresses: 192.168.254.129:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000
    publisher-confirms: true  #消息确认模式
    publisher-returns: true  #消息返回模式
    template:
      mandatory: true

3.启动类添加注解@EnableRabbit

4.生产端配置类

package com.lq.springboot.producer;
import java.util.Map;
import com.lq.springboot.entity.Order;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RabbitSender {
  //自动注入RabbitTemplate模板类
  @Autowired
  private RabbitTemplate rabbitTemplate;  
  //回调函数: confirm确认
  final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
      System.err.println("correlationData: " + correlationData);
      System.err.println("ack: " + ack);
      if(!ack){
        System.err.println("异常处理....");
      }else{
        System.out.println("更新数据库...");
      }
    }
  };
  //回调函数: return返回
  final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
    @Override
    public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
        String exchange, String routingKey) {
      System.err.println("return exchange: " + exchange + ", routingKey: " 
        + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
    }
  };
  //发送消息方法调用: 构建Message消息
  public void send(Object message, Map<String, Object> properties) throws Exception {
    MessageHeaders mhs = new MessageHeaders(properties);
    Message msg = MessageBuilder.createMessage(message, mhs);
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setReturnCallback(returnCallback);
    //id + 时间戳 全局唯一 
    CorrelationData correlationData = new CorrelationData("1234567890");
    rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
  }
  //发送消息方法调用: 构建自定义对象消息
  public void sendOrder(Order order) throws Exception {
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setReturnCallback(returnCallback);
    //id + 时间戳 全局唯一 
    CorrelationData correlationData = new CorrelationData("0987654321");
    rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
  }
}

消费端properties

spring.rabbitmq.addresses=192.168.254.129:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
#手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual 
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*

消费监听类

package com.bfxy.springboot.conusmer;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class RabbitReceiver {
  @RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "queue-1", 
      durable="true"),
      exchange = @Exchange(value = "exchange-1", 
      durable="true", 
      type= "topic", 
      ignoreDeclarationExceptions = "true"),
      key = "springboot.*"
      )
  )
  /**
   * @deprecated : 自动创建队列和交换机并进行绑定
   * @param order
   * @param channel
   * @param headers
   * @throws Exception
   */
  @RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
      durable="${spring.rabbitmq.listener.order.queue.durable}"),
      exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
      durable="${spring.rabbitmq.listener.order.exchange.durable}", 
      type= "${spring.rabbitmq.listener.order.exchange.type}", 
      ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
      key = "${spring.rabbitmq.listener.order.key}"
      )
  )
  @RabbitHandler
  public void onMessage(Message message, Channel channel) throws Exception {
    System.err.println("--------------------------------------");
    System.err.println("消费端Payload: " + message.getPayload());
    Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    //手工ACK
    channel.basicAck(deliveryTag, false);
  }
  //接收java对象
  @RabbitHandler
  public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, 
      Channel channel, 
      @Headers Map<String, Object> headers) throws Exception {
    System.err.println("--------------------------------------");
    System.err.println("消费端order: " + order.getId());
    Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
    //手工ACK
    channel.basicAck(deliveryTag, false);
  }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
39 6
|
7月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1102 3
|
5月前
|
消息中间件 Java Maven
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
422 1
|
6月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
6月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
6月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成