深入理解 Netty编码流程及WriteAndFlush()的实现 (二)

简介: 深入理解 Netty编码流程及WriteAndFlush()的实现 (二)

第二波事件传递 flush()#


我们重新回到,AbstractChannel中,看他的第二波flush事件的传播状态, 源码如下:它也是主要做了下面的三件事

  • 添加刷新标志,设置写状态
  • 遍历buffer队列,过滤可以flush的buffer
  • 调用jdk底层的api,进行自旋写


// todo 最终传递到 这里
@Override
public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    // todo 添加刷新标志, 设置写状态
    outboundBuffer.addFlush();
    // todo  遍历buffer队列, 过滤byteBuf
    flush0();
}


添加刷新标志,设置写状态#


什么是添加刷新标志呢? 其实就是更改链表中的指针位置,三个指针之间的可以完美的把entry划分出曾经flush过的和未flush节点

ok,继续

下面看一下如何设置状态,addflush() 源码如下:


* todo 给 ChannelOutboundBuffer 添加缓存, 这意味着, 原来添加进 ChannelOutboundBuffer 中的所有 Entry, 全部会被标记为 flushed 过
 */
public void addFlush() {
// todo 默认让 entry 指向了 unflushedEntry ==> 其实链表中的最左边的 未被使用过的 entry
// todo
Entry entry = unflushedEntry;
if (entry != null) {
    if (flushedEntry == null) {
        // there is no flushedEntry yet, so start with the entry
        flushedEntry = entry;
    }
    do {
        flushed ++;
        if (!entry.promise.setUncancellable()) {
            // Was cancelled so make sure we free up memory and notify about the freed bytes
            int pending = entry.cancel();
            // todo 跟进这个方法
            decrementPendingOutboundBytes(pending, false, true);
        }
        entry = entry.next;
    } while (entry != null);
    // All flushed so reset unflushedEntry
    unflushedEntry = null;
}
}


目标是移动指针,改变每一个节点的状态, 哪一个指针呢? 是 flushedEntry, 它指向读被flush的节点,也就是说,它左边的,都被处理过了

下面的代码,是选出一开始位置, 因为, 如果flushedEntry == null,说明没有任何一个曾经被flush过的节点,于是就将开始的位置定位到最左边开始,


if (flushedEntry == null) {
    // there is no flushedEntry yet, so start with the entry
    flushedEntry = entry;
}


紧接着一个do-while循环,从最后一个被flushedEntry的地方,到尾部,挨个遍历每一个节点, 因为这些节点要被flush进缓存,我们需要把write时累加的他们的容量减掉, 源码如下


private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    // todo 每次 减去 -size
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    // todo   默认 getWriteBufferLowWaterMark() -32kb
    // todo   newWriteBufferSize<32 就把不可写状态改为可写状态
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}


同样是使用原子类做到的这件事, 此外,经过减少的容量,如果小于了32kb就会传播 channel可写的事件


遍历buffer队列, 过滤byteBuf#


这是flush的重头戏,它实现了将数据写入socket的操作

我们跟进它的源码,doWrite(ChannelOutboundBuffer in) 这是本类AbstractChannel的抽象方法, 写如的逻辑方法,被设计成抽象的,具体往那个channel写,和具体的实现有关, 当前我们想往客户端写, 它的实现是AbstractNioByteChannel,我们进入它的实现,源码如下


boolean setOpWrite = false;
        // todo 整体是无限循环, 过滤ByteBuf
