JAVA 分析线程池中的keepAliveTime参数具体实现

简介: JAVA 分析线程池中的keepAliveTime参数具体实现

分析线程池中的keepAliveTime参数具体实现

参数意义

创建线程池时,有一个重要参数就是keepAliveTime,标记线程空闲多久后被释放。

那么他到底是怎么实现的呢?

猜想

有一个线程在维护时间,可笑。专门有一个线程去维护,浪费资源,而且时间也不够精确。而且还要开辟空间记录线程开始空闲的时间,消耗空间。

源码分析

java.util.concurrent.ThreadPoolExecutor#getTask

/**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            // 当前线程数
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                // 这一步是关键,需要了解poll和take的区别,take会进行阻塞。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  • poll 方法作用是移除并返回队列的头节点。但是如果当队列里面是空的,没有任何东西可以移除的时候,便会返回 null 作为提示。
  • 带时间参数的 poll 方法:如果能够移除,便会立刻返回这个节点的内容;如果队列是空的就会进行等待,等待时间正是我们指定的时间,直到超时时间到了,如果队列里依然没有元素可供移除,便会返回 null 作为提示。
  • take 方法的作用是获取并移除队列的头结点。通常在队列里有数据的时候会正常取出数据并删除;但是如果执行 take 的时候队列里无数据,则阻塞,直到队列里有数据;一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。
  • timed为true则标志着:允许核心线程超时被释放或者当前线程数超过核心线程数。一旦为tue,就会去使用阻塞对垒的poll方法,如果keepAliveTime的时间里获取不到任务,就会返回Null,在上一级,也就是runWorker方法中去释放资源。

java.util.concurrent.ThreadPoolExecutor#runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // while循环获取任务,如果获取不到,就跳出循环,在finally中进行释放。
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 去释放当前的worker w。
            processWorkerExit(w, completedAbruptly);
        }
    }

阻塞队列的poll方法又是怎么实现超时的呢?

但是我们较真来看,打破砂锅问到底,阻塞队列又是如何做到去完成超时的呢?

以java.util.concurrent.DelayQueue为例

源码分析

java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit)

/**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        // 已经超时就返回null
                        return null;
                    else
                        // 未超时就去await
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        // 未超时就去await
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 未超时就去await
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

可以看到反复调用了available.awaitNanos(delay)方法,继续往下看

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#awaitNanos

/**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    // 这一步去做了等待
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }

看到执行了LockSupport.parkNanos(this, nanosTimeout);

java.util.concurrent.locks.LockSupport#parkNanos(java.lang.Object, long)

public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }

执行了UNSAFE.park(false, nanos);

sun.misc.Unsafe#park

public native void park(boolean var1, long var2);

这是一个native方法,一般来说到了native我就不看了…c++都还给夫子了。

网上搜搜看吧!!

转载分析本地方法的博客:https://www.jianshu.com/p/37ef66eddca6

也不是特别详细,没有到操作系统底层。

个人猜测操作系统是包了一层while循环,在未超时时就放弃这个CPU时间片,让出CPU,等待下次调度再做判断。

总结

不算太虎头蛇尾吧,至少把jdk中是怎么做到实现keepAliveTime参数的源码给看懂了。

欢迎大家交流指出JRE中阻塞队列调用的UNSAFE如何做到超时,以及对应到操作系统如何配合这个超时指令。

目录
相关文章
|
17小时前
|
缓存 Java 调度
Java并发编程:深入理解线程池
【4月更文挑战第30天】 在Java并发编程中,线程池是一种重要的工具,它可以帮助我们有效地管理线程,提高系统性能。本文将深入探讨Java线程池的工作原理,如何使用它,以及如何根据实际需求选择合适的线程池策略。
|
23小时前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第30天】 本文将深入探讨Java中的线程池,解析其原理、使用场景以及如何合理地利用线程池提高程序性能。我们将从线程池的基本概念出发,介绍其内部工作机制,然后通过实例演示如何创建和使用线程池。最后,我们将讨论线程池的优缺点以及在实际应用中需要注意的问题。
|
1天前
|
设计模式 算法 安全
Java多线程编程实战:从入门到精通
【4月更文挑战第30天】本文介绍了Java多线程编程的基础,包括线程概念、创建线程(继承`Thread`或实现`Runnable`)、线程生命周期。还讨论了线程同步与锁(同步代码块、`ReentrantLock`)、线程间通信(等待/通知、并发集合)以及实战技巧,如使用线程池、线程安全设计模式和避免死锁。性能优化方面,建议减少锁粒度和使用非阻塞算法。理解这些概念和技术对于编写高效、可靠的多线程程序至关重要。
|
1天前
|
Java 调度 开发者
Java中的多线程编程:基础知识与实践
【4月更文挑战第30天】 在现代软件开发中,多线程编程是提高程序性能和响应能力的关键。Java作为一款广泛使用的编程语言,提供了丰富的多线程支持。本文将介绍Java多线程的基础概念、实现方法以及常见问题的解决策略。我们将从线程的创建和管理入手,逐步深入到同步机制、死锁避免以及高级并发工具类的应用。通过实例代码演示和理论分析,旨在帮助读者掌握Java多线程编程的核心技能,提升软件项目的并行处理能力。
|
1天前
|
Java
java多线程售票例子
java多线程售票例子
|
1天前
|
Java 程序员
Java中的多线程编程与性能优化
【4月更文挑战第30天】本文主要探讨了Java中的多线程编程以及如何通过多线程技术来提升程序的性能。首先,我们将介绍多线程的基本概念和原理,然后深入探讨Java中实现多线程的两种主要方式:继承Thread类和实现Runnable接口。接着,我们将讨论多线程中的同步问题,包括synchronized关键字和Lock锁。最后,我们将探讨如何通过线程池来管理和优化线程,以及如何避免常见的多线程问题。
|
1天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第30天】本文将深入探讨Java并发编程中的一个重要主题——线程池。我们将从线程池的基本概念入手,了解其工作原理和优势,然后详细介绍如何使用Java的Executor框架创建和管理线程池。最后,我们将讨论一些高级主题,如自定义线程工厂和拒绝策略。通过本文的学习,你将能够更好地理解和使用Java的线程池,提高你的并发编程能力。
|
1天前
|
存储 安全 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第30天】在Java开发中,并发编程是一个复杂而又关键的领域。它允许多个线程同时执行,从而提高程序性能和资源利用率。然而,并发编程也带来了许多挑战,如数据不一致、死锁和线程安全问题。本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化策略。我们将通过实例分析如何在保证线程安全的同时提高程序性能,为Java开发者提供实用的指导。
|
1天前
|
Java 程序员 开发者
深入理解Java并发编程:线程同步与锁机制
【4月更文挑战第30天】 在多线程的世界中,确保数据的一致性和线程间的有效通信是至关重要的。本文将深入探讨Java并发编程中的核心概念——线程同步与锁机制。我们将从基本的synchronized关键字开始,逐步过渡到更复杂的ReentrantLock类,并探讨它们如何帮助我们在多线程环境中保持数据完整性和避免常见的并发问题。文章还将通过示例代码,展示这些同步工具在实际开发中的应用,帮助读者构建对Java并发编程深层次的理解。
|
1天前
|
安全 Java 调度
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第30天】本文将深入探讨Java并发编程的核心概念,包括线程安全、同步机制、锁优化以及性能调优。我们将通过实例分析如何确保多线程环境下的数据一致性,同时介绍一些常见的并发模式和最佳实践,旨在帮助开发者在保证线程安全的同时,提升系统的性能和响应能力。