【从入门到放弃-Java】并发编程-NIO-Selector

简介: 前言 前两篇【从入门到放弃-Java】并发编程-NIO-Channel和【从入门到放弃-Java】并发编程-NIO-Buffer中我们学习了NIO中两个重要的概念Channel和Buffer。今天我们来看下另一个重要的内容 Selector 简介 Selector是多路复用器,会不断轮询已经注册了的Channel。

前言

前两篇【从入门到放弃-Java】并发编程-NIO-Channel【从入门到放弃-Java】并发编程-NIO-Buffer中我们学习了NIO中两个重要的概念Channel和Buffer。
今天我们来看下另一个重要的内容 Selector

简介

Selector是多路复用器,会不断轮询已经注册了的Channel。当有注册的channel产生连接、读、写等事件时,就会被Selector发现,从而可以进行相关后续操作。

Selector的好处是,可以通过一个线程来管理多个通道,减少了创建线程的资源占用及线程切换带来的消耗

Selector

SelectableChannel可以通过SelectionKey(记录channel和selector的注册关系)注册到Selector上。Selector维护了三个SelectionKey集合:

  • key set:存放了Selector上已经注册了的Channel的key。可以通过keys()方法获取。
  • selected-key set:当之前注册感兴趣的事件到达时,set中的keys会被更新或添加,set中维护了当前至少有一个可以操作的事件的channel key的集合。是key set的子集。可以使用selectedKeys()获取。
  • cancelled-key:存放已经调用cancel方法取消,等待下次操作时会调用deregister取消注册的channel,调用deregister后,所有的set中都没有这个channel的key了。

open

/**
 * Opens a selector.
 *
 * <p> The new selector is created by invoking the {@link
 * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
 * of the system-wide default {@link
 * java.nio.channels.spi.SelectorProvider} object.  </p>
 *
 * @return  A new selector
 *
 * @throws  IOException
 *          If an I/O error occurs
 */
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

开启selector,具体的实现会根据操作系统类型创建不同的实现类,如macOS下实际上是new了一个KQueueSelectorProvider实例,低层基于操作系统的kqueue实现。

register

protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    //新建一个SelectionKey,记录channel与selector之间的注册关系
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    //前置操作,这里主要是判断下selector是否还处于open状态
    // register (if needed) before adding to key set
    implRegister(k);

    // 添加selectionKey至key set
    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        // 更新注册的事件码
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}

注册selector和channel之间的事件关系。

select

// timeout超时
@Override
public final int select(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}

@Override
public final int select() throws IOException {
    return lockAndDoSelect(null, -1);
}

// 不阻塞
@Override
public final int selectNow() throws IOException {
    return lockAndDoSelect(null, 0);
}

private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
{
    synchronized (this) {
        ensureOpen();
        if (inSelect)
            throw new IllegalStateException("select in progress");
        inSelect = true;
        try {
            synchronized (publicSelectedKeys) {
                return doSelect(action, timeout);
            }
        } finally {
            inSelect = false;
        }
    }
}

protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
{
    assert Thread.holdsLock(this);

    // 如果timeout = 0时,不阻塞
    long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
    boolean blocking = (to != 0);
    boolean timedPoll = (to > 0);

    int numEntries;
    processUpdateQueue();
    processDeregisterQueue();
    try {
        // 设置interrupt 可以处理中断信号 防止线程一直阻塞
        begin(blocking);

        // 轮询的监听,直到有注册的事件发生或超时。
        do {
            long startTime = timedPoll ? System.nanoTime() : 0;
            numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
            if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
                // timed poll interrupted so need to adjust timeout
                long adjust = System.nanoTime() - startTime;
                to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                if (to <= 0) {
                    // timeout expired so no retry
                    numEntries = 0;
                }
            }
        } while (numEntries == IOStatus.INTERRUPTED);
        assert IOStatus.check(numEntries);

    } finally {
        end(blocking);
    }
    processDeregisterQueue();
    return processEvents(numEntries, action);
}

selectedKeys

