问题描述
实现经典同步问题:生产者—消费者,具体要求如下:
① 一个大小为 n的缓冲区,初始状态为空。
② 生产者: 往缓冲区中添加数据,若缓冲区已满,等待消费者取走数据之后再添加
③ 消费者: 从缓冲区中读取数据,若缓冲区为空,等待生产者添加数据之后再读取
下面为java代码实现
设定
- n赋值为10
static final int n=10;
- 往缓冲区中添加的数据为buffer[in]的下标in
buffer[in]= ""+in;
简单实现
package os.prioducer_consumer; import java.util.concurrent.Semaphore; public class ProcessTest1 { static final int n=10; static int in=0,out=0; static String[] buffer=new String[n]; static Semaphore mutex=new Semaphore(1,false); static Semaphore empty=new Semaphore(n,false); static Semaphore full=new Semaphore(0,false); public static void main(String[] args) { Thread producer=new Thread(new Runnable() { @Override public void run() { while (true){ try { empty.acquire(); mutex.acquire(); System.out.println(Thread.currentThread().getName()+"放入"+in); buffer[in]= ""+in; in=(in+1)%n; Thread.sleep(1000);//休眠表示放入的过程 mutex.release(); full.release(); }catch (InterruptedException e) { System.out.println("生产者获取资源失败!"); e.printStackTrace(); } } } }); Thread consumer=new Thread(new Runnable() { @Override public void run() { while (true){ try { full.acquire(); mutex.acquire(); String o=buffer[out]; System.out.println(Thread.currentThread().getName()+"取出"+o); Thread.sleep(1000);//休眠表示放入的过程 out=(out+1)%n; mutex.release(); empty.release(); }catch (InterruptedException e) { System.out.println("消费者获取资源失败!"); e.printStackTrace(); } } } }); producer.setName("producer"); consumer.setName("consumer"); producer.start(); consumer.start(); } }
运行结果
producer放入0 producer放入1 producer放入2 producer放入3 producer放入4 producer放入5 consumer取出0 consumer取出1 consumer取出2 consumer取出3 consumer取出4 consumer取出5 producer放入6 producer放入7 producer放入8 consumer取出6 consumer取出7 consumer取出8 producer放入9 producer放入0 consumer取出9 producer放入1 producer放入2 producer放入3 producer放入4 producer放入5 producer放入6 producer放入7 consumer取出0 consumer取出1 producer放入8
如果需要增加生产者和消费者的数量,只能复制一份,增加变量,比较麻烦
所以,下面封装为类。
Thread producer1=new Thread(new Runnable() { @Override public void run() { while (true){ try { empty.acquire(); mutex.acquire(); System.out.println(Thread.currentThread().getName()+"放入"+in); buffer[in]= ""+in; in=(in+1)%n; Thread.sleep(1000);//休眠表示放入的过程 mutex.release(); full.release(); }catch (InterruptedException e) { System.out.println("生产者获取资源失败!"); e.printStackTrace(); } } } }); Thread consumer1=new Thread(new Runnable() { @Override public void run() { while (true){ try { full.acquire(); mutex.acquire(); String o=buffer[out]; System.out.println(Thread.currentThread().getName()+"取出"+o); Thread.sleep(1000);//休眠表示放入的过程 out=(out+1)%n; mutex.release(); empty.release(); }catch (InterruptedException e) { System.out.println("消费者获取资源失败!"); e.printStackTrace(); } } } }); producer1.setName("producer1"); consumer1.setName("consumer1"); producer1.start(); consumer1.start();
运行结果
producer放入0 producer放入1 producer放入2 producer放入3 producer放入4 producer放入5 producer放入6 producer放入7 producer放入8 producer1放入9 consumer取出0 consumer取出1 consumer取出2 consumer取出3 consumer取出4 consumer取出5 consumer取出6 consumer1取出7 producer放入0 producer放入1 producer放入2 producer1放入3 consumer取出8 consumer1取出9 consumer1取出0 consumer1取出1
封装为类和对象
package os.prioducer_consumer; import java.util.concurrent.Semaphore; class MyProcess { static final int n=5; static int in = 0, out = 0; static String[] buffer = new String[n]; static Semaphore mutex = new Semaphore(1, false); static Semaphore empty = new Semaphore(n, false); static Semaphore full = new Semaphore(0, false); public void producerFunc() { try { empty.acquire(); mutex.acquire(); System.out.println(Thread.currentThread().getName()+"放入" + in); buffer[in] = "" + in; in = (in + 1) % n; Thread.sleep(1000);//休眠表示放入的过程 mutex.release(); full.release(); } catch (InterruptedException e) { System.out.println("生产者获取资源失败!"); e.printStackTrace(); } } public void consumerFunc() { try { full.acquire(); mutex.acquire(); String o = buffer[out]; System.out.println(Thread.currentThread().getName()+"取出" + o); Thread.sleep(1000);//休眠表示放入的过程 out = (out + 1) % n; mutex.release(); empty.release(); } catch (InterruptedException e) { System.out.println("消费者获取资源失败!"); e.printStackTrace(); } } } class Producer extends Thread { private MyProcess myProcess; public Producer(MyProcess myProcess) { this.myProcess = myProcess; } @Override public void run() { while (true){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } myProcess.producerFunc(); } } } class Consumer extends Thread{ private MyProcess myProcess; public Consumer(MyProcess myProcess) { this.myProcess = myProcess; } @Override public void run() { while (true){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } myProcess.consumerFunc(); } } } public class ProcessTest2 { public static void main(String[] args) { MyProcess myProcess=new MyProcess(); Producer producer = new Producer(myProcess); Producer producer1 = new Producer(myProcess); Consumer consumer = new Consumer(myProcess); Consumer consumer1 = new Consumer(myProcess); producer.setName("producer"); producer1.setName("producer1"); consumer.setName("consumer"); consumer1.setName("consumer1"); producer.start(); producer1.start(); consumer.start(); consumer1.start(); } }
运行结果
producer放入0 producer1放入1 consumer1取出0 producer放入2 consumer取出1 producer1放入3 consumer取出2 consumer1取出3 producer放入4 producer1放入5 consumer取出4 producer放入6 consumer1取出5 producer1放入7 consumer取出6 producer放入8 consumer1取出7 producer1放入9 consumer取出8 producer放入0 consumer1取出9 producer1放入1 consumer取出0 producer放入2 consumer1取出1 producer1放入3 consumer取出2
如果需要增加生产者和消费者的数量,只能需新建变量即可
Producer producer1 = new Producer(myProcess); Consumer consumer1 = new Consumer(myProcess); producer1.setName("producer1"); consumer1.setName("consumer1"); producer1.start(); consumer1.start();