JAVA锁应用

简介: synchronized synchronized的作用是实现线程间的同步,一般称为重量级锁,经过jdk6对synchronized优化后,其性能有所提高,与ReentrantLock基本持平。 synchronized关键字经过编译之后,会在同步块的前后分别形成monitorenter和monitorexit这两个字节码指令,这两个字节码都需要一个reference类型的参数来指明要锁定和解锁的对象。
+关注继续查看

synchronized

synchronized的作用是实现线程间的同步,一般称为重量级锁,经过jdk6对synchronized优化后,其性能有所提高,与ReentrantLock基本持平。

synchronized关键字经过编译之后,会在同步块的前后分别形成monitorenter和monitorexit这两个字节码指令,这两个字节码都需要一个reference类型的参数来指明要锁定和解锁的对象。如果Java程序中的synchronized明确指定了对象参数,那就是这个对象的reference;如果没有明确指定,那就根据synchronized修饰的是实例方法还是类方法,去取对应的对象实例或Class对象来作为锁对象。

根据虚拟机规范的要求,在执行monitorenter指令时,首先要尝试获取对象的锁。如果这个对象没被锁定,或者当前线程已经拥有了那个对象的锁,把锁的计数器加1,相应的,在执行monitorexit指令时会将锁计数器减1,当计数器为0时,锁就被释放。

如果获取对象锁失败,那当前线程就要阻塞等待,直到对象锁被另外一个线程释放为止。synchronized同步块对同一条线程来说是可重入的,不会出现自己把自己锁死的问题。

synchronized应用

synchronized有三种方式来加锁:

1. 修饰实例方法,作用于当前实例加锁,进入同步代码前要获得当前实例的锁;

2. 静态方法,作用于当前类对象加锁,进入同步代码前要获得当前类对象的锁;

3. 修饰代码块,指定加锁对象,对给定对象加锁,进入同步代码库前要获得给定对象的锁。

synchronized括号中的对象是一把锁,在java中任意一个对象都可以成为锁,只有获得括号中对象锁的线程才能执行被锁定的代码块。多个线程需要获取synchronized锁定的对象必须是同一个,如果是不同对象,就意味着是不同的房间的钥匙,不能起到同步的作用。

monitorenter指令插入到同步代码块开始的位置、monitorexit指令插入到同步代码块结束位置,jvm需要保证每个monitorenter都有一个monitorexit对应,同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。

线程执行到monitorenter指令时,会尝试获取对象所对应的monitor所有权,也就是尝试获取对象的锁;而执行monitorexit,就是释放monitor的所有权。

jdk6优化后,其获取锁过程可能为:

轻量级锁->偏向锁->自旋锁/自适应锁->重量级锁

锁是用来控制多个线程访问共享资源的方式,除synchronized关键字外,还可以使用Lock接口下的实现类来实现锁的功能,它们提供了与synchroinzed关键字类似的同步功能,但比synchronized更灵活,可以显示的获取和释放锁。

ReentrantLock

ReentrantLock是可重入锁,顾名思义,一个线程获取一个锁后,还可以接着重复获取这个锁多次。重入锁提供了两种实现,一种是非公平的重入锁,另一种是公平的重入锁。

公平锁

对于公平与非公平,如果在时间上先对锁进行获取的请求一定先被满足获得锁,那么这个锁就是公平锁,反之,就是不公平的。锁是否公平需要在构造方法中指定,默认为非公平:

    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

公平锁会保证线程按照时间的先后顺序依次获取锁,这样能防止饥饿现象的发生。但是公平锁实现的成本较高,性能也相对低下,因此默认是非公平锁,如果没有特殊要求也没有必要使用公平锁。

响应中断

对于synchronized,线程等待锁的情况,只能是获得锁继续执行或者保持等待,等待过程中不可被中断。重入锁提供了高级功能,在等待锁的过程中可以响应中断,这能够用来解决死锁问题。通过lockInterruptibly()可以实现该功能:

public class LockTest {
    static Lock lock1 = new ReentrantLock();
    static Lock lock2 = new ReentrantLock();
    int flag;

