Java Review - 并发编程_抽象同步队列AQS(下)

简介: Java Review - 并发编程_抽象同步队列AQS(下)

维护AQS提供的队列 - 入队操作


最后,我们来看看如何维护AQS提供的队列,主要看入队操作。


入队操作: 当一个线程获取锁失败后该线程会被转换为Node节点,然后就会使用enq(final Node node)方法将该节点插入到AQS的阻塞队列.

 /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail; // 1 
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))// 2
                    tail = head;
            } else {
                node.prev = t; // 3 
                if (compareAndSetTail(t, node)) { // 4 
                    t.next = node;
                    return t;
                }
            }
        }
    }


下面结合代码和节点图来讲解入队的过程。

【第一次循环】

  • 如上代码在第一次循环中,当要在AQS队列尾部插入元素时,AQS队列状态如下所示


a01b8af981dd462a9208ad559fa89790.png


也就是队列头、尾节点都指向null;

  • 当执行代码(1)后节点t指向了尾部节点,这时候队列状态如下图所示。

2c6cb7d03d3d4fb98ea235c7e45061a8.png

  • 这时候t为null,故执行代码(2),使用CAS算法设置一个哨兵节点为头节点,如果CAS设置成功,则让尾部节点也指向哨兵节点,这时候队列状态如下图所示



dfa36be10b674be698d3bda71d119be3.png



【第二次循环】

  • 到现在为止只插入了一个哨兵节点,还需要插入node节点,所以在第二次循环后执行到代码(1),这时候队列状态如下图所示


47176dd6c62f46cfb344f3cf03176908.png

  • 然后执行代码(3)设置node(入参)的前驱节点为尾部节点,这时候队列状态如下图所示


ec4e0c56b0864be08d55fd5dc6a826a7.png


  • 然后通过CAS算法设置node节点为尾部节点,CAS成功后队列状态如下图所示


84647f92448743489b952a67130164b9.png



CAS成功后再设置原来的尾部节点的后驱节点为node,这时候就完成了双向链表的插入,此时队列状态如下图所示。


cfe49b997ff24e3b8eae28892420cc07.png

AQS——条件变量的支持


我们知道notify和wait,是配合synchronized内置锁实现线程间同步的基础设施一样,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。


它们的不同在于,synchronized同时只能与一个共享变量的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量。


在调用共享变量的notify和wait方法前必须先获取该共享变量的内置锁,同理,在调用条件变量的signal和await方法前也必须先获取条件变量对应的锁。


那么,到底什么是条件变量呢?如何使用呢?不急,下面看一个例子。

      // 1
        ReentrantLock lock = new ReentrantLock();
        // 2
        Condition condition = lock.newCondition();
        // 3
        lock.lock();
        try {
            System.out.println("begin wait");
            // 4
            condition.await();
            System.out.println("end wait");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 5
            lock.unlock();
        }
        // 6
        lock.lock();
        try {
            System.out.println("begin single");
            // 7
            condition.signal();
            System.out.println("end single");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8
            lock.unlock();
        }


代码(1)创建了一个独占锁ReentrantLock对象,ReentrantLock是基于AQS实现的锁。


代码(2)使用创建的Lock对象的newCondition()方法创建了一个ConditionObject变量,这个变量就是Lock锁对应的一个条件变量。需要注意的是,一个Lock对象可以创建多个条件变量。


代码(3)首先获取了独占锁


代码(4)则调用了条件变量的await()方法阻塞挂起了当前线程。 当其他线程调用条件变量的signal方法时,被阻塞的线程才会从await处返回。需要注意的是,和调用Object的wait方法一样,如果在没有获取到锁前调用了条件变量的await方法则会抛出 java.lang.IllegalMonitorStateException异常。


代码(5)则释放了获取的锁。


其实这里的Lock对象等价于synchronized加上共享变量,调用lock.lock()方法就相当于进入了synchronized块(获取了共享变量的内置锁),调用lock.unLock()方法就相当于退出synchronized块。 调用条件变量的await()方法就相当于调用共享变量的wait()方法,调用条件变量的signal方法就相当于调用共享变量的notify()方法。调用条件变量的signalAll()方法就相当于调用共享变量的notifyAll()方法。


