【并发技术13】条件阻塞Condition的应用(一)

简介: 【并发技术13】条件阻塞Condition的应用

Condition 将 Object 监听器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用, Condition 替代了 Object 监视器方法的使用。


1. Condition的基本使用


由于 Condition 可以用来替代 wait、notify 等方法,所以可以对比着之前写过的传统线程同步通信技术的代码来看,先来回顾一下原来的问题:

有两个线程,子线程先执行10次,然后主线程执行5次,然后再切换到子线程执行10,再主线程执行5次……如此往返执行50次。

之前用 wait 和notify 来实现的,现在用 Condition 来改写一下,代码如下:

public class ConditionCommunication {
    public static void main(String[] args) {
        Business bussiness = new Business();
        new Thread(new Runnable() {// 开启一个子线程
                    @Override
                    public void run() {
                        for (int i = 1; i <= 50; i++) {
                            bussiness.sub(i);
                        }
                    }
                }).start();
        // main方法主线程
        for (int i = 1; i <= 50; i++) {
            bussiness.main(i);
        }
    }   
}
class Business {
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition(); //Condition是在具体的lock之上的
    private boolean bShouldSub = true;
    public void sub(int i) {
        lock.lock();
        try {
            while (!bShouldSub) {
                try {
                    condition.await(); //用condition来调用await方法
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("sub thread sequence of " + j
                        + ", loop of " + i);
            }
            bShouldSub = false;
            condition.signal(); //用condition来发出唤醒信号,唤醒某一个
        } finally {
            lock.unlock();
        }
    }
    public void main(int i) {
        lock.lock();
        try {
            while (bShouldSub) {
                try {
                    condition.await(); //用condition来调用await方法
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("main thread sequence of " + j
                        + ", loop of " + i);
            }
            bShouldSub = true;
            condition.signal(); //用condition来发出唤醒信号么,唤醒某一个
        } finally {
            lock.unlock();
        }
    }
}


从代码来看,Condition 的使用是和 Lock 一起的,没有 Lock 就没法使用 Condition,因为 Condition 是通过 Lock 来 new 出来的,这种用法很简单,只要掌握了 synchronized 和 wait、notify 的使用,完全可以掌握 Lock 和 Condition 的使用。


2. Condition的拔高


2.1 缓冲区的阻塞队列


上面使用 Lock 和 Condition 来代替 synchronized 和 Object 监视器方法实现了两个线程之间的通信,现在再来写个稍微高级点的应用:模拟缓冲区的阻塞队列。

什么叫缓冲区呢?举个例子,现在有很多人要发消息,我是中转站,我要帮别人把消息发出去,那么现在我就需要做两件事,一件事是接受用户发过来的消息,并按照顺序放到缓冲区,另一件事是从缓冲区按顺序取出用户发过来的消息,并发送出去。

现在把这个实际的问题抽象一下:缓冲区即一个数组,我们可以向数组种写入数据,也可以从数组种把数据取走,我要做的两件事就是开启两个线程,一个存数据,一个取数据。但是问题来了,如果缓冲区满了,说明接收的消息太多了,即发送过来的消息太快了,我另一个线程还来不及发完,导致现在的缓冲区没地方放了,那么此时就得阻塞存数据这个线程,让其等待;相反,如果我转发的太快,现在缓冲区所有内容都被我发完了,还没有用户发新的消息来,那么此时就得阻塞取数据这个线程。

好了,分析完了这个缓冲区的阻塞队列,下面就用 Condition 技术来实现一下。

class Buffer {
    final Lock lock = new ReentrantLock(); //定义一个锁
    final Condition notFull = lock.newCondition(); //定义阻塞队列满了的Condition
    final Condition notEmpty = lock.newCondition();//定义阻塞队列空了的Condition
    final Object[] items = new Object[10]; //为了下面模拟,设置阻塞队列的大小为10,不要设太大
    int putptr, takeptr, count; //数组下标,用来标定位置的
    //往队列中存数据
    public void put(Object x) throws InterruptedException {
        lock.lock(); //上锁
        try {
            while (count == items.length) {
                System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法存数据!");
                notFull.await();    //如果队列满了,那么阻塞存数据这个线程,等待被唤醒
            }
            //如果没满,按顺序往数组中存
            items[putptr] = x;
            if (++putptr == items.length) //这是到达数组末端的判断,如果到了,再回到始端
                putptr = 0;
            ++count;    //消息数量
            System.out.println(Thread.currentThread().getName() + " 存好了值: " + x);
            notEmpty.signal(); //好了,现在队列中有数据了,唤醒队列空的那个线程,可以取数据啦
        } finally {
            lock.unlock(); //放锁
        }
    }
    //从队列中取数据
    public Object take() throws InterruptedException {
        lock.lock(); //上锁
        try {
            while (count == 0) {
                System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法取数据!");
                notEmpty.await();  //如果队列是空,那么阻塞取数据这个线程,等待被唤醒
            }
            //如果没空,按顺序从数组中取
            Object x = items[takeptr];
            if (++takeptr == items.length) //判断是否到达末端,如果到了,再回到始端
                takeptr = 0;
            --count; //消息数量
            System.out.println(Thread.currentThread().getName() + " 取出了值: " + x);
            notFull.signal(); //好了,现在队列中有位置了,唤醒队列满的那个线程,可以存数据啦
            return x;
        } finally {
            lock.unlock(); //放锁
        }
    }
}


这个程序很经典,我是从官方 JDK 文档中拿出来的,然后加了注释。程序中定义了来两个 Condition,分别针对两个线程,等待和唤醒分别用不同的 Condition 来执行,思路很清晰,程序也很健壮。


相关文章
|
6月前
|
算法 安全 调度
解决Python并发访问共享资源引起的竞态条件、死锁、饥饿问题的策略
解决Python并发访问共享资源引起的竞态条件、死锁、饥饿问题的策略
94 0
|
2月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
死锁产生的条件是什么?
死锁是多个线程因为争夺资源而相互等待的一种状态,导致所有线程都无法继续执行下去。死锁产生的条件通常包括以下四个条件:
92 0
|
6月前
|
存储 安全 C++
《C++ Concurrencyin Action》第6章--基于锁的并发数据结构设计
《C++ Concurrencyin Action》第6章--基于锁的并发数据结构设计
《C++ Concurrencyin Action》第6章--基于锁的并发数据结构设计
|
6月前
|
Java
多线程并发之显示锁Lock与其通信方式Condition源码解读
多线程并发之显示锁Lock与其通信方式Condition源码解读
47 0
|
存储 安全 Java
线程安全问题的产生条件、解决方式
线程安全问题的产生条件、解决方式
72 0
|
安全 算法
死锁原因,条件和解决
死锁原因,条件和解决
133 0
并发编程之没有条件创造条件Condition
多线程编程必会内容, 锁条件Lock.Condition
114 0
多线程详解p13、join线程强制执行
多线程详解p13、join线程强制执行