    public LockTest(int flag) {
        this.flag = flag;
    }

    @Override
    public void run() {
        try {
            if (flag == 1) {
                lock1.lockInterruptibly();
                Thread.sleep(500);
                lock2.lockInterruptibly();
            } else {
                lock2.lockInterruptibly();
                Thread.sleep(500);
                lock1.lockInterruptibly();
            }
        } catch (InterruptedException e) {
        } finally {...}
    }
    
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new LockTest(1)), t2 = new Thread(new LockTest(2));
        t1.start(); t2.start();
        Thread.sleep(1000);
        t1.interrupt();
    }
}

t1和t2会造成死锁,直到t1执行interrupt(),t1会放弃锁资源,死锁解开。

tryLock

除了使用响应中断来取消等待,还可以使用tryLock限时等待获取锁。传入时间参数,表示等待指定的时间后仍没有获取锁则取消等待。

如果不传参数,则线程不会等待锁,而是立即返回锁申请结果:true表示获取锁成功,false表示获取锁失败。可以使用该方法配合失败重试机制来更好的解决死锁问题。

    /**
     * Acquires the lock only if it is not held by another thread at the time
     * of invocation.
     */
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    /**
     * Acquires the lock if it is not held by another thread within the given
     * waiting time and the current thread has not been
     * {@linkplain Thread#interrupt interrupted}.
     */
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

Condition

对于synchronized,线程的等待和唤起需要配合Object.wait()、Object.notify()和Object.notifyAll()使用,这三个方法也只能在synchronized块中使用。

而Condition实现的功能和上面三个方法类似,不同的是Condition是和重入锁相关联使用的。通过Lock接口的newCondition()接口可以生成一个与当前重入锁绑定的Condition实例:

public interface Lock {

    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    /**
     * Returns a new {@link Condition} instance that is bound to this
     * {@code Lock} instance.
     */
    Condition newCondition();
}

Condition的API如下:

public interface Condition {

    /**
     * Causes the current thread to wait until it is signalled or {@linkplain Thread#interrupt interrupted}.
     */
    void await() throws InterruptedException;

    /**
     * Causes the current thread to wait until it is signalled.
     */
    void awaitUninterruptibly();

    /**
     * Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses. This method is behaviorally
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     * Causes the current thread to wait until it is signalled or interrupted, or the specified deadline elapses.
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
     * Wakes up one waiting thread.
     */
    void signal();

    /**
     * Wakes up all waiting threads.
     */
    void signalAll();
}

使用示例:

public class ConditionTest {

    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();
    
    public static void main(String[] args) throws InterruptedException {
        lock.lock();
        new Thread(new TargetThread()).start();
        System.out.println("main thread get lock");
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
        System.out.println("main thread recovery");
    }
    
    static class TargetThread implements Runnable {
        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println("sub thread get lock and signal main thread");
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    }
}

运行结果:

main thread get lock
sub thread get lock and signal main thread
main thread recovery

在ArrayBlockingQueue中应用了ReentrantLock和Condition,其中notEmpty和notFull都是和lock属性绑定的,当put操作时,如果队列已满则执行notFull.await()挂起put线程,直到notFull.signal()唤起put线程再继续执行;当take操作时,如果count == 0则挂起take线程,直到notEmpty.signal()唤起take线程后再继续执行:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    /** Number of elements in the queue */
    int count;
    /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty; 
    /** Condition for waiting puts */
    private final Condition notFull;
 
    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    } 

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal(); // 通知take线程,队列已有数据
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();  // 通知put线程已有空闲空间
        return x;
    }
}

Semaphore

信号量是对锁的扩展,无论是synchronized还是其他锁,一次只能允许一条线程访问共享资源,而信号量可允许同时有多个线程访问共享资源,其构造函数如下:

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and nonfair fairness setting.
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