经过上面解释,知道条件变量是什么,它是用来做什么的了。


在上面代码中,lock.newCondition()的作用其实是new了一个在AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类,可以访问AQS内部的变量(例如状态变量state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。注意这个条件队列和AQS队列不是一回事。


在如下代码中,当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。


这时候如果有其他线程调用lock.lock()尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。

   /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 9 创建新的node节点,并插入到条件队列的对尾   
            Node node = addConditionWaiter();
            // 10 释放当前线程获取的锁 
            int savedState = fullyRelease(node);
            int interruptMode = 0;
      // 11 调用park方法阻塞挂起当前线程 
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

在如下代码中,当另外一个线程调用条件变量的signal方法时(必须先调用锁的lock()方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS的阻塞队列里面,然后激活这个线程。

    /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
              // 将条件队列的队首移动到AQS队列
                doSignal(first);
        }

需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象。需要由AQS的子类来提供newCondition函数


下面来看当一个线程调用条件变量的await()方法而被阻塞后,如何将其放入条件队列。

     /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 1 
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 2 
            if (t == null)
                firstWaiter = node;
            else
              // 3 
                t.nextWaiter = node;
            // 4 
            lastWaiter = node;
            return node;
        }


代码(1)首先根据当前线程创建一个类型为Node.CONDITION的节点,然后通过代码(2)(3)(4)在单向条件队列尾部插入一个元素。


注意:当多个线程同时调用lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。


如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程会释放获取到的锁,并被转换为Node节点插入到条件变量对应的条件队列里面。


这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的await()方法则该线程也会被放入条件变量的条件队列里面。


当另外一个线程调用条件变量的signal()或者signalAll()方法时,会把条件队列里面的一个或者全部Node节点移动到AQS的阻塞队列里面,等待时机获取锁。


最后使用一个图总结如下:一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。


e0a7252105ec401997e53e1931aa69a0.png


基于AQS实现自定义同步器


我们基于AQS实现一个不可重入的独占锁,。


自定义AQS需要重写一系列函数,还需要定义原子变量state的含义。这里我们定义,state为0表示目前锁没有被线程持有,state为1表示锁已经被某一个线程持有,由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。另外,我们自定义的锁支持条件变量。


【基于AQS实现的不可重入的独占锁】

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/5 22:35
 * @mark: show me the code , change the world
 */
