SpringBoot集成RabbitMQ-三种模式的实现

简介: SpringBoot集成RabbitMQ-三种模式的实现


准备环境

使用IDEA创建一个MAVEN的SpringBoot项目。并勾选如下使用的依赖等

一、fanout模式

(一)生产者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8989
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: 你的rabbitmq的IP或者域名
    virtual-host: /
#配置其他 这些是自定义的,方便之后使用
mq:
  fannout:
    exchangName: order.fanout.ex

3、Producer.java

package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //1、定义交换机
    @Value("${mq.fannout.exchangName}")
    private String exchangName;
    //2、路由Key
    private String routeKey = "";
    public void sendMessage(int i){
        //订单信息
        String orderId = UUID.randomUUID().toString();
        //消息
        String message = "你的订单第【"+i+"】个信息是:" + orderId + new Date().toString();
        System.out.println("正在发送----->:"+message);
        rabbitTemplate.convertAndSend(exchangName,routeKey,message);
    }
}

4、SRCApplicationTests.java

package com.zh.srp;
import com.zh.srp.mq.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SRCApplicationTests {
    @Autowired
    private Producer producer;
    @Test
    void contextLoads() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            producer.sendMessage(i);
            Thread.sleep(2);
        }
    }
}

(二)消费者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8081
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /
#配置其他
mq:
  fannout:
    exchangName: order.fanout.ex #交换机
    log: #日志队列
      queue: order.fanout.log.queue #C1队列
    email:
      queue: order.fanout.email.queue #C2队列

3、EmailService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
@RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.fannout.email.queue}",
        autoDelete = "true"),
        exchange = @Exchange(value = "${mq.fannout.exchangName}",type = ExchangeTypes.FANOUT)))
public class EmailService {
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("Email-log--------->"+Message);
    }
}

4、LogService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
@RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.fannout.log.queue}",
        autoDelete = "true"),
        exchange = @Exchange(value = "${mq.fannout.exchangName}",type = ExchangeTypes.FANOUT)))
public class LogService {
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("log--------->"+Message);
    }
}

(三)fanout模式测试

1、启动消费者的SRApplication.java

2、启动生产者的Test.java中的测试方法

生产:

消费:

二、direct模式

(一)生产者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8989
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: IP
    virtual-host: /
#配置其他
mq:
  direct:
    exchangName: order.direct.ex

3、Producer.java

package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
//    这个是一个接口,最终也会找到实现类RabbitTemplate
//    @Autowired
//    private AmqpTemplate amqpTemplate;
    //1、定义交换机
    @Value("${mq.direct.exchangName}")
    private String exchangName;
//    保存用户
public void saveUser(int i){
    //订单信息
    String orderId = UUID.randomUUID().toString();
    //消息
    String message = "保存用户:" + orderId + new Date().toString();
    System.out.println("正在发送user----->:"+message);
    rabbitTemplate.convertAndSend(exchangName,"email",message);
    rabbitTemplate.convertAndSend(exchangName,"log",message);
    rabbitTemplate.convertAndSend(exchangName,"wx",message);
}
//    保存用户
public void WX(int i){
    //订单信息
    String orderId = UUID.randomUUID().toString();
    //消息
    String message = "你的微信第【"+i+"】个信息是:" + orderId + new Date().toString();
    System.out.println("正在发送WX----->:"+message);
    rabbitTemplate.convertAndSend(exchangName,"email",message);
    rabbitTemplate.convertAndSend(exchangName,"log",message);
}
}

4、DirectApplicationTests.java

package com.zh.srp;
import com.zh.srp.mq.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DirectApplicationTests {
    @Autowired
    private OrderService producer;
    @Test
    void contextLoads() throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            producer.saveUser(i);
            producer.WX(i);
            Thread.sleep(2);
        }
    }
}

(二)消费者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8081
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: IP
    virtual-host: /
#配置其他
mq:
  direct:
    exchangName: order.direct.ex
    log: #日志队列
      queue: order.direct.log.queue #C1队列
    email:
      queue: order.direct.email.queue #C2队列
    wx:
      queue: order.direct.wx.queue #C3队列

3、EMailService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class EMailService {
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性 autoDelete = "false":代表持久化
    @RabbitListener(
         bindings = @QueueBinding(value = @Queue(value = "${mq.direct.email.queue}",
        autoDelete = "false"),
        exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
        key = "email" //路由Key
    ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("email-log--------->"+Message);
    }
}

4、LOGService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
public class LOGService {
    @RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.direct.log.queue}",
        autoDelete = "false"),
        exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
        key = "log" //路由Key
    ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("log-log--------->"+Message);
    }
}

