RabbitMQ——SpringBoot集成RabbitMQ

简介: RabbitMQ——SpringBoot集成RabbitMQ

文章目录:


1.创建一个SpringBoot工程——消息发送者

1.创建一个SpringBoot工程——消息接收者

3.测试结果

3.1 direct

3.2 fanout

3.3 topic

3.4 RabbitMQ管控台中查看SpringBoot工程创建的交换机和消息队列

1.创建一个SpringBoot工程——消息发送者


前两步都是一样的,只不过在依赖项页面中,要勾选RabbitMQ这个选项。


在核心配置文件中,配置RabbitMQ的相关连接信息。

#配置RabbitMQ的相关连接信息
spring.rabbitmq.host=192.168.40.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

编写实现消息发送的接口和实现类。

接口中的三个方法分别对应 directfanouttopic三种类型的交换机,我这里测试这三种类型的交换机来发送接收消息。

package com.szh.springboot.rabbitmq.service;
/**
 *
 */
public interface SendService {
    void sendMessage(String message);
    void sendFanout(String message);
    void sendTopic(String message);
}
package com.szh.springboot.rabbitmq.service.impl;
import com.szh.springboot.rabbitmq.service.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 *
 */
@Service("sendService")
public class SendServiceImpl implements SendService {
    @Autowired
    private AmqpTemplate amqpTemplate;
    @Override
    public void sendMessage(String message) {
        /**
         * 发送消息
         * 参数1:交换机名称
         * 参数2:RoutingKey
         * 参数3:具体发送的消息内容
         */
        amqpTemplate.convertAndSend("springbootDirectExchange","springbootDirectRouting",message);
    }
    @Override
    public void sendFanout(String message) {
        amqpTemplate.convertAndSend("fanoutExchange","",message);
    }
    @Override
    public void sendTopic(String message) {
        amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
    }
}

然后写一个关于三种类型交换机的配置类。

package com.szh.springboot.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 *
 */
@Configuration
public class RabbitMQConfig {
    //配置一个Direct类型的交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("springbootDirectExchange");
    }
    //配置一个队列
    @Bean
    public Queue directQueue() {
        return new Queue("springbootDirectQueue");
    }
    /**
     * 配置一个队列和交换机的绑定
     * @param directQueue : 需要绑定的队列对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .bind()
     * @param directExchange : 需要绑定的交换机对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .to()
     *                       .with() 方法对应的RoutingKey
     * @return
     */
    @Bean
    public Binding directBinding(Queue directQueue,DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("springbootDirectRouting");
    }
    //配置一个Fanout类型的交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
    //配置一个Topic类型的交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }
}

最后是SpringBoot项目的启动入口类。

这里首先是通过ApplicationContext获取到了Spring容器,然后从容器中拿到sendService这个对象,最后的三行代码分别对应的是测试这三种类型的交换机。

package com.szh.springboot.rabbitmq;
import com.szh.springboot.rabbitmq.service.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        ApplicationContext context=SpringApplication.run(Application.class, args);
        SendService service= (SendService) context.getBean("sendService");
        service.sendMessage("SpringBoot集成RabbitMQ的测试数据");
        //service.sendFanout("SpringBoot集成RabbitMQ的测试数据");
        //service.sendTopic("SpringBoot集成RabbitMQ的测试数据");
    }
}


2.创建一个SpringBoot工程——消息接收者


前两步都是一样的,只不过在依赖项页面中,要勾选RabbitMQ这个选项。



在核心配置文件中,配置RabbitMQ的相关连接信息。

#配置RabbitMQ的相关连接信息
spring.rabbitmq.host=192.168.40.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

编写实现消息接收的接口和实现类。

接口中的这些方法分别对应 directfanouttopic三种类型的交换机,我这里测试这三种类型的交换机来发送接收消息。

package com.szh.sprringboot.rabbitmq.service;
/**
 *
 */
public interface ReceiveService {
    void receiveMessage();
    void directReceive(String message);
    void fanoutReceive01(String message);
    void fanoutReceive02(String message);
    void topicReceive01(String message);
    void topicReceive02(String message);
    void topicReceive03(String message);
}
package com.szh.sprringboot.rabbitmq.service.impl;
import com.szh.sprringboot.rabbitmq.service.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 *
 */
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {
    @Autowired
    private AmqpTemplate amqpTemplate;
    /**
     * receiveAndConvert()这个方法,每执行一次只能接收一次消息
     * 如果有消息进入,则不会自动接收消息(不建议使用)
     */
    @Override
    public void receiveMessage() {
//        String message= (String) amqpTemplate.receiveAndConvert("springbootDirectQueue");
//        System.out.println(message);
    }
    /**
     * @RabbitListener : 用于标记当前方法是一个RabbitMQ的消息监听方法,可以持续性的自动接收消息
     * @param message
     * 该方法不需要手动调用,Spring会自动运行这个监听方法
     *
     * 注意:如果该监听方法正常结束,那么Spring会自动确认消息
     *      如果出现异常,则Spring不会确认消息,该消息一直存在于消息队列中
     */
    @Override
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "springbootDirectQueue"),
                    exchange = @Exchange(name = "springbootDirectExchange"),
                    key = {"springbootDirectRouting"}
            )
    })
    public void directReceive(String message) {
        System.out.println(message);
    }
    @Override
    @RabbitListener(bindings = {
            @QueueBinding( //完成队列和交换机的绑定
                    value = @Queue(), //创建一个队列,没有name属性,表示创建一个随即名称的消息队列
                    exchange = @Exchange(name = "fanoutExchange",type = "fanout") //创建一个交换机
            )
    })
    public void fanoutReceive01(String message) {
        System.out.println(message);
    }
    @Override
    @RabbitListener(bindings = {
            @QueueBinding( //完成队列和交换机的绑定
                    value = @Queue(), //创建一个队列,没有name属性,表示创建一个随即名称的消息队列
                    exchange = @Exchange(name = "fanoutExchange",type = "fanout") //创建一个交换机
            )
    })
    public void fanoutReceive02(String message) {
        System.out.println(message);
    }
    @Override
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("topic01"),
                    exchange = @Exchange(name = "topicExchange",type = "topic"),
                    key = {"aa"}
            )
    })
    public void topicReceive01(String message) {
        System.out.println("topic01 接收到的数据:" + message);
    }
    @Override
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("topic02"),
                    exchange = @Exchange(name = "topicExchange",type = "topic"),
                    key = {"aa.*"}
            )
    })
    public void topicReceive02(String message) {
        System.out.println("topic02 接收到的数据:" + message);
    }
    @Override
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("topic03"),
                    exchange = @Exchange(name = "topicExchange",type = "topic"),
                    key = {"aa.#"}
            )
    })
    public void topicReceive03(String message) {
        System.out.println("topic03 接收到的数据:" + message);
    }
}

