RabbitMQ——SpringBoot集成RabbitMQ

简介: RabbitMQ——SpringBoot集成RabbitMQ

文章目录:


1.创建一个SpringBoot工程——消息发送者

1.创建一个SpringBoot工程——消息接收者

3.测试结果

3.1 direct

3.2 fanout

3.3 topic

3.4 RabbitMQ管控台中查看SpringBoot工程创建的交换机和消息队列

1.创建一个SpringBoot工程——消息发送者


前两步都是一样的,只不过在依赖项页面中,要勾选RabbitMQ这个选项。


在核心配置文件中,配置RabbitMQ的相关连接信息。

#配置RabbitMQ的相关连接信息
spring.rabbitmq.host=192.168.40.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

编写实现消息发送的接口和实现类。

接口中的三个方法分别对应 directfanouttopic三种类型的交换机,我这里测试这三种类型的交换机来发送接收消息。

package com.szh.springboot.rabbitmq.service;
/**
 *
 */
public interface SendService {
    void sendMessage(String message);
    void sendFanout(String message);
    void sendTopic(String message);
}
package com.szh.springboot.rabbitmq.service.impl;
import com.szh.springboot.rabbitmq.service.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 *
 */
@Service("sendService")
public class SendServiceImpl implements SendService {
    @Autowired
    private AmqpTemplate amqpTemplate;
    @Override
    public void sendMessage(String message) {
        /**
         * 发送消息
         * 参数1:交换机名称
         * 参数2:RoutingKey
         * 参数3:具体发送的消息内容
         */
        amqpTemplate.convertAndSend("springbootDirectExchange","springbootDirectRouting",message);
    }
    @Override
    public void sendFanout(String message) {
        amqpTemplate.convertAndSend("fanoutExchange","",message);
    }
    @Override
    public void sendTopic(String message) {
        amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
    }
}

然后写一个关于三种类型交换机的配置类。

package com.szh.springboot.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 *
 */
@Configuration
public class RabbitMQConfig {
    //配置一个Direct类型的交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("springbootDirectExchange");
    }
    //配置一个队列
    @Bean
    public Queue directQueue() {
        return new Queue("springbootDirectQueue");
    }
    /**
     * 配置一个队列和交换机的绑定
     * @param directQueue : 需要绑定的队列对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .bind()
     * @param directExchange : 需要绑定的交换机对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .to()
     *                       .with() 方法对应的RoutingKey
     * @return
     */
    @Bean
    public Binding directBinding(Queue directQueue,DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("springbootDirectRouting");
    }
    //配置一个Fanout类型的交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
    //配置一个Topic类型的交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }
}

最后是SpringBoot项目的启动入口类。

这里首先是通过ApplicationContext获取到了Spring容器,然后从容器中拿到sendService这个对象,最后的三行代码分别对应的是测试这三种类型的交换机。

package com.szh.springboot.rabbitmq;
import com.szh.springboot.rabbitmq.service.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        ApplicationContext context=SpringApplication.run(Application.class, args);
        SendService service= (SendService) context.getBean("sendService");
        service.sendMessage("SpringBoot集成RabbitMQ的测试数据");
        //service.sendFanout("SpringBoot集成RabbitMQ的测试数据");
        //service.sendTopic("SpringBoot集成RabbitMQ的测试数据");
    }
}


2.创建一个SpringBoot工程——消息接收者


前两步都是一样的,只不过在依赖项页面中,要勾选RabbitMQ这个选项。



在核心配置文件中,配置RabbitMQ的相关连接信息。

#配置RabbitMQ的相关连接信息
spring.rabbitmq.host=192.168.40.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

编写实现消息接收的接口和实现类。

接口中的这些方法分别对应 directfanouttopic三种类型的交换机,我这里测试这三种类型的交换机来发送接收消息。

package com.szh.sprringboot.rabbitmq.service;
/**
 *
 */
public interface ReceiveService {
    void receiveMessage();
    void directReceive(String message);
    void fanoutReceive01(String message);
    void fanoutReceive02(String message);
    void topicReceive01(String message);
    void topicReceive02(String message);
    void topicReceive03(String message);
}
package com.szh.sprringboot.rabbitmq.service.impl;
import com.szh.sprringboot.rabbitmq.service.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 *
 */
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {
    @Autowired
    private AmqpTemplate amqpTemplate;
    /**
     * receiveAndConvert()这个方法,每执行一次只能接收一次消息
     * 如果有消息进入,则不会自动接收消息(不建议使用)
     */
    @Override
    public void receiveMessage() {
//        String message= (String) amqpTemplate.receiveAndConvert("springbootDirectQueue");
//        System.out.println(message);
    }
    /**
     * @RabbitListener : 用于标记当前方法是一个RabbitMQ的消息监听方法,可以持续性的自动接收消息
     * @param message
     * 该方法不需要手动调用,Spring会自动运行这个监听方法
     *
     * 注意:如果该监听方法正常结束,那么Spring会自动确认消息
     *      如果出现异常,则Spring不会确认消息,该消息一直存在于消息队列中
     */
    @Override
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "springbootDirectQueue"),
                    exchange = @Exchange(name = "springbootDirectExchange"),
                    key = {"springbootDirectRouting"}
            )
    })
    public void directReceive(String message) {
        System.out.println(message);
    }
    @Override
    @RabbitListener(bindings = {
            @QueueBinding( //完成队列和交换机的绑定
                    value = @Queue(), //创建一个队列,没有name属性,表示创建一个随即名称的消息队列
                    exchange = @Exchange(name = "fanoutExchange",type = "fanout") //创建一个交换机
            )
    })
    public void fanoutReceive01(String message) {
        System.out.println(message);
    }
    @Override
    @RabbitListener(bindings = {
            @QueueBinding( //完成队列和交换机的绑定
                    value = @Queue(), //创建一个队列,没有name属性,表示创建一个随即名称的消息队列
                    exchange = @Exchange(name = "fanoutExchange",type = "fanout") //创建一个交换机
            )
    })
    public void fanoutReceive02(String message) {
        System.out.println(message);
    }
    @Override
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("topic01"),
                    exchange = @Exchange(name = "topicExchange",type = "topic"),
                    key = {"aa"}
            )
    })
    public void topicReceive01(String message) {
        System.out.println("topic01 接收到的数据:" + message);
    }
    @Override
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("topic02"),
                    exchange = @Exchange(name = "topicExchange",type = "topic"),
                    key = {"aa.*"}
            )
    })
    public void topicReceive02(String message) {
        System.out.println("topic02 接收到的数据:" + message);
    }
    @Override
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("topic03"),
                    exchange = @Exchange(name = "topicExchange",type = "topic"),
                    key = {"aa.#"}
            )
    })
    public void topicReceive03(String message) {
        System.out.println("topic03 接收到的数据:" + message);
    }
}

