Channel

简介: 提起Channel,JDK的NIO类库的重要组成部分,就是提供了java.nio.SocketChannel和java.nio.ServerSocketChannel,用于非阻塞的I/O操作。 类似于NIO的Channel,Netty提供了自己的Channel和其子类实现,用于异步I/O操作和其他相关的操作。

提起Channel,JDK的NIO类库的重要组成部分,就是提供了java.nio.SocketChannel和java.nio.ServerSocketChannel,用于非阻塞的I/O操作。

类似于NIO的Channel,Netty提供了自己的Channel和其子类实现,用于异步I/O操作和其他相关的操作。Unsafe是个内部接口,聚合在Channel中协助进行网络读写相关的操作,因为它的设计初衷就是Channel的内部辅助类,不应该被Netty框架的上层使用者调用,所以被命名为Unsafe。这里不能仅从字面理解认为它是不安全的操作,而要从整个架构的设计层面体会它的设计初衷和职责。

Channel 功能说明

io.netty.channel.Channel是Netty网络操作抽象类,它聚合了一组功能,包括但不限于网路的读、写,客户端发起连接、主动关闭连接,链路关闭,获取通信双方的网络地址等。它也包含了Netty框架相关的一些功能,包括获取该Chanel的EventLoop,获取缓冲分配器ByteBufAllocator和pipeline等。 

Channel的工作原理

Channel是Netty抽象出来的网络I/O读写相关的接口,为什么不使用JDK NIO 原生的Channel而要另起炉灶呢,主要原因如下:

(1)JDK的SocketChannel和ServerSocketChannel没有统一的Channel接口供业务开发者使用,对于用户而言,没有统一的操作视图,使用起来并不方便。

(2)JDK的SocketChannel和ServerSocketChannel的主要职责就是网络I/O操作,由于它们是SPI类接口,由具体的虚拟机厂家来提供,所以通过继承SPI功能类来扩展其功能的难度很大;直接实现ServerSocketChannel和SocketChannel抽象类,其工作量和重新开发一个新的Channel功能类是差不多的。

(3)Netty的Channel需要能够跟Netty的整体架构融合在一起,例如I/O模型、基于ChannelPipeline的定制模型,以及基于元数据描述配置化的TCP参数等,这些JDK的SocketChannel和ServerSocketChannel都没有提供,需要重新封装。

(4)自定义的Channel,功能实现更加灵活。

基于上述4个原因,Netty重新设计了Channel接口,并且给予了很多不同的实现。它的设计原理比较简单,但是功能却比较繁杂,主要的设计理念如下。

(1)在Channel接口层,采用Facade模式进行统一封装,将网络I/O操作、网络I/O相关联的其他操作封装起来,统一对外提供。

(2)Channel接口的定义尽量大而全,为SocketChannel和ServerSocketChannel提供统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度上实现功能和接口的重用。

(3)具体实现采用聚合而非包含的方式,将相关的功能类聚合在Channel中,由Channel统一负责分配和调度,功能实现更加灵活。

Channel的功能介绍

Channel的功能比较繁杂,通过分类的方式对它的主要功能进行介绍。

1.网络I/O操作

读写相关的API列表。

(1)Channel read():从当前的Channel中读取数据到第一个inbound缓冲区中,如果数据被成功读取,触发ChannelHandler.channelRead(ChannelHandlerContext, Object)事件,读取操作API调用完成之后,紧接着会触发ChannelHandler.channelReadComplete (Channel HandlerContext)事件,这样业务的ChannelHandler可以决定是否需要继续读取数据。如果已经有读操作请求被挂起,则后续的读操作会被忽略。

(2)ChannelFuture write(Object msg):请求将当前的msg通过ChannelPipeline写入到目标Channel中。注意,write操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用flush操作才会被写入到Channel中,发送给对方。

(3)ChannelFuture write(Object msg, ChannelPromise promise):功能与write(Object msg)相同,但是携带了ChannelPromise参数负责设置写入操作的结果。

(4)ChannelFuture writeAndFlush(Object msg, ChannelPromise promise):与方法(3)功能类似,不同之处在于它会将消息写入到Channel中发送,等价于单独调用write和flush操作的组合。

