【Java数据结构及算法实战】系列010:Java队列04——链表实现的阻塞队列LinkedBlockingQueue

简介: 【Java数据结构及算法实战】系列010:Java队列04——链表实现的阻塞队列LinkedBlockingQueue

LinkedBlockingQueue是一种基于链表实现的可选边界的阻塞队列,该队列排序元素FIFO。队列的队首是在该队列上停留时间最长的元素,队列的队尾是在该队列上停留最短时间的元素。在队列尾部插入新的元素,队列检索操作在队列的头部获取元素。

 

在大多数并发应用程序中,基于链表实现的队列通常具有比基于数组实现的队列更高的吞吐量,但性能上未必占优势。

 

LinkedBlockingQueue在初始化时可以指定容量也可以不指定容量。当初始化LinkedBlockingQueue指定容量时,是有界队列;当初始化LinkedBlockingQueue未指定容量时,其内部会以Integer.MAX_VALUE值作为容量。当然,因为Integer.MAX_VALUE值非常大,近似无限大,因此LinkedBlockingQueue未指定容量时也可以近似认为是无界队列。

 

为防止队列的过度的扩展,建议在LinkedBlockingQueue初始化时指定容量。LinkedBlockingQueue内部的链接节点在每次入队元素时动态创建,除非这会使队列超过容量。

 

LinkedBlockingQueue类及其迭代器实现了Collection和Iterator接口的所有可选方法。LinkedBlockingQueue是Java Collections Framework的一个成员。

 

 

1.   LinkedBlockingQueue的声明

LinkedBlockingQueue的接口和继承关系如下

 

public class LinkedBlockingQueue<E> extends AbstractQueue<E>

       implements BlockingQueue<E>, java.io.Serializable {

  …

}

 

 

完整的接口继承关系如下图所示。

 

 

 

 

 

 

从上述代码可以看出,LinkedBlockingQueue既实现了BlockingQueue<E>和java.io.Serializable接口,又继承了java.util.AbstractQueue<E>。其中,AbstractQueue是Queue接口的抽象类,此处不再赘述。

 

 

 

2.   LinkedBlockingQueue的成员变量和构造函数

 

 

以下是LinkedBlockingQueue的构造函数和成员变量。

 

 

   // 容量

   private final int capacity;

 

   // 当前元素个数

   private final AtomicInteger count = new AtomicInteger();

 

   // 链表头结点

   // 不变式: head.item == null

   transient Node<E> head;

 

   // 链表尾结点

   // 不变式: last.next == null

   private transient Node<E> last;

 

   // 用于锁住take、poll等操作

   private final ReentrantLock takeLock = new ReentrantLock();

 

   // 队列非空,唤醒消费者

   private final Condition notEmpty = takeLock.newCondition();

 

   // 用于锁住put、offer等操作

   private final ReentrantLock putLock = new ReentrantLock();

 

   // 队列非满,唤醒生产者

private final Condition notFull = putLock.newCondition();

 

public LinkedBlockingQueue() {

       this(Integer.MAX_VALUE);

   }

 

   public LinkedBlockingQueue(int capacity) {

       if (capacity <= 0) throw new IllegalArgumentException();

       this.capacity = capacity;

       last = head = new Node<E>(null);

   }

 

   public LinkedBlockingQueue(Collection<? extends E> c) {

       this(Integer.MAX_VALUE);

       final ReentrantLock putLock = this.putLock;

       putLock.lock();  // 只锁可见,不互斥

       try {

           int n = 0;

           for (E e : c) {

               if (e == null)

                   throw new NullPointerException();

               if (n == capacity)

                   throw new IllegalStateException("Queue full");

               enqueue(new Node<E>(e));

               ++n;

           }

           count.set(n);

       } finally {

           putLock.unlock();

       }

}

 

 

从上述代码可以看出,构造函数有三种。构造函数中的参数含义如下

 

l  capacity用于设置队列容量。该参数是可选的,如果未设置,则取Integer.MAX_VALUE值作为容量

l  c用于设置最初包含给定集合的元素,按集合迭代器的遍历顺序添加

 

类成员last和head分别指代链表的尾结点和头结点。链表中的结点用Node类型表示,代码如下:

 

 

