【Java并发编程实战14】构建自定义同步工具(Building-Custom-Synchronizers)(下)

简介: JDK包含许多存在状态依赖的类,例如FutureTask、Semaphore和BlockingQueue,他们的一些操作都有前提条件,例如非空、任务已完成等。

5 AbstractQueuedSynchronizer (AQS)

基于AQS构建的同步器类中,最进步的操作包括各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,并且通常会阻塞。


如果一个类想成为状态依赖的类,它必须拥有一些状态,AQS负责管理这些状态,通过getState,setState, compareAndSetState等protected类型方法进行操作。这是设计模式中的模板模式。


使用AQS的模板如下:


获取锁:首先判断当前状态是否允许获取锁,如果是就获取锁,否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败。另外如果是阻塞线程,那么线程就需要进入阻塞队列。当状态位允许获取锁时就修改状态,并且如果进了队列就从队列中移除。


释放锁:这个过程就是修改状态位,如果有线程因为状态位阻塞的话就唤醒队列中的一个或者更多线程。

boolean acquire() throws InterruptedException {
 while (state does not permit acquire) {
 if (blocking acquisition requested) {
 enqueue current thread if not already queued
 block current thread
 }
 else
 return failure
 }
 possibly update synchronization state
 dequeue thread if it was queued
 return success
}
void release() {
 update synchronization state
 if (new state may permit a blocked thread to acquire)
 unblock one or more queued threads
}

要支持上面两个操作就必须有下面的条件:

  • 原子性操作同步器的状态位
  • 阻塞和唤醒线程
  • 一个有序的队列


1 状态位的原子操作

这里使用一个32位的整数来描述状态位,前面章节的原子操作的理论知识整好派上用场,在这里依然使用CAS操作来解决这个问题。事实上这里还有一个64位版本的同步器(AbstractQueuedLongSynchronizer),这里暂且不谈。

2 阻塞和唤醒线程

标准的JAVA API里面是无法挂起(阻塞)一个线程,然后在将来某个时刻再唤醒它的。JDK 1.0的API里面有Thread.suspend和Thread.resume,并且一直延续了下来。但是这些都是过时的API,而且也是不推荐的做法。


HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。


在JDK 5.0以后利用JNI在LockSupport类中实现了此特性。



LockSupport.park() LockSupport.park(Object) LockSupport.parkNanos(Object, long) LockSupport.parkNanos(long) LockSupport.parkUntil(Object, long) LockSupport.parkUntil(long) LockSupport.unpark(Thread)


上面的API中park()是在当前线程中调用,导致线程阻塞,带参数的Object是挂起的对象,这样监视的时候就能够知道此线程是因为什么资源而阻塞的。由于park()立即返回,所以通常情况下需要在循环中去检测竞争资源来决定是否进行下一次阻塞。park()返回的原因有三:


  • 其他某个线程调用将当前线程作为目标调用unpark
  • 其他某个线程中断当前线程;
  • 该调用不合逻辑地(即毫无理由地)返回。


其实第三条就决定了需要循环检测了,类似于通常写的while(checkCondition()){Thread.sleep(time);}类似的功能。

3 有序队列

在AQS中采用CHL列表来解决有序的队列的问题。

AQS采用的CHL模型采用下面的算法完成FIFO的入队列和出队列过程。该队列的操作均通过Lock-Free(CAS)操作.

自己实现的CLH SpinLock如下:

class ClhSpinLock {
    private final ThreadLocal<Node> prev;
    private final ThreadLocal<Node> node;
    private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());
    public ClhSpinLock() {
        this.node = new ThreadLocal<Node>() {
            protected Node initialValue() {
                return new Node();
            }
        };
        this.prev = new ThreadLocal<Node>() {
            protected Node initialValue() {
                return null;
            }
        };
    }
    public void lock() {
        final Node node = this.node.get();
        node.locked = true;
        // 一个CAS操作即可将当前线程对应的节点加入到队列中,
        // 并且同时获得了前继节点的引用,然后就是等待前继释放锁
        Node pred = this.tail.getAndSet(node);
        this.prev.set(pred);
        while (pred.locked) {// 进入自旋
        }
    }
    public void unlock() {
        final Node node = this.node.get();
        node.locked = false;
        this.node.set(this.prev.get());
    }
    private static class Node {
        private volatile boolean locked;
    }
}

