【Java并发编程实战14】构建自定义同步工具(Building-Custom-Synchronizers)(下)

简介: JDK包含许多存在状态依赖的类,例如FutureTask、Semaphore和BlockingQueue,他们的一些操作都有前提条件,例如非空、任务已完成等。

5 AbstractQueuedSynchronizer (AQS)

基于AQS构建的同步器类中,最进步的操作包括各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,并且通常会阻塞。


如果一个类想成为状态依赖的类,它必须拥有一些状态,AQS负责管理这些状态,通过getState,setState, compareAndSetState等protected类型方法进行操作。这是设计模式中的模板模式。


使用AQS的模板如下:


获取锁:首先判断当前状态是否允许获取锁,如果是就获取锁,否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败。另外如果是阻塞线程,那么线程就需要进入阻塞队列。当状态位允许获取锁时就修改状态,并且如果进了队列就从队列中移除。


释放锁:这个过程就是修改状态位,如果有线程因为状态位阻塞的话就唤醒队列中的一个或者更多线程。

boolean acquire() throws InterruptedException {
 while (state does not permit acquire) {
 if (blocking acquisition requested) {
 enqueue current thread if not already queued
 block current thread
 }
 else
 return failure
 }
 possibly update synchronization state
 dequeue thread if it was queued
 return success
}
void release() {
 update synchronization state
 if (new state may permit a blocked thread to acquire)
 unblock one or more queued threads
}

要支持上面两个操作就必须有下面的条件:

  • 原子性操作同步器的状态位
  • 阻塞和唤醒线程
  • 一个有序的队列


1 状态位的原子操作

这里使用一个32位的整数来描述状态位,前面章节的原子操作的理论知识整好派上用场,在这里依然使用CAS操作来解决这个问题。事实上这里还有一个64位版本的同步器(AbstractQueuedLongSynchronizer),这里暂且不谈。

2 阻塞和唤醒线程

标准的JAVA API里面是无法挂起(阻塞)一个线程,然后在将来某个时刻再唤醒它的。JDK 1.0的API里面有Thread.suspend和Thread.resume,并且一直延续了下来。但是这些都是过时的API,而且也是不推荐的做法。


HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。


在JDK 5.0以后利用JNI在LockSupport类中实现了此特性。



LockSupport.park() LockSupport.park(Object) LockSupport.parkNanos(Object, long) LockSupport.parkNanos(long) LockSupport.parkUntil(Object, long) LockSupport.parkUntil(long) LockSupport.unpark(Thread)


上面的API中park()是在当前线程中调用,导致线程阻塞,带参数的Object是挂起的对象,这样监视的时候就能够知道此线程是因为什么资源而阻塞的。由于park()立即返回,所以通常情况下需要在循环中去检测竞争资源来决定是否进行下一次阻塞。park()返回的原因有三:


  • 其他某个线程调用将当前线程作为目标调用unpark
  • 其他某个线程中断当前线程;
  • 该调用不合逻辑地(即毫无理由地)返回。


其实第三条就决定了需要循环检测了,类似于通常写的while(checkCondition()){Thread.sleep(time);}类似的功能。

3 有序队列

在AQS中采用CHL列表来解决有序的队列的问题。

AQS采用的CHL模型采用下面的算法完成FIFO的入队列和出队列过程。该队列的操作均通过Lock-Free(CAS)操作.

自己实现的CLH SpinLock如下:

class ClhSpinLock {
    private final ThreadLocal<Node> prev;
    private final ThreadLocal<Node> node;
    private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());
    public ClhSpinLock() {
        this.node = new ThreadLocal<Node>() {
            protected Node initialValue() {
                return new Node();
            }
        };
        this.prev = new ThreadLocal<Node>() {
            protected Node initialValue() {
                return null;
            }
        };
    }
    public void lock() {
        final Node node = this.node.get();
        node.locked = true;
        // 一个CAS操作即可将当前线程对应的节点加入到队列中,
        // 并且同时获得了前继节点的引用,然后就是等待前继释放锁
        Node pred = this.tail.getAndSet(node);
        this.prev.set(pred);
        while (pred.locked) {// 进入自旋
        }
    }
    public void unlock() {
        final Node node = this.node.get();
        node.locked = false;
        this.node.set(this.prev.get());
    }
    private static class Node {
        private volatile boolean locked;
    }
}

对于入队列(*enqueue):*采用CAS操作,每次比较尾结点是否一致,然后插入的到尾结点中。

