SpringBoot整合RabbitMQ实践详解

简介: SpringBoot整合RabbitMQ实践详解

【1】添加starter导入组件

pom文件如下:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--<dependency>-->
  <!--<groupId>javax.cache</groupId>-->
  <!--<artifactId>cache-api</artifactId>-->
<!--</dependency>-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.mybatis.spring.boot</groupId>
  <artifactId>mybatis-spring-boot-starter</artifactId>
  <version>1.3.2</version>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>

【2】RabbitMQ配置

application.properties中配置如下

spring.rabbitmq.host=192.168.2.110
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 下面两个是默认值,可以不用显示指定
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

自定义RabbitMQ配置类


如下实例,自定义了连接工厂和SimpleRabbitListenerContainerFactory 以及在项目启动时初始化了交换器、队列并进行了路由绑定。当然,如果MQ服务器实现配置了好交换机、路由以及绑定规则,则不需要这些自定义配置,直接使用消息模板进行发送即可。

@Configuration
public class MQConfig {
  public static final String MIAOSHA_QUEUE = "miaosha.queue";
  public static final String QUEUE = "queue";
  public static final String TOPIC_QUEUE1 = "topic.queue1";
  public static final String TOPIC_QUEUE2 = "topic.queue2";
  public static final String HEADER_QUEUE = "header.queue";
  public static final String TOPIC_EXCHANGE = "topicExchage";
  public static final String FANOUT_EXCHANGE = "fanoutxchage";
  public static final String HEADERS_EXCHANGE = "headersExchage";
  @Bean
  public ConnectionFactory connectionFactory() {
      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
      connectionFactory.setUsername("admin");
      connectionFactory.setPassword("123456");
      connectionFactory.setVirtualHost("/");
      connectionFactory.setPublisherConfirms(true);
      connectionFactory.setAddresses("192.168.18.128:5672");//集群
      return connectionFactory;
  }
  @Bean
    public SimpleRabbitListenerContainerFactory myFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory =
                new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory());
//        factory.setMessageConverter(myMessageConverter());
        return factory;
    }
  /**
   * Direct模式 交换机Exchange
   */
  @Bean
  public Queue miaoshaQueue() {
    return new Queue("miaosha.queue", true);
  }
  @Bean
  public Queue queue() {
    return new Queue(QUEUE, true);
  }
  /**
   * Topic模式 交换机Exchange
   * */
  @Bean
  public Queue topicQueue1() {
    return new Queue(TOPIC_QUEUE1, true);
  }
  @Bean
  public Queue topicQueue2() {
    return new Queue(TOPIC_QUEUE2, true);
  }
  @Bean
  public TopicExchange topicExchage(){
    return new TopicExchange(TOPIC_EXCHANGE);
  }
  @Bean
  public Binding topicBinding1() {
    return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");
  }
  @Bean
  public Binding topicBinding2() {
    return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");
  }
  /**
   * Fanout模式 交换机Exchange
   * */
  @Bean
  public FanoutExchange fanoutExchage(){
    return new FanoutExchange(FANOUT_EXCHANGE);
  }
  /**
   * 广播模式不再需要路由键
   * @return
   */
  @Bean
  public Binding FanoutBinding1() {
    return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());
  }
  @Bean
  public Binding FanoutBinding2() {
    return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());
  }
  /**
   * Header模式 交换机Exchange
   * */
  @Bean
  public HeadersExchange headersExchage(){
    return new HeadersExchange(HEADERS_EXCHANGE);
  }
  @Bean
  public Queue headerQueue1() {
    return new Queue(HEADER_QUEUE, true);
  }
  @Bean
  public Binding headerBinding() {
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("header1", "value1");
    map.put("header2", "value2");
    return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match();
  }
}

【3】测试发送接收消息

引入rabbitmq相关的starter后,RabbitAutoConfiguration就会默认对RabbitMQ进行配置。

主要配置如下:

  • 自动配置了连接工厂ConnectionFactory;
  • RabbitProperties封装了RabbitMQ的属性配置;
  • RabbitTemplate用来发送接收消息;
  • AmqpAdmin–RabbitMQ系统管理功能组件。

这里使用RabbitTemplate测试消息发送和接收,源码如下:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootRabbitMQTests {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void testSend(){
        Map<String,Object> map = new HashMap<>();
        map.put("msg","第一条消息");
        map.put("data", Arrays.asList("hello",true,"中"));
        rabbitTemplate.convertAndSend("exchange.direct","rabbitmq",map);
    }
    @Test
    public void testReceive(){
        Object o = rabbitTemplate.receiveAndConvert("rabbitmq");
        System.out.println(o.getClass());
        System.out.println(o);
    }
}

发送后查看后台队列并获取消息如下图:


接收如下图所示:


RabbitTemplate使用详解

先看启类继承图:

可以看到其主要实现了AmqpTemplate接口的一些抽象方法,故而也可以使用如下方式引入消息模板,debug发现其实现类为RabbitTemplate:

