Netty基础篇:NIO中缓冲区设置太小

简介: Netty基础篇:NIO中缓冲区设置太小


B:缓冲区设置太小的问题
package com.suns.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
public class MyServer2 {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8000));
        serverSocketChannel.configureBlocking(false);//Selector 只有在非阻塞的情况下 才可以使用。
        //引入监管者
        Selector selector = Selector.open();//1. 工厂,2. 单例
        //监管者 管理谁? selector.xxxx(ssc); //管理者 ssc  ---> Accept
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
        // selector监控 SSC ACCEPT
        // selector
        //   keys --> HashSet
        //  register注册 ssc
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
        System.out.println("MyServler2.main");
        //监控
        while (true) {
            selector.select();//等待.只有监控到了 有实际的连接 或者 读写操作 ,才会处理。
            //对应的 有ACCEPT状态的SSC 和 READ WRITE状态的 SC 存起来
            // SelectionsKeys HashSet
            System.out.println("-------------------------");
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {//ServerSocketChannel ScoketChannel
                SelectionKey key = iterator.next();
                //用完之后 就要把他从SelectedKeys集合中删除掉。问题? ServerScoketChannel---SelectedKeys删除 ,后续 SSC建立新的连接?
                iterator.remove();
                if (key.isAcceptable()) {
                    //serverSocketChannel.accept();
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    //监控sc状态 ---> keys
                    SelectionKey sckey = sc.register(selector, 0, null);
                    sckey.interestOps(SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    try {
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(5);
                        int read = sc.read(buffer);
                        if (read == -1) {
                            key.cancel();
                        } else {
                            buffer.flip();
                            System.out.println("Charset.defaultCharset().decode(buffer).toString() = " + Charset.defaultCharset().decode(buffer).toString());
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();
                    }
                }
            }
        }
    }
}
package com.suns.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class MyClient {
    public static void main(String[] args) throws IOException {
        //连接服务端  端口号?
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8000));
        //        socketChannel.write(Charset.defaultCharset().encode("hello\nsuns\n"));
        socketChannel.write(Charset.defaultCharset().encode("hellosuns\n"));
        System.out.println("------------------------------");
    }
}
//ByteBuffer buffer = ByteBuffer.allocate(7);
   //hellosuns\n
    private static void doLineSplit(ByteBuffer buffer) {
        buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            if (buffer.get(i) == '\n') {//hellosu
                int length = i + 1 - buffer.position();
                ByteBuffer target = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    target.put(buffer.get());
                }
                //截取工作完成
                target.flip();
                System.out.println("StandardCharsets.UTF_8.decode(target).toString() = 
                " + StandardCharsets.UTF_8.decode(target).toString());
            }
        }
        buffer.compact();
    }
    //这种情况就是缓冲区太小,导致解析数据的时候,读取到ByteBuffer当中没有任何的\n,
    //导致后续的代码没发执行,compact方法执行了个寂寞。

所以需要有接下来的处理的处理:

doLineSplit(buffer);
      //以为缓冲区 不够了 没压懂 需要扩容了
      if (buffer.position() == buffer.limit()) {
         //1 空间扩大
          ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
         //2 老的缓冲区的数据 ---> 新的缓冲区
          buffer.flip();
          newBuffer.put(buffer);
          //3 channel -- byteBuffer 绑定 newBuffer
          key.attach(newBuffer);
      }

网络通信过程中采用处理半包粘包的范式,采用的方式是通过特殊字符进行分割\n,同时使用compact压缩

1:channel要和:byteBuffer绑定的问题,保证byte可以复用,而且不会丢失数据

2:bytebuffer容量不够,考虑扩容问题。库容之后,将旧的buyffer拷贝到新的buyffer当中,绑定新的buffer

注意:

1:bytebuffer不够了就要扩容,如果越扩越大,浪费了空间需要考虑缩容问题,毕竟当我们的客户端很大的时候,每一个客户端都会对应一个byteBuffer,如果只考虑扩容,不考虑缩容,那么会浪费计算机内存资源,最终导致内存不够,netty当中会帮我们解决这个问题。

开发当中需要把这个场景都考虑到位的话,会很麻烦

2:扩容问题:

老bytebuffer会考虑到新的bytebuffer当中,这个效率是非常低的,后续我们通过0拷贝的方式进行解决。现在我们的代码基本解决了半包粘包的问题,但是不完美,将来netty当中解决的很完美。

3:解决半包粘包的方式:

1:\n+compact \n区分了不同的信息,–保证了信息的完整性,这种方式需要不断的检索这个\n这个效率是非常低的,这个是一个线性的查找。下边这个好

2:头体分离: 一个信息分成两个部分,一个信息是头部分,记录的是元数据,记录信息的大小,体信息才是真正的数据本身,这个时候,我们结合这个大小。很快的可以处理掉后续的消息内容,这个是应用很广泛的,是很高效的。Http协议本身就是依赖于这种设计,他的头里边有一个Content-Length 基于这个属性,就可以去数据中直接读取到多大的数据。Netty如何解决半包和粘包,一定是采用这两种方式的一种。所有的协议都有头部分,说明这是一个好的设计,后续我们在自定义一种通信方式的时候也要遵循这种方式。

一个较为完美的服务器端代码:

package com.suns.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
/*
   缓冲区的扩容
     1.  如何判断 缓冲区 是否该扩容
     2.  怎么扩容 。
 */
