RabbitMQ整合Spring AMQP

简介: RabbitMQ整合Spring AMQP

RabbitMQ整合Spring AMQP

添加依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

RabbitMQ配置类,注意:rabbitAdmin与rabbitTemplate方法变量中的ConnectionFactory connectionFactory需要与ConnectionFactory连接工厂的名称保持一致,否则会注入失败。

@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {
  @Bean
  public ConnectionFactory connectionFactory(){
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses("192.168.11.76:5672");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    return connectionFactory;
  }
  @Bean
  public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.setAutoStartup(true);
    return rabbitAdmin;
  }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      return rabbitTemplate;
    }
        /**  
     * 针对消费者配置  
     * 1. 设置交换机类型  
     * 2. 将队列绑定到交换机  
        FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        HeadersExchange :通过添加属性key-value匹配  
        DirectExchange:按照routingkey分发到指定队列  
        TopicExchange:多关键字匹配  
     */  
    @Bean
    public TopicExchange exchange001() {
        return new TopicExchange("topic001", true, false);
    }
    @Bean
    public Queue queue001() {
        return new Queue("queue001", true); //队列持久
    }
    @Bean
    public Binding binding001() {
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
    }
    @Bean  
    public TopicExchange exchange002() {  
        return new TopicExchange("topic002", true, false);  
    }  
    @Bean  
    public Queue queue002() {  
        return new Queue("queue002", true); //队列持久  
    }
    @Bean  
    public Binding binding002() {  
        return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");  
    } 
    @Bean  
    public Queue queue003() {  
        return new Queue("queue003", true); //队列持久  
    }
    @Bean  
    public Binding binding003() {  
        return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");  
    } 
    @Bean  
    public Queue queue_image() {  
        return new Queue("image_queue", true); //队列持久  
    }
    @Bean  
    public Queue queue_pdf() {  
        return new Queue("pdf_queue", true); //队列持久  
    }
}

RabbitAdmin使用案例

该类封装了对 RabbitMQ 的管理操作可以用来创建队列,交换机,以及他们之间的绑定

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
  @Test
  public void contextLoads() {
  }
  @Autowired
  private RabbitAdmin rabbitAdmin;
  @Test
  public void testAdmin() throws Exception {
    rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
    rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
    rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
    rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
    rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
    rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
    rabbitAdmin.declareBinding(new Binding("test.direct.queue",
        Binding.DestinationType.QUEUE,
        "test.direct", "direct", new HashMap<>()));
    rabbitAdmin.declareBinding(
        BindingBuilder
        .bind(new Queue("test.topic.queue", false))   //直接创建队列
        .to(new TopicExchange("test.topic", false, false))  //直接创建交换机 建立关联关系
        .with("user.#")); //指定路由Key
    rabbitAdmin.declareBinding(
        BindingBuilder
        .bind(new Queue("test.fanout.queue", false))    
        .to(new FanoutExchange("test.fanout", false, false)));
    //清空队列数据
    rabbitAdmin.purgeQueue("test.topic.queue", false);
  }

消息模板RabbitTemplate类

Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作

该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCalallback、返回值确认接口ReturnCallback等等。 同样我们需要进行注入到Spring容器中,然后直接使

  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Test
  public void testSendMessage() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.getHeaders().put("desc", "信息描述..");
    messageProperties.getHeaders().put("type", "自定义消息类型..");
    Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
        System.err.println("------添加额外的设置---------");
        message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
        message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
        return message;
      }
    });
  }
  @Test
  public void testSendMessage2() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    Message message = new Message("mq 消息1234".getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.abc", message);
    rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
    rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
  }

SimpleMessageListenerContainer简单消息监听器

该类可以监听多个队列,设置事物特性,事物管理器,事物属性,事物容量(并发),回滚消息等

设置消费者标签生成策略,是否独占模式,消费者属性等。

设置具体的监听器、消息转换器等。

配置类中注入

    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
      container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
      container.setConcurrentConsumers(1);//当前消费者数量
      container.setMaxConcurrentConsumers(5);//最大消费者数量
      container.setDefaultRequeueRejected(false);//是否重回队列
      container.setAcknowledgeMode(AcknowledgeMode.AUTO);//自动签收
      container.setExposeListenerChannel(true);
      //消费端标签策略
      container.setConsumerTagStrategy(new ConsumerTagStrategy() {
      @Override
      public String createConsumerTag(String queue) {
        return queue + "_" + UUID.randomUUID().toString();
      }
    });
      //监听消息
      container.setMessageListener(new ChannelAwareMessageListener() {
      @Override
      public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        System.err.println("----------消费者: " + msg);
      }
    });
    }