(5)ChannelFuture writeAndFlush(Object msg):功能等同于方法(4),但是没有携带writeAndFlush(Object msg)参数。

(6)Channel flush():将之前写入到发送环形数组中的消息全部写入到目标Chanel中,发送给通信对方。

(7)ChannelFuture close(ChannelPromise promise):主动关闭当前连接,通过ChannelPromise设置操作结果并进行结果通知,无论操作是否成功,都可以通过ChannelPromise获取操作结果。该操作会级联触发ChannelPipeline中所有ChannelHandler的ChannelHandler.close(ChannelHandlerContext, ChannelPromise)事件。

(8)ChannelFuture disconnect(ChannelPromise promise):请求断开与远程通信对端的连接并使用ChannelPromise来获取操作结果的通知消息。该方法会级联触发ChannelHandler.disconnect(ChannelHandlerContext, ChannelPromise)事件。

(9)ChannelFuture connect(SocketAddress remoteAddress):客户端使用指定的服务端地址remoteAddress发起连接请求,如果连接因为应答超时而失败,ChannelFuture中的操作结果就是ConnectTimeoutException异常,如果连接被拒绝,操作结果为ConnectException。该方法会级联触发ChannelHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)事件。

(10)ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress):与方法9)功能类似,唯一不同的就是先绑定指定的本地地址localAddress,然后再连接服务端。

(11)ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise):与方法9)功能类似,唯一不同的是携带了ChannelPromise参数用于写入操作结果。

(12)connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise):与方法(11)功能类似,唯一不同的就是绑定了本地地址。

(13)ChannelFuture bind(SocketAddress localAddress):绑定指定的本地Socket地址localAddress,该方法会级联触发ChannelHandler.bind(ChannelHandlerContext, SocketAddress, ChannelPromise)事件。

(14)ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise):与方法13)功能类似,多携带了了一个ChannelPromise用于写入操作结果。

(15)ChannelConfig config():获取当前Channel的配置信息,例如CONNECT_ TIMEOUT_MILLIS。

(16)boolean isOpen():判断当前Channel是否已经打开。

(17)boolean isRegistered():判断当前Channel是否已经注册到EventLoop上。

(18)boolean isActive():判断当前Channel是否已经处于激活状态。

(19)ChannelMetadata metadata():获取当前Channel的元数据描述信息,包括TCP参数配置等。

(20)SocketAddress localAddress():获取当前Channel的本地绑定地址。

(21)SocketAddress remoteAddress():获取当前Channel通信的远程Socket地址。

2.其他常用的API功能说明

第一个比较重要的方法是eventLoop()。Channel需要注册到EventLoop的多路复用器上,用于处理I/O事件,通过eventLoop()方法可以获取到Channel注册的EventLoop。EventLoop本质上就是处理网络读写事件的Reactor线程。在Netty中,它不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义NioTask等任务。

第二个比较常用的方法是metadata()方法。熟悉TCP协议的可能知道,当创建Socket的时候需要指定TCP参数,例如接收和发送的TCP缓冲区大小,TCP的超时时间,是否重用地址等等。在Netty中,每个Channel对应一个物理连接,每个连接都有自己的TCP参数配置。所以,Channel会聚合一个ChannelMetadata用来对TCP参数提供元数据描述信息,通过metadata()方法就可以获取当前Channel的TCP参数配置。

第三个方法是parent()。对于服务端Channel而言,它的父Channel为空;对于客户端Channel,它的父Channel就是创建它的ServerSocketChannel。

第四个方法是用户获取Channel标识的id(),它返回ChannelId对象,ChannelId是Channel的唯一标识,它的可能生成策略如下:

(1)机器的MAC地址(EUI-48或者EUI-64)等可以代表全局唯一的信息;

(2)当前的进程ID;

(3)当前系统时间的毫秒——System.currentTimeMillis();

(4)当前系统时间纳秒数——System.nanoTime();

(5)32位的随机整型数;

(6)32位自增的序列数。

