【Java并发编程实战14】构建自定义同步工具(Building-Custom-Synchronizers)(上)

简介: JDK包含许多存在状态依赖的类,例如FutureTask、Semaphore和BlockingQueue,他们的一些操作都有前提条件,例如非空、任务已完成等。

JDK包含许多存在状态依赖的类,例如FutureTask、Semaphore和BlockingQueue,他们的一些操作都有前提条件,例如非空、任务已完成等。


创建状态依赖类的最简单的房就是在JDK提供了的状态依赖类基础上构造。例如ValueLactch,如果这些不满足,可以使用Java语言或者类库提供的底层机制来构造,包括


  • 内置的条件队列
  • condition
  • AQS


这一章就介绍这些。

1 状态依赖性的管理 State Dependence

下一节会介绍使用条件队列解决阻塞线程运行。

下面先介绍通过轮询和休眠的方式(勉强)的解决。

标准模板:

void blockingAction() throws InterruptedException {
   acquire lock on object state
   while (precondition does not hold) {
      release lock
      wait until precondition might hold
      optionally fail if interrupted or timeout expires
      reacquire lock
   }
   perform action
}

看看阻塞有界队列的几种实现方式。依赖的前提条件:

  • 不能从空缓存中获取元素
  • 不能将元素放入已满的缓存中


不满足条件候,依赖状态的操作可以:

  • 抛出异常
  • 返回一个错误状态(码)
  • 阻塞直到进入正确的状态


下面是基类,线程安全,但非阻塞。

@ThreadSafe
public abstract class BaseBoundedBuffer <V> {
    @GuardedBy("this") private final V[] buf;
    @GuardedBy("this") private int tail;
    @GuardedBy("this") private int head;
    @GuardedBy("this") private int count;
    protected BaseBoundedBuffer(int capacity) {
        this.buf = (V[]) new Object[capacity];
    }
    protected synchronized final void doPut(V v) {
        buf[tail] = v;
        if (++tail == buf.length)
            tail = 0;
        ++count;
    }
    protected synchronized final V doTake() {
        V v = buf[head];
        buf[head] = null;
        if (++head == buf.length)
            head = 0;
        --count;
        return v;
    }
    public synchronized final boolean isFull() {
        return count == buf.length;
    }
    public synchronized final boolean isEmpty() {
        return count == 0;
    }
}

“先检查再运行”的逻辑解决方案:

调用者必须自己处理前提条件失败的情况。当然也可以返回错误消息。


当然调用者可以不Sleep,而是直接重试,这就是忙等待或者自旋等待(busy waiting or spin waiting),如果换成很长时间都不变,那么这将会消耗大量的CPU时间!!!所以调用者自己休眠,sleep让出CPU。但这个时间就很尴尬:


  • sleep长了万一一会前提条件就满足了岂不是白等了从而响应性低
  • sleep短了浪费CPU时钟周期


另外可以试试yield,但是这也不靠谱。

@ThreadSafe
        public class GrumpyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
    public GrumpyBoundedBuffer() {
        this(100);
    }
    public GrumpyBoundedBuffer(int size) {
        super(size);
    }
    public synchronized void put(V v) throws BufferFullException {
        if (isFull())
            throw new BufferFullException();
        doPut(v);
    }
    public synchronized V take() throws BufferEmptyException {
        if (isEmpty())
            throw new BufferEmptyException();
        return doTake();
    }
}
class ExampleUsage {
    private GrumpyBoundedBuffer<String> buffer;
    int SLEEP_GRANULARITY = 50;
    void useBuffer() throws InterruptedException {
        while (true) {
            try {
                String item = buffer.take();
                // use item
                break;
            } catch (BufferEmptyException e) {
                Thread.sleep(SLEEP_GRANULARITY);
            }
        }
    }
}

优化让客户端舒服些:

@ThreadSafe
public class SleepyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
    int SLEEP_GRANULARITY = 60;
    public SleepyBoundedBuffer() {
        this(100);
    }
    public SleepyBoundedBuffer(int size) {
        super(size);
    }
    public void put(V v) throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isFull()) {
                    doPut(v);
                    return;
                }
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }
    public V take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isEmpty())
                    return doTake();
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }
}

这种方式测试失败,那么释放锁,让别人做,自己休眠下,然后再检测,不断的重复这个过程,当然可以解决,但还需要做权衡,CPU使用率与响应性之间的抉择。


那么我们想如果这种轮询和休眠的dummy方式不用,而是存在某种挂起线程的方案,并且这种方法能够确保当某个条件为 true 时,立刻唤醒线程,那将极大简化实现工作,这就是条件队列的实现。


Condition Queues的名字来源:

it gives a group of threads called the wait set a way to wait for a specific condition to become true. Unlike

typical queues in which the elements are data items, the elements of a

condition queue are the threads waiting for the condition.


每个Java对象都可以是一个锁,每个对象同样可以作为一个条件队列,并且Object的wait、notify和notifyAll就是内部条件队列的API。对象的内置锁(intrinsic lock )和内置条件队列是关联的,要调用X中的条件队列的任何一个方法,都必须持有对象X上的锁。


Object.wait自动释放锁,并且请求os挂起当前线程,其他线程可以获得这个锁并修改对象状态。当被挂起的线程唤醒时,它将在返回之前重新获取锁。

