02、Netty学习笔记—(NIO网络编程和IO模型)(一)

简介: 02、Netty学习笔记—(NIO网络编程和IO模型)(一)

一、网络编程


1.1、非阻塞VS阻塞


1.1.1、阻塞(默认)


阻塞模式下,相关方法都会导致线程暂停

ServerSocketChannel.accept 会在没有连接建立时让线程暂停

SocketChannel.read 会在没有数据可读时让线程暂停

阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置

单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持

但多线程下,有新的问题,体现在以下方面

32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低

可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

服务器端:


import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;
/**
 * @ClassName Socket
 * @Author ChangLu
 * @Date 2021/12/18 14:28
 * @Description 阻塞NIO服务器
 */
@Slf4j
public class NioServer {
    private static List<SocketChannel> channels = new ArrayList<>();
    private static final ByteBuffer buffer = ByteBuffer.allocate(20);
    public static void main(String[] args)throws Exception{
        //1、创建服务器
        final ServerSocketChannel ssc = ServerSocketChannel.open();
        // 2. 绑定监听端口
        ssc.bind(new InetSocketAddress(8198));
        log.debug("server start ...");
        while (true) {
            log.debug("server accept ...");
            // 3. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信(阻塞)
            final SocketChannel channel = ssc.accept();
            log.debug("channel => {}",channel);
            // 4. 添加连接至集合
            channels.add(channel);
            for (SocketChannel c : channels) {
                log.debug("before server read from {}...", c.getRemoteAddress());
                // 5. 接收客户端发送的数据(阻塞)
                c.read(buffer);
                //打印读取到的buffer内容
                debugAll(buffer);
                buffer.flip();//切换到读模式
                System.out.println("收到客户端:" + c + ",信息为:" + CharsetUtil.UTF_8.decode(buffer).toString());
                buffer.clear();//切换到写模式
                log.debug("end server read ...");
            }
        }
    }
}



客户端:


import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
/**
 * @ClassName NioClient
 * @Author ChangLu
 * @Date 2021/12/18 14:55
 * @Description Nio客户端
 */
@Slf4j
public class NioClient {
    public static void main(String[] args) throws Exception{
        final SocketChannel sc = SocketChannel.open();
        log.debug("client is connecting ...");
        final boolean result = sc.connect(new InetSocketAddress("localhost", 8198));
        if (result){
            log.debug("client connect success!");
            //之后的一些请求内容通过debug调试发出!
            //示例(在Evaluate中执行):sc.write(StandardCharsets.UTF_8.encode("hello!"))
        }else {
            log.debug("client is retrying...");
            sc.finishConnect();
        }
    }
}






问题:由于服务器端的accept()、read()都是阻塞方法,当连接来临一个的时候会进行read()阻塞,这时候再来请求不会立刻接收到;若是来了一个连接并发送了一个请求,此时若是该连接再发送一个请求也是不能够立即接收到的,这是因为此时在accept()阻塞。


这也是单线程带来的弊端!接下来对服务器端进行优化:每来一个请求就开辟一个线程来对其进行处理


import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;
/**
 * @ClassName NioServer2
 * @Author ChangLu
 * @Date 2021/12/18 15:12
 * @Description 改进NioServer:对于每一个连接的客户端单独开辟一个线程来处理(解决accept()、read()的阻塞问题)
 */
