【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(二)

简介: 【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)

【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(一)https://developer.aliyun.com/article/1395315

java.net.PlainSocketImpl#socketAccept

不同的操作系统实现不同,这里仅以个人看到的JDK11版本源码为例。


@Override
void socketAccept(SocketImpl s) throws IOException {
  int nativefd = checkAndReturnNativeFD();
  if (s == null)
    throw new NullPointerException("socket is null");
  int newfd = -1;
  InetSocketAddress[] isaa = new InetSocketAddress[1];
  if (timeout <= 0) {
    newfd = accept0(nativefd, isaa);
  } else {
    configureBlocking(nativefd, false);
    try {
      waitForNewConnection(nativefd, timeout);
      newfd = accept0(nativefd, isaa);
      if (newfd != -1) {
        configureBlocking(newfd, true);
      }
    } finally {
      configureBlocking(nativefd, true);
    }
  }
  /* Update (SocketImpl)s' fd */
  fdAccess.set(s.fd, newfd);
  /* Update socketImpls remote port, address and localport */
  InetSocketAddress isa = isaa[0];
  s.port = isa.getPort();
  s.address = isa.getAddress();
  s.localport = localport;
  if (preferIPv4Stack && !(s.address instanceof Inet4Address))
    throw new SocketException("Protocol family not supported");
}

我们只关心下面这部分代码,方法中首先判断 timeout 是否小于等于0(如果没有设置,那么默认就是 0),如果是则走accept0(nativefd, isaa)方法。

前面反复提到的,accept操作核心实现这是下面的 native accept0 方法,具体操作是:

在操作系统层面检查bind的端口上是否有客户端数据接入,如果没有则一直阻塞等待


if (timeout <= 0) {
  newfd = accept0(nativefd, isaa);
} else {
  configureBlocking(nativefd, false);
  try {
    waitForNewConnection(nativefd, timeout);
    newfd = accept0(nativefd, isaa);  
    if (newfd != -1) {
      configureBlocking(newfd, true);
    }
  } finally {
    configureBlocking(nativefd, true);
  }
} // <4>


static native int accept0(int fd, InetSocketAddress[] isaa) throws IOException;

因为操作系统层面的阻塞需要影响到应用程序级别阻塞?显然accept0(nativefd, isaa)的操作系统层面阻塞是无 法避免的。

仔细观察代码,上面的代码分支提供了另外一种选择, timeout 的值设置大于0的值,此时程序会在等到我们设置的时间后返回,并且只会阻塞设置的这个时间量的值(单位毫秒)。

注意,这里的 newfd 如果是 -1,表示底层没有任何数据返回,在Linux的文档中也有对应的介绍。

java.net.ServerSocket#setSoTimeout

既然不阻塞的关键参数是timeout , 接下来我们看下 timeout 值要如何设置。


/**
Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a call to accept() for this ServerSocket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the ServerSocket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.
启用/禁用SO_TIMEOUT,指定超时时间,单位为毫秒。在这个选项被设置为非零超时的情况下,对这个ServerSocket的accept()的调用将只阻塞这个时间量。如果超时过后,会引发java.net.SocketTimeoutException,尽管ServerSocket仍然有效。该选项必须在进入阻塞操作之前启用才能生效。超时必须大于0。超时为0会被解释为无限期超时。
*/
public synchronized void setSoTimeout(int timeout) throws SocketException {  
    if (isClosed())  
        throw new SocketException("Socket is closed");  
    getImpl().setOption(SocketOptions.SO_TIMEOUT, timeout);  
}

简单明了,java.net.SocketOptions#setOption 方法最终调用的是java.net.AbstractPlainSocketImpl#setOption()

java.net.AbstractPlainSocketImpl#setOption