@ThreadSafe
public class BoundedBuffer <V> extends BaseBoundedBuffer<V> {
    // CONDITION PREDICATE: not-full (!isFull())
    // CONDITION PREDICATE: not-empty (!isEmpty())
    public BoundedBuffer() {
        this(100);
    }
    public BoundedBuffer(int size) {
        super(size);
    }
    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
        while (isFull())
            wait();
        doPut(v);
        notifyAll();
    }
    // BLOCKS-UNTIL: not-empty
    public synchronized V take() throws InterruptedException {
        while (isEmpty())
            wait();
        V v = doTake();
        notifyAll();
        return v;
    }
    // BLOCKS-UNTIL: not-full
    // Alternate form of put() using conditional notification
    public synchronized void alternatePut(V v) throws InterruptedException {
        while (isFull())
            wait();
        boolean wasEmpty = isEmpty();
        doPut(v);
        if (wasEmpty)
            notifyAll();
    }
}

注意,如果某个功能无法通过“轮询和休眠"来实现,那么条件队列也无法实现。

2 使用条件队列

2.1 条件谓词(The Condition Predicate)

条件谓词是使某个操作成为状态依赖操作的前提条件:

  • take方法的条件谓词是”缓存不为空“,take方法在执行之前必须首先测试条件谓词
  • put方法的条件谓词是”缓存不满“


在条件等待中存在一种重要的三元关系:

  • 加锁
  • wait方法
  • 条件谓词


条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象和条件队列对象必须是同一个对象。


wait释放锁,线程挂起阻塞,等待直到超时,然后被另外一个线程中断或被通知唤醒。唤醒后,wait在返回前还需要重新获取锁,当线程从wait方法中唤醒,它在重新请求锁时不具有任何特殊的优先级,和其他人一起竞争。

2.2 过早唤醒

其他线程中间插足了,获取了锁,并且修改了遍历,这时候线程获取锁需要重新检查条件谓词。

wait block ----------race to get lock ------------------------------------------get lock ----- 
                    ^
wait block --------> race to get lock ------get lock------> perform action  ---> release lock
                    ^
                    notifyAll

当然有时,比如一个你根本不知道为什么别人调用了notify或notifyAll,也许条件谓词压根就没满足,但线程还是获取了锁,然后test条件谓词,释放锁,其他线程都来了这么一趟,发生这就是“谎报军情”啊。


基于以上这两种情况,都必须重新测试条件谓词。


When using condition waits (Object.wait or Condition.await):


/

  • Always have a condition predicate——some test of object state that must hold before proceeding;
  • Always test the condition predicate before calling wait, and again after returning from wait;
  • Always call wait in a loop;
  • Ensure that the state variables making up the condition predicate are guarded by the lock associated with the condition queue;
  • Hold the lock associated with the the condition queue when calling wait, notify, or notifyAll
  • Do not release the lock after checking the condition predicate but before acting on it.


模板就是:

void stateDependentMethod() throws InterruptedException {
 // condition predicate must be guarded by lock
 synchronized(lock) {  
     while (!conditionPredicate())  //一定在循环里面做条件谓词
         lock.wait();  //确保和synchronized的是一个对象
     // object is now in desired state  //不要释放锁
 }
} 


目录
相关文章
|
6天前
|
安全 Java 调度
Java编程时多线程操作单核服务器可以不加锁吗?
Java编程时多线程操作单核服务器可以不加锁吗?
20 2
|
9天前
|
设计模式 缓存 Java
死磕-高效的Java编程(一)
死磕-高效的Java编程(一)
|
3天前
|
Java
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
11 2
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
|
8天前
|
Java 开发者
深入探索Java中的并发编程
本文将带你领略Java并发编程的奥秘,揭示其背后的原理与实践。通过深入浅出的解释和实例,我们将探讨Java内存模型、线程间通信以及常见并发工具的使用方法。无论是初学者还是有一定经验的开发者,都能从中获得启发和实用的技巧。让我们一起开启这场并发编程的奇妙之旅吧!
|
8天前
|
IDE Java 开发工具
java自定义异常20
java自定义异常20
15 3
|
9天前
|
算法 安全 Java
JAVA并发编程系列(12)ThreadLocal就是这么简单|建议收藏
很多人都以为TreadLocal很难很深奥,尤其被问到ThreadLocal数据结构、以及如何发生的内存泄漏问题,候选人容易谈虎色变。 日常大家用这个的很少,甚至很多近10年资深研发人员,都没有用过ThreadLocal。本文由浅入深、并且才有通俗易懂方式全面分析ThreadLocal的应用场景、数据结构、内存泄漏问题。降低大家学习啃骨头的心理压力,希望可以帮助大家彻底掌握并应用这个核心技术到工作当中。
|
7天前
|
Java 编译器 程序员
Java注解,元注解,自定义注解的使用
本文讲解了Java中注解的概念和作用,包括基本注解的用法(@Override, @Deprecated, @SuppressWarnings, @SafeVarargs, @FunctionalInterface),Java提供的元注解(@Retention, @Target, @Documented, @Inherited),以及如何自定义注解并通过反射获取注解信息。
Java注解,元注解,自定义注解的使用
|
9天前
|
Java 程序员 编译器
死磕-高效的Java编程(二)
死磕-高效的Java编程(二)
|
4天前
|
Java
JAVA并发编程系列(13)Future、FutureTask异步小王子
本文详细解析了Future及其相关类FutureTask的工作原理与应用场景。首先介绍了Future的基本概念和接口方法,强调其异步计算特性。接着通过FutureTask实现了一个模拟外卖订单处理的示例,展示了如何并发查询外卖信息并汇总结果。最后深入分析了FutureTask的源码,包括其内部状态转换机制及关键方法的实现原理。通过本文,读者可以全面理解Future在并发编程中的作用及其实现细节。
|
5天前
|
Java 数据中心 微服务
Java高级知识:线程池隔离与信号量隔离的实战应用
在Java并发编程中,线程池隔离与信号量隔离是两种常用的资源隔离技术,它们在提高系统稳定性、防止系统过载方面发挥着重要作用。
6 0
下一篇
无影云桌面