手撸MQ消息队列——循环数组

简介: 队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。

队列是咱们开发中经常使用到的一种数据结构,它与的结构类似。然而栈是后进先出,而队列是先进先出,说的专业一点就是FIFO。在生活中到处都可以找到队列的,最常见的就是排队,吃饭排队,上地铁排队,其他就不过多举例了。

队列的模型

在数据结构中,和排队这种场景最像的就是数组了,所以我们的队列就用数组去实现。在排队的过程中,有两个基本动作就是入队出队,入队就是从队尾插入一个元素,而出队就是从队头移除一个元素。基本的模型我们可以画一个简图:
image-20240914101140833.png

看了上面的模型,我们很容易想到使用数组去实现队列,

  1. 先定义一个数组,并确定数组的长度,我们暂定数组长度是5,而上面图中的长度是一样的;
  2. 再定义两个数组下标,fronttail,front是队头的下标,每一次出队的操作,我们直接取front下标的元素就可以了。tail是队尾的下标,每一次入队的操作,我们直接给tail下标的位置插入元素就可以了。

我们看一下具体的过程,初始状态是一个空的队列,
image-20240914102724932.png

队头下标和队尾下标都是指向数组中的第0个元素,现在我们插入第一个元素“a”,如图:
image-20240914103323115.png

数组的第0个元素赋值“a”,tail的下标+1,由指向第0个元素变为指向第1个元素。这些变化我们都要记住啊,后续在编程实现的过程中,每一个细节都不能忽略。然后我们再做一次出队操作:
image-20240914103815186.png

第0个元素“a”在数组中移除了,并且front下标+1,指向第1个元素。

这些看起来不难实现啊,不就是给数组元素赋值,然后下标+1吗?但是我们想一想极端的情况, 我们给数组的最后一个元素赋值后,数组的下标怎么办?
image-20240914104554590.png

tail如果再+1,就超越了数组的长度了呀,这是明显的越界了。同样front如果取了数组中的最后一个元素,再+1,也会越界。这怎么办呢?

循环数组

我们最开始想到的方法,就是当tail下标到达数组的最后一个元素的时候,对数组进行扩容,数组的长度又5变为10。这种方法可行吗?如果一直做入队操作,那么数组会无限的扩容下去,占满磁盘空间,这是我们不想看到的。

另外一个方法,当front或tail指向数组最后一个元素时,再进行+1操作,我们将下标指向队列的开头,也就是第0个元素,形成一个循环,这就叫做循环数组。那么这里又引申出一个问题,我们的下标怎么计算呢?

  1. 数组的长度是5;
  2. tail当前的下标是4,也就是数组的最后一个元素;
  3. 我们给最后一个元素赋值后,tail怎么由数组的最后一个下标4,变为数组的第一个下标0?

这里我们可以使用取模来解决:tail = (tail + 1) % mod,模(mod)就是我们的数组长度5,我们可以试一下,tail当前值是4,套入公式计算得到0,符合我们的需求。我们再看看其他的情况符不符合,假设tail当前值是1,套入公式计算得出2,也相当于是+1操作,没有问题的。只有当tail+1=5时,才会变为0,这是符合我们的条件的。那么我们实现队列的方法就选用循环数组,而且数组下标的计算方法也解决了。

队列的空与满

队列的空与满对入队和出队的操作是有影响的,当队列是满的状态时,我们不能进行入队操作,要等到队列中有空余位置才可以入队。同样当队列时空状态时,我们不能进行出队操作,因为此时队列中没有元素,要等到队列中有元素时,才能进行出队操作。那么我们怎么判断队列的空与满呢?

我们先看看队列空与满时的状态:
image-20240914102724932.png

空时的状态就是队列的初始状态,front和tail的值是相等的。
image-20240914114618562.png

