RabbitMQ精讲6:与Spring AMQP整合实战

简介: RabbitMQ精讲6:与Spring AMQP整合实战

目录

1. AMQP 核心组件

2. RabbitAdmin

3. SpringAMQP声明和RabbitTemplate

SpringAMQP声明

RabbitTemplate

4. SimpleMessageListenerContainer

SimpleMessageListenerContainer原理:为什么可以动态感知配置变更?

5. MessageListenerAdapter

6. MessageConverter

7 代码演示

7.1 引入Pom文件

7.2 配置类

7.3 消息转换器

图片转换器

PDF转换器

文本转换器

7.4 实体

7.5 测试类



1. AMQP 核心组件

  1. RabbitAdmin
  2. SpringAMQP声明
  3. RabbitTemplate
  4. SimpleMessageListenerContainer
  5. MessageListenerAdapter
  6. MessageConverter


2. RabbitAdmin

RabbitAdmin

 

RabbitAdmin

RabbitAdmin

 

注意:

  1. autoStartUp必须要设置为true,否则Spring容器不会加载RabbitAdmin类
  2. RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明
  3. 使用RabbitTemplate的execute方法执行对应的什么、修改、删除等一系列RabbitMQ基础功能操作
  4. 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等


3. SpringAMQP声明和RabbitTemplate

SpringAMQP声明

  • 以前,在Rabbit基础API里面声明一个Exchange、声明一个绑定、一个队列
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 手工签收 必须要关闭 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));


  • 使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式
 /**  
     * 针对消费者配置  
     * 1. 设置交换机类型  
     * 2. 将队列绑定到交换机  
        FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        HeadersExchange :通过添加属性key-value匹配  
        DirectExchange:按照routingkey分发到指定队列  
        TopicExchange:多关键字匹配  
     */  
    @Bean  
    public TopicExchange exchange001() {  
        return new TopicExchange("topic001", true, false);  
    }  
    @Bean  
    public Queue queue001() {  
        return new Queue("queue001", true); //队列持久  
    }  
    @Bean  
    public Binding binding001() {  
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");  
    }  


RabbitTemplate

RabbitTemplate

RabbitTemplate,即消息模板

  • 我们在与SpringAMQP整合的时候进行发送消息的关键词
  • 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallback等等。同样我们需要进行注入到Spring容器中,然后直接使用
  • 在与Spring整合时需要实例化,但是在与SpringBoot整合时,在配置文件里添加配置即可


4. SimpleMessageListenerContainer

SimpleMessageListenerContainer

SimpleMessageListenerContainer

 

SimpleMessageListenerContainer

 

SimpleMessageListenerContainer

 

简单消息监听容器

  • 这个类非常的强大,我们可以对它进行很多设置,对于消费者的配置项,这个类都可以满足
  • 监听队列(多个队列)、自动启动、自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务容器(并发)、是否开启事务、回滚消息等
  • 设置消费者数量、最小最大数量、批量消费
  • 设置消息确认和自动确认模式、是否重回队列、异常捕捉handler函数
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器等等。

注意:

  • SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等
  • 很多机遇RabbitMQ的自制定话后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大


SimpleMessageListenerContainer原理:为什么可以动态感知配置变更?

SimpleMessageListenerContainer用一个List<String> 来存储所有queue,当要添加queue时,就往这个List添加,然后调用queuesChanged()方法,把所有的consumer都取消订阅,然后把他们都移除,然后有多少个consumer再重新new相同数量的consumer出来,然后每个consumer都把List中的每个都都订阅,到此就完成了。


5. MessageListenerAdapter

MessageListenerAdapter

MessageListenerAdapter

 

默认方法名就是叫handleMessage。当然也可以自己去指定设置。通过messageListenerAdapter的代码我们可以看出如下核心属性

  • defaultListenerMethod默认监听方法名称:用于设置监听方法名称
  • Delegate 委托对象:实际真实的委托对象,用于处理消息
  • queueOrTagToMethodName 队列标识与方法名称组成集合
  • 可以一一进行队列与方法名称的匹配
  • 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理


6. MessageConverter

MessageConverter

MessageConverter

MessageConverter

我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter

自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口

重写下面两个方法:

  • toMessage:java对象转换为Message
  • fromMessage:Message对象转换为java对象

Json转换器:Jackson2JsonMessageConverter:可以进行Java对象的转换功能

DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系

自定义二进制转换器:比如图片类型、PDF、PPT、流媒体


7 代码演示

7.1 引入Pom文件

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <!--amqp-->
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
</dependencies>

7.2 配置类

