04、Netty学习笔记—(黏包半包及协议设计解析)(二)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 04、Netty学习笔记—(黏包半包及协议设计解析)(二)

二、协议设计与解析


TCP/IP 中消息传输基于流的方式,没有边界。


协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则。例如HTTP协议、redis通信协议、websocket协议等等。


如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用


定长字节表示内容长度 + 实际内容


2.1、redis协议示例


介绍


redis对于整个命令会看成一个数组。


例:set key value


//举例:set name changlu   //下面每个命令都由一个回车符、换行符分割 字节对应13,10
*3
$3
set
$4
name 
$7
changlu


*3:首先需要让你发送数组的长度 *表示的是命令的数量,3则是命令组成的长度。

$3:$表示的是某个命令参数的长度,3表示该命令参数长度为3。

每个命令参数都由\r\n来进行分割

案例


案例目的:使用redis协议模拟与redis服务端进行通信,执行一条set、get命令。


import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
/**
 * @ClassName Test
 * @Author ChangLu
 * @Date 2022/1/9 13:32
 * @Description 模拟Redis客户端来向redis服务端发送一条命令
 */
@Slf4j
public class Test {
    public static void main(String[] args) throws InterruptedException {
        byte[] LINE = {13, 10};//两个字节表示回车,换行
        NioEventLoopGroup group = new NioEventLoopGroup();
        Channel channel = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx){
                                //向redis服务端发送一个写指令:set name changlu
                                set(ctx);
                                //向redis服务端发送一个读指令:get name
                                get(ctx);
                            }
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug("收到 {} , 消息为:{}", ctx.channel(), buf.toString(Charset.defaultCharset()));
                                super.channelRead(ctx, msg);
                            }
                            //执行set命令:set name changlu
                            private void set(ChannelHandlerContext ctx){
                                final ByteBuf buffer = ctx.alloc().buffer();
                                buffer.writeBytes("*3".getBytes());
                                buffer.writeBytes(LINE);
                                buffer.writeBytes("$3".getBytes());
                                buffer.writeBytes(LINE);
                                buffer.writeBytes("set".getBytes());
                                buffer.writeBytes(LINE);
                                buffer.writeBytes("$4".getBytes());
                                buffer.writeBytes(LINE);
                                buffer.writeBytes("name".getBytes());
                                buffer.writeBytes(LINE);
                                buffer.writeBytes("$7".getBytes());
                                buffer.writeBytes(LINE);
                                buffer.writeBytes("changlu".getBytes());
                                buffer.writeBytes(LINE);
                                ctx.writeAndFlush(buffer);
                            }
                            //执行get命令:get name
                            private void get(ChannelHandlerContext ctx){
                                final ByteBuf buffer = ctx.alloc().buffer();
                                buffer.writeBytes("*2".getBytes());
                                buffer.writeBytes(LINE);
                                buffer.writeBytes("$3".getBytes());
                                buffer.writeBytes(LINE);
                                buffer.writeBytes("get".getBytes());
                                buffer.writeBytes(LINE);
                                buffer.writeBytes("$4".getBytes());
                                buffer.writeBytes(LINE);
                                buffer.writeBytes("name".getBytes());
                                buffer.writeBytes(LINE);
                                ctx.writeAndFlush(buffer);
                            }
                        });
                    }
                }).connect("127.0.0.1", 6379).sync().channel();
        log.debug("客户端连接成功:{}", channel);
        channel.closeFuture().addListener(future -> {
            group.shutdownGracefully();
        });
    }
}



效果:





2.2、HTTP协议示例


2.2.1、认识HttpServerCodec


HttpServerCodec:是一个编解码处理器,处理入站、出站处理器。入站处理器会对http请求进行解码


