SpringBoot集成RabbitMQ-三种模式的实现

简介: SpringBoot集成RabbitMQ-三种模式的实现


准备环境

使用IDEA创建一个MAVEN的SpringBoot项目。并勾选如下使用的依赖等

一、fanout模式

(一)生产者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8989
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: 你的rabbitmq的IP或者域名
    virtual-host: /
#配置其他 这些是自定义的,方便之后使用
mq:
  fannout:
    exchangName: order.fanout.ex

3、Producer.java

package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //1、定义交换机
    @Value("${mq.fannout.exchangName}")
    private String exchangName;
    //2、路由Key
    private String routeKey = "";
    public void sendMessage(int i){
        //订单信息
        String orderId = UUID.randomUUID().toString();
        //消息
        String message = "你的订单第【"+i+"】个信息是:" + orderId + new Date().toString();
        System.out.println("正在发送----->:"+message);
        rabbitTemplate.convertAndSend(exchangName,routeKey,message);
    }
}

4、SRCApplicationTests.java

package com.zh.srp;
import com.zh.srp.mq.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SRCApplicationTests {
    @Autowired
    private Producer producer;
    @Test
    void contextLoads() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            producer.sendMessage(i);
            Thread.sleep(2);
        }
    }
}

(二)消费者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8081
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /
#配置其他
mq:
  fannout:
    exchangName: order.fanout.ex #交换机
    log: #日志队列
      queue: order.fanout.log.queue #C1队列
    email:
      queue: order.fanout.email.queue #C2队列

3、EmailService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
@RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.fannout.email.queue}",
        autoDelete = "true"),
        exchange = @Exchange(value = "${mq.fannout.exchangName}",type = ExchangeTypes.FANOUT)))
public class EmailService {
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("Email-log--------->"+Message);
    }
}

4、LogService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
@RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.fannout.log.queue}",
        autoDelete = "true"),
        exchange = @Exchange(value = "${mq.fannout.exchangName}",type = ExchangeTypes.FANOUT)))
public class LogService {
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("log--------->"+Message);
    }
}

(三)fanout模式测试

1、启动消费者的SRApplication.java

2、启动生产者的Test.java中的测试方法

生产:

消费:

二、direct模式

(一)生产者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8989
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: IP
    virtual-host: /
#配置其他
mq:
  direct:
    exchangName: order.direct.ex

3、Producer.java

package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
//    这个是一个接口,最终也会找到实现类RabbitTemplate
//    @Autowired
//    private AmqpTemplate amqpTemplate;
    //1、定义交换机
    @Value("${mq.direct.exchangName}")
    private String exchangName;
//    保存用户
public void saveUser(int i){
    //订单信息
    String orderId = UUID.randomUUID().toString();
    //消息
    String message = "保存用户:" + orderId + new Date().toString();
    System.out.println("正在发送user----->:"+message);
    rabbitTemplate.convertAndSend(exchangName,"email",message);
    rabbitTemplate.convertAndSend(exchangName,"log",message);
    rabbitTemplate.convertAndSend(exchangName,"wx",message);
}
//    保存用户
public void WX(int i){
    //订单信息
    String orderId = UUID.randomUUID().toString();
    //消息
    String message = "你的微信第【"+i+"】个信息是:" + orderId + new Date().toString();
    System.out.println("正在发送WX----->:"+message);
    rabbitTemplate.convertAndSend(exchangName,"email",message);
    rabbitTemplate.convertAndSend(exchangName,"log",message);
}
}

4、DirectApplicationTests.java

package com.zh.srp;
import com.zh.srp.mq.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DirectApplicationTests {
    @Autowired
    private OrderService producer;
    @Test
    void contextLoads() throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            producer.saveUser(i);
            producer.WX(i);
            Thread.sleep(2);
        }
    }
}

(二)消费者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8081
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: IP
    virtual-host: /
#配置其他
mq:
  direct:
    exchangName: order.direct.ex
    log: #日志队列
      queue: order.direct.log.queue #C1队列
    email:
      queue: order.direct.email.queue #C2队列
    wx:
      queue: order.direct.wx.queue #C3队列

