Java生产者和消费者问题是线程安全模型中的经典问题:生产者和消费者在同一个时间段共用同一个存储空间,生产者向存储空间中添加产品呢,消费者取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞
实现生产者消费者的四种方式
- 2.1,最基础的,利用 wait() 和 notify() 方法实现,当缓冲区满或为空时都调用 wait() 方法等待,当生产者生产了一个产品或消费者消费了一个产品后会唤醒所有线程;
package com.practice; public class testMain { private static Integer count = 0; private static final Integer FULL = 10; private static String LOCK = "lock"; public static void main(String[] args) { testMain testMain = new testMain(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i < 10; i++) { try{ Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } synchronized (LOCK){ while(count == FULL){//缓存空间满了 try{ LOCK.wait();//线程阻塞 }catch (Exception e){ e.printStackTrace(); } } count++;//生产者 System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有"+count); LOCK.notifyAll();//唤醒所有线程 } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i < 10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } synchronized (LOCK){ while(count == 0){ try{ LOCK.wait(); }catch (Exception e){ } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有 "+count); LOCK.notifyAll();//唤醒所有线程 } } } } }
2.2 java.util.concurrent.lock 中的 Lock 框架,通过对 lock 的 lock() 方法和 unlock() 方法实现对锁的显示控制,而 synchronize() 则是对锁的隐形控制,可重入锁也叫做递归锁,指的是同一个线程外层函数获得锁之后,内层递归函数仍然有获取该锁的代码,但不受影响;
简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获计数器就加1,函数调用结束计数器就减1,然后锁需要释放两次才能获得真正释放,已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockTest { private static Integer count = 0; private static Integer FULL = 10; //创建一个锁对象 private Lock lock = new ReentrantLock(); //创建两个条件变量,一个为缓冲非满,一个缓冲区非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public static void main(String[] args){ ReentrantLockTest testMain = new ReentrantLockTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } // 获取锁 lock.lock(); try { while (count == FULL) { try{ notFull.await(); }catch(InterruptedException e){ e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); }finally { lock.unlock(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); } catch (Exception e){ e.printStackTrace(); } lock.lock(); try{ while(count==0){ try{ notEmpty.await(); }catch (InterruptedException e){ e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有 " + count); }finally { lock.unlock();//解锁 } } } } }
- 2.3 阻塞队列BlockingQueue的实现
被阻塞的情况主要分为如下两种,BlockingQueue 是线程安全的
- 1,当队列满了的时候进行入队操作;
- 2,当队列空的时候进行出队操作
Blockqueue 接口的一些方法
四类方法分别对应于:
1,ThrowsException,如果操作不能马上进行,则抛出异常;
2,SpecialValue 如果操作不能马上进行,将会返回一个特殊的值,true或false;
3,Blocks 操作被阻塞;
4,TimeOut 指定时间未执行返回一个特殊值 true 或 false
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 使用 BlockQueue 实现生产者消费模型 */ public class BlockQueueTest { public static Integer count = 0; //创建一个阻塞队列 final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); public static void main(String[] args) { BlockQueueTest testMain = new BlockQueueTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } try{ blockingQueue.put(1); count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有 " + count); }catch (InterruptedException e){ e.printStackTrace(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } try{ blockingQueue.take();//消费 count--; System.out.println(Thread.currentThread().getName() + " 消费者消费,目前总共有 "+ count); }catch (InterruptedException e){ e.printStackTrace(); } } } } }
2.4 信号量 Semaphore 的实现
Semaphore (信号量) 用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。Java中的 Semaphone 维护了一个许可集,一开始设定这个许可集的数量,使用 acquire() 方法获得一个许可,当许可不足时会被阻塞,release() 添加一个许可。
下面代码中,还加入了 mutex
信号量,维护消费者和生产者之间的同步关系,保证生产者消费者之间的交替进行
import java.util.concurrent.Semaphore; public class SemaphoreTest { private static Integer count = 0; //创建三个信号量 final Semaphore notFull = new Semaphore(10); final Semaphore notEmpty = new Semaphore(0); final Semaphore mutex = new Semaphore(1);//互斥锁,控制共享数据的互斥访问 public static void main(String[] args) { SemaphoreTest testMain = new SemaphoreTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } try{ notFull.acquire();//获取一个信号量 mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有 "+count); } catch (InterruptedException e){ e.printStackTrace(); } finally { mutex.release();//添加 notEmpty.release(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch(InterruptedException e){ e.printStackTrace(); } try{ notEmpty.acquire(); mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有"+count); }catch (InterruptedException e){ e.printStackTrace(); }finally { mutex.release(); notFull.release(); } } } } }