Java生产者消费者是最基础的线程同步问题,java岗面试中还是很容易遇到的,之前没写过多线程的代码,面试中被问到很尬啊,面完回来恶补下。在网上查到大概有5种生产者消费者的写法,分别如下。
用synchronized对存储加锁,然后用object原生的wait() 和 notify()做同步。
用concurrent.locks.Lock,然后用condition的await() 和signal()做同步。
直接使用concurrent.BlockingQueue。
使用PipedInputStream/PipedOutputStream。
使用信号量semaphore。
我的理解,生产者消费者模式,其实只要保证在存储端同一时刻只有一个线程读或写就不会有问题,然后再去考虑线程同步。方法1 2 5都比较类似,都是加锁来限制同一时刻只能有一个读或写。而方法3 4其实是在存储内部去保证读和写的唯一的,最低层肯定还是通过锁机制来实现的,java底层代码都封装好了而已。
我自己尝试写了下前三种,代码如下:
synchronized版本
import java.util.LinkedList; import java.util.Queue; public class ProducerAndConsumer { private final int MAX_LEN = 10; private Queue<Integer> queue = new LinkedList<Integer>(); class Producer extends Thread { @Override public void run() { producer(); } private void producer() { while(true) { synchronized (queue) { while (queue.size() == MAX_LEN) { queue.notify(); System.out.println("当前队列满"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.add(1); queue.notify(); System.out.println("生产者生产一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } } class Consumer extends Thread { @Override public void run() { consumer(); } private void consumer() { while (true) { synchronized (queue) { while (queue.size() == 0) { queue.notify(); System.out.println("当前队列为空"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); queue.notify(); System.out.println("消费者消费一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public static void main(String[] args) { ProducerAndConsumer pc = new ProducerAndConsumer(); Producer producer = pc.new Producer(); Consumer consumer = pc.new Consumer(); producer.start(); consumer.start(); } }
lock版实现,使用了condition做线程之间的同步。
import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * version 1 doesn't use synchronized to improve performance */ public class ProducerAndConsumer1 { private final int MAX_LEN = 10; private Queue<Integer> queue = new LinkedList<Integer>(); private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); class Producer extends Thread { @Override public void run() { producer(); } private void producer() { while(true) { lock.lock(); try { while (queue.size() == MAX_LEN) { System.out.println("当前队列满"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.add(1); condition.signal(); System.out.println("生产者生产一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } finally { lock.unlock(); } } } } class Consumer extends Thread { @Override public void run() { consumer(); } private void consumer() { while (true) { lock.lock(); try { while (queue.size() == 0) { System.out.println("当前队列为空"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); condition.signal(); System.out.println("消费者消费一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } finally { lock.unlock(); } } } } public static void main(String[] args) { ProducerAndConsumer pc = new ProducerAndConsumer(); Producer producer = pc.new Producer(); Consumer consumer = pc.new Consumer(); producer.start(); consumer.start(); } }
BlockingQueue版实现
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class ProducerAndConsumer { private BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(10); class Producer extends Thread { @Override public void run() { producer(); } private void producer() { while(true) { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("生产者生产一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(new Random().nextInt(1000)+500); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer extends Thread { @Override public void run() { consumer(); } private void consumer() { while (true) { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者消费一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(new Random().nextInt(1000)+500); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { ProducerAndConsumer pc = new ProducerAndConsumer(); Producer producer = pc.new Producer(); Consumer consumer = pc.new Consumer(); producer.start(); consumer.start(); } }