public void setOption(int opt, Object val) throws SocketException {  
    if (isClosedOrPending()) {  
        throw new SocketException("Socket Closed");  
    }  
    boolean on = true;  
    switch (opt) {  
  case SO_LINGER:  
        //..
    case SO_TIMEOUT:  
        if (val == null || (!(val instanceof Integer)))
                throw new SocketException("Bad parameter for SO_TIMEOUT");
            int tmp = ((Integer) val).intValue();
            if (tmp < 0)
                throw new IllegalArgumentException("timeout < 0");
            timeout = tmp;
            break;
    case TCP_NODELAY:  
        //....
    case SO_RCVBUF:  
        //....
    case SO_KEEPALIVE:  
        //....
    }  
    socketSetOption(opt, on, val);  
}

为了方便阅读,这里把其他的代码都删除了,只保留传参部分。

可以看到,这里仅仅是将setOption里面传入的timeout值,设置到了AbstractPlainSocketImpl的全局变量timeout

画图小结

个人认为整个accept() 操作比较”恶心“(个人观点)的是几个引用的赋值变化上面,暂时”解绑“的目的是在进行底层Socket连接的时候,如果Socket出现异常也没有影响,此时Socket持有的引用也是null,可以无阻碍的重新进行下一次Socket连接。

换句话说,整个Socket要么对接成功,要么就是重置回没对接之前的状态可以进行下一次尝试,保证ServerSocket会收到一个没有任何异常的Socket连接

最后再看一眼图:

image.png

改造并实现accept的非阻塞实现

在进行案例程序的改造之前,必须要先理解同步、异步、阻塞、非阻塞这几个概念。

这个概念在之前的笔记中 [[《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty]] 【洗衣机案例理解阻塞非阻塞,同步异步概念】这一部分提到过,[[【Java】《2小时搞定多线程》个人笔记]] 中又一次对于这几个概念做了个区分。

区分同步异步的关键点是被调用方的行为,没有得到结果之前,服务端不返回任何结果,那么操作就是同步的。

如果没有得到结果之前,服务器可以返回结果,比如给一个句柄,通过这个句柄可以在未来某个时间点之后获得结果,那么操作就是异步的。

这个句柄可以对应Java 并发编程的 Future 的概念

再举个例子,比如说前面的accept0应用程序调用操作系统,在Linux中就是访问系统内核,此时这一整块逻辑处理是选择”永久等待一个客户端连接“,符合 没有得到结果之前,服务端不返回任何结果 这种情况,所以它是同步的。

区分阻塞和非阻塞的关键点则是 对于调用者而言的服务端状态*,比如我们站在线程状态的角度,阻塞对应 Blocking,非阻塞此时应该对应Running正常执行。再比如站在线程发出请求之后请求方的角度,阻塞和非阻塞分别对应waitingNo waiting

理解同步异步阻塞和非阻塞之后,下面来尝试改造相关代码accept的阻塞问题,实现方式很简单,那就是设置 ** timeout ** , 然后在异常处理上continue重试。


/**  
 * accept 超时时间设置  
 */  
private static final int SO_TIMEOUT = 2000;
/***
 * NIO 改写
 * @description NIO 改写
 * @param port
 * @return void
 * @author xander
 * @date 2023/7/12 10:35
 */
