重写线程池 execute 方法导致线程池“失效” 问题

简介: 今天群里有个同学遇到一个看似很奇怪的问题,自定义 `ThreadPoolTaskExecutor` 子类,重写了 execute 方法,通过 execute 方法来执行任务时打印当前线程,日志显示任务一直在调用者线程里执行 (其实并不是),似乎线程池失效了。

一、背景

今天群里有个同学遇到一个看似很奇怪的问题,自定义 ThreadPoolTaskExecutor 子类,重写了 execute 方法,通过 execute 方法来执行任务时打印当前线程,日志显示任务一直在调用者线程里执行 (其实并不是),似乎线程池失效了。

二、场景复现

自定义 ThreadPoolTaskExecutor 子类

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ThreadPoolTaskExecutorImpl extends ThreadPoolTaskExecutor {

    @Override
    public void execute(Runnable command) {
        System.out.println("当前线程" + Thread.currentThread().getName());

        super.execute(command);
    }

}

编写测试代码:

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {

    public static void main(String[] args) throws InterruptedException {
        // 构造线程池
        ThreadPoolTaskExecutorImpl executor = new ThreadPoolTaskExecutorImpl();
        executor.initialize();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("[线程池的线程]");

        // 执行任务
         for (int i = 0; i < 5; i++) {
            executor.execute(() -> {
                System.out.println("测试");
            });
        }


        TimeUnit.SECONDS.sleep(3);

        executor.shutdown();
    }
}

执行结果:

当前线程main
当前线程main
测试
测试
当前线程main
测试
当前线程main
当前线程main
测试
测试

由此断定:自定义的线程池失效,在 execute 方法中获取当前线程时,并没有出现我们定义的线程名称前缀的线程,仍然使用 main 线程来执行任务。
但是,真的是这样吗?

三、分析

由于很多同学没有认真思考过多线程的本质,会想当然地认为线程池的 execute 方法的所有代码都是在线程池创建的线程中执行,可是真的是这样吗?

我们知道在没有使用新线程的情况下,程序会使用当前线程(main 线程)顺序执行。
在这里插入图片描述

因此,在 org.example.thread.ThreadPoolTaskExecutorImpl#execute 中,打印的 “当前线程” 的代码仍然是在 main 方法中执行的。
在这里插入图片描述

进入 super.execute 方法

    @Override
    public void execute(Runnable task) {
        Executor executor = getThreadPoolExecutor();
        try {
            executor.execute(task);
        }
        catch (RejectedExecutionException ex) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
        }
    }

这里的 executor.execute 方法实际上是 java.util.concurrent.ThreadPoolExecutor#execute

 /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

小于核心线程数,会执行到 `addWorker ,此时才真正创建新的线程去执行任务:

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        
        // 省略部分代码

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //【关键1】 创建新的线程(run 方法)
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //【关键2】启动创建的线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我们再看下 new Worker 时做了什么。

  private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
     

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //【关键】 使用线程工厂创建线程
            this.thread = getThreadFactory().newThread(this);
        }
        
        //【关键】 重写 Runnable 的 run 方法,里面就封装了外面传入的 Runnable
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        
        
       final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

        

// 省略其他

}

因此
(1)自定义的 ThreadPoolTaskExecutorImpl 类里重写的 execute 方法里打印的当前线程实际还是调用者线程。
(2)外面传入的 Runnable 参数最终会在 Worker 现成的 run 方法中执行到。

四、解决之道

我们可以使用包装器模式处理下即可:

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ThreadPoolTaskExecutorImpl extends ThreadPoolTaskExecutor {

    @Override
    public void execute(Runnable command) {
    // 当前在父线程里

        super.execute(() -> {
            // 这里在子线程里执行
            
            // 可以在任务前打印下当前线程名称,线程池的状态等信息
            System.out.println("当前线程" + Thread.currentThread().getName());
            
            // 原始的任务
            command.run();
        });
    }

}

测试代码:

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {

    public static void main(String[] args) throws InterruptedException {
        // 构造线程池
        ThreadPoolTaskExecutorImpl executor = new ThreadPoolTaskExecutorImpl();
        executor.initialize();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("[线程池的线程]");

        // 执行任务
        for (int i = 0; i < 5; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "-> 测试");
            });
        }

        TimeUnit.SECONDS.sleep(3);

        executor.shutdown();
    }
}

输出结果:

当前线程[线程池的线程]2
当前线程[线程池的线程]1
[线程池的线程]1-> 测试
[线程池的线程]2-> 测试
当前线程[线程池的线程]3
[线程池的线程]3-> 测试
当前线程[线程池的线程]4
[线程池的线程]4-> 测试
当前线程[线程池的线程]5
[线程池的线程]5-> 测试

五、启示

5.1 关于提问

该同学提问非常模糊,甚至“反复修改问题”,最终给出关键代码截图,才真正理解问题是什么。

大家请教别人时,尽量能将问题转化为别人容易理解的表达方式。

大家请教别人时,一定自己先搞清楚问题究竟是什么,而不需要别人一再追问下,才不断逼近真实的问题。

大家请教别人时,最好能够有源码或者关键信息截图等。

5.2 现象与本质

我们使用线程池时,总是观察到我们传入的 Runnable 是在线程池中的线程执行的,我们是使用 execute 方法来执行的,但这并不意味着 execute 方法的所有步骤都是在线程池中的线程里执行的。

学习某个技术时,要真正理解技术的本质,而不是表象。

如调用线程的 start 方法才真正启动线程,在重写的 execute 方法第一行压根就没有创建新的线程,怎么会在新的线程里执行呢?

在实际开发和验证问题时,多进行代码调试,掌握高级的调试技巧,如调试时表达式计算、条件断点、移除栈帧回退调用等。


创作不易,如果本文对你有帮助,欢迎点赞、收藏加关注,你的支持和鼓励,是我创作的最大动力。
在这里插入图片描述
相关文章
|
2月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
119 38
|
8天前
|
缓存 安全 Java
【JavaEE】——单例模式引起的多线程安全问题:“饿汉/懒汉”模式,及解决思路和方法(面试高频)
单例模式下,“饿汉模式”,“懒汉模式”,单例模式下引起的线程安全问题,解锁思路和解决方法
|
8天前
|
Java 程序员 调度
【JavaEE】线程创建和终止,Thread类方法,变量捕获(7000字长文)
创建线程的五种方式,Thread常见方法(守护进程.setDaemon() ,isAlive),start和run方法的区别,如何提前终止一个线程,标志位,isinterrupted,变量捕获
|
2月前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
61 4
|
2月前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
95 2
|
2月前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
342 2
|
11天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
34 1
|
3月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
62 1
|
3月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
40 3
|
3月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
28 2