Channel的继承类介绍

Channel的实现子类非常多,继承关系复杂,从学习的角度我们抽取最重要的两个——Channel-io.netty.channel.socket.nio.NioServerSocketChannel和io.netty.channel.socket.nio.NioSocketChannel进行重点分析。

服务端NioServerSocketChannel的继承关系类图如图:

客户端NioSocketChannel的继承关系类图如图:

AbstractChannel源码分析

首先定义了两个静态全局异常。

CLOSED_CHANNEL_EXCEPTION:链路已经关闭已经异常;

NOT_YET_CONNECTED_EXCEPTION:物理链路尚未建立异常。

声明完上述两个异常之后,通过静态块将它们的堆栈设置为空的StackTraceElement。

estimatorHandle用于预测下一个报文的大小,它基于之前数据的采样进行分析预测。

AbstractChannel采用聚合的方式封装各种功能,从成员变量的定义可以看出,它聚合了以下内容。

parent:代表父类Channel;

id:采用默认方式生成的全局唯一ID;

unsafe:Unsafe实例;

pipeline:当前Channel对应的DefaultChannelPipeline;

eventLoop:当前Channel注册的EventLoop;

……

在此不一一枚举。通过变量定义可以看出,AbstractChannel聚合了所有Channel使用到的能力对象,由AbstractChannel提供初始化和统一封装,如果功能和子类强相关,则定义成抽象方法由子类具体实现

网络I/O操作时讲到它会触发ChannelPipeline中对应的事件方法。Netty基于事件驱动,我们也可以理解为当Channel进行I/O操作时会产生对应的I/O事件,然后驱动事件在ChannelPipeline中传播,由对应的ChannelHandler对事件进行拦截和处理,不关心的事件可以直接忽略。采用事件驱动的方式可以非常轻松地通过事件定义来划分事件拦截切面,方便业务的定制和功能扩展,相比AOP,其性能更高,但是功能却基本等价。

网络I/O操作直接调用DefaultChannelPipeline的相关方法,由DefaultChannelPipeline中对应的ChannelHandler进行具体的逻辑处理。

AbstractNioChannel源码分析 

    private final SelectableChannel ch;
    protected final int readInterestOp;
    private volatile SelectionKey selectionKey;
    private volatile boolean inputShutdown;

    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;

    protected AbstractNioChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
        super(parent, eventLoop);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

由于NIOChannel、NioSocketChannel和NioServerSocketChannel需要共用,所以定义了一个java.nio.SocketChannel和java.nio.ServerSocketChannel的公共父类SelectableChannel,用于设置SelectableChannel参数和进行I/O操作。

第二个参数是readInterestOp,它代表了JDK SelectionKey的OP_READ。

随后定义了一个volatile修饰的SelectionKey,该SelectionKey是Channel注册到EventLoop后返回的选择键。由于Channel会面临多个业务线程的并发写操作,当SelectionKey由SelectionKey修改之后,为了能让其他业务线程感知到变化,所以需要使用volatile保证修改的可见性。

最后定义了代表连接操作结果的ChannelPromise以及连接超时定时器ScheduledFuture和请求的通信地址信息。

在AbstractNioChannel实现的主要API:

(1)首先是Channel的注册

注册Channel的时候需要指定监听的网络操作位来表示Channel对哪几类网络事件感兴趣,具体的定义如下。

public static final int OP_READ = 1 << 0:读操作位;

public static final int OP_WRITE = 1 << 2:写操作位;

public static final int OP_CONNECT = 1 << 3:客户端连接服务端操作位;

public static final int OP_ACCEPT = 1 << 4:服务端接收客户端连接操作位。

AbstractNioChannel注册的是0,说明对任何事件都不感兴趣,仅仅完成注册操作。注册的时候可以指定附件,后续Channel接收到网络事件通知时可以从SelectionKey中重新获取之前的附件进行处理,此处将AbstractNioChannel的实现子类自身当作附件注册。如果注册Channel成功,则返回selectionKey,通过selectionKey可以从多路复用器中获取Channel对象。