public class MyServer4 {
    private static void doLineSplit(ByteBuffer buffer) {
        buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            if (buffer.get(i) == '\n') {//hellosu
                int length = i + 1 - buffer.position();
                ByteBuffer target = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    target.put(buffer.get());
                }
                //截取工作完成
                target.flip();
                System.out.println("StandardCharsets.UTF_8.decode(target).toString() = " + StandardCharsets.UTF_8.decode(target).toString());
            }
        }
        buffer.compact();
    }
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8000));
        serverSocketChannel.configureBlocking(false);//Selector 只有在非阻塞的情况下 才可以使用。
        //引入监管者
        Selector selector = Selector.open();//1. 工厂,2. 单例
        //监管者 管理谁? selector.xxxx(ssc); //管理者 ssc  ---> Accept
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
        // selector监控 SSC ACCEPT
        // selector
        //   keys --> HashSet
        //  register注册 ssc
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
        System.out.println("MyServler2.main");
        //监控
        while (true) {
            selector.select();//等待.只有监控到了 有实际的连接 或者 读写操作 ,才会处理。
            //对应的 有ACCEPT状态的SSC 和 READ WRITE状态的 SC 存起来
            // SelectionsKeys HashSet
            System.out.println("-------------------------");
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {//ServerSocketChannel ScoketChannel
                SelectionKey key = iterator.next();
                //用完之后 就要把他从SelectedKeys集合中删除掉。问题? ServerScoketChannel---SelectedKeys删除 ,后续 SSC建立新的连接?
                iterator.remove();
                if (key.isAcceptable()) {
                    //serverSocketChannel.accept();
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    //监控sc状态 ---> keyr
                    ByteBuffer buffer = ByteBuffer.allocate(7);
                    SelectionKey sckey = sc.register(selector, 0, buffer);
                    sckey.interestOps(SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    try {
                        SocketChannel sc = (SocketChannel) key.channel();    //ByteBuffer ---> s...
                        //ByteBuffer buffer = ByteBuffer.allocate(7);//创建一个ByteBuffer 新的ByteBuffer..
                        //从channel中获得 绑定的那个bytebuffer
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        int read = sc.read(buffer);
                        if (read == -1) {
                            key.cancel();
                        } else {
                            /*
                            buffer.flip();
                            System.out.println("Charset.defaultCharset().decode(buffer).toString() = " + Charset.defaultCharset().decode(buffer).toString());
                            */
                            doLineSplit(buffer);
                            //以为缓冲区 不够了 没压懂 需要扩容了
                            if (buffer.position() == buffer.limit()) {
                               //1 空间扩大
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
                               //2 老的缓冲区的数据 ---> 新的缓冲区
                                buffer.flip();
                                newBuffer.put(buffer);
                                //3 channel -- byteBuffer 绑定 newBuffer 把之前的覆盖掉。
                                key.attach(newBuffer);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();
                    }
                }
            }
        }
          /*  如何保证 前后2次操作的ByteBuffer是同一个。
              ByteBuffer和谁有关的呢?---> channel 相关
              ByteBuffer和Channel绑定就行了
              怎么绑定 ?  channel.register(selector,0,attament)--> attarment 附件
          *
          * */
    }
}

TCP协议进行通讯的时候的两个问题需要注意:

1:分包的问题

(TCP协议)假如我们需要传递10TB的数据给到服务端,不会一次性直接传递过去,会打成十个包,一个接一个的发送数据,一个包多大呢?1460K实际上有可能比1460小,这个是基于双方的第三次握手的MSS值得较小值确定的。

2:流量控制的问题

我们发包的过程中,我们担心会出现发包的过程中不要太快,但是服务端来不及接收导致丢包的情况。甚至为了控制速率发一些空包。

总结:

当前我们做的是web服务,也就是Server,对于Server来讲有两个很重要的问题

1:如何接收请求的问题?这里我们使用的是NIO,基于SSC的accept()即可

2:数据的读写操作:

分析了Select()方法的特点,只要是数据没处理完,就必须的处理

1):数据没有一次性处理完,频繁调用select();

2):特殊情况,例如client的正常和不正常退出Selectkey.cancel();里边封装的是channel

数据的编解码操作:

1):编码:将String转成Byte数组

2):解码:将buffer当中的数据转成String

对应的就是encode()和decode方法

数据的完整性问题-避免出现半包或者粘包的情况。

TCP协议当中的分包问题:一个包的数据最大是1460B(字节bit)

TCP协议当中的流量控制问题:数据发的太快的话,服务端有可能来不及接收,导致丢包的情况。需要控制发包的速度。

相关文章
|
1月前
|
Java 应用服务中间件 API
从零手写实现 tomcat-06-servlet bio/thread/nio/netty 池化处理
该文介绍了逐步改进的网络服务器实现,从最初的 BIO 基础版到使用线程池的 BIO+Thread,再到 NIO 版本和 NIO+Thread,最后展示了一个使用 Netty 框架的简洁实现。文章旨在说明如何解决阻塞问题,并对比不同模型的优劣,最终推荐使用 Netty 以简化 NIO 编程。
|
1月前
|
编解码 网络协议 Java
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
|
1月前
|
移动开发 编解码 网络协议
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
|
1月前
|
设计模式 网络协议 Java
Java NIO 网络编程 | Netty前期知识(二)
Java NIO 网络编程 | Netty前期知识(二)
93 0
|
1月前
|
存储 缓存 监控
Netty基础篇:详解Netty底层NIO
Netty基础篇:详解Netty底层NIO
|
1月前
|
存储 网络协议 Java
NIO - 基础入门之通道和缓冲区
NIO - 基础入门之通道和缓冲区
58 0
|
8月前
|
缓存 网络协议 前端开发
从BIO到NIO在到Netty线程模型详解
从BIO到NIO在到Netty线程模型详解
146 1
|
1月前
|
存储 Java 数据处理
|
1月前
|
Java API
java中IO与NIO有什么不同
java中IO与NIO有什么不同
|
23天前
|
存储 监控 Java
深入探索Java语言的NIO(New I/O)技术
深入探索Java语言的NIO(New I/O)技术