@Slf4j
public class NioServer2 {
    private static final ByteBuffer buffer = ByteBuffer.allocate(20);
    public static void main(String[] args)throws Exception{
        final ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8198));
        log.debug("server start ...");
        while (true) {
            log.debug("server accept ...");
            final SocketChannel channel = ssc.accept();
            log.debug("channel => {}",channel);
            //来了一个客户端连接就开辟一个线程来进行单独处理
            submitAccept(channel);
        }
    }
    public static void submitAccept(SocketChannel c){
        new Thread(()->{
            try {
                while (true) {
                    log.debug("before server read from {}...", c.getRemoteAddress());
                    c.read(buffer);
                    //打印读取到的buffer内容
                    debugAll(buffer);
                    buffer.flip();//切换到读模式
                    System.out.println("收到客户端:" + c + ",信息为:" + CharsetUtil.UTF_8.decode(buffer).toString());
                    buffer.clear();//切换到写模式
                    log.debug("end server read ...");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}



1.1.2、非阻塞(设置参数)


设置方式:对ServerSocketChannel、SocketChannel调用configureBlocking(false)方法设置为非阻塞。


设置ServerSocketChannel为非阻塞:此时accept()就是非阻塞的,返回null说明没有连接。

设置SocketChannel为非阻塞:此时read()就是非阻塞的,返回0则表示没有数据,>0表示有。

效果:无论是否来连接、是否有发送数据都会直接取得返回值,而不是在那一直阻塞等待!


好处:在非阻塞模式下,单线程程序依然能够进行处理!


缺点:没有发送连接时,其实程序也还是在不断的执行循环操作,CPU一直在运行中…,更好的方式是有连接、请求了再进行处理(这就要涉及到selector)!对于read()、write()操作是否真正读到或写入数据都会直接返回结果!那么就能可能会造成CPU的资源浪费。


import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;
/**
 * @ClassName NioServer
 * @Author ChangLu
 * @Date 2021/12/18 14:28
 * @Description 非阻塞NIO服务器
 */
@Slf4j
public class NioServer {
    private static List<SocketChannel> channels = new ArrayList<>();
    private static final ByteBuffer buffer = ByteBuffer.allocate(20);
    public static void main(String[] args)throws Exception{
        final ServerSocketChannel ssc = ServerSocketChannel.open();
        //设置ServerSocketChannel为非阻塞:此时accept()就是非阻塞的,返回null说明没有连接
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8198));
        log.debug("server start ...");
        while (true) {
//            log.debug("server accept ...");
            final SocketChannel channel = ssc.accept();
            if (channel != null){
                //设置SocketChannel为非阻塞:此时read()就是非阻塞的,返回0则表示没有数据,>0表示有
                channel.configureBlocking(false);
                log.debug("channel => {}",channel);
                channels.add(channel);
            }
            for (SocketChannel c : channels) {
//                log.debug("before server read from {}...", c.getRemoteAddress());
                final int readSize = c.read(buffer);
                if (readSize > 0){
                    debugAll(buffer);
                    buffer.flip();
                    System.out.println("收到客户端:" + c + ",信息为:" + CharsetUtil.UTF_8.decode(buffer).toString());
                    buffer.clear();
                    log.debug("end server read ...");
                }
            }
        }
    }
}



效果:这就是非阻塞带来的好处,我们不需要借助多线程来去额外处理每个来进行的连接也能够照常运行




1.1.3、多路复用(selector)


在非阻塞的基础上加了事件的概念,只有事件发生了selector才会让你的线程去继续运行,如果事件没有发生,selector是阻塞的,不会让你的线程白忙乎。


在非阻塞情况下,例如read()是否读到数据都会直接给你返回这就造成了CPU的浪费,通过使用多路复用将channel注册到selector选择器上后,只有对应channel感兴趣的事件发生了才会停止阻塞,拿到返回值!这就很有效的解决了非阻塞存留的问题!


channel包含四个状态:


accept:会在连接请求时触发。

connect:是在客户端,连接建立后触发。

read:可读事件。

write:可写事件。


1.2、单线程selector实现(多路复用)


1.2.1、Selector(课件)


selector 版

selector

thread

channel

channel

channel

好处


一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功

让这个线程能够被充分利用

节约了线程的数量

减少了线程上下文切换


创建

Selector selector = Selector.open();



绑定 Channel 事件

也称之为注册事件,绑定的事件 selector 才会关心


channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);


channel 必须工作在非阻塞模式

FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用

绑定的事件类型可以有

connect - 客户端连接成功时触发

accept - 服务器端成功接受连接时触发

read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况

write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况


监听 Channel 事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件


方法1,阻塞直到绑定事件发生


int count = selector.select();



方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)


