工作三年,小胖问我怎么实现一个消息队列?真的菜!

简介: 工作三年,小胖问我怎么实现一个消息队列?真的菜!

什么是消息队列?


消息队列在日常工作中用得特别多。目前市面上比较常用的 MQ 消息队列中间件有 RabbitMQ、Kafka、RocketMQ 等。根据业务需求,有时还可用 Redis 做轻量的消息队列。


它的应用场景有很多,比如秒杀、记录日志等等。


秒杀就很常见了,当同一时间有大量的请求进来。如果不适用消息队列,有可能会把服务器打挂。就算不挂也会造成响应超时等问题。有了消息队列,我们可以把请求都放到消息队列里面排队处理。如果长度超过最大可承载数量,那我们选择抛弃当前用户请求。提示客户 "排队中",这样更友好。


记录日志也有对应的场景。在没消息队列前,我们是客户端进来请求,顺便记录日志。它是一个同步的行为,这会占用服务器响应的时间。而使用消息队列没我们可以在请求结束时,把日志扔到队列里面,由消费者处理,服务器直接返回请求结果。


相信大家都知道,对于一个新的框架、中间件。用起来是非常简单的,看半小时相信你就能用起来了。「但如果让你手写一个简单的消息队列,你能写出来么?」


你比较熟悉的消息队列


狗哥用的 RabbitMQ 比较多,它是一个老牌的开源消息中间件。支持标准的 AMQP(Advanced Message Queuing Protocol,高级消息队列协议),使用 Erlang 语言开发,支持集群部署,和多种客户端语言混合调用,它支持的主流开发语言有以下这些:Java、.NET、Ruby、Python、PHP、JavaScript and Node、Objective-C and Swift、Rust、Scala 以及 Go。


RabbitMQ 中有三个重要的角色:


  • 生产者:消息的创建者,负责创建和推送数据到消息服务器。
  • 消费者:消息的接收方,用于处理数据和确认消息。
  • 代理:也就是 RabbitMQ 服务本身,它用于扮演 "快递" 的角色,因为它本身并不生产消息,只是扮演了 "快递" 的角色,把消息进行暂存和传递。


它的优点是:


  • 支持多语言
  • 支持持久化,RabbitMQ 支持磁盘持久化功能,保证了消息不会丢失;
  • 高并发,RabbitMQ 使用了 Erlang 开发语言,Erlang 是为电话交换机开发的语言,天生自带高并发光环和高可用特性;
  • 支持分布式集群,正是因为 Erlang 语言实现的,因此 RabbitMQ 集群部署也非常简单,只需要启动每个节点并使用 --link 把节点加入到集群中即可,并且 RabbitMQ 支持自动选主和自动容灾;
  • 支持消息确认,支持消息消费确认(ack)保证了每条消息可以被正常消费;
  • 它支持很多插件,比如网页控制台消息管理插件、消息延迟插件等,RabbitMQ 的插件很多并且使用都很方便。


下图就是它的工作流程:


640.png


它一共有四种消息类型:


  • direct(默认类型)模式,此模式为一对一的发送方式,也就是一条消息只会发送给一个消费者;
  • headers 模式,允许你匹配消息的 header 而非路由键(RoutingKey),除此之外 headers 和 direct 的使用完全一致,但因为 headers 匹配的性能很差,几乎不会被用到;
  • fanout 模式,为多播的方式,会把一个消息分发给所有的订阅者;
  • topic 模式,为主题订阅模式,允许使用通配符(#、*)匹配一个或者多个消息,我可以使用 "cn.mq.#" 匹配到多个前缀是 "cn.mq.xxx" 的消息,比如可以匹配到 "cn.mq.rabbit"、"cn.mq.kafka" 等消息。


实现一个消息队列


首先是「简单版」,必须有三个角色。消费者、生产者以及代理。只需借助 java 的 LinkedList  类即可。


import java.util.LinkedList;
import java.util.Queue;
public class SimpleQueue {
    // 定义消息队列
    private static Queue< String > queue = new LinkedList< >();
    public static void main(String[] args) {
        producer(); // 调用生产者
        consumer(); // 调用消费者
    }
    // 生产者
    public static void producer() {
        // 添加消息
        queue.add("first message.");
        queue.add("second message.");
        queue.add("third message.");
    }
    // 消费者
    public static void consumer() {
        while (!queue.isEmpty()) {
            // 消费消息
            System.out.println(queue.poll());
        }
    }
}


运行结果:可以看出消息是以先进先出的顺序消费的。


640.png


加下来是「带延迟功能」的消息队列,这就必须需要借助 java 的 DelayQueue 类以及 Delayed 接口了。


import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class SimpleDelayQueue {
    // 延迟消息队列
    private static DelayQueue delayQueue = new DelayQueue();
    public static void main(String[] args) throws InterruptedException {
        producer(); // 调用生产者
        consumer(); // 调用消费者
    }
    // 生产者
    public static void producer() {
        // 添加消息
        delayQueue.put(new MyDelay(1000, "消息1"));
        delayQueue.put(new MyDelay(3000, "消息2"));
    }
    // 消费者
    public static void consumer() throws InterruptedException {
        System.out.println("开始执行时间:" +
                DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()) {
            System.out.println(delayQueue.take());
        }
        System.out.println("结束执行时间:" +
                DateFormat.getDateTimeInstance().format(new Date()));
    }
    /**
     * 自定义延迟队列
     */
    static class MyDelay implements Delayed {
        // 延迟截止时间(单位:毫秒)
        long delayTime = System.currentTimeMillis();
        private String msg;
        public String getMsg() {
            return msg;
        }
        public void setMsg(String msg) {
            this.msg = msg;
        }
        /**
         * 初始化
         * @param delayTime 设置延迟执行时间
         * @param msg       执行的消息
         */
        public MyDelay(long delayTime, String msg) {
            this.delayTime = (this.delayTime + delayTime);
            this.msg = msg;
        }
        // 获取剩余时间
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        // 队列里元素的排序依据
        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }
        @Override
        public String toString() {
            return this.msg;
        }
    }
}


