[Conclusion]RabbitMQ-客户端源码之总结

简介: RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上“搜刮”点散碎的知识点来充实一下。

RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上“搜刮”点散碎的知识点来充实一下。但是这样是不能究其然,更不能究其所以然。博主这里翻阅了amqp-client的java客户端的源码,通过其来学习下AMQP协议,进而更深刻的了解RabbitMQ.

注:如无特殊说明,本系列的文章采用的amqp-client版本均为3.5.3。

本系列的文章主要是来阐述客户端与broker交互需要经历那些具体步骤,需要涉及那些重要的类以及方法,整体的轮廓又是如何。

本文主要涉及的类有(本系列的blog地址):
[一]RabbitMQ-客户端源码之ConnectionFactory
[二]RabbitMQ-客户端源码之AMQConnection
[三]RabbitMQ-客户端源码之ChannelManager
[四]RabbitMQ-客户端源码之Frame
[五]RabbitMQ-客户端源码之AMQChannel
[六]RabbitMQ-客户端源码之AMQCommand
[七]RabbitMQ-客户端源码之AMQPImpl+Method
[八]RabbitMQ-客户端源码之ChannelN
[九]RabbitMQ-客户端源码之Consumer

以发送消息来看看从源码级的逻辑流转情况。

首先看看发送消息的业务代码(部分主要的代码):

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String message = "RabbitMQ Demo Test:" + System.currentTimeMillis();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.close();
connection.close();

从上面来看,主要牵涉ConnectionFactory, Connection, Channel这个几个类(有关Connection和Channel的AMQP流转流程可以参考文中最后部分)。实际情况是怎么样的呢,我们来分析下。

首先流转过程如下:

ConnectionFactory.newConnection()
       -- AMQConnection.start()
           -- MainLoop
            -- Frame frame =  SocketFrameHandler.readFrame()
            -- AMQChannel.handleFrame(Frame frame)
  • ConnectionFactory主要用来配置一些参数,并初始化AMQConnection, 这个版本的客户端与broker底层通信用的是java的原生Socket, 处理模块为SocketFrameHandler,SocketFrameHandler也在ConnectionFactory调用newConnection()时创建。之后根据参数以及SocketFrameHandler初始化了AMQConnection对象。
  • AMQConnection的核心在于这个start()方法。包括Protocol-Header, Connection.Start/.StartOk, Connection.Tune/.TuneOk, Connection.Open/.OpenOk,以及启动MainLoop线程
  • MainLoop是AMQConnection的内部私有类,主要用来(循环)读取(SocketFrameHandler.readFrame)并封装Socket中的帧Frame, 并进行进一步的处理,这个由AMQChannel来完成
  • ChannelManager,这个在上面并没有展示出来,但是这里也需要说明下,这个是在MainLoop中处理Frame所使用的,用来管理Channel的,确切的来说是ChannelN.
AMQChannel.handleFrame(Frame frame)
    -- AMQCommand.handleFrame(Frame frame) 
    -- AMQChannel.handleCompleteInboundCommand(AMQCommand command)
        -- ChannelN.processAsync(AMQCommand command)

这个接着上面的流程继续:

AMQChannel.handleFrame(Frame frame)首先调用AMQCommand的handleFrame(Frame frame)方法来处理,AMQCommand内部其实是调用了CommandAssember(对Method, Content-Header以及Content-Body做了一下封装,其实忽略这个类也是可以的)的handleFrame, 说白了作用就是处理下Frame帧,方法返回值是boolean, 当一个AMQComand处理完毕后返回true,否则返回false。这里就有疑问了,什么叫做处理完毕?这里就又要说到MainLoop了,MainLoop线程主要循环读取Frame帧,像Connection.Start/.StartOk这种命令(AMQCommand)一般只包括Method类型的帧(Frame),AMQCommand的handleFrame方法直接返回true,但是像Basic.Publish这种命令一般包括Method帧,Content-Header帧,以及若干Content-Body帧,就需要handleFrame多次才能返回true。

AMQP技术术语
Method: 用于在节点之间传递特定类型的AMQP命令帧。
Content: 服务器和应用程序之间传送的数据.这个术语是“message”的同义词。
Content header:描述内容属性特定类型帧。
Content body: 包含原始应用程序数据的特定类型帧.内容体帧完全不透明-服务器不以任何方式检查或修改其body内容。

public void handleFrame(Frame frame) throws IOException {
    AMQCommand command = _command;
    if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
        _command = new AMQCommand(); // prepare for the next one
        handleCompleteInboundCommand(command);
    }
}

上面这个是AMQChannel中的handleFrame方法,当内部的AMQCommand的handleFrame方法返回true,即表示处理完毕一条AMQCommand之后再调用handleCompleteInboundCommand方法进行进一步处理。而这个handleCompleteInboundCommand方法的精髓在于processAsync方法,这个processAsync方法在AMQChannel中是一个抽象方法,真正的实现要看ChannelN这个类。
说到这里有一个点我没有提及,但是这个不影响主流程的阐述,这个点就是rpc的概念,具体的可以详细参考对AMQChannel类的介绍——[五]RabbitMQ-客户端源码之AMQChannel。


文中开篇的demo示例,采用wireshark工具抓包可得:

这里写图片描述

这里用来参考,以便更好的阐述Connection类和Channel类。