int count = selector.select(long timeout);



方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件


int count = selector.selectNow();



select 何时不阻塞


1、事件发生时。


客户端发起连接请求,会触发 accept 事件

客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件

channel 可写,会触发 write 事件

在 linux 下 nio bug 发生时

2、调用 selector.wakeup()。


3、调用 selector.close()。


4、selector 所在线程 interrupt。



1.2.2、代码实现


代码实现过程思路(7点)

1、处理accept()


为了解决非阻塞不断占用CPU的问题,引入了selector选择器 。此时只有连接发生、请求发生,selector才会获得这些事件,然后我们的线程可以去处理这些事件,没活可干的时候线程会进行阻塞


就算没有连接、请求来临,CPU就会不断的在运行,此时占用率就会达到100%,这是一种浪费与损害

对于accept、connect事件是由serversocketchannel管理的,而对于read、write是socketchannel进行关注的



2、selectkey.cancel():取消事件


应用场景:正常情况是监听得到key事件,然后使用accept()来处理该事件,若是不处理那么下一次依旧会监听到导致无限循环,那么若是来了连接不想进行accept()接收是想直接将该事件进行忽略,那么就可以使用该cancel()取消当前该事件(连接)的情况!



3、处理read()


何时注册?当我们接受到accept()请求时,创建的channel可以注册到selector中,并设置其对read事件感兴趣。


处理方式:当获取到新的selectkeys时,判断是否是read事件,若是的话指定read()方法来进行处理,读取到bytebuffer中!



3、用完key为何使用remove()?


用完key之后要进行remove(),否则在进行accept()就会出现空指针


使用迭代器的原因是能够在遍历的过程中进行删除,否则若是使用foreach无法进行删除




4、处理客户端断开?异常断开与正常断开


对于关闭连接,会在read()方法中抛出异常导致程序直接结束运行! 客户端关闭时依然会发送一个read()事件!


异常断开(强制断开):


操作:直接强制关闭客户端连接(例如本地debug调试时直接关闭)。

现象:在read()事件中会直接抛出一个异常,导致直接程序结束!

解决方案:在read()事件外进行catch捕捉异常,接着对该key执行一个cancel()处理,也就是取消掉对该key的监听即可。

正常断开:


操作:执行socketchannel.close()关闭操作。

现象:对于正常关闭,read()是不会抛出异常的,使能够进行读取并得到返回值的,但由于这是一个断开操作,所以其返回值为-1


5、消息边界问题?没有正确处理消息边界产生的问题


原因:发送的数据,服务端无法一次读完,例如缓冲区4个字节,客户端发送了中文内容,服务器端出现了乱码(一个汉字3个字节,连个汉字6个字节),这就是没有处理消息边界产生出来的问题!


三种方案:


客户端与服务端进行约定好每次传输的数据大小,例如1024字节。缺点:服务器端接收时很浪费空间。

以指定分隔符号来进行表示,根据分隔符号来进行读取相应大小的数据。缺点:需要先进行遍历找到分隔符位置才行,比对效率低。

把每条消息分割成两个部分,第一部分内容是一个整型表示的是要发送数据的内容大小,第二次部分就是指定发送的数据,第二段的缓冲区长度根据第一部分数据内容来进行指定分配创建。(HTTP也是类似的方式)

当前小结实现第二种方案。在netty章节学习中就会使用第三种方案。


若是每段分隔符相隔的内容都满足<=bytebuffer容量时是很容易解决的,此时就有额外的一个问题:若是分隔符分割的内容>bytebuffer的容量时就会产生丢失内容的情况!


解决两个问题:①字节缓冲区扩容问题。②ByteBuffer不能作为一个局部变量,在两次读事件发生的时候用到同一个bytebuffer。

