Java Review - 并发编程_抽象同步队列AQS(下)

简介: Java Review - 并发编程_抽象同步队列AQS(下)

维护AQS提供的队列 - 入队操作


最后,我们来看看如何维护AQS提供的队列,主要看入队操作。


入队操作: 当一个线程获取锁失败后该线程会被转换为Node节点,然后就会使用enq(final Node node)方法将该节点插入到AQS的阻塞队列.

 /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail; // 1 
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))// 2
                    tail = head;
            } else {
                node.prev = t; // 3 
                if (compareAndSetTail(t, node)) { // 4 
                    t.next = node;
                    return t;
                }
            }
        }
    }


下面结合代码和节点图来讲解入队的过程。

【第一次循环】

  • 如上代码在第一次循环中,当要在AQS队列尾部插入元素时,AQS队列状态如下所示


a01b8af981dd462a9208ad559fa89790.png


也就是队列头、尾节点都指向null;

  • 当执行代码(1)后节点t指向了尾部节点,这时候队列状态如下图所示。

2c6cb7d03d3d4fb98ea235c7e45061a8.png

  • 这时候t为null,故执行代码(2),使用CAS算法设置一个哨兵节点为头节点,如果CAS设置成功,则让尾部节点也指向哨兵节点,这时候队列状态如下图所示



dfa36be10b674be698d3bda71d119be3.png



【第二次循环】

  • 到现在为止只插入了一个哨兵节点,还需要插入node节点,所以在第二次循环后执行到代码(1),这时候队列状态如下图所示


47176dd6c62f46cfb344f3cf03176908.png

  • 然后执行代码(3)设置node(入参)的前驱节点为尾部节点,这时候队列状态如下图所示


ec4e0c56b0864be08d55fd5dc6a826a7.png


  • 然后通过CAS算法设置node节点为尾部节点,CAS成功后队列状态如下图所示


84647f92448743489b952a67130164b9.png



CAS成功后再设置原来的尾部节点的后驱节点为node,这时候就完成了双向链表的插入,此时队列状态如下图所示。


cfe49b997ff24e3b8eae28892420cc07.png

AQS——条件变量的支持


我们知道notify和wait,是配合synchronized内置锁实现线程间同步的基础设施一样,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。


它们的不同在于,synchronized同时只能与一个共享变量的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量。


在调用共享变量的notify和wait方法前必须先获取该共享变量的内置锁,同理,在调用条件变量的signal和await方法前也必须先获取条件变量对应的锁。


那么,到底什么是条件变量呢?如何使用呢?不急,下面看一个例子。

      // 1
        ReentrantLock lock = new ReentrantLock();
        // 2
        Condition condition = lock.newCondition();
        // 3
        lock.lock();
        try {
            System.out.println("begin wait");
            // 4
            condition.await();
            System.out.println("end wait");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 5
            lock.unlock();
        }
        // 6
        lock.lock();
        try {
            System.out.println("begin single");
            // 7
            condition.signal();
            System.out.println("end single");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8
            lock.unlock();
        }


代码(1)创建了一个独占锁ReentrantLock对象,ReentrantLock是基于AQS实现的锁。


代码(2)使用创建的Lock对象的newCondition()方法创建了一个ConditionObject变量,这个变量就是Lock锁对应的一个条件变量。需要注意的是,一个Lock对象可以创建多个条件变量。


代码(3)首先获取了独占锁


代码(4)则调用了条件变量的await()方法阻塞挂起了当前线程。 当其他线程调用条件变量的signal方法时,被阻塞的线程才会从await处返回。需要注意的是,和调用Object的wait方法一样,如果在没有获取到锁前调用了条件变量的await方法则会抛出 java.lang.IllegalMonitorStateException异常。


代码(5)则释放了获取的锁。


其实这里的Lock对象等价于synchronized加上共享变量,调用lock.lock()方法就相当于进入了synchronized块(获取了共享变量的内置锁),调用lock.unLock()方法就相当于退出synchronized块。 调用条件变量的await()方法就相当于调用共享变量的wait()方法,调用条件变量的signal方法就相当于调用共享变量的notify()方法。调用条件变量的signalAll()方法就相当于调用共享变量的notifyAll()方法。


经过上面解释,知道条件变量是什么,它是用来做什么的了。


在上面代码中,lock.newCondition()的作用其实是new了一个在AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类,可以访问AQS内部的变量(例如状态变量state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。注意这个条件队列和AQS队列不是一回事。


在如下代码中,当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。


这时候如果有其他线程调用lock.lock()尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。

   /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 9 创建新的node节点,并插入到条件队列的对尾   
            Node node = addConditionWaiter();
            // 10 释放当前线程获取的锁 
            int savedState = fullyRelease(node);
            int interruptMode = 0;
      // 11 调用park方法阻塞挂起当前线程 
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

在如下代码中,当另外一个线程调用条件变量的signal方法时(必须先调用锁的lock()方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS的阻塞队列里面,然后激活这个线程。

    /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
              // 将条件队列的队首移动到AQS队列
                doSignal(first);
        }

需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象。需要由AQS的子类来提供newCondition函数


下面来看当一个线程调用条件变量的await()方法而被阻塞后,如何将其放入条件队列。

     /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 1 
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 2 
            if (t == null)
                firstWaiter = node;
            else
              // 3 
                t.nextWaiter = node;
            // 4 
            lastWaiter = node;
            return node;
        }


代码(1)首先根据当前线程创建一个类型为Node.CONDITION的节点,然后通过代码(2)(3)(4)在单向条件队列尾部插入一个元素。


