【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(二)

简介: 【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)

【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(一)https://developer.aliyun.com/article/1395315

java.net.PlainSocketImpl#socketAccept

不同的操作系统实现不同,这里仅以个人看到的JDK11版本源码为例。


@Override
void socketAccept(SocketImpl s) throws IOException {
  int nativefd = checkAndReturnNativeFD();
  if (s == null)
    throw new NullPointerException("socket is null");
  int newfd = -1;
  InetSocketAddress[] isaa = new InetSocketAddress[1];
  if (timeout <= 0) {
    newfd = accept0(nativefd, isaa);
  } else {
    configureBlocking(nativefd, false);
    try {
      waitForNewConnection(nativefd, timeout);
      newfd = accept0(nativefd, isaa);
      if (newfd != -1) {
        configureBlocking(newfd, true);
      }
    } finally {
      configureBlocking(nativefd, true);
    }
  }
  /* Update (SocketImpl)s' fd */
  fdAccess.set(s.fd, newfd);
  /* Update socketImpls remote port, address and localport */
  InetSocketAddress isa = isaa[0];
  s.port = isa.getPort();
  s.address = isa.getAddress();
  s.localport = localport;
  if (preferIPv4Stack && !(s.address instanceof Inet4Address))
    throw new SocketException("Protocol family not supported");
}

我们只关心下面这部分代码,方法中首先判断 timeout 是否小于等于0(如果没有设置,那么默认就是 0),如果是则走accept0(nativefd, isaa)方法。

前面反复提到的,accept操作核心实现这是下面的 native accept0 方法,具体操作是:

在操作系统层面检查bind的端口上是否有客户端数据接入,如果没有则一直阻塞等待


if (timeout <= 0) {
  newfd = accept0(nativefd, isaa);
} else {
  configureBlocking(nativefd, false);
  try {
    waitForNewConnection(nativefd, timeout);
    newfd = accept0(nativefd, isaa);  
    if (newfd != -1) {
      configureBlocking(newfd, true);
    }
  } finally {
    configureBlocking(nativefd, true);
  }
} // <4>


static native int accept0(int fd, InetSocketAddress[] isaa) throws IOException;

因为操作系统层面的阻塞需要影响到应用程序级别阻塞?显然accept0(nativefd, isaa)的操作系统层面阻塞是无 法避免的。

仔细观察代码,上面的代码分支提供了另外一种选择, timeout 的值设置大于0的值,此时程序会在等到我们设置的时间后返回,并且只会阻塞设置的这个时间量的值(单位毫秒)。

注意,这里的 newfd 如果是 -1,表示底层没有任何数据返回,在Linux的文档中也有对应的介绍。

java.net.ServerSocket#setSoTimeout

既然不阻塞的关键参数是timeout , 接下来我们看下 timeout 值要如何设置。


/**
Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a call to accept() for this ServerSocket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the ServerSocket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.
启用/禁用SO_TIMEOUT,指定超时时间,单位为毫秒。在这个选项被设置为非零超时的情况下,对这个ServerSocket的accept()的调用将只阻塞这个时间量。如果超时过后,会引发java.net.SocketTimeoutException,尽管ServerSocket仍然有效。该选项必须在进入阻塞操作之前启用才能生效。超时必须大于0。超时为0会被解释为无限期超时。
*/
public synchronized void setSoTimeout(int timeout) throws SocketException {  
    if (isClosed())  
        throw new SocketException("Socket is closed");  
    getImpl().setOption(SocketOptions.SO_TIMEOUT, timeout);  
}

简单明了,java.net.SocketOptions#setOption 方法最终调用的是java.net.AbstractPlainSocketImpl#setOption()

java.net.AbstractPlainSocketImpl#setOption