static class Node<E> {

       E item;

 

       /**

        * next有以下三种场景:

        * - 真正的后继结点

        * - 当前结点是头结点,则后继结点是head.next

        * - 值为null,表示当前结点是尾结点,没有后继结点

        */

       Node<E> next;

 

       Node(E x) { item = x; }

}

 

 

访问策略是通过ReentrantLock来实现的。通过两个加锁条件notEmpty、notFull来实现并发控制。与ArrayBlockingQueue所不同的是,LinkedBlockingQueue使用了takeLock和putLock两把锁来分别锁住出队操作和入队操作。

 

count用于记录当前队列里面的元素个数。

3.   LinkedBlockingQueue的核心方法

以下对LinkedBlockingQueue常用核心方法的实现原理进行解释。

 

 

3.1.     offer(e)

执行offer(e)方法后有两种结果

l  队列未满时,返回 true

l  队列满时,返回 false

 

LinkedBlockingQueue的offer (e)方法源码如下:

 

public boolean offer(E e) {

       if (e == null) throw new NullPointerException();  // 判空

       final AtomicInteger count = this.count;

       if (count.get() == capacity)

           return false;

       final int c;

       final Node<E> node = new Node<E>(e);

       final ReentrantLock putLock = this.putLock;

       putLock.lock();  // 加锁

       try {

           if (count.get() == capacity)

               return false;

           enqueue(node); // 入队

           c = count.getAndIncrement();

           if (c + 1 < capacity)

               notFull.signal(); // 标识当前队列非满

       } finally {

           putLock.unlock(); // 解锁

       }

       if (c == 0)

           signalNotEmpty();  // 标识当前队列已经是非空

       return true;

   }

 

从上面代码可以看出,执行offer(e)方法时,分为以下几个步骤:

 

l  判断待入队的元素e是否为null。为null则抛出NullPointerException异常。

l  判断count是否超过了容量的限制,如果是则证明队列已经满了,直接返回false。

l  为了确保并发操作的安全先做了加锁处理。

l  再次判断count是否超过了容量的限制,如果是则证明队列已经满了,直接返回false;否则将元素e做入队处理,并返回true。

l  解锁。

l  c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。

 

enqueue(node)方法代码如下:

 

private void enqueue(Node<E> node) {

       last = last.next = node;

   }

 

enqueue(node)方法就在链表的尾部插入数据元素。

 

signalNotEmpty()方法代码如下:

 

private void signalNotEmpty() {

       final ReentrantLock takeLock = this.takeLock;

       takeLock.lock();

       try {

           notEmpty.signal();

       } finally {

           takeLock.unlock();

       }

   }

 

思考:细心的读者可能会发现,在offer (e)方法方法中做了两次判断count是否超过了容量的限制。那么为什么要判断两次呢?

3.2.     put(e)

执行put(e)方法后有两种结果:

•      

l  队列未满时,直接插入没有返回值

l  队列满时,会阻塞等待,一直等到队列未满时再插入

 

LinkedBlockingQueue的put(e)方法源码如下:

 

 

public void put(E e) throws InterruptedException {

       if (e == null) throw new NullPointerException();

       final int c;

       final Node<E> node = new Node<E>(e);

       final ReentrantLock putLock = this.putLock;

       final AtomicInteger count = this.count;

       putLock.lockInterruptibly();  // 获取锁

       try {

           while (count.get() == capacity) {

               notFull.await();  // 使线程等待

           }

           enqueue(node);

           c = count.getAndIncrement();

           if (c + 1 < capacity)

               notFull.signal();  // 标识当前队列非满

       } finally {

           putLock.unlock();  // 解锁

       }

       if (c == 0)

           signalNotEmpty();  // 标识当前队列已经是非空

}

 

从上面代码可以看出,put(e)方法的实现,分为以下几个步骤:

 

l  先是要获取锁。

l  而后判断count是否等于容量,如果是则证明队列已经满了,就等待;否则执行enqueue(e)方法做元素的入队。

l  解锁。

l  c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。

3.3.     offer(e,time,unit)

offer(e,time,unit)方法与offer(e)方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false。执行offer(e,time,unit)方法有两种结果:

