什么是消息队列?
消息队列在日常工作中用得特别多。目前市面上比较常用的 MQ 消息队列中间件有 RabbitMQ、Kafka、RocketMQ 等。根据业务需求,有时还可用 Redis 做轻量的消息队列。
它的应用场景有很多,比如秒杀、记录日志等等。
秒杀就很常见了,当同一时间有大量的请求进来。如果不适用消息队列,有可能会把服务器打挂。就算不挂也会造成响应超时等问题。有了消息队列,我们可以把请求都放到消息队列里面排队处理。如果长度超过最大可承载数量,那我们选择抛弃当前用户请求。提示客户 "排队中",这样更友好。
记录日志也有对应的场景。在没消息队列前,我们是客户端进来请求,顺便记录日志。它是一个同步的行为,这会占用服务器响应的时间。而使用消息队列没我们可以在请求结束时,把日志扔到队列里面,由消费者处理,服务器直接返回请求结果。
相信大家都知道,对于一个新的框架、中间件。用起来是非常简单的,看半小时相信你就能用起来了。「但如果让你手写一个简单的消息队列,你能写出来么?」
你比较熟悉的消息队列
❝狗哥用的 RabbitMQ 比较多,它是一个老牌的开源消息中间件。支持标准的 AMQP(Advanced Message Queuing Protocol,高级消息队列协议),使用 Erlang 语言开发,支持集群部署,和多种客户端语言混合调用,它支持的主流开发语言有以下这些:Java、.NET、Ruby、Python、PHP、JavaScript and Node、Objective-C and Swift、Rust、Scala 以及 Go。
❞
RabbitMQ 中有三个重要的角色:
- 生产者:消息的创建者,负责创建和推送数据到消息服务器。
- 消费者:消息的接收方,用于处理数据和确认消息。
- 代理:也就是 RabbitMQ 服务本身,它用于扮演 "快递" 的角色,因为它本身并不生产消息,只是扮演了 "快递" 的角色,把消息进行暂存和传递。
它的优点是:
- 支持多语言
- 支持持久化,RabbitMQ 支持磁盘持久化功能,保证了消息不会丢失;
- 高并发,RabbitMQ 使用了 Erlang 开发语言,Erlang 是为电话交换机开发的语言,天生自带高并发光环和高可用特性;
- 支持分布式集群,正是因为 Erlang 语言实现的,因此 RabbitMQ 集群部署也非常简单,只需要启动每个节点并使用 --link 把节点加入到集群中即可,并且 RabbitMQ 支持自动选主和自动容灾;
- 支持消息确认,支持消息消费确认(ack)保证了每条消息可以被正常消费;
- 它支持很多插件,比如网页控制台消息管理插件、消息延迟插件等,RabbitMQ 的插件很多并且使用都很方便。
下图就是它的工作流程:
它一共有四种消息类型:
- direct(默认类型)模式,此模式为一对一的发送方式,也就是一条消息只会发送给一个消费者;
- headers 模式,允许你匹配消息的 header 而非路由键(RoutingKey),除此之外 headers 和 direct 的使用完全一致,但因为 headers 匹配的性能很差,几乎不会被用到;
- fanout 模式,为多播的方式,会把一个消息分发给所有的订阅者;
- topic 模式,为主题订阅模式,允许使用通配符(#、*)匹配一个或者多个消息,我可以使用 "cn.mq.#" 匹配到多个前缀是 "cn.mq.xxx" 的消息,比如可以匹配到 "cn.mq.rabbit"、"cn.mq.kafka" 等消息。
实现一个消息队列
首先是「简单版」,必须有三个角色。消费者、生产者以及代理。只需借助 java 的 LinkedList 类即可。
import java.util.LinkedList; import java.util.Queue; public class SimpleQueue { // 定义消息队列 private static Queue< String > queue = new LinkedList< >(); public static void main(String[] args) { producer(); // 调用生产者 consumer(); // 调用消费者 } // 生产者 public static void producer() { // 添加消息 queue.add("first message."); queue.add("second message."); queue.add("third message."); } // 消费者 public static void consumer() { while (!queue.isEmpty()) { // 消费消息 System.out.println(queue.poll()); } } }
运行结果:可以看出消息是以先进先出的顺序消费的。
加下来是「带延迟功能」的消息队列,这就必须需要借助 java 的 DelayQueue 类以及 Delayed 接口了。
import java.text.DateFormat; import java.util.Date; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class SimpleDelayQueue { // 延迟消息队列 private static DelayQueue delayQueue = new DelayQueue(); public static void main(String[] args) throws InterruptedException { producer(); // 调用生产者 consumer(); // 调用消费者 } // 生产者 public static void producer() { // 添加消息 delayQueue.put(new MyDelay(1000, "消息1")); delayQueue.put(new MyDelay(3000, "消息2")); } // 消费者 public static void consumer() throws InterruptedException { System.out.println("开始执行时间:" + DateFormat.getDateTimeInstance().format(new Date())); while (!delayQueue.isEmpty()) { System.out.println(delayQueue.take()); } System.out.println("结束执行时间:" + DateFormat.getDateTimeInstance().format(new Date())); } /** * 自定义延迟队列 */ static class MyDelay implements Delayed { // 延迟截止时间(单位:毫秒) long delayTime = System.currentTimeMillis(); private String msg; public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } /** * 初始化 * @param delayTime 设置延迟执行时间 * @param msg 执行的消息 */ public MyDelay(long delayTime, String msg) { this.delayTime = (this.delayTime + delayTime); this.msg = msg; } // 获取剩余时间 @Override public long getDelay(TimeUnit unit) { return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } // 队列里元素的排序依据 @Override public int compareTo(Delayed o) { if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) { return 1; } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) { return -1; } else { return 0; } } @Override public String toString() { return this.msg; } } }
运行结果:可以看出消息 1、消息 2 都实现了延迟执行的功能。
巨人的肩膀
小结
本文聊了消息队列的使用场景、还介绍了我最常用的 RabbitMQ 的特性。同时还手动实现了简单版和带延迟功能的消息队列。它在我们工作中还是非常常用的,面试中问得也多。特别是诸如:聊聊你最常用的消息队列?如何手写一个消息队列等问题。