public void setOption(int opt, Object val) throws SocketException {  
    if (isClosedOrPending()) {  
        throw new SocketException("Socket Closed");  
    }  
    boolean on = true;  
    switch (opt) {  
  case SO_LINGER:  
        //..
    case SO_TIMEOUT:  
        if (val == null || (!(val instanceof Integer)))
                throw new SocketException("Bad parameter for SO_TIMEOUT");
            int tmp = ((Integer) val).intValue();
            if (tmp < 0)
                throw new IllegalArgumentException("timeout < 0");
            timeout = tmp;
            break;
    case TCP_NODELAY:  
        //....
    case SO_RCVBUF:  
        //....
    case SO_KEEPALIVE:  
        //....
    }  
    socketSetOption(opt, on, val);  
}

为了方便阅读,这里把其他的代码都删除了,只保留传参部分。

可以看到,这里仅仅是将setOption里面传入的timeout值,设置到了AbstractPlainSocketImpl的全局变量timeout

画图小结

个人认为整个accept() 操作比较”恶心“(个人观点)的是几个引用的赋值变化上面,暂时”解绑“的目的是在进行底层Socket连接的时候,如果Socket出现异常也没有影响,此时Socket持有的引用也是null,可以无阻碍的重新进行下一次Socket连接。

换句话说,整个Socket要么对接成功,要么就是重置回没对接之前的状态可以进行下一次尝试,保证ServerSocket会收到一个没有任何异常的Socket连接

最后再看一眼图:

image.png

改造并实现accept的非阻塞实现

在进行案例程序的改造之前,必须要先理解同步、异步、阻塞、非阻塞这几个概念。

这个概念在之前的笔记中 [[《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty]] 【洗衣机案例理解阻塞非阻塞,同步异步概念】这一部分提到过,[[【Java】《2小时搞定多线程》个人笔记]] 中又一次对于这几个概念做了个区分。

区分同步异步的关键点是被调用方的行为,没有得到结果之前,服务端不返回任何结果,那么操作就是同步的。

如果没有得到结果之前,服务器可以返回结果,比如给一个句柄,通过这个句柄可以在未来某个时间点之后获得结果,那么操作就是异步的。

这个句柄可以对应Java 并发编程的 Future 的概念

再举个例子,比如说前面的accept0应用程序调用操作系统,在Linux中就是访问系统内核,此时这一整块逻辑处理是选择”永久等待一个客户端连接“,符合 没有得到结果之前,服务端不返回任何结果 这种情况,所以它是同步的。

区分阻塞和非阻塞的关键点则是 对于调用者而言的服务端状态*,比如我们站在线程状态的角度,阻塞对应 Blocking,非阻塞此时应该对应Running正常执行。再比如站在线程发出请求之后请求方的角度,阻塞和非阻塞分别对应waitingNo waiting

理解同步异步阻塞和非阻塞之后,下面来尝试改造相关代码accept的阻塞问题,实现方式很简单,那就是设置 ** timeout ** , 然后在异常处理上continue重试。


/**  
 * accept 超时时间设置  
 */  
private static final int SO_TIMEOUT = 2000;
/***
 * NIO 改写
 * @description NIO 改写
 * @param port
 * @return void
 * @author xander
 * @date 2023/7/12 10:35
 */
