Java多线程基础-9:代码案例之阻塞队列(二)

简介: Java多线程基础教程系列中,介绍了如何实现一个简单的阻塞队列(非泛型版本)。

Java多线程基础-9:代码案例之阻塞队列(一)+ https://developer.aliyun.com/article/1520531?spm=a2c6h.13148508.setting.14.75194f0eoT1Sgd


三、代码实现 BlockingQueue


此处介绍如何采用数组实现阻塞队列(不带泛型)。


阻塞队列就是“带有阻塞特性的队列”,实现一个阻塞队列主要分为3步:


实现一个普通队列。

加上线程安全。

加上阻塞功能。


1、实现一个普通队列


结合数据结构的知识,可以通过数组构造出一个循环队列:


class MyBlockingQueue {
    // 创建数组空间
    private int[] elem = new int[1000];
    private int front = 0;
    private int back = 0;
    private int size = 0;
 
    public void put(int value) {
        // 数组满
        if(elem.length == size) {
            return;
        }
 
        elem[back] = value;
        back++;
        // 判断
        if(back == elem.length) {
            back = 0;
        }
        size++;
    }
 
    public Integer take() {
        // 数组空
        if(size == 0) {
            return null;
        }
        int value = elem[front];
        front++;
 
        // 判断是否到数组末尾
        if(front == elem.length) {
            front = 0;
        }
        size--;
        return value;
    }
}



讲解一下这里为什么要加:



if(back == elem.length) {
    back = 0;
}
if(front == elem.length) {
    front = 0;
}


在tail或head到达数组的末端时,需要让这两个指针回到数组的开头,以此“循环”。很多同学可能接触过取余数的写法:back += (back+1)%数组长度。 这样的方式也是可行的,但是效率不高。


在写代码时,一般会追求开发效率和执行效率。开发效率是针对程序员开发程序而言的,而执行效率是针对代码运行而言的。写成上面取余数的方式,既没有提高程序的开发效率(这样写代码的可读性并不高),也没有保证执行效率,因此不是一种好的写法,尽管它也能达成目的。




而写成if的方式能够提高这两个效率。在执行效率方面,表面上看if语句有两行,但实际上,每次只有指针到达数组末尾了,才会进入if判断;而到达数组末尾的次数是远远小于操作总次数的。比如上述的elem数组,总长度为1000,只有执行了1000次操作后,才会进入一次if判断让back回到开头,而剩下的999次操作都不会进入if判断。


2、加上线程安全


即加锁,用synchronized即可。直接把synchronized加在成员方法take()和put()上,锁对象是this,即MyBlockingQueue类的实例。


如下面代码中,对象myBlockingQueue被创建出来,那么当它调用take和put时,该对象的实例就是锁对象,即this。找到谁是锁对象,对后面实现阻塞功能有重要影响。


public class Test {
    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
    }
}
 
 
class MyBlockingQueue {
    // 创建数组空间
    private int[] elem = new int[10];
    private int front = 0;
    private int back = 0;
    private int size = 0;
 
    synchronized public void put(int value) {
        // 数组满
        if(elem.length == size) {
            return;
        }
 
        elem[back] = value;
        back++;
        // 判断
        if(back == elem.length) {
            back = 0;
        }
        size++;
    }
 
    synchronized public Integer take() {
        // 数组空
        if(size == 0) {
            return null;
        }
        int value = elem[front];
        front++;
 
        // 判断是否到数组末尾
        if(front == elem.length) {
            front = 0;
        }
        size--;
        return value;
    }
}


3、加上阻塞功能


根据阻塞队列的特性:队列满时继续put()会阻塞,直到有线程take();队列空时继续从队列中take()也会阻塞。,直到有线程put()。通过wait()和notify()配合来实现。调用 wait() 和 notify() 方法的对象必须是共享同一个锁对象的线程,否则会抛出 IllegalMonitorStateException 异常。由于synchronized直接加在了成员方法上,因此锁对象就是this,那么调用wait()和notify()的对象也是this。


public class Test {
    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
    }
}
 
 
class MyBlockingQueue {
    // 创建数组空间
    private int[] elem = new int[10];
    private int front = 0;
    private int back = 0;
    private int size = 0;
 
