netty系列之:netty中的自动解码器ReplayingDecoder

简介: netty系列之:netty中的自动解码器ReplayingDecoder

目录



简介


netty提供了一个从ByteBuf到用户自定义的message的解码器叫做ByteToMessageDecoder,要使用这个decoder,我们需要继承这个decoder,并实现decode方法,从而在这个方法中实现ByteBuf中的内容到用户自定义message对象的转换。


那么在使用ByteToMessageDecoder的过程中会遇到什么问题呢?为什么又会有一个ReplayingDecoder呢?带着这个问题我们一起来看看吧。


ByteToMessageDecoder可能遇到的问题


要想实现自己的解码器将ByteBuf转换成为自己的消息对象,可以继承ByteToMessageDecoder,然后实现其中的decode方法即可,先来看下decode方法的定义:


protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception


输入的参数中buf是要解码的ByteBuf,out是解码过后的对象列表,我们需要把ByteBuf中的数据转换成为我们自己的对象加入out的list中。


那么这里可能会遇到一个问题,因为我们在调用decode方法的时候buf中的数据可能还没有准备好,比如我们需要一个Integer,但是buf中的数据不够一个整数,那么就需要一些buf中数据逻辑的判断,我们以一个带有消息长度的Buf对象来描述一下这个过程。


所谓带有消息长度的Buf对象,就是说Buf消息中的前4位,构成了一个整数,这个整数表示的是buf中后续消息的长度。


所以我们读取消息进行转换的流程是,先读取前面4个字节,得到消息的长度,然后再读取该长度的字节,这就是我们真正要获取的消息内容。


来看一下如果是继承自ByteToMessageDecoder应该怎么实现这个逻辑呢?


public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       if (buf.readableBytes() < 4) {
          return;
       }
       buf.markReaderIndex();
       int length = buf.readInt();
       if (buf.readableBytes() < length) {
          buf.resetReaderIndex();
          return;
       }
       out.add(buf.readBytes(length));
     }
   }


在decode中,我们首先需要判断buf中可读的字节有没有4个,没有的话直接返回。如果有,则先读取这4个字节的长度,然后再判断buf中的可读字节是否小于应该读取的长度,如果小于,则说明数据还没有准备好,需要调用resetReaderIndex进行重置。


最后,如果所有的条件都满足,才真正进行读取工作。


有没有一个办法可以不提前进行判断,可以直接按照自己想要的内容来读取buf的方式呢?答案就是ReplayingDecoder。


我们先来看一下上面的例子用ReplayingDecoder重写是什么情况:


public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<Void> {
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       out.add(buf.readBytes(buf.readInt()));
     }
   }


使用ReplayingDecoder,我们可以忽略buf是否已经接收到了足够的可读数据,直接读取即可。


相比之下ReplayingDecoder非常的简单。接下来,我们来探究一下ReplayingDecoder的实现原理。


ReplayingDecoder的实现原理


ReplayingDecoder实际上是ByteToMessageDecoder的一个子类,它的定义如下:


public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder


在ByteToMessageDecoder中,最重要的方法是channelRead,在这个方法中实际调用了callDecode(ctx, cumulation, out);来实现cumulation到out的解码过程。


ReplayingDecoder的秘密就在于对这个方法的重写,我们来看下这个方法的具体实现:


protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        replayable.setCumulation(in);
        try {
            while (in.isReadable()) {
                int oldReaderIndex = checkpoint = in.readerIndex();
                int outSize = out.size();
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
                S oldState = state;
                int oldInputLength = in.readableBytes();
                try {
                    decodeRemovalReentryProtection(ctx, replayable, out);
                    if (ctx.isRemoved()) {
                        break;
                    }
                    if (outSize == out.size()) {
                        if (oldInputLength == in.readableBytes() && oldState == state) {
                            throw new DecoderException(
                                    StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
                                    "data or change its state if it did not decode anything.");
                        } else {
                            continue;
                        }
                    }
                } catch (Signal replay) {
                    replay.expect(REPLAY);
                    if (ctx.isRemoved()) {
                        break;
                    }
                    // Return to the checkpoint (or oldPosition) and retry.
                    int checkpoint = this.checkpoint;
                    if (checkpoint >= 0) {
                        in.readerIndex(checkpoint);
                    } else {
                    }
                    break;
                }
                if (oldReaderIndex == in.readerIndex() && oldState == state) {
                    throw new DecoderException(
                           StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
                           "or change its state if it decoded something.");
                }
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }


这里的实现和ByteToMessageDecoder不同的是ReplayingDecoder中定义了一个checkpoint,这个checkpint是在尝试进行数据解码之初设置的:


int oldReaderIndex = checkpoint = in.readerIndex();


如果是在解码的过程中出现了异常,则使用checkpoint重置index:


int checkpoint = this.checkpoint;
         if (checkpoint >= 0) {
            in.readerIndex(checkpoint);
        } else {
    }


这里捕获的异常是Signal,Signal是什么呢?


Signal是一个Error对象:


public final class Signal extends Error implements Constant<Signal>


这个异常是从replayable中抛出来的。


replayable是一个特有的ByteBuf对象,叫做ReplayingDecoderByteBuf:


final class ReplayingDecoderByteBuf extends ByteBuf


在ReplayingDecoderByteBuf中定义了Signal属性:


private static final Signal REPLAY = ReplayingDecoder.REPLAY;


这个Signal异常是从ReplayingDecoderByteBuf中的get方法中抛出的,这里以getInt为例,看一下异常是如何抛出的:


public int getInt(int index) {
        checkIndex(index, 4);
        return buffer.getInt(index);
    }


getInt方法首先会去调用checkIndex方法进行buff中的长度检测,如果小于要读取的长度,则会抛出异常REPLAY:


private void checkIndex(int index, int length) {
        if (index + length > buffer.writerIndex()) {
            throw REPLAY;
        }
    }


这就是Signal异常的由来。


总结



以上就是对ReplayingDecoder的介绍,虽然ReplayingDecoder好用,但是从它的实现可以看出,ReplayingDecoder是通过抛出异常来不断的重试,所以在某些特殊的情况下会造成性能的下降。


相关文章
|
3月前
|
移动开发 编解码 Java
Netty编码器和解码器
Netty从底层Java通道读到ByteBuf二进制数据,传入Netty通道的流水线,随后开始入站处理。在入站处理过程中,需要将ByteBuf二进制类型解码成Java POJO对象。这个解码过程可以通过Netty的Decoder解码器去完成。在出站处理过程中,业务处理后的结果需要从某个Java POJO对象编码为最终的ByteBuf二进制数据,然后通过底层 Java通道发送到对端。在编码过程中,需要用到Netty的Encoder编码器去完成数据的编码工作。
|
18天前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
|
3月前
|
移动开发 网络协议 Java
通信密码学:探秘Netty中解码器的神奇力量
通信密码学:探秘Netty中解码器的神奇力量
97 0
|
3月前
|
编解码
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
68 0
|
3月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
88 0
|
3月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
128 0
|
3月前
|
存储 编解码 Java
Netty使用篇:自定义编解码器
Netty使用篇:自定义编解码器
|
3月前
|
设计模式 JSON 编解码
Netty使用篇:编解码器
Netty使用篇:编解码器
|
XML 存储 编解码
Netty入门到超神系列-Netty使用Protobuf编码解码
当我们的Netty客户端和服务端进行通信时数据在传输的过程中需要进行序列化,比如以二进制数据进行传输,那么我们的业务数据就需要有相应的编码器进行编码为二进制数据,当服务端拿到二进制数据后需要有相应的解码器进行解码得到真实的业务数据。
125 0
|
安全 Java 关系型数据库
高性能IO框架Netty五 - Netty内置的编解码器
高性能IO框架Netty五 - Netty内置的编解码器
116 0