最后是SpringBoot项目的启动入口类。

package com.szh.sprringboot.rabbitmq;
import com.szh.sprringboot.rabbitmq.service.ReceiveService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        ApplicationContext context=SpringApplication.run(Application.class, args);
        ReceiveService service= (ReceiveService) context.getBean("receiveService");
        //service.receiveMessage();
    }
}


3.测试结果


3.1 direct

先启动消息发送者工程,再启动消息接收者工程。

3.2 fanout

先启动消息接收者工程,再启动消息发送者工程。

因为这里fanout交换机中定义了两个消息队列,它是一对多、不需要绑定RoutingKey的,所以这些消息队列都会接收到消息数据。

3.3 topic

先启动消息接收者工程,再启动消息发送者工程。

因为这里topic交换机中定义了三个消息队列,它是一对多、需要绑定RoutingKey的,根据RoutingKey的不同会限制哪些消息队列能够接收到消息、哪些不能。当绑定的RoutingKeyaa时,只有BingKey aaaa.# 这两个消息队列可以接收到(aa顾名思义、而aa.#是因为#表示0个或多个单词,aa.*接收不到是因为*仅能表示1个单词)。


3.4 RabbitMQ管控台中查看SpringBoot工程创建的交换机和消息队列

这里的消息队列只有directtopic的,至于为什么没有fanout的,是因为fanout类型的交换机在消息发送/接收服务停止之后,对应的交换机还在,但是消息队列会自动清除掉。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
3月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
66 3
|
1月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
11天前
|
XML Java 数据库连接
SpringBoot集成Flowable:打造强大的工作流管理系统
在企业级应用开发中,工作流管理是一个核心组件,它能够帮助我们定义、执行和管理业务流程。Flowable是一个开源的工作流和业务流程管理(BPM)平台,它提供了强大的工作流引擎和建模工具。结合SpringBoot,我们可以快速构建一个高效、灵活的工作流管理系统。本文将探讨如何将Flowable集成到SpringBoot应用中,并展示其强大的功能。
45 1
|
21天前
|
JSON Java API
springboot集成ElasticSearch使用completion实现补全功能
springboot集成ElasticSearch使用completion实现补全功能
24 1
|
11天前
|
XML 存储 Java
SpringBoot集成Flowable:构建强大的工作流引擎
在企业级应用开发中,工作流管理是核心功能之一。Flowable是一个开源的工作流引擎,它提供了BPMN 2.0规范的实现,并且与SpringBoot框架完美集成。本文将探讨如何使用SpringBoot和Flowable构建一个强大的工作流引擎,并分享一些实践技巧。
31 0
|
1月前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
92 1
|
1月前
|
存储 前端开发 Java
Spring Boot 集成 MinIO 与 KKFile 实现文件预览功能
本文详细介绍如何在Spring Boot项目中集成MinIO对象存储系统与KKFileView文件预览工具,实现文件上传及在线预览功能。首先搭建MinIO服务器,并在Spring Boot中配置MinIO SDK进行文件管理;接着通过KKFileView提供文件预览服务,最终实现文档管理系统的高效文件处理能力。
268 11
|
2月前
|
XML Java 关系型数据库
springboot 集成 mybatis-plus 代码生成器
本文介绍了如何在Spring Boot项目中集成MyBatis-Plus代码生成器,包括导入相关依赖坐标、配置快速代码生成器以及自定义代码生成器模板的步骤和代码示例,旨在提高开发效率,快速生成Entity、Mapper、Mapper XML、Service、Controller等代码。
springboot 集成 mybatis-plus 代码生成器
|
2月前
|
Java Spring
springboot 集成 swagger 2.x 和 3.0 以及 Failed to start bean ‘documentationPluginsBootstrapper‘问题的解决
本文介绍了如何在Spring Boot项目中集成Swagger 2.x和3.0版本,并提供了解决Swagger在Spring Boot中启动失败问题“Failed to start bean ‘documentationPluginsBootstrapper’; nested exception is java.lang.NullPointerEx”的方法,包括配置yml文件和Spring Boot版本的降级。
springboot 集成 swagger 2.x 和 3.0 以及 Failed to start bean ‘documentationPluginsBootstrapper‘问题的解决