深入理解 Netty编码流程及WriteAndFlush()的实现 (一)

简介: 深入理解 Netty编码流程及WriteAndFlush()的实现 (一)

编码器的执行时机#


首先, 我们想通过服务端,往客户端发送数据, 通常我们会调用ctx.writeAndFlush(数据)的方式, 入参位置的数据可能是基本数据类型,也可能对象

其次,编码器同样属于handler,只不过他是特化的专门用于编码作用的handler, 在我们的消息真正写入jdk底层的ByteBuffer时前,数据需要经过编码处理, 不是说不进行编码就发送不出去,而是不经过编码,客户端可能接受到的是乱码

然后,我们知道,ctx.writeAndFlush(数据)它其实是出站处理器特有的行为,因此注定了它需要在pipeline中进行传递,从哪里进行传递呢? 从tail节点开始,一直传播到header之前的我们自己添加的自定义的解码器


WriteAndFlush()的逻辑#


我们跟进源码WriteAndFlush()相对于Write(),它的flush字段是true


private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            //todo 因为flush 为 true
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }



于是就会这样

  • 逐个调用handler的write()
  • 逐个调用handler的flush()

知道这一点很重要,这意味这我们知道了,事件传播分成两波进行, 一波write,一波flush, 这两波事件传播的大体流程我写在这里, 在下面


write

  • 将ByteBuf 转换成DirctBuffer
  • 将消息(DirctBuffer)封装进entry 插入写队列
  • 设置写状态

flush

  • 刷新标志,设置写状态
  • 变量buffer队列,过滤Buffer
  • 调用jdk底层的api,把ByteBuf写入jdk原生的ByteBuffer


自定义一个简单的编码器#


/**
 * @Author: Changwu
 * @Date: 2019/7/21 20:49
 */
public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol> {
    // todo write动作会传播到 MyPersonEncoder的write方法, 但是我们没有重写, 于是就执行 父类 MessageToByteEncoder的write, 我们进去看
    @Override
    protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyPersonEncoder....");
        // 消息头  长度
        out.writeInt(msg.getLength());
        // 消息体
        out.writeBytes(msg.getContent());
    }
}


选择继承MessageToByteEncoder<T> 从消息到字节的编码器


继续跟进#


ok,现在来到了我们自定义的 解码器MyPersonEncoder ,

但是,并没看到正在传播的writeAndFlush(),没关系, 我们自己的解码器继承了MessageToByteEncoder,这个父类中实现了writeAndFlush(),源码如下:解析写在源码后面


// todo 看他的write方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {// todo 1  判断当前是否可以处理这个对象
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            // todo 2 内存分配
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                // todo 3 调用本类的encode(), 这个方法就是我们自己实现的方法
                encode(ctx, cast, buf);
            } finally {
                // todo 4 释放
                ReferenceCountUtil.release(cast);
            }
            if (buf.isReadable()) {
                // todo 5. 往前传递
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            // todo 释放
            buf.release();
        }
    }


  • 将我们发送的消息msg,封装进了 ByteBuf 中
  • 编码: 执行encode()方法,这是个抽象方法,由我们自定义的编码器实现
  • 我们的实现很简单,分别往Buf里面写入下面两次数据
  • int类型的消息的长度
  • 消息体
  • 将msg释放
  • 继续向前传递 write()事件
  • 最终,释放第一步创建的ByteBuf


小结#


到这里为止,编码器的执行流程已经完成了,我们可以看到,和解码器的架构逻辑相似,类似于模板设计模式,对我们来说,只不过是做了个填空题


其实到上面的最后一步 释放第一步创建的ByteBuf之前 ,消息已经被写到jdk底层的 ByteBuffer 中了,怎么做的呢? 别忘了它的上一步, 继续向前传递write()事件,再往前其实就是HeaderContext了,和HeaderContext直接关联的就是unsafe类, 这并不奇怪,我们都知道,netty中无论是客户端还是服务端channel底层的数据读写,都依赖unsafe


下面开始分析,WriteAndFlush()底层的两波任务细节


第一波事件传递 write()#


