Socket粘包问题终极解决方案—Netty版(2W字)!(10)

简介: Socket粘包问题终极解决方案—Netty版(2W字)!(10)

完整的服务器端和客户端的实现代码如下:


import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * 通过封装 Netty 来解决粘包
 */
public class NettyExample {
    // 定义服务器的端口号
    static final int PORT = 8007;
    /**
     * 服务器端
     */
    static class MyNettyServer {
        public static void main(String[] args) {
            // 创建一个线程组,用来负责接收客户端连接
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            // 创建另一个线程组,用来负责 I/O 的读写
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                // 创建一个 Server 实例(可理解为 Netty 的入门类)
                ServerBootstrap b = new ServerBootstrap();
                // 将两个线程池设置到 Server 实例
                b.group(bossGroup, workerGroup)
                        // 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)
                        .channel(NioServerSocketChannel.class)
                        // 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)
                        .childHandler(new NettyExample.ServerInitializer());
                // 绑定端口并且进行同步
                ChannelFuture future = b.bind(PORT).sync();
                // 对关闭通道进行监听
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 资源关闭
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    /**
     * 服务端通道初始化
     */
    static class ServerInitializer extends ChannelInitializer<SocketChannel> {
        // 字符串编码器和解码器
        private static final StringDecoder DECODER = new StringDecoder();
        private static final StringEncoder ENCODER = new StringEncoder();
        // 服务器端连接之后的执行器(自定义的类)
        private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();
        /**
         * 初始化通道的具体执行方法
         */
        @Override
        public void initChannel(SocketChannel ch) {
            // 通道 Channel 设置
            ChannelPipeline pipeline = ch.pipeline();
            // 消息解码:读取消息头和消息体
            pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
            // 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度
            pipeline.addLast(new LengthFieldPrepender(4));
            // 设置(字符串)编码器和解码器
            pipeline.addLast(DECODER);
            pipeline.addLast(ENCODER);
            // 服务器端连接之后的执行器,接收到消息之后的业务处理
            pipeline.addLast(SERVER_HANDLER);
        }
    }
    /**
     * 服务器端接收到消息之后的业务处理类
     */
    static class ServerHandler extends SimpleChannelInboundHandler<String> {
        /**
         * 读取到客户端的消息
         */
        @Override
        public void channelRead0(ChannelHandlerContext ctx, String request) {
            if (!request.isEmpty()) {
                System.out.println("接到客户端的消息:" + request);
            }
        }
        /**
         * 数据读取完毕
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
        /**
         * 异常处理,打印异常并关闭通道
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
    /**
     * 客户端
     */
    static class MyNettyClient {
        public static void main(String[] args) {
            // 创建事件循环线程组(客户端的线程组只有一个)
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                // Netty 客户端启动对象
                Bootstrap b = new Bootstrap();
                // 设置启动参数
                b.group(group)
                        // 设置通道类型
                        .channel(NioSocketChannel.class)
                        // 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)
                        .handler(new NettyExample.ClientInitializer());
                // 连接服务器端并同步通道
                Channel ch = b.connect("127.0.0.1", PORT).sync().channel();
                // 发送消息
                ChannelFuture lastWriteFuture = null;
                // 给服务器端发送 10 条消息
                for (int i = 0; i < 10; i++) {
                    // 发送给服务器消息
                    lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");
                }
                // 在关闭通道之前,同步刷新所有的消息
                if (lastWriteFuture != null) {
                    lastWriteFuture.sync();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放资源
                group.shutdownGracefully();
            }
        }
    }
    /**
     * 客户端通道初始化类
     */
    static class ClientInitializer extends ChannelInitializer<SocketChannel> {
        // 字符串编码器和解码器
        private static final StringDecoder DECODER = new StringDecoder();
        private static final StringEncoder ENCODER = new StringEncoder();
        // 客户端连接成功之后业务处理
        private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();
        /**
         * 初始化客户端通道
         */
        @Override
        public void initChannel(SocketChannel ch) {
            ChannelPipeline pipeline = ch.pipeline();
            // 消息解码:读取消息头和消息体
            pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
            // 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度
            pipeline.addLast(new LengthFieldPrepender(4));
            // 设置(字符串)编码器和解码器
            pipeline.addLast(DECODER);
            pipeline.addLast(ENCODER);
            // 客户端连接成功之后的业务处理
            pipeline.addLast(CLIENT_HANDLER);
        }
    }
    /**
     * 客户端连接成功之后的业务处理
     */
    static class ClientHandler extends SimpleChannelInboundHandler<String> {
        /**
         * 读取到服务器端的消息
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            System.err.println("接到服务器的消息:" + msg);
        }
        /**
         * 异常处理,打印异常并关闭通道
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
相关文章
|
5月前
|
网络协议
【Netty 网络通信】Socket 通信原理
【1月更文挑战第9天】【Netty 网络通信】Socket 通信原理
|
5月前
|
编解码 缓存 移动开发
TCP粘包/拆包与Netty解决方案
TCP粘包/拆包与Netty解决方案
89 0
|
2月前
|
编解码 网络协议 开发者
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
|
15天前
Netty 与硬件设备交互,下行命令时(服务对设备),如何等待设备响应,再进行业务操作解决方案
Netty 与硬件设备交互,下行命令时(服务对设备),如何等待设备响应,再进行业务操作解决方案
|
2月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
|
4月前
|
消息中间件 存储 网络协议
拼多多面试:Netty如何解决粘包问题?
粘包和拆包问题也叫做粘包和半包问题,**它是指在数据传输时,接收方未能正常读取到一条完整数据的情况(只读取了部分数据,或多读取到了另一条数据的情况)就叫做粘包或拆包问题。** 从严格意义上来说,粘包问题和拆包问题属于两个不同的问题,接下来我们分别来看。 ## 1.粘包问题 粘包问题是指在网络通信中,发送方连续发送的多个小数据包被接收方一次性接收的现象。这可能是因为底层传输层协议(如 TCP)会将多个小数据包合并成一个大的数据块进行传输,导致接收方在接收数据时一次性接收了多个数据包,造成粘连。 例如以下案例,正常情况下客户端发送了两条消息,分别为“ABC”和“DEF”,那么接收端也应该收到两
31 0
拼多多面试:Netty如何解决粘包问题?
|
4月前
|
网络协议
netty粘包问题分析
netty粘包问题分析
36 0
|
4月前
|
Java
Netty传输object并解决粘包拆包问题
Netty传输object并解决粘包拆包问题
41 0
|
4月前
|
Java
Netty中粘包拆包问题解决探讨
Netty中粘包拆包问题解决探讨
27 0
|
5月前
|
监控 网络协议 iOS开发
程序退到后台的时候,所有线程被挂起,系统回收所有的socket资源问题及解决方案
程序退到后台的时候,所有线程被挂起,系统回收所有的socket资源问题及解决方案
108 0