满时的状态也是front==tail,我们得到的结论是,front==tail时,队列不是空就是满,那么到底是空还是满呢?这里我们要看看是什么操作导致的front==tail,如果是入队导致的front==tail,那么就是满;如果是出队导致的front==tail,那就是空。

手撸代码

好了,队列的模型以及基本的问题都解决了,我们就可以手撸代码了,我先把代码贴出来,然后再给大家讲解。

public class MyQueue<T> {
   

    //循环数组
    private T[] data;
    //数组长度
    private int size;
    //出队下标
    private int front =0;
    //入队下标
    private int tail = 0;
    //导致front==tail的原因,0:出队;1:入队
    private int flag = 0;

    //构造方法,定义队列的长度
    public MyQueue(int size) {
   
        this.size = size;
        data = (T[])new Object[size];
    }

    /**
     * 判断对队列是否满
     * @return
     */
    public boolean isFull() {
   
        return front == tail && flag == 1;
    }

    /**
     * 判断队列是否空
     * @return
     */
    public boolean isEmpty() {
   
        return front == tail && flag == 0;
    }

    /**
     * 入队操作
     * @param e
     * @return
     */
    public boolean add(T e) {
   
        if (isFull()) {
   
            throw new RuntimeException("队列已经满了");
        }
        data[tail] = e;
        tail = (tail + 1) % size;
        if (tail == front) {
   
            flag = 1;
        }

        return true;
    }

    /**
     * 出队操作
     * @return
     */
    public T poll() {
   
        if (isEmpty()) {
   
            throw new RuntimeException("队列中没有元素");
        }
        T rtnData = data[front];
        front = (front + 1) % size;
        if (front == tail) {
   
            flag = 0;
        }
        return rtnData;
    }
}

在类的开始,我们分别定义了,循环数组,数组的长度,入队下标,出队下标,还有一个非常重要的变量flag,它表示导致front==tail的原因,0代表出队,1代表入队。这里我们初始化为0,因为队列初始化的时候是空的,而且front==tail,这样我们判断isEmpty()的时候也是正确的。

接下来是构造方法,在构造方法中,我们定义了入参size,也就是队列的长度,其实就是我们循环数组的长度,并且对循环数组进行了初始化。

再下面就是判断队列空和满的方法,实现也非常的简单,就是依照上一小节的原理。

然后就是入队操作,入队操作要先判断队列是不是已经满了,如果满了,我们进行报错,不进行入队的操作。有的同学可能会说,这里应该等待,等待队列有空位了再去执行。这种说法是非常正确的,我们先把最基础的队列写完,后面还会再完善,大家不要着急。下面就是对循环数组的tail元素进行赋值,赋值后,使用我们的公式移动tail下标,tail到达最后一个元素时,通过公式计算,可以回到第0个元素。最后再判断一下,这个入队操作是不是导致了front==tail,如果导致了,就将flag置为1。

出队操作和入队操作类似,只不过是取值的步骤,这里不给大家详细解释了。

我们做个简单的测试吧,

 public static void main(String[] args) {
   
        MyQueue<String> myQueue = new MyQueue<>(5);
        System.out.println("isFull:"+myQueue.isFull()+" isEmpty:"+myQueue.isEmpty());
        myQueue.add("a");
        System.out.println("isFull:"+myQueue.isFull()+" isEmpty:"+myQueue.isEmpty());
        myQueue.add("b");
        myQueue.add("c");
        myQueue.add("d");
        myQueue.add("e");
        System.out.println("isFull:"+myQueue.isFull()+" isEmpty:"+myQueue.isEmpty());
        myQueue.add("f");
    }

我们定义长度是5的队列,分别加入a b c d e f6个元素,并且看一下空和满的状态。

打印日志如下:

isFull:false isEmpty:true
isFull:false isEmpty:false
isFull:true isEmpty:false
Exception in thread "main" java.lang.RuntimeException: 队列已经满了
    at org.example.queue.MyQueue.add(MyQueue.java:29)
    at org.example.queue.MyQueue.main(MyQueue.java:82)