package com.bfxy.spring;
import java.util.UUID;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import com.bfxy.spring.adapter.MessageDelegate;
import com.bfxy.spring.convert.ImageMessageConverter;
import com.bfxy.spring.convert.PDFMessageConverter;
import com.bfxy.spring.convert.TextMessageConverter;
@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {
  @Bean
  public ConnectionFactory connectionFactory(){
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses("192.168.11.76:5672");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    return connectionFactory;
  }
  @Bean
  public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.setAutoStartup(true);
    return rabbitAdmin;
  }
    /**  
     * 针对消费者配置  
     * 1. 设置交换机类型  
     * 2. 将队列绑定到交换机  
        FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        HeadersExchange :通过添加属性key-value匹配  
        DirectExchange:按照routingkey分发到指定队列  
        TopicExchange:多关键字匹配  
     */  
    @Bean  
    public TopicExchange exchange001() {  
        return new TopicExchange("topic001", true, false);  
    }  
    @Bean  
    public Queue queue001() {  
        return new Queue("queue001", true); //队列持久  
    }  
    @Bean  
    public Binding binding001() {  
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");  
    }  
    @Bean  
    public TopicExchange exchange002() {  
        return new TopicExchange("topic002", true, false);  
    }  
    @Bean  
    public Queue queue002() {  
        return new Queue("queue002", true); //队列持久  
    }
    @Bean  
    public Binding binding002() {  
        return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");  
    } 
    @Bean  
    public Queue queue003() {  
        return new Queue("queue003", true); //队列持久  
    }
    @Bean  
    public Binding binding003() {  
        return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");  
    } 
    @Bean  
    public Queue queue_image() {  
        return new Queue("image_queue", true); //队列持久  
    }
    @Bean  
    public Queue queue_pdf() {  
        return new Queue("pdf_queue", true); //队列持久  
    }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      return rabbitTemplate;
    }
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
      container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
      container.setConcurrentConsumers(1);
      container.setMaxConcurrentConsumers(5);
      container.setDefaultRequeueRejected(false);
      container.setAcknowledgeMode(AcknowledgeMode.AUTO);
      container.setExposeListenerChannel(true);
      container.setConsumerTagStrategy(new ConsumerTagStrategy() {
      @Override
      public String createConsumerTag(String queue) {
        return queue + "_" + UUID.randomUUID().toString();
      }
    });
      /**
      container.setMessageListener(new ChannelAwareMessageListener() {
      @Override
      public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        System.err.println("----------消费者: " + msg);
      }
    });
      */
      /**
       * 1 适配器方式. 默认是有自己的方法名字的:handleMessage
        // 可以自己指定一个方法的名字: consumeMessage
        // 也可以添加一个转换器: 从字节数组转换为String
      MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
      adapter.setDefaultListenerMethod("consumeMessage");
      adapter.setMessageConverter(new TextMessageConverter());
      container.setMessageListener(adapter);
      */
      /**
       * 2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
       * 
      MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
      adapter.setMessageConverter(new TextMessageConverter());
      Map<String, String> queueOrTagToMethodName = new HashMap<>();
      queueOrTagToMethodName.put("queue001", "method1");
      queueOrTagToMethodName.put("queue002", "method2");
      adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
      container.setMessageListener(adapter);      
      */
        // 1.1 支持json格式的转换器
        /**
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        */
        // 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换
        /**
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        */
        //1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换
        /**
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
    idClassMapping.put("order", com.bfxy.spring.entity.Order.class);
    idClassMapping.put("packaged", com.bfxy.spring.entity.Packaged.class);
    javaTypeMapper.setIdClassMapping(idClassMapping);
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        */
        //1.4 ext convert
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        //全局的转换器:
    ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
    TextMessageConverter textConvert = new TextMessageConverter();
    convert.addDelegate("text", textConvert);
    convert.addDelegate("html/text", textConvert);
    convert.addDelegate("xml/text", textConvert);
    convert.addDelegate("text/plain", textConvert);
    Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
    convert.addDelegate("json", jsonConvert);
    convert.addDelegate("application/json", jsonConvert);
    ImageMessageConverter imageConverter = new ImageMessageConverter();
    convert.addDelegate("image/png", imageConverter);
    convert.addDelegate("image", imageConverter);
    PDFMessageConverter pdfConverter = new PDFMessageConverter();
    convert.addDelegate("application/pdf", pdfConverter);
    adapter.setMessageConverter(convert);
    container.setMessageListener(adapter);
      return container;
    }
}


7.3 消息转换器

图片转换器

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class ImageMessageConverter implements MessageConverter {
  @Override
  public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    throw new MessageConversionException(" convert error ! ");
  }
  @Override
  public Object fromMessage(Message message) throws MessageConversionException {
    System.err.println("-----------Image MessageConverter----------");
    Object _extName = message.getMessageProperties().getHeaders().get("extName");
    String extName = _extName == null ? "png" : _extName.toString();
    byte[] body = message.getBody();
    String fileName = UUID.randomUUID().toString();
    String path = "d:/010_test/" + fileName + "." + extName;
    File f = new File(path);
    try {
      Files.copy(new ByteArrayInputStream(body), f.toPath());
    } catch (IOException e) {
      e.printStackTrace();
    }
    return f;
  }
}


