[五]RabbitMQ-客户端源码之AMQChannel

简介: AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法:/** * Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand.

AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法:

/**
 * Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method
 * returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as
 * usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close.
 * @param command the command to handle asynchronously
 * @return true if we handled the command; otherwise the caller should consider it "unhandled"
 */
public abstract boolean processAsync(Command command) throws IOException;

有关processAsync()这个方法的会在介绍ChannelN类的时候详细阐述([八]RabbitMQ-客户端源码之ChannelN)。


首先来说下AMQChannel的成员变量:

protected final Object _channelMutex = new Object();
/** The connection this channel is associated with. */
private final AMQConnection _connection;
/** This channel's channel number. */
private final int _channelNumber;
/** Command being assembled */
private AMQCommand _command = new AMQCommand();
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcContinuation _activeRpc = null;
/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;
  • _channelMutex这个是内部用来当对象锁的,没有实际的意义,可忽略
  • _connection是指AMQConnection这个对象。
  • _channelNumber是指channel number, 这个应该不用多解释了吧。通道编号为0的代表全局连接中的所有帧,1-65535代表特定通道的帧.
  • _command是内部处理使用的对象,调用AMQCommand的方法来处理一些东西。
  • _activeRpc是指当前未处理完的rpc请求(the current outstanding rpc request)。
  • _blockContent 是在Channel.Flow里用到的,其余情况都是false
    在AMQChannel的构造函数中,只有两个参数:AMQConnection connection以及int channelNumber.

AMQChannel中有个handleFrame方法:

/**
 * Private API - When the Connection receives a Frame for this
 * channel, it passes it to this method.
 * @param frame the incoming frame
 * @throws IOException if an error is encountered
 */
public void handleFrame(Frame frame) throws IOException {
    AMQCommand command = _command;
    if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
        _command = new AMQCommand(); // prepare for the next one
        handleCompleteInboundCommand(command);
    }
}

/**
 * Private API - handle a command which has been assembled
 * @throws IOException if there's any problem
 *
 * @param command the incoming command
 * @throws IOException
 */
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {
    // First, offer the command to the asynchronous-command
    // handling mechanism, which gets to act as a filter on the
    // incoming command stream.  If processAsync() returns true,
    // the command has been dealt with by the filter and so should
    // not be processed further.  It will return true for
    // asynchronous commands (deliveries/returns/other events),
    // and false for commands that should be passed on to some
    // waiting RPC continuation.
    if (!processAsync(command)) {
        // The filter decided not to handle/consume the command,
        // so it must be some reply to an earlier RPC.
        nextOutstandingRpc().handleCommand(command);
        markRpcFinished();
    }
}

这个在[六]RabbitMQ-客户端源码之AMQCommand有所介绍,主要是用来处理Frame帧的,当调用AMQCommand的handleFrame处理之后返回为true是,即处理完毕时继续调用handleCompleteInboundCommand方法。这其中也牵涉到AMQConnection的MainLoop内部类,具体可以看看:[六]RabbitMQ-客户端源码之AMQCommand


AMQChannel中有很多方法带有rpc的字样,这来做一个整理。
首先是:

public void enqueueRpc(RpcContinuation k)
{
    synchronized (_channelMutex) {
        boolean waitClearedInterruptStatus = false;
        while (_activeRpc != null) {
            try {
                _channelMutex.wait();
            } catch (InterruptedException e) {
                waitClearedInterruptStatus = true;
            }
        }
        if (waitClearedInterruptStatus) {
            Thread.currentThread().interrupt();
        }
        _activeRpc = k;
    }
}

这个方法在AMQConnection.start()方法中有过使用:_channel0.enqueueRpc(conStartBroker)。这个方法就是将参数付给成员变量_activeRpc,至于这个RpcContinuation到底是个什么gui,我们下面再讲。
继续下一个方法:

public boolean isOutstandingRpc()
{
    synchronized (_channelMutex) {
        return (_activeRpc != null);
    }
}

这个方法是判断一下当前的_activeRpc是否为null,为null则为false,否则为true。看方法的名字应该猜出大半。
下面一个方法:

public RpcContinuation nextOutstandingRpc()
{
    synchronized (_channelMutex) {
        RpcContinuation result = _activeRpc;
        _activeRpc = null;
        _channelMutex.notifyAll();
        return result;
    }
}

方法将当前的_activeRpc返回,并置AQMChannel的_activeRpc为null。

接下来几个方法联系性很强:

/**
 * Protected API - sends a {@link Method} to the broker and waits for the
 * next in-bound Command from the broker: only for use from
 * non-connection-MainLoop threads!
 */
public AMQCommand rpc(Method m)
    throws IOException, ShutdownSignalException
{
    return privateRpc(m);
}

public AMQCommand rpc(Method m, int timeout)
        throws IOException, ShutdownSignalException, TimeoutException {
    return privateRpc(m, timeout);
}

private AMQCommand privateRpc(Method m)
    throws IOException, ShutdownSignalException
{
    SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
    rpc(m, k);
    // At this point, the request method has been sent, and we
    // should wait for the reply to arrive.
    //
    // Calling getReply() on the continuation puts us to sleep
    // until the connection's reader-thread throws the reply over
    // the fence.
    return k.getReply();
}

private AMQCommand privateRpc(Method m, int timeout)
        throws IOException, ShutdownSignalException, TimeoutException {
    SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
    rpc(m, k);

    return k.getReply(timeout);
}

public void rpc(Method m, RpcContinuation k)
    throws IOException
{
    synchronized (_channelMutex) {
        ensureIsOpen();
        quiescingRpc(m, k);
    }
}

public void quiescingRpc(Method m, RpcContinuation k)
    throws IOException
{
    synchronized (_channelMutex) {
        enqueueRpc(k);
        quiescingTransmit(m);
    }
}