Connection类
AMQP是一个连接协议. 连接设计为长期的,且可运载多个通道. 连接生命周期是这样的:

  • client打开与服务器的TCP/IP连接并发送一个协议头(protocol header).这只是client发送的数据,而不是作为方法格式的数据.
  • server使用其协议版本和其它属性,包括它支持安全机制列表(Start方法)进行响应.
  • client选择一种安全机制(Start-Ok).
  • server开始认证过程, 它使用SASL的质询-响应模型(challenge-response model). 它向客户端发送一个质询(Secure).
  • client向server发送一个认证响应(Secure-Ok). 例如,对于使用”plain”机制,响应会包含登录用户名和密码.
    (server 重复质询(Secure) 或转到协商,发送一系列参数,如最大帧大小(Tune).)
  • client接受或降低这些参数(Tune-Ok).
  • client 正式打开连接并选择一个虚拟主机(Open).
  • 服务器确认虚拟主机是一个有效的选择 (Open-Ok).
  • 客户端现在使用希望的连接.
  • 一个节点(client 或 server) 结束连接(Close).
  • 另一个节点对连接结束握手(Close-Ok).
  • server 和 client关闭它们的套接字连接.

没有为不完全打开的连接上的错误进行握手. 根据成功协议头协商(后面有详细定义),在发送或收到Open 或Open-Ok之前,如果一个节点检测到错误,这个节点必须关闭socket,而不需要发送任何进一步的数据。

Channel类
AMQP是一个多通道协议. 通道提供了一种方式来将一个重量级TCP/IP连接分成多个轻量级连接。这使得协议对于防火墙更加友好,因为端口使用是可预测的. 这也意味着传输调整和网络服务质量可以得到更好的利用。通道是独立的,它们可以同时执行不同的功能,可用带宽会在当前活动之间共享。这是令人期待的,我们鼓励多线程客户端应用程序经常使用”每个通道一个线程”编程模型.。然而,从单个client打开一个或多个AMQP servers连接也是完全可以接受的.。
通道生命周期如下:

  • client打开一个新通道(Open).
  • server确认新通道准备就绪(Open-Ok).
  • client和server按预期来使用通道.
  • 一个节点(client或server) 关闭了通道(Close).
  • 另一个节点对通道关闭进行握手(Close-Ok).

附:本系列全集

  1. [Conclusion]RabbitMQ-客户端源码之总结
  2. [一]RabbitMQ-客户端源码之ConnectionFactory
  3. [二]RabbitMQ-客户端源码之AMQConnection
  4. [三]RabbitMQ-客户端源码之ChannelManager
  5. [四]RabbitMQ-客户端源码之Frame
  6. [五]RabbitMQ-客户端源码之AMQChannel
  7. [六]RabbitMQ-客户端源码之AMQCommand
  8. [七]RabbitMQ-客户端源码之AMQPImpl+Method
  9. [八]RabbitMQ-客户端源码之ChannelN
  10. [九]RabbitMQ-客户端源码之Consumer
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 Java Spring
RocketMQ-JAVA客户端不同版本接入方式
RocketMQ4.0 RocketMQ5.0 JAVA接入 spring springboot
RocketMQ-JAVA客户端不同版本接入方式
|
9月前
UE DTMqtt 虚幻引擎 Mqtt 客户端插件说明
UE DTMqtt 虚幻引擎 Mqtt 客户端插件说明
341 0
|
8月前
|
安全 网络协议 物联网
不看后悔系列之一篇搞懂LinuxCentOS搭建MQTT服务器及客户端操作使用
linux CentOS上搭建MQTT服务器并不难,主要就是用到了mosquitto这款消息代理服务软件。其采用发布/订阅模式传输机制,轻量、简单、开放并易于实现,被广泛应用于物联网之中。
1419 0
|
消息中间件 Apache RocketMQ
rocketmq客户端发送消息报错和超时问题
org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <10.0.21.69:10911> timeout, 1000(ms)、 closeChannel: close the connection to remote address
1894 1
rocketmq客户端发送消息报错和超时问题
|
监控 物联网 API
【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的代码演示
MQTT广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。好了,科普的废话不多说,下面直接通过.NET环境来实现一套MQTT通信demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。
962 0
【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的代码演示
|
1月前
|
Java Maven
【开源视频联动物联网平台】vertx写一个mqtt客户端
【开源视频联动物联网平台】vertx写一个mqtt客户端
40 1
|
4月前
|
消息中间件 运维 负载均衡
负载均衡中后端连了三个rabbitmq,如果挂了一个,客户端连接mq会变慢吗
在负载均衡中使用三个 RabbitMQ 实例,如果其中一个实例发生故障,可能会影响客户端连接到 RabbitMQ 的性能。具体影响取决于负载均衡的配置和客户端的实现方式。 如果负载均衡器能够及时检测到故障的 RabbitMQ 实例并将流量路由到正常的实例,那么客户端连接的性能影响可能较小。但如果负载均衡器不能迅速切换流量或者客户端实现不支持及时的连接故障转移,那么可能会导致客户端连接的延迟或失败。 在设计这样的架构时,有一些考虑因素: 1. **健康检查和故障切换:** 确保负载均衡器能够定期检查 RabbitMQ 实例的健康状态,并在出现故障时快速将流量切换到其他正常的实例。 2.
|
5月前
|
消息中间件
RabbitMQ客户端清空所有消息
RabbitMQ客户端清空所有消息
179 0
|
5月前
|
传感器 JavaScript 物联网
如何在Node.js中使用MQTT客户端库?
如何在Node.js中使用MQTT客户端库?
70 0
|
5月前
|
物联网 Python
如何通过示例在Python中使用Paho MQTT客户端?
如何通过示例在Python中使用Paho MQTT客户端?
93 2
如何通过示例在Python中使用Paho MQTT客户端?

热门文章

最新文章