Semaphore的实现原理类似于令牌桶,而permits就是桶的大小,即同一时间最多有多少条线程访问临界资源,permit为许可之意,此处为了类比令牌桶原理,统一称为令牌

    /**
     * Acquires a permit from this semaphore, blocking until one is
     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    /**
     * Acquires a permit from this semaphore, blocking until one is
     * available.
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    
    /**
     * Acquires a permit from this semaphore, only if one is available at the
     * time of invocation.
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * Acquires a permit from this semaphore, if one becomes available
     * within the given waiting time and the current thread has not
     * been {@linkplain Thread#interrupt interrupted}.
     */
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    /**
     * Releases a permit, returning it to the semaphore.
     */
    public void release() {
        sync.releaseShared(1);
    }

acquire()用于获取一次令牌,如果没有可用令牌则阻塞,阻塞过程中可响应中断;

acquireUninterruptibly()类似acquire(),但是不响应中断;

tryAcquire()用于获取令牌,该方法不阻塞,请求成功会立即返回,返回true表示获取令牌成功,false则为无可用令牌,获取失败;

tryAcquire(long timeout, TimeUnit unit)类似tryAcquire(),但是无可用令牌时,会等待timeout的时间,超出时间后取消等待,直接返回,返回值意义也同tryAcquire()

使用示例:

public class SemapDemo implements Runnable {
        final Semaphore semaphore = new Semaphore(5);
        @Override
        public void run() {
            try {
                semaphore.acquire();
                Thread.sleep(2000);
            } catch (Exception e) {
            } finally {
                semaphore.release();
            }
        }
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(20);
            final SemapDemo semapDemo = new SemapDemo();
            for (int i = 0; i < 20; i++) {
                executorService.submit(semapDemo);
            }
        }
    }
}

RateLimiter

guava的RateLimiter是一个限流工具类,其中的原理就是令牌桶。

比较简单的限流方式是在单位时间内使用计数器counter统计请求数量,如果counter到达限制时对请求进行丢弃或等待。但这种方式很难控制边界时间的请求,例如需要限制1s内最多10个请求,在第1s的前半秒没有请求,后半秒处理了10个请求;然后下1s的前半秒又处理10个请求,此时这第1s的后半秒和第2s的前半秒加起来一共处理了20个请求。

解决边界请求问题可以使用令牌桶方式。令牌桶中存放令牌,每个请求要首先拿到令牌后才能被处理,但是令牌的生成速度是恒定的,比如1s限流10的话,每100毫秒会生成1个令牌,且令牌桶中的令牌最多能存放10个令牌。

令牌桶和漏桶原理

使用1s内限流10条请求为例。

漏桶

漏桶的出水速度是恒定的,也就是说单位时间(每100毫秒)内处理请求数是恒定的,如果瞬时大量请求来临的话,将有大部分请求被丢弃或阻塞,对应上边的场景,就是第1s的后半秒来10条数据,也只能处理5条,剩下5条就被丢弃或阻塞。

令牌桶

单位时间(100毫秒)内处理请求数不恒定,但是生成令牌的速度是恒定的(每100毫秒产生1个令牌),而请求去拿令牌是没有速度限制的。面对瞬时大流量,令牌桶方式可以在短时间内处理大量请求。

即第1s的后半秒来10个请求,这10个请求都可能被处理。

令牌桶示例

public class RateLimiterDemo {
    static RateLimiter limiter = RateLimiter.create(1);

    public static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() / 1000);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            limiter.acquire();
            new Thread(new Task()).start();
        }
    }
}