//CombinedChannelDuplexHandler组合其他两个handler,分别是InBound和OutBound 编解码处理器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
        implements HttpServerUpgradeHandler.SourceCodec {


使用方式:


ch.pipeline().addLast(new HttpServerCodec());


浏览器发送一次请求(无论什么方法请求)实际上会解析成两部分:


若是我们重写channelRead方法,那么一个http请求就会走两次该handler方法,每次执行方法其中的Object msg分别为不同部分的解析对象


DefaultHttpRequest:解析出来请求行和请求头。

LastHttpContent$1:表示请求体。(即便是get请求,请求体内容为空也会专门解析一个请求体对象)

情况一:若是想区分请求头、请求行走的handler那么就需要写一个简单的判断:


ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //DefaultHttpRequest实现了HttpRequest接口
        if (msg instanceof HttpRequest){
            System.out.println("请求行、头");
        //LastHttpContent实现了HttpContent接口
        }else if (msg instanceof HttpContent){
            System.out.println("请求体");
        }
        super.channelRead(ctx, msg);
    }
});



情况二:若是我们只对某个特定类型感兴趣的话,例如只对解析出来的DefaultHttpRequest请求体对象感兴趣,可以实现一个SimpleChannelInboundHandler


//②若是只对HTTP请求的请求头感兴趣,那么实现SimpleChannelInboundHandler实例,指明感兴趣的请求对象为HttpRequest(实际就是DefaultHttpRequest)
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
        log.debug("解析对象类型:{}", msg.getClass());
        log.debug(msg.uri());
        //进行响应返回
        //①构建响应对象
        final DefaultFullHttpResponse response =
            new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
        // 响应内容
        final byte[] content = "<h1>Hello,world!</h1>".getBytes();
        //设置响应头:content-length:内容长度。不设置的话浏览器就不能够知道确切的响应内容大小则会造成一直没有处理完的现象
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, content.length);
        response.content().writeBytes(content);
        //②写会响应
        ctx.writeAndFlush(response);
    }
});



效果:



若是不设置请求头content-length指明内容长度就会出现下面问题:hello,world是能够正常显示出来的





应该是由于没有指定Content-length,浏览器不能够保证当前内容是全部内容,就会出现一直转圈圈等待的效果,并且响应结果也不会出现。


若是设置了的话就不会出现转圈圈现象以及检查response也会有内容:





2.2.2、案例


案例目的:服务器响应http请求并返回helloworld标签对。


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;
/**
 * @ClassName Test
 * @Author ChangLu
 * @Date 2022/1/9 14:01
 * @Description HTTP协议示例:测试协议的编解码
 */
@Slf4j
public class Test {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup(2);
        new ServerBootstrap()
            .group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    //                        ch.pipeline().addLast(new LoggingHandler());
                    //netty提供的对HTTP协议编解码处理器类
                    ch.pipeline().addLast(new HttpServerCodec());
                    //②若是只对HTTP请求的请求头感兴趣,那么实现SimpleChannelInboundHandler实例,指明感兴趣的请求对象为HttpRequest(实际就是DefaultHttpRequest)
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                            log.debug("解析对象类型:{}", msg.getClass());
                            log.debug(msg.uri());
                            //进行响应返回
                            //①构建响应对象
                            final DefaultFullHttpResponse response =
                                new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                            // 响应内容
                            final byte[] content = "<h1>Hello,world!</h1>".getBytes();
                            //设置响应头:content-length:内容长度。不设置的话浏览器就不能够知道确切的响应内容大小则会造成一直没有处理完的现象
                            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, content.length);
                            response.content().writeBytes(content);
                            //②写会响应
                            ctx.writeAndFlush(response);
                        }
                    });
                    //①对于HTTP一次请求会解析成两个对象,每个对象会走一次channelRead方法(也就是说该方法会执行两次,每次得到的msg是一个对象)
                    //                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    //                            @Override
                    //                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                                    log.debug("msg.getClass() => {}", msg.getClass());
                    //                                if (msg instanceof HttpRequest){
                    //                                    System.out.println("请求行、头");
                    //                                }else if (msg instanceof HttpContent){
                    //                                    System.out.println("请求体");
                    //                                }
                    //                                super.channelRead(ctx, msg);
                    //                            }
                    //                        });
                }
            }).bind(8080);
        log.debug("服务器启动成功!");
    }
}



效果:




2.3、自定义协议


2.3.1、自定义协议要求


下面是自定义协议案例的协议内容:


