Java如何使用BlockingQueue模拟生产者与消费者?
" package com.stardust.study.demo.se.multithread;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;
public class ProductDemo { public static void main(String[] args) { BlockingQueue blockingQueue = new LinkedBlockingQueue<>(3); ProductThread productThread = new ProductThread(blockingQueue); ConsumerThread consumerThread = new ConsumerThread(blockingQueue); Thread p = new Thread(productThread); Thread c = new Thread(consumerThread); p.start(); c.start(); try { Thread.sleep(2 * 1000); productThread.stop(); } catch (InterruptedException e) { e.printStackTrace(); } } }
class ProductThread implements Runnable{ private BlockingQueue blockingQueue; private AtomicInteger atomicInteger = new AtomicInteger(); private volatile boolean flag = true;
ProductThread(BlockingQueue<String> blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println(""线程:"" + Thread.currentThread().getName() + "",生产者开始执行"");
while (flag){
String count = atomicInteger.incrementAndGet() + """";
try {
boolean offer = blockingQueue.offer(count, 2, TimeUnit.SECONDS);
if(offer) {
System.out.println(""线程:"" + Thread.currentThread().getName() + "",生产者添加元素成功"");
}else {
System.out.println(""线程:"" + Thread.currentThread().getName() + "",生产者添加元素失败"");
}
} catch (Exception e) {
}
}
System.out.println(""线程:"" + Thread.currentThread().getName() + "",生产者结束执行"");
}
public void stop(){
this.flag = false;
}
}
class ConsumerThread implements Runnable{ private BlockingQueue blockingQueue; private volatile boolean flag = true;
ConsumerThread(BlockingQueue<String> blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println(""线程:"" + Thread.currentThread().getName() + "",消费者开始执行"");
while (flag){
try {
String count = blockingQueue.poll(2, TimeUnit.SECONDS);
if(StringUtils.isBlank(count)) {
this.flag = false;
System.out.println(""线程:"" + Thread.currentThread().getName() + "",消费者超过两秒没有获取消息"");
return;
}
System.out.println(""线程:"" + Thread.currentThread().getName() + "",消费者成功获取消息:"" + count);
} catch (Exception e) {
}
}
}
}"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。