1. 判断当前Handelr是否能处理写入的消息(匹配对象)
判断该对象是否是该类型参数匹配器实例可匹配到的类型
2 分配内存
3 编码实现
- 调用
encode
,这里就调回到Encoder
这个Handler
中
- 其为抽象方法,因此自定义实现类实现编码方法
4 释放对象
- 既然自定义Java对象转换成
ByteBuf
了,那么这个对象就已经无用,释放掉 (当传入的msg
类型是ByteBuf
时,就不需要自己手动释放了)
5 传播数据
//112 如果buf中写入了数据,就把buf传到下一个节点,直到 header 节点
6 释放内存
//115 否则,释放buf,将空数据传到下一个节点
// 120 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
// 127 当buf在pipeline中处理完之后,释放
Encoder处理传入的Java对象
- 判断当前Handler是否能处理写入的消息
- 如果能处理,进入下面的流程
- 否则,直接扔给下一个节点处理
- 将对象强制转换成Encoder 可以处理的 Response对象
- 分配一个ByteBuf
- 调用encoder,即进入到 Encoder 的 encode方法,该方法是用户代码,用户将数据写入ByteBuf
- 既然自定义Java对象转换成ByteBuf了,那么这个对象就已经无用了,释放掉(当传入的msg类型是ByteBuf时,无需自己手动释放)
- 如果buf中写入了数据,就把buf传到下一个节点,否则,释放buf,将空数据传到下一个节点
- 最后,当buf在pipeline中处理完之后,释放节点
总结就是,Encoder
节点分配一个ByteBuf
,调用encode
方法,将Java对象根据自定义协议写入到ByteBuf,然后再把ByteBuf传入到下一个节点,在我们的例子中,最终会传入到head节点
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
这里的msg就是前面在Encoder节点中,载有java对象数据的自定义ByteBuf对象
write - 写buffer队列
以下过程分三步讲解
direct ByteBuf
- 首先,调用
assertEventLoop
确保该方法的调用是在reactor
线程中 - 然后,调用
filterOutboundMessage()
,将待写入的对象过滤,把非ByteBuf
对象和FileRegion
过滤,把所有的非直接内存转换成直接内存DirectBuffer
插入写队列
- 接下来,估算出需要写入的ByteBuf的size
- 最后,调用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下来,我们需要重点看一下这个方法干了什么事情
想要理解上面这段代码,须掌握写缓存中的几个消息指针
ChannelOutboundBuffer 里面的数据结构是一个单链表结构,每个节点是一个 Entry,Entry 里面包含了待写出ByteBuf 以及消息回调 promise下面分别是
三个指针的作用
- flushedEntry
表第一个被写到OS Socket缓冲区中的节点
- unFlushedEntry
表第一个未被写入到OS Socket缓冲区中的节点
- tailEntry
表ChannelOutboundBuffer
缓冲区的最后一个节点
图解过程
- 初次调用write 即
addMessage
后
- 第二次调用
addMessage
后
- 第n次调用
addMessage
后
可得,调用n次addMessage
后
flushedEntry
指针一直指向null
,表此时尚未有节点需写到Socket缓冲区unFushedEntry
后有n个节点,表当前还有n个节点尚未写到Socket缓冲区