public void initNioServer(int port) {
  ServerSocket serverSocket = null;//服务端Socket
  Socket socket = null;//客户端socket
  BufferedReader reader = null;
  String inputContent;
  int count = 0;
  try {
    serverSocket = new ServerSocket(port);
    // 1. 需要设置超时时间,会等待设置的时间之后再进行返回
    serverSocket.setSoTimeout(SO_TIMEOUT);
    log.info(stringNowTime() + ": serverSocket started");
    while (true) {
      // 2. 如果超时没有获取,这里会抛出异常,这里的处理策略是不处理异常
      try {
        socket = serverSocket.accept();
      } catch (SocketTimeoutException e) {
        //运行到这里表示本次accept是没有收到任何数据的,服务端的主线程在这里可以做一些其他事情
        log.info("now time is: " + stringNowTime());
        continue;
      }
      log.info(stringNowTime() + ": id为" + socket.hashCode() + "的Clientsocket connected");
      reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
      while ((inputContent = reader.readLine()) != null) {
        log.info("收到id为" + socket.hashCode() + "  " + inputContent);
        count++;
      }
      log.info("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
    }
  } catch (IOException e) {
    e.printStackTrace();
  } finally {
    try {
      if(Objects.nonNull(reader)){
        reader.close();
      }
      if(Objects.nonNull(socket)){
        socket.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}/**运行结果:
     10:40:49.272 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 2023-07-12 10:40:49: serverSocket started
     10:40:52.826 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:52
     10:40:54.830 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:54
     10:40:56.837 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:56
     10:40:58.840 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:58
     10:41:00.849 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:41:00
     10:41:02.852 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:41:02
     */

设置了timeout之后,accept 方法每次都会在间隔指定时间之后被唤醒一次,如果没有收到连接就会抛出异常,我们的处理方式是吞掉异常并且重新accept,这样就实现了类似非阻塞的效果。

小结

Socket 当中 getInputStream() 的方法解析以及后续的read操作结构图如下。

image.png

Socket 中的 getInputStream() 方法解析

实现了非阻塞的accept之后,再来看下另一个会产生阻塞的方法,那就是Socket.getInputStream,这个方法在Socket连接,服务端在read() 读取数据的时候会进行调用。

java.net.Socket#getInputStream


/**
返回该socket的输入流。
如果该套接字有一个关联的通道,那么生成的输入流会将其所有操作委托给该通道。如果通道处于非阻塞模式,那么输入流的读操作将抛出java.nio.channel.IllegalBlockingModeException。
在异常情况下,底层连接可能会被远程主机或网络软件中断(例如在TCP连接中的连接重置)。当网络软件检测到连接断开时,返回的输入流会出现以下情况:
*/
  public InputStream getInputStream() throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!isConnected())
            throw new SocketException("Socket is not connected");
        if (isInputShutdown())
            throw new SocketException("Socket input is shutdown");
        InputStream is = null;
        try {
            is = AccessController.doPrivileged(
                new PrivilegedExceptionAction<>() {
                    public InputStream run() throws IOException {
                        return impl.getInputStream();
                    }
                });
        } catch (java.security.PrivilegedActionException e) {
            throw (IOException) e.getException();
        }
        return is;
    }

上面通过AccessController进行授权,run方法中调用java.net.AbstractPlainSocketImpl#getInputStream方法。


protected synchronized InputStream getInputStream() throws IOException {  
    synchronized (fdLock) {  
        if (isClosedOrPending())  
            throw new IOException("Socket Closed");  
        if (shut_rd)  
            throw new IOException("Socket input is shutdown");  
        if (socketInputStream == null)  
            socketInputStream = new SocketInputStream(this);  
    }  
    return socketInputStream;  
}

可以看到,代码中创建了 SocketInputStream 对象,并且会将当前AbstractPlainSocketImpl对象传进去(这个对象实际就是 SocksSocketImpl )。

read读数据的时候,则会调用如下方法:


public int read(byte b[], int off, int length) throws IOException {  
    return read(b, off, length, impl.getTimeout());  
}


int read(byte b[], int off, int length, int timeout) throws IOException {
  int n;
  // EOF already encountered
  if (eof) {
    return -1;
  }
  // connection reset
  if (impl.isConnectionReset()) {
    throw new SocketException("Connection reset");
  }
  // bounds check
  if (length <= 0 || off < 0 || length > b.length - off) {
    if (length == 0) {
      return 0;
    }
    throw new ArrayIndexOutOfBoundsException("length == " + length
        + " off == " + off + " buffer length == " + b.length);
  }
  // acquire file descriptor and do the read
  // 获取文件描述符并进行读取
  FileDescriptor fd = impl.acquireFD();
  try {
    n = socketRead(fd, b, off, length, timeout);
    if (n > 0) {
      return n;
    }
  } catch (ConnectionResetException rstExc) {
    impl.setConnectionReset();
  } finally {
    impl.releaseFD();
  }
  /*
   * If we get here we are at EOF, the socket has been closed,
   * or the connection has been reset.
   */
  if (impl.isClosedOrPending()) {
    throw new SocketException("Socket closed");
  }
  if (impl.isConnectionReset()) {
    throw new SocketException("Connection reset");
  }
  eof = true;
  return -1;
}

重点关注下面这一行代码,这里在读取的时候同样传递了 timeout 参数:


n = socketRead(fd, b, off, length, timeout);

socketRead 方法会调用 nativesocketRead0 方法,timeout 代表了读取的超时时间。


private native int socketRead0(FileDescriptor fd,  
                               byte b[], int off, int len,  
                               int timeout)  
    throws IOException;

timeout 参数源于前面的new SocketInputStream(this)(也就是 AbstractPlainSocketImpl 对象)中的this引用impl.getTimeout(),这个参数的作用是指定read的超时时间,超时之后没有结果抛出异常。


serverSocket.setSoTimeout(SO_TIMEOUT);

了解read方法中timeout的作用之后,我们便可以着手改造代码了,具体的改造部分个人放到后文单独的 titile 进行说明,方便后续回顾。

此外,这里经过仔细考虑,判断这部分代码读者很有可能会存在理解误区,误以为此处的 AbstractPlainSocketImpl 属于 ServerSocket,实际上它属于 Socket,也就是说我们设置的 timeout 是设置到 SocketAbstractPlainSocketImpl

最为简单的证明方法是先在  java.net.Socket#setImpl 中打上断点,在启动BIO的服务端之后,立即启动客户端,具体的Debug断点如下:

image.png

通过单步调试,我们在BioServerSocket 中看到两个对象是不一样的。

image.png

image.png

为什么不一样呢?这里需要回顾前面的【ServerSocket中accept 解读】这一部分的操作。这里把重要操作标记了一下:

image.png

这里复习之前提到的内容,在accept(); 中为了确保Socket连接是正确并且可用的,每次都会new Socket(),而这里的SocksSocketImpl 是属于 Socket 的成员变量。

在进行Socket套接字连接之前会先判断是否初始化,如果初始化没有就先进行初始化(具体可以看红框框的位置)。

如果还是理解不了,那么只能再次寄出另一张杀手锏图了:

image.png

实现 Socket 中的 read 方法非阻塞

AbstractPlainSocketImpl实现socketRead方法非阻塞,具体做法其实就是使用 AbstractPlainSocketImpl 传入了 timeout 参数,实现 SocketInputStream 非阻塞read

表面上看上去 read 方法是非阻塞的,实际上这里存在一个明显的 误区,那就是在socket = serverSocket.accept();这一段代码中,服务端构建出 Socket 连接之后,客户端和服务端交互是通过独立的Socket对象完成IO读写的。

然而在第一次改造过后,实际上还有两点不易察觉的问题:

(1)服务端read的非阻塞轮询效率非常低,基本上是“一核繁忙、多核围观”的情况。

(2)第一次改造设置的是设定的是ServerSocket级别SocksSocketImpl的timeout。每个新的客户端进来都是新的Socket连接,每个Socket又有各自的 SocksSocketImpl,这里客户端连接所产生新的Sockettimeout是没有做设置的,换句话说,服务端针对每个Socket的read依然是完全阻塞。

前文提到,在BIO非阻塞同步模型中,我们虽然没法解决 系统底层"同步" 问题,但是我们可以让“非阻塞”这一块更为优化合理和更为高效。

第一个问题的解决策略是启动多线程以非阻塞read()方式轮询,这样做的另一点好处是,某个Socket读写压力大并不会影响CPU 切到其他线程的正常工作。

解决第二点问题,我们需要为每个新的Socket设置 timeout

解决上面两个问题,真正BIO非阻塞实现才算是真正成立,下面我们来看下第二版代码优化:


/**
     * 1. NIO 改写,accept 非阻塞
     * 2. 实现 read() 同样非阻塞
     *
     * @param port
     * @return void
     * @description
     * @author xander
     * @date 2023/7/12 16:38
     */
    public void initNioAndNioReadServer(int port) {
        ServerSocket serverSocket = null;//服务端Socket
        Socket socket = null;//客户端socket
        BufferedReader reader = null;
        ExecutorService threadPool = Executors.newCachedThreadPool();
        String inputContent;
        int count = 0;
        try {
            serverSocket = new ServerSocket(port);
            // 1. 需要设置超时时间,会等待设置的时间之后再进行返回
            serverSocket.setSoTimeout(SO_TIMEOUT);
            log.info(stringNowTime() + ": serverSocket started");
            while (true) {
                // 2. 如果超时没有获取,这里会抛出异常,这里的处理策略是不处理异常
                try {
                    socket = serverSocket.accept();
                } catch (SocketTimeoutException e) {
                    //运行到这里表示本次accept是没有收到任何数据的,服务端的主线程在这里可以做一些其他事情
                    log.info("now time is: " + stringNowTime());
                    continue;
                }
                // 3. 拿到Socket 之后,应该使用线程池新开线程方式处理客户端连接,提高CPU利用率。
                Thread thread = new Thread(new ClientSocketThread(socket));
                threadPool.execute(thread);
//                log.info(stringNowTime() + ": id为" + socket.hashCode() + "的Clientsocket connected");
//                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//
//                while ((inputContent = reader.readLine()) != null) {
//                    log.info("收到 id为" + socket.hashCode() + "  " + inputContent);
//                    count++;
//                }
//                log.info("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (Objects.nonNull(reader)) {
                    reader.close();
                }
                if (Objects.nonNull(socket)) {
                    socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 改写 客户端 Socket 连接为单独线程处理
     */
    class ClientSocketThread implements Runnable {
        private static final int SO_TIMEOUT = 2000;
        private static final int SLEEP_TIME = 1000;
        public final Socket socket;
        public ClientSocketThread(Socket socket) {
            this.socket = socket;
        }
        @Override
        public void run() {
            BufferedReader reader = null;
            String inputContent;
            int count = 0;
            try {
                socket.setSoTimeout(SO_TIMEOUT);
            } catch (SocketException e1) {
                e1.printStackTrace();
            }
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                while (true) {
                    try {
                        while ((inputContent = reader.readLine()) != null) {
                            log.info("收到id为" + socket.hashCode() + "  " + inputContent);
                            count++;
                        }
                    } catch (Exception e) {
                        //执行到这里表示read方法没有获取到任何数据,线程可以执行一些其他的操作
                        log.info("Not read data: " + stringNowTime());
                        continue;
                    }
                    //执行到这里表示读取到了数据,我们可以在这里进行回复客户端的工作
                    log.info("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
                    Thread.sleep(SLEEP_TIME);
                }
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    if (Objects.nonNull(reader)) {
                        reader.close();
                    }
                    if (Objects.nonNull(socket)) {
                        socket.close();
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

经过上面的改造,我们基本把 BIO 同步阻塞的工作方式更新为 同步非阻塞的工作方式,核心是对于 read()以及服务端接收新连接的accept()设置timeout参数。

在外部处理上,通过while(true) 加上“吞异常”方式,结合Thread.sleep()的套路实现“非阻塞”定期accept

当然,我们也可以看到,通过线程池每次都构建新线程的方式,在连接比较少的时候是比较高效的,但是一旦连接暴增,理论上JVM虽然可以构建非常多线程,实际上CPU肯定是吃不消,多线程“空轮询”判断的方式也十分浪费CPU资源,多线程切换起来更是雪上加霜。

基于BIO的种种弊端,Sun 在JDK1.4 提供了 NIO 来解决上面的几点问题。

native accept方法在Linux运作解读

accept(2): accept connection on socket - Linux man page (die.net)

原始文档相关解读:[[【Linux】accept(2) - Linux man page]],下面的内容基本为文档的翻译和理解介绍。

accept()本地方法,我们可以来试着看一看Linux这块的相关解读:


#include <sys/types.h>
#include <sys/socket.h>
int accept(int sockfd,struct sockaddr *addr,socklen_t *addrlen);

accept()系统调用主要用在基于连接的套接字类型,比如SOCK_STREAMSOCK_SEQPACKET。它提取出所监听套接字的等待连接队列中第一个连接请求,创建一个新的套接字,并返回指向该套接字的文件描述符。新建立的套接字不在监听状态,原来所监听的套接字也不受该系统调用的影响。

备注:新建立的套接字准备发送send()和接收数据recv()

sockfd,作用是 利用系统调用socket()建立的套接字描述符,通过bind()绑定到一个本地地址(一般为服务器的套接字),并且通过listen()一直在监听连接;

addr, 指向struct sockaddr的指针,该结构用通讯层服务器对等套接字的地址(一般为客户端地址)填写,返回地址addr的确切格式由套接字的地址类别(比如TCP或UDP)决定;

addr为NULL,没有有效地址填写,这种情况下,addrlen也不使用,应该置为NULL;

备注:addr是个指向局部数据结构sockaddr_in的指针,这就是要求接入的信息本地的套接字(地址和指针)。

addrlen, 代表一个值结果参数,调用函数必须初始化为包含addr所指向结构大小的数值,函数返回时包含对等地址(一般为服务器地址)的实际数值;

备注:addrlen是个局部整形变量,设置为sizeof(struct sockaddr_in)

如果队列中没有等待的连接,套接字也没有被标记为Non-blocking,accept()会阻塞调用函数直到连接出现;如果套接字被标记为Non-blocking,队列中也没有等待的连接,accept()返回错误EAGAINEWOULDBLOCK

备注:一般来说accept()为阻塞函数,当监听socket调用accept()时,它先到自己的receive_buf中查看是否有连接数据包;若有,把数据拷贝出来,删掉接收到的数据包,创建新的socket与客户发来的地址建立连接;若没有,就阻塞等待;

为了在套接字中有到来的连接时得到通知,可以使用select()poll()。当尝试建立新连接时,系统发送一个可读事件,然后调用accept()为该连接获取套接字。另一种方法是,当套接字中有连接到来时设定套接字发送SIGIO信号。

返回值成功时,返回非负整数,该整数是接收到套接字的描述符;出错时会返回-1,相应地设定全局变量error。

所以,在Java部分的源码里(java.net.ServerSocket#accept)会new 一个Socket出来,方便连接后拿到的新Socket的文件描述符的信息给设定到我们new出来的这个Socket 上来,这点在java.net.PlainSocketImpl#socketAccept中看到的尤为明显,读者可以回顾相关源码。

总结

本文一开始介绍了Bio Socket的基本代码,接着从ServerSocketbind方法解读,通过图文结合的方式介绍了源码如何处理,整个bind操作过程中有许多native层调用,所以Socket的代码调试是非常麻烦的。

介绍完bind之后,我们接着介绍了ServerSocketaccept方法,并且介绍了accept 方法的阻塞问题实际上和底层的操作系统行为有关,并且通过画图的方式理解accept中Socket连接比较“绕”的操作。

最后,文章的后半部分介绍了如何改造accept以及客户端的Socket连接解决非阻塞问题IO,最后我们介绍了  native accept方法在Linux运作,主要内容为Linux的相关文档理解。

写在最后

理解Socket的非阻塞操作有助于理解 NIO的Channel和Buffer的概念,实际上从我们的Demo代码可以看到Channel和非阻塞的BIO思路比较类似,而BufferReader缓冲流则贴合了 Buffer 的概念。

参考资料

Linux Network Programming, Part 1 (linuxjournal.com)

详解socket中的backlog 参数 - 知乎 (zhihu.com)

BIO到NIO源码的一些事儿之BIO - 掘金 (juejin.cn)

CachedThreadPool的工作原理

源码:


public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS, //60s 
                                  new SynchronousQueue<Runnable>());  
}

(1)corePoolSize = 0,maximumPoolSize = 最大值(无限大),keepAliveTime = 60s,workQueue = SynchronousQueue

(2)SynchronousQueue(实际上没有存储数据的空闲,是用来做多线程通信之间的协调作用的)。一开始提交一个任务过来,要求线程池里必须有一个线程对应可以处理这个任务,但是此时一个线程都没有,poolSize >= corePoolSize , workQueue已经满了,poolSize < maximumPoolSize(最大值),直接就会创建一个新的线程来处理这个任务

这样的效果也就是来一个任务就开一个线程,无界,无限开新线程,线程过多容易导致JVM的压力过大甚至直接崩溃。这也是为什么阿里巴巴规范禁掉这个方法的直接原因,容易误用。

(3)如果短期内有大量的任务都涌进来,实际上是走一个直接提交的思路,对每个任务,如果没法找到一个空闲的线程来处理它,那么就会立即创建一个新的线程出来,来处理这个新提交的任务

(4)短时间内,如果大量的任务涌入,可能会导致瞬间创建出来几百个线程,几千个线程,是不固定的。

(5)但是当这些线程工作完一段时间之后,就会处于空闲状态,就会看超过60s的空闲,就会直接将空闲的线程给释放掉。

相关文章
|
1月前
|
Cloud Native Java 编译器
Java生态系统的进化:从JDK 1.0到今天
Java生态系统的进化:从JDK 1.0到今天
|
1月前
|
Java 编译器 测试技术
滚雪球学Java(03):你知道JDK、JRE和JVM的不同吗?看这里就够了!
【2月更文挑战第12天】🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,助你一臂之力,带你早日登顶🚀,欢迎大家关注&&收藏!持续更新中,up!up!up!!
105 4
|
1天前
|
缓存 Java 测试技术
Java基础BIO、NIO、AIO小结(上)
Java基础BIO、NIO、AIO小结
6 0
|
11天前
|
Oracle Java 关系型数据库
Java 开发者必备:JDK 版本详解与选择策略(含安装与验证)
Oracle Java SE 支持路线图显示,JDK 8(LTS)支持至2030年,非LTS版本如9-11每6个月发布且支持有限。JDK 11(LTS)支持至2032年,而JDK 17及以上版本现在提供免费商用许可。LTS版本提供长达8年的支持,每2年发布一次。Oracle JDK与OpenJDK有多个社区和公司构建版本,如Adoptium、Amazon Corretto和Azul Zulu,它们在许可证、商业支持和更新方面有所不同。个人选择JDK时,可考虑稳定性、LTS、第三方兼容性和提供商支持。
24 0
|
20天前
|
Java 应用服务中间件 Linux
Caused by: java.lang.UnsatisfiedLinkError: /root/jdk1.7.0_45/jre/lib/amd64/xawt/libmawt.so: libXrend
Caused by: java.lang.UnsatisfiedLinkError: /root/jdk1.7.0_45/jre/lib/amd64/xawt/libmawt.so: libXrend
|
1月前
|
Java Maven
运行maven项目出现Error:java: JDK isn‘t specified for module ‘XXX‘
运行maven项目出现Error:java: JDK isn‘t specified for module ‘XXX‘
14 0
|
1月前
|
Java
916.【Java】javap 查看 class 文件的jdk编译版本
916.【Java】javap 查看 class 文件的jdk编译版本
49 2
|
1月前
|
Java 编译器 测试技术
滚雪球学Java(04):JDK、IntelliJ IDEA的安装和环境变量配置
【2月更文挑战第11天】🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,助你一臂之力,带你早日登顶🚀,欢迎大家关注&&收藏!持续更新中,up!up!up!!
49 1
|
3天前
|
数据采集 存储 Java
高德地图爬虫实践:Java多线程并发处理策略
高德地图爬虫实践:Java多线程并发处理策略
|
5天前
|
安全 Java 调度
Java线程:深入理解与实战应用
Java线程:深入理解与实战应用
24 0