15000字、6个代码案例、5个原理图让你彻底搞懂Synchronized(上)
https://developer.aliyun.com/article/1504423?spm=a2c6h.13148508.setting.17.55974f0e4rkwEt
解锁
查看lock record中复制的内容是不是空,是空说明是可重入锁
不为空则查看mark word是否指向lock record,如果指向则CAS尝试将mark word记录指向lock record替换为lock record中的displaced mark word(也就是原来的mark word)
如果mark word不指向lock record 或者 CAS失败了 说明存在竞争,其他线程加锁失败让mark word指向重量级锁,直接膨胀
关键代码如下:
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) { assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here"); //获取复制的mark word markOop dhw = lock->displaced_header(); markOop mark ; //如果为空 说明是可重入 if (dhw == NULL) { // Recursive stack-lock. // Diagnostics -- Could be: stack-locked, inflating, inflated. mark = object->mark() ; assert (!mark->is_neutral(), "invariant") ; if (mark->has_locker() && mark != markOopDesc::INFLATING()) { assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ; } if (mark->has_monitor()) { ObjectMonitor * m = mark->monitor() ; assert(((oop)(m->object()))->mark() == mark, "invariant") ; assert(m->is_entered(THREAD), "invariant") ; } return ; } mark = object->mark() ; //如果mark word指向lock record if (mark == (markOop) lock) { assert (dhw->is_neutral(), "invariant") ; //尝试CAS将指向lock record的mark word替换为原来的内容 if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) { TEVENT (fast_exit: release stacklock) ; return; } } //未指向当前lock record或者CAS失败则膨胀 ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ; }
偏向锁
hotspot开发人员测试,在某些场景下,总是同一个线程获取锁,在这种场景下,希望用更小的开销来获取锁
当开启偏向锁后,如果是无锁状态会将mark word改为偏向某个线程ID,以此标识这个线程获取锁(锁偏向这个线程)
如果正处于偏向锁,遇到竞争可能膨胀为轻量级锁,如果要存储一致性哈希等情况也会膨胀为重量级锁
JDK8默认开启偏向锁,在高版本JDK默认不开启偏向锁,可能因为偏向锁的维护超过收益,我们也不深入进行研究
重量级锁
object monitor
使用object monitor对象来实现重量级锁
object monitor中使用一些字段记录信息,比如:object字段用于记录锁的那个对象,header字段用于记录锁的那个对象的mark word、owner字段用于记录持有锁的线程
object monitor使用阻塞队列来存储竞争不到锁的线程,使用等待队列来存储调用wait进入等待状态的线程
阻塞队列和等待队列类比着并发包下的AQS和Condition
object monitor使用cxq栈和entry list队列来实现阻塞队列,其中cxq栈中存储有竞争的线程,entry list存储已经竞争失败较稳定的线程;使用wait set实现等待队列
当线程调用wait时,进入wait set等待队列
而调用notify时,只是将等待队列的队头节点加入cxq,并没有唤醒该线程去竞争
真正的唤醒线程是在释放锁时,去稳定的队列entry list中唤醒队头节点去竞争,而此时被唤醒的节点并不一定能抢到锁,因为线程进入cxq时还会通过自旋来抢锁,以此来实现非公平锁
如果稳定的entry list中没有存储线程,会将cxq栈中存储的线程全存储到entry list中再去唤醒,此时越晚进入cxq的线程反而会越早被唤醒(cxq栈先进后出)
其实实现与AQS类似,来看这样一段代码:
t1-t6获取同一把锁,使用t1线程进行阻塞一会,后续t2-t6线程按照顺序启动,由于自转获取不到锁,它们会被依次放入cxq:t2,t3,t4,t5,t6
在t1释放锁时,由于entry list中没有线程,于是将cxq中的线程存入entry list:t6,t5,t4,t3,t2
,再唤醒t6
由于后续没有线程进行竞争,因此最终执行顺序为t1,t6,t5,t4,t3,t2
Object obj = new Object(); new Thread(() -> { synchronized (obj) { try { //输入阻塞 //阻塞的目的是让 其他线程自旋完未获取到锁,进入cxq栈 System.in.read(); } catch (IOException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 获取到锁"); } }, "t1").start(); //sleep控制线程阻塞的顺序 Thread.sleep(50); new Thread(() -> { synchronized (obj) { System.out.println(Thread.currentThread().getName() + " 获取到锁"); } }, "t2").start(); Thread.sleep(50); new Thread(() -> { synchronized (obj) { System.out.println(Thread.currentThread().getName() + " 获取到锁"); } }, "t3").start(); Thread.sleep(50); new Thread(() -> { synchronized (obj) { System.out.println(Thread.currentThread().getName() + " 获取到锁"); } }, "t4").start(); Thread.sleep(50); new Thread(() -> { synchronized (obj) { System.out.println(Thread.currentThread().getName() + " 获取到锁"); } }, "t5").start(); Thread.sleep(50); new Thread(() -> { synchronized (obj) { System.out.println(Thread.currentThread().getName() + " 获取到锁"); } }, "t6").start();
大致了解了下object monitor,我们再来看看膨胀和自旋
膨胀
在膨胀时会有四种状态,分别是
inflated 已膨胀:mark word锁标志为10(2)说明已膨胀,直接返回object monitor
inflation in progress 膨胀中:如果已经有其他线程在膨胀了,就等待一会循环后查看状态进入已膨胀的逻辑
stack-locked 轻量级锁膨胀
neutral 无锁膨胀
轻量级锁和无锁膨胀逻辑差不多,都是需要创建object monitor对象,并且set一些属性进去(比如:mark word、锁的哪个对象、哪个线程持有锁...),最后再使用CAS去替换mark word指向object monitor
c++
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) { for (;;) { const markOop mark = object->mark() ; assert (!mark->has_bias_pattern(), "invariant") ; // The mark can be in one of the following states: // * Inflated - just return // * Stack-locked - coerce it to inflated // * INFLATING - busy wait for conversion to complete // * Neutral - aggressively inflate the object. // * BIASED - Illegal. We should never see this // CASE: inflated // 已膨胀:查看 mark word 后两位是否为2 是则膨胀完 返回monitor对象 if (mark->has_monitor()) { ObjectMonitor * inf = mark->monitor() ; assert (inf->header()->is_neutral(), "invariant"); assert (inf->object() == object, "invariant") ; assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid"); return inf ; } // CASE: inflation in progress - inflating over a stack-lock. // 膨胀中: 等待一会 再循环 从膨胀完状态退出 if (mark == markOopDesc::INFLATING()) { TEVENT (Inflate: spin while INFLATING) ; ReadStableMark(object) ; continue ; } // CASE: stack-locked //轻量级锁膨胀 if (mark->has_locker()) { //创建ObjectMonitor ObjectMonitor * m = omAlloc (Self) ; m->Recycle(); m->_Responsible = NULL ; m->OwnerIsThread = 0 ; m->_recursions = 0 ; m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class //cas将mark word替换指向ObjectMonitor markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ; //cas 失败 则说明其他线程膨胀成功,删除当前monitor 退出 if (cmp != mark) { omRelease (Self, m, true) ; continue ; // Interference -- just retry } markOop dmw = mark->displaced_mark_helper() ; assert (dmw->is_neutral(), "invariant") ; //成功 设置mark word m->set_header(dmw) ; //设置持有锁的线程 m->set_owner(mark->locker()); //设置锁的是哪个对象 m->set_object(object); guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ; //修改mark word对象头信息 锁状态 2 object->release_set_mark(markOopDesc::encode(m)); if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ; TEVENT(Inflate: overwrite stacklock) ; if (TraceMonitorInflation) { if (object->is_instance()) { ResourceMark rm; tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s", (void *) object, (intptr_t) object->mark(), object->klass()->external_name()); } } return m ; } // CASE: neutral //无锁膨胀 与轻量级锁膨胀类似,也是创建monitor对象并注入属性,只是很多属性为空 assert (mark->is_neutral(), "invariant"); ObjectMonitor * m = omAlloc (Self) ; m->Recycle(); m->set_header(mark); m->set_owner(NULL); m->set_object(object); m->OwnerIsThread = 1 ; m->_recursions = 0 ; m->_Responsible = NULL ; m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class //cas 更新 mark word 失败循环等待 成功返回 if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) { m->set_object (NULL) ; m->set_owner (NULL) ; m->OwnerIsThread = 0 ; m->Recycle() ; omRelease (Self, m, true) ; m = NULL ; continue ; } if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ; TEVENT(Inflate: overwrite neutral) ; if (TraceMonitorInflation) { if (object->is_instance()) { ResourceMark rm; tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s", (void *) object, (intptr_t) object->mark(), object->klass()->external_name()); } } return m ; } }
自旋
膨胀过后,在最终挂起前会进行固定自旋和自适应自旋
固定自旋默认10+1次
自适应自旋一开始5000次,如果最近竞争少获取到锁就将自旋次数调大,如果最近竞争大获取不到锁就将自旋次数调小
c++
int ObjectMonitor::TrySpin_VaryDuration (Thread * Self) { // Dumb, brutal spin. Good for comparative measurements against adaptive spinning. int ctr = Knob_FixedSpin ; if (ctr != 0) { while (--ctr >= 0) { if (TryLock (Self) > 0) return 1 ; SpinPause () ; } return 0 ; } //先进行固定11自旋次数 获取到锁返回,没获取到空转 for (ctr = Knob_PreSpin + 1; --ctr >= 0 ; ) { if (TryLock(Self) > 0) { // Increase _SpinDuration ... // Note that we don't clamp SpinDuration precisely at SpinLimit. // Raising _SpurDuration to the poverty line is key. int x = _SpinDuration ; if (x < Knob_SpinLimit) { if (x < Knob_Poverty) x = Knob_Poverty ; _SpinDuration = x + Knob_BonusB ; } return 1 ; } SpinPause () ; } //自适应自旋 一开始5000 如果成功认为此时竞争不大 自旋获取锁成功率高 增加重试次数 如果失败则减少 //... }
总结
本篇文章围绕synchronized,深入浅出的描述CAS、synchronized在Java层面和C++层面的实现、锁升级原理、案例、源码等
synchronized用于并发下的需要同步的场景,使用它可以满足原子性、可见性以及有序性,它可以作用在普通对象和静态对象,作用于静态对象时是去获取其对应的Class对象的锁
synchronized作用在代码块上时,使用monitorentry、monitorexit字节码指令来标识加锁、解锁;作用在方法上时,在访问标识加锁synchronized关键字,虚拟机隐式使用monitorentry、monitorexit
CAS 比较并替换,常与重试机制实现乐观锁/自旋锁,优点是能够在竞争小的场景用较小的开销取代线程挂起,但带来ABA问题、无法预估重试次数空转CPU的开销等问题
轻量级锁的提出是为了在交替执行/竞争少的场景,用更小的开销取代互斥量;使用CAS和lock record实现
轻量级锁加锁时,如果是无锁则复制mark word到lock record中,再CAS将对象mark word替换为指向该lock record,失败则膨胀;如果已经持有锁则判断持有锁的线程是不是当前线程,是则累加次数,不是当前线程则膨胀
轻量级锁解锁时,查看lock record复制的是不是null,是则说明是可重入锁,次数减一;不是则CAS把复制过来的mark word替换回去,如果替换失败说明其他线程竞争,mark word已经指向object monitor,去指向重量级锁的释放
偏向锁的提出是为了在经常一条线程执行的场景下,用更小的开销来取代CAS的开销,只不过高版本不再默认开启
重量级锁由object monitor来实现,object monitor中使用cxq、entry list来构成阻塞队列,wait set来构成等待队列
当执行wait方法时,线程构建为节点加入wait set;当执行notify方法时,将wait set队头节点加入cxq,在释放锁时才去唤醒entry list队头节点竞争锁,即使没抢到锁构建为节点加入cxq时还会自旋,因此并不是entry list队头节点就一定能抢到锁,以此来实现非公平锁;当entry list为空时,将cxq栈中的节点加入entry list队列(后进入cxq的节点会被先唤醒)
在膨胀为重量级锁时有四种情况,如果状态为已膨胀则直接返回object monitor对象;如果状态为膨胀中,说明其他线程正在膨胀,等待会,下次循环进入已膨胀的逻辑;如果状态为轻量级锁膨胀或无锁膨胀,都会去创建object monitor对象,set一些重要属性,并CAS去将mark word替换为指向该object monitor
重量级锁在最终挂起前会进行固定自旋和自适应自旋(最近竞争小就增加自旋次数;竞争多就减少自旋次数)
最后(不要白嫖,一键三连求求拉~)
本篇文章被收入专栏 由点到线,由线到面,深入浅出构建Java并发编程知识体系,感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 gitee-StudyJava、 github-StudyJava 感兴趣的同学可以stat下持续关注喔~
案例地址:
Gitee-JavaConcurrentProgramming/src/main/java/B_Synchronized
Github-JavaConcurrentProgramming/src/main/java/B_Synchronized
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多干货,公众号:菜菜的后端私房菜