如果当前注册返回的selectionKey已经被取消,则抛出CancelledKeyException异常,捕获该异常进行处理。如果是第一次处理该异常,调用多路复用器的selectNow()方法将已经取消的selectionKey从多路复用器中删除掉。操作成功之后,将selected置为true,说明之前失效的selectionKey已经被删除掉。继续发起下一次注册操作,如果成功则退出,如果仍然发生CancelledKeyException异常,说明我们无法删除已经被取消的selectionKey,按照JDK的API说明,这种意外不应该发生。如果发生这种问题,则说明可能NIO的相关类库存在不可恢复的BUG,直接抛出CancelledKeyException异常到上层进行统一处理。

(2)准备处理读操作之前需要设置网路操作位为读  doBeginRead

先判断下Channel是否关闭,如果处于关闭中,则直接返回。获取当前的SelectionKey进行判断,如果可用,说明Channel当前状态正常,则可以进行正常的操作位修改。将SelectionKey当前的操作位与读操作位进行按位与操作,如果等于0,说明目前并没有设置读操作位,通过interestOps | readInterestOp设置读操作位,最后调用selectionKey的interestOps方法重新设置通道的网络操作位,这样就可以监听网络的读事件了。实际上,对于读操作位的判断和修改与JDK NIO SelectionKey的相关方法实现是等价的。

AbstractNioByteChannel源码分析 

成员变量只有一个Runnable类型的flushTask来负责继续写半包消息。

最主要的方法就是doWrite(ChannelOutboundBuffer in),下面一起看看它的实现,由于该方法过长,所以我们按照其逻辑进行拆分介绍。 

从发送消息环形数组ChannelOutboundBuffer弹出一条消息,判断该消息是否为空,如果为空,说明消息发送数组中所有待发送的消息都已经发送完成,清除半包标识,然后退出循环。清除半包标识的clearOpWrite方法实现。

从当前SelectionKey中获取网络操作位,然后与SelectionKey.OP_WRITE做按位与,如果不等于0,说明当前的SelectionKey是isWritable的,需要清除写操作位。清除方法很简单,就是SelectionKey.OP_WRITE取非之后与原操作位做按位与操作,清除SelectionKey的写操作位。

如果需要发送的消息不为空,则继续处理。调用doWriteBytes进行消息发送,不同的Channel子类有不同的实现,因此它是抽象方法。如果本次发送的字节数为0,说明发送TCP缓冲区已满,发生了ZERO_WINDOW。此时再次发送仍然可能出现写0字节,空循环会占用CPU的资源,导致I/O线程无法处理其他I/O操作,所以将写半包标识setOpWrite设置为true,退出循环,释放I/O线程。如果发送的字节数大于0,则对发送总数进行计数。判断当前消息是否已经发送成功(缓冲区没有可读字节),如果发送成功则设置done为true,退出当前循环。消息发送操作完成之后调用ChannelOutboundBuffer更新发送进度信息,然后对发送结果进行判断。如果发送成功,则将已经发送的消息从发送数组中删除;否则调用incompleteWrite方法,设置写半包标识,启动刷新线程继续发送之前没有发送完全的半包消息(写半包)。  

处理半包发送任务的方法incompleteWrite的实现,首先判断是否需要设置写半包标识,如果需要则调用setOpWrite设置写半包标识,设置写半包标识就是将SelectionKey设置成可写的,通过原操作位与SelectionKey.OP_WRITE做按位或操作即可实现。如果SelectionKey的OP_WRITE被设置,多路复用器会不断轮询对应的Channel用于处理没有发送完成的半包消息,直到清除SelectionKey的OP_WRITE操作位。因此,设置了OP_WRITE操作位后,就不需要启动独立的Runnable来负责发送半包消息了。如果没有设置OP_WRITE操作位,需要启动独立的Runnable,将其加入到EventLoop中执行,由Runnable负责半包消息的发送,它的实现很简单,就是调用flush()方法来发送缓冲数组中的消息。

消息发送的另一个分支是文件传输,它的实现原理与ByteBuf类似。

AbstractNioMessageChannel源码分析 

