【SpringBoot MQ 系列】RabbitMq 消息发送基本使用姿势

简介: 前面两篇博文,分别介绍了RabbitMq的核心知识点,以及整合SpringBoot的demo应用;接下来也该进入正题,看一下SpringBoot的环境下,如何玩转rabbitmq

image.png


前面两篇博文,分别介绍了RabbitMq的核心知识点,以及整合SpringBoot的demo应用;接下来也该进入正题,看一下SpringBoot的环境下,如何玩转rabbitmq


本篇内容主要为消息发送,包括以下几点


  • RabbitTemplate 发送消息的基本使用姿势
  • 自定义消息基本属性
  • 自定义消息转换器AbstractMessageConverter
  • 发送Object类型消息失败的case


I. 基本使用姿势



1. 配置


我们借助SpringBoot 2.2.1.RELEASE + rabbitmq 3.7.5来完整项目搭建与测试

项目pom.xml如下


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码


配置文件application.yml内容如下


spring:
  rabbitmq:
    virtual-host: /
    username: admin
    password: admin
    port: 5672
    host: 127.0.0.1
复制代码


2. 配置类


通过前面rabbitmq的知识点学习,我们可以知道发送端的主要逻辑 “将消息发送给exchange,然后根据不同的策略分发给对应的queue”


本篇博文主要讨论的是消息发送,为了后续的实例演示,我们定义一个topic模式的exchange,并绑定一个的queue;(因为对发送端而言,不同的exchange类型,对发送端的使用姿势影响并不大,有影响的是消费者)


public class MqConstants {
    public static final String exchange = "topic.e";
    public static final String routing = "r";
    public final static String queue = "topic.a";
}
@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.exchange);
    }
    @Bean
    public Queue queue() {
        // 创建一个持久化的队列
        return new Queue(MqConstants.queue, true);
    }
    @Bean
    public Binding binding(TopicExchange topicExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing);
    }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}
复制代码


3. 消息发送


消息发送,主要借助的是RabbitTemplate#convertAndSend方法来实现,通常情况下,我们直接使用即可


@Service
public class BasicPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 一般的用法,推送消息
     *
     * @param ans
     * @return
     */
    private String publish2mq1(String ans) {
        String msg = "Durable msg = " + ans;
        System.out.println("publish: " + msg);
        rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg;
    }
}
复制代码


上面的核心点就一行rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);


  • 表示将msg发送给指定的exchange,并设置消息的路由键


请注意


通过上面的方式,发送的消息默认是持久化的,当持久化的消息,分发到持久化的队列时,会有消息的落盘操作;


在某些场景下,我们对消息的完整性要求并没有那么严格,反而更在意mq的性能,丢失一些数据也可以接受;这个时候我们可能需要定制一下发送的消息属性(比如将消息设置为非持久化的)


下面提供两种姿势,推荐第二种

/**
 * 推送一个非持久化的消息,这个消息推送到持久化的队列时,mq重启,这个消息会丢失;上面的持久化消息不会丢失
 *
 * @param ans
 * @return
 */
private String publish2mq2(String ans) {
    MessageProperties properties = new MessageProperties();
    properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
    Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties);
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message);
    System.out.println("publish: " + message);
    return message.toString();
}
private String publish2mq3(String ans) {
    String msg = "Define msg = " + ans;
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setHeader("ta", "测试");
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        }
    });
    return msg;
}
复制代码

image.png


注意


  • 在实际的项目开发中,推荐使用MessagePostProcessor来定制消息属性
  • 其次不推荐在每次发送消息时都创建一个MessagePostProcessor对象,请定义一个通用的对象,能复用就复用


4. 非序列化对象发送异常case


通过查看rabbitTemplate#convertAndSend的接口定义,我们知道发送的消息可以是Object类型,那么是不是意味着任何对象,都可以推送给mq呢?


下面是一个测试case

private String publish2mq4(String ans) {
    NonSerDO nonSerDO = new NonSerDO(18, ans);
    System.out.println("publish: " + nonSerDO);
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
    return nonSerDO.toString();
}
@Data
public static class NonSerDO {
    private Integer age;
    private String name;
    public NonSerDO(int age, String name) {
        this.age = age;
        this.name = name;
    }
}
复制代码


当我们调用上面的publish2mq4方法时,并不会是想象中的直接成功,相反抛出一个参数类型异常


image.png


为什么会出现这个问题呢?从堆栈分析,我们知道RabbitTemplate默认是利用SimpleMessageConverter来实现封装Message逻辑的,核心代码为