5、WXService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
public class WXService {
    @RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.direct.wx.queue}",
        autoDelete = "false"),
        exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
        key = "wx" //路由Key
    ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("WX-log--------->"+Message);
    }
}

(三)测试

1、启动消费者DirectCApplication.java

2、启动生产者的Test类的测试方法

消费

生产

三、topic模式

(一)生产者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8989
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /
#配置其他
mq:
  topic:
    exchangName: linux.topic

3、OrderService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //1、定义交换机
    @Value("${mq.topic.exchangName}")
    private String exchangName;
    //WX
    public void QQ(int i){
        //订单信息
        String orderId = UUID.randomUUID().toString();
        //消息
        String message = "你的WX第【"+i+"】个信息是:" + orderId + new Date().toString();
        System.out.println("正在发送WX----->:"+message);
        //通配符topic
        rabbitTemplate.convertAndSend(exchangName,"qq.log",message);
    }
    //QQ
    public void WX(int i){
        //订单信息
        String orderId = UUID.randomUUID().toString();
        //消息
        String message = "你的QQ第【"+i+"】个信息是:" + orderId + new Date().toString();
        System.out.println("正在发送QQ----->:"+message);
        //通配符topic
        rabbitTemplate.convertAndSend(exchangName,"wx.log",message);
    }
}

4、TopicApplicationTests.java

package com.zh.srp;
import com.zh.srp.mq.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class TopicApplicationTests {
    @Autowired
    private OrderService producer;
    @Test
    void contextLoads() throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            producer.QQ(i);
            producer.WX(i);
            Thread.sleep(2);
        }
    }
}

(二)消费者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8081
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /
#配置其他
mq:
  topic:
    exchangName: linux.topic
    qq: #日志队列
      queue: linux.topic.qq.queue #C1队列
    wx:
      queue: linux.topic.wx.queue #C3队列

3、QQService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class QQService {
    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(value = "${mq.topic.qq.queue}",
                    autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.topic.exchangName}",type = ExchangeTypes.TOPIC),
                    key = "qq.*" //路由Key
            ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("QQ-log--------->"+Message);
    }
}

4、WXService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class WXService {
    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(value = "${mq.topic.wx.queue}",
                    autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.topic.exchangName}",type = ExchangeTypes.TOPIC),
                    key = "wx.*" //路由Key
            ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("WX-log--------->"+Message);
    }
}

(三)测试

1、启动消费者

2、启动生产者

生产者