方案:使用扩容,初始使用buffer1先接收指定容量大小的内容,若是需要扩容接着创建一个buffer2,首先将buffer1中的内容进行拷贝到buffer2中,接着再将未读的内容再次读入即可!

思路:①每一个socketchannel都拥有自己的一个bytebuffer,不要直接定义成一个公共的(造成多个socketchannel共用),需要使用到一个附件的知识。attachment(就是register()中的第三个参数,第二个参数指的是关注的事件),这个附件随着channel一起关联到selectkey上,与channel一一对应。②在每次读事件开始时,拿到bytebuffer,进行写操作,判断是否满根据首次读入数据的关联bytebuffer的position==limit来确定是否要创建一个新的bytebuffer并关联到socketchannel上!

问题:我们写的还是有很大问题,每次扩容直接扩容2倍,不能够有效的节省内存空间!

netty的细节更精细:不仅仅能够做到扩容,还能够做到自适应更替大小,若是发现传输的数据越来越小,那么bytebuffer也会越来越小,同理其他情况!好处:更能够节省空间!



6、Bytebuffer大小分配问题


netty实现了bytebuffer自适应的效果。不仅仅要处理扩容,还要处理缩容的情况!方案一:首先分配一个较小bytebuffer,接着随着数据的增大分配一个较大的bytebuffer。方案二:使用多个数组来组成bytebuffer。



7、写入内容过多


对客户端进行响应,若是一次封装过多的内容可能并不能够一次就写完。例如八位数数据的内容不能够一次就发送给客户端,可能要发送多次,每次的字节数量也不定。


问题:虽然说能够把数据大量多次的发送给客户端,但是会有大量CPU浪费的情况(有时候会碰到缓冲区写满的情况,写不进去)!


解决思路:在网络缓冲区写满的情况下,让CPU去处理其他的事情!例如可以去读,一旦缓冲区空了,那么又可以去进行写操作。


解决方案:若是一次写不完,那么该key就去新增关注可写事件!之后若是网络缓冲区清空了或可写了,那么就会触发该可写事件,那么继续去执行,直到完全写完在bytebuffer中没有任何数据了,此时我们可以取消关注该key的可写事件以及取消附件!



案例1:处理accept()、read()事件

服务端:


有效处理了强制停止、正常停止的问题。

解决了read()读取时黏包、半包问题,客户端传输字节过大问题(采用方案二扩容解决)

import lombok.extern.slf4j.Slf4j;


import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.util.Iterator;


import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;


/**
 * @ClassName NioServer
 * @Author ChangLu
 * @Date 2021/12/19 21:32
 * @Description 多路复用实践:使用selector实现单线程处理各类请求实践,含accept、read事件
 */
