1. 常见设计模式与多把锁使用场景
1.1 同步模式之保护性暂停
即 Guarded Supension,用在一个线程等待另一个线程的执行结果
要点:
- 有一个结果需要仓一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK中,join的实现、Future的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
t0:需要结果response
t1:产生response结果
当response没有值,那么t0需要先等待;
当t1为response赋值成功后,会唤醒在GuardedObject中等待的线程
t0被唤醒后自然就获得了值
(topic = "test5") public class test5 { // 线程1等待线程2的结果,并以GuardObject对象为媒介进行传输 public static void main(String[] args) { GuardObject obj = new GuardObject(); new Thread(()->{ log.info("等待结果"); Object response = (Object) obj.getResponse(2000); System.out.println(response); },"t1").start(); new Thread(()->{ log.info("执行任务"); try { Thread.sleep(1000);// 模拟同步任务 obj.setResponse(new Object()); log.info("任务执行完毕"); } catch (InterruptedException e) { throw new RuntimeException(e); } },"t2").start(); } } class GuardObject{ private static final Logger log = LoggerFactory.getLogger(GuardObject.class); private Object response; public Object getResponse() throws InterruptedException { synchronized (this){ while (response == null){ this.wait(); } } return response; } public Object getResponse(long timeout) { synchronized (this){ // 开始时间 long start = System.currentTimeMillis(); // 经历时间 long passedTime = 0; while (response == null){ // 本轮循环应该等待的时间 long waitTime = timeout - passedTime;//假设timeout为2;第一轮:2-0=2 第二轮: 2-0.5=1.5 第三轮:2-2.1=-0.1 if (waitTime<=0){// 第一轮:2未超时,进入等待;第二轮:1.5未超时;第三轮:超时跳出 // 超时 log.error("超时异常!"); break; } // 第一轮:2秒;第二轮:1.5秒 try { log.info("准备开始等待"); this.wait(waitTime);// 避免虚假唤醒,不可以直接填入timeout log.info("等待结束"); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("11"); } // 获得本轮经历的时间 passedTime = System.currentTimeMillis() - start; // 第一轮:0.5 第二轮:2.1 } } return response; } public void setResponse(Object response) { synchronized (this){ this.response = response; this.notifyAll(); } } }
1.1.1 join原理
public final void join() throws InterruptedException { join(0); } public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
1.1.2 扩展
图中Futures就好比居民楼一层的信箱(每个信箱都有房间标号)
- 左侧的t0、t2、t4就好比等待邮件的居民
- 右侧的t1、t3、t5就好比邮递员
如果需要在多个类之间使用GuardedObject对象,作为参数传递不是很方便,因此涉及一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
解耦的中间类是什么意思?
分层关系:
- 调用者(Postman、People):通过解耦层Mailboxes类对执行者GuardObject进行使用
- 指挥者————解耦层(Mailboxes):接收调用者的命令来对执行者GuardObject类完成任务的使用并返回结果
- 执行者(GuardObject类):真正实现干活的人
package com.renex.c4; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import java.util.Hashtable; import java.util.Map; import java.util.Set; /** * 保护性暂停 */ (topic = "test6") public class test6 { public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { new People().start(); } Thread.sleep(1000); for (Integer id : Mailboxes.getIds()) { new Postman(id,"内容"+id).start(); } } } ////////////////////////////////// 调用/通过中间层达到某种目的 //////////////////////////// (topic = "People") class People extends Thread{ public void run() { GuardObject content = Mailboxes.createGuardObject(); // 收信中 log.info("开始收信:{}",content.getId()); Object mail = content.getResponse(5000); log.info("收到信了:{},内容是:{}",content.getId(),mail); } } ////////////////////////////////// 调用/通过中间层达到某种目的 //////////////////////////// (topic = "Postman") class Postman extends Thread{ private int id; private String mail; public Postman(Integer id, String s) { this.id = id; this.mail = s; } public void run() { GuardObject guardObject = Mailboxes.getGuardObject(id); log.info("送信 id:{},内容:{}",id,mail); guardObject.setResponse(mail); } } ////////////////////////////////// 承上启下;一个中间层 /////////////////////////////// class Mailboxes{ private static Map<Integer,GuardObject> boxes = new Hashtable<>(); private static int id = 1; // 产生唯一id private static synchronized int createId(){ return id++; } public static GuardObject createGuardObject(){ GuardObject go = new GuardObject(createId()); boxes.put(go.getId(),go); return go; } public static GuardObject getGuardObject(int id){ return boxes.remove(id); } public static Set<Integer> getIds(){ return boxes.keySet(); } } ////////////////////////////////// 真正实现功能的人 /////////////////////////////// (topic = "GuardObject") class GuardObject { private Object response; private int id; public GuardObject() { } public GuardObject(int id) { this.id = id; } public Object getResponse() throws InterruptedException { synchronized (this) { while (response == null) { this.wait(); } } return response; } public Object getResponse(long timeout) { synchronized (this) { // 开始时间 long start = System.currentTimeMillis(); // 经历时间 long passedTime = 0; while (response == null) { // 本轮循环应该等待的时间 long waitTime = timeout - passedTime;//假设timeout为2;第一轮:2-0=2 第二轮: 2-0.5=1.5 第三轮:2-2.1=-0.1 if (waitTime <= 0) {// 第一轮:2未超时,进入等待;第二轮:1.5未超时;第三轮:超时跳出 // 超时 log.error("超时异常!"); break; } // 第一轮:2秒;第二轮:1.5秒 try { log.info("准备开始等待"); this.wait(waitTime);// 避免虚假唤醒,不可以直接填入timeout log.info("等待结束"); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("11"); } // 获得本轮经历的时间 passedTime = System.currentTimeMillis() - start; // 第一轮:0.5 第二轮:2.1 } } return response; } public void setResponse(Object response) { synchronized (this) { this.response = response; this.notifyAll(); } } public Integer getId() { return this.id; } } ///////////////////// 输出结果: [Thread-1] INFO People - 开始收信:3 [Thread-0] INFO People - 开始收信:1 [Thread-0] INFO GuardObject - 准备开始等待 [Thread-2] INFO People - 开始收信:2 [Thread-1] INFO GuardObject - 准备开始等待 [Thread-2] INFO GuardObject - 准备开始等待 [Thread-3] INFO Postman - 送信 id:3,内容:内容3 [Thread-5] INFO Postman - 送信 id:1,内容:内容1 [Thread-1] INFO GuardObject - 等待结束 [Thread-0] INFO GuardObject - 等待结束 [Thread-1] INFO People - 收到信了:3,内容是:内容3 [Thread-0] INFO People - 收到信了:1,内容是:内容1 [Thread-4] INFO Postman - 送信 id:2,内容:内容2 [Thread-2] INFO GuardObject - 等待结束 [Thread-2] INFO People - 收到信了:2,内容是:内容2
1.2 生产者/消费者
很熟悉是吗?这就是MQ消息队列的概念!
1.2.1 定义
与前面的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应
消费队列可以用来平衡生产和消费的线程资源
生产者仅负责生产结果数据,不关心数据该如何处理,而消费者专心处理结果数据
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
JDK中的各种阻塞队列,采用的就是这种模式
因为是阻塞的,所以在队列中若是未被消费的信息会一直等待下去(死信队列);不过这种会有解决的办法,可以设置一个最大延迟时间
1.2.2 实现
package com.renex.c4; import lombok.extern.slf4j.Slf4j; (topic = "test7") public class test7 { public static void main(String[] args) { MessageQueue messageQueue = new MessageQueue(2); for (int i = 0; i < 3; i++) { int id =i; new Thread(()->{ messageQueue.putMsg(new Message(id,("张三在吃"+id+"顿饭"))); },"生产者"+i+"号").start(); } new Thread(()->{ while (true){ try { Thread.sleep(100); Message msg = messageQueue.getMsg(); } catch (InterruptedException e) { throw new RuntimeException(e); } } },"消费者").start(); } } /** * 消息队列类 */ (topic = "MessageQueue") public class MessageQueue { // 消息的队列集合 private LinkedList<Message> queue = new LinkedList<>(); // 队列容量 private int capcity; public MessageQueue(int capcity) { this.capcity = capcity; } /** * 获取消息 * @return */ public Message getMsg() { synchronized (queue){ while (queue.isEmpty()){ try { log.info("队列为空!消费者线程等待消息的产生"); queue.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } // 从队列的头部获取消息 Message message = queue.removeFirst(); log.info("已消费消息:{}",message); queue.notifyAll(); return message; } } /** * 存入消息 * @param message */ public void putMsg(Message message) { synchronized (queue){ while (queue.size() == capcity){ try { log.info("队列已满,生产者线程等待消费者进行完成消费操作"); queue.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } queue.addLast(message); log.info("已生产消息:{}",message); // 唤醒所有正在等待的消费者;已经存在消息了 queue.notifyAll(); } } } final class Message{ private int id; private String msg; } ////////////////////////////////////////// [生产者1号] INFO MessageQueue - 已生产消息:Message(id=1, msg=张三在吃1顿饭) [生产者2号] INFO MessageQueue - 已生产消息:Message(id=2, msg=张三在吃2顿饭) [生产者0号] INFO MessageQueue - 队列已满,生产者线程等待消费者进行完成消费操作 [消费者] INFO MessageQueue - 已消费消息:Message(id=1, msg=张三在吃1顿饭) [生产者0号] INFO MessageQueue - 已生产消息:Message(id=0, msg=张三在吃0顿饭) [消费者] INFO MessageQueue - 已消费消息:Message(id=2, msg=张三在吃2顿饭) [消费者] INFO MessageQueue - 已消费消息:Message(id=0, msg=张三在吃0顿饭) [消费者] INFO MessageQueue - 队列为空!消费者线程等待消息的产生
- 输出分析:
- 拟定了3个生产者,这个队列的容量设置为了2;
- 当生产者生产了2个消息后,
生产者0号再次生成消息就会发现队列已满,然后将消费者唤醒; - 消费者醒来后开始消费消息;当消费后检测到队列已空出一个空位;
这时就会唤醒生产者0号将消息放入队列中 - 然后生产者已经全部生产完了消息这时再度唤醒消费者来消费消息
1.3 Park & Unpark
1.3.1 基本使用
- 它们都是LockSupport类中的方法
public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } public native void unpark(Object thread); public static void park() { UNSAFE.park(false, 0L); } public native void park(boolean isAbsolute, long time);
// 暂停当前线程 LockSupport.park(); // 恢复某个线程的运行 LockSupport.unpark(线程对象);
小例子:
public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { log.info("start...."); try { Thread.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } log.info("park...."); LockSupport.park(); log.info("resume...."); }, "t1"); t1.start(); Thread.sleep(2); log.info("unpark...."); LockSupport.unpark(t1); } ////////////////////////////////////////////// [t1] INFO test1 - start.... [t1] INFO test1 - park.... [main] INFO test1 - unpark.... [t1] INFO test1 - resume....
1.3.2 特点
与Object的wait & notify相比
- wait、notify、notifyAll必须配合ObjectMonitor一起使用,而park、unpark不必
- park、unpark是以线程为单位来【阻塞】和【唤醒】线程
而notify只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么【精确】 - park、unpark可以先unpark,而wait、notify不能先notify
1.4 重新理解线程状态转换
1.4.1 New ——> RUNNABLE
- 当调用 t.start() 方法时,那么就会出现:New ——> RUNNABLE
1.4.2 RUNNABLE <——> WAITING
t 线程用 synchronized(obj) 获取了对象锁后
- 调用obj.wait()方法时,t线程从RUNNABLE --> WAITING
- 调用obj.notify(),obj.notifyAll(),t.interrupt()时
- 竞争锁成功,t线程从
WAITING --> RUNNABLE - 竞争锁失败,t线程从 WAITING --> BLOCKED
1.4.3 RUNNABLE <——> WAITING
- 当前线程
调用t.join()方法时,当前线程从 RUNNABLE --> WAITING
- 注意是当前线程在t线程对象的监视器上等待
- t线程运行结束,或调用了**当前线程的interrupt()**时,当前线程从 WAITING --> RUNNABLE
1.4.4 RUNNABLE <——> WAITING
- 当前线程调用LockSupport.park()方法会让当前线程从RUNNABLE --> WAITING
- 调用LockSupport.unpark(目标线程)或调用了线程的interrupt(),会让目标线程从WAITING --> RUNNABLE
1.4.5 RUNNABLE <——> TIMED_WAITING
t 线程用 synchronized(obj) 获取了对象锁后
- 调用obj.wait(long n)方法时,t线程从RUNNABLE --> TIMED_WAITING
- t线程等待时间超过了n毫秒,或调用obj.notify(),obj.notifyAll(),t.interrupt()时
- 竞争锁成功,t线程从
TIMED_WAITING--> RUNNABLE - 竞争锁失败,t线程从 TIMED_WAITING–> BLOCKED
1.4.6 RUNNABLE <——> TIMED_WAITING
- 当前线程
调用t.join(long n)方法时,当前线程从 RUNNABLE --> TIMED_WAITING
- 注意是当前线程在t线程对象的监视器上等待
- 当前线程等待时间超过了n毫秒,或t线程运行结束,或调用了当前先从dinterrupt()时,当前线程从 TIMED_WAITING–> RUNNABLE
1.4.7 RUNNABLE <——> TIMED_WAITING
- 当前线程
调用Thread.sleep(long n),当前线程从 RUNNABLE ——> TIMED_WAITING - 当前线程等待时间超过n毫秒,当前线程从 TIMED_WAITING ——> RUNNABLE
1.4.8 RUNNABLE <——> TIMED_WAITING
- 当前线程调用
LockSupport.parkNanos(long nanos)或LockSupport.parkUntil(long millis)时,当前线程从RUNNABLE ——> TIMED_WAITING - 调用
LockSupport.unpark(目标线程)或调用了线程的interrupt(),或是等待超时,会让目标线程从TIMED_WAITING --> RUNNABLE
1.4.9 RUNNABLE <——> BLOCKED
- t线程用
synchronized(obj)获取了对象锁时,如果竞争失败,从 RUNNABLE --> BLOCKED - 持obj锁对象的同步代码块执行完毕,会唤醒该对象上所有BLOCKED的线程重新竞争
- t线程竞争成功,从 BLOCKED --> RUNNABLE,其他失败的线程仍然是 BLOCKED
1.4.10 RUNNABLE <——> TERMINATED
当前线程所有代码运行完毕,进入TERMINATED
1.5 多把锁
1.5.1 多把不相干的锁
通常当sychronized锁住了一把锁的时候,并发效率是很低的,因为需要等待持锁线程将锁释放出来后才会发生锁的更替;在更替过程中,若出现多个线程,还会发生锁的竞争
而可以使用两把锁进行提高并发的效果,但这种情况也更容易发生死锁效应
public class test2 { private static Object lock = new Object(); private static Object lock2 = new Object(); public static void main(String[] args) { new Thread(()->{ synchronized (lock){ System.out.println("tt"); } }).start(); new Thread(()->{ synchronized (lock2){ System.out.println("dd"); } }).start(); } }
1.5.2 活跃性
- 当存在某种情况导致你的代码无法正常被终止的时候可以说活跃性已经无了
1.5.2.1 死锁
有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁
- t1线程获得A对象锁,接下来想获取B对象的锁
- t2线程获得B对象锁,接下来想获取A对象的锁
例子:
private static Object lock = new Object(); private static Object lock2 = new Object(); public static void main(String[] args) { Thread t1 = new Thread(() -> { synchronized (lock) { log.info("lock A"); try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } synchronized (lock2) { log.info("lock B"); log.info("A操作"); } } }, "t1"); Thread t2 = new Thread(() -> { synchronized (lock2) { log.info("lock B"); try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } synchronized (lock) { log.info("lock A"); log.info("B操作"); } } }, "t2"); t1.start(); t2.start(); } /////////////////////////////////////// [t1] INFO test3 - lock A [t2] INFO test3 - lock B
死锁现象:
当t1线程需要获取t2线程的锁时,它本身也是有把lock锁在身上,而t2线程还持有lock2锁,所以t1线程就会等待t2线程将锁释放。而t2线程开始运行时,也去获取t1线程持有lock锁,那么这时t2线程也在等待t1线程将锁释放。
这样两个线程就一直在等对方线程将锁释放掉,就造成了死锁现象,程序无法终止
1.5.2.1.1 定位死锁
- jstack
C:\Users\renex>jps 12240 17940 test3 9204 Launcher 10300 Jps C:\Users\renex>jstack 17940 2024-11-18 17:13:27 Full thread dump OpenJDK 64-Bit Server VM (25.422-b05 mixed mode): "DestroyJavaVM" #22 prio=5 os_prio=0 tid=0x000001fe99054800 nid=0x26b8 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE "t2" #21 prio=5 os_prio=0 tid=0x000001fec0c79000 nid=0x7468 waiting for monitor entry [0x0000000954aff000] java.lang.Thread.State: BLOCKED (on object monitor) at com.renex.c5.test3.lambda$main$1(test3.java:35) - waiting to lock <0x0000000716c5d300> (a java.lang.Object) - locked <0x0000000716c5d310> (a java.lang.Object) at com.renex.c5.test3$$Lambda$2/2094548358.run(Unknown Source) at java.lang.Thread.run(Thread.java:750) "t1" #20 prio=5 os_prio=0 tid=0x000001fec0e8b800 nid=0x1098 waiting for monitor entry [0x00000009549ff000] java.lang.Thread.State: BLOCKED (on object monitor) at com.renex.c5.test3.lambda$main$0(test3.java:20) - waiting to lock <0x0000000716c5d310> (a java.lang.Object) - locked <0x0000000716c5d300> (a java.lang.Object) at com.renex.c5.test3$$Lambda$1/500977346.run(Unknown Source) at java.lang.Thread.run(Thread.java:750) # 省略 ....... Found one Java-level deadlock: ============================= "t2": waiting to lock monitor 0x000001febd3cfed8 (object 0x0000000716c5d300, a java.lang.Object), which is held by "t1" "t1": waiting to lock monitor 0x000001febd3d2768 (object 0x0000000716c5d310, a java.lang.Object), which is held by "t2" Java stack information for the threads listed above: =================================================== "t2": at com.renex.c5.test3.lambda$main$1(test3.java:35) - waiting to lock <0x0000000716c5d300> (a java.lang.Object) - locked <0x0000000716c5d310> (a java.lang.Object) at com.renex.c5.test3$$Lambda$2/2094548358.run(Unknown Source) at java.lang.Thread.run(Thread.java:750) "t1": at com.renex.c5.test3.lambda$main$0(test3.java:20) - waiting to lock <0x0000000716c5d310> (a java.lang.Object) - locked <0x0000000716c5d300> (a java.lang.Object) at com.renex.c5.test3$$Lambda$1/500977346.run(Unknown Source) at java.lang.Thread.run(Thread.java:750) Found 1 deadlock.
Found one Java-level deadlock:
就代表jstack找到了一个java级别的死锁
它给你列举出来了是哪个线程包括在源码中的哪个类中
- jconsole
jconsole
可以以图形的表示,把连接的类中的死锁现象给检测出来
1.5.2.2 活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如:
private static volatile int count = 10; private static Object lock = new Object(); public static void main(String[] args) { new Thread(()->{ while (count > 0) { try { Thread.sleep(2); } catch (InterruptedException e) { throw new RuntimeException(e); } count--; log.info("count:{}",count); } }).start(); new Thread(()->{ while (count < 20) { try { Thread.sleep(2); } catch (InterruptedException e) { throw new RuntimeException(e); } count++; log.info("count:{}",count); } }).start(); }
这段代码,执行到后面会
因为并发机制问题渐渐自减到小于0的数然后结束循环。不过大体来说并没有出错,因为已经严重超出了我们预想的运行时间
如何解决?
- 可以延缓一个线程的睡眠时间,让它们
交错执行
1.5.2.3 饥饿
- 一个线程由于优先级太低,始终得不到CPU调度分配时间片执行,也不能够结束;
饥饿的情况不容易演示,但是实现读写锁时会涉及到饥饿问题
1.5.3 ReentrantLock(可重入锁)
相对于synchronized它具备如下特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
与synchronized一样,都支持可重入
// 获得锁 reentrantLock.lock(); try{ // 临界区开始 }catch(){ // 临界区结束 }finally{ // 释放锁 reentrantLock.unlock(); }
可重入性是什么意思?
重入一般可以理解为一个函数在同时多次调用
例如操作系统在进程调度过程中,或者单片机、处理器等的中断的时候会发生重入的现象。
可重入的函数必须满足以下三个条件:
- 可以在执行的过程中可以被打断;
- 被打断之后,在该函数一次调用执行完之前,可以再次被调用(或进入,reentered)。
- 再次调用执行完之后,被打断的上次调用可以继续恢复执行,并正确执行。
线程安全与重入性关系总结如下:
- 一个线程安全的函数不一定是可重入的;
- 一个可重入的函数缺也不一定是线程安全的!
- 一个不可重入的函数一定不是线程安全的。这是收到多线程并发环境的限制,但可重入本身与线程安全无必然联系。
1.5.3.1 可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权力再次获取这把锁,如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
/** * 可重入锁 */ @Slf4j(topic = "E1L") public class E1L { private static ReentrantLock reentrantLock = new ReentrantLock(); public static void main(String[] args) { reentrantLock.lock(); try { log.info("enter main"); m1(); }finally { reentrantLock.unlock(); } } public static void m1(){ reentrantLock.lock(); try { log.info("enter m1"); m2(); }finally { reentrantLock.unlock(); } } public static void m2(){ reentrantLock.lock(); try { log.info("enter m2"); }finally { reentrantLock.unlock(); } } } ////////////////////////////////// [main] INFO E1L - enter main [main] INFO E1L - enter m1 [main] INFO E1L - enter m2
1.5.3.2 可打断
.lockInterruptibly() 和 .lock() 两者的区别:
- .lockInterruptibly()
- 如果没有其他线程进行竞争锁,那么此方法就会为当前线程拉取一个lock对象锁
- 如果有其他线程进行竞争锁,那么该方法就进入阻塞队列进行等待
在阻塞队列中,可以被其他线程用.interrupt()打断
.lock()本身有可重入性,也正是由于可重入性,导致无法被打断
- 会直接获得锁,它只考虑锁的持有者重复进入临界区
/** * 可重入锁 */ @Slf4j(topic = "E2L") public class E2L { private static ReentrantLock reentrantLock = new ReentrantLock(); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { try { log.info("t1尝试获得锁"); /** * 如果没有竞争那么此方法就会获得lock对象锁 * 如果有竞争就进入阻塞队列,可以被其他线程用interrupt方法打断 */ reentrantLock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); log.error("先线程被打断,没有获得锁"); return; } try{ log.info("获得到锁"); }finally { reentrantLock.unlock(); } }, "t1"); reentrantLock.lock(); t1.start(); Thread.sleep(1000); log.info("打断 t1"); t1.interrupt(); } }
1.5.3.3 锁超时
/** * 可重入锁 */ @Slf4j(topic = "E3L") public class E3L { private static ReentrantLock reentrantLock = new ReentrantLock(); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { log.info("尝试获得锁"); try { if (!reentrantLock.tryLock(2, TimeUnit.SECONDS)){ log.error("获取不到锁"); return; } } catch (InterruptedException e) { e.printStackTrace(); log.error("t1在等待锁的过程中被打断"); return; } try { log.info("获得到锁"); }finally { reentrantLock.unlock(); } },"t1"); reentrantLock.lock(); log.info("main 获得锁"); Thread.sleep(1); reentrantLock.unlock(); log.info("main 释放了锁"); t1.start(); } } //////////////////////////// [main] INFO E3L - main 获得锁 [main] INFO E3L - main 释放了锁 [t1] INFO E3L - 尝试获得锁 [t1] INFO E3L - 获得到锁
tryLock()两类方法:尝试获得锁
public boolean tryLock(long timeout, TimeUnit unit){} public boolean tryLock() {}
tryLock():尝试获得锁,如果获得了锁返回true,反之返回false
tryLock(long timeout, TimeUnit unit):尝试获得锁,获得了锁返回true,反之返回false
- 参数一:最大等待时间
- 参数二:时间的单位(秒、分…)
方法设置最大等待时间底层的代码,其实由LockSupport.parkNanos()实现
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } // 尝试获得锁 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 判断,如果被打断抛出打断异常 if (Thread.interrupted()) throw new InterruptedException(); // 尝试两种方式获得锁,两个方法其中一个方法获得锁就可以了 // 第一个:尝试独占模式获得锁, // 第二个:设置最大等待时间,在等待时间内获得锁返回true return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } // 尝试以独占模式获得锁 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 进行锁的获取和设定最大等待时间 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 进行重复获取操作 for (;;) { final Node p = node.predecessor(); // 当当前线程节点处于阻塞队列中 并且 以独占模式获取锁成功了! if (p == head && tryAcquire(arg)) { setHead(node);// 设置队列头 p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); // 防止虚假唤醒 if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) // 通过parkNanos设置最大等待时间; // 如果超过时间,则会将锁释放掉 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
由于使用
parkNanos()方法,所以超出等待时间还未获得锁,那么这时候它就会将锁释放掉(parkNanos()的效果)
1.5.3.4 公平锁
reentrantLock默认是非公平锁
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
而设置成公平锁,其实也没多大用。
- 主要是解决饥饿问题,但缺点是降低并发速度
如有其中一种情况:
- 当线程1任务量少,线程2任务量多(持锁),线程2将锁释放后,线程1拿到锁后很快将任务做完。这时线程2再次拿到锁。
而这期间,锁切换的过程太快了,没有必要这么公平分配锁。当线程一多切换次数上升,就会影响并发速度
1.5.3.5 条件变量
synchronized中也有条件变量;例如:waitSet队列,当条件不满足时进入 waitSet等待
ReentrantLock的条件变量比synchronized强大之处在于:它支持多个条件变量
- synchronized是那些不满足条件的线程都在一间休息室等待消息
- 而ReentrantLock支持多间休息室,有专门等烟的休息室、专门等烟的休息室…而唤醒时也是按照进入休息室的顺序来唤醒
使用流程:
- await前需要获得锁
- await执行后,会释放锁,进入conditionObject等待
- awaity的线程被唤醒(或打断、或超时)去重新竞争lock锁
- 竞争lock锁成功后,从await后继续执行
1.6 顺序控制
1.6.1 固定运行顺序
简单说:控制几个线程的执行顺序
1.6.1.1 wait() & notify()
static final Object lock = new Object(); private static final Logger log = LoggerFactory.getLogger(test1.class); static boolean firstRunner = false; public static void main(String[] args) { Thread t1 = new Thread(() -> { synchronized (lock) { // 如果有其他线程运行,那么这里进行无限制等待 while (!firstRunner) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("t1"); } }, "t1"); ///////////////////// Thread t2 = new Thread(() -> { synchronized (lock) { log.info("t2"); // 代表t2线程先运行了 firstRunner = true; lock.notify(); } }, "t2"); t1.start(); t2.start(); }
1.6.1.2 reetrantLock 方式
static final ReentrantLock reentrantLock = new ReentrantLock(); static boolean firstRunner = false; public static void main(String[] args) { Condition c1 = reentrantLock.newCondition(); Thread t1 = new Thread(() -> { reentrantLock.lock(); try{ // 如果有其他线程运行,那么这里进行无限制等待 while (!firstRunner) { try { c1.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("t1"); }finally { reentrantLock.unlock(); } }, "t1"); ///////////////////// Thread t2 = new Thread(() -> { reentrantLock.lock(); try{ // 临界区开始 log.info("t2"); // 代表t2线程先运行了 firstRunner = true; c1.signal(); }finally{ // 临界区结束 // 释放锁 reentrantLock.unlock(); } }, "t2"); t1.start(); t2.start(); }
1.6.1.3 park方式
public static void main(String[] args) { Thread t1 = new Thread(() -> { LockSupport.park(); log.info("t1"); }, "t1"); ///////////////////// Thread t2 = new Thread(() -> { log.info("t2"); LockSupport.unpark(t1); }, "t2"); t1.start(); t2.start(); }