PDF转换器

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class PDFMessageConverter implements MessageConverter {
  @Override
  public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    throw new MessageConversionException(" convert error ! ");
  }
  @Override
  public Object fromMessage(Message message) throws MessageConversionException {
    System.err.println("-----------PDF MessageConverter----------");
    byte[] body = message.getBody();
    String fileName = UUID.randomUUID().toString();
    String path = "d:/010_test/" + fileName + ".pdf";
    File f = new File(path);
    try {
      Files.copy(new ByteArrayInputStream(body), f.toPath());
    } catch (IOException e) {
      e.printStackTrace();
    }
    return f;
  }
}


文本转换器

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class TextMessageConverter implements MessageConverter {
  @Override
  public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    return new Message(object.toString().getBytes(), messageProperties);
  }
  @Override
  public Object fromMessage(Message message) throws MessageConversionException {
    String contentType = message.getMessageProperties().getContentType();
    if(null != contentType && contentType.contains("text")) {
      return new String(message.getBody());
    }
    return message.getBody();
  }
}


7.4 实体

public class Order {
  private String id;
  private String name;
  private String content;
  public Order() {
  }
  public Order(String id, String name, String content) {
    this.id = id;
    this.name = name;
    this.content = content;
  }
  public String getId() {
    return id;
  }
  public void setId(String id) {
    this.id = id;
  }
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public String getContent() {
    return content;
  }
  public void setContent(String content) {
    this.content = content;
  }
}

 

public class Packaged {
  private String id;
  private String name;
  private String description;
  public Packaged() {
  }
  public Packaged(String id, String name, String description) {
    this.id = id;
    this.name = name;
    this.description = description;
  }
  public String getId() {
    return id;
  }
  public void setId(String id) {
    this.id = id;
  }
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public String getDescription() {
    return description;
  }
  public void setDescription(String description) {
    this.description = description;
  }
}