public class NonReentrantLock implements Lock, Serializable  {
    //静态内部类,用于辅助
    private static class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            assert arg == 1;//如果state为0,则尝试获取锁
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int arg) {
            assert arg == 1;//如果state为0,则尝试获取锁
            if (getState()==0){
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        @Override
        protected boolean isHeldExclusively() {
            // 是否锁已经被持有
            return getState()==1;
        }
        //提供条件变量接口
        public Condition newCondition(){
            return new ConditionObject();
        }
    }
    Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.release(1);
    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

在如上代码中,NonReentrantLock定义了一个内部类Sync用来实现具体的锁的操作,Sync则继承了AQS。由于我们实现的是独占模式的锁,所以Sync重写了tryAcquire、tryRelease和isHeldExclusively 3个方法。另外,Sync提供了newCondition这个方法用来支持条件变量。


【使用自定义锁实现生产—消费模型】

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/5 22:59
 * @mark: show me the code , change the world
 */
public class NonReentrantLockTest {
    static NonReentrantLock lock = new NonReentrantLock();
    static Condition notFull = lock.newCondition();
    static Condition notEmpty = lock.newCondition();
    static Queue<String> queue = new LinkedBlockingQueue<>();
    static int queueSize = 10;
    public static void main(String[] args) {
        Thread producer = new Thread(() -> {
            lock.lock();
            try {
                //如果队列满了,则等待
                while (queue.size() == queueSize) {
                    notEmpty.await();
                }
                //添加队列元素
                queue.add("element ");
                //唤醒消费线程
                notFull.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //释放锁
                lock.unlock();
            }
        });
        Thread consumer = new Thread(() -> {
            lock.lock();
            try {
                //队列为空,则等待
                while (queue.size()==0){
                    notFull.await();
                }
                //消费元素
                queue.poll();
                //唤醒生产线程
                notEmpty.signalAll();
            }catch (InterruptedException e){
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });
        producer.start();
        consumer.start();
    }
}


如上代码首先创建了NonReentrantLock的一个对象lock,然后调用lock.newCondition创建了两个条件变量,用来进行生产者和消费者线程之间的同步。


在main 函数里面,首先创建了producer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列是否已经满了,如果满了则调用notEmpty.await()阻塞挂起当前线程。需要注意的是,这里使用while 而不是if是为了避免虚假唤醒。如果队列不满则直接向队列里面添加元素,然后调用notFull.signalAll()唤醒所有因为消费元素而被阻塞的消费线程,最后释放获取的锁。


然后在main函数里面创建了consumer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列里面是不是有元素,如果队列为空则调用notFull.await()阻塞挂起当前线程。需要注意的是,这里使用while 而不是if是为了避免虚假唤醒。如果队列不为空则直接从队列里面获取并移除元素,然后唤醒因为队列满而被阻塞的生产线程,最后释放获取的锁。


相关文章
|
7天前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。
|
8天前
|
安全 Java 测试技术
掌握Java的并发编程:解锁高效代码的秘密
在Java的世界里,并发编程就像是一场精妙的舞蹈,需要精准的步伐和和谐的节奏。本文将带你走进Java并发的世界,从基础概念到高级技巧,一步步揭示如何编写高效、稳定的并发代码。让我们一起探索线程池的奥秘、同步机制的智慧,以及避免常见陷阱的策略。
|
19天前
|
Java 程序员
从0到1,手把手教你玩转Java多线程同步!
从0到1,手把手教你玩转Java多线程同步!
16 3
|
19天前
|
Java 测试技术
Java多线程同步实战:从synchronized到Lock的进化之路!
Java多线程同步实战:从synchronized到Lock的进化之路!
78 1
|
17天前
|
C# 开发者 数据处理
WPF开发者必备秘籍:深度解析数据网格最佳实践,轻松玩转数据展示与编辑大揭秘!
【8月更文挑战第31天】数据网格控件是WPF应用程序中展示和编辑数据的关键组件,提供排序、筛选等功能,显著提升用户体验。本文探讨WPF中数据网格的最佳实践,通过DevExpress DataGrid示例介绍其集成方法,包括添加引用、定义数据模型及XAML配置。通过遵循数据绑定、性能优化、自定义列等最佳实践,可大幅提升数据处理效率和用户体验。
34 0
|
17天前
|
开发者 C# 存储
WPF开发者必读:资源字典应用秘籍,轻松实现样式与模板共享,让你的WPF应用更上一层楼!
【8月更文挑战第31天】在WPF开发中,资源字典是一种强大的工具,用于共享样式、模板、图像等资源,提高了应用的可维护性和可扩展性。本文介绍了资源字典的基础知识、创建方法及最佳实践,并通过示例展示了如何在项目中有效利用资源字典,实现资源的重用和动态绑定。
34 0
|
17天前
|
开发者 Java Spring
【绝技揭秘】掌握Vaadin数据绑定:一键同步Java对象,告别手动数据烦恼,轻松玩转Web应用开发!
【8月更文挑战第31天】Vaadin不仅是一个功能丰富的Java Web应用框架,还提供了强大的数据绑定机制,使开发者能轻松连接UI组件与后端Java对象,简化Web应用开发流程。本文通过创建一个简单的用户信息表单示例,详细介绍了如何使用Vaadin的`Binder`类实现数据绑定,包括字段与模型属性的双向绑定及数据验证。通过这个示例,开发者可以更专注于业务逻辑而非繁琐的数据同步工作,提高开发效率和应用可维护性。
38 0
|
机器学习/深度学习 Java 程序员
Java Review(三十二、异常处理)
Java Review(三十二、异常处理)
123 0
Java Review(三十二、异常处理)
|
XML 存储 Java
Java Review(三十三、异常处理----补充:断言、日志、调试)
Java Review(三十三、异常处理----补充:断言、日志、调试)
163 0
|
3天前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)