《深入探索Java并发编程&从锁到并发工具的深入解析》(上)+https://developer.aliyun.com/article/1625012
CyclickBarrier
其实CyclickBarrier
的本质就是一个加法计数器。
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
package org.example; import java.util.concurrent.CyclicBarrier; /** * @author linghu * @date 2023/12/20 10:32 */ public class CyclickBarrierDemo { public static void main(String[] args) { /** * 集齐7颗龙珠召唤神龙 */ //召唤龙珠的线程 CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{ System.out.println("召唤神龙~"); }); for (int i=1;i<=7;i++){ int temp=i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"搜集"+temp+"颗龙珠"); try { cyclicBarrier.await();//等待计数器到7,然后往下执行~ } catch (Exception e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }
如上代码运行结果如下:
总结
cyclicBarrier.await()
是 Java 中的一个方法,用于让当前线程等待其他线程到达屏障(CyclicBarrier)时再继续执行。当所有线程都调用了await()
方法后,屏障才会打开,允许所有线程继续执行。这个方法通常用在多线程编程中,以确保所有线程都达到某个同步点后再继续执行。
Semaphore
在操作系统中我们学过信号量,学过PV操作。其实这里的 Semaphore
信号量就是用来做线程限流操作的。
我们看如下代码:
package org.example; import sun.awt.windows.ThemeReader; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * @author linghu * @date 2023/12/20 10:58 */ public class SemaphoreDemo { public static void main(String[] args) { //线程数量:停车位!用来限流 Semaphore semaphore = new Semaphore(3); for (int i=1;i<=6;i++){ new Thread(()->{ try { semaphore.acquire();//得到 System.out.println(Thread.currentThread().getName()+"抢到车位"); TimeUnit.SECONDS.sleep(2);//让车多停一会 System.out.println(Thread.currentThread().getName()+"离开车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release();//释放 } },String.valueOf(i)).start(); } } }
在上面代码中,我们对6个线程-车进行限流,只有3个车位,6个车去抢车位!
总结
semaphore.acquire()
;//得到,等待释放为止semaphore.release()
;//释放信号量以后会唤醒等待中的线程
关于PV操作,其实可以看我这个视频课程:
读写锁
所谓的读写锁(Readers-Writer Lock),顾名思义就是将一个锁拆分为读锁和写锁两个锁。其中读锁允许多个线程同时获得,而写锁则是互斥锁,不允许多个线程同时获得写锁,并且写操作和读操作也是互斥的。
为什么要读写锁?
Synchronized 和 ReentrantLock 都是独占锁,即在同一时刻只有一个线程获取到锁。然而在有些业务场景中,我们大多在读取数据,很少写入数据,这种情况下,如果仍使用独占锁,效率将及其低下。针对这种情况,Java提供了读写锁——ReentrantReadWriteLock。
主要解决:对共享资源有读和写的操作,且写操作没有读操作那么频繁的场景。
案例
如下分别读写线程6次执行:
package org.example; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @author linghu * @date 2023/12/20 11:50 */ public class ReadWriteLockDemo { public static void main(String[] args) { MyCache2 myCache2 = new MyCache2(); int num=6; for (int i=1;i<=6;i++){ int finall=i; //6个线程开始写 new Thread(()->{ myCache2.write(String.valueOf(finall),String.valueOf(finall)); },String.valueOf(i)).start(); //6个线程开始读 new Thread(()->{ myCache2.read(String.valueOf(finall)); },String.valueOf(i)).start(); } } } class MyCache2{ private volatile Map<String, String> map=new HashMap<>(); private ReadWriteLock lock=new ReentrantReadWriteLock(); public void write(String key,String value){ lock.writeLock().lock();//写锁 try { System.out.println(Thread.currentThread().getName()+"线程开始写入"); map.put(key,value); System.out.println(Thread.currentThread().getName()+"线程写入OK"); } catch (Exception e) { e.printStackTrace(); } finally { lock.writeLock().unlock();//释放写锁 } } public void read(String key){ lock.readLock().lock();//读锁 try { System.out.println(Thread.currentThread().getName()+"线程开始读取"); map.get(key); System.out.println(Thread.currentThread().getName()+"线程读取OK"); } catch (Exception e) { e.printStackTrace(); } finally { lock.readLock().unlock();//释放读锁 } } }
运行效果:
总结
如果我们不用锁,多线程的读写会造成数据不可靠的问题。我们可以采用独占锁:Synchronized 和 ReentrantLock保证数据可靠。但是使用更细粒度的锁读写锁(Readers-Writer Lock)效率更高!
BlockQueue
BlockQueue
是 Collection
的一个子类,我们把它叫做:阻塞队列!整个队列的家族是下面这幅图这个样子的:
通过上图我们知道了我们重点要学的是BlockQueue
!
BlockingQueue
主要提供了四类方法,如下表所示:
同步队列
同步队列没有容量,也可以视为容量为1的队列。
- 进去一个元素必须等待取出来以后才能往里面放元素
- put了一个元素,就必须从里面先take出来,否则不能再put进去值!
代码
如下通过一个代码案例进行说明:
package org.example; import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; /** * @author linghu * @date 2023/12/21 15:16 */ public class SynchronousQueue { public static void main(String[] args) { BlockingQueue<String> synchronousQueue= new java.util.concurrent.SynchronousQueue<>(); //往queue中添加元素 new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"put 01"); synchronousQueue.put("1"); System.out.println(Thread.currentThread().getName()+"put 02"); synchronousQueue.put("2"); System.out.println(Thread.currentThread().getName()+"put 03"); synchronousQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); //取出元素 new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take()); System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take()); System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
总结:上述代码中,三个线程负责往队列里put元素,但是只能等到对列的元素被take的时候才能往里面put!
执行结果如下:
上图得知总结:01,02在put以后只有take以后才能put03!说明了队列的同步性!
线程池
线程池用到了池化技术,其实在编程中,很多地方都有池化技术的思想,比如数据库连接池,HttpClient 连接池等。池化技术的出现主要是为了解决:资源的利用问题!提高效率,对资源进行复用!
这里我们用了线程池,目的就是 复用线程!。线程复用才可以控制最大并发数,管理线程!
线程池学习原则总结为:
- 三大方式
- 七大参数
- 四种拒绝策略
线程池的三大方法
Executors.newSingleThreadExecutor()
单个线程的创建Executors.newFixedThreadPool(5)
创建一个固定大小的线程池Executors.newCachedThreadPool()
可伸缩的线程池创建
package org.example; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author linghu * @date 2023/12/21 16:35 * Executors工具类,3大方法 */ public class Demo { public static void main(String[] args) { //单个线程池,线程 ExecutorService threadPool = Executors.newSingleThreadExecutor(); //固定的线程池大小 // ExecutorService threadPool = Executors.newFixedThreadPool(5); //可伸缩的线程池大小 // ExecutorService threadPool = Executors.newCachedThreadPool(); try { for (int i=0;i<10;i++){ //使用线程池创建线程,以前用new Thread()来创建 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"OK"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //线程池用完,程序结束,关闭线程池 threadPool.shutdown(); } } }
运行结果:
七大参数
这里说到七大参数是指我们在自定义线程池的时候用到的七大参数!
七大参数如下:
- 核心线程数(corePoolSize):2
- 最大线程数(maximumPoolSize):5
- 空闲线程存活时间(keepAliveTime):3秒
- 时间单位(unit):TimeUnit.SECONDS
- 任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3
- 线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory())
- 拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。
接下来我们通过一个银行等待业务的例子讲清楚上面的七大参数。
如上图所示,空闲线程存活时间(keepAliveTime)表示在候客区4、5窗口没人的情况下,3秒以后,这个窗口就会被释放掉,不能让它一直空闲下去,这样非常浪费资源!
接下来就是手动创建一个线程池,代码如下:
//自定义线程池 ExecutorService threadPool = new ThreadPoolExecutor( 2,//核心线程数(corePoolSize):2 5,//最大线程数(maximumPoolSize):5 3,//空闲线程存活时间(keepAliveTime):3秒 TimeUnit.SECONDS,//时间单位(unit):TimeUnit.SECONDS new LinkedBlockingDeque<>(3),//任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3 Executors.defaultThreadFactory(),//线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory()) new ThreadPoolExecutor.AbortPolicy()//拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。 );
在上述代码的拒绝策略中,我们需要知道拒绝策略有四种:
new ThreadPoolExecutor.AbortPolicy()
;//银行满了,还有人进来,不处理这个人,抛出异常new ThreadPoolExecutor.CallerRunsPolicy()
;//哪里来的去哪里new ThreadPoolExecutor.DiscardPolicy()
;//队列满了,丢掉任务,不会抛出异常new ThreadPoolExecutor.DiscardOldestPolicy()
;//队列满了,尝试去和最早的竞争,也不会抛出异常
package org.example; import java.util.concurrent.*; /** * @author linghu * @date 2023/12/22 10:36 * 拒绝策略: * new ThreadPoolExecutor.AbortPolicy();//银行满了,还有人进来,不处理这个人,抛出异常 * new ThreadPoolExecutor.CallerRunsPolicy();//哪里来的去哪里 * new ThreadPoolExecutor.DiscardPolicy();//队列满了,丢掉任务,不会抛出异常 * new ThreadPoolExecutor.DiscardOldestPolicy();//队列满了,尝试去和最早的竞争,也不会抛出异常 */ public class Demo02 { public static void main(String[] args) { //自定义线程池 ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); try { for (int i=1;i<=8;i++){ //使用自己定义的线程池创建了线程 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" OK"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭线程池 threadPool.shutdown(); } } }
我们在如上代码中,创建了线程池,通过线程池创建线程,通过循环创建了8个任务,每个任务打印当前线程的名称和"OK"。这些任务被提交到自定义的线程池中执行。最后,在finally块中关闭线程池。
从上述运行结果来看,线程池中的核心线程就是1和3。最大线程数为5,空闲线程存活时间为3秒,任务队列容量为3,使用默认的线程工厂创建线程,拒绝策略为AbortPolicy
如何设置线程池的大小?
设置线程池的小主要是做调优的工作!
//自定义线程池 ExecutorService threadPool = new ThreadPoolExecutor( 2,//核心线程数(corePoolSize):2 5,//最大线程数(maximumPoolSize):5 3,//空闲线程存活时间(keepAliveTime):3秒 TimeUnit.SECONDS,//时间单位(unit):TimeUnit.SECONDS new LinkedBlockingDeque<>(3),//任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3 Executors.defaultThreadFactory(),//线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory()) new ThreadPoolExecutor.AbortPolicy()//拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。 );
上述代码中,我们需要知道最大线程数(maximumPoolSize)应该怎么设置?
设置原则为:
- CPU密集型:电脑的核数是几核,这里就写多少
- I/O密集型:判断程序中消耗IO线程的数量,然后乘以1倍到2倍即可!
CPU密集型
电脑核数的查看方式有两种:
- 任务管理器
Runtime.getRuntime().availableProcessors()
任务管理器
任务管理器CPU的查看如下:
Runtime.getRuntime().availableProcessors()
其实这个方法的调用如下:
public class Main { public static void main(String[] args) { int coreCount = Runtime.getRuntime().availableProcessors(); System.out.println("电脑的核数为: " + coreCount); } }
I/O密集型
这里我们需要知道我们的IO任务是多少,然后大于这个任务1倍~2倍即可。
四大函数接口
顾明思议:函数接口就是只有一个方法的接口,如下面这个接口:
总结:这种函数式接口用的非常多,特别是在很多框架中。
四大函数接口是Java中用于处理不同类型数据的方法,它们分别是:
Consumer
:接收一个输入参数并对其执行某种操作,但不返回任何结果。Function
:接收一个输入参数并返回一个结果。Predicate
:接收一个输入参数并返回一个布尔值,表示该参数是否满足某个条件。Supplier
:不接收任何参数,但返回一个结果。
Function函数型接口
Function函数型接口就是有一个输入参数,有一个输出参数。
import java.util.function.Function; /** * @author linghu * @date ${DATE} ${TIME} */ public class Main { public static void main(String[] args) { //输出输入的值 // Function function=new Function<String,String>(){ // @Override // public String apply(String str) { // return str; // } // }; //只要是函数型接口,就可以用lambda表达式简化 Function function1=(str)->{ return str; }; System.out.println(function1.apply("123")); } }
predicate断定型接口
predicate断定型接口就是有一个输入参数,返回值只能是布尔值。
import java.util.function.Predicate; /** * @author linghu * @date 2023/12/22 17:01 */ public class Demo02 { public static void main(String[] args) { // Predicate<String> predicate = new Predicate<>() { // @Override // public boolean test(String s) { // return s.isEmpty(); // } // }; Predicate<String> predicate2 =(str)->{ return str.isEmpty(); }; System.out.println(predicate2.test("addd")); } }
Suppier供给型接口
Suppier供给型接口是没有参数,只有返回值
import java.util.function.Supplier; /** * @author linghu * @date 2023/12/22 17:11 */ public class Demo03 { public static void main(String[] args) { Supplier supplier = new Supplier<>() { @Override public Integer get() { return 1024; } }; Supplier supplier2 =()->{ return 1024; }; System.out.println(supplier2.get()); } }
Consumer消费性接口
Consumer消费性接口只有输入,没有返回值。
import java.util.function.Consumer; /** * @author linghu * @date 2023/12/22 17:15 */ public class Demo04 { public static void main(String[] args) { Consumer<String> consumer = new Consumer<>() { @Override public void accept(String s) { System.out.println(s); } }; Consumer<String> consumer2 =(str)->{ System.out.println(str); }; consumer2.accept("hi"); } }
Stream流式计算
Stream 就好像一个高级的迭代器,但只能遍历一次,就好像一江春水向东流;在流的过程中,对流中的元素执行一些操作,比如“过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等。
流的操作可以分为两种类型:
1)中间操作,可以有多个,每次返回一个新的流,可进行链式操作。
2)终端操作,只能有一个,每次执行完,这个流也就用光光了,无法执行下一个操作,因此只能放在最后。
@NoArgsConstructor public class User { private int id; private String name; private int age; public User(int i, String a, int i1) { } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }
package org.example; import java.util.Arrays; import java.util.List; /** * 题目要求: * 1、ID必须是偶数 * 2、年龄必须大于23岁 * 3、用户名转为大写字母 * 4、用户名字母倒着排序 * 5、只输出一个用户名 */ public class Test { public static void main(String[] args) { User u1 = new User(1, "a", 21); User u2 = new User(2, "b", 22); User u3 = new User(3, "c", 23); User u4 = new User(4, "d", 24); User u5 = new User(5, "e", 25); //集合是存储 List<User> list= Arrays.asList(u1,u2,u3,u4,u5); //计算交给Stream流 list.stream()//下面相当于写条件 .filter(u->{ return u.getId()%2==0; }) .filter(u->{ return u.getAge()>23; }) .map(u->{ return u.getName().toUpperCase(); }) .sorted((o1,o2)->{ return o2.compareTo(o1); }) .limit(1) .forEach(System.out::println); } }
ForkJoin
在JDK中,提供了这样一种功能:它能够将复杂的逻辑拆分成一个个简单的逻辑来并行执行,待每个并行执行的逻辑执行完成后,再将各个结果进行汇总,得出最终的结果数据。有点像Hadoop中的MapReduce。
ForkJoin是由JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。
package org.example; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; /** * @author linghu * @date 2023/12/25 11:18 */ public class ForkJoinTest { private static final long SUM=20_0000_0000; public static void main(String[] args) throws ExecutionException, InterruptedException { test01(); test02(); test03(); } //使用普通方法 public static void test01(){ long start=System.currentTimeMillis(); long sum=0L; for (int i=1;i<SUM;i++){ sum+=i; } long end=System.currentTimeMillis(); System.out.println(sum); System.out.println("时间:"+(end-start)); System.out.println("============================"); } //使用ForkJoin方法 public static void test02() throws ExecutionException, InterruptedException { long start=System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L, SUM); ForkJoinTask<Long> submit = forkJoinPool.submit(task); Long aLong = submit.get(); System.out.println(aLong); Long end=System.currentTimeMillis(); System.out.println("时间:"+(end-start)); System.out.println("=========================="); } //使用Stream流计算 public static void test03(){ long start=System.currentTimeMillis(); long sum= LongStream.range(0L,2000000000L).parallel().reduce(0,Long::sum); System.out.println(sum); long end=System.currentTimeMillis(); System.out.println("时间:"+(end-start)); System.out.println("=========================="); } }
package org.example; import java.util.Locale; import java.util.concurrent.RecursiveTask; /** * @author linghu * @date 2023/12/25 10:59 */ public class ForkJoinDemo extends RecursiveTask<Long> { private long star; private long end; //临界值 private long temp=100000L; public ForkJoinDemo(long star, long end) { this.star = star; this.end = end; } //计算方法 @Override protected Long compute() { if ((end-star)<temp){ Long sum=0L; for (Long i=star;i<end;i++){ sum+=i; } return sum; }else { //使用ForkJoin分而治之计算 //1、计算平均值 long middle=(star+end)/2; ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(star, middle); //拆分任务,把线程压入线程队列 forkJoinDemo1.fork(); ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo(middle,end); forkJoinDemo2.fork(); Long taskSum=forkJoinDemo1.join()+forkJoinDemo2.join(); return taskSum; } } }
ForkJoin特点:工作窃取
工作窃取
实现原理是:双端队列!
异步回调
Java中异步回调执行和前端的 Ajax
其实是一样的。但是在Java中的话用的是 CompletableFuture
。
回调情况分为:
- 没有返回值的
runAsync
异步回调 - 有返回值的异步回调
supplyAsync
没有返回值的runAsync异步回调
对于异步回调,其实就是三个过程:
- 异步执行
- 成功回调
- 失败回调
代码如下:
package org.example; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** * @author linghu * @date 2023/12/25 16:11 * desc: 异步调用:CompleteableFuture * //异步执行 * //成功回调 * //失败回调 */ public class Demo01 { public static void main(String[] args) throws ExecutionException, InterruptedException { //发起一个请求void //没有返回值runAsync异步回调 CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"runAsync=>void"); }); System.out.println("1111111111111"); completableFuture.get();//获取执行结果 } }
总结:上面会先发起一个请求,请求休眠,同时打印语句,打印完毕以后回调get这个执行结果
有返回值的异步回调supplyAsync
whenComplete((t, u)
有两个参数,一个是t,一个是u:
- T是正常返回的结果
- U是抛出异常的错误信息
如果发生了异常,get可以获取 exceptionally((e)
返回的错误信息。
package org.example; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** * @author linghu * @date 2023/12/25 16:11 * desc: 异步调用:CompleteableFuture * //异步执行 * //成功回调 * //失败回调 */ public class Demo01 { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer"); int i=10/0; return 1024; }); System.out.println(completableFuture.whenComplete((t, u) -> { System.out.println("t=>" + t); System.out.println("u=>" + u); }).exceptionally((e) -> { System.out.println(e.getMessage()); return 233;//获取错误的返回结果 }).get()); } }
如上返回的结果:
JMM
JMM是一个概念,不是真实存在的,用来描述Java内存模型。
JMM是一种规范,目的是解决由于多线程通过共享内存进行通信时,存在的本地内存数据不一致、编译器会对代码指令重排序、处理器会对代码乱序执行等带来的问题。
关于JMM的一些同步约定
通过上图总结:
- 线程解锁前,必须把共享变量立刻撤回主存;
- 线程加锁前,必须读取主存中的最新值到工作内存中
- 加锁和解锁是同一把锁
- 线程中分为 工作内存和主内存
现在如果有两个线程都在对主存进行操作:
通过上图发现,当我们的线程B修改了主存值,并且读取到了工作内存,这个时候线程A不知道,怎么办呢?
于是引入了 volatile
。
volatile
定义
volatile
是Java提供的一种轻量级的同步机制。- Java 语言包含两种内在的同步机制:同步块(或方法)和
volatile
变量。相比于synchronized
(synchronized
通常称为重量级锁),volatile
更轻量级,因为它不会引起线程上下文的切换和调度。但是volatile
变量的同步性较差(有时它更简单并且开销更低),而且其使用也更容易出错。
1)、可见性
当多个线程访问同一个变量时,一个线程修改了这个变量的值,另外一个线程是可以看到修改的值。
package org.example; import java.util.concurrent.TimeUnit; /** * @author linghu * @date 2023/12/26 16:12 */ public class JMMDemo01 { //如果不加volatile程序会死循环 //加了volatile是可以保证可见性的 // private static Integer number=0; private volatile static Integer number=0; public static void main(String[] args) { //main线程 //子线程1 new Thread(()->{ while (number==0){ } }).start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //子线程2 new Thread(()->{ while (number==0){ } }).start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } number=1; System.out.println(number); } }
注:synchronized和lock都能保证可见性。
2)、不保证原子性
线程A在执行任务的时候,不能被打扰的,也不能被分割的,要么同时成功,要么同时失败。
package org.example; /** * @author linghu * @date 2023/12/26 16:44 */ public class VDemo02 { private static volatile int number=0; public synchronized static void add(){ number++; } public static void main(String[] args) { for (int i=1;i<=20;i++){ new Thread(()->{ for (int j=1;j<=1000;j++){ add(); } }).start(); } while (Thread.activeCount()>2){ Thread.yield(); } System.out.println(Thread.currentThread().getName()+",num="+number); } }
总结:模拟多线程环境下对共享变量的操作,并展示线程之间的竞争和协作。
使用Thread.yield()
方法让出CPU资源给其他线程
在这个地方,如果不加lock和synchronized,怎么保证原子性?
理论上,num的值应该为:2万。
package org.example; import java.util.concurrent.atomic.AtomicInteger; /** * @author linghu * @date 2023/12/26 16:44 */ public class VDemo02 { // private static volatile AtomicInteger number=new AtomicInteger(); private static volatile int number=0; public static void add(){ number++; // number.incrementAndGet();//底层是CAS保证的原子性,加1操作 } public static void main(String[] args) { for (int i=1;i<=20;i++){ new Thread(()->{ for (int j=1;j<=1000;j++){ add(); } }).start(); } while (Thread.activeCount()>2){ Thread.yield(); } System.out.println(Thread.currentThread().getName()+",num="+number); } }
解决方案
可以用原子类解决问题,比锁的效率高很多!
package org.example; import java.util.concurrent.atomic.AtomicInteger; /** * @author linghu * @date 2023/12/26 16:44 */ public class VDemo02 { private static volatile AtomicInteger number=new AtomicInteger(); // private static volatile int number=0; public static void add(){ // number++; number.incrementAndGet();//底层是CAS保证的原子性,加1操作 } public static void main(String[] args) { for (int i=1;i<=20;i++){ new Thread(()->{ for (int j=1;j<=1000;j++){ add(); } }).start(); } while (Thread.activeCount()>2){ Thread.yield(); } System.out.println(Thread.currentThread().getName()+",num="+number); } }
3)、禁止指令重排
计算机并不是按照我们自己写的那样去执行的。
指令重排一般分为以下三种
编译器优化
重新安排语句的执行顺序指令并行重排
利用指令级并行技术将多个指令并行执行,如果指令之前没有数据依赖,处理器可以改变对应机器指令的执行顺序内存系统重排
由于处理使用缓存和读写缓冲区,所以它们是乱序的
package org.example; import org.openjdk.jol.info.ClassLayout; /** * @author linghu * @date 2023/12/27 14:25 */ public class CountObjectSize { int a=10; int b=20; double c=30.0; public static void main(String[] args) { CountObjectSize object = new CountObjectSize(); System.out.println(ClassLayout.parseInstance(object).toPrintable()); } }
引入的pom:
<!--查看对象头工具--> <!-- https://mvnrepository.com/artifact/org.openjdk.jol/jol-core --> <dependency> <groupId>org.openjdk.jol</groupId> <artifactId>jol-core</artifactId> <version>0.16</version> </dependency>
总结:通过上面的代码,我们希望代码执行的顺序是:a-b-c;但实际的编译情况是,执行顺序是:a-c-b。
方案
如何禁止指令重排呢?
可以用 Volatile
关键字实现!Volatile
中会加一道内存的屏障,这个内存屏障可以保证在这个屏障中的指令顺序。
内存屏障:CPU指令
总结
- volatile可以保证可见性
- 不能保证原子性
- 由于内存屏障,可以保证避免指令重排的现象产生
单例模式
单例模式的文章我之前写过,文章如下:
1)、饿汉式
如下饿汉式单例代码:
package org.example; /** * @author linghu * @date 2023/12/28 9:41 * 饿汉式单例 */ public class Hungry { private byte[] data1=new byte[1024*1024]; private byte[] data2=new byte[1024*1024]; private byte[] data3=new byte[1024*1024]; private byte[] data4=new byte[1024*1024]; //私有化构造器 public Hungry() { } private final static Hungry HUNGRY=new Hungry(); public static Hungry getInstance(){ return HUNGRY; } }
2)、DCL懒汉式
package org.example; /** * @author linghu * @date 2023/12/27 17:23 * 懒汉式 */ public class LazyMan { //私有构造器 public LazyMan() { System.out.println(Thread.currentThread().getName()+"OK"); } private static LazyMan lazyMan; public static LazyMan getInstance(){ if (lazyMan==null){ lazyMan=new LazyMan(); } return lazyMan; } //多线程合并会有隐患! public static void main(String[] args) { for (int i=0;i<10;i++){ new Thread(()->{ lazyMan.getInstance(); }).start(); } } }
package org.example; /** * @author linghu * @date 2023/12/27 17:23 * 懒汉式 */ public class LazyMan { //私有构造器 public LazyMan() { System.out.println(Thread.currentThread().getName()+"OK"); } private static LazyMan lazyMan; //双重检测锁模式的 懒汉式单例 DCL懒汉式 public static LazyMan getInstance(){ if (lazyMan==null){ synchronized (LazyMan.class){ if (lazyMan==null){ lazyMan=new LazyMan(); } } } return lazyMan; } //多线程合并会有隐患! public static void main(String[] args) { for (int i=0;i<10;i++){ new Thread(()->{ lazyMan.getInstance(); }).start(); } } }
加volatile可以防止指令重排
private volatile static LazyMan lazyMan;
package org.example; /** * @author linghu * @date 2023/12/27 17:23 * 懒汉式 */ public class LazyMan { //私有构造器 public LazyMan() { System.out.println(Thread.currentThread().getName()+"OK"); } private volatile static LazyMan lazyMan; //双重检测锁模式的 懒汉式单例 DCL懒汉式 public static LazyMan getInstance(){ if (lazyMan==null){ synchronized (LazyMan.class){ if (lazyMan==null){ lazyMan=new LazyMan(); } } } return lazyMan; } //多线程合并会有隐患! public static void main(String[] args) { for (int i=0;i<10;i++){ new Thread(()->{ lazyMan.getInstance(); }).start(); } } }
深入理解CAS
CAS就是比较以前工作内存中的值 和 主存内存中的值,如果这个值是期望的,那么执行操作! 如果不是,就一直循环,使用的是 自旋锁。
package org.example; import java.util.concurrent.atomic.AtomicInteger; /** * @author linghu * @date 2023/12/29 9:54 */ public class casDemo { //CAS:compareAndSet public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(2020); //如果实际值 和 期望值相同,那么就更新 //如果实际值 和 期望值不同,那么就不更新 System.out.println(atomicInteger.compareAndSet(2020, 2021)); System.out.println(atomicInteger.get()); //因为期望值是2020,实际值却变成了2021 所以会修改失败! atomicInteger.getAndIncrement();//++操作 System.out.println(atomicInteger.compareAndSet(2020, 2021)); System.out.println(atomicInteger.get()); } }
总结:
- CAS 是CPU的并发原语。
这段代码演示了如何使用原子操作来安全地更新一个整数值,并展示了
compareAndSet
方法的工作原理。
缺点:
- 循环会耗时
- 一次性只能保证一个共享变量的原子性
- 它会存在ABA问题【解决方案:原子引用】