RabbitMQ入门指南(三):Java入门示例

简介: RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了AMQP、Spring AMQP和使用SpringAMQP实现对RabbitMQ的消息收发等内容。

一、AMQP协议

1.AMQP

全称为Advanced Message Queuing Protocol,是一种用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。通过AMQP,不同的应用程序可以在不改变各自实现方式的情况下进行跨平台、跨语言的消息通信。

AMQP协议定义了消息的传输方式和消息的元数据,例如消息的发送者、接收者、消息体、消息类型等。这些元数据可以帮助应用程序对消息进行正确的处理。

2.Spring AMQP

在Spring框架中,有一个Spring AMQP的项目,它基于AMQP协议定义了一套API规范,提供了模板来发送和接收消息。这个项目包含两部分,其中spring-amqp是基础抽象,而spring-rabbit是底层的默认实现。

Spring AMQP通过提供模板和抽象层,简化了应用程序与RabbitMQ的交互。它提供了一组易于使用的API,用于发送和接收消息。这些API可以帮助开发人员更专注于业务逻辑,而不是消息的发送和接收细节。

spring-rabbit是Spring AMQP的一部分,它基于RabbitMQ实现了AMQP协议。spring-rabbit提供了对RabbitMQ的封装,使开发人员可以通过简单的配置和API调用与RabbitMQ进行交互。

Spring AMQP 主要功能:

  • 自动声明和配置队列、交换机及其绑定关系:通过简化队列和交换器的创建和管理过程,Spring AMQP 帮助开发人员专注于实现业务逻辑,而不是手动配置消息中间件。
  • 基于注解的监听器模式,实现异步消息接收:通过注解,Spring AMQP 可以自动将方法与特定的队列或交换机绑定,从而实现异步接收和处理消息。这种模式提高了应用程序的响应性能和吞吐量。
  • 封装了 RabbitTemplate 工具,用于发送消息:RabbitTemplate 是 RabbitMQ 的核心类之一,用于发送和接收消息。Spring AMQP 提供了对这个工具的封装,使得开发人员可以方便地使用它来发送消息。

官方文档:

https://spring.io/projects/spring-amqp

二、使用Spring AMQP实现对RabbitMQ的消息收发

1.案例准备阶段

项目结构如下:

image.gif 项目结构介绍:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者


在父工程引入spring-amqp依赖:

<!--AMQP依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

image.gif

项目完整依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.rye.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.15</version>
        <relativePath/>
    </parent>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--Jackson-->
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>
    </dependencies>
</project>

image.gif

在application.yml中配置RabbitMQ服务端信息(每个微服务都需要配置):

spring:
  rabbitmq:
    host: 10.0.0.100
    port: 5672
    virtual-host: /demo
    username: user
    password: 123456

image.gif

2.入门案例(无交换机)

案例模型:

image.gif 在RabbitMQ管理控制台新建队列:

image.gif 查看新建结果:

image.gif 在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送

@Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendMessage2Queue() {
        // 队列名称
        String queueName = "demo.queue";
        // 消息
        String msg = "First demo";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, msg);
    }

image.gif

运行测试用例,查看结果:

image.gif


在consumer服务中新建一个类实现消息接收

@Component
public class MqListener {
    @RabbitListener(queues = "demo.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消息:" + msg);
    }
}

image.gif

启动consumer服务,查看消息(一旦监听的队列中有了消息,就会推送给当前服务):


3.任务模型案例(Work Queues)

让多个消费者绑定一个队列,共同消费队列中的消息。

案例模型:


在RabbitMQ管理控制台新建队列:


查看新建结果:


在publisher服务中的测试类添加一个测试方法(通过循环发送,模拟大量消息堆积现象 ):

@Test
    void testWorkQueue() throws InterruptedException {
        String queueName = "work.queue";
        for (int i = 1; i <= 50; i++) {
            String msg = "Work Queues " + i;
            rabbitTemplate.convertAndSend(queueName, msg);
            Thread.sleep(20);
        }
    }

image.gif

在consumer服务的类中添加2个新的方法,模拟多个消费者绑定同一个队列 :

@RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收work.queue消息:" + msg);
    }
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收work.queue消息:" + msg);
    }

image.gif

运行结果:


修改consumer服务类中的方法:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息
@RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收work.queue消息:" + msg);
        Thread.sleep(20);
    }
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收work.queue消息:" + msg);
        Thread.sleep(200);
    }

image.gif

重启后查看运行结果:


以上结果表明:默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者,并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

修改consumer服务的application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息(每次只能获取一条消息,处理完成才能获取下一个消息):

spring:
  rabbitmq:
    host: 10.0.0.100
    port: 5672
    virtual-host: /demo
    username: user
    password: 123456
    listener:
      simple:
        prefetch: 1

image.gif

重启后查看运行结果:



总结

RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了AMQP、Spring AMQP和使用Spring AMQP实现对RabbitMQ的消息收发等内容,希望对大家有所帮助。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
24天前
|
自然语言处理 Java
Java中的字符集编码入门-增补字符(转载)
本文探讨Java对Unicode的支持及其发展历程。文章详细解析了Unicode字符集的结构,包括基本多语言面(BMP)和增补字符的表示方法,以及UTF-16编码中surrogate pair的使用。同时介绍了代码点和代码单元的概念,并解释了UTF-8的编码规则及其兼容性。
98 60
|
1月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
85 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
1月前
|
监控 架构师 Java
Java虚拟机调优的艺术:从入门到精通####
本文作为一篇深入浅出的技术指南,旨在为Java开发者揭示JVM调优的神秘面纱,通过剖析其背后的原理、分享实战经验与最佳实践,引领读者踏上从调优新手到高手的进阶之路。不同于传统的摘要概述,本文将以一场虚拟的对话形式,模拟一位经验丰富的架构师向初学者传授JVM调优的心法,激发学习兴趣,同时概括性地介绍文章将探讨的核心议题——性能监控、垃圾回收优化、内存管理及常见问题解决策略。 ####
|
2月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
59 3
|
2月前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
2月前
|
Java 大数据 API
14天Java基础学习——第1天:Java入门和环境搭建
本文介绍了Java的基础知识,包括Java的简介、历史和应用领域。详细讲解了如何安装JDK并配置环境变量,以及如何使用IntelliJ IDEA创建和运行Java项目。通过示例代码“HelloWorld.java”,展示了从编写到运行的全过程。适合初学者快速入门Java编程。
|
2月前
|
安全 Java 调度
Java中的多线程编程入门
【10月更文挑战第29天】在Java的世界中,多线程就像是一场精心编排的交响乐。每个线程都是乐团中的一个乐手,他们各自演奏着自己的部分,却又和谐地共同完成整场演出。本文将带你走进Java多线程的世界,让你从零基础到能够编写基本的多线程程序。
42 1
|
2月前
|
Java 程序员 数据库连接
Java中的异常处理:从入门到精通
在Java编程的海洋中,异常处理是一艘不可或缺的救生艇。它不仅保护你的代码免受错误数据的侵袭,还能确保用户体验的平稳航行。本文将带你领略异常处理的风浪,让你学会如何在Java中捕捉、处理和预防异常,从而成为一名真正的Java航海家。
|
3月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
241 12
|
3月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
124 9