public final Set<SelectionKey> selectedKeys() {
    ensureOpen();
    return publicSelectedKeys;
}

获取被事件唤醒的key
注意:当被遍历处理selectedKeys时,key被处理完需要手动remove掉,防止下次被重复消费,selectedKeys不会帮你删除已处理过的key。

close

public final void close() throws IOException {
    boolean open = selectorOpen.getAndSet(false);
    if (!open)
        return;
    implCloseSelector();
}


public final void implCloseSelector() throws IOException {
    //通知处于阻塞的select方法立即返回
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // 遍历所有的SelectionKey,取消注册
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}

SelectionKey

SelectionKey在channel register时创建。用来记录channel和selector之间的注册事件关系。
事件主要有:

  • OP_READ
  • OP_WRITE
  • OP_CONNECT
  • OP_ACCEPT

每个SelectionKey有两个由整数表示的操作集合,用来标识channel支持的操作类型。

interest set:是在创建SelectionKey时定义的,当集合中的操作发生时,将会把channel置为ready状态
ready set:检测到selector中已经就绪的操作类型集合

channel

public SelectableChannel channel() {
    return (SelectableChannel)channel;
}

获取SelectionKey中的channel

selector

public Selector selector() {
    return selector;
}

获取SelectionKey中的selector

isReadable

public final boolean isReadable() {
    return (readyOps() & OP_READ) != 0;
}

根据readyOps(readySet)判断channel是否是可读状态

isWritable

public final boolean isWritable() {
    return (readyOps() & OP_WRITE) != 0;
}

根据readyOps(readySet)判断channel是否是可写状态

isConnectable

public final boolean isConnectable() {
    return (readyOps() & OP_CONNECT) != 0;
}

根据readyOps(readySet)判断channel是否是connect状态,通常是客户端使用,判断连接是否建立

isReadable

public final boolean isAcceptable() {
    return (readyOps() & OP_ACCEPT) != 0;
}

根据readyOps(readySet)判断channel是否是accept状态,通常是服务端使用,判断是否有客户端请求建立连接

总结

通过使用selector,可以使用一个线程来管理多个连接。需要注意的一点是,通常读、写操作都是比较耗时的,为了提高服务端的性能应该把Selector::select和read、write的具体处理逻辑在不同的线程中处理。
即:使用一个线程来进行select,只做分发。在获取到就绪的SelectionKey后,通过线程池在不同的线程中处理读写操作。

通过学习完NIO相关的知识,我们可以很清楚的回答下面这个问题

  • 问:基于BIO实现的server端,当建立100个连接时,需要多少个线程?基于NIO实现的呢?
  • 答:基于BIO实现的server端,通常需要由一个线程accept,并为每个新建立的连接创建一个线程去处理IO操作,因此需要 1个accept线程+100个IO线程
    基于NIO实现的server端,使用Selector多路复用机制,由一个线程进行select,为了提高并发可以使用线程池来处理IO操作,通常为了发挥CPU的性能会创建(cpu核数 x 2)个线程来处理IO操作。因此需要 1个select线程 + cpu核数 x 2 个IO线程

更多文章见:https://nc2era.com

