带你手搓阻塞队列——自定义实现

简介: 带你手搓阻塞队列——自定义实现



 

一、阻塞队列的作用

一个分布式系统中,会经常出现这样的情况:有的机器能承担的压力更大,有的能承担的压力更小:

如果按照生产者消费者模型,那就另当别论了。

假设此时通过队列来让A和B进行交互:

 

二、阻塞队列实现

2.1 普通队列实现

在实现阻塞队列之前,我们先把普通的队列(基于数组的循环队列)进行一个简单的实现,然后通过进一步的改进,把普通的队列改造成一个阻塞队列。

class MyBlockingQueue{
    private int[] items;
    private int head = 0;//队列头指针
    private int tail = 0;//队列尾指针
    private int size = 0;//队列当前元素个数
    public MyBlockingQueue(){}
    //入队列
    public void put(int elem){}
    //出队列
    public int take(){}
}

2.1.1 构造方法

public MyBlockingQueue(){
        this.items = new int[100];
    }

2.1.2 入队列

//入队列
    public void put(int elem){
        if (size >= items.length){
            return;
        }
        items[tail] = elem;
        if (tail >= items.length){//判断尾指针是否到达末尾
            tail = 0;
        }
        tail++;
        size++;
    }

2.1.3 出队列

//出队列
    public int take(){
        if(size == 0){
            return -1;
        }
        int elem = items[head];
        if (head >= items.length){
            head = 0;
        }
        head++;
        size--;
        return elem;
    }

2.2 阻塞队列实现

现在,我们就把上面的队列改造成阻塞队列。

2.2.1 保证线程安全

在当前的代码下,如果是多线程的情况,调用put或者take,这两个方法中都涉及到了对变量的修改,这样就会出现线程安全问题。这就需要我们进行加锁。

//入队列
    public void put(int elem){
        synchronized (this){
            if (size >= items.length){
                return;
            }
            items[tail] = elem;
            if (tail >= items.length){
                tail = 0;
            }
            tail++;
            size++;
        }
    }
//出队列
    public int take(){
        synchronized (this){
            if(size == 0){
                return -1;
            }
            int elem = items[head];
            if (head >= items.length){
                head = 0;
            }
            head++;
            size--;
            return elem;
        }
    }

2.2.2 保证内存可见性

光加锁就够吗?我们可以看到,多线程的情况下,不光是对变量进行修改,还有读操作等等,那就有可能出现一个线程在读,另外一个线程在修改,这个读的线程没有读到。所以,此处除了加锁之外,还需要考虑内存可见性问题。也就是说,当其他线程进行修改的时候,我们要保证当前线程可以读到这个修改,所以我们把变量加上volatile关键字。

volatile private int head = 0;
    volatile private int tail = 0;
    volatile private int size = 0;

2.2.3 阻塞功能的实现

解决了上述问题后,我们就需要考虑一下如何实现阻塞功能了。

实现阻塞有两方面:

  • 当队列满的时候,再进行put(入队),就会产生阻塞。阻塞到队列中元素出队后,就去唤醒当前因队列满而被阻塞的状态。
  • 当队列空的时候,再进行take(出队),就会产生阻塞。阻塞到队列中有元素入队时,去唤醒当前因队列空而被阻塞的状态。
//入队列
    public void put(int elem) throws InterruptedException {
        synchronized (this){
            while (size >= items.length){
                //队列满了
                //return;
                this.wait();
            }
            items[tail] = elem;
            if (tail >= items.length){
                tail = 0;
            }
            tail++;
            size++;
            //成功入队
            this.notify();//唤醒因队列空而被阻塞的状态
        }
    }
    //出队列
    public int take() throws InterruptedException {
        synchronized (this){
            while (size == 0){
                //队列空
                //return -1;
                this.wait();
            }
            int elem = items[head];
            if (head >= items.length){
                head = 0;
            }
            head++;
            size--;
            this.notify();//使用这个notify唤醒队列满的阻塞状态
            return elem;
        }
    }
}

好了,经过上面的改进,我们就已经实现了一个简单的阻塞队列,下面是改进后的完整代码:

class MyBlockingQueue{
    private int[] items;
    volatile private int head = 0;
    volatile private int tail = 0;
    volatile private int size = 0;
    public MyBlockingQueue(){
        this.items = new int[100];
    }
    //入队列
    public void put(int elem) throws InterruptedException {
        synchronized (this){
            while (size >= items.length){
                //队列满了
                //return;
                this.wait();
            }
            items[tail] = elem;
            if (tail >= items.length){
                tail = 0;
            }
            tail++;
            size++;
            //成功入队
            this.notify();//唤醒因队列空而被阻塞的状态
        }
    }
    //出队列
    public int take() throws InterruptedException {
        synchronized (this){
            while (size == 0){
                //队列空
                //return -1;
                this.wait();
            }
            int elem = items[head];
            if (head >= items.length){
                head = 0;
            }
            head++;
            size--;
            this.notify();//使用这个notify唤醒队列满的阻塞状态
            return elem;
        }
    }
}