public void initNioServer(int port) {
  ServerSocket serverSocket = null;//服务端Socket
  Socket socket = null;//客户端socket
  BufferedReader reader = null;
  String inputContent;
  int count = 0;
  try {
    serverSocket = new ServerSocket(port);
    // 1. 需要设置超时时间,会等待设置的时间之后再进行返回
    serverSocket.setSoTimeout(SO_TIMEOUT);
    log.info(stringNowTime() + ": serverSocket started");
    while (true) {
      // 2. 如果超时没有获取,这里会抛出异常,这里的处理策略是不处理异常
      try {
        socket = serverSocket.accept();
      } catch (SocketTimeoutException e) {
        //运行到这里表示本次accept是没有收到任何数据的,服务端的主线程在这里可以做一些其他事情
        log.info("now time is: " + stringNowTime());
        continue;
      }
      log.info(stringNowTime() + ": id为" + socket.hashCode() + "的Clientsocket connected");
      reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
      while ((inputContent = reader.readLine()) != null) {
        log.info("收到id为" + socket.hashCode() + "  " + inputContent);
        count++;
      }
      log.info("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
    }
  } catch (IOException e) {
    e.printStackTrace();
  } finally {
    try {
      if(Objects.nonNull(reader)){
        reader.close();
      }
      if(Objects.nonNull(socket)){
        socket.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}/**运行结果:
     10:40:49.272 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 2023-07-12 10:40:49: serverSocket started
     10:40:52.826 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:52
     10:40:54.830 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:54
     10:40:56.837 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:56
     10:40:58.840 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:40:58
     10:41:00.849 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:41:00
     10:41:02.852 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - now time is: 2023-07-12 10:41:02
     */

设置了timeout之后,accept 方法每次都会在间隔指定时间之后被唤醒一次,如果没有收到连接就会抛出异常,我们的处理方式是吞掉异常并且重新accept,这样就实现了类似非阻塞的效果。

小结

Socket 当中 getInputStream() 的方法解析以及后续的read操作结构图如下。

image.png

Socket 中的 getInputStream() 方法解析

实现了非阻塞的accept之后,再来看下另一个会产生阻塞的方法,那就是Socket.getInputStream,这个方法在Socket连接,服务端在read() 读取数据的时候会进行调用。

java.net.Socket#getInputStream


/**
返回该socket的输入流。
如果该套接字有一个关联的通道,那么生成的输入流会将其所有操作委托给该通道。如果通道处于非阻塞模式,那么输入流的读操作将抛出java.nio.channel.IllegalBlockingModeException。
在异常情况下,底层连接可能会被远程主机或网络软件中断(例如在TCP连接中的连接重置)。当网络软件检测到连接断开时,返回的输入流会出现以下情况:
*/
  public InputStream getInputStream() throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!isConnected())
            throw new SocketException("Socket is not connected");
        if (isInputShutdown())
            throw new SocketException("Socket input is shutdown");
        InputStream is = null;
        try {
            is = AccessController.doPrivileged(
                new PrivilegedExceptionAction<>() {
                    public InputStream run() throws IOException {
                        return impl.getInputStream();
                    }
                });
        } catch (java.security.PrivilegedActionException e) {
            throw (IOException) e.getException();
        }
        return is;
    }

上面通过AccessController进行授权,run方法中调用java.net.AbstractPlainSocketImpl#getInputStream方法。


protected synchronized InputStream getInputStream() throws IOException {  
    synchronized (fdLock) {  
        if (isClosedOrPending())  
            throw new IOException("Socket Closed");  
        if (shut_rd)  
            throw new IOException("Socket input is shutdown");  
        if (socketInputStream == null)  
            socketInputStream = new SocketInputStream(this);  
    }  
    return socketInputStream;  
}

可以看到,代码中创建了 SocketInputStream 对象,并且会将当前AbstractPlainSocketImpl对象传进去(这个对象实际就是 SocksSocketImpl )。

read读数据的时候,则会调用如下方法:


public int read(byte b[], int off, int length) throws IOException {  
    return read(b, off, length, impl.getTimeout());  
}


int read(byte b[], int off, int length, int timeout) throws IOException {
  int n;
  // EOF already encountered
  if (eof) {
    return -1;
  }
  // connection reset
  if (impl.isConnectionReset()) {
    throw new SocketException("Connection reset");
  }
  // bounds check
  if (length <= 0 || off < 0 || length > b.length - off) {
    if (length == 0) {
      return 0;
    }
    throw new ArrayIndexOutOfBoundsException("length == " + length
        + " off == " + off + " buffer length == " + b.length);
  }
  // acquire file descriptor and do the read
  // 获取文件描述符并进行读取
  FileDescriptor fd = impl.acquireFD();
  try {
    n = socketRead(fd, b, off, length, timeout);
    if (n > 0) {
      return n;
    }
  } catch (ConnectionResetException rstExc) {
    impl.setConnectionReset();
  } finally {
    impl.releaseFD();
  }
  /*
   * If we get here we are at EOF, the socket has been closed,
   * or the connection has been reset.
   */
  if (impl.isClosedOrPending()) {
    throw new SocketException("Socket closed");
  }
  if (impl.isConnectionReset()) {
    throw new SocketException("Connection reset");
  }
  eof = true;
  return -1;
}

重点关注下面这一行代码,这里在读取的时候同样传递了 timeout 参数:


n = socketRead(fd, b, off, length, timeout);

socketRead 方法会调用 nativesocketRead0 方法,timeout 代表了读取的超时时间。


private native int socketRead0(FileDescriptor fd,  
                               byte b[], int off, int len,  
                               int timeout)  
    throws IOException;

timeout 参数源于前面的new SocketInputStream(this)(也就是 AbstractPlainSocketImpl 对象)中的this引用impl.getTimeout(),这个参数的作用是指定read的超时时间,超时之后没有结果抛出异常。


serverSocket.setSoTimeout(SO_TIMEOUT);

了解read方法中timeout的作用之后,我们便可以着手改造代码了,具体的改造部分个人放到后文单独的 titile 进行说明,方便后续回顾。

此外,这里经过仔细考虑,判断这部分代码读者很有可能会存在理解误区,误以为此处的 AbstractPlainSocketImpl 属于 ServerSocket,实际上它属于 Socket,也就是说我们设置的 timeout 是设置到 SocketAbstractPlainSocketImpl

最为简单的证明方法是先在  java.net.Socket#setImpl 中打上断点,在启动BIO的服务端之后,立即启动客户端,具体的Debug断点如下:

image.png

通过单步调试,我们在BioServerSocket 中看到两个对象是不一样的。

image.png

image.png

为什么不一样呢?这里需要回顾前面的【ServerSocket中accept 解读】这一部分的操作。这里把重要操作标记了一下:

image.png

这里复习之前提到的内容,在accept(); 中为了确保Socket连接是正确并且可用的,每次都会new Socket(),而这里的SocksSocketImpl 是属于 Socket 的成员变量。

在进行Socket套接字连接之前会先判断是否初始化,如果初始化没有就先进行初始化(具体可以看红框框的位置)。

如果还是理解不了,那么只能再次寄出另一张杀手锏图了:

image.png

实现 Socket 中的 read 方法非阻塞

AbstractPlainSocketImpl实现socketRead方法非阻塞,具体做法其实就是使用 AbstractPlainSocketImpl 传入了 timeout 参数,实现 SocketInputStream 非阻塞read

表面上看上去 read 方法是非阻塞的,实际上这里存在一个明显的 误区,那就是在socket = serverSocket.accept();这一段代码中,服务端构建出 Socket 连接之后,客户端和服务端交互是通过独立的Socket对象完成IO读写的。

然而在第一次改造过后,实际上还有两点不易察觉的问题:

(1)服务端read的非阻塞轮询效率非常低,基本上是“一核繁忙、多核围观”的情况。

(2)第一次改造设置的是设定的是ServerSocket级别SocksSocketImpl的timeout。每个新的客户端进来都是新的Socket连接,每个Socket又有各自的 SocksSocketImpl,这里客户端连接所产生新的Sockettimeout是没有做设置的,换句话说,服务端针对每个Socket的read依然是完全阻塞。

前文提到,在BIO非阻塞同步模型中,我们虽然没法解决 系统底层"同步" 问题,但是我们可以让“非阻塞”这一块更为优化合理和更为高效。

第一个问题的解决策略是启动多线程以非阻塞read()方式轮询,这样做的另一点好处是,某个Socket读写压力大并不会影响CPU 切到其他线程的正常工作。

解决第二点问题,我们需要为每个新的Socket设置 timeout

解决上面两个问题,真正BIO非阻塞实现才算是真正成立,下面我们来看下第二版代码优化:


/**
     * 1. NIO 改写,accept 非阻塞
     * 2. 实现 read() 同样非阻塞
     *
     * @param port
     * @return void
     * @description
     * @author xander
     * @date 2023/7/12 16:38
     */
    public void initNioAndNioReadServer(int port) {
        ServerSocket serverSocket = null;//服务端Socket
        Socket socket = null;//客户端socket
        BufferedReader reader = null;
        ExecutorService threadPool = Executors.newCachedThreadPool();
        String inputContent;
        int count = 0;
        try {
            serverSocket = new ServerSocket(port);
            // 1. 需要设置超时时间,会等待设置的时间之后再进行返回
            serverSocket.setSoTimeout(SO_TIMEOUT);
            log.info(stringNowTime() + ": serverSocket started");
            while (true) {
                // 2. 如果超时没有获取,这里会抛出异常,这里的处理策略是不处理异常
                try {
                    socket = serverSocket.accept();
                } catch (SocketTimeoutException e) {
                    //运行到这里表示本次accept是没有收到任何数据的,服务端的主线程在这里可以做一些其他事情
                    log.info("now time is: " + stringNowTime());
                    continue;
                }
                // 3. 拿到Socket 之后,应该使用线程池新开线程方式处理客户端连接,提高CPU利用率。
                Thread thread = new Thread(new ClientSocketThread(socket));
                threadPool.execute(thread);
//                log.info(stringNowTime() + ": id为" + socket.hashCode() + "的Clientsocket connected");
//                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//
//                while ((inputContent = reader.readLine()) != null) {
//                    log.info("收到 id为" + socket.hashCode() + "  " + inputContent);
//                    count++;
//                }
//                log.info("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (Objects.nonNull(reader)) {
                    reader.close();
                }
                if (Objects.nonNull(socket)) {
                    socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 改写 客户端 Socket 连接为单独线程处理
     */
    class ClientSocketThread implements Runnable {
        private static final int SO_TIMEOUT = 2000;
        private static final int SLEEP_TIME = 1000;
        public final Socket socket;
        public ClientSocketThread(Socket socket) {
            this.socket = socket;
        }
        @Override
        public void run() {
            BufferedReader reader = null;
            String inputContent;
            int count = 0;
            try {
                socket.setSoTimeout(SO_TIMEOUT);
            } catch (SocketException e1) {
                e1.printStackTrace();
            }
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                while (true) {
                    try {
                        while ((inputContent = reader.readLine()) != null) {
                            log.info("收到id为" + socket.hashCode() + "  " + inputContent);
                            count++;
                        }
                    } catch (Exception e) {
                        //执行到这里表示read方法没有获取到任何数据,线程可以执行一些其他的操作
                        log.info("Not read data: " + stringNowTime());
                        continue;
                    }
                    //执行到这里表示读取到了数据,我们可以在这里进行回复客户端的工作
                    log.info("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
                    Thread.sleep(SLEEP_TIME);
                }
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    if (Objects.nonNull(reader)) {
                        reader.close();
                    }
                    if (Objects.nonNull(socket)) {
                        socket.close();
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

经过上面的改造,我们基本把 BIO 同步阻塞的工作方式更新为 同步非阻塞的工作方式,核心是对于 read()以及服务端接收新连接的accept()设置timeout参数。

在外部处理上,通过while(true) 加上“吞异常”方式,结合Thread.sleep()的套路实现“非阻塞”定期accept

当然,我们也可以看到,通过线程池每次都构建新线程的方式,在连接比较少的时候是比较高效的,但是一旦连接暴增,理论上JVM虽然可以构建非常多线程,实际上CPU肯定是吃不消,多线程“空轮询”判断的方式也十分浪费CPU资源,多线程切换起来更是雪上加霜。

基于BIO的种种弊端,Sun 在JDK1.4 提供了 NIO 来解决上面的几点问题。

native accept方法在Linux运作解读

accept(2): accept connection on socket - Linux man page (die.net)

原始文档相关解读:[[【Linux】accept(2) - Linux man page]],下面的内容基本为文档的翻译和理解介绍。

accept()本地方法,我们可以来试着看一看Linux这块的相关解读:


#include <sys/types.h>
#include <sys/socket.h>
int accept(int sockfd,struct sockaddr *addr,socklen_t *addrlen);

accept()系统调用主要用在基于连接的套接字类型,比如SOCK_STREAMSOCK_SEQPACKET。它提取出所监听套接字的等待连接队列中第一个连接请求,创建一个新的套接字,并返回指向该套接字的文件描述符。新建立的套接字不在监听状态,原来所监听的套接字也不受该系统调用的影响。

备注:新建立的套接字准备发送send()和接收数据recv()

sockfd,作用是 利用系统调用socket()建立的套接字描述符,通过bind()绑定到一个本地地址(一般为服务器的套接字),并且通过listen()一直在监听连接;

addr, 指向struct sockaddr的指针,该结构用通讯层服务器对等套接字的地址(一般为客户端地址)填写,返回地址addr的确切格式由套接字的地址类别(比如TCP或UDP)决定;

addr为NULL,没有有效地址填写,这种情况下,addrlen也不使用,应该置为NULL;

备注:addr是个指向局部数据结构sockaddr_in的指针,这就是要求接入的信息本地的套接字(地址和指针)。

addrlen, 代表一个值结果参数,调用函数必须初始化为包含addr所指向结构大小的数值,函数返回时包含对等地址(一般为服务器地址)的实际数值;

备注:addrlen是个局部整形变量,设置为sizeof(struct sockaddr_in)

如果队列中没有等待的连接,套接字也没有被标记为Non-blocking,accept()会阻塞调用函数直到连接出现;如果套接字被标记为Non-blocking,队列中也没有等待的连接,accept()返回错误EAGAINEWOULDBLOCK

备注:一般来说accept()为阻塞函数,当监听socket调用accept()时,它先到自己的receive_buf中查看是否有连接数据包;若有,把数据拷贝出来,删掉接收到的数据包,创建新的socket与客户发来的地址建立连接;若没有,就阻塞等待;

为了在套接字中有到来的连接时得到通知,可以使用select()poll()。当尝试建立新连接时,系统发送一个可读事件,然后调用accept()为该连接获取套接字。另一种方法是,当套接字中有连接到来时设定套接字发送SIGIO信号。

返回值成功时,返回非负整数,该整数是接收到套接字的描述符;出错时会返回-1,相应地设定全局变量error。

所以,在Java部分的源码里(java.net.ServerSocket#accept)会new 一个Socket出来,方便连接后拿到的新Socket的文件描述符的信息给设定到我们new出来的这个Socket 上来,这点在java.net.PlainSocketImpl#socketAccept中看到的尤为明显,读者可以回顾相关源码。

总结

本文一开始介绍了Bio Socket的基本代码,接着从ServerSocketbind方法解读,通过图文结合的方式介绍了源码如何处理,整个bind操作过程中有许多native层调用,所以Socket的代码调试是非常麻烦的。

介绍完bind之后,我们接着介绍了ServerSocketaccept方法,并且介绍了accept 方法的阻塞问题实际上和底层的操作系统行为有关,并且通过画图的方式理解accept中Socket连接比较“绕”的操作。

最后,文章的后半部分介绍了如何改造accept以及客户端的Socket连接解决非阻塞问题IO,最后我们介绍了  native accept方法在Linux运作,主要内容为Linux的相关文档理解。

写在最后

理解Socket的非阻塞操作有助于理解 NIO的Channel和Buffer的概念,实际上从我们的Demo代码可以看到Channel和非阻塞的BIO思路比较类似,而BufferReader缓冲流则贴合了 Buffer 的概念。

参考资料

Linux Network Programming, Part 1 (linuxjournal.com)

详解socket中的backlog 参数 - 知乎 (zhihu.com)

BIO到NIO源码的一些事儿之BIO - 掘金 (juejin.cn)

CachedThreadPool的工作原理

源码:


public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS, //60s 
                                  new SynchronousQueue<Runnable>());  
}

(1)corePoolSize = 0,maximumPoolSize = 最大值(无限大),keepAliveTime = 60s,workQueue = SynchronousQueue

(2)SynchronousQueue(实际上没有存储数据的空闲,是用来做多线程通信之间的协调作用的)。一开始提交一个任务过来,要求线程池里必须有一个线程对应可以处理这个任务,但是此时一个线程都没有,poolSize >= corePoolSize , workQueue已经满了,poolSize < maximumPoolSize(最大值),直接就会创建一个新的线程来处理这个任务

这样的效果也就是来一个任务就开一个线程,无界,无限开新线程,线程过多容易导致JVM的压力过大甚至直接崩溃。这也是为什么阿里巴巴规范禁掉这个方法的直接原因,容易误用。

(3)如果短期内有大量的任务都涌进来,实际上是走一个直接提交的思路,对每个任务,如果没法找到一个空闲的线程来处理它,那么就会立即创建一个新的线程出来,来处理这个新提交的任务

(4)短时间内,如果大量的任务涌入,可能会导致瞬间创建出来几百个线程,几千个线程,是不固定的。

(5)但是当这些线程工作完一段时间之后,就会处于空闲状态,就会看超过60s的空闲,就会直接将空闲的线程给释放掉。

相关文章
|
2月前
|
Java
利用GraalVM将java文件变成exe可执行文件
这篇文章简明地介绍了如何使用GraalVM将一个简单的Java程序编译成exe可执行文件,首先通过javac命令编译Java文件生成class文件,然后使用native-image命令将class文件转换成独立的exe文件,并展示了如何运行这个exe文件。
98 0
利用GraalVM将java文件变成exe可执行文件
|
23天前
|
Oracle 安全 Java
深入理解Java生态:JDK与JVM的区分与协作
Java作为一种广泛使用的编程语言,其生态中有两个核心组件:JDK(Java Development Kit)和JVM(Java Virtual Machine)。本文将深入探讨这两个组件的区别、联系以及它们在Java开发和运行中的作用。
48 1
|
1月前
|
IDE Java 编译器
开发 Java 程序一定要安装 JDK 吗
开发Java程序通常需要安装JDK(Java Development Kit),因为它包含了编译、运行和调试Java程序所需的各种工具和环境。不过,某些集成开发环境(IDE)可能内置了JDK,或可使用在线Java编辑器,无需单独安装。
66 1
|
2月前
|
缓存 Java Maven
java: 警告: 源发行版 11 需要目标发行版 11 无效的目标发行版: 11 jdk版本不符,项目jdk版本为其他版本
如何解决Java项目中因JDK版本不匹配导致的编译错误,包括修改`pom.xml`文件、调整项目结构、设置Maven和JDK版本,以及清理缓存和重启IDEA。
61 1
java: 警告: 源发行版 11 需要目标发行版 11 无效的目标发行版: 11 jdk版本不符,项目jdk版本为其他版本
|
2月前
|
设计模式 Java API
[Java]静态代理与动态代理(基于JDK1.8)
本文介绍了代理模式及其分类,包括静态代理和动态代理。静态代理分为面向接口和面向继承两种形式,分别通过手动创建代理类实现;动态代理则利用反射技术,在运行时动态创建代理对象,分为JDK动态代理和Cglib动态代理。文中通过具体代码示例详细讲解了各种代理模式的实现方式和应用场景。
33 0
[Java]静态代理与动态代理(基于JDK1.8)
|
2月前
|
Java
Java基础之 JDK8 HashMap 源码分析(中间写出与JDK7的区别)
这篇文章详细分析了Java中HashMap的源码,包括JDK8与JDK7的区别、构造函数、put和get方法的实现,以及位运算法的应用,并讨论了JDK8中的优化,如链表转红黑树的阈值和扩容机制。
35 1
|
3月前
|
Oracle Java 关系型数据库
Linux下JDK环境的配置及 bash: /usr/local/java/bin/java: cannot execute binary file: exec format error问题的解决
如果遇到"exec format error"问题,文章建议先检查Linux操作系统是32位还是64位,并确保安装了与系统匹配的JDK版本。如果系统是64位的,但出现了错误,可能是因为下载了错误的JDK版本。文章提供了一个链接,指向Oracle官网上的JDK 17 Linux版本下载页面,并附有截图说明。
Linux下JDK环境的配置及 bash: /usr/local/java/bin/java: cannot execute binary file: exec format error问题的解决
|
7天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
37 6
|
22天前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
20天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
下一篇
DataWorks