Java并发编程之线程池中的Future

简介: Java并发编程之线程池中的Future

线程池中的Future:



1.线程池的典型使用场景


 ExecutorService executorService = Executors.newFixedThreadPool(10);
  //此处Task为实现Runnable接口的类
  Future future = executorService.submit(new Task());
  try {
      future.get();//此处会阻塞等待结果
  } catch (InterruptedException e) {
      e.printStackTrace();
  } catch (ExecutionException e) {
      e.printStackTrace();
  }


线程池的提交接口会返回一个Future对象,该对象的get方法会造成调用线程阻塞,等任务执行完毕时,会唤醒调用线程继续执行.


2.线程池Future的实现原理;


public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    //此处构造了一个future对象
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
//构造一个FutureTask返回
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}


可以看到线程池的任务提交后内部会构造一个FutureTask对象返回


3.FutureTask对象:


//类名FutureTask
public FutureTask(Runnable runnable, V result) {
    //包装成callable接口
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // 设置任务的初始状态
}
/**
     * 等待任务执行完毕返回结果
     * @throws CancellationException {@inheritDoc}
     */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //如果任务状态为处理中,或者未处理则等待处理完成
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);//此处会导致阻塞
    //报告结果
    return report(s);
}


造成调用线程阻塞的方法是awaitDone,任务处理完成后会唤醒阻塞线程,从而继续调用report方法返回处理结果.

4.awaitDone

//类名FutureTask
/**
     * Awaits completion or aborts on interrupt or timeout.
     * 此方法有3种处理结果,等待任务完成或者被interrupt退出,或者超时退出
     * @param timed true表示设置了等待时间
     * @param nanos 等待时长
     * @return state upon completion
     */
private int awaitDone(boolean timed, long nanos) throws     InterruptedException {
    //任务到期时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    //等待节点,链表结构,记录了所有调用get方法的线程
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //判断当前线程是否被中断
        if (Thread.interrupted()) {
            //删除链表中该线程所在的节点
            removeWaiter(q);
            //抛出中断异常
            throw new InterruptedException();
        }
        int s = state;
        //任务已经执行结束
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;//help gc
            return s;//返回执行状态
        }
        //任务正在执行中
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();//让出cpu执行权
        else if (q == null)
            q = new WaitNode();//初始一个等待节点
        else if (!queued)//CAS操作,如果节点没有放入链表(栈结构),就放入栈中
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {//如果设置了超时时间,则进行超时等待
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {//判断是否超时
                removeWaiter(q);//删除等待节点
                return state;
            }
            LockSupport.parkNanos(this, nanos);//阻塞当前线程,nanos时间
        }
        else
            LockSupport.park(this);//阻塞当前线程
    }
}
//等待节点,记录等待执行结果的线程
static final class WaitNode {
    volatile Thread thread; //等待线程
    volatile WaitNode next;//指向下一节点,链表结构
    WaitNode() { thread = Thread.currentThread(); }
}
//删除等待的节点
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;//设置待删除节点的线程为空,用于标记待删除线程
        retry:
        for (;;) { // 处理多个节点同时删除的情况,restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;//s指向q的后继节点
                if (q.thread != null)//判断当前节点是否是待删除的节点
                    pred = q;//pred指向q的前驱节点
                else if (pred != null) {//走到这个分支,说明q.thread==null,也就是说q为待删除节点
                    pred.next = s;//将q的前驱指向q的后继
                    if (pred.thread == null) // 如果此时前驱节点也变成待删除节点,说明此时有竞争,CAS重试
                        continue retry;
                }
                //走到这个分支,说明前驱节点为空,此时设置q的后继节点为链表的头节点
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;//失败则跳出,cas重试
            }
            break;
        }
    }
}
//返回任务执行结果
private V report(int s) throws ExecutionException {
    Object x = outcome;//执行结果
    if (s == NORMAL)//判断状态,执行完成则返回
        return (V)x;
    if (s >= CANCELLED)//取消执行
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);//执行异常
}