魔数(四个字节):用来在第一时间判定是否是无效数据包
版本号(一个字节):可以支持协议的升级
序列化算法(一个字节):消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
指令类型(一个字节):是登录、注册、单聊、群聊... 跟业务相关
请求序号(四个字节):为了双工通信,提供异步能力
对齐填充(一个字节):除正文长度和消息正文凑满16个字节,也就是2的n次方
正文长度(四个字节):正文的长度
消息正文:正文内容(根据序列化算法进行序列化成字节)


魔数:一般发送的头几个都是数字都是魔数,例如Java的二进制字节码的起始8个字节就是魔数。

版本号:一般用于支持协议的升级,若是协议升级可能其中的字段或者其他内容有所更改。

序列化算法:一般指的是消息正文,对消息正文采用特殊的格式。JDK的话缺点就是不能跨平台;谷歌出品的protobuf,其与hession都是二进制的,其可读性不好,但是字节数占用最少性能更高。

指令类型:消息是个什么类型,这与业务相关。

请求序号:例如发送是1 2 3,但是由于网络或其他因素受到的时候不是1 2 3顺序。

正文长度:通过正文长度知道正文接下来需要读取多少字节。


2.3.2、自定义消息对象(编解码器、消息抽象类、具体消息类)



Message:消息抽象类,定义了消息相关的一些字段内容。

LoginRequestMessage:一条业务消息,实现了Message抽象类,是登陆请求消息的抽象。

MessageCodec:实现了ByteToMessageCodec执行器,需要传入一个泛型,该泛型就是你要将Bytebuf转换的对象,并且其中需要你重写编解码方法,也就是解析、封装你自定义的一些协议。


Message:
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Data
public abstract class Message implements Serializable {
    /**
     * 根据消息类型字节,获得对应的消息 class
     * @param messageType 消息类型字节
     * @return 消息 class
     */
    public static Class<? extends Message> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }
    private int sequenceId;
    private int messageType;
    public abstract int getMessageType();
    public static final int LoginRequestMessage = 0;
    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
}



LoginRequestMessage:
import lombok.Data;
import lombok.ToString;
/**
 * @ClassName OwnMessage
 * @Author ChangLu
 * @Date 2022/1/9 18:07
 * @Description 登陆请求消息类
 */
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message{
    private String username;
    private String password;
    public LoginRequestMessage() {
    }
    public LoginRequestMessage(String username, String password) {
        this.username = username;
        this.password = password;
    }
    @Override
    public int getMessageType() {
        return LoginRequestMessage;
    }
}


MessageCodec:实现了对自定义协议的编解码


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
/**
 * @ClassName MessageCodec
 * @Author ChangLu
 * @Date 2022/1/9 15:27
 * @Description 实现ByteToMessageCodec:将ByteBuf转为指定的一个对象类型(可自己指定)
 */
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
    /**
     * 出站前将封装好的Message对象写入到ByteBuf中
     * @param ctx
     * @param msg 封装好的消息对象
     * @param out 写入到消息对象中
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        //1、4个字节的魔数
        out.writeBytes(new byte[]{1,2,3,4});
        //2、1个字节版本号:1 表示版本1
        out.writeByte(1);
        //3、1个字节序列化算法:0 jdk;1 json
        out.writeByte(0);
        //4、1个字节指令类型:在Message对象中定义
        out.writeByte(msg.getMessageType());
        //5、4个字节:表示请求序号
        out.writeInt(msg.getSequenceId());
        //获取内容的字节数组(默认直接采用JDK对象序列化方式)
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(msg);
        final byte[] data = baos.toByteArray();
        //(额外):为了满足2的N次方倍,要再加入一个字节凑满16个字节(除实际内容)
        // 仅仅目的是为了对齐填充
        out.writeByte(0xff);
        //6、4个字节length内容长度
        out.writeInt(data.length);
        //7、写入内容
        out.writeBytes(data);
    }
    /**
     * 进行解码:之前怎么封装的就怎么取
     * @param ctx
     * @param in
     * @param out
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        final int magicNum = in.readInt();//魔术字
        final byte version = in.readByte();//版本号
        final byte serializerType = in.readByte();//序列号
        final byte messageType = in.readByte();//消息类型
        final int sequencedId = in.readInt();//请求序号
        in.readByte();//填充号
        final int length = in.readInt();//内容长度
        final byte[] data = new byte[length];
//        in.readBytes(data, 0, length);//内容(字节数组)
        in.readBytes(data, 0, in.readableBytes());//内容(字节数组)
        Message message = null;
        //进行jdk序列化(字节数组转为对象)
        if (serializerType == 0) {
            final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
            message = (Message) ois.readObject();
        }
        out.add(message);
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, messageType, sequencedId);
        log.debug("{}", message);
    }
}




2.3.3、案例测试(自定义编解码测试、半包问题出现及解决)


案例1:自定义编解码器测试


/**
     * 案例1:自定义编解码器测试
     */
