编码器的执行时机#
首先, 我们想通过服务端,往客户端发送数据, 通常我们会调用ctx.writeAndFlush(数据)
的方式, 入参位置的数据可能是基本数据类型,也可能对象
其次,编码器同样属于handler,只不过他是特化的专门用于编码作用的handler, 在我们的消息真正写入jdk底层的ByteBuffer时前,数据需要经过编码处理, 不是说不进行编码就发送不出去,而是不经过编码,客户端可能接受到的是乱码
然后,我们知道,ctx.writeAndFlush(数据)
它其实是出站处理器特有的行为,因此注定了它需要在pipeline中进行传递,从哪里进行传递呢? 从tail节点开始,一直传播到header之前的我们自己添加的自定义的解码器
中
WriteAndFlush()
的逻辑#
我们跟进源码WriteAndFlush()
相对于Write()
,它的flush字段是true
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { //todo 因为flush 为 true next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); }
于是就会这样
- 逐个调用handler的
write()
- 逐个调用handler的
flush()
知道这一点很重要,这意味这我们知道了,事件传播分成两波进行, 一波write,一波flush, 这两波事件传播的大体流程我写在这里, 在下面
write
- 将ByteBuf 转换成DirctBuffer
- 将消息(DirctBuffer)封装进entry 插入写队列
- 设置写状态
flush
- 刷新标志,设置写状态
- 变量buffer队列,过滤Buffer
- 调用jdk底层的api,把ByteBuf写入jdk原生的
ByteBuffer
自定义一个简单的编码器#
/** * @Author: Changwu * @Date: 2019/7/21 20:49 */ public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol> { // todo write动作会传播到 MyPersonEncoder的write方法, 但是我们没有重写, 于是就执行 父类 MessageToByteEncoder的write, 我们进去看 @Override protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception { System.out.println("MyPersonEncoder...."); // 消息头 长度 out.writeInt(msg.getLength()); // 消息体 out.writeBytes(msg.getContent()); } }
选择继承MessageToByteEncoder<T>
从消息到字节的编码器
继续跟进#
ok,现在来到了我们自定义的 解码器MyPersonEncoder
,
但是,并没看到正在传播的writeAndFlush()
,没关系, 我们自己的解码器继承了MessageToByteEncoder
,这个父类中实现了writeAndFlush()
,源码如下:解析写在源码后面
// todo 看他的write方法 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {// todo 1 判断当前是否可以处理这个对象 @SuppressWarnings("unchecked") I cast = (I) msg; // todo 2 内存分配 buf = allocateBuffer(ctx, cast, preferDirect); try { // todo 3 调用本类的encode(), 这个方法就是我们自己实现的方法 encode(ctx, cast, buf); } finally { // todo 4 释放 ReferenceCountUtil.release(cast); } if (buf.isReadable()) { // todo 5. 往前传递 ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { // todo 释放 buf.release(); } }
- 将我们发送的消息msg,封装进了 ByteBuf 中
- 编码: 执行
encode()
方法,这是个抽象方法,由我们自定义的编码器实现
- 我们的实现很简单,分别往Buf里面写入下面两次数据
- int类型的消息的长度
- 消息体
- 将msg释放
- 继续向前传递
write()
事件 - 最终,释放第一步创建的ByteBuf
小结#
到这里为止,编码器的执行流程已经完成了,我们可以看到,和解码器的架构逻辑相似,类似于模板设计模式,对我们来说,只不过是做了个填空题
其实到上面的最后一步 释放第一步创建的ByteBuf
之前 ,消息已经被写到jdk底层的 ByteBuffer 中了,怎么做的呢? 别忘了它的上一步, 继续向前传递write()
事件,再往前其实就是HeaderContext
了,和HeaderContext
直接关联的就是unsafe类, 这并不奇怪,我们都知道,netty中无论是客户端还是服务端channel底层的数据读写,都依赖unsafe
下面开始分析,
WriteAndFlush()
底层的两波任务细节
第一波事件传递 write()
#
我们跟进HenderContext的write()
,而HenderContext的中依赖的是unsafe.wirte()
所以直接去 AbstractChannel
的Unsafe 源码如下:
@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // todo 缓存 写进来的 buffer ReferenceCountUtil.release(msg); return; } int size; try { // todo buffer Dirct化 , (我们查看 AbstractNioByteBuf的实现) msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } // todo 插入写队列 将 msg 插入到 outboundBuffer // todo outboundBuffer 这个对象是 ChannelOutBoundBuf类型的,它的作用就是起到一个容器的作用 // todo 下面看, 是如何将 msg 添加进 ChannelOutBoundBuf中的 outboundBuffer.addMessage(msg, size, promise); }
参数位置的msg,就是经过我们自定义解码器的父类进行包装了的ByteBuf
类型消息
这个方法主要做了三件事
- 第一:
filterOutboundMessage(msg);
将ByteBuf转换成DirctByteBuf
当我们进入查看他的实现时,idea会提示,它的子类重写了这个方法, 是谁重写的呢? 是AbstractNioByteChannel
这个类其实是属于客户端阵营的类,和服务端的AbstractNioMessageChannel
相提并论
源码如下:
protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }
- 第二件事: 将转换后的
DirectBuffer
插入到写队列中
什么是写队列 ? 作用是啥?#
它其实就是一个netty自定义的容器,使用的单向链表的结构,为什么要有这个容器呢? 回想一下,服务端需要向客户端发送消息,消息进而被封装进ByteBuf
,但是呢, 往客户端写的方法有两个
- write()
- writeAndFlush()
这个方法的区别是有的,前者只是进行了写,(写到了ByteBuf) 却没有将内容刷新到ByteBuffer
,没有刷新到缓存中,就没办法进一步把它写入jdk原生的ByteBuffer
中, 而writeAndFlush()
就比较方便,先把msg写入ByteBuf
,然后直接刷进socket,一套带走,打完收工
但是如果客户端偏偏就是不使用writeAndFlush()
,而使用前者,那么盛放消息的ByteBuf
被传递到handler的最开始的位置,怎么办? unsafe也无法把它写给客户端, 难道丢弃不成?
于是写队列就解决了这个问题,它以链表当做数据结构,新传播过来的ByteBuf
就会被他封装成一个一个的节点(entry)进行维护,为了区分这个链表中,哪个节点是被使用过的,哪个节点是没有使用过的,他就用三个标记指针进行标记,如下:
- flushedEntry 被刷新过的entry
- tailEntry 尾节点
- unflushedEntry 未被刷的entry
下面我们看一下,它如何将一个新的节点,添加到写队列
addMessage(Object msg, int size, ChannelPromise promise)
添加写队列#
public void addMessage(Object msg, int size, ChannelPromise promise) { // todo 将上面的三者封装成实体 // todo 调用工厂方法, 创建 Entry , 在 当前的ChannelOutboundBuffer 中每一个单位都是一个 Entry, 用它进一步包装 msg Entry entry = Entry.newInstance(msg, size, total(msg), promise); // todo 调整三个指针, 去上面查看这三个指针的定义 if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 // todo 跟进这个方法 incrementPendingOutboundBytes(entry.pendingSize, false); }
看他的源码,其实就是简单的针对链表进行插入的操作,尾插入法, 一直往最后的位置插入,链表的头被标记成unflushedEntry
这两个节点之间entry,表示是可以被flush的节点
在每次添加新的 节点后都调用incrementPendingOutboundBytes(entry.pendingSize, false)
方法, 这个方法的作用是设置写状态, 设置怎样的状态呢? 我们看它的源码, 可以看到,它会记录下累计的ByteBuf
的容量,一旦超出了阈值,就会传播channel不可写的事件
- 这也是
write()
的第三件事
private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } // todo TOTAL_PENDING_SIZE_UPDATER 当前缓存中 存在的代写的 字节 // todo 累加 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); // todo 判断 新的将被写的 buffer的容量不能超过 getWriteBufferHighWaterMark() 默认是 64*1024 64字节 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { // todo 超过64 字节,进入这个方法 setUnwritable(invokeLater); } }
小结:#
到目前为止,第一波write()
事件已经完成了,我们可以看到了,这个事件的功能就是使用ChannelOutBoundBuf
将write事件传播过去的单个ByteBuf
维护起来,等待 flush事件的传播