空和满的状态都是对的,而且再插入f元素的时候,报错了”队列已经满了“,是没有问题的。出队的测试这里就不做了,留个小伙伴们去做吧。

并发与等待

队列的基础代码已经实现了,我们再看看有没有其他的问题。对了,第一个问题就是并发,我们多个线程同时入队或者出队时,就会引发问题,那么怎么办呢?其实也很简单,加上synchronized关键字就可以了,如下:

/**
 * 入队操作
 * @param e
 * @return
 */
public synchronized boolean add(T e) {
   
    if (isFull()) {
   
        throw new RuntimeException("队列已经满了");
    }
    data[tail] = e;
    tail = (tail + 1) % size;
    if (tail == front) {
   
        flag = 1;
    }

    return true;
}

/**
 * 出队操作
 * @return
 */
public synchronized T poll() {
   
    if (isEmpty()) {
   
        throw new RuntimeException("队列中没有元素");
    }
    T rtnData = data[front];
    front = (front + 1) % size;
    if (front == tail) {
   
        flag = 0;
    }
    return rtnData;
}

这样入队出队操作就不会有并发的问题了。下面我们再去解决上面小伙伴提出的问题,就是入队时,队列满了要等待,出队时,队列空了要等待,这个要怎么解决呢?这里要用的wait()notifyAll()了,再进行编码前,我们先理清一下思路,

  1. 目前队列的长度是5,并且已经满了;
  2. 现在要向队列插入第6个元素,插入时,判断队列满了,要进行等待wait();
  3. 此时有一个出队操作,队列有空位了,此时应该唤起之前等待的线程,插入元素;

相反的,出队时,队列是空的,也要等待,当队列有元素时,唤起等待的线程,进行出队操作。好了,撸代码,

/**
 * 入队操作
 * @param e
 * @return
 */
public synchronized boolean add(T e) throws InterruptedException {
   
    if (isFull()) {
   
        wait();
    }
    data[tail] = e;
    tail = (tail + 1) % size;
    if (tail == front) {
   
        flag = 1;
    }
    notifyAll();
    return true;
}

/**
 * 出队操作
 * @return
 */
public synchronized T poll() throws InterruptedException {
   
    if (isEmpty()) {
   
        wait();
    }
    T rtnData = data[front];
    front = (front + 1) % size;
    if (front == tail) {
   
        flag = 0;
    }
    notifyAll();
    return rtnData;
}

之前我们抛异常的地方,统一改成了wait(),而且方法执行到最后进行notifyAll(),唤起等待的线程。我们进行简单的测试,

public static void main(String[] args) throws InterruptedException {
   
    MyQueue<String> myQueue = new MyQueue<>(5);
    new Thread(() -> {
   
        try {
   
            System.out.println(myQueue.poll());
        } catch (InterruptedException e) {
   
            throw new RuntimeException(e);
        }
    }).start();

    myQueue.add("a");
}

测试结果没有问题,可以正常打印"a"。这里只进行了出队的等待测试,入队的测试,小伙伴们自己完成吧。

if还是while

到这里,我们手撸的消息队列还算不错,基本的功能都实现了,但是有没有什么问题呢?我们看看下面的测试程序,

public static void main(String[] args) throws InterruptedException {
   
    MyQueue<String> myQueue = new MyQueue<>(5);
    new Thread(() -> {
   
        try {
   
            System.out.println(myQueue.poll());
        } catch (InterruptedException e) {
   
            throw new RuntimeException(e);
        }
    }).start();
    new Thread(() -> {
   
        try {
   
            System.out.println(myQueue.poll());
        } catch (InterruptedException e) {
   
            throw new RuntimeException(e);
        }
    }).start();

    Thread.sleep(5000);
    myQueue.add("a");
}

我们启动了两个消费者线程,同时从队列里获取数据,此时,队列是空的,两个线程都进行等待,5秒后,我们插入元素"a",看看结果如何,

