一个使用Java BlockingQueue实现的生产者和消费者

简介: 一个使用Java BlockingQueue实现的生产者和消费者

消费者

package consumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Consumer implements Runnable {
  /*
   * 用util.concurrent.BlockingQueue沟通生产者和消费者的桥梁
  */
  BlockingQueue<String> queue;
  String id;
  @SuppressWarnings("unused")
  private volatile boolean      isRunning               = true;
  public Consumer(BlockingQueue<String> queue, String id) {
        this.queue = queue;
        this.id = id;
    }
    public void stop() {
        isRunning = false;
    }
  @Override
  public void run() {
    System.out.println("Thread: " + id + " Consumer thread is running...");
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("Thread: " + id + " fetch data from linkedQueue..." + " queue size: " + queue.size());
                /*
                 * 从队列里取出一个元素,2秒超时,如果两秒之后还没有东西可以取,则poll返回null
                 */
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (null != data) {
                    System.out.println("Thread: " + id + " has consumed one data from queue: " + data
                        + "   Queue sise: " + queue.size());
                    // simulate data consumption
                    Thread.sleep(1000);
                } else {
                    isRunning = false;
                    // 消费者准备退出
                    System.out.println("Thread: " + id + " Consumer read queue timeout");
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Thread: " + id + " consumer thread ends");
        }
  }
}

生产者

package consumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
  BlockingQueue<String> queue;
  String id;
    public Producer(BlockingQueue<String> queue, String id) {
        this.queue = queue;
        this.id = id;
    }
    @Override
    public void run() {
        String data = null;
        try {
            while (isRunning) {
                System.out.println("PRODUCER: " + id + " is running");
                Thread.sleep(100);
                data = "data:" + count.incrementAndGet();
                System.out.println("Thread: " + id + " procedued data into queue: " + data + " ...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("failed to put data into queue: " + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Thread: " + id + " quit from producer thread");
        }
    }
    public void stop() {
        isRunning = false;
    }
    private volatile boolean      isRunning               = true;
    private static AtomicInteger  count                   = new AtomicInteger();
}

测试代码

package consumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class ConsumerProducerTest {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(15);
        Producer producer1 = new Producer(queue, "PROD1");
        Producer producer2 = new Producer(queue, "PROD2");
        Producer producer3 = new Producer(queue, "PROD3");
        Consumer consumer1 = new Consumer(queue, "CONSUMER1");
        Consumer consumer2 = new Consumer(queue, "CONSUMER2");
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);
        service.execute(consumer2);
        Thread.sleep(3 * 1000);
        producer1.stop(); // 一定要先关闭生产者
        producer2.stop();
        producer3.stop();
        consumer1.stop();
        consumer2.stop();
        Thread.sleep(2000);
        service.shutdown();
    }
}


相关文章
|
3月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
31 0
|
1天前
|
消息中间件 存储 Java
Java与Go的生产者消费者模型比较
【4月更文挑战第20天】
8 1
|
5月前
|
存储 Java
Java生产者消费者的三种实现
Java生产者消费者是最基础的线程同步问题,java岗面试中还是很容易遇到的,之前没写过多线程的代码,面试中被问到很尬啊,面完回来恶补下。在网上查到大概有5种生产者消费者的写法,分别如下。
44 0
|
2月前
|
存储 安全 Java
Java并发基础:BlockingQueue和BlockingDeque接口的区别?
BlockingQueue 和 BlockingDeque 它们都支持在并发编程中的线程安全操作,但是,这两个接口之间存在一些关键的区别,主要在于它们所支持的操作和数据结构的特性,
Java并发基础:BlockingQueue和BlockingDeque接口的区别?
|
3月前
|
Java
用java实现生产者和消费者模式
用java实现生产者和消费者模式
29 1
|
4月前
|
Java
Java之多线程的生产者消费者问题的详细解析
3.生产者消费者 3.1生产者和消费者模式概述【应用】 概述 生产者消费者模式是一个十分经典的多线程协作的模式,弄懂生产者消费者问题能够让我们对多线程编程的理解更加深刻。
44 0
Java之多线程的生产者消费者问题的详细解析
|
8月前
|
存储 安全 Java
Java中多线程同步问题、生产者与消费者、守护线程和volatile关键字(附带相关面试题)
1.多线程同步问题(关键字Synchronized),2. Object线程的等待与唤醒方法,3.模拟生产者与消费者,4.守护线程,5.volatile关键字
47 0
|
8月前
|
存储 安全 算法
【Java|多线程与高并发】阻塞队列以及生产者-消费者模型
阻塞队列(BlockingQueue)常用于多线程编程中,可以实现线程之间的同步和协作。它可以用来解决生产者-消费者问题,其中生产者线程将元素插入队列,消费者线程从队列中获取元素,它们之间通过阻塞队列进行协调。
|
Java
Java 实现汉字按照首字母分组排序
Java 实现汉字按照首字母分组排序
561 0
|
9月前
|
消息中间件 Java
Java中生产者消费者模型
Java中生产者消费者模型