消费者

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6天前
|
NoSQL Java 关系型数据库
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
本文介绍在 Spring Boot 中集成 Redis 的方法。Redis 是一种支持多种数据结构的非关系型数据库(NoSQL),具备高并发、高性能和灵活扩展的特点,适用于缓存、实时数据分析等场景。其数据以键值对形式存储,支持字符串、哈希、列表、集合等类型。通过将 Redis 与 Mysql 集群结合使用,可实现数据同步,提升系统稳定性。例如,在网站架构中优先从 Redis 获取数据,故障时回退至 Mysql,确保服务不中断。
43 0
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
|
6天前
|
JSON Java API
微服务——SpringBoot使用归纳——Spring Boot集成 Swagger2 展现在线接口文档——Swagger2 的使用
本文详细介绍了Swagger2的使用方法,包括在Spring Boot项目中的配置与应用。重点讲解了Swagger2中常用的注解,如实体类上的`@ApiModel`和`@ApiModelProperty`,Controller类上的`@Api`、`@ApiOperation`以及参数上的`@ApiParam`等。通过示例代码展示了如何为实体类和接口添加注解,并在页面上生成在线接口文档,实现接口测试。最后总结了Swagger的优势及其在项目开发中的重要性,提供了课程源代码下载链接供学习参考。
47 0
微服务——SpringBoot使用归纳——Spring Boot集成 Swagger2 展现在线接口文档——Swagger2 的使用
|
6天前
|
缓存 Java API
微服务——SpringBoot使用归纳——Spring Boot集成 Swagger2 展现在线接口文档——Swagger2 的配置
本文介绍了在Spring Boot中配置Swagger2的方法。通过创建一个配置类,添加`@Configuration`和`@EnableSwagger2`注解,使用Docket对象定义API文档的详细信息,包括标题、描述、版本和包路径等。配置完成后,访问`localhost:8080/swagger-ui.html`即可查看接口文档。文中还提示了可能因浏览器缓存导致的问题及解决方法。
44 0
微服务——SpringBoot使用归纳——Spring Boot集成 Swagger2 展现在线接口文档——Swagger2 的配置
|
6天前
|
安全 Java Apache
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 身份和权限认证
本文介绍了 Apache Shiro 的身份认证与权限认证机制。在身份认证部分,分析了 Shiro 的认证流程,包括应用程序调用 `Subject.login(token)` 方法、SecurityManager 接管认证以及通过 Realm 进行具体的安全验证。权限认证部分阐述了权限(permission)、角色(role)和用户(user)三者的关系,其中用户可拥有多个角色,角色则对应不同的权限组合,例如普通用户仅能查看或添加信息,而管理员可执行所有操作。
39 0
|
6天前
|
安全 Java 数据安全/隐私保护
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 三大核心组件
本课程介绍如何在Spring Boot中集成Shiro框架,主要讲解Shiro的认证与授权功能。Shiro是一个简单易用的Java安全框架,用于认证、授权、加密和会话管理等。其核心组件包括Subject(认证主体)、SecurityManager(安全管理员)和Realm(域)。Subject负责身份认证,包含Principals(身份)和Credentials(凭证);SecurityManager是架构核心,协调内部组件运作;Realm则是连接Shiro与应用数据的桥梁,用于访问用户账户及权限信息。通过学习,您将掌握Shiro的基本原理及其在项目中的应用。
40 0
|
6天前
|
XML Java 数据库连接
微服务——SpringBoot使用归纳——Spring Boot集成MyBatis——基于 xml 的整合
本教程介绍了基于XML的MyBatis整合方式。首先在`application.yml`中配置XML路径,如`classpath:mapper/*.xml`,然后创建`UserMapper.xml`文件定义SQL映射,包括`resultMap`和查询语句。通过设置`namespace`关联Mapper接口,实现如`getUserByName`的方法。Controller层调用Service完成测试,访问`/getUserByName/{name}`即可返回用户信息。为简化Mapper扫描,推荐在Spring Boot启动类用`@MapperScan`注解指定包路径避免逐个添加`@Mapper`
28 0
|
6天前
|
前端开发 Java 数据库
微服务——SpringBoot使用归纳——Spring Boot集成Thymeleaf模板引擎——Thymeleaf 介绍
本课介绍Spring Boot集成Thymeleaf模板引擎。Thymeleaf是一款现代服务器端Java模板引擎,支持Web和独立环境,可实现自然模板开发,便于团队协作。与传统JSP不同,Thymeleaf模板可以直接在浏览器中打开,方便前端人员查看静态原型。通过在HTML标签中添加扩展属性(如`th:text`),Thymeleaf能够在服务运行时动态替换内容,展示数据库中的数据,同时兼容静态页面展示,为开发带来灵活性和便利性。
32 0
|
6天前
|
Java Maven 微服务
微服务——SpringBoot使用归纳——Spring Boot集成 Swagger2 展现在线接口文档——Swagger2 的 maven 依赖
在项目中使用Swagger2工具时,需导入Maven依赖。尽管官方最高版本为2.8.0,但其展示效果不够理想且稳定性欠佳。实际开发中常用2.2.2版本,因其稳定且界面友好。以下是围绕2.2.2版本的Maven依赖配置,包括`springfox-swagger2`和`springfox-swagger-ui`两个模块。
29 0
|
6天前
|
前端开发 Java API
微服务——SpringBoot使用归纳——Spring Boot集成 Swagger2 展现在线接口文档—— Swagger 简介
第6课介绍了在Spring Boot中集成Swagger2以展示在线接口文档的方法。随着前后端分离架构的发展,API文档成为连接前端与后端开发的重要纽带。然而,代码更新频繁导致文档难以同步维护,Swagger2解决了这一问题。通过Swagger,在线API文档不仅方便了接口调用方查看和测试,还支持开发者实时测试接口数据。本文使用Swagger 2.2.2版本,讲解如何在Spring Boot项目中导入并配置Swagger2工具,从而高效管理接口文档。
41 0
|
6天前
|
消息中间件 存储 Java
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ安装
本教程介绍ActiveMQ的安装与基本使用。首先从官网下载apache-activemq-5.15.3版本,解压后即可完成安装,非常便捷。启动时进入解压目录下的bin文件夹,根据系统选择win32或win64,运行activemq.bat启动服务。通过浏览器访问`http://127.0.0.1:8161/admin/`可进入管理界面,默认用户名密码为admin/admin。ActiveMQ支持两种消息模式:点对点(Queue)和发布/订阅(Topic)。前者确保每条消息仅被一个消费者消费,后者允许多个消费者同时接收相同消息。
36 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ安装