API示例:

    // permitsPerSecond表示每秒令牌数,即每秒限流数
    public static RateLimiter create(double permitsPerSecond) {
        return create(RateLimiter.SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
    }
    
    // 拿不到令牌则阻塞
    public double acquire() {
        return this.acquire(1);
    }

    // 限时阻塞
    public double acquire(int permits) {
        long microsToWait = this.reserve(permits);
        this.stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return 1.0D * (double)microsToWait / (double)TimeUnit.SECONDS.toMicros(1L);
    }
    
    // 限时阻塞
    public boolean tryAcquire(long timeout, TimeUnit unit) {
        return this.tryAcquire(1, timeout, unit);
    }

    // 限时阻塞
    public boolean tryAcquire(int permits) {
        return this.tryAcquire(permits, 0L, TimeUnit.MICROSECONDS);
    }

    // 非阻塞
    public boolean tryAcquire() {
        return this.tryAcquire(1, 0L, TimeUnit.MICROSECONDS);
    }

ReadWriteLock

读写锁可以有效的减少锁竞争,提高系统性能,尤其是对于读多写少的场景。

读写锁在同一时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程和其他写线程都会被阻塞。读写锁维护了一对锁:读锁和写锁。读锁和写锁关系:

读锁与读锁可以共享;
读锁与写锁互斥;
写锁与写锁互斥。

使用示例:

public class ReadWriteLockDemo {

    private static Lock lock = new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    private int value;

    public Object handleRead(Lock lock) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);
            return value;
        } finally {lock.unlock();}
    }

    public void handleWrite(Lock lock, int index) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);
            value = index;
        } finally {lock.unlock();}
    }

    public static void main(String[] args) {
        final ReadWriteLockDemo demo = new ReadWriteLockDemo();
        Runnable readRunner = () -> {
                try {
                    demo.handleRead(readLock);
//                    demo.handleRead(lock);
                } catch (Exception e) {}
        };
        Runnable writeRunner = () -> {
                try {
                    demo.handleWrite(writeLock, new Random().nextInt());
//                    demo.handleWrite(lock, new Random().nextInt());
                } catch (Exception e) {}
        };

        for (int i = 0; i < 18; i++) {
            new Thread(readRunner).start();
        }

        for (int i = 0; i < 2; i++) {
            new Thread(writeRunner).start();
        }
    }
}

如果使用demo.handleRead(readLock)demo.handleWrite(writeLock, new Random().nextInt()),因为读线程不被阻塞,2s内程序会执行完毕并退出;如果使用demo.handleRead(lock)demo.handleWrite(lock, new Random().nextInt()),读和写都会被阻塞,执行时间会达到20s。

CountDownLatch

CountDownLatch是一个多线程控制工具类,相当于一个倒计数器,在初始化时能指定计数器的值:

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

当执行countDown()时,计数器会减1,当计数减到0时,所有线程并行执行:

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

await()用于使当前线程等待,直到latch的计数器为0或者被中断。当计数器减到0时,该方法会立即返回true:

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
     * or the specified waiting time elapses.
     *
     * <p>If the current count is zero then this method returns immediately
     * with the value {@code true}.
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

使用示例:

public class CountDownLatchDemo implements Runnable{

    static final CountDownLatch latch = new CountDownLatch(10);
    static final CountDownLatchDemo demo = new CountDownLatchDemo();

    @Override
    public void run() {
        try {
            Thread.sleep(new Random().nextInt(10) * 1000);
            latch.countDown();
        } catch (InterruptedException e) {...}
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i=0; i<10; i++) {
            exec.submit(demo);
        }
        latch.await();
        exec.shutdown();
    }
}

如果一个方法要求比较快的响应速度,方法内有非常耗时的操作,这时串行调用接口的用时必然较长,如果该场景可以使用多线程解决,可以使用CountDownLatch。

CyclicBarrier

CyclicBarrier类似于CountDownLatch,也通过计数器来实现,但CyclicBarier是正计数器,参与此次的线程执行await()时,计数器加1,当计数器达到最大值,所有因调用await()进入等待状态的线程被唤醒,继续执行后续操作。其构造器中的parties表示参与的线程数,也即计数器的最大数:

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    
    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

和CountDownLatch不同,CycliBarrier在释放等待线程后可以重复使用。

使用示例:

public class CyclicBarrierDemo {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 5; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {}
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        System.out.println(threadNum + " is ready");
        barrier.await();
        System.out.println(threadNum + " continue");
    }
}

LockSupport

LockSupport是一个线程阻塞工具,可以在线程的任意位置使线程阻塞。和Object.wait()相比,它不需要获取任何对象的锁。