主要是看最后一个方法——quiescingRpc.这个方法说白就两行代码:
enqueueRpc(k);是将由privateRpc等方法内部创建的SimpleBlockingRpcContinuation对象附给当前的AQMChannel对象的成员变量_activeRpc
关于quiescingTransmit(m)就要接下去看了:

public void quiescingTransmit(Method m) throws IOException {
    synchronized (_channelMutex) {
        quiescingTransmit(new AMQCommand(m));
    }
}
public void quiescingTransmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        if (c.getMethod().hasContent()) {
            while (_blockContent) {
                try {
                    _channelMutex.wait();
                } catch (InterruptedException e) {}

                // This is to catch a situation when the thread wakes up during
                // shutdown. Currently, no command that has content is allowed
                // to send anything in a closing state.
                ensureIsOpen();
            }
        }
        c.transmit(this);
    }
}

上面代码只需要看: c.transmit(this);这一句,其余的都是摆设。看到这里,就调用了AMQCommand的transmit方法,这个transmit方法就是讲AMQChannel中封装的内容发给broker,然后等待broker返回,进而通过之前附值的_activeRpc来处理回传的帧。

虽然之前在AMQConnection([二]RabbitMQ-客户端源码之AMQConnection)中详细讲述了start()方法,但是这里还是要来拿这个来举例这个AMQChannel中的rpc怎么使用
在AMQConnection中有这么一段代码:

Method method = (challenge == null)
                        ? new AMQP.Connection.StartOk.Builder()
                                  .clientProperties(_clientProperties)
                                  .mechanism(sm.getName())
                                  .response(response)
                                  .build()
                        : new AMQP.Connection.SecureOk.Builder().response(response).build();

try {
    Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod();
    if (serverResponse instanceof AMQP.Connection.Tune) {
        connTune = (AMQP.Connection.Tune) serverResponse;
    } else {
        challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
        response = sm.handleChallenge(challenge, this.username, this.password);
    }

客户端将Method封装成Connection.StartOk帧之后等待broker返回Connection.Tune帧。
此时调用了AMQChannel的rpc(Method m, int timeout)方法,其间接调用了AMQChannel的privateRpc(Method, int timeout)方法。代码详情上面已经罗列出来。

注意privateRpc(Method, int timeout)方法的最有一句返回:return k.getReply(timeout);这句代码的意思是SimpleBlockingRpcContinuation对象在等待broker的返回,确切的来说是MainLoop线程处理之后返回,即AMQChannel类中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)这行代码。


AQMChannel还有些其他的内容,都是边缘性的东西,这里还剩下个RpcContinuation要着重阐述下的:

public interface RpcContinuation {
    void handleCommand(AMQCommand command);
    void handleShutdownSignal(ShutdownSignalException signal);
}

public static abstract class BlockingRpcContinuation<T> implements RpcContinuation {
    public final BlockingValueOrException<T, ShutdownSignalException> _blocker =
        new BlockingValueOrException<T, ShutdownSignalException>();

    public void handleCommand(AMQCommand command) {
        _blocker.setValue(transformReply(command));
    }

    public void handleShutdownSignal(ShutdownSignalException signal) {
        _blocker.setException(signal);
    }

    public T getReply() throws ShutdownSignalException
    {
        return _blocker.uninterruptibleGetValue();
    }

    public T getReply(int timeout)
        throws ShutdownSignalException, TimeoutException
    {
        return _blocker.uninterruptibleGetValue(timeout);
    }

    public abstract T transformReply(AMQCommand command);
}

public static class SimpleBlockingRpcContinuation
    extends BlockingRpcContinuation<AMQCommand>
{
    public AMQCommand transformReply(AMQCommand command) {
        return command;
    }
}

RPCContinuation只是一个接口,而BlockingRpcContinuation这个抽象类缺似乎略有门道。而SimpleBlockingRpcContinuation只是将BlockingRpcContinuation中的handleCommand方法便成为:

_blocker.setValue(command);

BlockingRpcContinuation类主要操纵了BlockingValueOrException _blocker这个成员变量。再接下深究BlockingValueOrException其实是继承了BlockingCell,对其做了一下简单的封装。最后来看下BlockingCell是个什么鬼, 截取部分代码如下:

public class BlockingCell<T> {
    private boolean _filled = false;
    private T _value;

    public synchronized T get() throws InterruptedException {
        while (!_filled) {
            wait();
        }
        return _value;
    }

其实这个就是capacity为1的BlockingQueue,顾美其名曰BlockingCell,绕了大半圈,原来AMQChannel中的_activeRpc就是个这么玩意儿~


附:本系列全集

  1. [Conclusion]RabbitMQ-客户端源码之总结
  2. [一]RabbitMQ-客户端源码之ConnectionFactory
  3. [二]RabbitMQ-客户端源码之AMQConnection
  4. [三]RabbitMQ-客户端源码之ChannelManager
  5. [四]RabbitMQ-客户端源码之Frame
  6. [五]RabbitMQ-客户端源码之AMQChannel
  7. [六]RabbitMQ-客户端源码之AMQCommand
  8. [七]RabbitMQ-客户端源码之AMQPImpl+Method
  9. [八]RabbitMQ-客户端源码之ChannelN
  10. [九]RabbitMQ-客户端源码之Consumer
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
89 12
|
2月前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
49 2
|
7月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ产品使用合集之如何关闭客户端的日志记录
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
58 0
|
3月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
64 0
|
6月前
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
安全 网络性能优化
MQTT 客户端 MQTT.fx 使用说明
MQTT 客户端 MQTT.fx 使用说明
520 0
|
6月前
|
消息中间件 JavaScript Linux
消息队列 MQ操作报错合集之客户端在启动时遇到了连接错误,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。