Java并发编程笔记之FutureTask源码分析

简介: FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。

FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。

类图结构如下所示:

线程池使用 FutureTask 时候需要注意的一点事,FutureTask 使用不当可能会造成调用线程一直阻塞,如何避免?

线程池使用 FutureTask 的时候如果拒绝策略设置为了 DiscardPolicyDiscardOldestPolicy并且在被拒绝的任务的 Future 对象上调用无参 get 方法那么调用线程会一直被阻塞。

下面先通过一个简单的例子来复现问题,代码如下:


public class FutureTest {

    //(1)线程池单个线程,线程池队列元素个数为1
        private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {

        //(2)添加任务one
        Future futureOne = executorService.submit(new Runnable() {
            @Override
            public void run() {

                System.out.println("start runable one");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        //(3)添加任务two
        Future futureTwo = executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("start runable two");
            }
        });

        //(4)添加任务three
        Future futureThree=null;
        try {
            futureThree = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("start runable three");
                }
            });
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
        }

        System.out.println("task one " + futureOne.get());//(5)等待任务one执行完毕
        System.out.println("task two " + futureTwo.get());//(6)等待任务two执行完毕
        System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任务three执行完毕

        executorService.shutdown();//(8)关闭线程池,阻塞直到所有任务执行完毕
 }


运行结果如下:

代码 (1) 创建了一个单线程并且队列元素个数为 1 的线程池,并且拒绝策略设置为了DiscardPolicy

代码(2)向线程池提交了一个任务 one,那么这个任务会使用唯一的一个线程进行执行,任务在打印 start runable one后会阻塞该线程 5s.

代码(3)向线程池提交了一个任务 two,这时候会把任务 two 放入到阻塞队列

代码(4)向线程池提交任务 three,由于队列已经满了则会触发拒绝策略丢弃任务 three, 从执行结果看在任务 one 阻塞的 5s 内,主线程执行到了代码 (5) 等待任务 one 执行完毕,当任务 one 执行完毕后代码(5)返回,主线程打印出 task one null。任务 one 执行完成后线程池的唯一线程会去队列里面取出任务 two 并执行所以输出 start runable two 然后代码(6)会返回,这时候主线程输出 task two null,然后执行代码(7)等待任务 three 执行完毕,从执行结果看代码(7)会一直阻塞不会返回,至此问题产生,如果把拒绝策略修改为 DiscardOldestPolicy 也会存在有一个任务的 get 方法一直阻塞只是现在是任务 two 被阻塞。但是如果拒绝策略设置为默认的 AbortPolicy 则会正常返回,并且会输出如下结果:

 

要分析这个问题需要看下线程池的 submit 方法里面做了什么,submit 方法源码如下:


public Future<?> submit(Runnable task) {
        ...
        //(1)装饰Runnable为Future对象
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        //(6)返回future对象
        return ftask;
}

 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
 }

public void execute(Runnable command) {
         ...
        //(2) 如果线程个数消息核心线程数则新增处理线程处理
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //(3)如果当前线程个数已经达到核心线程数则任务放入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //(4)尝试新增处理线程进行处理
        else if (!addWorker(command, false))
            reject(command);//(5)新增失败则调用拒绝策略
}


代码(1)装饰 Runnable 为 FutureTask 对象,然后调用线程池的 execute 方法。

代码 (2) 如果线程个数消息核心线程数则新增处理线程处理

代码(3)如果当前线程个数已经达到核心线程数则任务放入队列

代码(4)尝试新增处理线程进行处理,失败则进行代码(5),否者直接使用新线程处理

代码(5)执行具体拒绝策略,从这里也可以看出拒绝策略执行是使用的业务线程。

所以要分析上面例子中问题所在只需要看步骤(5)对被拒绝任务的影响,这里先看下拒绝策略 DiscardPolicy 的源码,如下:


public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

    }

}


可知拒绝策略 rejectedExecution 方法里面什么都没做,所以代码(4)调用 submit 后会返回一个 future 对象,这里有必要在重新说 future 是有状态的,FutureTask 内部有一个state用来展示任务的状态,并且是volatile修饰的,future 的状态枚举值如下:


/** Possible state transitions:
 * NEW -> COMPLETING -> NORMAL 正常的状态转移
 * NEW -> COMPLETING -> EXCEPTIONAL 异常
 * NEW -> CANCELLED 取消
 * NEW -> INTERRUPTING -> INTERRUPTED 中断
 */
 
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;


在代码(1)的时候使用 newTaskFor 方法转换 Runnable 任务为 FutureTask,而 FutureTask 的构造函数里面设置的状态就是 New。FutureTask的构造函数源码如下:


public FutureTask(Runnable runnable, V result) {
     this.callable = Executors.callable(runnable, result);
     this.state = NEW;       // ensure visibility of callable
}

把FutureTask提交到线程池或者线程执行start时候会调用run方法,源码如下:


public void run() {

    //如果当前不是new状态,或者当前cas设置当前线程失败则返回,只有一个线程可以成功。
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        //当前状态为new 则调用任务的call方法执行任务
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);//完成NEW -> COMPLETING -> EXCEPTIONAL 状态转移
            }

            //执行任务成功则保存结果更新状态,unpark所有等待线程。
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}


protected void set(V v) {
    //状态从new->COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        //状态从COMPLETING-》NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        //unpark所有等待线程。
        finishCompletion();
    }
}

所以使用 DiscardPolicy 策略提交任务后返回了一个状态值为NEW的future对象。那么我们下面就要看下当future的无参get()方法的时候,future变为什么状态才会返回,这时候就要看一下FutureTask的get方法的源码,源码如下:


   public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //当状态值<=COMPLETING时候需要等待,否者调用report返回
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {

            //如果被中断,则抛异常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            //组建单列表
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            else if (timed) {


                nanos = deadline - System.nanoTime();
                //超时则返回
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //否者设置park超时时间
                LockSupport.parkNanos(this, nanos);
            }
            else
                //直接挂起当前线程
                LockSupport.park(this);
        }
    }
    
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        //状态值为NORMAL正常返回
        if (s == NORMAL)
            return (V)x;
        //状态值大于等于CANCELLED则抛异常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }


也就是说当 future 的状态 > COMPLETING 时候调用 get 方法才会返回,而明显 DiscardPolicy 策略在拒绝元素的时候并没有设置该 future 的状态,后面也没有其他机会可以设置该 future 的状态,所以 future 的状态一直是 NEW,所以一直不会返回,同理 DiscardOldestPolicy 策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于 future 的状态。、

在submit任务后还可以调用futuretask的cancel来取消任务:

 

public boolean cancel(boolean mayInterruptIfRunning) {
        //只有任务是new的才能取消
        if (state != NEW)
            return false;
       //运行时允许中断
        if (mayInterruptIfRunning) {
           //完成new->INTERRUPTING
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
                return false;
            Thread t = runner;
            if (t != null)
                t.interrupt();
            //完成INTERRUPTING->INTERRUPTED
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        }
       //不允许中断则直接new->CANCELLED
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            return false;
        finishCompletion();
        return true;
} 


那么默认的 AbortPolicy 策略为啥没问题呢?

也就是说当 future 的状态 > COMPLETING 时候调用 get 方法才会返回,而明显 DiscardPolicy 策略在拒绝元素的时候并没有设置该 future 的状态,后面也没有其他机会可以设置该 future 的状态,所以 future 的状态一直是 NEW,所以一直不会返回,同理 DiscardOldestPolicy 策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于 future 的状态。

所以当使用 Future 的时候,尽量使用带超时时间的 get 方法,这样即使使用了 DiscardPolicy 拒绝策略也不至于一直等待,等待超时时间到了会自动返回的,如果非要使用不带参数的 get 方法则可以重写 DiscardPolicy 的拒绝策略在执行策略时候设置该 Future 的状态大于 COMPLETING 即可,但是查看 FutureTask 提供的方法发现只有 cancel 方法是 public 的并且可以设置 FutureTask 的状态大于 COMPLETING,重写拒绝策略具体代码可以如下:

/**
 * Created by cong on 2018/7/13.
 */
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
        if (!threadPoolExecutor.isShutdown()) {
            if(null != runnable && runnable instanceof FutureTask){
                ((FutureTask) runnable).cancel(true);
            }
        }
    }
}

使用这个策略时候由于从 report 方法知道在 cancel 的任务上调用 get() 方法会抛出异常所以代码(7)需要使用 try-catch 捕获异常代码(7)修改为如下:

package com.hjc;

import java.util.concurrent.*;

/**
 * Created by cong on 2018/7/13.
 */
public class FutureTest {

    //(1)线程池单个线程,线程池队列元素个数为1
    private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L,
 TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1), new MyRejectedExecutionHandler());

    public static void main(String[] args) throws Exception {

        //(2)添加任务one
        Future futureOne = executorService.submit(new Runnable() {
           
            public void run() {

                System.out.println("start runable one");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        //(3)添加任务two
        Future futureTwo = executorService.submit(new Runnable() {
            
            public void run() {
                System.out.println("start runable two");
            }
        });

        //(4)添加任务three
        Future futureThree = null;
        try {
            futureThree = executorService.submit(new Runnable() {
                
                public void run() {
                    System.out.println("start runable three");
                }
            });
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
        }

        System.out.println("task one " + futureOne.get());//(5)等待任务one执行完毕
        System.out.println("task two " + futureTwo.get());//(6)等待任务two执行完毕
        try{
            System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任务three
        }catch(Exception e){
            System.out.println(e.getLocalizedMessage());
        }

        executorService.shutdown();//(8)关闭线程池,阻塞直到所有任务执行完毕
    }
}

运行结果如下:

当然这相比正常情况下多了一个异常捕获,其实最好的情况是重写拒绝策略时候设置 FutureTask 的状态为 NORMAL,但是这需要重写 FutureTask 方法了,因为 FutureTask 并没有提供接口进行设置。

目录
相关文章
|
6天前
|
JSON Java Apache
非常实用的Http应用框架,杜绝Java Http 接口对接繁琐编程
UniHttp 是一个声明式的 HTTP 接口对接框架,帮助开发者快速对接第三方 HTTP 接口。通过 @HttpApi 注解定义接口,使用 @GetHttpInterface 和 @PostHttpInterface 等注解配置请求方法和参数。支持自定义代理逻辑、全局请求参数、错误处理和连接池配置,提高代码的内聚性和可读性。
|
8天前
|
安全 Java 编译器
JDK 10中的局部变量类型推断:Java编程的简化与革新
JDK 10引入的局部变量类型推断通过`var`关键字简化了代码编写,提高了可读性。编译器根据初始化表达式自动推断变量类型,减少了冗长的类型声明。虽然带来了诸多优点,但也有一些限制,如只能用于局部变量声明,并需立即初始化。这一特性使Java更接近动态类型语言,增强了灵活性和易用性。
90 53
|
7天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
4天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
6天前
|
存储 缓存 安全
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见。本文介绍了使用 `File.createTempFile` 方法和自定义创建临时文件的两种方式,详细探讨了它们的使用场景和注意事项,包括数据缓存、文件上传下载和日志记录等。强调了清理临时文件、确保文件名唯一性和合理设置文件权限的重要性。
17 2
|
7天前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
8天前
|
安全 Java 编译器
Java多线程编程的陷阱与最佳实践####
【10月更文挑战第29天】 本文深入探讨了Java多线程编程中的常见陷阱,如竞态条件、死锁、内存一致性错误等,并通过实例分析揭示了这些陷阱的成因。同时,文章也分享了一系列最佳实践,包括使用volatile关键字、原子类、线程安全集合以及并发框架(如java.util.concurrent包下的工具类),帮助开发者有效避免多线程编程中的问题,提升应用的稳定性和性能。 ####
33 1
|
8天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
17天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
4天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
22 9