Condition的实现类中,实现线程等待就使用了LockSupport.park()方法和LockSupport.unpark()方法,park()可以阻塞当前线程,parkNanos()、parkUntil()实现的则是限时的等待:

public class LockSupport {

    /**
     * Makes available the permit for the given thread, if it
     * was not already available.  If the thread was blocked on
     * {@code park} then it will unblock.  Otherwise, its next call
     * to {@code park} is guaranteed not to block. This operation
     * is not guaranteed to have any effect at all if the given
     * thread has not been started.
     */
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant
     */
    public static void park() {
        UNSAFE.park(false, 0L);
    }

    /**
     * Disables the current thread for thread scheduling purposes, for up to
     * the specified waiting time, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant 
     */
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

    /**
     * Disables the current thread for thread scheduling purposes, until
     * the specified deadline, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant 
     */
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }
}

LockSupport使用类似信号量的机制,它为每一个线程准备了一个许可,如果许可可用,那么park()会立即返回,并且消费这个许可;如果许可不可用,就会阻塞。而unpark()则是使许可变成可用。和信号量不同的是,许可不能累加,一个线程不可能拥有超过一个许可。

这个特性使得,即使unpark()发生在park()之前,它也可以使下一次的park()操作立即返回。而resume()如果在suspend()之前执行,就可能会造成线程无限期的挂起。

并且,suspend()之后,线程状态认为RUNNABLE,而park()后,线程状态为WAITING。

atomic

Jdk5提供了原子操作类,这些原子操作类提供了线程安全的更新操作。atomic提供了12个类对应四种类型的原子更新操作:

基本类型:AtomicBoolean、AtomicInteger、AtomicLong

数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

引用类型:AtomicReference、AtomicReferenceFieldUpdater、AtomicMarkableReference

字段类型:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicStampedReference

这些原子类中多用了cas操作,例如AtomicInteger的incrementAndGet():

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

实际调用了Unsafe的getAndAddInt():

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

getAndAddInt()中通过循环以及cas的方式实现原子更新,类似AtomicInteger,其他atomic类也调用了Unsafe的方法,同样通过cas保证线程安全。

AQS

Lock实现线程安全的核心是AQS(AbstractQueuedSynchronizer),AbstractQueuedSynchronizer提供了一个队列,可以看做是一个用来实现锁以及其他需要同步功能的框架。AQS用一个int类型、volatile修饰的的state变量表示同步状态,并配合Unsafe工具对其进行原子性的操作来实现对当前锁状态的修改:

    /**
     * The synchronization state.
     */
    private volatile int state;

AQS的主要作用是为同步提供统一的底层支持,其使用依靠继承来完成,子类通过继承自AQS并实现所需的方法来管理同步状态。例如ReentrantLock,CountdowLatch就是基于AQS实现的,用法是创建内部类,通过继承AQS实现其模版方法。如下:

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    ...
}

对同步状态的修改主要靠如下方法操作:

getState() 获取当前的同步状态
setState(int newState) 设置当前同步状态
compareAndSetState(int expect,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子性。

从使用上来说,AQS的功能可以分为独占和共享。独占锁模式下,每次只能有一个线程持有锁,例如ReentrantLock的互斥锁;共享锁模式下,允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock。

AQS则相当于独占锁和共享锁的实现的父类。

AQS的内部实现

AQS同步器内部依赖一个FIFO的双向队列(链表结构)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程;当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

双向链表包括一个head节点和一个tail节点,分别表示头结点和尾节点,其中头结点不存储Thread,仅保存next结点的引用。队列的基本结构如下:
image

设置尾结点:当一个线程成功地获取了同步状态(或锁),其他线程将无法获取到同步状态,转而被构造成为一个节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:

compareAndSetTail(Node expect,Nodeupdate)

它需要传递当前线程“认为”的尾节点和当前节点,这是一个cas操作,只有设置成功后,当前节点才正式与之前的尾节点建立关联,如下:
image

设置首节点:原头节点释放锁,唤醒后继节点,头节点即获取锁(同步状态)成功的节点,头节点在释放同步状态的时候,会唤醒后继节点,而后继节点将会在获取锁(同步状态)成功时候将自己设置为头节点。

设置头节点是由获取锁(同步状态)成功的线程来完成的,由于只有一个线程能够获取同步状态,则设置头节点的方法不需要CAS保证。如下:
image

队列中节点的定义在AQS中可见:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Wait queue node class.
     */
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        ...
    }
}

