设置写状态
- 统计当前有多少字节需要需要被写出
- 当前缓冲区中有多少待写字节
- 所以默认不能超过64k
- 自旋锁+CAS 操作,通过 pipeline 将事件传播到channelhandler 中监控
flush:刷新buffer队列
添加刷新标志并设置写状态
- 不管调用
channel.flush()
,还是ctx.flush()
,最终都会落地到pipeline
中的head
节点
- 之后进入到
AbstractUnsafe
- flush方法中,先调用
- 结合前面的图来看,上述过程即
首先拿到 unflushedEntry
指针,然后将flushedEntry
指向unflushedEntry
所指向的节点,调用完毕后
遍历 buffer 队列,过滤bytebuf
- 接下来,调用
flush0()
- 发现这里的核心代码就一个
doWrite
AbstractNioByteChannel
- 继续跟
protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { // 拿到第一个需要flush的节点的数据 Object msg = in.current(); if (msg instanceof ByteBuf) { boolean done = false; long flushedAmount = 0; // 拿到自旋锁迭代次数 if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } // 自旋,将当前节点写出 for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); // 写完之后,将当前节点删除 if (done) { in.remove(); } else { break; } } } }
- 第一步,调用
current()
先拿到第一个需要flush
的节点的数据
- 第二步,拿到自旋锁的迭代次数
- 第三步 调用 JDK 底层 API 进行自旋写
自旋的方式将ByteBuf
写到JDK NIO的Channel
强转为ByteBuf,若发现没有数据可读,直接删除该节点
- 拿到自旋锁迭代次数
- 在并发编程中使用自旋锁可以提高内存使用率和写的吞吐量,默认值为16
- 继续看源码
javaChannel()
,表明 JDK NIO Channel 已介入此次事件- 得到向JDK 底层已经写了多少字节
- 从 Netty 的 bytebuf 写到 JDK 底层的 bytebuffer
- 第四步,删除该节点
节点的数据已经写入完毕,接下来就需要删除该节点
首先拿到当前被flush
掉的节点(flushedEntry
所指)
然后拿到该节点的回调对象 ChannelPromise
, 调用 removeEntry()
移除该节点
这里是逻辑移除,只是将flushedEntry指针移到下个节点,调用后
随后,释放该节点数据的内存,调用safeSuccess
回调,用户代码可以在回调里面做一些记录,下面是一段Example
最后,调用 recycle
,将当前节点回收
writeAndFlush - 写队列并刷新
writeAndFlush在某个Handler
中被调用后,最终会落到 TailContext
节点
通过一个boolean变量flush,表明调用invokeWriteAndFlush
or invokeWrite
,invokeWrite
便是我们上文中的write过程。
可以看到,最终调用的底层方法和单独调用write
和flush
一样的
由此看来,invokeWriteAndFlush
基本等价于write
之后再来一次flush
。
总结
- 调用write并没有将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出
- writeAndFlush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功
- netty中的缓冲区中的ByteBuf为DirectByteBuf
如何把对象变成字节流,最终写到socket底层?
当 BizHandler 通过 writeAndFlush 方法将自定义对象往前传播时,其实可以拆分成两个过程
- 通过 pipeline逐渐往前传播,传播到其中的一个 encode 节点后,其负责重写 write 方法将自定义的对象转化为 ByteBuf,接着继续调用 write 向前传播
- pipeline中的编码器原理是创建一个ByteBuf,将Java对象转换为ByteBuf,然后再把ByteBuf继续向前传递,若没有再重写了,最终会传播到 head 节点,其中缓冲区列表拿到缓存写到 JDK 底层 ByteBuffer