RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式)

简介: RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式)

系列文章目录

RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)

RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式

RabbitMQ:第四章:RabbitMQ集群搭建

前言

提示:Springboot集成Rabbitmq实战案例,通过接口调用的方式演示。


提示:以下是本篇文章正文内容,下面案例可供参考

一、集成步骤

一、生产者:

  1. 创建生产者SpringBoot工程
  2. 引入pom依赖
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 编写rabbitmq配置消息
  2. 定义交换机,队列以及绑定关系的配置类
  3. 注入RabbitTemplate,调用方法,完成消息发送

二、消费者:

  1. 创建生产者SpringBoot工程
  2. 引入pom依赖
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 编写rabbitmq配置消息
  2. 定义监听类,使用@RabbitListener注解完成队列监听。

二、实现步骤

1.项目架构

2.创建项目

代码如下(示例):

1.pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.sky</groupId>
    <artifactId>springboot-rabbitmq-module</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-module</name>
    <description>springboot-rabbitmq-module</description>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <finalName>springboot_rabbitmq</finalName>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-war-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

2.application.properties配置

server.port=8080
#spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.addresses=110.42.239.246
spring.rabbitmq.virtual-host=springboot
#spring.rabbitmq.addresses=110.42.239.246:5672,110.42.239.247:5672,110.42.239.248:5672

说明:这里免费提供rabbitmq连接方式给大家使用学习

3.config配置

HelloWorldConfig

package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * HelloWorld rabbitmq课上讲解的第一个工作模式
 * 直连模式只需要声明队列,所有消息都通过队列转发。
 * 无需设置交换机
 */
@Configuration
public class HelloWorldConfig {
  @Bean
  public Queue setQueue() {
    return new Queue("helloWorldqueue");
  }
}

FanoutConfig

package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Fanout模式需要声明exchange,并绑定queue,由exchange负责转发到queue上。
 * 广播模式 交换机类型设置为:fanout
 */
@Configuration
public class FanoutConfig {
  //声明队列
  @Bean
  public Queue fanoutQ1() {
    return new Queue("fanout.q1");
  }
  @Bean
  public Queue fanoutQ2() {
    return new Queue("fanout.q2");
  }
  //声明exchange
  @Bean
  public FanoutExchange setFanoutExchange() {
    return new FanoutExchange("fanoutExchange");
  }
  //声明Binding,exchange与queue的绑定关系
  @Bean
  public Binding bindQ1() {
    return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());
  }
  @Bean
  public Binding bindQ2() {
    return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());
  }
}

WorkConfig

package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class WorkConfig {
    //声明队列
    @Bean
    public Queue workQ1() {
        return new Queue("work_sb_mq_q");
    }
}

DirectConfig

package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*
   路由模式|Routing模式   交换机类型:direct
*/
@Configuration
public class DirectConfig {
  //声明队列
  @Bean
  public Queue directQ1() {
    return new Queue("direct_sb_mq_q1");
  }
  @Bean
  public Queue directQ2() {
    return new Queue("direct_sb_mq_q2");
  }
  //声明exchange
  @Bean
  public DirectExchange setDirectExchange() {
    return new DirectExchange("directExchange");
  }
  //声明binding,需要声明一个routingKey
  @Bean
  public Binding bindDirectBind1() {
    return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with("directBind.one");
  }
  @Bean
  public Binding bindDirectBind2() {
      return BindingBuilder.bind(directQ2()).to(setDirectExchange()).with("directBind.two");
  }
}

TopicConfig

package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*
Topics模式  交换机类型 topic
* */
@Configuration
public class TopicConfig {
  //声明队列
  @Bean
  public Queue topicQ1() {
    return new Queue("topic_sb_mq_q1");
  }
  @Bean
  public Queue topicQ2() {
    return new Queue("topic_sb_mq_q2");
  }
  //声明exchange
  @Bean
  public TopicExchange setTopicExchange() {
    return new TopicExchange("topicExchange");
  }
  //声明binding,需要声明一个roytingKey
  @Bean
  public Binding bindTopicHebei1() {
    return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("directBind.*");
  }
  @Bean
  public Binding bindTopicHebei2() {
    return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.two");
  }
}