运行结果:可以看出消息 1、消息 2 都实现了延迟执行的功能。


640.png


巨人的肩膀



小结


本文聊了消息队列的使用场景、还介绍了我最常用的 RabbitMQ 的特性。同时还手动实现了简单版和带延迟功能的消息队列。它在我们工作中还是非常常用的,面试中问得也多。特别是诸如:聊聊你最常用的消息队列?如何手写一个消息队列等问题。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 监控 Kafka
不愧是Alibaba技术官,Kafka的精髓全写这本“限量笔记”里,服了
分布式,是程序员必备技能之一,在面试过程中属于必备类的,在工作中更是会经常用到。而Kafka是一个分布式的基于发布订阅的消息队列,目前它的魅力是无穷的,对于Kafka的奥秘,还需要我们细细去探寻。
|
5月前
|
消息中间件 弹性计算 运维
参课赢好礼!云消息队列 RabbitMQ 版陪跑班报名中
本次课程将向您介绍云消息队列 RabbitMQ 版对比开源 RabbitMQ 的优势,以及 Serverless 系列的弹性灵活和按量计费,带您快速上手云消息队列 RabbitMQ 版消息收发操作和 Serverless 系列价格计算器,帮助您满足随着业务增长而提高的消息队列性能和稳定性的需求,有效降低资源和运维成本。
107 17
|
8月前
|
NoSQL 安全 Java
三面阿里被挂,竟获内推名额,历经5面拿下口碑offer(Java后台)
每一个互联网人心中都有一个大厂梦,百度、阿里巴巴、腾讯是很多互联网人梦寐以求的地方,而我也不例外。但是,BAT等一线互联网大厂并不是想进就能够进的,它对人才的技术能力和学历都是有一定要求的,所以除了学历以外,我们的技术和能力都要过硬才行。
|
Java 程序员
终于不慌内卷了,多亏阿里内部的并发图册+JDK源码速成笔记
并发编程 Java并发在近几年的面试里面可以说是面试热点,每个面试官面试的时候都会跟你扯一下并发,甚至是高并发。面试前你不仅得需要弄清楚的是什么是并发,还得搞清什么是高并发! 在这里很多小白朋友就会很疑惑:我工作又不用,为啥面试总是问?真就内卷卷我呗!(手动狗头)互联网内卷已经是现在的行业趋势,而且是不可逆的,这个大家也知道;但LZ要说的是,虽然简单地增删改查并不需要并发的知识,但是业务稍微复杂一点,你的技术水平稍微提升一点的话你就会知道,并发是我们Java程序员绕不开的一道坎。
54 0
|
消息中间件 canal 运维
听叔一句劝,消息队列的水太深,你把握不住!
听叔一句劝,消息队列的水太深,你把握不住!
110 0
|
消息中间件 Cloud Native 物联网
极客时间「大师课·深度剖析 RocketMQ5.0」上线啦,欢迎免费领取!
极客时间「大师课·深度剖析 RocketMQ5.0」上线啦,快来免费领取课程玩转 RocketMQ 5.0 吧!
165 0
|
消息中间件 存储 算法
头条终面:写个消息中间件(上)
头条终面:写个消息中间件(上)
头条终面:写个消息中间件(上)
|
消息中间件 存储 监控
4.图灵学院-----阿里/京东/滴滴/美团整理----高频MQ消息队列篇
4.图灵学院-----阿里/京东/滴滴/美团整理----高频MQ消息队列篇
259 0
4.图灵学院-----阿里/京东/滴滴/美团整理----高频MQ消息队列篇
|
前端开发 JavaScript
#yyds干货盘点# 前端歌谣的刷题之路-第一百五十三题-发布订阅者模式
#yyds干货盘点# 前端歌谣的刷题之路-第一百五十三题-发布订阅者模式
100 0
#yyds干货盘点# 前端歌谣的刷题之路-第一百五十三题-发布订阅者模式
|
消息中间件 存储 算法
头条终面:写个消息中间件(下)
头条终面:写个消息中间件(下)