Java并发编程之线程池中的Future

简介: Java并发编程之线程池中的Future

线程池中的Future:



1.线程池的典型使用场景


 ExecutorService executorService = Executors.newFixedThreadPool(10);
  //此处Task为实现Runnable接口的类
  Future future = executorService.submit(new Task());
  try {
      future.get();//此处会阻塞等待结果
  } catch (InterruptedException e) {
      e.printStackTrace();
  } catch (ExecutionException e) {
      e.printStackTrace();
  }


线程池的提交接口会返回一个Future对象,该对象的get方法会造成调用线程阻塞,等任务执行完毕时,会唤醒调用线程继续执行.


2.线程池Future的实现原理;


public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    //此处构造了一个future对象
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
//构造一个FutureTask返回
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}


可以看到线程池的任务提交后内部会构造一个FutureTask对象返回


3.FutureTask对象:


//类名FutureTask
public FutureTask(Runnable runnable, V result) {
    //包装成callable接口
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // 设置任务的初始状态
}
/**
     * 等待任务执行完毕返回结果
     * @throws CancellationException {@inheritDoc}
     */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //如果任务状态为处理中,或者未处理则等待处理完成
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);//此处会导致阻塞
    //报告结果
    return report(s);
}


造成调用线程阻塞的方法是awaitDone,任务处理完成后会唤醒阻塞线程,从而继续调用report方法返回处理结果.

4.awaitDone

//类名FutureTask
/**
     * Awaits completion or aborts on interrupt or timeout.
     * 此方法有3种处理结果,等待任务完成或者被interrupt退出,或者超时退出
     * @param timed true表示设置了等待时间
     * @param nanos 等待时长
     * @return state upon completion
     */
private int awaitDone(boolean timed, long nanos) throws     InterruptedException {
    //任务到期时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    //等待节点,链表结构,记录了所有调用get方法的线程
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //判断当前线程是否被中断
        if (Thread.interrupted()) {
            //删除链表中该线程所在的节点
            removeWaiter(q);
            //抛出中断异常
            throw new InterruptedException();
        }
        int s = state;
        //任务已经执行结束
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;//help gc
            return s;//返回执行状态
        }
        //任务正在执行中
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();//让出cpu执行权
        else if (q == null)
            q = new WaitNode();//初始一个等待节点
        else if (!queued)//CAS操作,如果节点没有放入链表(栈结构),就放入栈中
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {//如果设置了超时时间,则进行超时等待
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {//判断是否超时
                removeWaiter(q);//删除等待节点
                return state;
            }
            LockSupport.parkNanos(this, nanos);//阻塞当前线程,nanos时间
        }
        else
            LockSupport.park(this);//阻塞当前线程
    }
}
//等待节点,记录等待执行结果的线程
static final class WaitNode {
    volatile Thread thread; //等待线程
    volatile WaitNode next;//指向下一节点,链表结构
    WaitNode() { thread = Thread.currentThread(); }
}
//删除等待的节点
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;//设置待删除节点的线程为空,用于标记待删除线程
        retry:
        for (;;) { // 处理多个节点同时删除的情况,restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;//s指向q的后继节点
                if (q.thread != null)//判断当前节点是否是待删除的节点
                    pred = q;//pred指向q的前驱节点
                else if (pred != null) {//走到这个分支,说明q.thread==null,也就是说q为待删除节点
                    pred.next = s;//将q的前驱指向q的后继
                    if (pred.thread == null) // 如果此时前驱节点也变成待删除节点,说明此时有竞争,CAS重试
                        continue retry;
                }
                //走到这个分支,说明前驱节点为空,此时设置q的后继节点为链表的头节点
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;//失败则跳出,cas重试
            }
            break;
        }
    }
}
//返回任务执行结果
private V report(int s) throws ExecutionException {
    Object x = outcome;//执行结果
    if (s == NORMAL)//判断状态,执行完成则返回
        return (V)x;
    if (s >= CANCELLED)//取消执行
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);//执行异常
}


以上,我们分析了Future造成调用线程阻塞的逻辑,那么何时会唤醒阻塞的线程呢?有两种情况,一是任务执行完毕,一是任务被取消执行。


5.唤醒阻塞的线程:


//类名FutureTask
//线程池的任务执行逻辑
 public void run() {
     //不是初始状态,或者CAS设置任务的当前处理线程失败,则返回
     if (state != NEW ||
         !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                      null, Thread.currentThread()))
         return;
     try {
         Callable<V> c = callable;
         if (c != null && state == NEW) {//判断状态,任务有可能被取消执行
             V result;
             boolean ran;
             try {
                 result = c.call();//执行具体任务
                 ran = true;
             } catch (Throwable ex) {
                 result = null;
                 ran = false;
                 setException(ex);//设置任务异常
             }
             if (ran)
                 set(result);//设置执行结果
         }
     } finally {
         // runner must be non-null until state is settled to
         // prevent concurrent calls to run()
         runner = null;//执行线程置空
         // state must be re-read after nulling runner to prevent
         // leaked interrupts
         int s = state;
         if (s >= INTERRUPTING)
             handlePossibleCancellationInterrupt(s);
     }
 }
