1.问题描述
实现Java非阻塞式生产者消费者,用来解决,生产和消费对于资源访问不同步和造成资源冗余的问题
2.实现思想
- 针对于同一资源,生产者生产前会前检测资源是否大于0,如果大于0,则生产者线程释放资源锁,进入waiting阶段,如果小于0,则生产者线程持有锁,并且生产资源,生产一定资源之后,通知消费者。
- 消费者线程,去消费之前,会检测资源是否大于0,如果小于0,则阻塞等待,并且通知生产者生产资源,如果大于0,则消费资源,消费完成,唤醒生产者,生产资源。
- 这样就实现了每次生产者生产都是生产一定的资源,等待消费者消费完成之后,才去继续生产
3.代码
3.1使用wait、notify
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class Main {
public static class Breads {
private List<Integer> bordIntegers = new ArrayList<Integer>();
public synchronized void add() {
if (bordIntegers.size() > 0) {
System.out.println("生产者检测到目前资源大于0,不生产");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("生产者检测到目前资源小于0,开始生产");
for (int i = 0; i < 5; i++) {
bordIntegers.add(new Random().nextInt(1000));
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
notify();
}
public synchronized void del() {
if (bordIntegers.size() <= 0) {
System.out.println("消费者检测到目前资源小于0,通知生产者");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消费者检测到目前资源大于0,开始消费");
for (int i = 0; i < bordIntegers.size(); i++) {
System.out.println(bordIntegers.get(i));
}
bordIntegers.clear();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
notify();
}
}
public static void main(String[] args) {
Breads borad = new Breads();
Thread createThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
borad.add();
}
}
});
Thread consumethread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
borad.del();
}
}
});
createThread.start();
consumethread.start();
System.out.println("Main Thread exit!");
}
}
3.2使用ArrayBlockingQueue
import java.util.concurrent.ArrayBlockingQueue;
public class Main {
static ArrayBlockingQueue<Integer> bordBlockingQueue = new ArrayBlockingQueue<Integer>(
10);
public static void main(String[] args) {
Thread createThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
for (int i = 0; i < 10; i++) {
try {
bordBlockingQueue.put(i);
System.out.println("生产者生产");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
Thread consumethread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
while (!bordBlockingQueue.isEmpty()) {
try {
System.out.println("消费者消费");
bordBlockingQueue.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
});
createThread.start();
consumethread.start();
System.out.println("Main Thread exit!");
}
}