Java Review - 线程池使用FutureTask的小坑

简介: Java Review - 线程池使用FutureTask的小坑

195d03d17afc4a928bc581f313b01dfe.png

概述


先说结论


线程池使用FutureTask时如果把拒绝策略设置为 DiscardPolicy和 DiscardOldestPolicy,并且在被拒绝的任务的Future对象上调用了无参get方法,那么调用线程会一直被阻塞。


问题复现

import java.util.concurrent.*;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/11/21 0:11
 * @mark: show me the code , change the world
 */
public class FutureTest {
    //  1 线程池单个线程,队列大小为1  - 初始化线程池
    private final static ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(1),
            new ThreadPoolExecutor.DiscardPolicy());
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 2 添加你任务1
        Future futureOne = tpe.submit(() -> {
            System.out.println("开始处理业务1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("业务1执行结束");
            return "Result1";
        });
        // 3 添加你任务2
        Future futureTwo = tpe.submit(() -> {
            System.out.println("开始处理业务2");
            System.out.println("业务2执行结束");
            return "Result2";
        });
        // 4  添加任务3
        Future futureThree = null;
        try {
            futureThree = tpe.submit(() -> System.out.println("开始处理业务3"));
        } catch (Exception e) {
            System.out.print(e.getLocalizedMessage());
        }
        // 5 等待任务1执行完毕
        System.out.println("任务1返回结果: " + futureOne.get());
        // 6 等待任务2执行完毕
        System.out.println("任务2返回结果:  " + futureTwo.get());
        // 7 等待任务3执行完毕
        System.out.println("任务3返回结果:  " + futureThree==null?null:futureThree.get());
        //关闭线程池,阻塞知道所有任务执行完毕
        tpe.shutdown();
    }
}


输出

bfadc464ecf149b7ba0da8013b494d23.png


让我们来分析下


代码(1)创建了一个单线程和一个队列元素个数为1的线程池,并且把拒绝策略设置为 DiscardPolicy。


代码(2)向线程池提交了一个异步任务one,并且这个任务会由唯一的线程来执行,任务在打印【开始处理业务1】 后会阻塞该线程2s。


代码(3)向线程池提交了一个异步任务two,这时候会把任务two放入阻塞队列。


代码(4)向线程池提交任务three,由于队列已满所以触发拒绝策略丢弃任务three。


从执行结果看,在任务one阻塞的5s内,主线程执行到了代码(5)并等待任务one执行完毕,当任务one执行完毕后代码(5)返回,主线程打印出【任务1 null】。任务one执行完成后线程池的唯一线程会去队列里面取出任务two并执行,所以输出【开始处理业务2】,然后代码(6)返回,这时候主线程输出【任务2 null】。然后执行代码(7)等待任务three执行完毕。


从执行结果看,代码(7)会一直阻塞而不会返回,至此问题产生。如果把拒绝策略修改为DiscardOldestPolicy,也会存在有一个任务的get方法一直阻塞,只是现在是任务two被阻塞。


但是如果把拒绝策略设置为默认的AbortPolicy则会正常返回,并且会输出如下结果

开始处理业务1
Task java.util.concurrent.FutureTask@27bc2616 rejected from java.util.concurrent.ThreadPoolExecutor@3941a79c[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]业务1执行结束
任务1返回结果: Result1
开始处理业务2
业务2执行结束
任务2返回结果:  Result2
Exception in thread "main" java.lang.NullPointerException
  at com.artisan.bfzm.chapter11.FutureTest.main(FutureTest.java:58)


源码分析


要分析这个问题,需要看线程池的submit方法都做了什么,submit方法的代码如下

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 1 装饰Runnable对象为Future对象
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        // 6 返回Future对象
        return ftask;
    }


看下 execute方法

   /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */ 
    // 2 如果线程个数小于核心线程数量,则新增线程处理
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 3. 如果线程都在工作且当前线程个数已经达到核心线程数,就把任务放入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 4 尝试新增处理线程
        else if (!addWorker(command, false))
            // 5 新增失败则触发拒绝策略
            reject(command);
    }


代码(1)装饰Runnable为FutureTask对象,然后调用线程池的execute方法。


代码(2)判断如果线程个数小于核心线程数则新增处理线程。