    synchronized public void put(int value) throws InterruptedException {
        // 数组满
        if(elem.length == size) {
            this.wait();    //队列满再入队列->阻塞
        }
 
        elem[back] = value;
        back++;
        // 判断
        if(back == elem.length) {
            back = 0;
        }
        size++;
        this.notify();    // 唤醒因队列空而阻塞的情况
    }
 
    synchronized public Integer take() throws InterruptedException {
        // 数组空
        if(size == 0) {
            this.wait();    // 队列空而出队列->阻塞
        }
        int value = elem[front];
        front++;
 
        // 判断是否到数组末尾
        if(front == elem.length) {
            front = 0;
        }
        size--;
        this.notify();    // 唤醒因队列满而阻塞的情况
        return value;
    }
}


注意,上面两个队列不可能同时阻塞,也就是说,一个队列不可能既是空,又是满。


4、用while替代if进行条件判断


还有一点要强调,其实Java官方并不是非常推荐这样使用wait:



查看Java官方的说明,可以发现:



也就是说,wait()方法可能会被其它其它方法中断,如interrupt方法。此时,wait的等待条件其实还没成熟,就被提前唤醒了,这样代码的执行可能就不符合预期了。



明明队列非空的条件还没满足,但wait被唤醒后就继续往下走了,没有进行条件判断


解决这个问题很简单:更稳妥的方法,是在wait被唤醒之后,再进行一次条件判断。wait之前,发现继续执行的条件不满足,开始wait,然后等到wait被唤醒之后,再确认一下条件是不是满足。如果不满足,还可以继续wait。


wait应总是在while中使用。如这个例子:https://blog.csdn.net/wyd_333/article/details/130575605


当中,涉及的“虚假唤醒”问题就与wait是否在while中使用直接相关。


5、完整代码:MyBlockingQueue


class MyBlockingQueue {
    // 创建数组空间
    private int[] elem = new int[10];
    private int front = 0;
    private int back = 0;
    private int size = 0;
 
    synchronized public void put(int value) throws InterruptedException {
        // 数组满
        // 用while判断是否满足条件
        while(elem.length == size) {
            this.wait();
        }
 
        elem[back] = value;
        back++;
        // 判断
        if(back == elem.length) {
            back = 0;
        }
        size++;
        this.notify();
    }
 
    synchronized public Integer take() throws InterruptedException {
        // 数组空
        // 用while判断是否满足条件
        while(size == 0) {
            this.wait();
        }
        int value = elem[front];
        front++;
 
        // 判断是否到数组末尾
        if(front == elem.length) {
            front = 0;
        }
        size--;
        this.notify();
        return value;
    }
}


6、实现生产者-消费者模型


public class Test {
    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
 