•      

l  队列未满时,返回 true

l  队列满时,会阻塞等待,如果在指定时间内还不能往队列中插入数据则返回 false

 

LinkedBlockingQueue的offer(e,time,unit)方法源码如下:

 

public boolean offer(E e, long timeout, TimeUnit unit)

       throws InterruptedException {

 

       if (e == null) throw new NullPointerException();

       long nanos = unit.toNanos(timeout);

       final int c;

       final ReentrantLock putLock = this.putLock;

       final AtomicInteger count = this.count;

       putLock.lockInterruptibly();  // 获取锁

       try {

           while (count.get() == capacity) {

               if (nanos <= 0L)

                   return false;

               nanos = notFull.awaitNanos(nanos); // 使线程等待指定的时间

           }

           enqueue(new Node<E>(e));

           c = count.getAndIncrement();

           if (c + 1 < capacity)

               notFull.signal();  // 标识当前队列非满

       } finally {

           putLock.unlock();  // 解锁

       }

       if (c == 0)

           signalNotEmpty();  // 标识当前队列已经是非空

       return true;

}

 

从上面代码可以看出,offer(e,time,unit)方法的实现,分为以下几个步骤:

 

l  先是要获取锁。

l  而后判断count是否等于容量,如果是则证明队列已经满了,就等待;否则执行enqueue(e)方法做元素的入队。

l  解锁。

l  c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。

 

3.4.     add(e)

执行add(e)方法后有两种结果

 

l  队列未满时,返回 true

l  队列满时,则抛出异常

 

ArrayBlockingQueue的add(e)方法源码如下:

 

   public boolean add(E e) {

       return super.add(e);

   }

 

 

从上面代码可以看出,add(e)方法的实现,直接是调用了父类AbstractQueue的add(e)方法。而AbstractQueue的add(e)方法源码如下:

 

 

public boolean add(E e) {

       if (offer(e))

           return true;

       else

           throw new IllegalStateException("Queue full");

}

 

 

从上面代码可以看出,add(e)方法又调用了offer(e)方法。offer(e)方法此处不再赘述。

 

 

 

 

3.5.     poll ()

执行poll ()方法后有两种结果:

 

l  队列不为空时,返回队首值并移除

l  队列为空时,返回 null

 

 

LinkedBlockingQueue的poll ()方法源码如下:

 

public E poll() {

       final AtomicInteger count = this.count;

       if (count.get() == 0)

           return null;

       final E x;

       final int c;

       final ReentrantLock takeLock = this.takeLock;

       takeLock.lock();  // 加锁

       try {

           if (count.get() == 0)

               return null;

           x = dequeue();  // 出队

           c = count.getAndDecrement();

           if (c > 1)

               notEmpty.signal();  // 标识当前队列非空

       } finally {

           takeLock.unlock();  // 解锁

       }

       if (c == capacity)

           signalNotFull();  // 标识当前队列已经是非满

       return x;

   }

从上面代码可以看出,执行poll()方法时,分为以下几个步骤:

 

l  先是判断count是否等于0,如果等于0则证明队列为空,直接返回null。

l  为了确保并发操作的安全先做了加锁处理。

l  再次判断count是否等于0,如果等于0则证明队列为空,直接返回null;否则执行dequeue()方法做元素的出队。

l  解锁。

l  c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。

 

dequeue()方法源码如下:

 

 

private E dequeue() {

       Node<E> h = head;

       Node<E> first = h.next;

       h.next = h; // 利于GC

       head = first;

       E x = first.item;

       first.item = null;

       return x;

}

 

上面代码比较简单,就是移除链表的头结点。

 

3.6.     take()

执行take()方法后有两种结果:

 

l  队列不为空时,返回队首值并移除

l  队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值

 

LinkedBlockingQueue的take ()方法源码如下:

 

public E take() throws InterruptedException {

       final E x;

       final int c;

       final AtomicInteger count = this.count;

       final ReentrantLock takeLock = this.takeLock;

       takeLock.lockInterruptibly();  // 获取锁

       try {

           while (count.get() == 0) {

               notEmpty.await();  // 使线程等待

           }

           x = dequeue();  // 出队

           c = count.getAndDecrement();

           if (c > 1)

               notEmpty.signal();  // 标识当前队列非空

       } finally {

           takeLock.unlock();  // 解锁

       }

       if (c == capacity)

           signalNotFull();  // 标识当前队列已经是非满

       return x;

   }

 