代码(3)判断如果当前线程个数已经达到核心线程数则将任务放入队列 。


代码(4)尝试新增处理线程。失败则执行代码(5),否则直接使用新线程处理。


代码(5)执行具体拒绝策略,从这里也可以看出,使用业务线程执行拒绝策略。


所以要找到上面例子中问题所在,只需要看代码(5)对被拒绝任务的影响,这里先看下拒绝策略DiscardPolicy的代码。

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }
        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }


拒绝策略的rejectedExecution方法什么都没做,代码(4)调用submit后会返回一个Future对象。

41892aa861c34f4bae1bfbe6ad3a1109.png


Future是有状态的,Future的状态枚举值如下


39b4da07248544dc89396b3844995224.png

在代码(1)中使用newTaskFor方法将Runnable任务转换为FutureTask,

   protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }


继续

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }


而在FutureTask的构造函数里面设置的状态就是NEW。


所以使用DiscardPolicy策略提交后返回了一个状态为NEW的Future对象。

那么我们下面就需要看下当调用Future的无参get方法时Future变为什么状态才会返回,那就要看下FutureTask的get()方法代码。


fba7e7bbf616469a94464223bc9f50a5.png


   public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //当前状态值 <= COMPLETING需要等待,否则调用report返回 
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 状态值为NORMAL的时候正常返回
        if (s == NORMAL)
            return (V)x;
       // 状态值大于等于CANCELLED的时候抛出异常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

也就是说,当Future的状态>COMPLETING时调用get方法才会返回,而明显DiscardPolicy策略在拒绝元素时并没有设置该Future的状态,后面也没有其他机会可以设置该Future的状态,所以Future的状态一直是NEW,所以一直不会返回。


同理,DiscardOldestPolicy策略也存在这样的问题,最老的任务被淘汰时没有设置被淘汰任务对应Future的状态。


那么默认的AbortPolicy策略为啥没问题呢?其实在执行AbortPolicy策略时,代码(5)会直接抛出RejectedExecutionException异常,也就是submit方法并没有返回Future对象,这时候futureThree是null。84839f793f0d4e73bcf68ffdc1556692.png

    /**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }


解决办法


所以当使用Future时,尽量使用带超时时间的get方法,这样即使使用了DiscardPolicy拒绝策略也不至于一直等待,超时时间到了就会自动返回。


如果非要使用不带参数的get方法则可以重写DiscardPolicy的拒绝策略,在执行策略时设置该Future的状态大于COMPLETING即可。但是我们查看FutureTask提供的方法,会发现只有cancel方法是public的,并且可以设置FutureTask的状态大于COMPLETING,则重写拒绝策略的具体代码如下。

import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/11/21 1:40
 * @mark: show me the code , change the world
 */
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (!executor.isShutdown()){
                if (null != r && r instanceof FutureTask) {
                    ((FutureTask) r).cancel(true);
                }
            }
    }
}

使用这个策略时,由于在cancel的任务上调用get()方法会抛出异常,所以代码(7)需要使用try-catch块捕获异常,因此将代码(7)修改为如下所示。

b263dfc3b49848a387d279b90d506226.png


执行结果为


196a9c1557fd45d38233cdd3e10a278d.png

当然这相比正常情况多了一个异常捕获操作。最好的情况是,重写拒绝策略时设置FutureTask的状态为NORMAL,但是这需要重写FutureTask方法,因为FutureTask并没有提供接口让我们设置。


小结


通过案例介绍了在线程池中使用FutureTask时,当拒绝策略为DiscardPolicy和DiscardOldestPolicy时,在被拒绝的任务的FutureTask对象上调用get()方法会导致调用线程一直阻塞,所以在日常开发中尽量使用带超时参数的get方法以避免线程一直阻塞。


相关文章
|
9天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
49 17
|
19天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
5天前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
21天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
21天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
22天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
45 3
|
机器学习/深度学习 Java 程序员
Java Review(三十二、异常处理)
Java Review(三十二、异常处理)
143 0
Java Review(三十二、异常处理)
|
XML 存储 Java
Java Review(三十三、异常处理----补充:断言、日志、调试)
Java Review(三十三、异常处理----补充:断言、日志、调试)
183 0
|
22天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
120 2
|
30天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
48 6