自实现阻塞队列:
实现阻塞队列的关键在于实现其阻塞的功能。其他的和普通的队列差不多。这里主要实现put和take方法:
1. 2. class MyBlockingQueue{ 3. 4. //利用数组实现 5. private int[] arr=new int[1000];//设定数组长度为1000 6. 7. private int size=0;//记录数组的内容长度 8. //利用end和begin两个指针使得数组变为循环数组(逻辑上的循环) 9. private int end=0; 10. private int begin=0; 11. 12. //实现put方法 13. //阻塞考虑使用wait和notify进行唤醒(sleep不太靠谱) 14. public void put(int value) throws InterruptedException { 15. //判断是否满了(这里要用循环判断,因为在多线程当中,线程被唤醒的时候不一定不满) 16. //加锁保证原子性 17. synchronized (this){ 18. while(size>= arr.length){ 19. this.wait(); 20. } 21. //不满之后放入元素 22. arr[end]=value; 23. //调整长度 24. end++; 25. size++; 26. //如果放满了则将end变为0 27. if(end>= arr.length){ 28. end=0; 29. } 30. //进行唤醒 31. this.notify(); 32. } 33. } 34. //实现take方法 35. public int take() throws InterruptedException { 36. synchronized (this){ 37. //判断是否为空 38. while (size==0){ 39. this.wait(); 40. } 41. //不空之后开始取出元素 42. int ret=arr[begin]; 43. begin++; 44. if(begin>= arr.length){ 45. begin=0; 46. } 47. size--; 48. this.notify(); 49. return ret; 50. } 51. 52. } 53. //长度 54. public synchronized int Size(){ 55. return size; 56. } 57. 58. } 59. 60. 61. public class BlockingQueueDemo3 { 62. 63. public static void main(String[] args) throws InterruptedException { 64. 65. MyBlockingQueue queue=new MyBlockingQueue(); 66. queue.put(100); 67. queue.put(200); 68. queue.put(300); 69. System.out.println(queue.take()); 70. System.out.println(queue.take()); 71. System.out.println(queue.take()); 72. 73. } 74. }
显然,当其中没有元素的时候就会阻塞等待。
生产者消费者模型:
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。
首先,阻塞队列相当于一个缓冲区,它平衡了生产者和消费者之间的处理能力。
其次,阻塞队列使得生产者和消费者之间的进行了解耦合,也就是消费者不再是直接依赖生产者。
对于阻塞队列,生产者是添加元素的一方,消费者是取元素的一方,产品是阻塞队列的元素。生产者和消费者通过阻塞队列相互联系。
画个图演示一下:
我们举向游戏里面氪金的例子吧!!!
这样就会出现一个问题:服务器A和服务器B的耦合太高,一旦其中一个服务器出现了问题,就会导致另一个服务器也无法完成需求。就会出现一个服务器挂了,把另一个服务器也带走的情况。
并且,如果我们需要在此基础上加一个新的服务器参与其他相关的功能,比如日志,也是会有问题的!
那么如何解决这种情况呢?这就用到了当前的生产者消费者模型。生产者生成的资源,我们可以将其放到一个阻塞队列当中去,当有消费者需要消费的时候,就直接从该阻塞队列当中去取,如果队列中没有资源,就阻塞等待,等待生产者进行生产,当阻塞队列满的时候,生产者也要进行阻塞等待。这里的服务器A就是生产者,服务器BC就是消费者。所以我们可以利用该模型进行这样的设计:
生产者消费者模型的实现:
利用系统的BlockingQueue实现生产者消费者模型:
首先我们利用系统提供的BlockingQueue实现生产者消费者模型:
1. import java.util.concurrent.BlockingQueue; 2. import java.util.concurrent.LinkedBlockingQueue; 3. 4. public class BlockingQueueDemo2 { 5. public static void main(String[] args) { 6. //创建阻塞队列 7. BlockingQueue<Integer>queue=new LinkedBlockingQueue<>(); 8. //使用两个线程:一个线程充当生产者,一个线程充当消费者 9. //生产者 10. Thread t1=new Thread(()->{ 11. int count=0; 12. while(true){ 13. try { 14. queue.put(count); 15. System.out.println("生产者生产:"+count); 16. count++; 17. Thread.sleep(500); 18. } catch (InterruptedException e) { 19. throw new RuntimeException(e); 20. } 21. } 22. }); 23. Thread t2=new Thread(()->{ 24. 25. while(true){ 26. try { 27. int ret=queue.take(); 28. System.out.println("消费者消费:"+ret); 29. Thread.sleep(500); 30. } catch (InterruptedException e) { 31. throw new RuntimeException(e); 32. } 33. } 34. }); 35. t1.start(); 36. t2.start(); 37. } 38. }
我们可以明确的看到,消费元素和生产元素是成对出现的。这就不会出现生产者没有生产出来的东西被消费的情况。
利用自实现的BlockingQueue实现生产者消费者模型:
1. 2. class MyBlockingQueue1{ 3. 4. //利用数组实现 5. private int[] arr=new int[1000];//设定数组长度为1000 6. 7. private int size=0;//记录数组的内容长度 8. //利用end和begin两个指针使得数组变为循环数组(逻辑上的循环) 9. private int end=0; 10. private int begin=0; 11. 12. //实现put方法 13. //阻塞考虑使用wait和notify进行唤醒(sleep不太靠谱) 14. public void put(int value) throws InterruptedException { 15. //判断是否满了(这里要用循环判断,因为在多线程当中,线程被唤醒的时候不一定不满) 16. //加锁保证原子性 17. synchronized (this){ 18. while(size>= arr.length){ 19. this.wait(); 20. } 21. //不满之后放入元素 22. arr[end]=value; 23. //调整长度 24. end++; 25. size++; 26. //如果放满了则将end变为0 27. if(end>= arr.length){ 28. end=0; 29. } 30. //进行唤醒 31. this.notify(); 32. } 33. } 34. //实现take方法 35. public int take() throws InterruptedException { 36. synchronized (this){ 37. //判断是否为空 38. while (size==0){ 39. this.wait(); 40. } 41. //不空之后开始取出元素 42. int ret=arr[begin]; 43. begin++; 44. if(begin>= arr.length){ 45. begin=0; 46. } 47. size--; 48. this.notify(); 49. return ret; 50. } 51. 52. } 53. //长度 54. public synchronized int Size(){ 55. return size; 56. } 57. 58. } 59. 60. 61. public class BlockingQueueDemo4 { 62. public static void main(String[] args) throws InterruptedException { 63. MyBlockingQueue1 queue1=new MyBlockingQueue1(); 64. //生产者 65. Thread producer =new Thread(()->{ 66. int count=0; 67. while(true){ 68. try { 69. queue1.put(count); 70. System.out.println("生产者生产元素:"+count); 71. count++; 72. Thread.sleep(500); 73. } catch (InterruptedException e) { 74. throw new RuntimeException(e); 75. } 76. } 77. }); 78. 79. //消费者 80. Thread customer =new Thread(()->{ 81. while(true){ 82. try { 83. int ret=queue1.take(); 84. System.out.println("消费者消费元素:"+ret); 85. Thread.sleep(500); 86. } catch (InterruptedException e) { 87. throw new RuntimeException(e); 88. } 89. } 90. }); 91. producer.start(); 92. customer.start(); 93. } 94. }