do {
        pred = tail;
}while ( !compareAndSet(pred,tail,node) );

对于出队列(dequeue):由于每一个节点也缓存了一个状态,决定是否出队列,因此当不满足条件时就需要自旋等待,一旦满足条件就将头结点设置为下一个节点。

AQS里面有三个核心字段:


private volatile int state;

private transient volatile Node head;

private transient volatile Node tail;


其中state描述的有多少个线程取得了锁,对于互斥锁来说state<=1。head/tail加上CAS操作就构成了一个CHL的FIFO队列。下面是Node节点的属性。


独占操作的API都是不带有shared,而共享的包括semaphore和countdownlatch都是使用带有shared字面的API。


一些有用的参考资料:


**java.util.concurrent.locks.AbstractQueuedSynchronizer - **AQS


http://gee.cs.oswego.edu/dl/papers/aqs.pdf论文


http://www.blogjava.net/xylz/archive/2010/07/08/325587.html 一个比较全面的另外一个人的解读


http://suo.iteye.com/blog/1329460


http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer


http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-overview.html


http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-clh-and-spin-lock.html


http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-locksupport-and-thread-interrupt.html

独占的就用TRyAcquire, TRyRelease, and isHeldExclusively,共享的就用 tryAcquireShared and TRyReleaseShared. 带有try前缀的方法都是模板方法,AQS用于判断是否可以继续,例如如果tryAcquireShared返回一个负值,那么表示获取锁失败,失败的就需要进入CLH队列,并且挂起线程。


举一个例子,一个简单的闭锁:

@ThreadSafe
public class OneShotLatch {
    private final Sync sync = new Sync();
    public void signal() {
        sync.releaseShared(0);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }
    private class Sync extends AbstractQueuedSynchronizer {
        protected int tryAcquireShared(int ignored) {
            // Succeed if latch is open (state == 1), else fail
            return (getState() == 1) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // Latch is now open
            return true; // Other threads may now be able to acquire
        }
    }
}

下面是自己实现的一个Mutex。

/**
 * Lock free的互斥锁,简单实现,不可重入锁
 */
public class Mutex implements Lock {
    private static final int FREE = 0;
    private static final int BUSY = 1;
    private static class LockSync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4689388770786922019L;
        protected boolean isHeldExclusively() {
            return getState() == BUSY;
        }
        public boolean tryAcquire(int acquires) {
            return compareAndSetState(FREE, BUSY);
        }
        protected boolean tryRelease(int releases) {
            if (getState() == FREE) {
                throw new IllegalMonitorStateException();
            }
            setState(FREE);
            return true;
        }
        Condition newCondition() {
            return new ConditionObject();
        }
    }
    private final LockSync sync = new LockSync();
    public void lock() {
        sync.acquire(0);
    }
    public boolean tryLock() {
        return sync.tryAcquire(0);
    }
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public void unlock() {
        sync.release(0);
    }
    public Condition newCondition() {
        return sync.newCondition();
    }
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(0);
    }
}

6 AQS实现类

ReentrantLock

protected boolean tryAcquire(int ignored) {
 final Thread current = Thread.currentThread();
 int c = getState();
 if (c == 0) {
 if (compareAndSetState(0, 1)) {
 owner = current;
 return true;
 }
 } else if (current == owner) {
 setState(c+1);
 return true;
 }
 return false;
} 

Semaphore和CountDownLatch