MessageListenerAdapter消息监听适配器

        //1 适配器方式. 默认是有自己的方法名字的:handleMessage
        // 可以自己指定一个方法的名字: consumeMessage
        // 也可以添加一个转换器: 从字节数组转换为String
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");//修改默认方法
        adapter.setMessageConverter(new TextMessageConverter());
        container.setMessageListener(adapter);

转换器

public class MessageDelegate {
  public void handleMessage(byte[] messageBody) {
    System.err.println("默认方法, 消息内容:" + new String(messageBody));
  }
}
public class TextMessageConverter implements MessageConverter {
  @Override
  public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    return new Message(object.toString().getBytes(), messageProperties);
  }
  @Override
  public Object fromMessage(Message message) throws MessageConversionException {
    String contentType = message.getMessageProperties().getContentType();
    if(null != contentType && contentType.contains("text")) {
      return new String(message.getBody());
    }
    return message.getBody();
  }
}

发送消息测试方法

  @Test
  public void testSendMessage4Text() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    Message message = new Message("mq 消息1234".getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.abc", message);
  }

可以让适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配,MessageDilegate写上对应的方法。使用setQueueOrTagToMethodName方法即可。

        //2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
         MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
         adapter.setMessageConverter(new TextMessageConverter());
         Map<String, String> queueOrTagToMethodName = new HashMap<>();
         queueOrTagToMethodName.put("queue001", "method1");
         queueOrTagToMethodName.put("queue002", "method2");
         adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
         container.setMessageListener(adapter);

MessageConverter消息转换器

 

我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter。

Json转换器: Jackson2JsonMessageConverter: 可以进行java对象的转换功能

DefaultJackson2Java TypeMapper映射器:可以进行java对象的映射关系

自定义二进制转换器:比如图片类型、PDF、PPT、 流媒体


Json转换器:

        // 1.1 支持json格式的转换器
         MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
         adapter.setDefaultListenerMethod("consumeMessage");
         Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
         adapter.setMessageConverter(jackson2JsonMessageConverter);
         container.setMessageListener(adapter);

MessagDelegate添加对应的方法

1.  public void consumeMessage(Map messageBody) {
2.    System.err.println("map方法, 消息内容:" + messageBody);
3.  }

测试类,首先创建Order类

  @Test
  public void testSendJsonMessage() throws Exception {
    Order order = new Order();
    order.setId("001");
    order.setName("消息订单");
    order.setContent("描述信息");
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json);
    MessageProperties messageProperties = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties.setContentType("application/json");
    Message message = new Message(json.getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.order", message);
  }

java对象转换只要添加这两句就可以了


        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();

        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);

测试方法添加

messageProperties.getHeaders().put("__TypeId__", "com.bfxy.spring.entity.Order");

全局转换器

        //全局的转换器:
        ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
        TextMessageConverter textConvert = new TextMessageConverter();
        convert.addDelegate("text", textConvert);
        convert.addDelegate("html/text", textConvert);
        convert.addDelegate("xml/text", textConvert);
        convert.addDelegate("text/plain", textConvert);
        Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
        convert.addDelegate("json", jsonConvert);
        convert.addDelegate("application/json", jsonConvert);
        ImageMessageConverter imageConverter = new ImageMessageConverter();
        convert.addDelegate("image/png", imageConverter);
        convert.addDelegate("image", imageConverter);
        PDFMessageConverter pdfConverter = new PDFMessageConverter();
        convert.addDelegate("application/pdf", pdfConverter);
        adapter.setMessageConverter(convert);
        container.setMessageListener(adapter);
        return container;


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 Java 网络架构
|
22天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
31 6
|
4月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
4月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
73 0
|
5月前
|
消息中间件 物联网 API
消息队列 MQ使用问题之如何在物联网项目中搭配使用 MQTT、AMQP 与 RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
5月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
5月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
6月前
|
消息中间件 JavaScript Java
消息队列 MQ产品使用合集之如何嵌入到Spring Boot中运行
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java Spring
Spring Boot与RabbitMQ的集成应用
Spring Boot与RabbitMQ的集成应用