从上面代码可以看出,执行take()方法时,分为以下几个步骤:

 

l  先是要获取锁。

l  而后判断count是否等于0,如果等于0则证明队列为空,会阻塞等待;否则执行dequeue()方法做元素的出队。

l  解锁。

l  c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。

 

dequeue()和signalNotFull ()方法此处不再赘述。

 

3.7.     poll(time,unit)

poll(time,unit)方法与poll()方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内队列还为空,则返回null。执行poll(time,unit)方法后有两种结果:

 

l  队列不为空时,返回队首值并移除

l  队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null

 

LinkedBlockingQueue的poll(time,unit)方法源码如下:

 

public E poll(long timeout, TimeUnit unit) throws InterruptedException {

       final E x;

       final int c;

       long nanos = unit.toNanos(timeout);

       final AtomicInteger count = this.count;

       final ReentrantLock takeLock = this.takeLock;

       takeLock.lockInterruptibly();  // 获取锁

       try {

           while (count.get() == 0) {

               if (nanos <= 0L)

                   return null;

               nanos = notEmpty.awaitNanos(nanos); // 使线程等待指定的时间

           }

           x = dequeue();  // 出队

           c = count.getAndDecrement();

           if (c > 1)

               notEmpty.signal();  // 标识当前队列非空

       } finally {

           takeLock.unlock();  // 解锁

       }

       if (c == capacity)

           signalNotFull();  // 标识当前队列已经是非满

       return x;

}

 

 

从上面代码可以看出,执行poll(time,unit)方法时,分为以下几个步骤:

 

l  先是要获取锁。

l  而后判断count是否等于0,如果等于0则证明队列为空,会阻塞等待;否则执行dequeue()方法做元素的出队。

l  解锁。

l  c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。

 

dequeue()和signalNotFull ()方法此处不再赘述。

 

 

3.8.     remove()

执行remove()方法后有两种结果:

 

l  队列不为空时,返回队首值并移除

l  队列为空时,抛出异常

 

LinkedBlockingQueue的remove()方法其实是调用了父类AbstractQueue的remove ()方法,源码如下:

 

public E remove() {

       E x = poll();

       if (x != null)

           return x;

       else

           throw new NoSuchElementException();

}

 

从上面代码可以看出,remove()直接调用了poll()方法。如果poll()方法返回结果为null,则抛出NoSuchElementException异常。

 

poll()方法此处不再赘述。

 

3.9.     peek()

执行peek()方法后有两种结果:

 

l  队列不为空时,返回队首值但不移除

l  队列为空时,返回null

 

 

peek()方法源码如下:

 

public E peek() {

       final AtomicInteger count = this.count;

       if (count.get() == 0)

           return null;

       final ReentrantLock takeLock = this.takeLock;

       takeLock.lock();  // 加锁

       try {

           return (count.get() > 0) ? head.next.item : null;  // 空则返回null

       } finally {

           takeLock.unlock();  // 解锁

       }

}

 

从上面代码可以看出,peek()方法比较简单,直接就是获取了链表里面头结点的元素值。

 

3.10.            element()

执行element()方法后有两种结果:

 

l  队列不为空时,返回队首值但不移除

l  队列为空时,抛出异常

 

 

element()方法其实是调用了父类AbstractQueue的element()方法,源码如下:

 

public E element() {

       E x = peek();

       if (x != null)

           return x;

       else

           throw new NoSuchElementException();

}

 

从上面代码可以看出,执行element()方法时,先是获取peek()方法的结果,如果结果是null,则抛出NoSuchElementException异常。

 

 

 

4.   LinkedBlockingQueue的单元测试

 

LinkedBlockingQueue的单元测试如下:

 

 

 

package com.waylau.java.demo.datastructure;

 

import static org.junit.jupiter.api.Assertions.assertEquals;

import static org.junit.jupiter.api.Assertions.assertFalse;