7.5 测试类

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.bfxy.spring.entity.Order;
import com.bfxy.spring.entity.Packaged;
import com.fasterxml.jackson.databind.ObjectMapper;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
  @Test
  public void contextLoads() {
  }
  @Autowired
  private RabbitAdmin rabbitAdmin;
  @Test
  public void testAdmin() throws Exception {
    rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
    rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
    rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
    rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
    rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
    rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
    rabbitAdmin.declareBinding(new Binding("test.direct.queue",
        Binding.DestinationType.QUEUE,
        "test.direct", "direct", new HashMap<>()));
    rabbitAdmin.declareBinding(
        BindingBuilder
        .bind(new Queue("test.topic.queue", false))   //直接创建队列
        .to(new TopicExchange("test.topic", false, false))  //直接创建交换机 建立关联关系
        .with("user.#")); //指定路由Key
    rabbitAdmin.declareBinding(
        BindingBuilder
        .bind(new Queue("test.fanout.queue", false))    
        .to(new FanoutExchange("test.fanout", false, false)));
    //清空队列数据
    rabbitAdmin.purgeQueue("test.topic.queue", false);
  }
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Test
  public void testSendMessage() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.getHeaders().put("desc", "信息描述..");
    messageProperties.getHeaders().put("type", "自定义消息类型..");
    Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
        System.err.println("------添加额外的设置---------");
        message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
        message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
        return message;
      }
    });
  }
  @Test
  public void testSendMessage2() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    Message message = new Message("mq 消息1234".getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.abc", message);
    rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
    rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
  }
  @Test
  public void testSendMessage4Text() throws Exception {
    //1 创建消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    Message message = new Message("mq 消息1234".getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.abc", message);
    rabbitTemplate.send("topic002", "rabbit.abc", message);
  }
  @Test
  public void testSendJsonMessage() throws Exception {
    Order order = new Order();
    order.setId("001");
    order.setName("消息订单");
    order.setContent("描述信息");
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json);
    MessageProperties messageProperties = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties.setContentType("application/json");
    Message message = new Message(json.getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.order", message);
  }
  @Test
  public void testSendJavaMessage() throws Exception {
    Order order = new Order();
    order.setId("001");
    order.setName("订单消息");
    order.setContent("订单描述信息");
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json);
    MessageProperties messageProperties = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties.setContentType("application/json");
    messageProperties.getHeaders().put("__TypeId__", "com.bfxy.spring.entity.Order");
    Message message = new Message(json.getBytes(), messageProperties);
    rabbitTemplate.send("topic001", "spring.order", message);
  }
  @Test
  public void testSendMappingMessage() throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    Order order = new Order();
    order.setId("001");
    order.setName("订单消息");
    order.setContent("订单描述信息");
    String json1 = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json1);
    MessageProperties messageProperties1 = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties1.setContentType("application/json");
    messageProperties1.getHeaders().put("__TypeId__", "order");
    Message message1 = new Message(json1.getBytes(), messageProperties1);
    rabbitTemplate.send("topic001", "spring.order", message1);
    Packaged pack = new Packaged();
    pack.setId("002");
    pack.setName("包裹消息");
    pack.setDescription("包裹描述信息");
    String json2 = mapper.writeValueAsString(pack);
    System.err.println("pack 4 json: " + json2);
    MessageProperties messageProperties2 = new MessageProperties();
    //这里注意一定要修改contentType为 application/json
    messageProperties2.setContentType("application/json");
    messageProperties2.getHeaders().put("__TypeId__", "packaged");
    Message message2 = new Message(json2.getBytes(), messageProperties2);
    rabbitTemplate.send("topic001", "spring.pack", message2);
  }
  @Test
  public void testSendExtConverterMessage() throws Exception {
//      byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
//      MessageProperties messageProperties = new MessageProperties();
//      messageProperties.setContentType("image/png");
//      messageProperties.getHeaders().put("extName", "png");
//      Message message = new Message(body, messageProperties);
//      rabbitTemplate.send("", "image_queue", message);
      byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
      MessageProperties messageProperties = new MessageProperties();
      messageProperties.setContentType("application/pdf");
      Message message = new Message(body, messageProperties);
      rabbitTemplate.send("", "pdf_queue", message);
  }
}

 


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
28天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
32 6
|
2月前
|
自然语言处理 Java API
Spring Boot 接入大模型实战:通义千问赋能智能应用快速构建
【10月更文挑战第23天】在人工智能(AI)技术飞速发展的今天,大模型如通义千问(阿里云推出的生成式对话引擎)等已成为推动智能应用创新的重要力量。然而,对于许多开发者而言,如何高效、便捷地接入这些大模型并构建出功能丰富的智能应用仍是一个挑战。
289 6
|
2月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
160 2
|
2月前
|
Java 数据库连接 Spring
【2021Spring编程实战笔记】Spring开发分享~(下)
【2021Spring编程实战笔记】Spring开发分享~(下)
36 1
|
2月前
|
XML Java 数据格式
Spring IOC容器的深度解析及实战应用
【10月更文挑战第14天】在软件工程中,随着系统规模的扩大,对象间的依赖关系变得越来越复杂,这导致了系统的高耦合度,增加了开发和维护的难度。为解决这一问题,Michael Mattson在1996年提出了IOC(Inversion of Control,控制反转)理论,旨在降低对象间的耦合度,提高系统的灵活性和可维护性。Spring框架正是基于这一理论,通过IOC容器实现了对象间的依赖注入和生命周期管理。
80 0
|
2月前
|
XML Java 数据库连接
【2020Spring编程实战笔记】Spring开发分享~(上)
【2020Spring编程实战笔记】Spring开发分享~
60 0
|
4月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
75 0
|
4月前
|
JSON Java API
解码Spring Boot与JSON的完美融合:提升你的Web开发效率,实战技巧大公开!
【8月更文挑战第29天】Spring Boot作为Java开发的轻量级框架,通过`jackson`库提供了强大的JSON处理功能,简化了Web服务和数据交互的实现。本文通过代码示例介绍如何在Spring Boot中进行JSON序列化和反序列化操作,并展示了处理复杂JSON数据及创建RESTful API的方法,帮助开发者提高效率和应用性能。
203 0
|
4月前
|
SQL Java 数据库连接
Spring Boot联手MyBatis,打造开发利器:从入门到精通,实战教程带你飞越编程高峰!
【8月更文挑战第29天】Spring Boot与MyBatis分别是Java快速开发和持久层框架的优秀代表。本文通过整合Spring Boot与MyBatis,展示了如何在项目中添加相关依赖、配置数据源及MyBatis,并通过实战示例介绍了实体类、Mapper接口及Controller的创建过程。通过本文,你将学会如何利用这两款工具提高开发效率,实现数据的增删查改等复杂操作,为实际项目开发提供有力支持。
307 0
|
4月前
|
缓存 NoSQL Java
惊!Spring Boot遇上Redis,竟开启了一场缓存实战的革命!
【8月更文挑战第29天】在互联网时代,数据的高速读写至关重要。Spring Boot凭借简洁高效的特点广受开发者喜爱,而Redis作为高性能内存数据库,在缓存和消息队列领域表现出色。本文通过电商平台商品推荐系统的实战案例,详细介绍如何在Spring Boot项目中整合Redis,提升系统响应速度和用户体验。
76 0