RabbitMQ详解(二)------消息通信的概念

简介:   PS:近期在南宁出差,工作比较忙,所以更新会比较慢。   说到消息通信,可能我们首先会想到的是邮箱,QQ,微信,短信等等这些通信方式,这些通信方式都有发送者,接收者,还有一个中间存储离线消息的容器。

  PS:近期在南宁出差,工作比较忙,所以更新会比较慢。

  说到消息通信,可能我们首先会想到的是邮箱,QQ,微信,短信等等这些通信方式,这些通信方式都有发送者,接收者,还有一个中间存储离线消息的容器。但是这些通信方式和我们要讲的 RabbitMQ 的通信模型是不一样的,比如和邮件的通信方式相比,邮件服务器基于 POP3/SMTP 协议,通信双方需要明确指定,并且发送的邮件内容有固定的结构。而 RabbitMQ 服务器基于 AMQP 协议,这个协议是不需要明确指定发送方和接收方的,而且发送的消息也没有固定的结构,甚至可以直接存储二进制数据,并且和邮件服务器一样,也能存储离线消息,最关键的是 RabbitMQ 既能够以一对一的方式进行路由,还能够以一对多的方式进行广播。

  下面这张图是大致展示了 RabbitMQ 消息通信的过程:

  

  ps:看不懂没关系,后面会通过具体的例子进行讲解。

1、生产者和消费者

  在 RabbitMQ 的通信过程中,有两个主要的角色:生产者和消费者。类比于邮件通信的发送方和接收方。

  这里首先我们要明确 RabbtiMQ 服务器是不能够产生数据的,正如同其名字——消息中间件,是一个用来传递消息的中间商。生产者产生创建消息,然后发布到代理服务器(RabbitMQ),而消费者则从代理服务器获取消息(不是直接找生产者要消息),而且在实际应用中,生产者和消费者也是可以角色互相转换的,所以当我们应用程序连接到 RabbitMQ 服务器时,必须要明确我是生产者呢还是消费者。

2、消息

  生产者创建消息,然后发布到 RabbitMQ 服务器中,那么什么是消息?

  这里的消息分为两部分:有效内容和内容标签。

  ①、有效内容:可以是任何内容,一个数组,一个集合,甚至二进制数据都可以。RabbitMQ 不会在意你发什么数据,尽管发就行了。

  ②、内容标签:描述有效内容,是 RabbitMQ 用来决定谁将获得消息。前面说的邮件通信,必须明确指定发送方地址和收件方地址,而基于 AMQP 协议的 RabbitMQ 则是通过生产者发送消息附带的内容标签将消息发送个感兴趣的消费者。

  

  后面我们会详细解析标签是什么,这里只需要知道生产者会创建消息并设置标签。注意最上面的大图,一般来说生产者创建消息会设置标签,但是传输到消费者那里就没有标签了,除非你在有效内容中说明谁是生产者,一般消费者是不知道谁产生的消息的。

3、信道

  生产者产生了消息,然后发布到 RabbitMQ 服务器,发布之前肯定要先连接上服务器,也就是要在应用程序和rabbitmq 服务器之间建立一条 TCP 连接,一旦连接建立,应用程序就可以创建一条 AMQP 信道。

  信道是建立在“真实的”TCP 连接内的虚拟连接,AMQP 命令都是通过信道发送出去的,每条信道都会被指派一个唯一的ID(AMQP库会帮你记住ID的),不论是发布消息、订阅队列或者接收消息,这些动作都是通过信道来完成的。

  

  可能有人会问,为什么不直接通过 TCP 连接来发送AMQP命令呢?

  这里原因是效率问题,因为对于操作系统来说,每次建立和销毁 TCP 会话是非常昂贵的开销,而实际系统中,比如电商双十一,每秒钟高峰期成千上万条连接,一般来说操作系统建立TCP连接是有数量限制的,那么这就会遇到瓶颈。

  引入信道的概念,我们可以在一条 TCP 连接上创建 N多个信道,这样既能发送命令,也能够保证每条信道的私密性,我们可以将其想象为光纤电缆。

  

4、交换器和队列

  截取上面的一部分图:

  

  交换器和队列都是 RabbitMQ 服务器的一部分,我们知道生产者会将消息发送到 RabbitMQ 服务器,而进入该服务器后,首先进入交换机部分,然后由交换器根据消息附带的内容标签,将消息绑定到相应的队列。我们首先来看什么是队列:

  ①、容纳消息的场所,生产者发送到RabbitMQ服务器的消息会在队列中等待消费者消费。

  ②、队列是 RabbitMQ 服务器中最后的终点(除非消息进入了黑洞,黑洞的概念下面会介绍)。

  ③、队列可以实现负载均衡,我们可以增加一堆消费者,然后让 RabbitMQ 以循环的方式来均匀的分配消息。

  搞清楚了队列是什么了,那么消息是如何到达队列的呢?没错,就是通过交换器。

  消息进入RabbitMQ 服务器时,会首先将消息发送到交换器,然后交换器会根据特定的路由算法以及消息的内容标签将消息绑定到相应的队列。在 AMQP 协议中有四种交换器:direct、fanout、topic和 headers,每种交换器都实现了不同的路由算法,这也对应 RabbitMQ 工作的几种不同方式,这是重点,后面博客会进行详细介绍。