它的主要实现方法只有一个:doWrite(ChannelOutboundBuffer in)

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        final SelectionKey key = selectionKey();
        final int interestOps = key.interestOps();

        for (;;) {
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                    key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                }
                break;
            }

            boolean done = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                if (doWriteMessage(msg, in)) {
                    done = true;
                    break;
                }
            }

            if (done) {
                in.remove();
            } else {
                // Did not write all messages.
                if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                    key.interestOps(interestOps | SelectionKey.OP_WRITE);
                }
                break;
            }
        }
    }

在循环体内对消息进行发送,从ChannelOutboundBuffer中弹出一条消息进行处理,如果消息为空,说明发送缓冲区为空,所有消息都已经被发送完成。清除写半包标识,退出循环。

与AbstractNioByteChannel的循环发送类似,利用writeSpinCount对单条消息进行发送,调用doWriteMessage(Object msg, ChannelOutboundBuffer in)判断消息是否发送成功,如果成功,则将发送标识done设置为true,退出循环;否则继续执行循环,直到执行writeSpinCount次。

发送操作完成之后,判断发送结果,如果当前的消息被完全发送出去,则将该消息从缓冲数组中删除;否则设置半包标识,注册SelectionKey.OP_WRITE到多路复用器上,由多路复用器轮询对应的Channel重新发送尚未发送完全的半包消息。

AbstractNioMessageChannel和AbstractNioByteChannel的消息发送实现比较相似,不同之处在于:一个发送的是ByteBuf或者FileRegion,它们可以直接被发送;另一个发送的则是POJO对象。

AbstractNioMessageServerChannel源码分析

AbstractNioMessageServerChannel的实现非常简单,它定义了一个EventLoopGroup类型的childGroup,用于给新接入的客户端NioSocketChannel分配EventLoop。

public abstract class AbstractNioMessageServerChannel extends AbstractNioMessageChannel implements ServerChannel {

    private final EventLoopGroup childGroup;

    protected AbstractNioMessageServerChannel(
            Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) {
        super(parent, eventLoop, ch, readInterestOp);
        this.childGroup = childGroup;
    }

    @Override
    public EventLoopGroup childEventLoopGroup() {
        return childGroup;
    }
}

每当服务端接入一个新的客户端连接NioSocketChannel时,都会调用childEventLoopGroup方法获取EventLoopGroup线程组,用于给NioSocketChannel分配Reactor线程EventLoop。

    NioServerSocketChannel

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

NioServerSocketChannel源码分析

首先创建了静态的ChannelMetadata成员变量,然后定义了ServerSocketChannelConfig用于配置ServerSocketChannel的TCP参数。静态的newSocket方法用于通过ServerSocketChannel的open打开新的ServerSocketChannel通道。

ServerSocketChannel相关的接口实现:isActive、remoteAddress、javaChannel和doBind,基本都委托给了javaChannel。

通过java.net.ServerSocket的isBound方法判断服务端监听端口是否处于绑定状态,它的remoteAddress为空。javaChannel的实现是java.nio.ServerSocketChannel,服务端在进行端口绑定的时候,可以指定backlog,也就是允许客户端排队的最大长度。

服务端Channel的doReadMessages(List<Object> buf)的实现,代码见上面:

首先通过ServerSocketChannel的accept接收新的客户端连接,如果SocketChannel不为空,则利用当前的NioServerSocketChannel、EventLoop和SocketChannel创建新的NioSocketChannel,并将其加入到List<Object> buf中,最后返回1,表示服务端消息读取成功。

对于NioServerSocketChannel,它的读取操作就是接收客户端的连接,创建NioSocketChannel对象。

看下与服务端Channel无关的接口定义,由于这些方法是客户端Channel相关的,因此,对于服务端Channel无须实现,如果这些方法被误调,则返回UnsupportedOperation Exception异常。

    // Unnecessary stuff
    @Override
    protected boolean doConnect(
            SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        throw new UnsupportedOperationException();
    }
    @Override
    protected void doFinishConnect() throws Exception {
        throw new UnsupportedOperationException();
    }
    @Override
    protected SocketAddress remoteAddress0() {
        return null;
    }
    @Override
    protected void doDisconnect() throws Exception {
        throw new UnsupportedOperationException();
    }
    @Override
    protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
        throw new UnsupportedOperationException();
    }

