Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器

简介: Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器


概述


Pre

Netty Review - 借助SimpleTalkRoom初体验异步网络编程的魅力

Netty Review - 优化Netty通信:如何应对粘包和拆包挑战 中我们遗留了一个内容

今天我们就通过自定义长度分包解码器来解决粘包拆包的问题


概述

在Netty中,自定义长度分包编解码器通常涉及到两个组件:

  • 一个用于编码的MessageToByteEncoder
  • 另一个用于解码的ByteToMessageDecoder

Code

核心思路:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。

服务器端程序如下,其目的是创建一个服务,该服务器监听1234端口,并使用自定义的编解码器处理接收到的消息。

package com.artisan.pack_custom_codec;
import com.artisan.pack_custom_codec.codec.CustomMessageDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanServer {
    public static void main(String[] args) throws InterruptedException {
        // 主事件循环组,用于接受进来的连接,这里只设置了1个线程
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 从事件循环组,用于处理已接受连接的IO操作,这里设置了8个线程
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            // 创建服务器启动类
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 配置事件循环组
            bootstrap.group(bossGroup, workerGroup)
                    // 使用NioServerSocketChannel作为服务器通道
                    .channel(NioServerSocketChannel.class)
                    // 设置一个选项,用于配置服务器同步接受连接的backlog大小
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 配置子通道初始化处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 获取通道的管道
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加自定义的解码器
                            pipeline.addLast("customDecoder",new CustomMessageDecoder());
                            // 添加业务处理handler
                            pipeline.addLast(new ArtisanServerHandler());
                        }
                    });
            // 绑定端口并同步等待成功,打印成功启动信息
            ChannelFuture channelFuture = bootstrap.bind(1234).sync();
            System.out.println("Talk Room Server启动成功,监听1234端口");
            // 关闭通道
            // 这里的关闭是等待服务器socket关闭,通常在服务器关闭时调用
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭主从事件循环组,释放资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

代码中主要涉及以下部分:

  • 服务器端的启动类 ArtisanServer
  • NioEventLoopGroup 的使用,用于接受和处理网络事件。
  • ServerBootstrap 的配置,用于设置服务器参数。
  • CustomMessageDecoder 的使用,用于自定义消息的编解码方式。
  • ArtisanServerHandler 的添加,用于处理具体的业务逻辑。
  • 端口绑定和通道关闭的操作。

自定义协议

package com.artisan.pack_custom_codec.protocol;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 * @description: 自定义协议
 */
public class CustomMessageProtocol {
    /**
     * 定义一次发送包体长度
     */
    private int len;
    /**
     * 一次发送包体内容
     */
    private byte[] content;
    public int getLen() {
        return len;
    }
    public void setLen(int len) {
        this.len = len;
    }
    public byte[] getContent() {
        return content;
    }
    public void setContent(byte[] content) {
        this.content = content;
    }
}

自定义解码器

server端接收网卡过来的消息,入站事件,自然是要解码,将网络传输的二进制数据转换为对象

package com.artisan.pack_custom_codec.codec;
import com.artisan.pack_custom_codec.protocol.CustomMessageProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomMessageDecoder extends ByteToMessageDecoder {
    int length = 0;
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println();
        System.out.println("CustomMessageDecoder decode 被调用");
        //需要将得到二进制字节码-> CustomMessageDecoder 数据包(对象)
        System.out.println(in);
        // int 占 4个字节 (int 类型占用4个字节,即32位)
        if (in.readableBytes() >= 4) {
            if (length == 0) {
                length = in.readInt();
            }
            if (in.readableBytes() < length) {
                System.out.println("当前可读数据不够,继续等待。。");
                return;
            }
            byte[] content = new byte[length];
            if (in.readableBytes() >= length) {
                in.readBytes(content);
                //封装成CustomMessageDecoder对象,传递到下一个handler业务处理
                CustomMessageProtocol messageProtocol = new CustomMessageProtocol();
                messageProtocol.setLen(length);
                messageProtocol.setContent(content);
                out.add(messageProtocol);
            }
            length = 0;
        }
    }
}

服务端的消息处理