5、虚拟主机

  最上面那张大图,我画了虚拟主机A以及虚拟主机B,说明在 RabbitMQ 服务器中存在着多个虚拟主机,那么虚拟主机到底是什么?

  首先我们抛出这样一个问题,一个 RabbitMQ 肯定不是只服务一个应用程序,那么多个应用程序同时使用 RabbitMQ 服务器,如何保证彼此之间不会冲突?

  答案就是使用虚拟主机,虚拟主机其实就是一个迷你版的RabbitMQ 服务器,它拥有自己的交换器和队列,更重要的是虚拟主机拥有自己的权限机制,一个服务器能够创建多个虚拟主机。那么我们在使用RabbitMQ服务器的时候,只需要将一个应用程序对应一个虚拟主机,这种各个实例间逻辑上的分离就能够保证不同的应用程序安全的传递消息。

  默认的虚拟主机是“/”。

6、简单实例

  介绍完RabbitMQ 消息通信过程中的一些基本概念后,下面我们通过一个代码实例来实际感受一下。

  这是一个Maven工程,首先我们看 pom.xml 文件:导入 amqp-client 依赖即可

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.ys.rabbitmq</groupId>
  <artifactId>RabbitMQTest</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>RabbitMQTest Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.4.1</version>
    </dependency>

  </dependencies>

</project>

  生产者:

 1 package com.ys.simple;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.ys.utils.ConnectionUtil;
 6 
 7 /**
 8  * Create by hadoop
 9  */
10 public class Send {
11     private final static String QUEUE_NAME = "hello";
12 
13     public static void main(String[] args) throws Exception{
14         //1、获取连接
15         Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
16         //2、声明通道
17         Channel channel = connection.createChannel();
18         //3、声明(创建)队列
19         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
20         //4、定义消息内容
21         String message = "hello rabbitmq ";
22         //5、发布消息
23         channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
24         System.out.println("[x] Sent'"+message+"'");
25         //6、关闭通道和连接
26         channel.close();
27         connection.close();
28     }
29 }

  消费者:

 1 package com.ys.simple;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.QueueingConsumer;
 6 import com.ys.utils.ConnectionUtil;
 7 
 8 
 9 /**
10  * Create by hadoop
11  */
12 public class Recv {
13 
14     private final static String QUEUE_NAME = "hello";
15 
16     public static void main(String[] args) throws Exception{
17         //1、获取连接
18         Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
19         //2、声明通道
20         Channel channel = connection.createChannel();
21         //3、声明队列
22         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23         //4、定义队列的消费者
24         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
25         //5、监听队列
26         channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
27         //6、获取消息
28         while (true){
29             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
30             String message = new String(delivery.getBody());
31             System.out.println(" [x] Received '" + message + "'");
32         }
33     }
34 
35 }

  工具类:ConnectionUtil

 1 package com.ys.utils;
 2 
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5 
 6 /**
 7  * Create by hadoop
 8  */
 9 public class ConnectionUtil {
10 
11     public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{
12         //1、定义连接工厂
13         ConnectionFactory factory = new ConnectionFactory();
14         //2、设置服务器地址
15         factory.setHost(host);
16         //3、设置端口
17         factory.setPort(port);
18         //4、设置虚拟主机、用户名、密码
19         factory.setVirtualHost(vHost);
20         factory.setUsername(userName);
21         factory.setPassword(passWord);
22         //5、通过连接工厂获取连接
23         Connection connection = factory.newConnection();
24         return connection;
25     }
26 }

 

作者: YSOcean
本文版权归作者所有,欢迎转载,但未经作者同意不能转载,否则保留追究法律责任的权利。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 中间件 数据安全/隐私保护
RabbitMQ 的核心概念
RabbitMQ 的核心概念
43 2
|
2月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
3月前
|
消息中间件 Kafka 数据安全/隐私保护
RabbitMQ异步通信详解
RabbitMQ异步通信详解
95 16
|
4月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
4月前
|
物联网 C# Windows
看看如何使用 C# 代码让 MQTT 进行完美通信
看看如何使用 C# 代码让 MQTT 进行完美通信
602 0
|
4月前
|
物联网 网络性能优化 Python
"掌握MQTT协议,开启物联网通信新篇章——揭秘轻量级消息传输背后的力量!"
【8月更文挑战第21天】MQTT是一种轻量级的消息传输协议,以其低功耗、低带宽的特点在物联网和移动应用领域广泛应用。基于发布/订阅模型,MQTT支持三种服务质量级别,非常适合受限网络环境。本文详细阐述了MQTT的工作原理及特点,并提供了使用Python `paho-mqtt`库实现的发布与订阅示例代码,帮助读者快速掌握MQTT的应用技巧。
91 0
|
5月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
134 2
|
5月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
135 1
|
5月前
|
消息中间件 存储 RocketMQ
【RocketMQ系列十】RocketMQ的核心概念说明
【RocketMQ系列十】RocketMQ的核心概念说明
73 1
下一篇
无影云桌面