public static void test01() throws Exception {
    final EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(), new MessageCodec());
    //入站方法测试(编码):encode()
    final LoginRequestMessage message = new LoginRequestMessage("changlu", "123456");
    channel.writeOutbound(message);
    //出站方法测试(解码):decode
    final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    new MessageCodec().encode(null, message, buffer);//根据协议来进行编码到ByteBuf中
    final ArrayList<Object> list = new ArrayList<>();
    new MessageCodec().decode(null, buffer, list);//之后对按照协议进行编码的ByteBuf进行解码取得一系列参数及对象
    log.debug("解码得到的对象是:{}", list.get(0));
}



对于取出来的四个字节魔术字为16909060,原因是当时存储是一个一个字节存的1 2 3 4,有对应一个字节为8位,取得时候是直接取四个字节转为整数如下,那么整体就得到值为16909060:



案例2:解码出现半包问题及解决方案


半包问题出现原因:若是我们将一个编码过后的ByteBuf分为两个包来入站,那么每发一个包就会走一个decode()也就是解码方法,那么此时可以肯定的是由于包没有发完整,序列化字符串肯定也不完整,那么此时进行解序列化肯定就会报错出现异常!


解决半包思路:我们可以使用LTC解码器来进行解决,按照指定的长度规则来进行解码,那么之前半包会走两次handler再使用了解码器之后,由于半包不完整就会进行等待继续接收包,直到取到完整的包才会走handler那么此时执行decode解码自然不会出现序列化问题!


/**
     * 案例2:解码出现半包问题及解决方案,这里仅演示解码情况
     *  问题描述:若是出现半包问题,那么可能就会出现接解析序列化异常!
     *  解决方案:使用LTC(基于长度的帧解码器)来解决半包、黏包问题。
     */
public static void test02() throws Exception {
    final EmbeddedChannel channel = new EmbeddedChannel(
        new LoggingHandler(),
        //使用LTC解码器 12就是用于找到确认正文长度的字节数  4则是表示正文长度数字的字节数
        new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
        new MessageCodec()   //自定义的编解码器
    );
    final LoginRequestMessage message = new LoginRequestMessage("changlu", "123456");
    final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    new MessageCodec().encode(null, message, buffer);//根据协议来进行编码到ByteBuf中
    //进行切片将数据切分为两块(问题产生源头:此时就会出现半包问题,那么接序列化就会失败)
    final ByteBuf firBuf = buffer.slice(0, 100);
    final ByteBuf secBuf = buffer.slice(100, buffer.readableBytes() - 100);
    final ArrayList<Object> list = new ArrayList<>();
    buffer.retain();
    //模拟入站操作,此时就会执行decode方法
    channel.writeInbound(firBuf);//执行一次writeInbound实际上就会执行release()进行释放内存,由于这里切片所以为了避免释放,在此之前进行引用计数+1
    channel.writeInbound(secBuf);
}



上述对第10行进行注释时,就会出现解序列化对象失败异常:



对第10行不进行注释来使用解码器并指定长度字段:




2.4、@Sharable:可共享handler(引出+案例demo)


引出@sharable


我们之前编写代码时都会在初始化channel时添加执行器handler,添加的方式如下,我们可以看到每次都是添加到channel的pipeline管道对象中,那么就会有一个疑问:每个channel在创建之初都会添加新的handler到管道中吗?


//实际写服务端的handler
.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LoggingHandler())
        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){})
    }
}
//EmbeddedChannel添加handler方式
final EmbeddedChannel channel = new EmbeddedChannel(
    new LoggingHandler(),
    new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
    new MessageCodec()   //自定义的编解码器
);



