【从入门到放弃-Java】并发编程-NIO-Selector

简介: 前言 前两篇【从入门到放弃-Java】并发编程-NIO-Channel和【从入门到放弃-Java】并发编程-NIO-Buffer中我们学习了NIO中两个重要的概念Channel和Buffer。今天我们来看下另一个重要的内容 Selector 简介 Selector是多路复用器,会不断轮询已经注册了的Channel。

前言

前两篇【从入门到放弃-Java】并发编程-NIO-Channel【从入门到放弃-Java】并发编程-NIO-Buffer中我们学习了NIO中两个重要的概念Channel和Buffer。
今天我们来看下另一个重要的内容 Selector

简介

Selector是多路复用器,会不断轮询已经注册了的Channel。当有注册的channel产生连接、读、写等事件时,就会被Selector发现,从而可以进行相关后续操作。

Selector的好处是,可以通过一个线程来管理多个通道,减少了创建线程的资源占用及线程切换带来的消耗

Selector

SelectableChannel可以通过SelectionKey(记录channel和selector的注册关系)注册到Selector上。Selector维护了三个SelectionKey集合:

  • key set:存放了Selector上已经注册了的Channel的key。可以通过keys()方法获取。
  • selected-key set:当之前注册感兴趣的事件到达时,set中的keys会被更新或添加,set中维护了当前至少有一个可以操作的事件的channel key的集合。是key set的子集。可以使用selectedKeys()获取。
  • cancelled-key:存放已经调用cancel方法取消,等待下次操作时会调用deregister取消注册的channel,调用deregister后,所有的set中都没有这个channel的key了。

open

/**
 * Opens a selector.
 *
 * <p> The new selector is created by invoking the {@link
 * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
 * of the system-wide default {@link
 * java.nio.channels.spi.SelectorProvider} object.  </p>
 *
 * @return  A new selector
 *
 * @throws  IOException
 *          If an I/O error occurs
 */
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

开启selector,具体的实现会根据操作系统类型创建不同的实现类,如macOS下实际上是new了一个KQueueSelectorProvider实例,低层基于操作系统的kqueue实现。

register

protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    //新建一个SelectionKey,记录channel与selector之间的注册关系
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    //前置操作,这里主要是判断下selector是否还处于open状态
    // register (if needed) before adding to key set
    implRegister(k);

    // 添加selectionKey至key set
    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        // 更新注册的事件码
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}

注册selector和channel之间的事件关系。

select

// timeout超时
@Override
public final int select(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}

@Override
public final int select() throws IOException {
    return lockAndDoSelect(null, -1);
}

// 不阻塞
@Override
public final int selectNow() throws IOException {
    return lockAndDoSelect(null, 0);
}

private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
{
    synchronized (this) {
        ensureOpen();
        if (inSelect)
            throw new IllegalStateException("select in progress");
        inSelect = true;
        try {
            synchronized (publicSelectedKeys) {
                return doSelect(action, timeout);
            }
        } finally {
            inSelect = false;
        }
    }
}

protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
{
    assert Thread.holdsLock(this);

    // 如果timeout = 0时,不阻塞
    long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
    boolean blocking = (to != 0);
    boolean timedPoll = (to > 0);

    int numEntries;
    processUpdateQueue();
    processDeregisterQueue();
    try {
        // 设置interrupt 可以处理中断信号 防止线程一直阻塞
        begin(blocking);

        // 轮询的监听,直到有注册的事件发生或超时。
        do {
            long startTime = timedPoll ? System.nanoTime() : 0;
            numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
            if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
                // timed poll interrupted so need to adjust timeout
                long adjust = System.nanoTime() - startTime;
                to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                if (to <= 0) {
                    // timeout expired so no retry
                    numEntries = 0;
                }
            }
        } while (numEntries == IOStatus.INTERRUPTED);
        assert IOStatus.check(numEntries);

    } finally {
        end(blocking);
    }
    processDeregisterQueue();
    return processEvents(numEntries, action);
}

selectedKeys

public final Set<SelectionKey> selectedKeys() {
    ensureOpen();
    return publicSelectedKeys;
}

获取被事件唤醒的key
注意:当被遍历处理selectedKeys时,key被处理完需要手动remove掉,防止下次被重复消费,selectedKeys不会帮你删除已处理过的key。

close

public final void close() throws IOException {
    boolean open = selectorOpen.getAndSet(false);
    if (!open)
        return;
    implCloseSelector();
}