4.消费端component

package com.sky.springbootrabbitmqmodule.component;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ConcumerReceiver {
  //直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式
  //通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos
  @RabbitListener(queues="helloWorldqueue")
  public void helloWorldReceive(String message) {
       System.out.println("helloWorld模式 received message : " +message);
  }
  //工作队列模式
    @RabbitListener(queues="work_sb_mq_q")
    public void wordQueueReceiveq1(String message) {
    System.out.println("工作队列模式1 received message : " +message);
    }
    @RabbitListener(queues="work_sb_mq_q")
    public void wordQueueReceiveq2(String message) {
    System.out.println("工作队列模式2 received message : " +message);
    }
  //pub/sub模式进行消息监听
  @RabbitListener(queues="fanout.q1")
  public void fanoutReceiveq1(String message) {
      System.out.println("发布订阅模式1received message : " +message);
  }
  @RabbitListener(queues="fanout.q2")
  public void fanoutReceiveq2(String message) {
      System.out.println("发布订阅模式2 received message : " +message);
  }
    //Routing路由模式
    @RabbitListener(queues="direct_sb_mq_q1")
    public void routingReceiveq1(String message) {
      System.out.println("Routing路由模式routingReceiveqOne received message : " +message);
    }
    @RabbitListener(queues="direct_sb_mq_q2")
    public void routingReceiveq2(String message) {
      System.out.println("Routing路由模式routingReceiveqTwo received message : " +message);
    }
    //topic 模式
  //注意这个模式会有优先匹配原则。例如发送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不会再去匹配*.ITd
  @RabbitListener(queues="topic_sb_mq_q1")
  public void topicReceiveq1(String message) {
    System.out.println("Topic模式 topic_sb_mq_q1 received message : " +message);
  }
  @RabbitListener(queues="topic_sb_mq_q2")
  public void topicReceiveq2(String message) {
    System.out.println("Topic模式 topic_sb_mq_q2 received  message : " +message);
  }
}

5.生产者controller

package com.sky.springbootrabbitmqmodule.controller;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;
@RestController
public class ProducerController {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  //helloWorld 直连模式
  @GetMapping(value="/helloWorldSend")
  public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException {
    //设置部分请求参数
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
    //发消息
    rabbitTemplate.send("helloWorldqueue",new Message(message.getBytes("UTF-8"),messageProperties));
    return "message sended : "+message;
  }
  //工作队列模式
  @GetMapping(value="/workqueueSend")
  public Object workqueueSend(String message) throws AmqpException, UnsupportedEncodingException {
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
    //制造多个消息进行发送操作
    for (int i = 0; i <10 ; i++) {
      rabbitTemplate.send("work_sb_mq_q",  new Message(message.getBytes("UTF-8"),messageProperties));
    }
    return "message sended : "+message;
  }
  // pub/sub 发布订阅模式   交换机类型 fanout
  @GetMapping(value="/fanoutSend")
  public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
    //fanout模式只往exchange里发送消息。分发到exchange下的所有queue
    rabbitTemplate.send("fanoutExchange", "", new Message(message.getBytes("UTF-8"),messageProperties));
    return "message sended : "+message;
  }
  //routing路由工作模式  交换机类型 direct
  @GetMapping(value="/directSend")
  public Object routingSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
    if(null == routingKey) {
      routingKey="directBind.one";
    }
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
    //fanout模式只往exchange里发送消息。分发到exchange下的所有queue
    rabbitTemplate.send("directExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
    return "message sended : routingKey >"+routingKey+";message > "+message;
  }
  //topic 工作模式   交换机类型 topic
  @GetMapping(value="/topicSend")
  public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
    if(null == routingKey) {
      routingKey="directBind.one";
    }
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
    //fanout模式只往exchange里发送消息。分发到exchange下的所有queue
    rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
    return "message sended : routingKey >"+routingKey+";message > "+message;
  }
}

