Condition
Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,有了解过阻塞队列底层实现的朋友应该会知道,阻塞队列实际上是使用了Condition来模拟线程间协作。
- Condition是个接口,基本的方法就是await()和signal()方法
- Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
- 调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用
- Conditon中的await()对应Object的wait();
- Condition中的signal()对应Object的notify();
- Condition中的signalAll()对应Object的notifyAll()。
上边我们讲解了基于wait、notify实现的等待、通知模式,但是这种模式的实现缺乏一定的灵活性,那么接下来的这段内容主要是依靠Condition来实现生产消费模式:
生产者队列:
import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author idea * @data 2019/2/17 */ public class ProductQueue<T> { private final T[] items; private final Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); private int head; private int tail; private int count; public ProductQueue(int maxSize) { items = (T[]) new Object[maxSize]; } public ProductQueue() { this(10); } public void put(T t) { lock.lock(); try { while (count == getCapacity()) { //有点类似于Object.wait //队列满了则不写入 System.out.println("资源已生产完毕,材料不足了"); notFull.await(); } items[tail] = t; if (++tail == getCapacity()) { tail = 0; } count++; //有点类似于Object.notifyAll notEmpty.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public T take() { lock.lock(); try { while (count == 0) { //相当于Object.wait System.out.println("资源已经抢夺完毕"); notEmpty.await(); } T out = items[head]; //GC items[head] = null; if (++head == getCapacity()) { head = 0; } --count; notFull.signalAll(); System.out.println("抢到了资源:"); return out; } catch (Exception e) { e.printStackTrace(); return null; } finally { lock.unlock(); } } public int size() { lock.lock(); try { return count; } finally { lock.unlock(); } } public int getCapacity() { return items.length; } public static void main(String[] args) throws InterruptedException { ProductQueue<Integer> productQueue = new ProductQueue<>(); Thread provider = new Thread(new Provider(productQueue)); //验证队列满了是否可以再加入 provider.start(); CountDownLatch countDownLatch = new CountDownLatch(1); Consumer[] consumers = new Consumer[100]; //验证队列为空是否可以进行消费 for (int i = 0; i < 100; i++) { consumers[i] = new Consumer(productQueue, countDownLatch); consumers[i].start(); } System.out.println("消费者创建完毕"); countDownLatch.countDown(); } } 复制代码
生产者:
/** * @author idea * @data 2019/2/17 */ public class Provider implements Runnable { private ProductQueue<Integer> productQueue; public Provider(ProductQueue productQueue) { this.productQueue = productQueue; } @Override public void run() { while (true){ for(int i=0;i<10;i++){ System.out.println("------------生产了资源:"+i); productQueue.put(i); } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } 复制代码
消费者:
import java.util.concurrent.CountDownLatch; /** * @author idea * @data 2019/2/17 */ public class Consumer extends Thread { private ProductQueue<Integer> productQueue; private CountDownLatch countDownLatch; public Consumer(ProductQueue productQueue,CountDownLatch countDownLatch) { this.countDownLatch=countDownLatch; this.productQueue = productQueue; } @Override public void run() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } while (true){ Integer number = productQueue.take(); } } } 复制代码
与Object里面的wait和notify相比,Condition里面还包含了很多有趣的知识点,由于篇幅有限,关于Condition在aqs中源码的实现部分,以后我会专门抽空来写一篇文章进行总结。