protected int tryAcquireShared(int acquires) {
 while (true) {
 int available = getState();
 int remaining = available - acquires;
 if (remaining < 0
 || compareAndSetState(available, remaining))
 return remaining;
 }
}
protected boolean tryReleaseShared(int releases) {
 while (true) {
 int p = getState();
 if (compareAndSetState(p, p + releases))
 return true;
 }
} 
目录
相关文章
|
5天前
|
监控 Java Unix
6个Java 工具,轻松分析定位 JVM 问题 !
本文介绍了如何使用 JDK 自带工具查看和分析 JVM 的运行情况。通过编写一段测试代码(启动 10 个死循环线程,分配大量内存),结合常用工具如 `jps`、`jinfo`、`jstat`、`jstack`、`jvisualvm` 和 `jcmd` 等,详细展示了 JVM 参数配置、内存使用、线程状态及 GC 情况的监控方法。同时指出了一些常见问题,例如参数设置错误导致的内存异常,并通过实例说明了如何排查和解决。最后附上了官方文档链接,方便进一步学习。
|
1月前
|
安全 Java 程序员
《从头开始学java,一天一个知识点》之:控制流程:if-else条件语句实战
**你是否也经历过这些崩溃瞬间?** - 看了三天教程,连`i++`和`++i`的区别都说不清 - 面试时被追问&quot;`a==b`和`equals()`的区别&quot;,大脑突然空白 - 写出的代码总是莫名报NPE,却不知道问题出在哪个运算符 这个系列为你打造Java「速效救心丸」!每天1分钟,地铁通勤、午休间隙即可完成学习。直击高频考点和实际开发中的「坑位」,拒绝冗长概念,每篇都有可运行的代码示例。明日预告:《for与while循环的使用场景》。 ---
63 19
|
1月前
|
消息中间件 Java 应用服务中间件
JVM实战—1.Java代码的运行原理
本文介绍了Java代码的运行机制、JVM类加载机制、JVM内存区域及其作用、垃圾回收机制,并汇总了一些常见问题。
JVM实战—1.Java代码的运行原理
|
1月前
|
Java 数据库
【YashanDB知识库】kettle同步大表提示java内存溢出
在数据导入导出场景中,使用Kettle进行大表数据同步时出现“ERROR:could not create the java virtual machine!”问题,原因为Java内存溢出。解决方法包括:1) 编辑Spoon.bat增大JVM堆内存至2GB;2) 优化Kettle转换流程,如调整批量大小、精简步骤;3) 合理设置并行线程数(PARALLELISM参数)。此问题影响所有版本,需根据实际需求调整相关参数以避免内存不足。
|
1月前
|
机器学习/深度学习 人工智能 Java
Java机器学习实战:基于DJL框架的手写数字识别全解析
在人工智能蓬勃发展的今天,Python凭借丰富的生态库(如TensorFlow、PyTorch)成为AI开发的首选语言。但Java作为企业级应用的基石,其在生产环境部署、性能优化和工程化方面的优势不容忽视。DJL(Deep Java Library)的出现完美填补了Java在深度学习领域的空白,它提供了一套统一的API,允许开发者无缝对接主流深度学习框架,将AI模型高效部署到Java生态中。本文将通过手写数字识别的完整流程,深入解析DJL框架的核心机制与应用实践。
89 3
|
2月前
|
安全 Java 开发者
Java并发迷宫:同步的魔法与死锁的诅咒
在Java并发编程中,合理使用同步机制可以确保线程安全,避免数据不一致的问题。然而,必须警惕死锁的出现,采取适当的预防措施。通过理解同步的原理和死锁的成因,并应用有效的设计和编码实践,可以构建出高效、健壮的多线程应用程序。
53 21
|
1月前
|
存储 Java 编译器
课时11:综合实战:简单Java类
本次分享的主题是综合实战:简单 Java 类。主要分为两个部分: 1.简单 Java 类的含义 2.简单 Java 类的开发
|
2月前
|
监控 前端开发 Java
构建高效Java后端与前端交互的定时任务调度系统
通过以上步骤,我们构建了一个高效的Java后端与前端交互的定时任务调度系统。该系统使用Spring Boot作为后端框架,Quartz作为任务调度器,并通过前端界面实现用户交互。此系统可以应用于各种需要定时任务调度的业务场景,如数据同步、报告生成和系统监控等。
82 9
|
1月前
|
Oracle Java 关系型数据库
课时37:综合实战:数据表与简单Java类映射转换
今天我分享的是数据表与简单 Java 类映射转换,主要分为以下四部分。 1. 映射关系基础 2. 映射步骤方法 3. 项目对象配置 4. 数据获取与调试
|
2月前
|
Java Shell 数据库
【YashanDB 知识库】kettle 同步大表提示 java 内存溢出
【问题分类】数据导入导出 【关键字】数据同步,kettle,数据迁移,java 内存溢出 【问题描述】kettle 同步大表提示 ERROR:could not create the java virtual machine! 【问题原因分析】java 内存溢出 【解决/规避方法】 ①增加 JVM 的堆内存大小。编辑 Spoon.bat,增加堆大小到 2GB,如: if "%PENTAHO_DI_JAVA_OPTIONS%"=="" set PENTAHO_DI_JAVA_OPTIONS="-Xms512m" "-Xmx512m" "-XX:MaxPermSize=256m" "-