Netty 源码深度解析(九) - 编码

简介:

概述

一个问题



编码器实现了 ChannelOutboundHandler,并将出站数据从 一种格式转换为另一种格式,和我们方才学习的解码器的功能正好相反。Netty 提供了一组类, 用于帮助你编写具有以下功能的编码器:

  • 将消息编码为字节
  • 将消息编码为消息
    我们将首先从抽象基类 MessageToByteEncoder 开始来对这些类进行考察

1 抽象类 MessageToByteEncoder

MessageToByteEncoder API
解码器通常需要在 Channel 关闭之后产生最后一个消息(因此也就有了 decodeLast()方法)
这显然不适于编码器的场景——在连接被关闭之后仍然产生一个消息是毫无意义的

1.1 ShortToByteEncoder

其接受一 Short 型实例作为消息,编码为Short的原子类型值,并写入ByteBuf,随后转发给ChannelPipeline中的下一个 ChannelOutboundHandler
每个传出的 Short 值都将会占用 ByteBuf 中的 2 字节
ShortToByteEncoder

1.2 Encoder

Netty 提供了一些专门化的 MessageToByteEncoder,可基于此实现自己的编码器
WebSocket08FrameEncoder 类提供了一个很好的实例

2 抽象类 MessageToMessageEncoder

你已经看到了如何将入站数据从一种消息格式解码为另一种
为了完善这幅图,将展示 对于出站数据将如何从一种消息编码为另一种。MessageToMessageEncoder 类的 encode() 方法提供了这种能力
MessageToMessageEncoderAPI
为了演示,使用 IntegerToStringEncoder 扩展了 MessageToMessageEncoder

  • 编码器将每个出站 Integer 的 String 表示添加到了该 List 中

IntegerToStringEncoder的设计

关于有趣的 MessageToMessageEncoder 的专业用法,请查看 io.netty.handler. codec.protobuf.ProtobufEncoder 类,它处理了由 Google 的 Protocol Buffers 规范所定义 的数据格式。

一个java对象最后是如何转变成字节流,写到socket缓冲区中去的

pipeline中的标准链表结构
java对象编码过程
write:写队列
flush:刷新写队列
writeAndFlush: 写队列并刷新

pipeline中的标准链表结构

标准的pipeline链式结构
数据从head节点流入,先拆包,然后解码成业务对象,最后经过业务Handler处理,调用write,将结果对象写出去
而写的过程先通过tail节点,然后通过encoder节点将对象编码成ByteBuf,最后将该ByteBuf对象传递到head节点,调用底层的Unsafe写到JDK底层管道

Java对象编码过程

为什么我们在pipeline中添加了encoder节点,java对象就转换成netty可以处理的ByteBuf,写到管道里?

我们先看下调用write的code

业务处理器接受到请求之后,做一些业务处理,返回一个user

  • 然后,user在pipeline中传递
    AbstractChannel#

DefaultChannelPipeline#
AbstractChannelHandlerContext#
AbstractChannelHandlerContext#

  • 情形一
    AbstractChannelHandlerContext#

AbstractChannelHandlerContext#

  • 情形二
    AbstractChannelHandlerContext#


AbstractChannelHandlerContext#invokeWrite0
AbstractChannelHandlerContext#invokeFlush0
handler 如果不覆盖 flush 方法,就会一直向前传递直到 head 节点

落到 Encoder节点,下面是 Encoder 的处理流程

按照简单自定义协议,将Java对象 User 写到传入的参数 out中,这个out到底是什么?

需知 User 对象,从BizHandler传入到 MessageToByteEncoder时,首先传到 write

1. 判断当前Handelr是否能处理写入的消息(匹配对象)



  • 判断该对象是否是该类型参数匹配器实例可匹配到的类型
    TypeParameterMatcher#

具体实例

2 分配内存


3 编码实现

  • 调用encode,这里就调回到 Encoder 这个Handler
  • 其为抽象方法,因此自定义实现类实现编码方法

4 释放对象

  • 既然自定义Java对象转换成ByteBuf了,那么这个对象就已经无用,释放掉 (当传入的msg类型是ByteBuf时,就不需要自己手动释放了)

5 传播数据

//112 如果buf中写入了数据,就把buf传到下一个节点,直到 header 节点

6 释放内存

//115 否则,释放buf,将空数据传到下一个节点
// 120 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
// 127 当buf在pipeline中处理完之后,释放