public final void implCloseSelector() throws IOException {
    //通知处于阻塞的select方法立即返回
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // 遍历所有的SelectionKey,取消注册
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}

SelectionKey

SelectionKey在channel register时创建。用来记录channel和selector之间的注册事件关系。
事件主要有:

  • OP_READ
  • OP_WRITE
  • OP_CONNECT
  • OP_ACCEPT

每个SelectionKey有两个由整数表示的操作集合,用来标识channel支持的操作类型。

interest set:是在创建SelectionKey时定义的,当集合中的操作发生时,将会把channel置为ready状态
ready set:检测到selector中已经就绪的操作类型集合

channel

public SelectableChannel channel() {
    return (SelectableChannel)channel;
}

获取SelectionKey中的channel

selector

public Selector selector() {
    return selector;
}

获取SelectionKey中的selector

isReadable

public final boolean isReadable() {
    return (readyOps() & OP_READ) != 0;
}

根据readyOps(readySet)判断channel是否是可读状态

isWritable

public final boolean isWritable() {
    return (readyOps() & OP_WRITE) != 0;
}

根据readyOps(readySet)判断channel是否是可写状态

isConnectable

public final boolean isConnectable() {
    return (readyOps() & OP_CONNECT) != 0;
}

根据readyOps(readySet)判断channel是否是connect状态,通常是客户端使用,判断连接是否建立

isReadable

public final boolean isAcceptable() {
    return (readyOps() & OP_ACCEPT) != 0;
}

根据readyOps(readySet)判断channel是否是accept状态,通常是服务端使用,判断是否有客户端请求建立连接

总结

通过使用selector,可以使用一个线程来管理多个连接。需要注意的一点是,通常读、写操作都是比较耗时的,为了提高服务端的性能应该把Selector::select和read、write的具体处理逻辑在不同的线程中处理。
即:使用一个线程来进行select,只做分发。在获取到就绪的SelectionKey后,通过线程池在不同的线程中处理读写操作。

通过学习完NIO相关的知识,我们可以很清楚的回答下面这个问题

  • 问:基于BIO实现的server端,当建立100个连接时,需要多少个线程?基于NIO实现的呢?
  • 答:基于BIO实现的server端,通常需要由一个线程accept,并为每个新建立的连接创建一个线程去处理IO操作,因此需要 1个accept线程+100个IO线程
    基于NIO实现的server端,使用Selector多路复用机制,由一个线程进行select,为了提高并发可以使用线程池来处理IO操作,通常为了发挥CPU的性能会创建(cpu核数 x 2)个线程来处理IO操作。因此需要 1个select线程 + cpu核数 x 2 个IO线程

更多文章见:https://nc2era.com

目录
相关文章
|
13天前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
20 0
|
2月前
|
存储 安全 Java
从入门到精通:Java Map全攻略,一篇文章就够了!
【10月更文挑战第17天】本文详细介绍了Java编程中Map的使用,涵盖Map的基本概念、创建、访问与修改、遍历方法、常用实现类(如HashMap、TreeMap、LinkedHashMap)及其特点,以及Map在多线程环境下的并发处理和性能优化技巧,适合初学者和进阶者学习。
61 3
|
16天前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
36 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
2天前
|
存储 监控 Java
Java的NIO体系
通过本文的介绍,希望您能够深入理解Java NIO体系的核心组件、工作原理及其在高性能应用中的实际应用,并能够在实际开发中灵活运用这些知识,构建高效的Java应用程序。
19 5
|
12天前
|
监控 架构师 Java
Java虚拟机调优的艺术:从入门到精通####
本文作为一篇深入浅出的技术指南,旨在为Java开发者揭示JVM调优的神秘面纱,通过剖析其背后的原理、分享实战经验与最佳实践,引领读者踏上从调优新手到高手的进阶之路。不同于传统的摘要概述,本文将以一场虚拟的对话形式,模拟一位经验丰富的架构师向初学者传授JVM调优的心法,激发学习兴趣,同时概括性地介绍文章将探讨的核心议题——性能监控、垃圾回收优化、内存管理及常见问题解决策略。 ####
|
18天前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
59 6
|
1月前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
1月前
|
Java 大数据 API
14天Java基础学习——第1天:Java入门和环境搭建
本文介绍了Java的基础知识,包括Java的简介、历史和应用领域。详细讲解了如何安装JDK并配置环境变量,以及如何使用IntelliJ IDEA创建和运行Java项目。通过示例代码“HelloWorld.java”,展示了从编写到运行的全过程。适合初学者快速入门Java编程。
|
1月前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
41 2
|
1月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?