CAS

Compare And Swap,比较与交换,是某些情况下避免多线程中使用锁造成性能损耗的一种方案。在操作系统层面,CAS是一种原语,原语属于操作系统用语范畴,由若干条指令组成的,因为原语的执行必须是连续的且不允许被中断,所以CAS是CPU的原子指令。

CAS(V, E, N)

CAS包括三个参数,V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V值设为N,如果V值和E值不同,说明已经有其他线程做了更新,则当前线程什么都不做。最后CAS返回当前V的真实值。

CAS是以乐观态度进行的,它总认为能够成功完成操作。当多个线程使用CAS操作同一个变量时,只有一个线程能更新成功,其余都更新失败。失败的线程不会挂起,而是直接返回,并且能够立即再次尝试。

基于此,CAS即使没有锁,也能感知其他线程的干扰,并进行相应处理,所以称为无锁

ABA问题

CAS存在ABA问题。

如a、b线程同时从内存中取出值为1的变量n,并且a线程将n值设置为3,然后又设置为1。此时b线程开始操作n,b将n设置为2,因为设置时发现n的值仍为1,所以b操作成功。

但是实际上b操作之前,n的值已经发生了变化,在某些业务下,此时b不应该操作成功。因为n在a的操作下,虽然最终值还是1,但这会造成CAS会判断为期间不存在其他线程介入,实际上是存在的。

JAVA中提供了AtomicStampedReference/AtomicMarkableReference来处理ABA问题,主要是在对象中额外再增加一个标记来标识对象是否有过变更。

参考:《Java高并发程序设计》

相关实践学习
基于阿里云DeepGPU实例,用AI画唯美国风少女
本实验基于阿里云DeepGPU实例,使用aiacctorch加速stable-diffusion-webui,用AI画唯美国风少女,可提升性能至高至原性能的2.6倍。
目录
相关文章
|
12天前
|
Java Apache 数据安全/隐私保护
Java RPC调用: 远程过程调用的实现与应用
Java RPC调用: 远程过程调用的实现与应用
|
12天前
|
存储 算法 Java
深入探索Java中的数组操作:从基础到高级的技巧与应用
深入探索Java中的数组操作:从基础到高级的技巧与应用
|
13天前
|
NoSQL Java
如何对生产环境的JAVA应用进行远程调试
如何对生产环境的JAVA应用进行远程调试
8 0
|
13天前
|
Java Linux Android开发
如何让JAVA应用在Eclipse中也能调用shutdownhook
如何让JAVA应用在Eclipse中也能调用shutdownhook
11 0
|
13天前
|
Java
分析JAVA应用CPU占用过高的问题
分析JAVA应用CPU占用过高的问题
15 0
|
14天前
|
机器学习/深度学习 分布式计算 算法
java在机器学习的应用
java在机器学习的应用
14 1
|
15天前
|
监控 Java 调度
阿里云 ARMS 应用监控重磅支持 Java 21
阿里云 ARMS 应用监控重磅支持 Java 21
48087 33
|
19天前
|
安全 Java 大数据
java在医保平台的应用
java在医保平台的应用
11 0
|
21天前
|
消息中间件 算法 Java
聊聊如何在Java应用中发送短信
很多业务场景里,我们都需要发送短信,比如登陆验证码、告警、营销通知、节日祝福等等。 这篇文章,我们聊聊 Java 应用中如何优雅的发送短信。
聊聊如何在Java应用中发送短信
|
1月前
|
设计模式 存储 Java
JAVA设计模式4:谈谈原型模式在JAVA实战开发中的应用
JAVA设计模式4:谈谈原型模式在JAVA实战开发中的应用
相关产品
云迁移中心
推荐文章
更多