对于入队列(*enqueue):*采用CAS操作,每次比较尾结点是否一致,然后插入的到尾结点中。

do {
        pred = tail;
}while ( !compareAndSet(pred,tail,node) );

对于出队列(dequeue):由于每一个节点也缓存了一个状态,决定是否出队列,因此当不满足条件时就需要自旋等待,一旦满足条件就将头结点设置为下一个节点。

AQS里面有三个核心字段:


private volatile int state;

private transient volatile Node head;

private transient volatile Node tail;


其中state描述的有多少个线程取得了锁,对于互斥锁来说state<=1。head/tail加上CAS操作就构成了一个CHL的FIFO队列。下面是Node节点的属性。


独占操作的API都是不带有shared,而共享的包括semaphore和countdownlatch都是使用带有shared字面的API。


一些有用的参考资料:


**java.util.concurrent.locks.AbstractQueuedSynchronizer - **AQS


http://gee.cs.oswego.edu/dl/papers/aqs.pdf论文


http://www.blogjava.net/xylz/archive/2010/07/08/325587.html 一个比较全面的另外一个人的解读


http://suo.iteye.com/blog/1329460


http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer


http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-overview.html


http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-clh-and-spin-lock.html


http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-locksupport-and-thread-interrupt.html

独占的就用TRyAcquire, TRyRelease, and isHeldExclusively,共享的就用 tryAcquireShared and TRyReleaseShared. 带有try前缀的方法都是模板方法,AQS用于判断是否可以继续,例如如果tryAcquireShared返回一个负值,那么表示获取锁失败,失败的就需要进入CLH队列,并且挂起线程。


举一个例子,一个简单的闭锁:

@ThreadSafe
public class OneShotLatch {
    private final Sync sync = new Sync();
    public void signal() {
        sync.releaseShared(0);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }
    private class Sync extends AbstractQueuedSynchronizer {
        protected int tryAcquireShared(int ignored) {
            // Succeed if latch is open (state == 1), else fail
            return (getState() == 1) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // Latch is now open
            return true; // Other threads may now be able to acquire
        }
    }
}

下面是自己实现的一个Mutex。

/**
 * Lock free的互斥锁,简单实现,不可重入锁
 */
public class Mutex implements Lock {
    private static final int FREE = 0;
    private static final int BUSY = 1;
    private static class LockSync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4689388770786922019L;
        protected boolean isHeldExclusively() {
            return getState() == BUSY;
        }
        public boolean tryAcquire(int acquires) {
            return compareAndSetState(FREE, BUSY);
        }
        protected boolean tryRelease(int releases) {
            if (getState() == FREE) {
                throw new IllegalMonitorStateException();
            }
            setState(FREE);
            return true;
        }
        Condition newCondition() {
            return new ConditionObject();
        }
    }
    private final LockSync sync = new LockSync();
    public void lock() {
        sync.acquire(0);
    }
    public boolean tryLock() {
        return sync.tryAcquire(0);
    }
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public void unlock() {
        sync.release(0);
    }
    public Condition newCondition() {
        return sync.newCondition();
    }
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(0);
    }
}

6 AQS实现类

ReentrantLock

protected boolean tryAcquire(int ignored) {
 final Thread current = Thread.currentThread();
 int c = getState();
 if (c == 0) {
 if (compareAndSetState(0, 1)) {
 owner = current;
 return true;
 }
 } else if (current == owner) {
 setState(c+1);
 return true;
 }
 return false;
} 

Semaphore和CountDownLatch