上面就是这个项目的所有代码了,下面就是Demo演示内容。


三、演示步骤

1.启动项目

2.调用接口演示

1.直连模式

1.接口调用


2.控制台打印

2.工作队列模式

1.接口调用



2.控制台打印


3.发布订阅模式(交换机类型:fanout)

1.接口调用

2.控制台打印

4.路由工作模式(交换机类型:direct)

1.接口调用


2.控制台打印

5.通配符模式(交换机类型:topic)

1.接口调用

2.控制台打印


除此之外,我还提供了项目地址提供给大家clone,地址链接:https://gitee.com/java_wxid/liao

总结

提示:以上就是今天要讲的内容,本文介绍了Springboot如何快速集成Rabbitmq,提供了五种模式的Demo案例演示给大家参考,希望对大家有所帮助。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3天前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
9天前
|
存储 前端开发 Java
Spring Boot 集成 MinIO 与 KKFile 实现文件预览功能
本文详细介绍如何在Spring Boot项目中集成MinIO对象存储系统与KKFileView文件预览工具,实现文件上传及在线预览功能。首先搭建MinIO服务器,并在Spring Boot中配置MinIO SDK进行文件管理;接着通过KKFileView提供文件预览服务,最终实现文档管理系统的高效文件处理能力。
|
13天前
|
XML Java 关系型数据库
springboot 集成 mybatis-plus 代码生成器
本文介绍了如何在Spring Boot项目中集成MyBatis-Plus代码生成器,包括导入相关依赖坐标、配置快速代码生成器以及自定义代码生成器模板的步骤和代码示例,旨在提高开发效率,快速生成Entity、Mapper、Mapper XML、Service、Controller等代码。
springboot 集成 mybatis-plus 代码生成器
|
13天前
|
Java Spring
springboot 集成 swagger 2.x 和 3.0 以及 Failed to start bean ‘documentationPluginsBootstrapper‘问题的解决
本文介绍了如何在Spring Boot项目中集成Swagger 2.x和3.0版本,并提供了解决Swagger在Spring Boot中启动失败问题“Failed to start bean ‘documentationPluginsBootstrapper’; nested exception is java.lang.NullPointerEx”的方法,包括配置yml文件和Spring Boot版本的降级。
springboot 集成 swagger 2.x 和 3.0 以及 Failed to start bean ‘documentationPluginsBootstrapper‘问题的解决
|
2月前
|
Java Spring
【Azure Developer】Springboot 集成 中国区的Key Vault 报错 AADSTS90002: Tenant 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' not found
【Azure Developer】Springboot 集成 中国区的Key Vault 报错 AADSTS90002: Tenant 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' not found
|
3月前
|
监控 druid Java
spring boot 集成配置阿里 Druid监控配置
spring boot 集成配置阿里 Druid监控配置
202 6
|
3月前
|
Java 关系型数据库 MySQL
如何实现Springboot+camunda+mysql的集成
【7月更文挑战第2天】集成Spring Boot、Camunda和MySQL的简要步骤: 1. 初始化Spring Boot项目,添加Camunda和MySQL驱动依赖。 2. 配置`application.properties`,包括数据库URL、用户名和密码。 3. 设置Camunda引擎属性,指定数据源。 4. 引入流程定义文件(如`.bpmn`)。 5. 创建服务处理流程操作,创建控制器接收请求。 6. Camunda自动在数据库创建表结构。 7. 启动应用,测试流程启动,如通过服务和控制器开始流程实例。 示例代码包括服务类启动流程实例及控制器接口。实际集成需按业务需求调整。
224 4
|
3月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
188 1
|
4月前
|
消息中间件 Java Kafka
springboot集成kafka
springboot集成kafka
125 2
|
3月前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成