Encoder处理传入的Java对象

  • 判断当前Handler是否能处理写入的消息

    • 如果能处理,进入下面的流程
    • 否则,直接扔给下一个节点处理
  • 将对象强制转换成Encoder 可以处理的 Response对象
  • 分配一个ByteBuf
  • 调用encoder,即进入到 Encoder 的 encode方法,该方法是用户代码,用户将数据写入ByteBuf
  • 既然自定义Java对象转换成ByteBuf了,那么这个对象就已经无用了,释放掉(当传入的msg类型是ByteBuf时,无需自己手动释放)
  • 如果buf中写入了数据,就把buf传到下一个节点,否则,释放buf,将空数据传到下一个节点
  • 最后,当buf在pipeline中处理完之后,释放节点

总结就是,Encoder节点分配一个ByteBuf,调用encode方法,将Java对象根据自定义协议写入到ByteBuf,然后再把ByteBuf传入到下一个节点,在我们的例子中,最终会传入到head节点

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

这里的msg就是前面在Encoder节点中,载有java对象数据的自定义ByteBuf对象

write - 写buffer队列


ChannelOutboundInvoker#

write(Object msg, boolean flush, ChannelPromise promise)



HeadContext in DefaultChannelPipeline#write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
Unsafe in Channel#write(Object msg, ChannelPromise promise)
以下过程分三步讲解

direct ByteBuf


AbstractChannel#filterOutboundMessage(Object msg)

  • 首先,调用 assertEventLoop 确保该方法的调用是在reactor线程中
  • 然后,调用 filterOutboundMessage() ,将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer

AbstractNioChannel#newDirectBuffer

插入写队列

  • 接下来,估算出需要写入的ByteBuf的size

  • 最后,调用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下来,我们需要重点看一下这个方法干了什么事情
    ChannelOutboundBuffer

想要理解上面这段代码,须掌握写缓存中的几个消息指针

ChannelOutboundBuffer 里面的数据结构是一个单链表结构,每个节点是一个 Entry,Entry 里面包含了待写出ByteBuf 以及消息回调 promise下面分别是

三个指针的作用

  • flushedEntry
    表第一个被写到OS Socket缓冲区中的节点

ChannelOutboundBuffer

  • unFlushedEntry
    表第一个未被写入到OS Socket缓冲区中的节点

ChannelOutboundBuffer

  • tailEntry
    ChannelOutboundBuffer缓冲区的最后一个节点

ChannelOutboundBuffer

图解过程

  • 初次调用write 即 addMessage

fushedEntry指向空,unFushedEntrytailEntry 都指向新加入节点

  • 第二次调用 addMessage
  • 第n次调用 addMessage

可得,调用n次addMessage

  • flushedEntry指针一直指向null,表此时尚未有节点需写到Socket缓冲区
  • unFushedEntry后有n个节点,表当前还有n个节点尚未写到Socket缓冲区

设置写状态

ChannelOutboundBuffer#addMessage

  • 统计当前有多少字节需要需要被写出
    ChannelOutboundBuffer#addMessage(Object msg, int size, ChannelPromise promise)
  • 当前缓冲区中有多少待写字节
    ChannelOutboundBuffer#


ChannelConfig#getWriteBufferHighWaterMark()

  • 所以默认不能超过64k
    WriteBufferWaterMark

  • 自旋锁+CAS 操作,通过 pipeline 将事件传播到channelhandler 中监控

flush:刷新buffer队列

添加刷新标志并设置写状态

  • 不管调用channel.flush(),还是ctx.flush(),最终都会落地到pipeline中的head节点
    DefaultChannelPipeline#flush
  • 之后进入到AbstractUnsafe
    AbstractChannel#flush()
  • flush方法中,先调用
    ChannelOutboundBuffer#addFlush

ChannelOutboundBuffer#decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability)

和之前那个实例相同,不再赘述

  • 结合前面的图来看,上述过程即
    首先拿到 unflushedEntry 指针,然后将 flushedEntry 指向unflushedEntry所指向的节点,调用完毕后

遍历 buffer 队列,过滤bytebuf

  • 接下来,调用 flush0()
  • 发现这里的核心代码就一个 doWrite
    AbstractChannel#

