第二波事件传递 flush()
#
我们重新回到,AbstractChannel
中,看他的第二波flush事件的传播状态, 源码如下:它也是主要做了下面的三件事
- 添加刷新标志,设置写状态
- 遍历buffer队列,过滤可以flush的buffer
- 调用jdk底层的api,进行自旋写
// todo 最终传递到 这里 @Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // todo 添加刷新标志, 设置写状态 outboundBuffer.addFlush(); // todo 遍历buffer队列, 过滤byteBuf flush0(); }
添加刷新标志,设置写状态#
什么是添加刷新标志呢? 其实就是更改链表中的指针位置,三个指针之间的可以完美的把entry
划分出曾经flush过的和未flush节点
ok,继续
下面看一下如何设置状态,addflush() 源码如下:
* todo 给 ChannelOutboundBuffer 添加缓存, 这意味着, 原来添加进 ChannelOutboundBuffer 中的所有 Entry, 全部会被标记为 flushed 过 */ public void addFlush() { // todo 默认让 entry 指向了 unflushedEntry ==> 其实链表中的最左边的 未被使用过的 entry // todo Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); // todo 跟进这个方法 decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }
目标是移动指针,改变每一个节点的状态, 哪一个指针呢? 是 flushedEntry
, 它指向读被flush的节点,也就是说,它左边的,都被处理过了
下面的代码,是选出一开始位置, 因为, 如果flushedEntry == null,说明没有任何一个曾经被flush过的节点,于是就将开始的位置定位到最左边开始,
if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; }
紧接着一个do-while循环,从最后一个被flushedEntry
的地方,到尾部,挨个遍历每一个节点, 因为这些节点要被flush进缓存,我们需要把write时累加的他们的容量减掉, 源码如下
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } // todo 每次 减去 -size long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); // todo 默认 getWriteBufferLowWaterMark() -32kb // todo newWriteBufferSize<32 就把不可写状态改为可写状态 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } }
同样是使用原子类做到的这件事, 此外,经过减少的容量,如果小于了32kb就会传播 channel可写的事件
遍历buffer队列, 过滤byteBuf#
这是flush的重头戏,它实现了将数据写入socket的操作
我们跟进它的源码,doWrite(ChannelOutboundBuffer in)
这是本类AbstractChannel
的抽象方法, 写如的逻辑方法,被设计成抽象的,具体往那个channel写,和具体的实现有关, 当前我们想往客户端写, 它的实现是AbstractNioByteChannel
,我们进入它的实现,源码如下
boolean setOpWrite = false; // todo 整体是无限循环, 过滤ByteBuf for (;;) { // todo 获取第一个 flushedEntity, 这个entity中 有我们需要的 byteBuf Object msg = in.current(); if (msg == null) { // Wrote all messages. clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } if (msg instanceof ByteBuf) { // todo 第三部分,jdk底层, 进行自旋的写 ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); if (readableBytes == 0) { // todo 当前的 ByteBuf 中,没有可写的, 直接remove掉 in.remove(); continue; } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { // todo 获取自旋锁, netty使用它进行 writeSpinCount = config().getWriteSpinCount(); } // todo 这个for循环是在自旋尝试往 jdk底层的 ByteBuf写入数据 for (int i = writeSpinCount - 1; i >= 0; i --) { // todo 把 对应的 buf , 写到socket中 // todo localFlushedAmount就是 本次 往jdk底层的 ByteBuffer 中写入了多少字节 int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } // todo 累加一共写了多少字节 flushedAmount += localFlushedAmount; // todo 如果buf中的数据全部写完了, 设置完成的状态, 退出循环 if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); // todo 自旋结束,写完了 done = true if (done) { // todo 跟进去 in.remove(); } else { // Break the loop and so incompleteWrite(...) is called. break; } ....
这一段代码也是非常长, 它的主要逻辑如下:
通过一个无限循环,保证可以拿到所有的节点上的ByteBuf
,通过这个函数获取节点,Object msg = in.current();
我们进一步看它的实现,如下,它只会取出我们标记的节点
public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; }
下一步, 使用jdk的自旋锁,循环16次,尝试往jdk底层的ByteBuffer中写数据, 调用函数doWriteBytes(buf);
他是本类的抽象方法, 具体的实现是,客户端chanel的封装类NioSocketChannel
实现的源码如下:
// todo @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); // todo 将字节数据, 写入到 java 原生的 channel中 return buf.readBytes(javaChannel(), expectedWrittenBytes); }
这个readBytes()
依然是抽象方法,因为前面我们曾经把从ByteBuf
转化成了Dirct类型的, 所以它的实现类是PooledDirctByteBuf
继续跟进如下: 终于见到了亲切的一幕
// todo @Override public int readBytes(GatheringByteChannel out, int length) throws IOException { checkReadableBytes(length); //todo 关键的就是 getBytes() 跟进去 int readBytes = getBytes(readerIndex, out, length, true); readerIndex += readBytes; return readBytes; } 跟进getBytes(){ index = idx(index); // todo 将netty 的 ByteBuf 塞进 jdk的 ByteBuffer tmpBuf; tmpBuf.clear().position(index).limit(index + length); // todo 调用jdk的write()方法 return out.write(tmpBuf); }
此外,被使用过的节点会被remove()掉, 源码如下, 也是针对链表的操作#
private void removeEntry(Entry e) { if (-- flushed == 0) { // todo 如果是最后一个节点, 把所有的指针全部设为 null // processed everything flushedEntry = null; if (e == tailEntry) { tailEntry = null; unflushedEntry = null; } } else { //todo 如果 不是最后一个节点, 把当前节点,移动到最后的 节点 flushedEntry = e.next; } }
小结#
到这里, 第二波任务的传播就完成了
write
- 将buffer 转换成DirctBuffer
- 将消息entry 插入写队列
- 设置写状态
flush
- 刷新标志,设置写状态
- 变量buffer队列,过滤Buffer
- 调用jdk底层的api,把ByteBuf写入jdk原生的
ByteBuffer