前言
本文为JUC并发编程相关知识,Java全栈学习路线可参考:【Java全栈学习路线】最全的Java学习路线及知识清单,Java自学方向指引,内含最全Java全栈学习技术清单~
本文上接:【JavaSE】之JUC并发编程(上)
十、阻塞队列
1.阻塞队列简介
集合框架:
阻塞队列特点:
- 先进先出
- 写入时,如果队列满了就必须阻塞等待
- 取出时,如果队列为空就必须阻塞等待
2.四组API
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞,一直等待 | 阻塞,超时等待 |
添加 | add() | offer() | put() | offer(,,) |
移除 | remove() | pull() | take() | pull(,) |
检测队首元素 | element() | peek() | - | - |
代码示例:
public class Test { public static void main(String[] args) throws InterruptedException { test4(); } // 抛出异常:java.lang.IllegalStateException: Queue full public static void test1(){ // 队列的大小为3 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); // add()方法返回boolean值 boolean flag1 = blockingQueue.add("a"); boolean flag2 = blockingQueue.add("b"); boolean flag3 = blockingQueue.add("c"); boolean flag4 = blockingQueue.add("d");// add添加元素超过队列的长度会抛出异常java.lang.IllegalStateException: Queue full System.out.println(blockingQueue.element());// 获得队首元素 System.out.println("========="); // remove()返回本次移除的元素 Object e1 = blockingQueue.remove(); Object e2 = blockingQueue.remove(); Object e3 = blockingQueue.remove(); Object e4 = blockingQueue.remove();// 队列中没有元素仍继续移除元素会抛出异常java.util.NoSuchElementException } // 有返回值,不抛出异常 public static void test2(){ // 队列的大小为3 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); // offer返回boolean值 boolean flag1 = blockingQueue.offer("a"); boolean flag2 = blockingQueue.offer("b"); boolean flag3 = blockingQueue.offer("c"); //boolean flag4 = blockingQueue.offer("d");// offer添加元素超过队列的长度会返回false System.out.println(blockingQueue.peek());// 获得队首元素 System.out.println("========="); // poll()返回本次移除的元素 Object poll1 = blockingQueue.poll(); Object poll2 = blockingQueue.poll(); Object poll3 = blockingQueue.poll(); Object poll4 = blockingQueue.poll();// 队列中没有元素仍继续移除元素会打印出null } // 阻塞,一直等待 public static void test3() throws InterruptedException { // 队列的大小为3 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); // put没有返回值 blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); //blockingQueue.put("d");// put添加元素超过队列的长度会一直等待 System.out.println("========="); // take()返回本次移除的元素 Object take1 = blockingQueue.take(); Object take2 = blockingQueue.take(); Object take3 = blockingQueue.take(); Object take4 = blockingQueue.take();// 队列中没有元素仍继续移除元素会一直等待 } // 阻塞,超时等待 public static void test4() throws InterruptedException { // 队列的大小为3 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); // offer返回boolean值 boolean flag1 = blockingQueue.offer("a"); boolean flag2 = blockingQueue.offer("b"); boolean flag3 = blockingQueue.offer("c"); // offer添加元素超过队列的长度会返回false;并且等待指定时间后推出,向下执行 boolean flag4 = blockingQueue.offer("d", 2, TimeUnit.SECONDS); System.out.println("========="); // poll()返回本次移除的元素 Object poll1 = blockingQueue.poll(); Object poll2 = blockingQueue.poll(); Object poll3 = blockingQueue.poll(); // 队列中没有元素仍继续移除元素会打印出null,等待指定之间后退出。 Object poll4 = blockingQueue.poll(2,TimeUnit.SECONDS); } }
3.SynchronousQueue同步队列
- 特点:进去一个元素,必须等待取出这个元素后,才能放下一个元素
- 添加元素方法:put()
- 取出元素方法:take()
十一、线程池
1.线程池简介
- 程序的运行,本质:占用系统的资源! (优化资源的使用 => 池化技术)
- 池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我
- 线程池的好处:1、降低系统资源的消耗;2、提高响应的速度;3、方便管理
对于线程池的一句话总结: 3大方法、7大参数、4大拒绝策略
2.三大方法
创建单个线程的线程池:Executors.newSingleThreadExecutor();
创建一个固定大小的线程池: Executors.newFixedThreadPool(5);
创建一个可伸缩的线程池:Executors.newCachedThreadPool();
代码示例:
package com.wang.pool; import java.util.concurrent.ExecutorService; import java.util.List; import java.util.concurrent.Executors; public class Demo01 { public static void main(String[] args) { // Executors 工具类、3大方法 // Executors.newSingleThreadExecutor();// 创建单个线程的线程池 // Executors.newFixedThreadPool(5);// 创建一个固定大小的线程池 // Executors.newCachedThreadPool();// 创建一个可伸缩的线程池 // 单个线程的线程池 ExecutorService threadPool = Executors.newSingleThreadExecutor(); try { for (int i = 1; i < 100; i++) { // 使用了线程池之后,使用线程池来创建线程 threadPool.execute(()->{ System.out.println( Thread.currentThread().getName()+" ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池用完,程序结束,关闭线程池 threadPool.shutdown(); } } }
3.七大参数
int corePoolSize:核心线程池大小
int maximumPoolSize:最大核心线程池大小
long keepAliveTime:超时没有人调用就会释放
TimeUnit unit:超时单位
BlockingQueue workQueue:阻塞队列
ThreadFactory threadFactory:线程工厂:创建线程的,一般 不用动
RejectedExecutionHandler handle:拒绝策略
源码分析:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( 5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // 本质ThreadPoolExecutor() public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小 int maximumPoolSize, // 最大核心线程池大小 long keepAliveTime, // 超时没有人调用就会释放 TimeUnit unit, // 超时单位 // 阻塞队列 BlockingQueue<Runnable> workQueue, // 线程工厂:创建线程的,一般 不用动 ThreadFactory threadFactory, // 拒绝策略 RejectedExecutionHandler handle ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
因为实际开发中工具类Executors 不安全,所以需要手动创建线程池,自定义7个参数。
示例代码:
package com.wang.pool; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; // Executors 工具类、3大方法 // Executors.newSingleThreadExecutor();// 创建一个单个线程的线程池 // Executors.newFixedThreadPool(5);// 创建一个固定大小的线程池 // Executors.newCachedThreadPool();// 创建一个可伸缩的线程池 /** * 四种拒绝策略: * * new ThreadPoolExecutor.AbortPolicy() * 银行满了,还有人进来,不处理这个人的,抛出异常 * * new ThreadPoolExecutor.CallerRunsPolicy() * 哪来的去哪里!比如你爸爸 让你去通知妈妈洗衣服,妈妈拒绝,让你回去通知爸爸洗 * * new ThreadPoolExecutor.DiscardPolicy() * 队列满了,丢掉任务,不会抛出异常! * * new ThreadPoolExecutor.DiscardOldestPolicy() * 队列满了,尝试去和最早的竞争,也不会抛出异常! */ public class Demo01 { public static void main(String[] args) { // 自定义线程池!工作 ThreadPoolExecutor ExecutorService threadPool = new ThreadPoolExecutor( 2,// int corePoolSize, 核心线程池大小(候客区窗口2个) 5,// int maximumPoolSize, 最大核心线程池大小(总共5个窗口) 3,// long keepAliveTime, 超时3秒没有人调用就会释,放关闭窗口 TimeUnit.SECONDS,// TimeUnit unit, 超时单位 秒 new LinkedBlockingDeque<>(3),// 阻塞队列(候客区最多3人) Executors.defaultThreadFactory(),// 默认线程工厂 // 4种拒绝策略之一: // 队列满了,尝试去和 最早的竞争,也不会抛出异常! new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了,尝试去和最早的竞争,也不会抛出异常! try { // 最大承载:Deque + max // 超过 RejectedExecutionException for (int i = 1; i <= 9; i++) { // 使用了线程池之后,使用线程池来创建线程 threadPool.execute(()->{ System.out.println( Thread.currentThread().getName()+" ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池用完,程序结束,关闭线程池 threadPool.shutdown(); } } }
4.四大拒绝策略
new ThreadPoolExecutor.AbortPolicy() :线程队列满了,不处理该线程,抛出异常!
new ThreadPoolExecutor.CallerRunsPolicy() :交给主线程去处理!
new ThreadPoolExecutor.DiscardPolicy() :队列满了,丢掉任务,不会抛出异常!
new ThreadPoolExecutor.DiscardOldestPolicy() :队列满了,尝试去和最早的竞争,也不会抛出异常!
5.最大线程应该如何设置?
- CPU密集型:最大线程数,CPU几核的就是几,可以保持CPU效率最高。
- IO密集型:判断程序中十分耗IO的线程数量,大于这个数,一般是这个数的两倍。
十二、四大函数式接口
函数式接口:只有一个方法的接口。
四大函数式接口:
Function函数型接口:有一个输入参数,有一个输出(返回值)。
predicate断定型接口:有一个输入参数,返回值只能是boolean值。
consumer消费型接口:有一个输入参数,没有返回值。
supplier供给型接口:没有输入参数,有一个输出(返回值)。
十三、stream流式计算
Stream 流式计算是在jdk 1.8后引入的新特性,将集合或数组转换成一种流的元素序列。流不是集合中的元素,也不是一种数据结构,不负责数据的存储。Stream 流也不会改变源对象(源集合)。
Stream 接口中几乎所有方法的参数都是四大函数式接口接口类型的参数。而函数式接口可以使用 lambda 表达式来简化开发,并且Stream 接口中的方法基本都是返回对象本身(返回对象本身的方法可以使用链式编程)。所以在使用 Stream流式计算时,基本上都用到了函数式接口、lambda表达式 和 链式编程。
Collection接口在 jdk 1.8 后新增了一个 stream() 方法,调用该方法会得到一个 Stream流对象。通过这个对象可以调用 Stream接口相应的方法对集合进行过滤、排序、截断(丢弃)、截断(获取)、转换、遍历、计数、拼接、取最大值、取最小值 等操作。
代码示例:
/** * 筛选用户,五个条件 * 1.ID是偶数 2.年龄大于23岁 3.用户名转化为大写字母 4.用户名字母倒着排序 5.只输出一个用户 */ public class Test { public static void main(String[] args) { User u1 = new User(1,"A",23); User u2 = new User(2,"B",21); User u3 = new User(3,"C",25); User u4 = new User(4,"D",30); User u4 = new User(5,"E",28); //集合就是存储 List<User> list = Arrays.asList(u1,u2,u3,u4); //计算交给Stream list.stream() .filter(user -> {return user.getId()%2==0;}) .filter(user -> {return user.getAge()>23;}) .map(user -> {return user.getName().toUpperCase();}) .sorted((uu1,uu2)->{return uu2.compareTo(uu1);}) .limit(1) .forEach(System.out::println); } }
十四、Forkjoin详解
1.什么是ForkJoin
ForkJoin也是一种线程池,只不过ForkJoin是专为CPU密集型任务而建立的线程池,它能大大提高CPU密集型任务的执行效率。主要用于并行执行任务,提高效率,大数据量。
ForkJoin是使用分治算法实现的,主要的原理就是将一个大的任务拆分为若干个小任务分发给若干个线程去处理,最后将若干的线程处理好后的结果进行汇总,从而达到提升计算效率的结果。
2.forkjoinPool
- (1)通过forkjoinPool来执行ForkJoin
- (2)计算任务:forkjoinPool.execute(ForkJoinTask task)
代码示例:
public class ForkJoinDemo extends RecursiveTask<long> { private Long start;//1 private Long end;//19990009 //临界值 private Long temp = 10000L; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } //计算方法 @Override protected long compute() { if ((end-start)<temp){ int sum = 0; for (int i=1;i<10_0000_0000;i++){ sum+=i; } return sum; }else { //取一个中间值 long middle = (start + end) / 2; ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(start,middle); //拆分任务,把任务压到线程队列 forkJoinDemo1.fork(); ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo(middle+1,end); //拆分任务,把任务压到线程队列 forkJoinDemo2.fork(); return forkJoinDemo1.join()+forkJoinDemo2.join(); } } }
3.三种方式进行计算任务
public static void test1(){ Long sum = 0L; long start = System.currentTimeMillis(); for (int i=1;i<10_0000_0000;i++){ sum+=i; } long end = System.currentTimeMillis(); System.out.println("sum="+sum+"时间:"+(end-start)); } public static void test2() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L,10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task); Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum="+sum+"时间:"+(end-start)); } public static void test3(){ long start = System.currentTimeMillis(); //stream并行流 long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum); long end = System.currentTimeMillis(); System.out.println("sum="+sum+"时间:"+(end-start)); }
结果:test3快于test2快于test1。
十五、异步回调
1.Future
- future设计的初衷就是对将来某个事件结果进行建模
- 本质就是前端发送Ajax异步请求给后端
- 实现类中使用比较多的事CompletableFuture
2.没有返回值的runAsync异步回调
public static void main(String[] args) throws ExecutionException, InterruptedException { //发起一个请求 CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"执行"); }); System.out.println("1111"); //获取阻塞执行结果 completableFuture.get(); }
3.有返回值的supplyAsync异步回调
- whenComplete()方法有两个参数,T代表正常的返回结果,U代表的是抛出异常的错误信息
- 如果发生异常的话get()方法会得到exceptionally里的信息
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"执行"); return 1024; }); System.out.println(completableFuture.whenComplete((t,u)->{ System.out.println("t:"+t+";u="+u); }).exceptionally((e)->{ System.out.println(e.getMessage()); return 233; }).get()); }