for (;;) {
    // todo 获取第一个 flushedEntity, 这个entity中 有我们需要的 byteBuf
    Object msg = in.current();
    if (msg == null) {
        // Wrote all messages.
        clearOpWrite();
        // Directly return here so incompleteWrite(...) is not called.
        return;
    }
    if (msg instanceof ByteBuf) {
        // todo 第三部分,jdk底层, 进行自旋的写
        ByteBuf buf = (ByteBuf) msg;
        int readableBytes = buf.readableBytes();
        if (readableBytes == 0) {
            // todo 当前的 ByteBuf 中,没有可写的, 直接remove掉
            in.remove();
            continue;
        }
        boolean done = false;
        long flushedAmount = 0;
        if (writeSpinCount == -1) {
            // todo 获取自旋锁, netty使用它进行
            writeSpinCount = config().getWriteSpinCount();
        }
        // todo 这个for循环是在自旋尝试往 jdk底层的 ByteBuf写入数据
        for (int i = writeSpinCount - 1; i >= 0; i --) {
            // todo  把 对应的 buf , 写到socket中
            // todo localFlushedAmount就是 本次 往jdk底层的 ByteBuffer 中写入了多少字节
            int localFlushedAmount = doWriteBytes(buf);
            if (localFlushedAmount == 0) {
                setOpWrite = true;
                break;
            }
            // todo 累加一共写了多少字节
            flushedAmount += localFlushedAmount;
            // todo 如果buf中的数据全部写完了, 设置完成的状态, 退出循环
            if (!buf.isReadable()) {
                done = true;
                break;
            }
        }
        in.progress(flushedAmount);
        // todo 自旋结束,写完了  done = true
        if (done) {
            // todo 跟进去
            in.remove();
        } else {
            // Break the loop and so incompleteWrite(...) is called.
            break;
        }
    ....


这一段代码也是非常长, 它的主要逻辑如下:

通过一个无限循环,保证可以拿到所有的节点上的ByteBuf,通过这个函数获取节点,Object msg = in.current();

我们进一步看它的实现,如下,它只会取出我们标记的节点


public Object current() {
        Entry entry = flushedEntry;
        if (entry == null) {
            return null;
        }
        return entry.msg;
    }


下一步, 使用jdk的自旋锁,循环16次,尝试往jdk底层的ByteBuffer中写数据, 调用函数doWriteBytes(buf);他是本类的抽象方法, 具体的实现是,客户端chanel的封装类NioSocketChannel实现的源码如下:


// todo
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    // todo 将字节数据, 写入到 java 原生的 channel中
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}


这个readBytes()依然是抽象方法,因为前面我们曾经把从ByteBuf转化成了Dirct类型的, 所以它的实现类是PooledDirctByteBuf 继续跟进如下: 终于见到了亲切的一幕


// todo
    @Override
    public int readBytes(GatheringByteChannel out, int length) throws IOException {
        checkReadableBytes(length);
        //todo  关键的就是 getBytes()  跟进去
        int readBytes = getBytes(readerIndex, out, length, true);
        readerIndex += readBytes;
        return readBytes;
    }
    跟进getBytes(){
        index = idx(index);
        // todo 将netty 的 ByteBuf 塞进 jdk的    ByteBuffer tmpBuf;
        tmpBuf.clear().position(index).limit(index + length);
        // todo 调用jdk的write()方法
        return out.write(tmpBuf);
    }


此外,被使用过的节点会被remove()掉, 源码如下, 也是针对链表的操作#


private void removeEntry(Entry e) {
        if (-- flushed == 0) { // todo 如果是最后一个节点, 把所有的指针全部设为 null
            // processed everything
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else { //todo 如果 不是最后一个节点, 把当前节点,移动到最后的 节点
            flushedEntry = e.next;
        }
    }


小结#


到这里, 第二波任务的传播就完成了

write

  • 将buffer 转换成DirctBuffer
  • 将消息entry 插入写队列
  • 设置写状态

flush

  • 刷新标志,设置写状态
  • 变量buffer队列,过滤Buffer
  • 调用jdk底层的api,把ByteBuf写入jdk原生的ByteBuffer
相关文章
|
6月前
|
Java
【Netty 网络通信】Netty 工作流程分析
【1月更文挑战第9天】Netty 工作流程分析
|
6月前
|
移动开发 编解码 Java
Netty编码器和解码器
Netty从底层Java通道读到ByteBuf二进制数据,传入Netty通道的流水线,随后开始入站处理。在入站处理过程中,需要将ByteBuf二进制类型解码成Java POJO对象。这个解码过程可以通过Netty的Decoder解码器去完成。在出站处理过程中,业务处理后的结果需要从某个Java POJO对象编码为最终的ByteBuf二进制数据,然后通过底层 Java通道发送到对端。在编码过程中,需要用到Netty的Encoder编码器去完成数据的编码工作。
|
6月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
250 0
|
XML 存储 编解码
Netty入门到超神系列-Netty使用Protobuf编码解码
当我们的Netty客户端和服务端进行通信时数据在传输的过程中需要进行序列化,比如以二进制数据进行传输,那么我们的业务数据就需要有相应的编码器进行编码为二进制数据,当服务端拿到二进制数据后需要有相应的解码器进行解码得到真实的业务数据。
162 0
|
前端开发
Netty(八)之Netty服务端启动流程
Netty(八)之Netty服务端启动流程
139 0
Netty(八)之Netty服务端启动流程
|
存储 缓存 数据处理
Netty源码剖析之数据通信流程
NIO事件/感兴趣事件 OP_REGISTER = 0 通道注册事件 OP_READ = 1 << 0 OP_WRITE = 1 << 2 OP_CONNECT = 1 << 3 OP_ACCEPT = 1 << 4
|
编解码
Netty源码剖析之Netty启动流程
了解netty启动流程,有助于学习netty,进行自定义组件扩展
133 0
Netty源码剖析之NIOEventLoopGroup创建流程
Netty中事件循环机制非常重要,通过NIOEventLoopGroup可以了解到netty如何实现处理请求,如何实现事件监听处理,转发,有助于平时学习使用
121 0
Netty源码剖析之NIOEventLoopGroup创建流程
|
网络协议 前端开发 Java
Netty服务端启动流程分析
Netty服务端启动流程分析
172 0
|
编解码 缓存 Dubbo
Netty流程学习
连接完成之后,不能无所事事,此时应该会执行业务处理。也即此时可以看到上面的NettyServerHandler。因此可以看到dubbo的线程模型: 配置 Dubbo 中的线程模型 如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。 但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。 如果用 IO 线程处理事件,又在事件处理过程中发起新的 IO 请求,比如在连接事件中发起登录请求,会报“可能引发死锁”异常,但不会真死锁
118 0
Netty流程学习