RabbitMQ整合Spring AMQP

简介: RabbitMQ整合Spring AMQP

RabbitMQ整合Spring AMQP

添加依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

RabbitMQ配置类,注意:rabbitAdmin与rabbitTemplate方法变量中的ConnectionFactory connectionFactory需要与ConnectionFactory连接工厂的名称保持一致,否则会注入失败。

@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {
  @Bean
  public ConnectionFactory connectionFactory(){
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses("192.168.11.76:5672");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    return connectionFactory;
  }
  @Bean
  public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.setAutoStartup(true);
    return rabbitAdmin;
  }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      return rabbitTemplate;
    }
        /**  
     * 针对消费者配置  
     * 1. 设置交换机类型  
     * 2. 将队列绑定到交换机  
        FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        HeadersExchange :通过添加属性key-value匹配  
        DirectExchange:按照routingkey分发到指定队列  
        TopicExchange:多关键字匹配  
     */  
    @Bean
    public TopicExchange exchange001() {
        return new TopicExchange("topic001", true, false);
    }
    @Bean
    public Queue queue001() {
        return new Queue("queue001", true); //队列持久
    }
    @Bean
    public Binding binding001() {
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
    }
    @Bean  
    public TopicExchange exchange002() {  
        return new TopicExchange("topic002", true, false);  
    }  
    @Bean  
    public Queue queue002() {  
        return new Queue("queue002", true); //队列持久  
    }
    @Bean  
    public Binding binding002() {  
        return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");  
    } 
    @Bean  
    public Queue queue003() {  
        return new Queue("queue003", true); //队列持久  
    }
    @Bean  
    public Binding binding003() {  
        return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");  
    } 
    @Bean  
    public Queue queue_image() {  
        return new Queue("image_queue", true); //队列持久  
    }
    @Bean  
    public Queue queue_pdf() {  
        return new Queue("pdf_queue", true); //队列持久  
    }
}

RabbitAdmin使用案例

该类封装了对 RabbitMQ 的管理操作可以用来创建队列,交换机,以及他们之间的绑定

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
  @Test
  public void contextLoads() {
  }
  @Autowired
  private RabbitAdmin rabbitAdmin;
  @Test
  public void testAdmin() throws Exception {
    rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
    rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
    rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
    rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
    rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
    rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
    rabbitAdmin.declareBinding(new Binding("test.direct.queue",
        Binding.DestinationType.QUEUE,
        "test.direct", "direct", new HashMap<>()));
    rabbitAdmin.declareBinding(
        BindingBuilder
        .bind(new Queue("test.topic.queue", false))   //直接创建队列
        .to(new TopicExchange("test.topic", false, false))  //直接创建交换机 建立关联关系
        .with("user.#")); //指定路由Key
    rabbitAdmin.declareBinding(
        BindingBuilder
        .bind(new Queue("test.fanout.queue", false))    
        .to(new FanoutExchange("test.fanout", false, false)));
    //清空队列数据
    rabbitAdmin.purgeQueue("test.topic.queue", false);
  }

消息模板RabbitTemplate类

Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作

该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCalallback、返回值确认接口ReturnCallback等等。 同样我们需要进行注入到Spring容器中,然后直接使

  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Test
  public void testSendMessage() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.getHeaders().put("desc", "信息描述..");
    messageProperties.getHeaders().put("type", "自定义消息类型..");
    Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
        System.err.println("------添加额外的设置---------");
        message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
        message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
        return message;
      }
    });
  }
  @Test
  public void testSendMessage2() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    Message message = new Message("mq 消息1234".getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.abc", message);
    rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
    rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
  }

SimpleMessageListenerContainer简单消息监听器

该类可以监听多个队列,设置事物特性,事物管理器,事物属性,事物容量(并发),回滚消息等

设置消费者标签生成策略,是否独占模式,消费者属性等。

设置具体的监听器、消息转换器等。

配置类中注入

    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
      container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
      container.setConcurrentConsumers(1);//当前消费者数量
      container.setMaxConcurrentConsumers(5);//最大消费者数量
      container.setDefaultRequeueRejected(false);//是否重回队列
      container.setAcknowledgeMode(AcknowledgeMode.AUTO);//自动签收
      container.setExposeListenerChannel(true);
      //消费端标签策略
      container.setConsumerTagStrategy(new ConsumerTagStrategy() {
      @Override
      public String createConsumerTag(String queue) {
        return queue + "_" + UUID.randomUUID().toString();
      }
    });
      //监听消息
      container.setMessageListener(new ChannelAwareMessageListener() {
      @Override
      public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        System.err.println("----------消费者: " + msg);
      }
    });
    }