AbstractNioByteChannel

  • 继续跟
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    boolean setOpWrite = false;
    for (;;) {
        // 拿到第一个需要flush的节点的数据
        Object msg = in.current();

        if (msg instanceof ByteBuf) {
            boolean done = false;
            long flushedAmount = 0;
            // 拿到自旋锁迭代次数
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            // 自旋,将当前节点写出
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            // 写完之后,将当前节点删除
            if (done) {
                in.remove();
            } else {
                break;
            }
        } 
    }
}
  • 第一步,调用current()先拿到第一个需要flush的节点的数据
    ChannelOutboundBuffer#current
  • 第二步,拿到自旋锁的迭代次数
  • 第三步 调用 JDK 底层 API 进行自旋写
    自旋的方式将ByteBuf写到JDK NIO的Channel

强转为ByteBuf,若发现没有数据可读,直接删除该节点

  • 拿到自旋锁迭代次数

image.png

  • 在并发编程中使用自旋锁可以提高内存使用率和写的吞吐量,默认值为16
    ChannelConfig
  • 继续看源码

AbstractNioByteChannel#

  • javaChannel(),表明 JDK NIO Channel 已介入此次事件
    NioSocketChannel#

ByteBuf#readBytes(GatheringByteChannel out, int length)

  • 得到向JDK 底层已经写了多少字节
    PooledDirectByteBuf#

  • 从 Netty 的 bytebuf 写到 JDK 底层的 bytebuffer

  • 第四步,删除该节点
    节点的数据已经写入完毕,接下来就需要删除该节点


首先拿到当前被flush掉的节点(flushedEntry所指)
然后拿到该节点的回调对象 ChannelPromise, 调用 removeEntry()移除该节点

这里是逻辑移除,只是将flushedEntry指针移到下个节点,调用后

随后,释放该节点数据的内存,调用 safeSuccess 回调,用户代码可以在回调里面做一些记录,下面是一段Example

ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() {
    @Override
    public void operationComplete(Future<? super Void> future) throws Exception {
       // 回调 
    }
})

最后,调用 recycle,将当前节点回收

writeAndFlush: 写队列并刷新

writeAndFlush在某个Handler中被调用之后,最终会落到 TailContext 节点


public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

AbstractChannelHandlerContext#
AbstractChannelHandlerContext#

最终,通过一个boolean变量,表示是调用 invokeWriteAndFlush,还是 invokeWriteinvokeWrite便是我们上文中的write过程
AbstractChannelHandlerContext#
可以看到,最终调用的底层方法和单独调用 write flush 一样的


由此看来,invokeWriteAndFlush基本等价于write之后再来一次flush

总结

  • 调用write并没有将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出
  • writeAndFlush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功
  • netty中的缓冲区中的ByteBuf为DirectByteBuf

当 BizHandler 通过 writeAndFlush 方法将自定义对象往前传播时,其实可以拆分成两个过程

  • 通过 pipeline逐渐往前传播,传播到其中的一个 encode 节点后,其负责重写 write 方法将自定义的对象转化为 ByteBuf,接着继续调用 write 向前传播
  • pipeline中的编码器原理是创建一个ByteBuf,将Java对象转换为ByteBuf,然后再把ByteBuf继续向前传递,若没有再重写了,最终会传播到 head 节点,其中缓冲区列表拿到缓存写到 JDK 底层 ByteBuffer
目录
相关文章
|
7月前
|
算法 Java 容器
Netty源码—4.客户端接入流程
本文主要介绍了关于Netty客户端连接接入问题整理、Reactor线程模型和服务端启动流程、Netty新连接接入的整体处理逻辑、新连接接入之检测新连接、新连接接入之创建NioSocketChannel、新连接接入之绑定NioEventLoop线程、新连接接入之注册Selector和注册读事件、注册Reactor线程总结、新连接接入总结
|
7月前
|
安全 Java 调度
Netty源码—3.Reactor线程模型二
本文主要介绍了NioEventLoop的执行总体框架、Reactor线程执行一次事件轮询、Reactor线程处理产生IO事件的Channel、Reactor线程处理任务队列之添加任务、Reactor线程处理任务队列之执行任务、NioEventLoop总结。
|
7月前
|
安全 Java
Netty源码—2.Reactor线程模型一
本文主要介绍了关于NioEventLoop的问题整理、理解Reactor线程模型主要分三部分、NioEventLoop的创建和NioEventLoop的启动。
|
7月前
|
编解码 安全 Java
Netty源码—1.服务端启动流程
本文主要介绍了服务端启动整体流程及关键方法、服务端启动的核心步骤、创建服务端Channel的源码、初始化服务端Channel的源码、注册服务端Channel的源码、绑定服务端端口的源码、服务端启动流程源码总结。
|
9月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
932 29
|
9月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
396 4
|
9月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
9月前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
9月前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
9月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~

推荐镜像

更多
  • DNS