线程同步
Java 多线程无法绕过的点就包括线程同步,线程同步就是基于程序员所想的顺序对竟态资源进行访问。我们已知的线程同步方式就有锁(synchronized,Lock)以及JUC 下的一些同步工具。
那么抛开已知的线程同步工具,我们自己是否能实现一个同步工具呢?答案是肯定的,基于AbstractQueuedSynchronizer 提供的模版方法,我们很快的就可以实现一个自己的同步工具。
DIY同步工具
首先我们创建一个自己的同步工具类SelfLock.java
publicclassSelfLock { /*** 构建自己的同步核心类,继承AQS*/classSyncextendsAbstractQueuedSynchronizer{ /*** 重写该方法,该方法是我们实现同步工具的获取具体方法* 以下代码表示通过cas 的方式将AQS 中的state 字段设置成1* 这里没有设置获取锁成功的线程* * @param arg 参数* @return true 成功获取锁 false 获取锁失败*/protectedbooleantryAcquire(intarg) { returncompareAndSetState(0,1); } /*** 重写该方法,该方法是我们实现同步工具释放锁的具体方法* 以下代码直接给AQS 的state 设置为0 表示解锁* 这里没有校验解锁的是否为当前线程** @param arg 参数* @return true 成功释放锁 false 释放锁失败*/protectedbooleantryRelease(intarg) { setState(0); returntrue; } protectedbooleanisHeldExclusively() { returngetState() ==1; } } /*** 创建同步核心对象*/Syncsync=newSync(); /*** 加锁方法 相当于直接调用AQS 的acquire 方法*/publicvoidlock(){ sync.acquire(1); } /*** 解锁 相当于直接调用AQS 的release 方法*/publicvoidunlock(){ sync.release(1); } }
以上就是自定义实现同步工具的代码,那么我么测试一下,写个测试用例:
publicclassTestSelfLock { staticintcount=0; staticSelfLockleeLock=newSelfLock(); publicstaticvoidmain(String[] args) throwsInterruptedException { Runnablerunnable= () -> { try { leeLock.lock(); for (inti=0; i<10000; i++) { count++; } } catch (Exceptione) { e.printStackTrace(); } finally { leeLock.unlock(); } }; // 根据Java 开发手册之规定,通过线程工厂构建线程;MyselfTreadFactoryselfLockTest=newMyselfTreadFactory("self lock test"); Threadthread1=selfLockTest.newThread(runnable); Threadthread2=selfLockTest.newThread(runnable); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(count); } }
MyselfTreadFactory.java
publicclassMyselfTreadFactoryimplementsThreadFactory { privatefinalStringnamePrefix; /*** 通过原子类保证线程顺序*/privatefinalAtomicIntegernextId=newAtomicInteger(1); publicMyselfTreadFactory(StringwhatFeatureOfGroup){ this.namePrefix="From MyselfTreadFactory's "+whatFeatureOfGroup+"Worker-"; } publicThreadnewThread(Runnabler) { Stringname=namePrefix+nextId.getAndIncrement(); Threadthread=newThread(null,r,name,0); System.out.println(thread.getName()); returnthread; } }
执行结果:
通过上面测试结果来看,几行代买就可以实现一个有效的同步工具,但是具体实现还要考虑一些细节,比如加锁时设置获取锁成功的线程,在解锁时进行校验,保证同步工具的稳定性。简单的实现只是想表达AQS 的强大,接下来我们简单了解一下AQS;
AQS(AbstractQueuedSynchronizer):
它是由DougLee大神,使用一个volatile的int类型的成员变量state 来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对state值的修改。它是很多上层同步实现类的基础,如:ReentrantLock、CountDownLatch、Semaphore等它们通过集成AQS实现其模板方法,然后将AQS子类作为同步组件的内部类,通常命名为Sync。
在我们自己实现的同步工具类中,state 为1表示锁已被线程占用,state 为0表示锁可以被获取,之所以用volatile 就是利用了其可见性和防止指令重排序的特性,低成本的保证state 的修改对其他线程可见及安全性。
AQS 加锁
我们的同步工具在工作时,其实调用的是AQS 的acquire 方法:
publicfinalvoidacquire(intarg) { if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire() 方法就是我们重写的逻辑,当获取锁失败时,会将当前线程通过addWaiter方法包装成一个Node 节点加入队列,并且当参数传入acquireQueued方法,该方法会处理等待队列中的线程是该出队还是该休眠。注(该队列的头节点时一个虚拟头节点,也就是说,当队列初始化时,同时有两个线程竞争锁,有一个线程将会包装成node 节点放入队列,当前队列则有两个节点。)
finalbooleanacquireQueued(finalNodenode, intarg) { booleaninterrupted=false; try { // 通过自旋进行锁的获取for (;;) { finalNodep=node.predecessor(); if (p==head&&tryAcquire(arg)) { // 说明获取到锁,将当前节点设置为虚拟头节点setHead(node); p.next=null; // help GCreturninterrupted; } // 进行到这儿表示没有获取到锁或者该节点不是头节点,避免浪费cpu 资源进行中断if (shouldParkAfterFailedAcquire(p, node)) interrupted|=parkAndCheckInterrupt(); } } catch (Throwablet) { cancelAcquire(node); if (interrupted) selfInterrupt(); throwt; } }
AQS 解锁
对于加锁我们分析完成后,我们则要对解锁进行了解,解锁其实就是调用了AQS 的release 方法;
publicfinalbooleanrelease(intarg) { if (tryRelease(arg)) { Nodeh=head; if (h!=null&&h.waitStatus!=0) unparkSuccessor(h); returntrue; } returnfalse; }
解锁也是调用了我们自己实现工具类重写的tryRelease方法,整个方法看起来还是比较简单的,值得注意的就是h != null && h.waitStatus != 0 整个判断条件;
- h == null 表示head 为null 则表示队列没数据 不需要唤醒中断
- h != null 且waitStatus == 0 表示不需要唤醒
- h != null 且waitStatus < 0 ( >0 的节点已经出队了,因为取消竞争锁了) 则表示需要唤醒
唤醒后继节点进行争取锁资源,这就完成了闭环了。
以上就是本次对AQS 的分享,有兴趣的小伙伴可以深入研究一下该源码,本人则通过源码阅读对模版设计模式有了一些理解,并且对各个方法的职责明确有了新的认识,希望大家在不断学习的过程提升自己的编码能力以及认知能力。
大家加油~