@Slf4j
public class NioServer {
    public static void main(String[] args) throws Exception{
        //1、创建selector,管理多个channel
        final Selector selector = Selector.open();
        final ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);//设置为非阻塞
        //2、建立selector和channel的联系(注册)
        // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
        final SelectionKey sscKey = ssc.register(selector, 0, null);//注册当前的socketchannel到selector上,参数为:选择器、感兴趣事件、附件
        log.debug("sscKey => {}",sscKey);
        // key只关注 accept事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        while (true) {
            //3、select方法,没有事件发生,线程阻塞,有事件线程才会恢复运行
            // select 在事件未处理时,他不会阻塞
            selector.select();//阻塞事件,若是选择器中的channel有感兴趣的事件发生,那么这里就不会进入阻塞状态
            //4、处理事件,selectKeys 内部包含了所有发生的事件
            final Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()){
                SelectionKey key = iter.next();
                log.debug("SelectionKey:{}" + key);
                if (key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    //注意:若是不执行accept()事件接收,那么selector会一直监听到事件情况!
                    SocketChannel sc = channel.accept();
                    log.debug("accept SocketChannel:{}" + sc);
                    sc.configureBlocking(false);
                    //初始化每个channel携带一个16字节的缓冲区用来进行读取
                    ByteBuffer initBuffer = ByteBuffer.allocate(16);
                    //注册该channel为读事件,每个channel携带一个ByteBuffer用来进行读取数据
                    SelectionKey scKey = sc.register(selector, 0, initBuffer);
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("注册read事件 => {}",scKey);
                }else if (key.isReadable()){
                    SocketChannel sc = (SocketChannel) key.channel();
//                    ByteBuffer buffer = ByteBuffer.allocate(16);
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    try {
                        int size = sc.read(buffer);
                        log.debug("read字节数为:" + size);
                        //若是读取数量为-1,表示客户端正常执行close()关闭,取消订阅
                        if (size == -1){
                            key.cancel();
                        }else{
                            handle(buffer);
                            //若是初始read读到缓冲区的内容没有读完整,那么就会出现position=limit情况(因为找不到\n分隔符)
                            if (buffer.position() == buffer.limit()) {
                                buffer.flip();//切换到读模式
                                ByteBuffer newCapBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                newCapBuffer.put(buffer);//重新进行读取,注意这里并没有切换到读模式,这是为了下次read()接上数据做准备
                                key.attach(newCapBuffer);
                                debugAll(newCapBuffer);
                            }
//                            buffer.flip();//切换为读模式
//                            System.out.println("读取内容:" + Charset.defaultCharset().decode(buffer).toString());
//                            debugAll(newCapBuffer);
                        }
                    }catch (IOException e){
                        e.printStackTrace();
                        //异常断开,直接取消对该key事件的关注
                        key.cancel();
                    }
                }
                //每次处理完一个事件都要直接将该事件进行移除,否则之后可能会依旧获取事件key,例如调用accept()出现null
                iter.remove();
//                key.cancel();//取消事件
            }
        }
    }
    /**
     * 处理黏包、半包情况:每次能够将\n结尾的内容读取到一个ByteBuffer,并测该ByteBuffer对象
     * @param buffer
     */
    private static void handle(ByteBuffer buffer) {
        buffer.flip();//切换到读状态
        for (int i = 0; i < buffer.limit(); i++) {
            //get(index):仅仅只是获取当前索引内容,不会造成position移动
            if (buffer.get(i) == '\n') {
                int readLen = i - buffer.position() + 1;
                ByteBuffer temp = ByteBuffer.allocate(readLen);
                for (int j = 0; j < readLen; j++) {
                    temp.put(buffer.get());
                }
                debugAll(temp);
            }
        }
        buffer.compact();//切换写状态(压缩):保留未读取的内容
    }
}



客户端:


import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
/**
 * @ClassName NioClient
 * @Author ChangLu
 * @Date 2021/12/19 21:38
 * @Description NIO客户端
 */
public class NioClient {
    public static void main(String[] args) throws Exception{
        final SocketChannel sc = SocketChannel.open();
        final boolean result = sc.connect(new InetSocketAddress("localhost", 8080));
        if (result) {
            System.out.println("客户端连接成功");
        }
        //示例(在Evaluate中执行):sc.write(StandardCharsets.UTF_8.encode("hello!"))
        System.in.read();
    }
}


效果:


1.客户端连接、异常关闭



2、客户端发送一条长记录,以\n来分割




案例2:处理write()事件

服务端:


解决了服务端向客户端写入过多内容的问题(过度占用CPU),通过订阅可写事件来进行解决!

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
/**
 * @ClassName NioServer2
 * @Author ChangLu
 * @Date 2021/12/20 19:51
 * @Description 多路复用实践:使用selector实现单线程处理各类请求实践,单独来处理写事件
 */