protected int tryAcquireShared(int acquires) {
 while (true) {
 int available = getState();
 int remaining = available - acquires;
 if (remaining < 0
 || compareAndSetState(available, remaining))
 return remaining;
 }
}
protected boolean tryReleaseShared(int releases) {
 while (true) {
 int p = getState();
 if (compareAndSetState(p, p + releases))
 return true;
 }
} 
目录
相关文章
|
3天前
|
人工智能 缓存 监控
使用LangChain4j构建Java AI智能体:让大模型学会使用工具
AI智能体是大模型技术的重要演进方向,它使模型能够主动使用工具、与环境交互,以完成复杂任务。本文详细介绍如何在Java应用中,借助LangChain4j框架构建一个具备工具使用能力的AI智能体。我们将创建一个能够进行数学计算和实时信息查询的智能体,涵盖工具定义、智能体组装、记忆管理以及Spring Boot集成等关键步骤,并展示如何通过简单的对话界面与智能体交互。
68 1
|
6天前
|
人工智能 Java API
构建基于Java的AI智能体:使用LangChain4j与Spring AI实现RAG应用
当大模型需要处理私有、实时的数据时,检索增强生成(RAG)技术成为了核心解决方案。本文深入探讨如何在Java生态中构建具备RAG能力的AI智能体。我们将介绍新兴的Spring AI项目与成熟的LangChain4j框架,详细演示如何从零开始构建一个能够查询私有知识库的智能问答系统。内容涵盖文档加载与分块、向量数据库集成、语义检索以及与大模型的最终合成,并提供完整的代码实现,为Java开发者开启构建复杂AI智能体的大门。
216 1
|
7天前
|
人工智能 Java API
Java与大模型集成实战:构建智能Java应用的新范式
随着大型语言模型(LLM)的API化,将其强大的自然语言处理能力集成到现有Java应用中已成为提升应用智能水平的关键路径。本文旨在为Java开发者提供一份实用的集成指南。我们将深入探讨如何使用Spring Boot 3框架,通过HTTP客户端与OpenAI GPT(或兼容API)进行高效、安全的交互。内容涵盖项目依赖配置、异步非阻塞的API调用、请求与响应的结构化处理、异常管理以及一些面向生产环境的最佳实践,并附带完整的代码示例,助您快速将AI能力融入Java生态。
114 12
|
10天前
|
SQL Java 数据库
2025 年 Java 从零基础小白到编程高手的详细学习路线攻略
2025年Java学习路线涵盖基础语法、面向对象、数据库、JavaWeb、Spring全家桶、分布式、云原生与高并发技术,结合实战项目与源码分析,助力零基础学员系统掌握Java开发技能,从入门到精通,全面提升竞争力,顺利进阶编程高手。
180 1
|
11天前
|
Java 开发者
Java并发编程:CountDownLatch实战解析
Java并发编程:CountDownLatch实战解析
283 100
|
13天前
|
安全 Java API
使用 Java 构建强大的 REST API 的四个基本技巧
本文结合探险领域案例,分享Java构建REST API的四大核心策略:统一资源命名、版本控制与自动化文档、安全防护及标准化异常处理,助力开发者打造易用、可维护、安全可靠的稳健API服务。
86 2
|
21天前
|
NoSQL Java 关系型数据库
超全 Java 学习路线,帮你系统掌握编程的超详细 Java 学习路线
本文为超全Java学习路线,涵盖基础语法、面向对象编程、数据结构与算法、多线程、JVM原理、主流框架(如Spring Boot)、数据库(MySQL、Redis)及项目实战等内容,助力从零基础到企业级开发高手的进阶之路。
120 1
|
安全 Java 容器
Java并发编程 - 线程不安全类 & 同步/并发容器之简介
Java并发编程 - 线程不安全类 & 同步/并发容器之简介
169 0
Java并发编程 - 线程不安全类 & 同步/并发容器之简介
|
安全 Java 容器
java并发编程笔记3-同步容器&并发容器&闭锁&栅栏&信号量
一.同步容器:   1.Vector容器实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。保证了线程安全。
1619 0