目录
相关文章
|
15小时前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第20天】 在Java开发中,正确处理并发问题对于确保应用的稳定性和提高性能至关重要。本文将深入探讨Java并发编程的核心概念——线程安全,以及如何通过各种技术和策略实现它,同时保持甚至提升系统性能。我们将分析并发问题的根源,包括共享资源的竞争条件、死锁以及线程活性问题,并探索解决方案如同步机制、锁优化、无锁数据结构和并发工具类等。文章旨在为开发者提供一个清晰的指南,帮助他们在编写多线程应用时做出明智的决策,确保应用的高效和稳定运行。
|
1天前
|
Java
Java一分钟之-并发编程:线程间通信(Phaser, CyclicBarrier, Semaphore)
【5月更文挑战第19天】Java并发编程中,Phaser、CyclicBarrier和Semaphore是三种强大的同步工具。Phaser用于阶段性任务协调,支持动态注册;CyclicBarrier允许线程同步执行,适合循环任务;Semaphore控制资源访问线程数,常用于限流和资源池管理。了解其使用场景、常见问题及避免策略,结合代码示例,能有效提升并发程序效率。注意异常处理和资源管理,以防止并发问题。
22 2
|
1天前
|
安全 Java 容器
Java一分钟之-并发编程:线程安全的集合类
【5月更文挑战第19天】Java提供线程安全集合类以解决并发环境中的数据一致性问题。例如,Vector是线程安全但效率低;可以使用Collections.synchronizedXxx将ArrayList或HashMap同步;ConcurrentHashMap是高效线程安全的映射;CopyOnWriteArrayList和CopyOnWriteArraySet适合读多写少场景;LinkedBlockingQueue是生产者-消费者模型中的线程安全队列。注意,过度同步可能影响性能,应尽量减少共享状态并利用并发工具类。
15 2
|
2天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【5月更文挑战第18天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将了解线程池的基本概念,应用场景,以及如何优化线程池的性能。通过实例分析,我们将看到线程池如何提高系统性能,减少资源消耗,并提高系统的响应速度。
12 5
|
2天前
|
安全 Java 容器
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第18天】随着多核处理器的普及,并发编程变得越来越重要。Java提供了丰富的并发编程工具,如synchronized关键字、显式锁Lock、原子类、并发容器等。本文将深入探讨Java并发编程的核心概念,包括线程安全、死锁、资源竞争等,并分享一些性能优化的技巧。
|
2天前
|
安全 Java
Java一分钟之-并发编程:原子类(AtomicInteger, AtomicReference)
【5月更文挑战第18天】Java并发编程中的原子类如`AtomicInteger`和`AtomicReference`提供无锁原子操作,适用于高性能并发场景。`AtomicInteger`支持原子整数操作,而`AtomicReference`允许原子更新对象引用。常见问题包括误解原子性、过度依赖原子类以及忽略对象内部状态的并发控制。要避免这些问题,需明确原子操作边界,合理选择同步策略,并精确控制原子更新。示例代码展示了如何使用这两个类。正确理解和使用原子类是构建高效并发程序的关键。
12 1
|
2天前
|
安全 Java 容器
Java一分钟之-并发编程:并发容器(ConcurrentHashMap, CopyOnWriteArrayList)
【5月更文挑战第18天】本文探讨了Java并发编程中的`ConcurrentHashMap`和`CopyOnWriteArrayList`,两者为多线程数据共享提供高效、线程安全的解决方案。`ConcurrentHashMap`采用分段锁策略,而`CopyOnWriteArrayList`适合读多写少的场景。注意,`ConcurrentHashMap`的`forEach`需避免手动同步,且并发修改时可能导致`ConcurrentModificationException`。`CopyOnWriteArrayList`在写操作时会复制数组。理解和正确使用这些特性是优化并发性能的关键。
9 1
|
2天前
|
Java 编译器
Java并发编程中的锁优化策略
【5月更文挑战第18天】在Java并发编程中,锁是一种常用的同步机制,用于保护共享资源的访问。然而,不当的锁使用可能导致性能问题和死锁风险。本文将探讨Java中锁的优化策略,包括锁粗化、锁消除、锁分离和读写锁等技术,以提高并发程序的性能和可靠性。
|
3天前
|
Java 编译器
Java 并发编程中的锁优化策略
【5月更文挑战第17天】在 Java 并发编程中,锁是一种常见的同步机制,用于保护共享资源的访问。然而,不当使用锁可能导致性能问题和死锁风险。本文将探讨 Java 中的锁优化策略,包括锁粗化、锁消除、锁降级以及读写锁等技术,以提高并发程序的性能和可靠性。
|
3天前
|
Java 编译器
Java并发编程中的锁优化策略
【5月更文挑战第17天】在Java并发编程中,锁是一种常见的同步机制,用于保护共享资源。然而,使用不当的锁可能导致性能下降和死锁等问题。本文将探讨Java中锁的优化策略,包括锁粗化、锁消除、锁排序等方法,以提高程序的性能和可靠性。