public class NioServer2 {
    public static void main(String[] args) throws Exception{
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);
        //将服务器channel设置为对accept()感兴趣
        ssc.register(selector, SelectionKey.OP_ACCEPT,null);
        while (true) {
            selector.select();
            final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                final SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel sc = (ServerSocketChannel) key.channel();
                    SocketChannel channel = sc.accept();
                    channel.configureBlocking(false);//若是不设置非阻塞的话,在进行write写时就会一直阻塞等待到写完成位置!
                    SelectionKey selKey = channel.register(selector, SelectionKey.OP_READ);
                    //1、向客户端发送大量数据
                    StringBuilder str = new StringBuilder();
                    for (int i = 0; i < 30000000; i++) {
                        str.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(str.toString());
                    //2、返回值代表实际写入的字节数
                    //开始进行写操作:若是一次不能够直接写完所有内容,那么就将其添加至写事件
                    int writeSize = channel.write(buffer);
                    System.out.println(writeSize);
                    //3、判断是否有剩余内容
                    if (buffer.hasRemaining()) {
                        //4、关注可写事件
                        //注意:不能够直接设置写事件,需要在原有基础上添加指定的感兴趣事件
                        selKey.interestOps(selKey.interestOps() + SelectionKey.OP_WRITE);
                        selKey.attach(buffer);
                    }
                }else if (key.isWritable()) {  // 5、若是网络缓冲区又有空间能够写入,则会触发该事件
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel channel = (SocketChannel) key.channel();
                    //6、继续通过通道向客户端写入内容
                    int writeSize = channel.write(buffer);
                    System.out.println(writeSize);
                    //7、最终的清理操作
                    //  若是当前已经写完所有内容了,那么就取消关注该key的写事件,并不携带附件内容
                    if (!buffer.hasRemaining()) {
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                        key.attach(null);
                    }
                }
            }
        }
    }
}


客户端:


import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
 * @ClassName NioClient
 * @Author ChangLu
 * @Date 2021/12/19 21:38
 * @Description NIO客户端:接收服务端的大量写内容
 */
public class NioClient2 {
    public static void main(String[] args) throws Exception{
        final SocketChannel sc = SocketChannel.open();
        final boolean result = sc.connect(new InetSocketAddress("localhost", 8080));
        if (result) {
            System.out.println("客户端连接成功");
        }
        //当前的channel是阻塞的,那么每次read()都能够读取到内容,这里的话是接收一次大数据集的内容
        int count = 0;
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            int size = sc.read(buffer);
            if (size == 0) {
                break;
            }
            count += size;
            System.out.println("读取字节数:" + count);
        }
        //示例(在Evaluate中执行):sc.write(StandardCharsets.UTF_8.encode("hello!"))
        System.in.read();
    }
}



效果:对于服务端向客户端进行写大量内容时可能一次写不完,可通过借助订阅写事件来有效的节省CPU的开销




1.3、多线程实现


1.3.1、理论说明(多线程带来的问题及解决方案)


现在都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费


如何拿到 cpu 个数:


Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数。

这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启。

单线程虽说也能够很好的处理多个连接与请求,但是并没有很好的发挥了多核CPU的用处。若是某个事件耗费时间较长实际上就会影响其他事件的处理


redis的底层也是单线程,也是使用了类似NIO、selector来进行编写。redis的缺点:若是某一个操作耗时较长,那么就会影响其他的操作。

分两组选择器:


单线程配一个选择器,专门处理 accept 事件

创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件

采用多线程,每个线程配对一个selector来进行分工合作:充分提高CPU利用率,下面Boss处理accept,其他worker处理读写事件



选择器selector注意点:若是在某个线程中的selector1被阻塞了,那么其他线程在使用selector时也会被阻塞!register()只有在select()不再阻塞的时候才会允许被注册。



黑马教程中提出的疑惑


boss线程:

SocketChannel sc = ssc.accept();
  sc.register(worker.selector,SelectionKey.OP_READ,null);
  worker.register();