最后是SpringBoot项目的启动入口类。

package com.szh.sprringboot.rabbitmq;
import com.szh.sprringboot.rabbitmq.service.ReceiveService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        ApplicationContext context=SpringApplication.run(Application.class, args);
        ReceiveService service= (ReceiveService) context.getBean("receiveService");
        //service.receiveMessage();
    }
}


3.测试结果


3.1 direct

先启动消息发送者工程,再启动消息接收者工程。

3.2 fanout

先启动消息接收者工程,再启动消息发送者工程。

因为这里fanout交换机中定义了两个消息队列,它是一对多、不需要绑定RoutingKey的,所以这些消息队列都会接收到消息数据。

3.3 topic

先启动消息接收者工程,再启动消息发送者工程。

因为这里topic交换机中定义了三个消息队列,它是一对多、需要绑定RoutingKey的,根据RoutingKey的不同会限制哪些消息队列能够接收到消息、哪些不能。当绑定的RoutingKeyaa时,只有BingKey aaaa.# 这两个消息队列可以接收到(aa顾名思义、而aa.#是因为#表示0个或多个单词,aa.*接收不到是因为*仅能表示1个单词)。


3.4 RabbitMQ管控台中查看SpringBoot工程创建的交换机和消息队列

这里的消息队列只有directtopic的,至于为什么没有fanout的,是因为fanout类型的交换机在消息发送/接收服务停止之后,对应的交换机还在,但是消息队列会自动清除掉。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
24天前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
169 43
|
3天前
|
Cloud Native Java Nacos
springcloud/springboot集成NACOS 做注册和配置中心以及nacos源码分析
通过本文,我们详细介绍了如何在 Spring Cloud 和 Spring Boot 中集成 Nacos 进行服务注册和配置管理,并对 Nacos 的源码进行了初步分析。Nacos 作为一个强大的服务注册和配置管理平台,为微服务架构提供
29 14
|
3天前
|
消息中间件 XML 前端开发
springBoot集成websocket实时消息推送
本文介绍了如何在Spring Boot项目中集成WebSocket实现实时消息推送。首先,通过引入`spring-boot-starter-websocket`依赖,配置`WebSocketConfig`类来启用WebSocket支持。接着,创建`WebSocketTest`服务器类,处理连接、消息收发及错误等事件,并使用`ConcurrentHashMap`管理用户连接。最后,前端通过JavaScript建立WebSocket连接,监听消息并进行相应处理。此方案适用于需要实时通信的应用场景,如聊天室、通知系统等。
|
26天前
|
监控 前端开发 Java
SpringBoot集成Tomcat、DispatcherServlet
通过这些配置,您可以充分利用 Spring Boot 内置的功能,快速构建和优化您的 Web 应用。
57 21
|
1月前
|
监控 Java Nacos
使用Spring Boot集成Nacos
通过上述步骤,Spring Boot应用可以成功集成Nacos,利用Nacos的服务发现和配置管理功能来提升微服务架构的灵活性和可维护性。通过这种集成,开发者可以更高效地管理和部署微服务。
243 17
|
1月前
|
XML JavaScript Java
SpringBoot集成Shiro权限+Jwt认证
本文主要描述如何快速基于SpringBoot 2.5.X版本集成Shiro+JWT框架,让大家快速实现无状态登陆和接口权限认证主体框架,具体业务细节未实现,大家按照实际项目补充。
98 11
|
1月前
|
缓存 安全 Java
Spring Boot 3 集成 Spring Security + JWT
本文详细介绍了如何使用Spring Boot 3和Spring Security集成JWT,实现前后端分离的安全认证概述了从入门到引入数据库,再到使用JWT的完整流程。列举了项目中用到的关键依赖,如MyBatis-Plus、Hutool等。简要提及了系统配置表、部门表、字典表等表结构。使用Hutool-jwt工具类进行JWT校验。配置忽略路径、禁用CSRF、添加JWT校验过滤器等。实现登录接口,返回token等信息。
477 12
|
1月前
|
存储 安全 Java
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
91 8
|
2月前
|
XML Java API
Spring Boot集成MinIO
本文介绍了如何在Spring Boot项目中集成MinIO,一个高性能的分布式对象存储服务。主要步骤包括:引入MinIO依赖、配置MinIO属性、创建MinIO配置类和服务类、使用服务类实现文件上传和下载功能,以及运行应用进行测试。通过这些步骤,可以轻松地在项目中使用MinIO的对象存储功能。
181 5
|
2月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
51 6

热门文章

最新文章