三、基于自定义阻塞队列,模拟生产者消费者模型

实现阻塞队列之后,我们利用阻塞队列简单模拟一下生产者消费者模型:

public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        //生产者线程
        Thread product = new Thread(()->{
            int count = 0;
            while (true){
                try {
                    queue.put(count);
                    System.out.println("生产元素:>"+count);
                    count++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //消费者线程
        Thread consummer = new Thread(()->{
            while (true){
                try {
                    int elem = queue.take();
                    System.out.println("消费元素:>"+elem);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        product.start();
        consummer.start();
    }

运行结果:


🌈🌈🌈好啦,今天的分享就到这里!

🛩️🛩️🛩️希望各位看官读完文章后,能够有所提升。

🎉🎉🎉创作不易,还希望各位大佬支持一下!

✈️✈️✈️点赞,你的认可是我创作的动力!

⭐⭐⭐收藏,你的青睐是我努力的方向!

✏️✏️✏️评论:你的意见是我进步的财富!

 

目录
相关文章
|
Java 测试技术 Maven
看到一个魔改线程池,面试素材加一!(中)
看到一个魔改线程池,面试素材加一!(中)
444 0
看到一个魔改线程池,面试素材加一!(中)
|
5月前
|
Java 程序员
惊呆了!LinkedList的这些队列功能,99%的程序员都没用过!
【6月更文挑战第18天】`LinkedList`不仅是Java集合中的列表实现,还可作队列(`peek()`,`add()`,`remove()`)和双端队列(`Deque`,`addFirst()`,`addLast()`,`peekFirst()`,`peekLast()`),甚至栈(`push()`,`pop()`,`peek()`)。常被低估,其实它具备从两端操作数据的强大能力,适合多种数据结构需求。
37 6
|
5月前
|
Java Apache Spring
面试官:如何自定义一个工厂类给线程池命名,我:现场手撕吗?
【6月更文挑战第3天】面试官:如何自定义一个工厂类给线程池命名,我:现场手撕吗?
38 0
|
6月前
|
存储 缓存 Java
【linux线程(四)】初识线程池&手撕线程池
【linux线程(四)】初识线程池&手撕线程池
|
JavaScript API
面试官:手撕代码!判断两个对象是否相等?
前言 在实际项目开发中,判断两个对象是否相等可能是比较常见的需求了,有些小伙伴会使用第三方库实现,有些小伙伴会自己手动实现。不管怎么实现,只能能满足项目需求,那就是好样的。但是可能有些小伙伴如果对 JS 还不够熟悉,他可能就会有疑问:判断相等不是用==比较就可以了吗?答案肯定是错误的,面试官要是听了你这个回答,估计会当场吐血! 今天就来学一学如何比较两个对象是否相等? 学习目标:实现判断两个对象是否相等,即所有键值对相等。
425 0
面试官:手撕代码!判断两个对象是否相等?
|
监控 Java
手撕Java线程池
原理和源码解析
3826 0
手撕Java线程池
10分钟手撸Java线程池,yyds!!
10分钟手撸Java线程池,yyds!!
268 0
10分钟手撸Java线程池,yyds!!
|
存储 负载均衡 并行计算
手撸一款简单高效的线程池(五)
在之前的内容中,我们给大家介绍了 C++实现线程池过程中的一些常用线优化方案,并分析了不同机制使用时的利弊。这一篇,是线程池系列的最后一章。我们会介绍一下 CGraph 中的 threadpool 如何使用,给出性能对比,并对接下来的工作做一些展望。让我们在线程池性能优化和功能提升的道路上,越走越远。
351 0
手撸一款简单高效的线程池(五)
|
缓存 负载均衡 Java
手撸一款简单高效的线程池(二)
大家好,我是不会写代码的纯序员——Chunel Feng[1]。这篇文章是线程池优化系列连载的第二篇。主要跟大家介绍几种线程池优化思路,包括:local-thread 机制、lock-free 机制、work-stealing 机制。
523 0
手撸一款简单高效的线程池(二)
|
缓存 负载均衡 前端开发
手撸一款简单高效的线程池(一)
线程池大家应该都用过,不过如何从 0 到 1 的设计一款简单好用且性能较好的线程池?我们在接下来的几篇文章中,为您一一介绍。
490 0
手撸一款简单高效的线程池(一)