import static org.junit.jupiter.api.Assertions.assertNotNull;

import static org.junit.jupiter.api.Assertions.assertNull;

import static org.junit.jupiter.api.Assertions.assertThrows;

import static org.junit.jupiter.api.Assertions.assertTrue;

 

import java.util.NoSuchElementException;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.TimeUnit;

 

import org.junit.jupiter.api.Test;

 

/**

* LinkedBlockingQueue Test

*

* @since 1.0.0 2020年5月24日

* @author <a href="https://waylau.com">Way Lau</a>

*/

class LinkedBlockingQueueTests {

   @Test

   void testOffer() {

       // 初始化队列

       BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 测试队列未满时,返回 true

       boolean resultNotFull = queue.offer("Java");

       assertTrue(resultNotFull);

 

       // 测试队列满则,返回 false

       queue.offer("C");

       queue.offer("Python");

       boolean resultFull = queue.offer("C++");

       assertFalse(resultFull);

   }

 

   @Test

   void testPut() throws InterruptedException {

       // 初始化队列

       BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 测试队列未满时,直接插入没有返回值;

       queue.put("Java");

 

       // 测试队列满则, 会阻塞等待,一直等到队列未满时再插入。

       queue.put("C");

       queue.put("Python");

       queue.put("C++");  // 阻塞等待

   }

 

   @Test

   void testOfferTime() throws InterruptedException {

       // 初始化队列

       BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 测试队列未满时,返回 true

       boolean resultNotFull = queue.offer("Java", 5, TimeUnit.SECONDS);

       assertTrue(resultNotFull);

 

       // 测试队列满则,返回 false

       queue.offer("C");

       queue.offer("Python");

       boolean resultFull = queue.offer("C++", 5, TimeUnit.SECONDS); // 等5秒

       assertFalse(resultFull);

   }

 

   @Test

   void testAdd() {

       // 初始化队列

       BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 测试队列未满时,返回 true

       boolean resultNotFull = queue.add("Java");

       assertTrue(resultNotFull);

 

       // 测试队列满则抛出异常

       queue.add("C");

       queue.add("Python");

 

       Throwable excpetion = assertThrows(IllegalStateException.class, () -> {

           queue.add("C++");// 抛异常

       });

 

       assertEquals("Queue full", excpetion.getMessage());

   }

 

   @Test

   void testPoll() throws InterruptedException {

       // 初始化队列

       BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 测试队列为空时,返回 null

       String resultEmpty = queue.poll();

       assertNull(resultEmpty);

 

       // 测试队列不为空时,返回队首值并移除

       queue.put("Java");

       queue.put("C");

       queue.put("Python");

       String resultNotEmpty = queue.poll();

       assertEquals("Java", resultNotEmpty);

   }

 

   @Test

   void testTake() throws InterruptedException {

       // 初始化队列

       BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 测试队列不为空时,返回队首值并移除

       queue.put("Java");

       queue.put("C");

       queue.put("Python");

       String resultNotEmpty = queue.take();

       assertEquals("Java", resultNotEmpty);

 

       // 测试队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值

       queue.clear();

       String resultEmpty = queue.take(); // 阻塞等待

       assertNotNull(resultEmpty);

   }

 

   @Test

   void testPollTime() throws InterruptedException {

       // 初始化队列

       BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 测试队列不为空时,返回队首值并移除

       queue.put("Java");

       queue.put("C");

       queue.put("Python");

       String resultNotEmpty = queue.poll(5, TimeUnit.SECONDS);

       assertEquals("Java", resultNotEmpty);

 

       // 测试队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null

       queue.clear();

       String resultEmpty = queue.poll(5, TimeUnit.SECONDS); // 等待5秒

       assertNull(resultEmpty);

   }

 

   @Test

   void testRemove() throws InterruptedException {

       // 初始化队列

       BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 测试队列为空时,抛出异常

       Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {

           queue.remove();// 抛异常

       });

 

       assertEquals(null, excpetion.getMessage());

 

       // 测试队列不为空时,返回队首值并移除

       queue.put("Java");

       queue.put("C");

       queue.put("Python");

       String resultNotEmpty = queue.remove();

       assertEquals("Java", resultNotEmpty);

}

 