我们跟进HenderContext的write() ,而HenderContext的中依赖的是unsafe.wirte()所以直接去 AbstractChannel的Unsafe 源码如下:


@Override
    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) { // todo 缓存 写进来的 buffer
            ReferenceCountUtil.release(msg);
            return;
        }
        int size;
        try {
            // todo buffer  Dirct化 , (我们查看 AbstractNioByteBuf的实现)
            msg = filterOutboundMessage(msg);
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }
        // todo 插入写队列  将 msg 插入到 outboundBuffer
        // todo outboundBuffer 这个对象是 ChannelOutBoundBuf类型的,它的作用就是起到一个容器的作用
        // todo 下面看, 是如何将 msg 添加进 ChannelOutBoundBuf中的
        outboundBuffer.addMessage(msg, size, promise);
    }


参数位置的msg,就是经过我们自定义解码器的父类进行包装了的ByteBuf类型消息

这个方法主要做了三件事


  • 第一: filterOutboundMessage(msg); 将ByteBuf转换成DirctByteBuf

当我们进入查看他的实现时,idea会提示,它的子类重写了这个方法, 是谁重写的呢? 是AbstractNioByteChannel 这个类其实是属于客户端阵营的类,和服务端的AbstractNioMessageChannel相提并论


源码如下:


protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        return newDirectBuffer(buf);
    }
    if (msg instanceof FileRegion) {
        return msg;
    }
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}


  • 第二件事: 将转换后的DirectBuffer插入到写队列中


什么是写队列 ? 作用是啥?#


它其实就是一个netty自定义的容器,使用的单向链表的结构,为什么要有这个容器呢? 回想一下,服务端需要向客户端发送消息,消息进而被封装进ByteBuf,但是呢, 往客户端写的方法有两个

  • write()
  • writeAndFlush()

这个方法的区别是有的,前者只是进行了写,(写到了ByteBuf) 却没有将内容刷新到ByteBuffer,没有刷新到缓存中,就没办法进一步把它写入jdk原生的ByteBuffer中, 而writeAndFlush()就比较方便,先把msg写入ByteBuf,然后直接刷进socket,一套带走,打完收工

但是如果客户端偏偏就是不使用writeAndFlush(),而使用前者,那么盛放消息的ByteBuf被传递到handler的最开始的位置,怎么办? unsafe也无法把它写给客户端, 难道丢弃不成?

于是写队列就解决了这个问题,它以链表当做数据结构,新传播过来的ByteBuf就会被他封装成一个一个的节点(entry)进行维护,为了区分这个链表中,哪个节点是被使用过的,哪个节点是没有使用过的,他就用三个标记指针进行标记,如下:

  • flushedEntry 被刷新过的entry
  • tailEntry 尾节点
  • unflushedEntry 未被刷的entry


下面我们看一下,它如何将一个新的节点,添加到写队列


addMessage(Object msg, int size, ChannelPromise promise) 添加写队列#


public void addMessage(Object msg, int size, ChannelPromise promise) {
    // todo 将上面的三者封装成实体
    // todo 调用工厂方法, 创建  Entry  , 在 当前的ChannelOutboundBuffer 中每一个单位都是一个 Entry, 用它进一步包装 msg
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    // todo 调整三个指针, 去上面查看这三个指针的定义
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    // todo 跟进这个方法
    incrementPendingOutboundBytes(entry.pendingSize, false);
}


看他的源码,其实就是简单的针对链表进行插入的操作,尾插入法, 一直往最后的位置插入,链表的头被标记成unflushedEntry 这两个节点之间entry,表示是可以被flush的节点

在每次添加新的 节点后都调用incrementPendingOutboundBytes(entry.pendingSize, false)方法, 这个方法的作用是设置写状态, 设置怎样的状态呢? 我们看它的源码, 可以看到,它会记录下累计的ByteBuf的容量,一旦超出了阈值,就会传播channel不可写的事件


  • 这也是write()的第三件事


private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    // todo TOTAL_PENDING_SIZE_UPDATER 当前缓存中 存在的代写的 字节
    // todo 累加
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // todo 判断 新的将被写的 buffer的容量不能超过  getWriteBufferHighWaterMark() 默认是 64*1024  64字节
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        // todo 超过64 字节,进入这个方法
        setUnwritable(invokeLater);
    }
}