worker线程:
  register(){ ...//注册器注册,线程启动 }
  run(){
    selector.select();
  }


问题描述:上面也说到了selector选择器的注意点,一旦selector.select()进入阻塞状态,那么执行sc.register(selector,…,…)就会注册失败,上面代码中select()选择方法与注册方法register()是在不同线程下完成的,其执行顺序并不是同步的那么就会很容易出现注册失败的情况,从而导致某个channel订阅的事件无法接受到!


解决思路:最理想的状态就是让select()方法与register()方法在同一个线程中执行,也就是同步操作,并且register()在select()得到返回值后进行执行注册!


黑马思路1:在woker类中创建一个并发队列,队列中用于存放注册任务。在register()方法调用时添加任务到队列,在select()得到返回值后执行队列中的注册任务。


static class Worker implements Runnable{
    private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
    public void register(SocketChannel sc) throws Exception {
        ...
      queue.add(()-{
            sc.register(this.selector,SelectionKey.OP_READ,null);
        });
    }
    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                //select()得到返回值后来进行执行注册任务
                Runnable task = queue.poll();
                if (task != null) {
                    task.run();
    }
                ...
    }


疑惑点:这样乍一看像是进行同步操作,不过要注意其添加到队列中的是某个线程任务,那么select()操作和注册操作就不是在一个线程里执行的,那么很有可能task.run()的注册方法还没执行完,run()的一轮结束,此时就会造成注册失败!


黑马思路2:通过wakeup()唤醒


static class Worker implements Runnable{
    private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
    public void register(SocketChannel sc) throws Exception {
        ...
      selector.wakeup();
        sc.register(this.selector,SelectionKey.OP_READ,null);
    }
     @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                ...
    }
}


疑惑点:wakeup()能够让其他线程中指定selector的select()进行返回,那么会不会有一个情况就是wakeup()唤醒,然后执行woker线程,select()取得返回值然后一轮while()结束又进入阻塞,此时再执行register()注册选择器方法,此时不就又注册失败了吗!


最终解决方案(个人目前认知):使用队列+wakeup(),首先队列中存储的是SocketChannel而不是线程任务,是在wakeup()之前添加,在woker的run()方法的select()下,从队列中取出来进行手动进行注册,此时则能够保证select()与register()是在进行同步操作,经过验证是ok的!



1.3.2、代码实现


多线程NIO服务器server端:


import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;
/**
 * @ClassName MultThreadServer
 * @Author ChangLu
 * @Date 2021/12/24 21:03
 * @Description 多线程优化:①解决worker注册与select选择的阻塞问题。②多个worker(也就是多个selector)处理(非连接事件)。
 */
@Slf4j
public class MultiThreadNioServer {
    public static void main(String[] args) throws Exception{
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        ssc.register(boss, SelectionKey.OP_ACCEPT, null);
        ssc.bind(new InetSocketAddress(8080));
        //1、创建固定数量的worker(由于Linux的bug问题,所以这里手动指定worker,否则通过代码来动态获取线程数量)
        Worker[] workers = new Worker[4];
//        Worker worker = new Worker("worker-0");
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-" + i);
        }
        int count = 0;
        while (true) {
            //2、boss线程处理连接请求SocketChannel
            boss.select();
            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{},sc {}",sc.getRemoteAddress(),sc);
                    //3、多个请求连接处理注册到不同的worker线程中去
                    //负载均衡依次平衡的注册到某个线程中去
                    workers[count++ % workers.length].register(sc);
                }
            }
        }
    }
    static class Worker implements Runnable{
        private Thread thread;//Worker线程
        private Selector selector;//一个Worker线程对应一个selector
        private String name;//线程名称
        private volatile boolean start = false;//还未初始化
        //用来临时存储SocketChannel用于进行注册
        private ConcurrentLinkedDeque<SocketChannel> queue = new ConcurrentLinkedDeque<>();
        public Worker(String name){
            this.name = name;
        }
        //初始化线程,
        public void register(SocketChannel sc) throws Exception {
            if (!start) {
                this.thread = new Thread(this,this.name);
                this.selector = Selector.open();
                this.thread.start();
                start = true;
            }
            //向队列添加SocketChannel,方便之后某个线程来进行同步方法执行注册操作
            queue.add(sc);
            this.selector.wakeup();//唤醒相关联selector的其他线程的select()阻塞
        }
        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                    //***这部分注册代码是针对于手动调用wakeup()唤醒的过程!***
                    SocketChannel sc = queue.poll();
                    if (sc != null) {
                        log.debug("before register...{}",sc.getRemoteAddress());
                        sc.register(this.selector,SelectionKey.OP_READ,null);
                        log.debug("after register...{}",sc.getRemoteAddress());
                    }
                    //******************************************************
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            channel.read(buffer);
                            buffer.flip();
                            log.info("key is {}",key);
                            debugAll(buffer);
                        }
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}