@Test

   void testPeek() throws InterruptedException {

       // 初始化队列

       Queue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 测试队列不为空时,返回队首值并但不移除

       queue.add("Java");

       queue.add("C");

       queue.add("Python");

       String resultNotEmpty = queue.peek();

       assertEquals("Java", resultNotEmpty);

       resultNotEmpty = queue.peek();

       assertEquals("Java", resultNotEmpty);

       resultNotEmpty = queue.peek();

       assertEquals("Java", resultNotEmpty);

 

       // 测试队列为空时,返回null

       queue.clear();

       String resultEmpty = queue.peek();

       assertNull(resultEmpty);

   }

 

   @Test

   void testElement() throws InterruptedException {

       // 初始化队列

       Queue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 测试队列不为空时,返回队首值并但不移除

       queue.add("Java");

       queue.add("C");

       queue.add("Python");

       String resultNotEmpty = queue.element();

       assertEquals("Java", resultNotEmpty);

       resultNotEmpty = queue.element();

       assertEquals("Java", resultNotEmpty);

       resultNotEmpty = queue.element();

       assertEquals("Java", resultNotEmpty);

 

       // 测试队列为空时,抛出异常

       queue.clear();

       Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {

           queue.element();// 抛异常

       });

 

       assertEquals(null, excpetion.getMessage());

   }

}

 

 

5.   LinkedBlockingQueue的应用案例

以下是一个生产者-消费者的示例。该示例模拟了1个生产者,2个消费者。当队列满时,则会阻塞生产者生产;当队列空时,则会阻塞消费者消费。

 

package com.waylau.java.demo.datastructure;

 

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.BlockingQueue;

 

/**

* LinkedBlockingQueue Demo

*

* @since 1.0.0 2020年5月23日

* @author <a href="https://waylau.com">Way Lau</a>

*/

public class LinkedBlockingQueueDemo {

   public static void main(String[] args) {

       BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

 

       // 1个生产者

       Producer p = new Producer(queue);

 

       // 2个消费者

       Consumer c1 = new Consumer("c1", queue);

       Consumer c2 = new Consumer("c2", queue);

 

       // 启动线程

       new Thread(p).start();

       new Thread(c1).start();

       new Thread(c2).start();

   }

 

}

 

class Producer implements Runnable {

   private final BlockingQueue<String> queue;

 

   Producer(BlockingQueue<String> queue) {

       this.queue = queue;

   }

 

   public void run() {

       try {

           while (true) {

               // 模拟耗时操作

               Thread.sleep(1000L);

 

               queue.put(produce());

           }

       } catch (InterruptedException ex) {

           ex.printStackTrace();

       }

   }

 

   String produce() {

       String apple = "apple: " + System.currentTimeMillis();

       System.out.println("produce " + apple);

       return apple;

   }

}

 

class Consumer implements Runnable {

   private final BlockingQueue<String> queue;

 

   private final String name;

 

   Consumer(String name, BlockingQueue<String> queue) {

       this.queue = queue;

       this.name = name;

   }

 

   public void run() {

       try {

           while (true) {

               // 模拟耗时操作

               Thread.sleep(2000L);

 

               consume(queue.take());

           }

       } catch (InterruptedException ex) {

           ex.printStackTrace();

       }

   }

 

   void consume(Object x) {

       System.out.println(this.name + " consume " + x);

   }

}

 

 

运行上述程序,输出内容如下:

 

produce apple: 1590308520134

c1 consume apple: 1590308520134

produce apple: 1590308521135

c2 consume apple: 1590308521135

produce apple: 1590308522142

c1 consume apple: 1590308522142

produce apple: 1590308523147

c2 consume apple: 1590308523147

produce apple: 1590308524156

c1 consume apple: 1590308524156

produce apple: 1590308525157

c2 consume apple: 1590308525157

produce apple: 1590308526157

c1 consume apple: 1590308526157

produce apple: 1590308527157

c2 consume apple: 1590308527157

 

6.   参考引用

本系列归档至《Java数据结构及算法实战》:https://github.com/waylau/java-data-structures-and-algorithms-in-action