以上,我们分析了Future造成调用线程阻塞的逻辑,那么何时会唤醒阻塞的线程呢?有两种情况,一是任务执行完毕,一是任务被取消执行。


5.唤醒阻塞的线程:


//类名FutureTask
//线程池的任务执行逻辑
 public void run() {
     //不是初始状态,或者CAS设置任务的当前处理线程失败,则返回
     if (state != NEW ||
         !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                      null, Thread.currentThread()))
         return;
     try {
         Callable<V> c = callable;
         if (c != null && state == NEW) {//判断状态,任务有可能被取消执行
             V result;
             boolean ran;
             try {
                 result = c.call();//执行具体任务
                 ran = true;
             } catch (Throwable ex) {
                 result = null;
                 ran = false;
                 setException(ex);//设置任务异常
             }
             if (ran)
                 set(result);//设置执行结果
         }
     } finally {
         // runner must be non-null until state is settled to
         // prevent concurrent calls to run()
         runner = null;//执行线程置空
         // state must be re-read after nulling runner to prevent
         // leaked interrupts
         int s = state;
         if (s >= INTERRUPTING)
             handlePossibleCancellationInterrupt(s);
     }
 }
protected void set(V v) {
    //cas操作,防止此时其他线程修改状态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;//记录结果
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置最终状态,final state
        finishCompletion();//执行完成,唤醒等待结果的线程
    }
}
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
//任务执行完成,唤醒等待节点
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        //cas设置链表头结点为空,设置失败,重新循环判断
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);//唤醒等待节点
                }
                WaitNode next = q.next;
                if (next == null)//到尾部后,退出循环
                    break;
                q.next = null; // unlink to help gc
                q = next;//移动节点,循环判断
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}
//取消任务执行,返回false表示取消失败,true表示成功
public boolean cancel(boolean mayInterruptIfRunning) {
    //如果状态不是NEW,或者CAS设置状态失败,返回false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {//运行的时候是否允许中断
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();//中断任务执行线程
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();//通知任务已执行完毕
    }
    return true;
}


6.Java中线程池的Future的运行原理:任务提交到线程池后,线程池会生成一个Future对象用于跟踪任务的执行状态,并可以取消任务的执行,如果线程池还没有开始执行任务,那么取消操作将导致任务不会被线程池执行,否则的话取消操作可以对线程池中执行任务的线程进行中断。



目录
相关文章
|
3天前
|
数据采集 存储 数据处理
Python中的多线程编程及其在数据处理中的应用
本文深入探讨了Python中多线程编程的概念、原理和实现方法,并详细介绍了其在数据处理领域的应用。通过对比单线程与多线程的性能差异,展示了多线程编程在提升程序运行效率方面的显著优势。文章还提供了实际案例,帮助读者更好地理解和掌握多线程编程技术。
|
2天前
|
API Android开发 iOS开发
深入探索Android与iOS的多线程编程差异
在移动应用开发领域,多线程编程是提高应用性能和响应性的关键。本文将对比分析Android和iOS两大平台在多线程处理上的不同实现机制,探讨它们各自的优势与局限性,并通过实例展示如何在这两个平台上进行有效的多线程编程。通过深入了解这些差异,开发者可以更好地选择适合自己项目需求的技术和策略,从而优化应用的性能和用户体验。
|
8天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
|
8天前
|
Java 开发者
Java多线程编程的艺术与实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的技术文档,本文以实战为导向,通过生动的实例和详尽的代码解析,引领读者领略多线程编程的魅力,掌握其在提升应用性能、优化资源利用方面的关键作用。无论你是Java初学者还是有一定经验的开发者,本文都将为你打开多线程编程的新视角。 ####
|
7天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
48 1
C++ 多线程之初识多线程
|
1月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
20 3
|
1月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
19 2
|
1月前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
30 2
|
1月前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
34 1
下一篇
无影云桌面