客户端client:


import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
/**
 * @ClassName NIOClient
 * @Author ChangLu
 * @Date 2021/12/26 12:25
 * @Description NIO客户端
 */
public class NioClient {
    public static void main(String[] args) throws Exception{
        final SocketChannel sc = SocketChannel.open();
        final boolean result = sc.connect(new InetSocketAddress(8080));
        if (result) {
            System.out.println("连接成功");
        }else {
            sc.finishConnect();
        }
        //示例(在Evaluate中执行):sc.write(StandardCharsets.UTF_8.encode("hello!"))
        System.in.read();
    }
}



实现效果:本地调试电脑为4个CPU,这里就固定设置了四个woker


Boss线程(也就是主线程)负责处理连接请求。

woker线程(多个)通过负载均衡来将主线程得到的socketchannel依次注册到其线程的selector中。

下面我们来直接连接四个客户端,接着随机挑选两个客户端发送数据,通过查看log打印的线程名称我们就可以看到是否是多线程来处理连接后的请求!







相关文章
|
3天前
|
网络协议 安全 前端开发
网络技术基础(2)——网络参考模型
【2月更文挑战第6天】网络基础笔记
|
5天前
|
Java
网络 I/O:单 Selector 多线程(单线程模型)
网络 I/O:单 Selector 多线程(单线程模型)
|
6天前
|
存储 消息中间件 监控
一文搞懂常见的网络I/O模型
一文搞懂常见的网络I/O模型
20 0
|
21天前
|
机器学习/深度学习 编解码 网络架构
YOLOv8改进 | 主干篇 | 华为移动端模型Ghostnetv2改进特征提取网络
YOLOv8改进 | 主干篇 | 华为移动端模型Ghostnetv2改进特征提取网络
35 0
|
21天前
|
机器学习/深度学习 编解码 算法
YOLOv8改进 | 主干篇 | 低照度增强网络PE-YOLO改进主干(改进暗光条件下的物体检测模型)
YOLOv8改进 | 主干篇 | 低照度增强网络PE-YOLO改进主干(改进暗光条件下的物体检测模型)
16 0
|
21天前
|
机器学习/深度学习 测试技术
YOLOv8改进 | 主干篇 | 华为移动端模型Ghostnetv1改进特征提取网络
YOLOv8改进 | 主干篇 | 华为移动端模型Ghostnetv1改进特征提取网络
22 0
|
21天前
|
机器学习/深度学习 人工智能 API
人工智能应用工程师技能提升系列2、——TensorFlow2——keras高级API训练神经网络模型
人工智能应用工程师技能提升系列2、——TensorFlow2——keras高级API训练神经网络模型
14 0
|
22天前
|
网络协议 Linux
Linux下的网络编程——B/S模型HTTP(四)
Linux下的网络编程——B/S模型HTTP(四)
22 0
|
22天前
|
网络协议 大数据 Linux
Linux下的网络编程——C/S模型 UDP(三)
Linux下的网络编程——C/S模型 UDP(三)
39 0
Linux下的网络编程——C/S模型 UDP(三)
|
22天前
|
网络协议 关系型数据库 MySQL
Linux下的网络编程——C/S模型TCP(二)
Linux下的网络编程——C/S模型TCP(二)
23 0
Linux下的网络编程——C/S模型TCP(二)

热门文章

最新文章

相关产品

  • 云迁移中心