《数据结构和算法基础(Java语言实现)》(柳伟卫著,北京大学出版社出版):https://item.jd.com/13014179.html

目录
相关文章
|
9天前
|
Java
java数据结构,双向链表的实现
文章介绍了双向链表的实现,包括数据结构定义、插入和删除操作的代码实现,以及双向链表的其他操作方法,并提供了完整的Java代码实现。
java数据结构,双向链表的实现
|
19天前
|
存储 Java
【数据结构】优先级队列(堆)从实现到应用详解
本文介绍了优先级队列的概念及其底层数据结构——堆。优先级队列根据元素的优先级而非插入顺序进行出队操作。JDK1.8中的`PriorityQueue`使用堆实现,堆分为大根堆和小根堆。大根堆中每个节点的值都不小于其子节点的值,小根堆则相反。文章详细讲解了如何通过数组模拟实现堆,并提供了创建、插入、删除以及获取堆顶元素的具体步骤。此外,还介绍了堆排序及解决Top K问题的应用,并展示了Java中`PriorityQueue`的基本用法和注意事项。
23 5
【数据结构】优先级队列(堆)从实现到应用详解
|
1月前
|
存储 Java 索引
【数据结构】链表从实现到应用,保姆级攻略
本文详细介绍了链表这一重要数据结构。链表与数组不同,其元素在内存中非连续分布,通过指针连接。Java中链表常用于需动态添加或删除元素的场景。文章首先解释了单向链表的基本概念,包括节点定义及各种操作如插入、删除等的实现方法。随后介绍了双向链表,说明了其拥有前后两个指针的特点,并展示了相关操作的代码实现。最后,对比了ArrayList与LinkedList的不同之处,包括它们底层实现、时间复杂度以及适用场景等方面。
44 10
【数据结构】链表从实现到应用,保姆级攻略
|
6天前
|
前端开发
07_用队列实现栈
07_用队列实现栈
|
6天前
|
测试技术
02_由两个栈组成的队列
02_由两个栈组成的队列
|
10天前
|
存储
|
27天前
|
存储 C语言
数据结构基础详解(C语言): 栈与队列的详解附完整代码
栈是一种仅允许在一端进行插入和删除操作的线性表,常用于解决括号匹配、函数调用等问题。栈分为顺序栈和链栈,顺序栈使用数组存储,链栈基于单链表实现。栈的主要操作包括初始化、销毁、入栈、出栈等。栈的应用广泛,如表达式求值、递归等场景。栈的顺序存储结构由数组和栈顶指针构成,链栈则基于单链表的头插法实现。
151 3
|
27天前
|
存储 算法 C语言
C语言手撕实战代码_循环单链表和循环双链表
本文档详细介绍了用C语言实现循环单链表和循环双链表的相关算法。包括循环单链表的建立、逆转、左移、拆分及合并等操作;以及双链表的建立、遍历、排序和循环双链表的重组。通过具体示例和代码片段,展示了每种算法的实现思路与步骤,帮助读者深入理解并掌握这些数据结构的基本操作方法。
|
28天前
|
Java
【数据结构】栈和队列的深度探索,从实现到应用详解
本文介绍了栈和队列这两种数据结构。栈是一种后进先出(LIFO)的数据结构,元素只能从栈顶进行插入和删除。栈的基本操作包括压栈、出栈、获取栈顶元素、判断是否为空及获取栈的大小。栈可以通过数组或链表实现,并可用于将递归转化为循环。队列则是一种先进先出(FIFO)的数据结构,元素只能从队尾插入,从队首移除。队列的基本操作包括入队、出队、获取队首元素、判断是否为空及获取队列大小。队列可通过双向链表或数组实现。此外,双端队列(Deque)支持两端插入和删除元素,提供了更丰富的操作。
30 0
【数据结构】栈和队列的深度探索,从实现到应用详解
|
2月前
|
存储 安全 Java
从基础到实战:如何用 Java 手写一个阻塞队列?
大家好,我是小米!今天分享手写阻塞队列(Blocking Queue)教程,深入讲解并发编程中的 wait() 和 notifyAll() 机制,通过代码实战,让你轻松掌握生产者-消费者模型中的阻塞队列实现!
55 0
下一篇
无影云桌面