@Autowired
AmqpTemplate amqpTemplate ;
public void sendMiaoshaMessage(MiaoshaMessage mm) {
  String msg = RedisService.beanToString(mm);
  log.info("send message:"+msg);
  //使用默认交换机,参数为路由键和消息体
  amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg);
}

这里需要注意下,如果没有指定交换机(exchange),那么将会使用默认的交换机,默认交换机将会根据路由键的名字寻找对应的queue把消息转发过去。默认的交换机是direct机制(根据名字精确匹配),如下图所示:

指定交换器名字发送消息

参数为交换器名字,路由键以及消息体

public void sendTopic(Object message) {
  String msg = RedisService.beanToString(message);
  log.info("send topic message:"+msg);
  amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1");
  amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2");
}

【4】替换默认的MessageConverter

如下图所示,RabbitTemplate默认使用的是SimpleMessageConverter:


该转换器默认使用SerializationUtils进行序列化和反序列化。这里如果想将数据序列化为JSON格式,方便查看,可以注册自定义的转换器(ampq包下的转换器)。

源码示例如下:

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Created by Janus on 2018/7/6.
 */
@Configuration
public class MyAMQPConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

再次发送消息后台管理页面查看队列:

其他交换器类型的发送和接收,与上面相同,只是改一下对应交换器的名字即可。


【5】基于注解的RabbitMQ

① @EnableRabbit注解开启基于注解的RabbitMQ

@SpringBootApplication
@EnableCaching
@EnableRabbit
public class SpringBoot01CacheApplication {
  public static void main(String[] args) {
    SpringApplication.run(SpringBoot01CacheApplication.class, args);
  }
}

② @RabbitListener

源码如下:

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(RabbitListeners.class)
public @interface RabbitListener {
    String id() default "";
    String containerFactory() default "";
    String[] queues() default {};
    boolean exclusive() default false;
    String priority() default "";
    String admin() default "";
    QueueBinding[] bindings() default {};
    String group() default "";
}

测试实例如下:

@Service
public class RabbitListenerService {
    @RabbitListener(queues = {"rabbitmq"})
    public void receive(Map<String,Object> map){
        System.out.println("从队列rabbitmq获取到数据 : "+map);
    }
}

启动项目,一旦rabbitmq队列中有消息, 即会获取并打印,测试结果如下:


③ 获取消息头信息

不光可以直接获取消息体信息,还可以使用Message作为参数,从中分别获取消息体和消息头,源码示例如下:

 @RabbitListener(queues = {"rabbitmq.news"})
 public void receiveMessage(Message message){
      System.out.println("从消息中获取的消息体:"+message.getBody());
      System.out.println("从消息中获取的消息头信息 : "+message.getMessageProperties());
  }

测试结果如下图:


【6】AmqpAdmin编码创建交换器、队列并进行绑定

使用AmqpAdmin创建交换器、队列并将其进行了绑定,源码示例如下:

  @Autowired
    AmqpAdmin amqpAdmin;
    @Test
    public void createByAmqpAdmin(){
        //创建交换器
        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
        System.out.println("创建交换器完成");
        //创建队列
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
        Binding binding = new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqpadmin",null);
        amqpAdmin.declareBinding(binding);
    }

测试结果如下:




201807061845161.jpg



20180706184534160.jpg


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
9月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
2924 1
|
9月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
626 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
9月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
489 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
JSON 前端开发 Java
深入理解 Spring Boot 中日期时间格式化:@DateTimeFormat 与 @JsonFormat 完整实践
在 Spring Boot 开发中,日期时间格式化是前后端交互的常见痛点。本文详细解析了 **@DateTimeFormat** 和 **@JsonFormat** 两个注解的用法,分别用于将前端传入的字符串解析为 Java 时间对象,以及将时间对象序列化为指定格式返回给前端。通过完整示例代码,展示了从数据接收、业务处理到结果返回的全流程,并总结了解决时区问题和全局配置的最佳实践,助你高效处理日期时间需求。
2144 0
|
存储 Java 数据库
Spring Boot 注册登录系统:问题总结与优化实践
在Spring Boot开发中,注册登录模块常面临数据库设计、密码加密、权限配置及用户体验等问题。本文以便利店销售系统为例,详细解析四大类问题:数据库字段约束(如默认值缺失)、密码加密(明文存储风险)、Spring Security配置(路径权限不当)以及表单交互(数据丢失与提示不足)。通过优化数据库结构、引入BCrypt加密、完善安全配置和改进用户交互,提供了一套全面的解决方案,助力开发者构建更 robust 的系统。
479 0
|
消息中间件 存储 监控
活动实践 | 快速体验云消息队列RocketMQ版
本方案介绍如何使用阿里云消息队列RocketMQ版Serverless实例进行消息管理。主要步骤包括获取接入点、创建Topic和订阅组、收发消息、查看消息轨迹及仪表盘监控。通过这些操作,用户可以轻松实现消息的全生命周期管理,确保消息收发的高效与可靠。此外,还提供了消费验证、下载消息等功能,方便用户进行详细的消息处理与调试。
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
383 32