概念
- 解决线程通信的问题。
- 阻塞方法:put、take。
• 生产者消费者模式
- 生产者:产生数据的线程。
- 消费者:使用数据的线程。
• 实现类
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue、SynchronousQueue、DelayQueue等
测试代码
我们先创建一个通道,通道的容量设置为10,然后创建一个生产者,每隔20毫秒生产一个数,放入通道。如果通道总数够十个,则停止生产,总共生产100个。创建一个消费者,每隔一段时间消费产品,观察最终输出。
1. package com.nowcoder.community; 2. 3. import java.util.Random; 4. import java.util.concurrent.ArrayBlockingQueue; 5. import java.util.concurrent.BlockingQueue; 6. 7. public class BlockingQueueTests { 8. 9. public static void main(String[] args) { 10. BlockingQueue queue = new ArrayBlockingQueue(10); 11. new Thread(new Producer(queue)).start(); 12. new Thread(new Consumer(queue)).start(); 13. new Thread(new Consumer(queue)).start(); 14. new Thread(new Consumer(queue)).start(); 15. } 16. 17. } 18. 19. class Producer implements Runnable { 20. 21. private BlockingQueue<Integer> queue; 22. 23. public Producer(BlockingQueue<Integer> queue) { 24. this.queue = queue; 25. } 26. 27. @Override 28. public void run() { 29. try { 30. for (int i = 0; i < 100; i++) { 31. Thread.sleep(20); 32. queue.put(i); 33. System.out.println(Thread.currentThread().getName() + "生产:" + queue.size()); 34. } 35. } catch (Exception e) { 36. e.printStackTrace(); 37. } 38. } 39. 40. } 41. 42. class Consumer implements Runnable { 43. 44. private BlockingQueue<Integer> queue; 45. 46. public Consumer(BlockingQueue<Integer> queue) { 47. this.queue = queue; 48. } 49. 50. @Override 51. public void run() { 52. try { 53. while (true) { 54. Thread.sleep(new Random().nextInt(1000)); 55. queue.take(); 56. System.out.println(Thread.currentThread().getName() + "消费:" + queue.size()); 57. } 58. } catch (Exception e) { 59. e.printStackTrace(); 60. } 61. } 62. }