a
null

结果两个消费者都打印出了日志,一个获取到null,一个获取到”a“,这是什么原因呢?还记得我们怎么判断空和满的吗?对了,使用的是if,我们捋一下整体的过程,

  1. 两个消费者线程同时从队列获取数据,队列是空的,两个消费者通过if判断,进入等待;
  2. 5秒后,向队列中插入"a"元素,并唤起所有等待线程;
  3. 两个消费者线程被依次唤起,一个取到值,一个没有取到。没有取到是因为取到的线程将front加了1导致的。这里为什么说依次唤起等待线程呢?因为notifyAll()不是同时唤起所有等待线程,是依次唤起,而且顺序是不确定的。

我们希望得到的结果是,一个消费线程得到”a“元素,另一个消费线程继续等待。这个怎么实现呢?对了,就是将判断是用到的if改为while,如下:

/**
 * 入队操作
 * @param e
 * @return
 */
public synchronized boolean add(T e) throws InterruptedException {
   
    while (isFull()) {
   
        wait();
    }
    data[tail] = e;
    tail = (tail + 1) % size;
    if (tail == front) {
   
        flag = 1;
    }
    notifyAll();
    return true;
}

/**
 * 出队操作
 * @return
 */
public synchronized T poll() throws InterruptedException {
   
    while (isEmpty()) {
   
        wait();
    }
    T rtnData = data[front];
    front = (front + 1) % size;
    if (front == tail) {
   
        flag = 0;
    }
    notifyAll();
    return rtnData;
}

在判断空还是满的时候,我们使用while去判断,当两个消费线程被依次唤起时,还会再进行空和满的判断,这时,第一个消费线程判断队列中有元素,会进行获取,第二个消费线程被唤起时,判断队列没有元素,会再次进入等待。我们写段代码测试一下,

public static void main(String[] args) throws InterruptedException {
   
    MyQueue<String> myQueue = new MyQueue<>(5);
    new Thread(() -> {
   
        try {
   
            System.out.println(myQueue.poll());
        } catch (InterruptedException e) {
   
            throw new RuntimeException(e);
        }
    }).start();
    new Thread(() -> {
   
        try {
   
            System.out.println(myQueue.poll());
        } catch (InterruptedException e) {
   
            throw new RuntimeException(e);
        }
    }).start();

    Thread.sleep(5000);
    myQueue.add("a");
    Thread.sleep(5000);
    myQueue.add("b");
}

同样,有两个消费线程去队列获取数据,此时队列为空,然后,我们每隔5秒,插入一个元素,看看结果如何,

a
b

10秒过后,插入的两个元素正常打印,说明我们的队列没有问题。入队的测试,大家自己进行吧。

总结

好了,我们手撸的消息队列完成了,看看都有哪些重点吧,

  1. 循环数组;
  2. 数组下标的计算,用取模法;
  3. 队列空与满的判断,注意flag;
  4. 并发;
  5. 唤起线程注意使用while
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
30天前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
50 5
|
1月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
74 1
|
2月前
|
消息中间件 Java 物联网
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
|
1月前
|
消息中间件 网络架构
RabbitMQ消息队列常见面试题
这篇文章总结了RabbitMQ的常见面试题,涵盖了消息模型、使用场景、实现功能、消息幂等性、顺序性、堆积和丢失的避免方法,以及推模式和拉模式的区别。
43 0
|
1月前
|
消息中间件 Java Kafka
MQ 消息队列 比较
MQ 消息队列 比较
29 0
|
2月前
|
消息中间件 JavaScript Linux
消息队列 MQ操作报错合集之客户端在启动时遇到了连接错误,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java 开发工具
消息队列 MQ使用问题之如何使用DefaultMQPushConsumer来消费消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 运维 Go
消息队列 MQ使用问题之如何配置生产环境
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。