通过debug测试可以得出结论:若是我们直接在addLast()中添加新的handler,那么就是每初始化一个连接就会创建多个新的handler对象!


那么我们可以想到一个优化思路,既然如此那么为什么我们不提前直接new出来一个handler作为多个channel的共享执行器?对的确实可以,对于我们自定义的一些执行器我们可以清楚的知道有没有使用一些共享对象,有没有线程安全问题,而对于netty提供给我们的我们不能够确定,这时候nettty给了我们思路来看其提供的handler是否是可共享、是线程安全的:实际上就是@Sharable。


结论:如果netty提供给我们的handler是可共享的,就会标注该类为@Sharable,我们可以放心大胆的创建一个公共handler来被多个channel共同使用!


对于LoggingHandler是有该注解的,表示是可共享的!


初使用


先学现卖,对于我们在2.3.2中自定义的编解码器就不会有线程安全问题,那么其就可以标注为@Sharable,这样其他人使用我们定义的handler也可以放心使用!


@ChannelHandler.Sharable
public class MessageCodec extends ByteToMessageCodec<Message> {


问题描述:ByteToMessageCodec这个类在初始化方法中有检测该注解的逻辑代码,一旦其实现类有@Sharable就会直接抛出异常:



分析:netty框架设计者在设计ByteToMessageCodec类时设定就是实现该类的一些编解码器正常来说都会使用到一个公共变量存储及操作,其认为本该是不可共享的,如下代码注释说明,该类不允许被标注



解决方案:那么若是我们自定义的编解码器确实没有使用到公共变量,我们想将其标注为@Sharable呢?可以将原本实现ByteToMessageCodec更改为MessageToMessageCodec


import com.changlu.No4Netty进阶.No2协议设计与解析.No1常用协议示例.No3自定义协议示例.Test.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import java.util.List;
/**
 * @ClassName MessageCodec2
 * @Author ChangLu
 * @Date 2022/1/9 23:30
 * @Description 可标注共享注解@ChannelHandler.Sharable的编解码器抽象类MessageToMessageCodec
 */
@ChannelHandler.Sharable   //可以看到这里的话泛型有两个,分别用于编解码
public class MessageCodec extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
        //...
    }
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        //...
    }
}


这里提供2.3.2中编解码器使用@ChannelHandler.Sharable示例:


import com.changlu.No4Netty进阶.No2协议设计与解析.No1常用协议示例.No3自定义协议示例.Test.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
/**
 * @ClassName MessageCodec2
 * @Author ChangLu
 * @Date 2022/1/9 23:30
 * @Description 可标注共享注解@ChannelHandler.Sharable的编解码器抽象类MessageToMessageCodec
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodec2 extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
        final ByteBuf buffer = ctx.alloc().buffer();
        //1、4个字节的魔数
        buffer.writeBytes(new byte[]{1,2,3,4});
        //2、1个字节版本号:1 表示版本1
        buffer.writeByte(1);
        //3、1个字节序列化算法:0 jdk;1 json
        buffer.writeByte(0);
        //4、1个字节指令类型:在Message对象中定义
        buffer.writeByte(msg.getMessageType());
        //5、4个字节:表示请求序号
        buffer.writeInt(msg.getSequenceId());
        //获取内容的字节数组(默认直接采用JDK对象序列化方式)
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(msg);
        final byte[] data = baos.toByteArray();
        //(额外):为了满足2的N次方倍,要再加入一个字节凑满16个字节(除实际内容)
        // 仅仅目的是为了对齐填充
        buffer.writeByte(0xff);
        //6、4个字节length内容长度
        buffer.writeInt(data.length);
        //7、写入内容
        buffer.writeBytes(data);
        out.add(buffer);
    }
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        final int magicNum = in.readInt();//魔术字
        final byte version = in.readByte();//版本号
        final byte serializerType = in.readByte();//序列号
        final byte messageType = in.readByte();//消息类型
        final int sequencedId = in.readInt();//请求序号
        in.readByte();//填充号
        final int length = in.readInt();//内容长度
        final byte[] data = new byte[length];
//        in.readBytes(data, 0, length);//内容(字节数组)
        in.readBytes(data, 0, in.readableBytes());//内容(字节数组)
        Message message = null;
        //进行jdk序列化(字节数组转为对象)
        if (serializerType == 0) {
            final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
            message = (Message) ois.readObject();
        }
        out.add(message);
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, messageType, sequencedId);
        log.debug("{}", message);
    }
}



最后再附上引子引出案例的改进代码:此时之后每来临一个channel就都直接该指定的一个执行器


//抽离出来logginghandler
final LoggingHandler loggingHandler = new LoggingHandler();
//...
.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast(loggingHandler)
        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){})
    }
}
//对于EmbeddedChannel也是同理
final LoggingHandler loggingHandler = new LoggingHandler();
final MessageCodec2 messageCodec2 = new MessageCodec2();
final EmbeddedChannel channel = new EmbeddedChannel(
    loggingHandler,
    new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
    messageCodec2 //自定义的编解码器
);
相关文章
|
21天前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
81 3
|
2月前
|
域名解析 存储 网络协议
深入解析网络通信关键要素:IP 协议、DNS 及相关技术
本文详细介绍了IP协议报头结构及其各字段的功能,包括版本、首部长度、服务类型、总长度、标识、片偏移、标志、生存时间(TTL)、协议、首部检验和等内容。此外,还探讨了IP地址的网段划分、特殊IP地址的应用场景,以及路由选择的大致流程。最后,文章简要介绍了DNS协议的作用及其发展历史,解释了域名解析系统的工作原理。
109 5
深入解析网络通信关键要素:IP 协议、DNS 及相关技术
http数据包抓包解析
http数据包抓包解析
|
2月前
|
前端开发 JavaScript 安全
深入解析 http 协议
HTTP(超文本传输协议)不仅用于传输文本,还支持图片、音频和视频等多种类型的数据。当前广泛使用的版本为 HTTP/1.1。HTTPS 可视为 HTTP 的安全增强版,主要区别在于添加了加密层。HTTP 请求和响应均遵循固定格式,包括请求行/状态行、请求/响应头、空行及消息主体。URL(统一资源定位符)用于标识网络上的资源,其格式包含协议、域名、路径等信息。此外,HTTP 报头提供了附加信息,帮助客户端和服务端更好地处理请求与响应。状态码则用于指示请求结果,如 200 表示成功,404 表示未找到,500 表示服务器内部错误等。
50 0
深入解析 http 协议
|
2月前
|
网络协议 网络虚拟化
接收网络包的过程——从硬件网卡解析到IP
【9月更文挑战第18天】这段内容详细描述了网络包接收过程中机制。当网络包触发中断后,内核处理完这批网络包,会进入主动轮询模式,持续处理后续到来的包,直至处理间隙返回其他任务,从而减少中断次数,提高处理效率。此机制涉及网卡驱动初始化时注册轮询函数,通过软中断触发后续处理,并逐步深入内核网络协议栈,最终到达TCP层。整个接收流程分为多个层次,包括DMA技术存入Ring Buffer、中断通知CPU、软中断处理、以及进入内核网络协议栈等多个步骤。
|
2月前
|
数据采集 存储 JSON
从零到一构建网络爬虫帝国:HTTP协议+Python requests库深度解析
在网络数据的海洋中,网络爬虫遵循HTTP协议,穿梭于互联网各处,收集宝贵信息。本文将从零开始,使用Python的requests库,深入解析HTTP协议,助你构建自己的网络爬虫帝国。首先介绍HTTP协议基础,包括请求与响应结构;然后详细介绍requests库的安装与使用,演示如何发送GET和POST请求并处理响应;最后概述爬虫构建流程及挑战,帮助你逐步掌握核心技术,畅游数据海洋。
64 3
http数据包抓包解析课程笔记
http数据包抓包解析课程笔记
|
2月前
|
消息中间件 安全 Kafka
Kafka支持SSL/TLS协议技术深度解析
SSL(Secure Socket Layer,安全套接层)及其继任者TLS(Transport Layer Security,传输层安全)是为网络通信提供安全及数据完整性的一种安全协议。这些协议在传输层对网络连接进行加密,确保数据在传输过程中不被窃取或篡改。
135 0
|
3月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
107 0
|
23天前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
58 0

推荐镜像

更多