RabbitMQ——消息发送和消息接收机制

简介: RabbitMQ——消息发送和消息接收机制

文章目录:


1.写在前面

2.案例详解

2.1 编写消息发送类

2.2 编写消息接收类

2.3 测试结果1

2.4 测试结果2

2.5 测试结果3

1.写在前面


所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

上面是MQ的基本抽象模型,但是不同的MQ产品有有者不同的机制,RabbitMQ实际基于AMQP协议的一个开源实现,因此RabbitMQ内部也是AMQP的基本概念。

RabbitMQ的内部接收如下:


1Message
消息,消息是不具体的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。

3Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

4Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

5Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

6Connection
网络连接,比如一个TCP连接。

7Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

8Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

9Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost /

10Broker
表示消息队列服务器实体。

2.案例详解


上面说了RabbitMQ中的消息发送和消息接收机制,下面我用一个案例来讲解一下。

首先,我们需要创建两个maven-java工程,一个表示消息发送者,另一个表示消息接收者。

在这两个项目的pom文件中加入下面的依赖:👇👇👇

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.10.0</version>
</dependency>

2.1 编写消息发送类

package com.szh.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 *
 */
public class Send {
    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //配置RabbitMQ的连接相关信息
        factory.setHost("192.168.40.130"); //ip
        factory.setPort(5672); //端口号
        factory.setUsername("root"); //用户名
        factory.setPassword("root"); //密码
        Connection connection=null; //定义连接
        Channel channel=null;//定义通道
        try {
            connection=factory.newConnection(); //获取连接
            channel=connection.createChannel(); //获取通道
            /**
             * 声明一个队列
             * 参数1:队列名称取值 任意的
             * 参数2:是否为持久化队列
             * 参数3:是否排外,如果排外,则这个队列只允许一个消费者监听
             * 参数4:是否自动删除队列,如果为true表示当队列中没有消息,也没有消费者连接时,会自动删除这个队列
             * 参数5:队列的其他属性,通常设置为null即可
             * 注意:
             *      1) 声明队列时,这个队列名称如果存在,则放弃声明;如果不存在,则会声明一个新的队列
             *      2) 队列名可以任意设置,但是要与消息接收时的队列名一致
             *      3) 这行代码可有可无,但是一定要在发送消息前确认队列名已经存在于RabbitMQ中,否则会出现问题
             */
            channel.queueDeclare("myQueue",true,false,false,null);
            String message="RabbitMQ测试发送消息"; //定义需要发送的消息
            /**
             * 发送消息到MQ
             * 参数1:交换机名称,这里为空是因为不使用交换机
             * 参数2:如果不指定交换机,这个值就是队列名称;如果指定了交换机,这个值就是RoutingKey
             * 参数3:消息属性信息,null即可
             * 参数4:具体的消息数据内容、字符集格式
             */
            channel.basicPublish("","myQueue",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送成功:" + message);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

2.2 编写消息接收类

package com.szh.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 *
 */
public class Receive {
    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //配置RabbitMQ的连接相关信息
        factory.setHost("192.168.40.130"); //ip
        factory.setPort(5672); //端口号
        factory.setUsername("root"); //用户名
        factory.setPassword("root"); //密码
        Connection connection=null; //定义连接
        Channel channel=null;//定义通道
        try {
            connection=factory.newConnection(); //获取连接
            channel=connection.createChannel(); //获取通道
            channel.queueDeclare("myQueue",true,false,false,null);
            /**
             * 接收消息
             * 参数1:当前消费者需要监听的队列,该队列名必须要与发送时的队列名一致,否则接收不到消息
             * 参数2:消息是否自动确认,true表示自动确认,接收完消息后会自动将消息从队列中移除;false则相反
             * 参数3:消费者的标签,用于当多个消费者同时监听一个队列时,来确认不同的消费者,通常为空字符串即可
             * 参数4:消费者加收的回调方法,这个方法中具体完成对消息的处理
             * 注意:使用了basicConsume方法以后,会启动一个线程在持续监听队列,如果队列中有消息,则会自动接收消息,因此不能关闭连接和通道对象
             */
            channel.basicConsume("myQueue",true,new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //获取消息数据
                    String message=new String(body,"utf-8");
                    System.out.println("消息接收成功:" + message);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

注意:

1Queue的消息只能被同一个消费者消费,如果没有消费监听队列那么消息会存放到队列中持久化保存,直到有消费者来消费这个消息,如果以有消费者监听队列则立即消费发送到队列中的消息

2Queue的消息可以保证每个消息都一定能被消费


2.3 测试结果1

首先确保你的RabbitMQ是开启状态,在Linux中执行 ps -ef | grep rabbitmq 命令即可查看,如果没启动,则执行 rabbitmq-server start & 命令即可。之后执行 rabbitmqctl add_user root root 添加一个root用户,再执行 rabbitmqctl set_permissions -p / root '.*' '.*' '.*' 用于设置root用户拥有对所有资源的读写配置权限。

然后在浏览器中输入 http:// RabbitMQ服务器ip:15672,访问登录(用户名root,密码root),之后执行消息发送类。


执行完消息发送类之后,等待大概5秒,可以在RabbitMQ的管理界面看到队列中接收到了一条数据。


这里不使用消息接收类也可以将数据取出。


2.4 测试结果2

先执行消息发送类。


之后再执行消息接收类。


而在RabbitMQ的管理界面中可以看到的是,执行完消息发送类之后,队列中多了一条数据,此时再执行消息接收类,那么队列中的数据就会被取出。


2.5 测试结果3

首先执行消息接收类,确保消息的接收者一直处于监听消息队列的状态,然后我们依次执行消息发送类,看看会出现什么结果。


可以看到的是,当消息接收者一直处于监听消息队列的时候,当我们的消息发送者每发送一个数据,消息接收者它就会知道,然后就取走消息队列中的数据。

 

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 监控
|
6月前
|
消息中间件 存储 运维
|
6月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
98 0
|
4月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
89 0
|
4月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
72 0
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
58 0
|
3月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
51 0
|
5月前
|
消息中间件 监控 Java
使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收
使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收
483 3