概述
我们知道synchronized的机制有一个很重要的特点是:使用synchronized, 当一个线程获取了锁,其他线程只能一直等待,等待这个获取锁的线程释放锁,如果这个线程执行时间很长,其他线程就需要一直等待 。 除非获取锁的线程执行完了该代码块,释放锁或者线程执行发生异常,JVM会使线程自动释放锁。
当然了J.U.C包中 Doug Lea大神已经设计了非常完美的解决方案,我们这里不讨论J.U.C的实现。
我们自己实现一套的话,该如何实现呢? 有几点需要思考
- 原有的synchronized功能,必须保证,即一个线程拿到锁后,其他线程必须等待
- 谁加的锁,必须由谁来释放
- 加入超时功能
- …
好了,开始吧
步骤
自定义超时异常处理类
既然要设计带超时功能的锁, 少不了当超时时,抛出异常,以便上层捕获处理。
public class TimeOutException extends RuntimeException { public TimeOutException(String message){ super(message); } }
ILock接口
约定几个接口方法: lock 、lock(long timeout)、unlock、getBlockedThread、getBlockedSize 详见代码注释
package com.artisan.customLock; import java.util.Collection; public interface ILock { /** * 加锁 */ void lock() throws InterruptedException; /** * 加锁 * @param timeout 持有锁的时间,过了该时间(毫秒) 自动释放该锁 */ void lock(long timeout) throws InterruptedException,TimeOutException; /** * 释放锁 */ void unlock(); /** * 用于观察 有哪些线程因没有获取到锁被blocked * @return */ Collection<Thread> getBlockedThreads(); /** * 被blocked的线程数量 * @return */ int getBlockedSize(); }
实现类
详见代码注释。 加锁和释放锁方法 使用 synchronized 修饰,否则使用wait && notifyAll抛出异常
import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Optional; public class CustomLock implements ILock { // 默认false // true: 已经被线程抢到 false: 空闲 private boolean lockFlag; // 用于存储被blocked的线程,方便查看及计算被blocked的线程数量 Collection<Thread> blockedThreadCollection = new ArrayList<>(); /** * 构造函数中初始化该lockFlag */ public CustomLock(){ this.lockFlag = false; } /** * synchronized 修饰该方法 * @throws InterruptedException */ @Override public synchronized void lock() throws InterruptedException { // 如果其他线程已经获取到了锁,让该线程wait while(lockFlag){ // 加入到blockedThreadCollection blockedThreadCollection.add(Thread.currentThread()); // wait this.wait(); } // 如果空闲,将该monitor置为true blockedThreadCollection.remove(Thread.currentThread()); lockFlag = true; } @Override public void lock(long timeout) throws InterruptedException, TimeOutException { } @Override public synchronized void unlock() { // 如果是加锁的线程 // 将Monitor置为空闲 this.lockFlag = false; Optional.of(Thread.currentThread().getName() + " 释放lock").ifPresent(System.out::println); // 唤醒其他正在等待的线程 this.notifyAll(); } @Override public Collection<Thread> getBlockedThreads() { // blockedThreadCollection 可能被其他线程add 或者remove,这里定义为不可变的集合类型 return Collections.unmodifiableCollection(blockedThreadCollection); } @Override public int getBlockedSize() { return blockedThreadCollection.size(); } }
测试
package com.artisan.customLock; import java.time.LocalTime; import java.util.Optional; import java.util.stream.Stream; public class CustomLockTest { public static void main(String[] args) throws InterruptedException { CustomLock customLock = new CustomLock(); // 开启5个线程 Stream.of("T1", "T2", "T3", "T4", "T5") .forEach(name -> new Thread(() -> { // 加锁 处理业务 try { // 加锁 customLock.lock(); Optional.of(Thread.currentThread().getName() + " hold the Monitor") .ifPresent(System.out::println); // 调用业务 work(); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 在finally中释放锁 customLock.unlock(); } }, name).start()); } /** * 模拟线程的业务逻辑 * * @throws InterruptedException */ public static void work() throws InterruptedException { Optional.of(Thread.currentThread().getName() + " begin to work " + LocalTime.now().withNano(0)).ifPresent(System.out::println); Thread.sleep(3_000); } }
日志输出:
"E:\Program Files\Java\jdk1.8.0_161\bin\java" "-javaagent:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\lib\idea_rt.jar=53159:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\bin" -Dfile.encoding=UTF-8 -classpath "E:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\IdeaProjects\mvc\target\classes" com.artisan.customLock.CustomLockTest T1 hold the Monitor T1 begin to work 22:19:14 T1 释放lock T5 hold the Monitor T5 begin to work 22:19:17 T5 释放lock T2 hold the Monitor T2 begin to work 22:19:20 T2 释放lock T4 hold the Monitor T4 begin to work 22:19:23 T4 释放lock T3 hold the Monitor T3 begin to work 22:19:26 T3 释放lock Process finished with exit code 0
可以看到 确实是一个线程拿到锁后,其他线程必须等待 。
针对第二点呢: 谁加的锁,必须由谁来释放 .
我们来测试下
存在的问题
针对第二点呢: 谁加的锁,必须由谁来释放 .
我们来测试下 : 假设我们在main线程中调用了unlock方法
重新运行测试,观察日志
T1 hold the Monitor T1 begin to work 22:24:41 main 释放lock T5 hold the Monitor T5 begin to work 22:24:41 T1 释放lock T2 hold the Monitor T2 begin to work 22:24:44 T5 释放lock T4 hold the Monitor T4 begin to work 22:24:44 T2 释放lock T3 hold the Monitor T3 begin to work 22:24:47 T4 释放lock T3 释放lock Process finished with exit code 0
T1拿到锁还没有工作完,就被主线程释放了,结果T5又抢到了… 很明显不对了 。
修复存在的问题
见代码
再次运行测试 ,OK
超时功能
@Override public synchronized void lock(long timeout) throws InterruptedException, TimeOutException { // 入参不合理,直接调用lock ,也可抛出异常 if (timeout <= 0 ) lock(); // 线程等待的剩余时间 long leftTime = timeout; // 计算结束时间 long endTime = System.currentTimeMillis() + timeout; while(lockFlag){ // 如果超时了,抛出异常 if (leftTime <= 0){ throw new TimeOutException(Thread.currentThread().getName() + " 超时..."); } // 加入到blockedThreadCollection blockedThreadCollection.add(Thread.currentThread()); // wait 指定的时间 this.wait(timeout); // 计算是否超时 leftTime = endTime - System.currentTimeMillis(); } // 如果空闲,将该monitor置为true blockedThreadCollection.remove(Thread.currentThread()); this.lockFlag = true; // 将当前线程置为lockHolderThread this.lockHolderThread = Thread.currentThread(); }
测试超时功能
package com.artisan.customLock; import java.time.LocalTime; import java.util.Optional; import java.util.stream.Stream; public class CustomLockTest { public static void main(String[] args) { CustomLock customLock = new CustomLock(); // 开启5个线程 Stream.of("T1", "T2", "T3", "T4", "T5") .forEach(name -> new Thread(() -> { // 加锁 处理业务 try { // 加锁 最多等待100毫秒,如果100ms,没抢到则中断执行 customLock.lock(100); Optional.of(Thread.currentThread().getName() + " hold the Monitor") .ifPresent(System.out::println); // 调用业务 work(); } catch (InterruptedException e) { e.printStackTrace(); } catch (TimeOutException e){ Optional.of(Thread.currentThread().getName() + " timeOut") .ifPresent(System.out::println); }finally { // 在finally中释放锁 customLock.unlock(); } }, name).start()); } /** * 模拟线程的业务逻辑 * * @throws InterruptedException */ public static void work() throws InterruptedException { Optional.of(Thread.currentThread().getName() + " begin to work " + LocalTime.now().withNano(0)).ifPresent(System.out::println); Thread.sleep(3_000); } }
运行结果:
OK。
CustomLock
package com.artisan.customLock; import java.time.LocalTime; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Optional; public class CustomLock implements ILock { // 默认false // true: 已经被线程抢到 false: 空闲 private boolean lockFlag; // 用于存储被blocked的线程,方便查看及计算被blocked的线程数量 Collection<Thread> blockedThreadCollection = new ArrayList<>(); // 当前持有锁的线程 Thread lockHolderThread ; /** * 构造函数中初始化该lockFlag */ public CustomLock(){ this.lockFlag = false; } /** * synchronized 修饰该方法 * @throws InterruptedException */ @Override public synchronized void lock() throws InterruptedException { // 如果其他线程已经获取到了锁,让该线程wait while(lockFlag){ // 加入到blockedThreadCollection blockedThreadCollection.add(Thread.currentThread()); // wait this.wait(); } // 如果空闲,将该monitor置为true blockedThreadCollection.remove(Thread.currentThread()); this.lockFlag = true; // 将当前线程置为lockHolderThread this.lockHolderThread = Thread.currentThread(); } @Override public synchronized void lock(long timeout) throws InterruptedException, TimeOutException { // 入参不合理,直接调用lock ,也可抛出异常 if (timeout <= 0 ) lock(); // 线程等待的剩余时间 long leftTime = timeout; // 计算结束时间 long endTime = System.currentTimeMillis() + timeout; while(lockFlag){ // 如果超时了,抛出异常 if (leftTime <= 0){ throw new TimeOutException(Thread.currentThread().getName() + " 超时..."); } // 加入到blockedThreadCollection blockedThreadCollection.add(Thread.currentThread()); // wait 指定的时间 this.wait(timeout); // 计算是否超时 leftTime = endTime - System.currentTimeMillis(); } // 如果空闲,将该monitor置为true blockedThreadCollection.remove(Thread.currentThread()); this.lockFlag = true; // 将当前线程置为lockHolderThread this.lockHolderThread = Thread.currentThread(); } @Override public synchronized void unlock() { // 如果是加锁的线程 if(lockHolderThread == Thread.currentThread()){ // 将Monitor置为空闲 this.lockFlag = false; Optional.of(Thread.currentThread().getName() + " 释放lock" + LocalTime.now().withNano(0)).ifPresent(System.out::println); // 唤醒其他正在等待的线程 this.notifyAll(); } } @Override public Collection<Thread> getBlockedThreads() { // blockedThreadCollection 可能被其他线程add 或者remove,这里定义为不可变的集合类型 return Collections.unmodifiableCollection(blockedThreadCollection); } @Override public int getBlockedSize() { return blockedThreadCollection.size(); } }