3、EMailService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class EMailService {
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性 autoDelete = "false":代表持久化
    @RabbitListener(
         bindings = @QueueBinding(value = @Queue(value = "${mq.direct.email.queue}",
        autoDelete = "false"),
        exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
        key = "email" //路由Key
    ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("email-log--------->"+Message);
    }
}

4、LOGService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
public class LOGService {
    @RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.direct.log.queue}",
        autoDelete = "false"),
        exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
        key = "log" //路由Key
    ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("log-log--------->"+Message);
    }
}

5、WXService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
public class WXService {
    @RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.direct.wx.queue}",
        autoDelete = "false"),
        exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
        key = "wx" //路由Key
    ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("WX-log--------->"+Message);
    }
}

(三)测试

1、启动消费者DirectCApplication.java

2、启动生产者的Test类的测试方法

消费

生产

三、topic模式

(一)生产者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8989
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /
#配置其他
mq:
  topic:
    exchangName: linux.topic

3、OrderService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //1、定义交换机
    @Value("${mq.topic.exchangName}")
    private String exchangName;
    //WX
    public void QQ(int i){
        //订单信息
        String orderId = UUID.randomUUID().toString();
        //消息
        String message = "你的WX第【"+i+"】个信息是:" + orderId + new Date().toString();
        System.out.println("正在发送WX----->:"+message);
        //通配符topic
        rabbitTemplate.convertAndSend(exchangName,"qq.log",message);
    }
    //QQ
    public void WX(int i){
        //订单信息
        String orderId = UUID.randomUUID().toString();
        //消息
        String message = "你的QQ第【"+i+"】个信息是:" + orderId + new Date().toString();
        System.out.println("正在发送QQ----->:"+message);
        //通配符topic
        rabbitTemplate.convertAndSend(exchangName,"wx.log",message);
    }
}

4、TopicApplicationTests.java

package com.zh.srp;
import com.zh.srp.mq.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class TopicApplicationTests {
    @Autowired
    private OrderService producer;
    @Test
    void contextLoads() throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            producer.QQ(i);
            producer.WX(i);
            Thread.sleep(2);
        }
    }
}

(二)消费者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8081
#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /
#配置其他
mq:
  topic:
    exchangName: linux.topic
    qq: #日志队列
      queue: linux.topic.qq.queue #C1队列
    wx:
      queue: linux.topic.wx.queue #C3队列

3、QQService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class QQService {
    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(value = "${mq.topic.qq.queue}",
                    autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.topic.exchangName}",type = ExchangeTypes.TOPIC),
                    key = "qq.*" //路由Key
            ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("QQ-log--------->"+Message);
    }
}

4、WXService.java

package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class WXService {
    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(value = "${mq.topic.wx.queue}",
                    autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.topic.exchangName}",type = ExchangeTypes.TOPIC),
                    key = "wx.*" //路由Key
            ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("WX-log--------->"+Message);
    }
}

(三)测试

1、启动消费者

2、启动生产者

生产者

消费者

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
5月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
1月前
|
负载均衡 IDE Java
SpringBoot整合XXL-JOB【04】- 以GLUE模式运行与执行器负载均衡策略
在本节中,我们将介绍XXL-JOB的GLUE模式和集群模式下的路由策略。GLUE模式允许直接在线上改造方法为定时任务,无需重新部署。通过一个测试方法,展示了如何在调度中心配置并使用GLUE模式执行定时任务。接着,我们探讨了多实例环境下的负载均衡策略,确保任务不会重复执行,并可通过修改路由策略(如轮训)实现任务在多个实例间的均衡分配。最后,总结了GLUE模式和负载均衡策略的应用,帮助读者更深入理解XXL-JOB的使用。
42 9
|
6月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
131 3
|
2月前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
71 3
|
2月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
47 6
|
4月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
3月前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
78 0
|
6月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
6月前
|
消息中间件 分布式计算 大数据
RabbitMQ与大数据平台的集成
【8月更文第28天】在现代的大数据处理架构中,消息队列作为数据传输的关键组件扮演着重要的角色。RabbitMQ 是一个开源的消息代理软件,它支持多种消息协议,能够为分布式系统提供可靠的消息传递服务。本篇文章将探讨如何使用 RabbitMQ 与 Hadoop 和 Spark 进行集成,以实现高效的数据处理和分析。
78 1

热门文章

最新文章