netty系列之:自定义编码和解码器要注意的问题

简介: netty系列之:自定义编码和解码器要注意的问题

目录



简介


在之前的系列文章中,我们提到了netty中的channel只接受ByteBuf类型的对象,如果不是ByteBuf对象的话,需要用编码和解码器对其进行转换,今天来聊一下netty自定义的编码和解码器实现中需要注意的问题。


自定义编码器和解码器的实现


在介绍netty自带的编码器和解码器之前,告诉大家怎么实现自定义的编码器和解码器。


netty中所有的编码器和解码器都是从ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter衍生而来的。


对于ChannelOutboundHandlerAdapter来说,最重要的两个类是MessageToByteEncoder 和 MessageToMessageEncoder 。


MessageToByteEncoder是将消息编码成为ByteBuf,这个类也是我们自定义编码最常用的类,直接继承这个类并实现encode方法即可。注意到这个类有一个泛型,这个泛型指定的就是消息的对象类型。


例如我们想将Integer转换成为ByteBuf,可以这样写:


public class IntegerEncoder extends MessageToByteEncoder<Integer> {
            @Override
           public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
                   throws Exception {
               out.writeInt(msg);
           }
       }


MessageToMessageEncoder是在消息和消息之间进行转换,因为消息并不能直接写入到channel中,所以需要和MessageToByteEncoder配合使用。


下面是一个Integer到String的例子:


public class IntegerToStringEncoder extends
               MessageToMessageEncoder<Integer> {
            @Override
           public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out)
                   throws Exception {
               out.add(message.toString());
           }
       }


对于ChannelInboundHandlerAdapter来说,最重要的两个类是ByteToMessageDecoder和MessageToMessageDecoder 。


ByteToMessageDecoder是将ByteBuf转换成对应的消息类型,我们需要继承这个类,并实现decode方法,下面是一个从ByteBuf中读取所有可读的字节,并将结果放到一个新的ByteBuf中,


public class SquareDecoder extends ByteToMessageDecoder {
            @Override
           public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                   throws Exception {
               out.add(in.readBytes(in.readableBytes()));
           }
       }


MessageToMessageDecoder是消息和消息之间的转换,同样的只需要实现decode方法即可,如下从String转换到Integer:


public class StringToIntegerDecoder extends
               MessageToMessageDecoder<String> {
            @Override
           public void decode(ChannelHandlerContext ctx, String message,
                              List<Object> out) throws Exception {
               out.add(message.length());
           }
       }


ReplayingDecoder


上面的代码看起来很简单,但是在实现的过程中还有一些问题要注意。


对于Decoder来说,我们从ByteBuf中读取数据,然后进行转换。但是在读取的过程中,并不知道ByteBuf中数据的变动情况,有可能在读取的过程中ByteBuf还没有准备好,那么就需要在读取的时候对ByteBuf中可读字节的大小进行判断。


比如我们需要解析一个数据结构,这个数据结构的前4个字节是一个int,表示后面byte数组的长度,我们需要先判断ByteBuf中是否有4个字节,然后读取这4个字节作为Byte数组的长度,然后再读取这个长度的Byte数组,最终得到要读取的结果,如果其中的某一步出现问题,或者说可读的字节长度不够,那么就需要直接返回,等待下一次的读取。如下所示:


public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       if (buf.readableBytes() < 4) {
          return;
       }
       buf.markReaderIndex();
       int length = buf.readInt();
       if (buf.readableBytes() < length) {
          buf.resetReaderIndex();
          return;
       }
       out.add(buf.readBytes(length));
     }
   }


这种判断是比较复杂同时也是可能出错的,为了解决这个问题,netty提供了 ReplayingDecoder用来简化上面的操作,在ReplayingDecoder中,假设所有的ByteBuf已经处于准备好的状态,直接从中间读取即可。


上面的例子用ReplayingDecoder重写如下:


public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<Void> {
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       out.add(buf.readBytes(buf.readInt()));
     }
   }


它的实现原理是去尝试读取对应的字节信息,如果没有读到,则抛出异常,ReplayingDecoder接收到异常之后,会重新调用decode方法。


虽然ReplayingDecoder使用起来非常简单,但是它有两个问题。


第一个问题是性能问题,因为会去重复调用decode方法,如果ByteBuf本身并没有变化,就会导致重复decode同一个ByteBuf,照成性能的浪费。解决这个问题就是在在decode的过程中分阶段进行,比如上面的例子中,我们需要先读取Byte数组的长度,然后再读取真正的byte数组。所以在读完byte数组长度之和,可以调用checkpoint()方法做一个保存点,下次再执行decode方法的时候就可以跳过这个保存点,继续后续的执行过程,如下所示:


public enum MyDecoderState {
     READ_LENGTH,
     READ_CONTENT;
   }
   public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<MyDecoderState> {
     private int length;
     public IntegerHeaderFrameDecoder() {
       // Set the initial state.
       super(MyDecoderState.READ_LENGTH);
     }
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       switch (state()) {
       case READ_LENGTH:
         length = buf.readInt();
         checkpoint(MyDecoderState.READ_CONTENT);
       case READ_CONTENT:
         ByteBuf frame = buf.readBytes(length);
         checkpoint(MyDecoderState.READ_LENGTH);
         out.add(frame);
         break;
       default:
         throw new Error("Shouldn't reach here.");
       }
     }
   }


第二个问题是同一个实例的decode方法可能会被调用多次,如果我们在ReplayingDecoder中有私有变量的话,则需要考虑对这个私有变量的清洗工作,避免多次调用造成的数据污染。


总结



通过继承上面的几个类,我们就可以自己实现编码和解码的逻辑了。但是好像还有点问题,自定义编码和解码器是不是太复杂了?还需要判断要读取的byte数组的大小。有没有更加简单的方法呢?

相关文章
|
8月前
|
移动开发 编解码 Java
Netty编码器和解码器
Netty从底层Java通道读到ByteBuf二进制数据,传入Netty通道的流水线,随后开始入站处理。在入站处理过程中,需要将ByteBuf二进制类型解码成Java POJO对象。这个解码过程可以通过Netty的Decoder解码器去完成。在出站处理过程中,业务处理后的结果需要从某个Java POJO对象编码为最终的ByteBuf二进制数据,然后通过底层 Java通道发送到对端。在编码过程中,需要用到Netty的Encoder编码器去完成数据的编码工作。
|
5月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
281 2
|
8月前
|
Java Spring
Springboot整合Netty,自定义协议实现
以上就是在Spring Boot中整合Netty并实现自定义协议的基本步骤。你需要根据你的自定义协议的具体需求,来实现你的编码器、解码器和处理器。
302 0
|
8月前
|
移动开发 网络协议 Java
通信密码学:探秘Netty中解码器的神奇力量
通信密码学:探秘Netty中解码器的神奇力量
235 0
|
8月前
|
编解码
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
127 0
|
8月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
179 0
|
8月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
279 0
|
8月前
|
存储 编解码 Java
Netty使用篇:自定义编解码器
Netty使用篇:自定义编解码器
|
8月前
|
设计模式 JSON 编解码
Netty使用篇:编解码器
Netty使用篇:编解码器
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13549 1