// 下面代码来自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
  byte[] bytes = null;
  if (object instanceof byte[]) {
    bytes = (byte[]) object;
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
  }
  else if (object instanceof String) {
    try {
      bytes = ((String) object).getBytes(this.defaultCharset);
    }
    catch (UnsupportedEncodingException e) {
      throw new MessageConversionException(
          "failed to convert to Message content", e);
    }
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
    messageProperties.setContentEncoding(this.defaultCharset);
  }
  else if (object instanceof Serializable) {
    try {
      bytes = SerializationUtils.serialize(object);
    }
    catch (IllegalArgumentException e) {
      throw new MessageConversionException(
          "failed to convert to serialized Message content", e);
    }
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
  }
  if (bytes != null) {
    messageProperties.setContentLength(bytes.length);
    return new Message(bytes, messageProperties);
  }
  throw new IllegalArgumentException(getClass().getSimpleName()
      + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
复制代码


上面逻辑很明确的指出了,只接受byte数组,string字符串,可序列化对象(这里使用的是jdk的序列化方式来实现对象和byte数组之间的互转)


  • 所以我们传递一个非序列化的对象会参数非法的异常


自然而然的,我们会想有没有其他的MessageConverter来友好的支持任何类型的对象


5. 自定义MessageConverter


接下来我们希望通过自定义一个json序列化方式的MessageConverter来解决上面的问题


一个比较简单的实现(利用FastJson来实现序列化/反序列化)


public static class SelfConverter extends AbstractMessageConverter {
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        messageProperties.setContentType("application/json");
        return new Message(JSON.toJSONBytes(object), messageProperties);
    }
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return JSON.parse(message.getBody());
    }
}
复制代码


重新定义一个rabbitTemplate,并设置它的消息转换器为自定义的SelfConverter

@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new SelfConverter());
    return rabbitTemplate;
}
复制代码


然后再次测试一下

@Service
public class JsonPublisher {
    @Autowired
    private RabbitTemplate jsonRabbitTemplate;
    private String publish1(String ans) {
        Map<String, Object> msg = new HashMap<>(8);
        msg.put("msg", ans);
        msg.put("type", "json");
        msg.put("version", 123);
        System.out.println("publish: " + msg);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg.toString();
    }
    private String publish2(String ans) {
        BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans);
        System.out.println("publish: " + nonSerDO);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
        return nonSerDO.toString();
    }
}
复制代码


mq内接收到的推送消息如下

image.png


6. Jackson2JsonMessageConverter


上面虽然实现了Json格式的消息转换,但是比较简陋;而且这么基础通用的功能,按照Spring全家桶的一贯作风,肯定是有现成可用的,没错,这就是

Jackson2JsonMessageConverter


所以我们的使用姿势也可以如下


//定义RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}
// 测试代码
@Autowired
private RabbitTemplate jacksonRabbitTemplate;
private String publish3(String ans) {
    Map<String, Object> msg = new HashMap<>(8);
    msg.put("msg", ans);
    msg.put("type", "jackson");
    msg.put("version", 456);
    System.out.println("publish: " + msg);
    jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
    return msg.toString();
}
复制代码


下面是通过Jackson序列化消息后的内容,与我们自定义的有一些不同,多了headerscontent_encoding


image.png


7. 小结


本篇博文主要的知识点如下


  • 通过RabbitTemplate#convertAndSend来实现消息分发
  • 通过MessagePostProcessor来自定义消息的属性(请注意默认投递的消息时持久化的)
  • 默认的消息封装类为SimpleMessageConverter,只支持分发byte数组,字符串和可序列化的对象;不满足上面三个条件的方法调用会抛异常
  • 我们可以通过实现MessageConverter接口,来定义自己的消息封装类,解决上面的问题


在RabbitMq的知识点博文中,明确提到了,为了确保消息被brocker正确接收,提供了消息确认机制和事务机制两种case,那么如果需要使用这两种方式,消息生产者需要怎么做呢?


限于篇幅,下一篇博文将带来在消息确认机制/事务机制下的发送消息使用姿势




相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
77 3
|
4月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
26天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
31 6
|
5月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
111 1
|
2月前
|
消息中间件 数据采集 数据库
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
37 1
|
3月前
|
消息中间件 监控 物联网
MQTT协议对接及RabbitMQ的使用记录
通过合理对接MQTT协议并利用RabbitMQ的强大功能,可以构建一个高效、可靠的消息通信系统。无论是物联网设备间的通信还是微服务架构下的服务间消息传递,MQTT和RabbitMQ的组合都提供了一个强有力的解决方案。在实际应用中,应根据具体需求和环境进行适当的配置和优化,以发挥出这两个技术的最大效能。
221 0
|
4月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
4月前
|
消息中间件 存储 监控
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别,设计目标、适用场景、吞吐量、消息存储和持久化、可靠性、集群负载均衡
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别