NioSocketChannel源码分析

1.连接操作

判断本地Socket地址是否为空,如果不为空则调用java.nio.channels.SocketChannel.socket().bind()方法绑定本地地址。如果绑定成功,则继续调用java.nio.channels.SocketChannel.connect(SocketAddress remote)发起TCP连接。对连接结果进行判断,连接结果有以下三种可能。

(1)连接成功,返回true;

(2)暂时没有连接上,服务端没有返回ACK应答,连接结果不确定,返回false;

(3)连接失败,直接抛出I/O异常。

如果是结果(2),需要将NioSocketChannel中的selectionKey设置为OP_CONNECT,监听连接网络操作位。如果抛出了I/O异常,说明客户端的TCP握手请求直接被REST或者被拒绝,此时需要关闭客户端连接。

    @Override
    protected void doClose() throws Exception {
        javaChannel().close();
    }

2.写半包

分析完连接操作之后,继续分析写操作,由于它的实现比较复杂,所以仍然需要将其拆分后分段进行分析

获取待发送的ByteBuf个数,如果小于等于1,则调用父类AbstractNioByteChannel的doWrite方法,操作完成之后退出。

在批量发送缓冲区的消息之前,先对一系列的局部变量进行赋值,首先,获取需要发送的ByteBuffer数组个数nioBufferCnt,然后,从ChannelOutboundBuffer中获取需要发送的总字节数,从NioSocketChannel中获取NIO的SocketChannel,将是否发送完成标识设置为false,将是否有写半包标识设置为false。如图16-30所示。

就像循环读一样,我们需要对一次Selector轮询的写操作次数进行上限控制,因为如果TCP的发送缓冲区满,TCP处于KEEP-ALIVE状态,消息会无法发送出去,如果不对上限进行控制,就会长时间地处于发送状态,Reactor线程无法及时读取其他消息和执行排队的Task。所以,我们必须对循环次数上限做控制。

调用NIOSocketChannel的write方法,它有三个参数:第一个是需要发送的ByteBuffer数组,第二个是数组的偏移量,第三个参数是发送的ByteBuffer个数。返回值是写入SocketChannel的字节个数。

对写入的字节进行判断,如果为0,说明TCP发送缓冲区已满,很有可能无法再写进去,因此从循环中跳出,同时将写半包标识设置为true,用于向多路复用器注册写操作位,告诉多路复用器有没发完的半包消息,需要轮询出就绪的SocketChannel继续发送。

发送操作完成后进行两个计算:需要发送的字节数要减去已经发送的字节数;发送的字节总数+已经发送的字节数。更新完这两个变量后,判断缓冲区中所有的消息是否已经发送完成,如果是,则把发送完成标识设置为true同时退出循环。如果没有发送完成,则继续循环。从循环发送中退出之后,首先对发送完成标识done进行判断,如果发送完成,则循环释放已经发送的消息。环形数组的发送缓冲区释放完成后,取消半包标识,告诉多路复用器消息已经全部发送完成。

当缓冲区中的消息没有发送完成,甚至某个ByteBuffer只发送了几个字节,出现了所谓的“写半包”时,该怎么办?下面我们继续看看Netty是如何处理“写半包”的。

首先,循环遍历发送缓冲区,对消息的发送结果进行判断:

(1)从ChannelOutboundBuffer弹出第一条发送的ByteBuf,然后获取该ByteBuf的读索引和可读字节数。

(2)对可读字节数和发送的总字节数进行比较,如果发送的字节数大于可读的字节数,说明当前的ByteBuf已经被完全发送出去,更新ChannelOutboundBuffer的发送进度信息,将已经发送的ByteBuf删除,释放相关资源。最后,发送的字节数要减去第一条发送的字节数,得到后续消息发送的总字节数,然后继续循环判断第二条消息、第三条消息……