package com.artisan.pack_custom_codec;
import com.artisan.pack_custom_codec.protocol.CustomMessageProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanServerHandler extends SimpleChannelInboundHandler<CustomMessageProtocol> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CustomMessageProtocol msg) throws Exception {
        System.out.println("====服务端接收到消息如下====");
        System.out.println("长度=" + msg.getLen());
        System.out.println("内容=" + new String(msg.getContent(), CharsetUtil.UTF_8));
        System.out.println("服务端接收到消息包数量=" + (++this.count));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

接下来继续看看客户端该如何处理 ?

客户端启动类

package com.artisan.pack_custom_codec;
import com.artisan.pack_custom_codec.codec.CustomMessageEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Scanner;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 自定义长度分包编码器
                            pipeline.addLast("customEncoder",new CustomMessageEncoder());
                            //加入自己的业务处理handler
                            pipeline.addLast(new ArtisanClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();
            //得到 channel
            Channel channel = channelFuture.channel();
            System.out.println("========" + channel.localAddress() + "========");
            //客户端需要输入信息, 创建一个扫描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                //通过 channel 发送到服务器端
                channel.writeAndFlush(msg);
            }
            // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

这段代码定义了一个名为CustomMessageEncoder的Netty编解码器,用于将自定义协议的消息编码为字节流。下面是代码的详细解释:

  1. CustomMessageEncoder类继承自MessageToByteEncoder,这意味着它将被用于编码CustomMessageProtocol类型的消息。
  2. encode方法在需要将消息编码为字节流时调用。在这个方法中,首先打印了一条消息,表明encode方法被调用了。然后,它将消息的长度写入到输出缓冲区out中,接着将消息内容写入到输出缓冲区。

这个编解码器的主要作用是将自定义协议的消息转换为字节流,以便可以在网络上传输。它首先写入消息的长度,然后写入消息的内容,这样接收方就可以根据长度来解析消息的内容。


自定义编码器

package com.artisan.pack_custom_codec.codec;
import com.artisan.pack_custom_codec.protocol.CustomMessageProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomMessageEncoder extends MessageToByteEncoder<CustomMessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, CustomMessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("CustomMessageEncoder encode called~");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}

客户端发数据到网卡,出站事件 , 需要编码。


客户端业务处理Handler

package com.artisan.pack_custom_codec;
import com.artisan.pack_custom_codec.protocol.CustomMessageProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanClientHandler extends SimpleChannelInboundHandler<CustomMessageProtocol> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 2; i++) {
            String msg = "小工匠的Netty Review之旅";
            //创建协议包对象
            CustomMessageProtocol customMessageProtocol = new CustomMessageProtocol();
            customMessageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);
            customMessageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));
            ctx.writeAndFlush(customMessageProtocol);
        }
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CustomMessageProtocol msg) throws Exception {
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

这段代码定义了一个名为ArtisanClientHandler的Netty处理器,用于处理自定义协议的消息。下面是代码的详细解释:

  1. ArtisanClientHandler类继承自SimpleChannelInboundHandler,这意味着它将被用于处理CustomMessageProtocol类型的消息。
  2. channelActive方法在Netty通道激活时调用。在这个方法中,代码循环两次,发送一个包含特定字符串的消息。每次循环,它都会创建一个CustomMessageProtocol对象,设置消息长度,并填充内容,然后通过ctx.writeAndFlush方法将消息写入通道。
  3. channelRead0方法在接收到消息时调用。在这个例子中,该方法为空,没有实现任何功能。
  4. exceptionCaught方法在捕获到异常时调用。在这个方法中,它打印了异常的堆栈跟踪,并关闭了通道。

测试

启动server 和 client


相关文章
|
4月前
|
编解码 网络协议 开发者
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
|
4月前
|
前端开发 网络协议
Netty实战巅峰:从零构建高性能IM即时通讯系统,解锁并发通信新境界
【8月更文挑战第3天】Netty是一款高性能、异步事件驱动的网络框架,适用于开发高并发网络应用,如即时通讯(IM)系统。本文将指导你利用Netty从零构建高性能IM程序,介绍Netty基础及服务器/客户端设计。服务器端使用`ServerBootstrap`启动,客户端通过`Bootstrap`连接服务器。示例展示了简单的服务器启动过程。通过深入学习,可进一步实现用户认证等功能,打造出更完善的IM系统。
188 1
|
4月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
241 2
|
6月前
|
网络协议
netty粘包问题分析
netty粘包问题分析
47 0
|
6月前
|
Java
Netty传输object并解决粘包拆包问题
Netty传输object并解决粘包拆包问题
56 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13521 1
|
7月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
140 1
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
175 1
|
7月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
170 0
|
7月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
263 0