小结:#


到目前为止,第一波write()事件已经完成了,我们可以看到了,这个事件的功能就是使用ChannelOutBoundBuf将write事件传播过去的单个ByteBuf维护起来,等待 flush事件的传播

相关文章
|
存储 缓存 数据处理
Netty源码剖析之数据通信流程
NIO事件/感兴趣事件 OP_REGISTER = 0 通道注册事件 OP_READ = 1 << 0 OP_WRITE = 1 << 2 OP_CONNECT = 1 << 3 OP_ACCEPT = 1 << 4
Netty源码剖析之NIOEventLoopGroup创建流程
Netty中事件循环机制非常重要,通过NIOEventLoopGroup可以了解到netty如何实现处理请求,如何实现事件监听处理,转发,有助于平时学习使用
Netty源码剖析之NIOEventLoopGroup创建流程
|
编解码 缓存 Dubbo
Netty流程学习
连接完成之后,不能无所事事,此时应该会执行业务处理。也即此时可以看到上面的NettyServerHandler。因此可以看到dubbo的线程模型: 配置 Dubbo 中的线程模型 如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。 但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。 如果用 IO 线程处理事件,又在事件处理过程中发起新的 IO 请求,比如在连接事件中发起登录请求,会报“可能引发死锁”异常,但不会真死锁
98 0
Netty流程学习
|
前端开发
Netty流程学习一-netty启动服务
问题:我们的线程:openSelector在什么时候创建的。 在创建NioEventLoop的时候,创建openSelector。 什么时候创建severSocketChannel、初始化serverSocketChannel,同时给serverSocketChannel从bossGroup中选择一个NioEventLoop 创建serverSocketChannel是在initAndRegister的时候,通过泛型+放射+工厂的方式创建serverSocketChannel。 而初始化则是设置channelOptions的相关参数信息、设置属性信息,同时通过channel的pipeline方
97 0
Netty流程学习一-netty启动服务
一些关于Netty的工作架构流程的问题
一些关于Netty的工作架构流程的问题
|
缓存 安全 NoSQL
基于Netty,从零开发IM(四):编码实践篇(系统优化)
虽然 Netty 的性能很高,但是也不能保证随意写出来的项目就是性能很高的,所以本篇将主要讲解几个基于Netty的IM系统的优化实战技术点。
183 0
基于Netty,从零开发IM(四):编码实践篇(系统优化)
|
存储 Java Go
基于Netty,从零开发IM(三):编码实践篇(群聊功能)
接上两篇《IM系统设计篇》、《编码实践篇(单聊功能)》,本篇主要讲解的是通过实战编码实现IM的群聊功能,内容涉及群聊技术实现原理、编码实践等知识。
182 0
基于Netty,从零开发IM(三):编码实践篇(群聊功能)
|
存储 fastjson Java
|
存储 算法 Java
netty系列之:netty中常用的对象编码解码器
我们在程序中除了使用常用的字符串进行数据传递之外,使用最多的还是JAVA对象。在JDK中,对象如果需要在网络中传输,必须实现Serializable接口,表示这个对象是可以被序列化的。这样就可以调用JDK自身的对象对象方法,进行对象的读写。 那么在netty中进行对象的传递可不可以直接使用JDK的对象序列化方法呢?如果不能的话,又应该怎么处理呢? 今天带大家来看看netty中提供的对象编码器。
|
XML JSON 数据格式
netty系列之:netty中常用的xml编码解码器
在json之前,xml是最常用的数据传输格式,虽然xml的冗余数据有点多,但是xml的结构简单清晰,至今仍然运用在程序中的不同地方,对于netty来说自然也提供了对于xml数据的支持。 netty对xml的支持表现在两个方面,第一个方面是将编码过后的多个xml数据进行frame拆分,每个frame包含一个完整的xml。另一方面是将分割好的frame进行xml的语义解析。 进行frame拆分可以使用XmlFrameDecoder,进行xml文件内容的解析则可以使用XmlDecoder,接下来我们会详细讲解两个decoder实现和使用。