(3)如果可读的消息大于已经发送的总字节数,说明这条消息没有被完整地发送出去,仅仅发送了部分数据报,也就是出现了所谓的“写半包”问题。此时,需要更新可读的索引为当前索引 + 已经发送的总字节数,然后更新ChannelOutboundBuffer的发送进度信息,退出循环。

(4)如果可读字节数等于已经发送的总字节数,则说明最后一次发送的消息是个整包消息,没有剩余的半包消息待发送。更新发送进度信息,将最后一条已发送的消息从缓冲区中删除,最后退出循环。

循环发送操作完成之后,更新SocketChannel的操作位为OP_WRITE,由多路复用器在下一次轮询中触发SocketChannel,继续处理没有发送完成的半包消息。

3.读写操作

NioSocketChannel的读写操作实际上是基于NIO的SocketChannel和Netty的ByteBuf封装而成。

下面我们首先分析从SocketChannel中读取数据报,

它有两个参数,说明如下。

java.nio.channels.SocketChannel:JDK NIO的SocketChannel;

length:ByteBuf的可写最大字节数。

实际上就是从SocketChannel中读取L个字节到ByteBuf中,L为ByteBuf可写的字节数。

下面我们看下ByteBuf writeBytes方法的实现,

首先分析setBytes(int index, ScatteringByteChannel in, int length)在UnpooledHeapByteBuf中的实现

从SocketChannel中读取字节数组到缓冲区java.nio.ByteBuffer中,它的起始position为writeIndex,limit为writeIndex+length。

 

目录
相关文章
|
4月前
|
Cloud Native Java Go
这些 channel 用法你都用起来了吗?
这些 channel 用法你都用起来了吗?
|
7月前
3.2 两个 Channel 传输数据
3.2 两个 Channel 传输数据
45 0
|
9月前
Netty之Channel解读
Netty之Channel解读
|
9月前
|
消息中间件 数据库
Rabbmit channel.QueueDeclare参数初识
接触MQ易经有一段时间了,对QueueDeclare一直没有一个全面的认识。
119 0
|
11月前
|
域名解析 消息中间件 Kubernetes
【消息队列】解决ERR 1 [topic/channel] (: no such host
【消息队列】解决ERR 1 [topic/channel] (: no such host
239 0
|
Go 调度
一文初探 Goroutine 与 channel
哈喽大家好,我是陈明勇,本文介绍的内容是 Go 并发模块的两个重要角色 → goroutine 与 channel。如果本文对你有帮助,不妨点个赞,如果你是 Go 语言初学者,不妨点个关注,一起成长一起进步,如果本文有错误的地方,欢迎指出!
12362 0
一文初探 Goroutine 与 channel
|
Java API
NIO学习三-Channel
在学习NIO时,ByteBuffer、Channel、Selector三个组件是必须了解的。前面我们说到ByteBuffer是作为缓冲区进行数据的存放或者获取。通常我们需要进行flip翻转操作,但是这个在Netty中,有一个更为强大的类可以替代ByteBuf,其不需要进行翻转,也可以进行读写的双向操作。要将数据打包到缓冲区中,通常需要使用通道,而通道作为传输数据的载体,也即它可以使数据从一端到另一端,因此就必须进行了解。 Channel中,我们也看到其子类有很多,通常都是用于读写操作的。其中ByteChannel可以进行读写操作,也即可以进行双向操作。 操作过程:首先创建流对象,有了流对象获取
62 0
NIO学习三-Channel
|
程序员 Go
什么时候用Goroutine?什么时候用Channel?
通过全局变量加锁同步来实现通讯,并不利于多个协程对全局变量的读写操作。 加锁虽然可以解决goroutine对全局变量的抢占资源问题,但是影响性能,违背了原则。 总结:为了解决上述的问题,我们可以引入channel,使用channel进行协程goroutine间的通信。
|
设计模式 缓存 网络协议
Netty4 Channel 概述(通道篇)
Netty4 Channel 概述(通道篇)
Netty4 Channel 概述(通道篇)
|
前端开发 数据处理
netty系列之:channel和channelGroup
netty系列之:channel和channelGroup