92. 你说你精通Java并发,那给我讲讲JUC吧(三)
J.U.C - 其它组件(这部分还需要细致总结)
FutureTask
在介绍 Callable 时我们知道它可以有返回值,返回值通过 Future 进行封装。FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future 接口,这使得 FutureTask 既可以当做一个任务执行,也可以有返回值。
public class FutureTask<V> implements RunnableFuture<V>public interface RunnableFuture<V> extends Runnable, Future<V>
FutureTask 可用于异步获取执行结果或取消执行任务的场景。当一个计算任务需要执行很长时间,那么就可以用 FutureTask 来封装这个任务,主线程在完成自己的任务之后再去获取结果。
public class FutureTaskExample { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { int result = 0; for (int i = 0; i < 100; i++) { Thread.sleep(10); result += i; } return result; } }); Thread computeThread = new Thread(futureTask); computeThread.start(); Thread otherThread = new Thread(() -> { System.out.println("other task is running..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); otherThread.start(); System.out.println(futureTask.get()); } }
控制台输出结果为:
other task is running... 4950
BlockingQueue
java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:
FIFO 队列 : LinkedBlockingQueue、ArrayBlockingQueue(固定长度)
优先级队列 : PriorityBlockingQueue 提供了阻塞的 take() 和 put() 方法:如果队列为空 take() 将阻塞,直到队列中有内容;如果队列为满 put() 将阻塞,直到队列有空闲位置。
使用 BlockingQueue 实现生产者消费者问题
public class ProductorConsumer { private static BlockingQueue<String> quene = new ArrayBlockingQueue<>(5); private static class Productor extends Thread{ @Override public void run() { try { quene.put("product"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("productor..."); } } private static class Consumer extends Thread{ @Override public void run() { try { String product = quene.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("consumer..."); } } public static void main(String[] args) { for(int i = 0; i < 2; i++){ Productor productor = new Productor(); productor.start(); } for(int i = 0; i < 5; i++){ Consumer consumer = new Consumer(); consumer.start(); } for(int i = 0; i < 3; i++){ Productor productor = new Productor(); productor.start(); } } }
控制台输出结果为(每次都不一样):
productor... productor... consumer... consumer... productor... productor... consumer... consumer... productor... consumer...
ForkJoin
使用了**“分治”**的思想。
主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。
import java.util.concurrent.RecursiveTask; public class ForkJoinExample extends RecursiveTask<Integer> { private final int threshold = 5; private int first; private int last; public ForkJoinExample(int first, int last) { this.first = first; this.last = last; } @Override protected Integer compute() { int result = 0; if (last - first <= threshold) { // 任务足够小则直接计算 for (int i = first; i <= last; i++) { result += i; } } else { // 拆分成小任务 int middle = first + (last - first) / 2; ForkJoinExample leftTask = new ForkJoinExample(first, middle); ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last); leftTask.fork(); rightTask.fork(); result = leftTask.join() + rightTask.join(); } return result; } }
窃取算法(工作窃密算法)
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并未每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。比如线程1负责处理1队列里的任务,2线程负责2队列的。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务待处理。干完活的线程与其等着,不如帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们可能会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务线程永远从双端队列的尾部拿任务执行。
优点:充分利用线程进行并行计算,减少线程间的竞争。
缺点:在某些情况下还是会存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源, 比如创建多个线程和多个双端队列。
在Java中,
可以使用LinkedBlockingDeque来实现工作窃取算法
JDK1.7引入的Fork/Join框架就是基于工作窃取算法
另外,jdk1.7中引入了一种新的线程池:WorkStealingPool。
参考:
https://blog.csdn.net/pange1991/article/details/80944797
https://blog.csdn.net/huzhiqiangcsdn/article/details/55251384