protected void set(V v) {
    //cas操作,防止此时其他线程修改状态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;//记录结果
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置最终状态,final state
        finishCompletion();//执行完成,唤醒等待结果的线程
    }
}
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
//任务执行完成,唤醒等待节点
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        //cas设置链表头结点为空,设置失败,重新循环判断
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);//唤醒等待节点
                }
                WaitNode next = q.next;
                if (next == null)//到尾部后,退出循环
                    break;
                q.next = null; // unlink to help gc
                q = next;//移动节点,循环判断
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}
//取消任务执行,返回false表示取消失败,true表示成功
public boolean cancel(boolean mayInterruptIfRunning) {
    //如果状态不是NEW,或者CAS设置状态失败,返回false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {//运行的时候是否允许中断
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();//中断任务执行线程
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();//通知任务已执行完毕
    }
    return true;
}


6.Java中线程池的Future的运行原理:任务提交到线程池后,线程池会生成一个Future对象用于跟踪任务的执行状态,并可以取消任务的执行,如果线程池还没有开始执行任务,那么取消操作将导致任务不会被线程池执行,否则的话取消操作可以对线程池中执行任务的线程进行中断。



目录
相关文章
|
4天前
|
安全 Java UED
Java中的多线程编程:从基础到实践
本文深入探讨了Java中的多线程编程,包括线程的创建、生命周期管理以及同步机制。通过实例展示了如何使用Thread类和Runnable接口来创建线程,讨论了线程安全问题及解决策略,如使用synchronized关键字和ReentrantLock类。文章还涵盖了线程间通信的方式,包括wait()、notify()和notifyAll()方法,以及如何避免死锁。此外,还介绍了高级并发工具如CountDownLatch和CyclicBarrier的使用方法。通过综合运用这些技术,可以有效提高多线程程序的性能和可靠性。
|
4天前
|
缓存 Java UED
Java中的多线程编程:从基础到实践
【10月更文挑战第13天】 Java作为一门跨平台的编程语言,其强大的多线程能力一直是其核心优势之一。本文将从最基础的概念讲起,逐步深入探讨Java多线程的实现方式及其应用场景,通过实例讲解帮助读者更好地理解和应用这一技术。
19 3
|
6天前
|
缓存 安全 Java
使用 Java 内存模型解决多线程中的数据竞争问题
【10月更文挑战第11天】在 Java 多线程编程中,数据竞争是一个常见问题。通过使用 `synchronized` 关键字、`volatile` 关键字、原子类、显式锁、避免共享可变数据、合理设计数据结构、遵循线程安全原则和使用线程池等方法,可以有效解决数据竞争问题,确保程序的正确性和稳定性。
13 2
|
7天前
|
存储 安全 Java
Java-如何保证线程安全?
【10月更文挑战第10天】
|
2天前
|
缓存 算法 Java
Java 中线程和纤程Fiber的区别是什么?
【10月更文挑战第14天】
10 0
|
13天前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
32 1
C++ 多线程之初识多线程
|
28天前
|
数据采集 负载均衡 安全
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
本文提供了多个多线程编程问题的解决方案,包括设计有限阻塞队列、多线程网页爬虫、红绿灯路口等,每个问题都给出了至少一种实现方法,涵盖了互斥锁、条件变量、信号量等线程同步机制的使用。
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
|
1月前
|
Java Spring
spring多线程实现+合理设置最大线程数和核心线程数
本文介绍了手动设置线程池时的最大线程数和核心线程数配置方法,建议根据CPU核数及程序类型(CPU密集型或IO密集型)来合理设定。对于IO密集型,核心线程数设为CPU核数的两倍;CPU密集型则设为CPU核数加一。此外,还讨论了`maxPoolSize`、`keepAliveTime`、`allowCoreThreadTimeout`和`queueCapacity`等参数的设置策略,以确保线程池高效稳定运行。
139 10
spring多线程实现+合理设置最大线程数和核心线程数
|
13天前
|
存储 前端开发 C++
C++ 多线程之带返回值的线程处理函数
这篇文章介绍了在C++中使用`async`函数、`packaged_task`和`promise`三种方法来创建带返回值的线程处理函数。
36 6
|
10天前
|
存储 运维 NoSQL
Redis为什么最开始被设计成单线程而不是多线程
总之,Redis采用单线程设计是基于对系统特性的深刻洞察和权衡的结果。这种设计不仅保持了Redis的高性能,还确保了其代码的简洁性、可维护性以及部署的便捷性,使之成为众多应用场景下的首选数据存储解决方案。
22 1