注意:当多个线程同时调用lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。


如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程会释放获取到的锁,并被转换为Node节点插入到条件变量对应的条件队列里面。


这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的await()方法则该线程也会被放入条件变量的条件队列里面。


当另外一个线程调用条件变量的signal()或者signalAll()方法时,会把条件队列里面的一个或者全部Node节点移动到AQS的阻塞队列里面,等待时机获取锁。


最后使用一个图总结如下:一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。


e0a7252105ec401997e53e1931aa69a0.png


基于AQS实现自定义同步器


我们基于AQS实现一个不可重入的独占锁,。


自定义AQS需要重写一系列函数,还需要定义原子变量state的含义。这里我们定义,state为0表示目前锁没有被线程持有,state为1表示锁已经被某一个线程持有,由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。另外,我们自定义的锁支持条件变量。


【基于AQS实现的不可重入的独占锁】

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/5 22:35
 * @mark: show me the code , change the world
 */
public class NonReentrantLock implements Lock, Serializable  {
    //静态内部类,用于辅助
    private static class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            assert arg == 1;//如果state为0,则尝试获取锁
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int arg) {
            assert arg == 1;//如果state为0,则尝试获取锁
            if (getState()==0){
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        @Override
        protected boolean isHeldExclusively() {
            // 是否锁已经被持有
            return getState()==1;
        }
        //提供条件变量接口
        public Condition newCondition(){
            return new ConditionObject();
        }
    }
    Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.release(1);
    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

在如上代码中,NonReentrantLock定义了一个内部类Sync用来实现具体的锁的操作,Sync则继承了AQS。由于我们实现的是独占模式的锁,所以Sync重写了tryAcquire、tryRelease和isHeldExclusively 3个方法。另外,Sync提供了newCondition这个方法用来支持条件变量。


【使用自定义锁实现生产—消费模型】

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/5 22:59
 * @mark: show me the code , change the world
 */
public class NonReentrantLockTest {
    static NonReentrantLock lock = new NonReentrantLock();
    static Condition notFull = lock.newCondition();
    static Condition notEmpty = lock.newCondition();
    static Queue<String> queue = new LinkedBlockingQueue<>();
    static int queueSize = 10;
    public static void main(String[] args) {
        Thread producer = new Thread(() -> {
            lock.lock();
            try {
                //如果队列满了,则等待
                while (queue.size() == queueSize) {
                    notEmpty.await();
                }
                //添加队列元素
                queue.add("element ");
                //唤醒消费线程
                notFull.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //释放锁
                lock.unlock();
            }
        });
        Thread consumer = new Thread(() -> {
            lock.lock();
            try {
                //队列为空,则等待
                while (queue.size()==0){
                    notFull.await();
                }
                //消费元素
                queue.poll();
                //唤醒生产线程
                notEmpty.signalAll();
            }catch (InterruptedException e){
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });
        producer.start();
        consumer.start();
    }
}


如上代码首先创建了NonReentrantLock的一个对象lock,然后调用lock.newCondition创建了两个条件变量,用来进行生产者和消费者线程之间的同步。


在main 函数里面,首先创建了producer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列是否已经满了,如果满了则调用notEmpty.await()阻塞挂起当前线程。需要注意的是,这里使用while 而不是if是为了避免虚假唤醒。如果队列不满则直接向队列里面添加元素,然后调用notFull.signalAll()唤醒所有因为消费元素而被阻塞的消费线程,最后释放获取的锁。


然后在main函数里面创建了consumer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列里面是不是有元素,如果队列为空则调用notFull.await()阻塞挂起当前线程。需要注意的是,这里使用while 而不是if是为了避免虚假唤醒。如果队列不为空则直接从队列里面获取并移除元素,然后唤醒因为队列满而被阻塞的生产线程,最后释放获取的锁。


相关文章
|
3月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
2月前
|
Java 调度
Java 线程同步的四种方式,最全详解,建议收藏!
本文详细解析了Java线程同步的四种方式:synchronized关键字、ReentrantLock、原子变量和ThreadLocal,通过实例代码和对比分析,帮助你深入理解线程同步机制。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Java 线程同步的四种方式,最全详解,建议收藏!
|
3月前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
58 1
|
3月前
|
存储 安全 Java
【用Java学习数据结构系列】探索栈和队列的无尽秘密
【用Java学习数据结构系列】探索栈和队列的无尽秘密
39 2
|
4月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
3月前
|
存储 算法 Java
【用Java学习数据结构系列】用堆实现优先级队列
【用Java学习数据结构系列】用堆实现优先级队列
38 0
|
7月前
|
Java C++
关于《Java并发编程之线程池十八问》的补充内容
【6月更文挑战第6天】关于《Java并发编程之线程池十八问》的补充内容
54 5
|
4月前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。
|
6月前
|
安全 Java 开发者
Java中的并发编程:深入理解线程池
在Java的并发编程中,线程池是管理资源和任务执行的核心。本文将揭示线程池的内部机制,探讨如何高效利用这一工具来优化程序的性能与响应速度。通过具体案例分析,我们将学习如何根据不同的应用场景选择合适的线程池类型及其参数配置,以及如何避免常见的并发陷阱。
63 1
|
6月前
|
监控 Java
Java并发编程:深入理解线程池
在Java并发编程领域,线程池是提升应用性能和资源管理效率的关键工具。本文将深入探讨线程池的工作原理、核心参数配置以及使用场景,通过具体案例展示如何有效利用线程池优化多线程应用的性能。