7、Callable接口
引入: 面试题:获得多线程的方法几种?
(1)继承thread类(2)runnable接口
如果只回答这两个你连被问到juc的机会都没有
(3) 实现Callable接口
7.1 是什么?
这是一个函数式接口,因此可以用作lambda表达式或方法引用的赋值对象。
7.2 与runnable对比
实现方法对比:
//创建新类MyThread实现runnable接口 class MyThread implements Runnable{ @Override public void run() { } } //新类MyThread2实现callable接口 class MyThread2 implements Callable<Integer>{ @Override public Integer call() throws Exception { return 200; } }
面试题:callable接口与runnable接口的区别?
答:(1)是否有返回值
(2)是否抛异常
(3)落地方法不一样,一个是run,一个是call
7.3 怎么用
直接替换runnable是否可行?
不可行,因为:thread类的构造方法根本没有Callable
解决办法:认识不同的人找中间人
这像认识一个不认识的同学,我可以找中间人介绍。
中间人是什么?java多态,一个类可以实现多个接口!!
运行成功后如何获得返回值?ft.get();
实现代码
class MyThread implements Callable<Integer> { @Override public Integer call() throws Exception { TimeUnit.SECONDS.sleep(4); System.out.println("*********come in here"); return 1024; } } /** * @author lxy * @version 1.0 * @Description 多线程中,第三种获得多线程的方式 * @date 2022/4/29 16:55 */ public class CallableDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask <Integer> futureTask = new FutureTask <>(new MyThread()); new Thread(futureTask,"A").start();//只被执行一次. new Thread(futureTask,"B").start(); System.out.println(Thread.currentThread().getName()); System.out.println(futureTask.get()); } }
7.4 FutureTask
7.4.1 FutureTask原理
未来的任务,用它就干一件事,异步调用
main方法就像一个冰糖葫芦,一个个方法由main串起来。
但解决不了一个问题:正常调用挂起堵塞问题 , 比如一个方法执行起来很花费时间…我们可以将它单独另启动一个线程A去做,主线程继续往下执行,最终在主线程结束之前将A的结果再进行操作… 可以大大提高效率.
例子:
(1)老师上着课,口渴了,去买水不合适,讲课线程继续,我可以单起个线程找班长帮忙买水,
水买回来了放桌上,我需要的时候再去get。
(2)4个同学,A算1+20,B算21+30,C算31*到40,D算41+50,是不是C的计算量有点大啊,
FutureTask单起个线程给C计算,我先汇总ABD,最后等C计算完了再汇总C,拿到最终结果
(3)高考:会做的先做,不会的放在后面做
7.4.2 原理补充
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。
一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。get方法获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。
只计算一次(线程只能被创建执行一次),get方法放到最后
8、JUC强大的辅助类讲解
8.1 CountDownLatch(减少计数)
/** * * CountDownLatch: 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。 * * CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。 * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞), * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。 * * 解释:6个同学陆续离开教室后值班同学才可以关门。 * * main主线程必须要等前面6个线程完成全部工作后,自己才能开干 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6);//给计数器赋值为6 for (int i = 0; i < 6; i++) {//有六个上自习的同学,各自离开教室的时间不一样 new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t号同学离开教室"); countDownLatch.countDown();//每调用一次,计数器减一(也就是一个线程退出) },String.valueOf(i)).start(); } countDownLatch.await();//阻塞main线程,当计数器减为0,则唤醒main线程. System.out.println(Thread.currentThread().getName()+"\t************班长关门走人,main线程是班长"); } }
8.2 CyclicBarrier(循环栅栏)
/** * CyclicBarrier:字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是, * 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有 * 被屏障拦截的线程才会继续干活。 * 线程进入屏障通过CyclicBarrier的await()方法。 * * 例:集齐7颗龙珠就可以召唤神龙 */ public class CyclicBarrierDemo { public static void main(String[] args) { //CyclicBarrier(int parties, Runnable barrierAction) 当所有的屏障经历完才干活 CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> { System.out.println("**********召唤神龙"); }); for (int i = 0; i < 7; i++) { final int tempInt = i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t收集到第"+tempInt+"颗龙珠"); try { cyclicBarrier.await();//进行等待 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }
8.3 Semaphore 信号灯
/** * * @Description: 信号量 * * 在信号量上我们定义两种操作: * acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1), * 要么一直等下去,直到有线程释放信号量,或超时。 * release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。 * * 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用(资源被用尽,只能等待,直到有多余资源), * 另一个用于并发线程数的控制。(1s 100w访问量) * 例:抢车位 */ public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3);//资源类,有三个空车位 for (int i = 1; i <= 6; i++) { new Thread(()->{ try { semaphore.acquire();//占车位,车位数-1 System.out.println(Thread.currentThread().getName()+"\t抢占到了车位"); //暂停一会线程 TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"\t离开了车位"); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release();//车位数+1 } },String.valueOf(i)).start(); } } }
9、ReentrantReadWriteLock
9.1 读写锁介绍
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA 的并发包提供了读写锁ReentrantReadWriteLock ,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。
线程进入读锁的前提条件:
没有其他线程的写锁
没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。
线程进入写锁的前提条件:
没有其他线程的读锁
没有其他线程的写锁
而读写锁有以下三个重要的特性:
(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
(2)重进入:读锁和写锁都支持线程重进入。
(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
9.2 ReentrantReadWriteLock
ReentrantReadWriteLock 类的整体结构
public class ReentrantReadWriteLock implements ReadWriteLock,java.io.Serializable { /** * 读锁 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** * 写锁 */ private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; /** * 使用默认(非公平)的排序属性创建一个新的 * ReentrantReadWriteLock */ public ReentrantReadWriteLock() { this(false); } /** * 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } /** * 返回用于写入操作的锁 */ public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } /** * 返回用于读取操作的锁 */ public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } abstract static class Sync extends AbstractQueuedSynchronizer { } static final class NonfairSync extends Sync { } static final class FairSync extends Sync { } public static class ReadLock implements Lock, java.io.Serializable { } public static class WriteLock implements Lock, java.io.Serializable { } }
可以看到,ReentrantReadWriteLock 实现了ReadWriteLock 接口,ReadWriteLock 接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable 接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock 实现了自己的序列化逻辑。
9.3 入门案例
场景: 使用ReentrantReadWriteLock 对一个hashmap 进行读和写操作
package com.rg.juc; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; class MyCache { private volatile Map <String, Object> map = new HashMap <>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void put(String key, Object value) { readWriteLock.writeLock().lock();//上写锁 try { System.out.println(Thread.currentThread().getName() + "\t ----写入数据" + key); //网络拥堵(暂停一会线程) try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } map.put(key, value); System.out.println(Thread.currentThread().getName() + "\t ----写入完成"+key); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } public void get(String key) { readWriteLock.readLock().lock();//读锁 try { System.out.println(Thread.currentThread().getName() + "\t ----读取数据" + key); //网络拥堵(暂停一会线程) try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } Object result = map.get(key); System.out.println(Thread.currentThread().getName() + "\t ----读取完成" + result); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } } /** * @author lxy * @version 1.0 * @Description * @date 2022/5/1 11:59 */ public class ReadWriteLockDemo { public static void main(String[] args) { MyCache myCache = new MyCache(); //五个读线程 for (int i = 1; i <= 5; i++) { final int tempInt = i; new Thread(() -> { myCache.put(tempInt + "", tempInt + ""); }, String.valueOf(i)).start(); } //五个写线程 for (int i = 1; i <= 5; i++) { final int tempInt = i; new Thread(() -> { myCache.get(tempInt + ""); }, String.valueOf(i)).start(); } } }