synchronized
synchronized的作用是实现线程间的同步,一般称为重量级锁,经过jdk6对synchronized优化后,其性能有所提高,与ReentrantLock基本持平。
synchronized关键字经过编译之后,会在同步块的前后分别形成monitorenter和monitorexit这两个字节码指令,这两个字节码都需要一个reference类型的参数来指明要锁定和解锁的对象。如果Java程序中的synchronized明确指定了对象参数,那就是这个对象的reference;如果没有明确指定,那就根据synchronized修饰的是实例方法还是类方法,去取对应的对象实例或Class对象来作为锁对象。
根据虚拟机规范的要求,在执行monitorenter指令时,首先要尝试获取对象的锁。如果这个对象没被锁定,或者当前线程已经拥有了那个对象的锁,把锁的计数器加1,相应的,在执行monitorexit指令时会将锁计数器减1,当计数器为0时,锁就被释放。
如果获取对象锁失败,那当前线程就要阻塞等待,直到对象锁被另外一个线程释放为止。synchronized同步块对同一条线程来说是可重入的,不会出现自己把自己锁死的问题。
synchronized应用
synchronized有三种方式来加锁:
1. 修饰实例方法,作用于当前实例加锁,进入同步代码前要获得当前实例的锁;
2. 静态方法,作用于当前类对象加锁,进入同步代码前要获得当前类对象的锁;
3. 修饰代码块,指定加锁对象,对给定对象加锁,进入同步代码库前要获得给定对象的锁。
synchronized括号中的对象是一把锁,在java中任意一个对象都可以成为锁,只有获得括号中对象锁的线程才能执行被锁定的代码块。多个线程需要获取synchronized锁定的对象必须是同一个,如果是不同对象,就意味着是不同的房间的钥匙,不能起到同步的作用。
monitorenter指令插入到同步代码块开始的位置、monitorexit指令插入到同步代码块结束位置,jvm需要保证每个monitorenter都有一个monitorexit对应,同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。
线程执行到monitorenter指令时,会尝试获取对象所对应的monitor所有权,也就是尝试获取对象的锁;而执行monitorexit,就是释放monitor的所有权。
jdk6优化后,其获取锁过程可能为:
轻量级锁->偏向锁->自旋锁/自适应锁->重量级锁
锁是用来控制多个线程访问共享资源的方式,除synchronized关键字外,还可以使用Lock接口下的实现类来实现锁的功能,它们提供了与synchroinzed关键字类似的同步功能,但比synchronized更灵活,可以显示的获取和释放锁。
ReentrantLock
ReentrantLock是可重入锁,顾名思义,一个线程获取一个锁后,还可以接着重复获取这个锁多次。重入锁提供了两种实现,一种是非公平的重入锁,另一种是公平的重入锁。
公平锁
对于公平与非公平,如果在时间上先对锁进行获取的请求一定先被满足获得锁,那么这个锁就是公平锁,反之,就是不公平的。锁是否公平需要在构造方法中指定,默认为非公平:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁会保证线程按照时间的先后顺序依次获取锁,这样能防止饥饿现象的发生。但是公平锁实现的成本较高,性能也相对低下,因此默认是非公平锁,如果没有特殊要求也没有必要使用公平锁。
响应中断
对于synchronized,线程等待锁的情况,只能是获得锁继续执行或者保持等待,等待过程中不可被中断。重入锁提供了高级功能,在等待锁的过程中可以响应中断,这能够用来解决死锁问题。通过lockInterruptibly()可以实现该功能:
public class LockTest {
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
int flag;
public LockTest(int flag) {
this.flag = flag;
}
@Override
public void run() {
try {
if (flag == 1) {
lock1.lockInterruptibly();
Thread.sleep(500);
lock2.lockInterruptibly();
} else {
lock2.lockInterruptibly();
Thread.sleep(500);
lock1.lockInterruptibly();
}
} catch (InterruptedException e) {
} finally {...}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new LockTest(1)), t2 = new Thread(new LockTest(2));
t1.start(); t2.start();
Thread.sleep(1000);
t1.interrupt();
}
}
t1和t2会造成死锁,直到t1执行interrupt(),t1会放弃锁资源,死锁解开。
tryLock
除了使用响应中断来取消等待,还可以使用tryLock限时等待获取锁。传入时间参数,表示等待指定的时间后仍没有获取锁则取消等待。
如果不传参数,则线程不会等待锁,而是立即返回锁申请结果:true表示获取锁成功,false表示获取锁失败。可以使用该方法配合失败重试机制来更好的解决死锁问题。
/**
* Acquires the lock only if it is not held by another thread at the time
* of invocation.
*/
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
/**
* Acquires the lock if it is not held by another thread within the given
* waiting time and the current thread has not been
* {@linkplain Thread#interrupt interrupted}.
*/
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
Condition
对于synchronized,线程的等待和唤起需要配合Object.wait()、Object.notify()和Object.notifyAll()使用,这三个方法也只能在synchronized块中使用。
而Condition实现的功能和上面三个方法类似,不同的是Condition是和重入锁相关联使用的。通过Lock接口的newCondition()接口可以生成一个与当前重入锁绑定的Condition实例:
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
/**
* Returns a new {@link Condition} instance that is bound to this
* {@code Lock} instance.
*/
Condition newCondition();
}
Condition的API如下:
public interface Condition {
/**
* Causes the current thread to wait until it is signalled or {@linkplain Thread#interrupt interrupted}.
*/
void await() throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled.
*/
void awaitUninterruptibly();
/**
* Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses. This method is behaviorally
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled or interrupted, or the specified deadline elapses.
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* Wakes up one waiting thread.
*/
void signal();
/**
* Wakes up all waiting threads.
*/
void signalAll();
}
使用示例:
public class ConditionTest {
static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
lock.lock();
new Thread(new TargetThread()).start();
System.out.println("main thread get lock");
try {
condition.await();
} finally {
lock.unlock();
}
System.out.println("main thread recovery");
}
static class TargetThread implements Runnable {
@Override
public void run() {
lock.lock();
try {
System.out.println("sub thread get lock and signal main thread");
condition.signal();
} finally {
lock.unlock();
}
}
}
}
运行结果:
main thread get lock
sub thread get lock and signal main thread
main thread recovery
在ArrayBlockingQueue中应用了ReentrantLock和Condition,其中notEmpty和notFull都是和lock属性绑定的,当put操作时,如果队列已满则执行notFull.await()挂起put线程,直到notFull.signal()唤起put线程再继续执行;当take操作时,如果count == 0则挂起take线程,直到notEmpty.signal()唤起take线程后再继续执行:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
/** Number of elements in the queue */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 通知take线程,队列已有数据
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 通知put线程已有空闲空间
return x;
}
}
Semaphore
信号量是对锁的扩展,无论是synchronized还是其他锁,一次只能允许一条线程访问共享资源,而信号量可允许同时有多个线程访问共享资源,其构造函数如下:
/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore的实现原理类似于令牌桶,而permits就是桶的大小,即同一时间最多有多少条线程访问临界资源,permit为许可
之意,此处为了类比令牌桶原理,统一称为令牌
。
/**
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* Acquires a permit from this semaphore, blocking until one is
* available.
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
/**
* Acquires a permit from this semaphore, only if one is available at the
* time of invocation.
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
* Acquires a permit from this semaphore, if one becomes available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*/
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* Releases a permit, returning it to the semaphore.
*/
public void release() {
sync.releaseShared(1);
}
acquire()
用于获取一次令牌,如果没有可用令牌则阻塞,阻塞过程中可响应中断;
acquireUninterruptibly()
类似acquire(),但是不响应中断;
tryAcquire()
用于获取令牌,该方法不阻塞,请求成功会立即返回,返回true表示获取令牌成功,false则为无可用令牌,获取失败;
tryAcquire(long timeout, TimeUnit unit)
类似tryAcquire()
,但是无可用令牌时,会等待timeout的时间,超出时间后取消等待,直接返回,返回值意义也同tryAcquire()
。
使用示例:
public class SemapDemo implements Runnable {
final Semaphore semaphore = new Semaphore(5);
@Override
public void run() {
try {
semaphore.acquire();
Thread.sleep(2000);
} catch (Exception e) {
} finally {
semaphore.release();
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
final SemapDemo semapDemo = new SemapDemo();
for (int i = 0; i < 20; i++) {
executorService.submit(semapDemo);
}
}
}
}
RateLimiter
guava的RateLimiter是一个限流工具类,其中的原理就是令牌桶。
比较简单的限流方式是在单位时间内使用计数器counter统计请求数量,如果counter到达限制时对请求进行丢弃或等待。但这种方式很难控制边界时间的请求,例如需要限制1s内最多10个请求,在第1s的前半秒没有请求,后半秒处理了10个请求;然后下1s的前半秒又处理10个请求,此时这第1s的后半秒和第2s的前半秒加起来一共处理了20个请求。
解决边界请求问题可以使用令牌桶方式。令牌桶中存放令牌,每个请求要首先拿到令牌后才能被处理,但是令牌的生成速度是恒定的,比如1s限流10的话,每100毫秒会生成1个令牌,且令牌桶中的令牌最多能存放10个令牌。
令牌桶和漏桶原理
使用1s内限流10条请求为例。
漏桶
漏桶的出水速度是恒定的,也就是说单位时间(每100毫秒)内处理请求数是恒定的,如果瞬时大量请求来临的话,将有大部分请求被丢弃或阻塞,对应上边的场景,就是第1s的后半秒来10条数据,也只能处理5条,剩下5条就被丢弃或阻塞。
令牌桶
单位时间(100毫秒)内处理请求数不恒定,但是生成令牌的速度是恒定的(每100毫秒产生1个令牌),而请求去拿令牌是没有速度限制的。面对瞬时大流量,令牌桶方式可以在短时间内处理大量请求。
即第1s的后半秒来10个请求,这10个请求都可能被处理。
令牌桶示例
public class RateLimiterDemo {
static RateLimiter limiter = RateLimiter.create(1);
public static class Task implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() / 1000);
}
}
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
limiter.acquire();
new Thread(new Task()).start();
}
}
}
API示例:
// permitsPerSecond表示每秒令牌数,即每秒限流数
public static RateLimiter create(double permitsPerSecond) {
return create(RateLimiter.SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}
// 拿不到令牌则阻塞
public double acquire() {
return this.acquire(1);
}
// 限时阻塞
public double acquire(int permits) {
long microsToWait = this.reserve(permits);
this.stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0D * (double)microsToWait / (double)TimeUnit.SECONDS.toMicros(1L);
}
// 限时阻塞
public boolean tryAcquire(long timeout, TimeUnit unit) {
return this.tryAcquire(1, timeout, unit);
}
// 限时阻塞
public boolean tryAcquire(int permits) {
return this.tryAcquire(permits, 0L, TimeUnit.MICROSECONDS);
}
// 非阻塞
public boolean tryAcquire() {
return this.tryAcquire(1, 0L, TimeUnit.MICROSECONDS);
}
ReadWriteLock
读写锁可以有效的减少锁竞争,提高系统性能,尤其是对于读多写少的场景。
读写锁在同一时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程和其他写线程都会被阻塞。读写锁维护了一对锁:读锁和写锁。读锁和写锁关系:
读锁与读锁可以共享;
读锁与写锁互斥;
写锁与写锁互斥。
使用示例:
public class ReadWriteLockDemo {
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
private int value;
public Object handleRead(Lock lock) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
return value;
} finally {lock.unlock();}
}
public void handleWrite(Lock lock, int index) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
value = index;
} finally {lock.unlock();}
}
public static void main(String[] args) {
final ReadWriteLockDemo demo = new ReadWriteLockDemo();
Runnable readRunner = () -> {
try {
demo.handleRead(readLock);
// demo.handleRead(lock);
} catch (Exception e) {}
};
Runnable writeRunner = () -> {
try {
demo.handleWrite(writeLock, new Random().nextInt());
// demo.handleWrite(lock, new Random().nextInt());
} catch (Exception e) {}
};
for (int i = 0; i < 18; i++) {
new Thread(readRunner).start();
}
for (int i = 0; i < 2; i++) {
new Thread(writeRunner).start();
}
}
}
如果使用demo.handleRead(readLock)
和demo.handleWrite(writeLock, new Random().nextInt())
,因为读线程不被阻塞,2s内程序会执行完毕并退出;如果使用demo.handleRead(lock)
和demo.handleWrite(lock, new Random().nextInt())
,读和写都会被阻塞,执行时间会达到20s。
CountDownLatch
CountDownLatch是一个多线程控制工具类,相当于一个倒计数器,在初始化时能指定计数器的值:
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
当执行countDown()时,计数器会减1,当计数减到0时,所有线程并行执行:
/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*/
public void countDown() {
sync.releaseShared(1);
}
await()用于使当前线程等待,直到latch的计数器为0或者被中断。当计数器减到0时,该方法会立即返回true:
/**
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted},
* or the specified waiting time elapses.
*
* <p>If the current count is zero then this method returns immediately
* with the value {@code true}.
*/
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
使用示例:
public class CountDownLatchDemo implements Runnable{
static final CountDownLatch latch = new CountDownLatch(10);
static final CountDownLatchDemo demo = new CountDownLatchDemo();
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(10) * 1000);
latch.countDown();
} catch (InterruptedException e) {...}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i=0; i<10; i++) {
exec.submit(demo);
}
latch.await();
exec.shutdown();
}
}
如果一个方法要求比较快的响应速度,方法内有非常耗时的操作,这时串行调用接口的用时必然较长,如果该场景可以使用多线程解决,可以使用CountDownLatch。
CyclicBarrier
CyclicBarrier类似于CountDownLatch,也通过计数器来实现,但CyclicBarier是正计数器,参与此次的线程执行await()时,计数器加1,当计数器达到最大值,所有因调用await()进入等待状态的线程被唤醒,继续执行后续操作。其构造器中的parties表示参与的线程数,也即计数器的最大数:
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
和CountDownLatch不同,CycliBarrier在释放等待线程后可以重复使用。
使用示例:
public class CyclicBarrierDemo {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
System.out.println(threadNum + " is ready");
barrier.await();
System.out.println(threadNum + " continue");
}
}
LockSupport
LockSupport是一个线程阻塞工具,可以在线程的任意位置使线程阻塞。和Object.wait()相比,它不需要获取任何对象的锁。
Condition的实现类中,实现线程等待就使用了LockSupport.park()方法和LockSupport.unpark()方法,park()可以阻塞当前线程,parkNanos()、parkUntil()实现的则是限时的等待:
public class LockSupport {
/**
* Makes available the permit for the given thread, if it
* was not already available. If the thread was blocked on
* {@code park} then it will unblock. Otherwise, its next call
* to {@code park} is guaranteed not to block. This operation
* is not guaranteed to have any effect at all if the given
* thread has not been started.
*/
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
/**
* Disables the current thread for thread scheduling purposes unless the
* permit is available.
*
* <p>If the permit is available then it is consumed and the call
* returns immediately; otherwise the current thread becomes disabled
* for thread scheduling purposes and lies dormant
*/
public static void park() {
UNSAFE.park(false, 0L);
}
/**
* Disables the current thread for thread scheduling purposes, for up to
* the specified waiting time, unless the permit is available.
*
* <p>If the permit is available then it is consumed and the call
* returns immediately; otherwise the current thread becomes disabled
* for thread scheduling purposes and lies dormant
*/
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}
/**
* Disables the current thread for thread scheduling purposes, until
* the specified deadline, unless the permit is available.
*
* <p>If the permit is available then it is consumed and the call
* returns immediately; otherwise the current thread becomes disabled
* for thread scheduling purposes and lies dormant
*/
public static void parkUntil(long deadline) {
UNSAFE.park(true, deadline);
}
}
LockSupport使用类似信号量的机制,它为每一个线程准备了一个许可,如果许可可用,那么park()会立即返回,并且消费这个许可;如果许可不可用,就会阻塞。而unpark()则是使许可变成可用。和信号量不同的是,许可不能累加,一个线程不可能拥有超过一个许可。
这个特性使得,即使unpark()发生在park()之前,它也可以使下一次的park()操作立即返回。而resume()如果在suspend()之前执行,就可能会造成线程无限期的挂起。
并且,suspend()之后,线程状态认为RUNNABLE,而park()后,线程状态为WAITING。
atomic
Jdk5提供了原子操作类,这些原子操作类提供了线程安全的更新操作。atomic提供了12个类对应四种类型的原子更新操作:
基本类型:AtomicBoolean、AtomicInteger、AtomicLong
数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
引用类型:AtomicReference、AtomicReferenceFieldUpdater、AtomicMarkableReference
字段类型:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicStampedReference
这些原子类中多用了cas操作,例如AtomicInteger的incrementAndGet():
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
实际调用了Unsafe的getAndAddInt():
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
getAndAddInt()中通过循环以及cas的方式实现原子更新,类似AtomicInteger,其他atomic类也调用了Unsafe的方法,同样通过cas保证线程安全。
AQS
Lock实现线程安全的核心是AQS(AbstractQueuedSynchronizer),AbstractQueuedSynchronizer提供了一个队列,可以看做是一个用来实现锁以及其他需要同步功能的框架。AQS用一个int类型、volatile修饰的的state变量表示同步状态,并配合Unsafe工具对其进行原子性的操作来实现对当前锁状态的修改:
/**
* The synchronization state.
*/
private volatile int state;
AQS的主要作用是为同步提供统一的底层支持,其使用依靠继承来完成,子类通过继承自AQS并实现所需的方法来管理同步状态。例如ReentrantLock,CountdowLatch就是基于AQS实现的,用法是创建内部类,通过继承AQS实现其模版方法。如下:
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
...
}
对同步状态的修改主要靠如下方法操作:
getState() 获取当前的同步状态
setState(int newState) 设置当前同步状态
compareAndSetState(int expect,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子性。
从使用上来说,AQS的功能可以分为独占和共享。独占锁模式下,每次只能有一个线程持有锁,例如ReentrantLock的互斥锁;共享锁模式下,允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock。
AQS则相当于独占锁和共享锁的实现的父类。
AQS的内部实现
AQS同步器内部依赖一个FIFO的双向队列(链表结构)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程;当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
双向链表包括一个head节点和一个tail节点,分别表示头结点和尾节点,其中头结点不存储Thread,仅保存next结点的引用。队列的基本结构如下:
设置尾结点:当一个线程成功地获取了同步状态(或锁),其他线程将无法获取到同步状态,转而被构造成为一个节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:
compareAndSetTail(Node expect,Nodeupdate)
它需要传递当前线程“认为”的尾节点和当前节点,这是一个cas操作,只有设置成功后,当前节点才正式与之前的尾节点建立关联,如下:
设置首节点:原头节点释放锁,唤醒后继节点,头节点即获取锁(同步状态)成功的节点,头节点在释放同步状态的时候,会唤醒后继节点,而后继节点将会在获取锁(同步状态)成功时候将自己设置为头节点。
设置头节点是由获取锁(同步状态)成功的线程来完成的,由于只有一个线程能够获取同步状态,则设置头节点的方法不需要CAS保证。如下:
队列中节点的定义在AQS中可见:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* The synchronization state.
*/
private volatile int state;
/**
* Wait queue node class.
*/
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
...
}
}
CAS
Compare And Swap,比较与交换,是某些情况下避免多线程中使用锁造成性能损耗的一种方案。在操作系统层面,CAS是一种原语,原语属于操作系统用语范畴,由若干条指令组成的,因为原语的执行必须是连续的且不允许被中断,所以CAS是CPU的原子指令。
CAS(V, E, N)
CAS包括三个参数,V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V值设为N,如果V值和E值不同,说明已经有其他线程做了更新,则当前线程什么都不做。最后CAS返回当前V的真实值。
CAS是以乐观态度进行的,它总认为能够成功完成操作。当多个线程使用CAS操作同一个变量时,只有一个线程能更新成功,其余都更新失败。失败的线程不会挂起,而是直接返回,并且能够立即再次尝试。
基于此,CAS即使没有锁,也能感知其他线程的干扰,并进行相应处理,所以称为无锁
。
ABA问题
CAS存在ABA问题。
如a、b线程同时从内存中取出值为1的变量n,并且a线程将n值设置为3,然后又设置为1。此时b线程开始操作n,b将n设置为2,因为设置时发现n的值仍为1,所以b操作成功。
但是实际上b操作之前,n的值已经发生了变化,在某些业务下,此时b不应该操作成功。因为n在a的操作下,虽然最终值还是1,但这会造成CAS会判断为期间不存在其他线程介入,实际上是存在的。
JAVA中提供了AtomicStampedReference/AtomicMarkableReference来处理ABA问题,主要是在对象中额外再增加一个标记来标识对象是否有过变更。
参考:《Java高并发程序设计》