MessageListenerAdapter消息监听适配器

        //1 适配器方式. 默认是有自己的方法名字的:handleMessage
        // 可以自己指定一个方法的名字: consumeMessage
        // 也可以添加一个转换器: 从字节数组转换为String
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");//修改默认方法
        adapter.setMessageConverter(new TextMessageConverter());
        container.setMessageListener(adapter);

转换器

public class MessageDelegate {
  public void handleMessage(byte[] messageBody) {
    System.err.println("默认方法, 消息内容:" + new String(messageBody));
  }
}
public class TextMessageConverter implements MessageConverter {
  @Override
  public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    return new Message(object.toString().getBytes(), messageProperties);
  }
  @Override
  public Object fromMessage(Message message) throws MessageConversionException {
    String contentType = message.getMessageProperties().getContentType();
    if(null != contentType && contentType.contains("text")) {
      return new String(message.getBody());
    }
    return message.getBody();
  }
}

发送消息测试方法

  @Test
  public void testSendMessage4Text() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    Message message = new Message("mq 消息1234".getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.abc", message);
  }

可以让适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配,MessageDilegate写上对应的方法。使用setQueueOrTagToMethodName方法即可。

        //2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
         MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
         adapter.setMessageConverter(new TextMessageConverter());
         Map<String, String> queueOrTagToMethodName = new HashMap<>();
         queueOrTagToMethodName.put("queue001", "method1");
         queueOrTagToMethodName.put("queue002", "method2");
         adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
         container.setMessageListener(adapter);

MessageConverter消息转换器

 

我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter。

Json转换器: Jackson2JsonMessageConverter: 可以进行java对象的转换功能

DefaultJackson2Java TypeMapper映射器:可以进行java对象的映射关系

自定义二进制转换器:比如图片类型、PDF、PPT、 流媒体


Json转换器:

        // 1.1 支持json格式的转换器
         MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
         adapter.setDefaultListenerMethod("consumeMessage");
         Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
         adapter.setMessageConverter(jackson2JsonMessageConverter);
         container.setMessageListener(adapter);

MessagDelegate添加对应的方法

1.  public void consumeMessage(Map messageBody) {
2.    System.err.println("map方法, 消息内容:" + messageBody);
3.  }

测试类,首先创建Order类

  @Test
  public void testSendJsonMessage() throws Exception {
    Order order = new Order();
    order.setId("001");
    order.setName("消息订单");
    order.setContent("描述信息");
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json);
    MessageProperties messageProperties = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties.setContentType("application/json");
    Message message = new Message(json.getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.order", message);
  }

java对象转换只要添加这两句就可以了


        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();

        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);

测试方法添加

messageProperties.getHeaders().put("__TypeId__", "com.bfxy.spring.entity.Order");

全局转换器

        //全局的转换器:
        ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
        TextMessageConverter textConvert = new TextMessageConverter();
        convert.addDelegate("text", textConvert);
        convert.addDelegate("html/text", textConvert);
        convert.addDelegate("xml/text", textConvert);
        convert.addDelegate("text/plain", textConvert);
        Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
        convert.addDelegate("json", jsonConvert);
        convert.addDelegate("application/json", jsonConvert);
        ImageMessageConverter imageConverter = new ImageMessageConverter();
        convert.addDelegate("image/png", imageConverter);
        convert.addDelegate("image", imageConverter);
        PDFMessageConverter pdfConverter = new PDFMessageConverter();
        convert.addDelegate("application/pdf", pdfConverter);
        adapter.setMessageConverter(convert);
        container.setMessageListener(adapter);
        return container;


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java Maven
一文搞懂Spring Boot整合RocketMQ
一文搞懂Spring Boot整合RocketMQ
110 0
|
4月前
|
Java API 网络架构
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码
173 0
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
|
2月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
34 1
|
2月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
73 0
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
3月前
|
消息中间件 开发者 微服务
RabbitMQ和AMQP
RabbitMQ和AMQP
34 1
|
3月前
|
消息中间件 Java Spring
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
66 0
|
3月前
|
消息中间件 Java Spring
RabbitMQ各种模式的含义与Spring Boot实例详解
RabbitMQ各种模式的含义与Spring Boot实例详解
36 0
|
4月前
|
消息中间件 前端开发 架构师
华为架构师复盘2024最全2340页面试题jvm+spring+redis+MQ+微服务
包括 Java 集合、JVM、多线程、并发编程、设计模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat、Python、HTML、CSS、Vue、React、JavaScript、Android 大数据、阿里巴巴等大厂面试题等、等技术栈!
|
4月前
|
消息中间件 Java 中间件
Spring Boot异步消息之AMQP讲解及实战(附源码)
Spring Boot异步消息之AMQP讲解及实战(附源码)
97 1