消费者
package consumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class Consumer implements Runnable { /* * 用util.concurrent.BlockingQueue沟通生产者和消费者的桥梁 */ BlockingQueue<String> queue; String id; @SuppressWarnings("unused") private volatile boolean isRunning = true; public Consumer(BlockingQueue<String> queue, String id) { this.queue = queue; this.id = id; } public void stop() { isRunning = false; } @Override public void run() { System.out.println("Thread: " + id + " Consumer thread is running..."); boolean isRunning = true; try { while (isRunning) { System.out.println("Thread: " + id + " fetch data from linkedQueue..." + " queue size: " + queue.size()); /* * 从队列里取出一个元素,2秒超时,如果两秒之后还没有东西可以取,则poll返回null */ String data = queue.poll(2, TimeUnit.SECONDS); if (null != data) { System.out.println("Thread: " + id + " has consumed one data from queue: " + data + " Queue sise: " + queue.size()); // simulate data consumption Thread.sleep(1000); } else { isRunning = false; // 消费者准备退出 System.out.println("Thread: " + id + " Consumer read queue timeout"); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("Thread: " + id + " consumer thread ends"); } } }
生产者
package consumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable { BlockingQueue<String> queue; String id; public Producer(BlockingQueue<String> queue, String id) { this.queue = queue; this.id = id; } @Override public void run() { String data = null; try { while (isRunning) { System.out.println("PRODUCER: " + id + " is running"); Thread.sleep(100); data = "data:" + count.incrementAndGet(); System.out.println("Thread: " + id + " procedued data into queue: " + data + " ..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println("failed to put data into queue: " + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("Thread: " + id + " quit from producer thread"); } } public void stop() { isRunning = false; } private volatile boolean isRunning = true; private static AtomicInteger count = new AtomicInteger(); }
测试代码
package consumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class ConsumerProducerTest { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue = new LinkedBlockingQueue<String>(15); Producer producer1 = new Producer(queue, "PROD1"); Producer producer2 = new Producer(queue, "PROD2"); Producer producer3 = new Producer(queue, "PROD3"); Consumer consumer1 = new Consumer(queue, "CONSUMER1"); Consumer consumer2 = new Consumer(queue, "CONSUMER2"); ExecutorService service = Executors.newCachedThreadPool(); service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer1); service.execute(consumer2); Thread.sleep(3 * 1000); producer1.stop(); // 一定要先关闭生产者 producer2.stop(); producer3.stop(); consumer1.stop(); consumer2.stop(); Thread.sleep(2000); service.shutdown(); } }