        // 生产者 put
        Thread t1 = new Thread(() -> {
            try {
                int value = 0;
                while(true) {
                    System.out.println("生产:" + value);
                    myBlockingQueue.put(value);
                    Thread.sleep(1000);
                    value++;
                }
 
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
 
        t1.start();
        // 消费者 take
        Thread t2 = new Thread(() -> {
            while(true) {
                try {
                    int value = myBlockingQueue.take();
                    System.out.println("消费:" + value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t2.start();
    }
}



在生产者中添加Thread.sleep(),此时是生产得快,消费得慢。运行后可以直观感受到,元素是生产一个、消费一个的。每生产完一个,就立即消费掉,等待再次生产。


当然,也可以在消费者中添加Thread.sleep()。此时是生产得慢,而消费得快。在消费者sleep的时候,生产者生产迅速,很可能会一口气生成完整个队列中的所有元素,令队列满而生产者进入阻塞。等待消费者结束sleep,消费1个元素,此时生产者才会再次生产。


        Thread t2 = new Thread(() -> {
            while(true) {
                try {
                    // 令消费者sleep
                    Thread.sleep(3000);
                    int value = myBlockingQueue.take();
                    System.out.println("消费:" + value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });



上面之所以一口气只生产到10,是因为在MyBlockingQueue中,把队列容量设定为了10.


    // 创建数组空间
    private int[] elem = new int[10];


四、**补充:SynchronousQueue

在 Java 中,SynchronousQueue 是一个实现了 BlockingQueue 接口的类。与普通的 BlockingQueue 不同的是,SynchronousQueue没有容量capacity,具有一些特殊的行为。


1、SynchronousQueue的使用

***有一个疑问:SynchronousQueue没有capacity,那么它怎么存放put进来的元素呢?又从哪里take元素呢?


确实,SynchronousQueue 并不是像其他阻塞队列那样有固定的容量,它的实现实际上是基于一种传输机制,即:一个线程试图将元素添加到队列中时,必须等待另一个线程从队列中取走该元素,反之亦然。换句话说,SynchronousQueue 可以看作是一个零容量的阻塞队列,其中元素的存储和移除,是通过线程之间的相互配合完成的。


具体来说:当一个线程调用 put 方法时,它会将要添加的元素暂时保存在本地变量中,然后阻塞自己,等待另一个线程调用 take 方法取走该元素。当另一个线程调用 take 方法时,它也会阻塞自己,等待另一个线程调用 put 方法并将要传递的元素交给它。在这个过程中,元素的实际存储并不需要使用队列,因为队列并没有容量,而是仅用于实现线程之间的协作机制。


因此,SynchronousQueue 可以用于在两个线程之间传递元素:一个线程生产元素并调用 put 方法,另一个线程消费元素并调用 take 方法。这种交换操作在两个线程之间同步进行(因此也称为同步队列)。


下面展示了如何在两个线程之间使用 SynchronousQueue 进行元素交换:


import java.util.concurrent.SynchronousQueue;
 
public class SynchronousQueueExample {
    public static void main(String[] args) {
        final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
 
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    int value = 42;
                    System.out.println("Producer is putting: " + value);
                    queue.put(value);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
 
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    int value = queue.take();
                    System.out.println("Consumer is taking: " + value);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
 
        producer.start();
        consumer.start();
    }
}


在上面程序中,我们创建了一个 SynchronousQueue 实例,然后创建了一个生产者线程和一个消费者线程。生产者线程调用 put 方法将值 42 添加到队列中,然后阻塞等待消费者线程取走这个元素。消费者线程调用 take 方法从队列中取出一个元素,并在控制台输出它。由于这个队列是同步的,因此生产者线程和消费者线程将会同步进行,直到元素被成功交换。


2、注意

(1)对于 SynchronousQueue 来说,put 和 take 操作必须是交替执行的

如果一个线程调用了 put 方法并被阻塞,那么它只有等待另一个线程调用 take 方法才能继续执行。反之,如果一个线程调用了 take 方法并被阻塞,那么它只有等待另一个线程调用 put 方法才能继续执行。


这种交替执行的特性使得 SynchronousQueue 在某些情况下非常有用。


例如,当一个线程需要向另一个线程传递数据时,可以使用 SynchronousQueue 来保证线程之间的同步和可靠性。在这种情况下,一个线程负责生产数据并将其添加到队列中,而另一个线程负责消费数据并从队列中取出它。由于队列中始终只有一个元素,因此这种操作是非常高效和可靠的。


但要注意,如果一个线程调用了 put 方法但是没有其他线程调用 take 方法取走元素,那么这个线程就会一直被阻塞下去。同样地,如果一个线程调用了 take 方法但是没有其他线程调用 put 方法添加元素,那么这个线程也会一直被阻塞下去。


(2)SynchronousQueue 中的元素不可重复使用。

一旦一个线程调用 put 方法将元素添加到队列中,并被另一个线程调用 take 方法取走,该元素就会从队列中消失,不再可用于后续的操作了。这也是 SynchronousQueue 的一个特殊之处,使得它非常适用于需要高效、安全地将数据传递给另一个线程的场景。


因此,在使用 SynchronousQueue 时,需要特别注意线程的调度和同步问题,以免造成死锁或其他问题。


相关文章
|
4天前
|
Java
在 Java 中捕获和处理自定义异常的代码示例
本文提供了一个 Java 代码示例,展示了如何捕获和处理自定义异常。通过创建自定义异常类并使用 try-catch 语句,可以更灵活地处理程序中的错误情况。
|
4天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
23 9
|
7天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
4天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
7天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
21 3
|
6天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
7天前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
8天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
17天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
7天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
17 1