剖析 Redis List 消息队列的三种消费线程模型

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。

Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。

生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型

1 核心流程

生产者使用 LPUSH key element[element...] 将消息插入到队列的头部,如果 key 不存在则会创建一个空的队列再插入消息。

如下,生产者向队列 queue 先后插入了 「Java」「勇哥」「Go」,返回值表示消息插入队列后的个数。

> LPUSH queue Java 勇哥 Go
(integer) 3

消费者使用 RPOP key 依次读取队列的消息,先进先出,所以 「Java」会先读取消费:

> RPOP queue
"Java"
> RPOP queue
"勇哥"
> RPOP queue
"Go"

接下来,我们可以通过 spring-data-redis API 演示生产消费流程:

  • 生产者
redisTemplate.opsForList().leftPush("queue" , "Java");
redisTemplate.opsForList().leftPush("queue" , "勇哥");
redisTemplate.opsForList().leftPush("queue" , "Go");
  • 消费者

我们启动一个独立的线程从队列中读取消息(RPOP 命令),读取成功之后,消费消息,若没有消息,则休眠一会,下一次循环再继续。

上图的伪代码中, while(true) 循环内不停地调用 RPOP 指令,当有消息时,可以及时处理,但假如没有读取到消息,则需要休眠一会。

这里要加休眠,主要是为了减少空读的频率,避免 CPU 无意义的消耗

有什么更优化的方式吗? 有,那就是使用 Redis 阻塞读取 List 的命令。

Redis 提供了 BLPOP、BRPOP 阻塞读取的命令,消费者在在读取队列没有数据的时自动阻塞,直到有新的消息写入队列,才会继续读取新消息执行业务逻辑。

BRPOP queue 0

参数 0 表示阻塞等待时间无限制 。

如图,我们启动一个消费线程永动机,消费线程拉取消息后,执行消费逻辑。

这种消费者线程模型非常容易理解,同时也非常适合顺序消费的模式。同时,假如我们在消费消息时,服务器宕机或者断电,可能丢失一条消息。

接下来,我们想一想,有没有消费速度更高的消费模型吗? 笔者根据过往的经历,列举三种模式:

  • 拉取线程 + 消费线程池(非阻塞模式)
  • 拉取线程 + 消费线程池 (阻塞模式)
  • 拉取线程 + Disruptor(阻塞模式)

2 拉取线程 + 消费线程池(非阻塞模式)

为了提升消费速度,我们可以将拉取和消费拆分成两种动作,分别通过不同的线程池来处理。拉取线程池负责拉取消息,消费线程池负责消费消息。

伪代码类似:

如图,在拉取线程内部,我们拉取完消息后,将消息提交到消费线程 consumeExecutor 。

这样方式可以通过多线程执行大幅度提升消费速度 ,但是这里还是有一个问题:

假如消费速度很慢,生产者速度很高,那么就会在线程池内容易产生消息堆积,这里面会产生两个隐形风险:

  • 线程池队列无限堆积,则可能有 OOM 的风险 ;
  • 假如消费者服务器宕机或者断电,那么会丢失大量的消息。

那么如何优化这种模式呢 ?

答案是:拉取线程提交消息到线程池时,当队列中消息数量到达一定数量时,提交消息到线程池会阻塞。

3 拉取线程 + 消费线程池(阻塞模式)

我们将消息包装为 Runnable ,然后通过消费线程池执行 execute ,拉取线程会不会阻塞呢 ?

下图是执行的源码:

可以看到,第 30 行调用的是 workQueue 的非阻塞的 offer 方法

如果队列已满,新提交的任务并不会被 block 住,反而会调用后续的 reject 流程。

如果我们想要达到阻塞生产者的目的的话,可以采取如下的两种方案:

  • 信号量限制同时进入线程池等待队列的任务数 。

  • 使用线程池的拒绝机制,把新加入的任务 put 到等待队列里,这样也可以阻塞住生产者。

4 拉取线程 + Disruptor

下图展示了 Disruptor 的流程图 。

和线程池机制非常类似, Disruptor 也是非常典型的生产者/消费者模式。线程池存储提交任务的容器是阻塞队列,而 Disruptor 使用的是环形缓冲区 RingBuffer

环形缓冲区的设计相比阻塞队列有如下优点:

  • 环形数组结构

为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

  • 元素位置定位

数组长度 2^n,通过位运算,加快定位的速度。下标采取递增的形式,不用担心 index 溢出的问题。index 是 long 类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

此刻大家并不需要理解环形缓冲区的读写机制,只需要明白 环形缓冲区 RingBuffer 是 Disruptor 的精髓即可。

将消费线程池替换成 Disruptor 有两个明显的优点:

  • 无锁队列,写入读取性能非常好
  • 当拉取线程提交消息到 Disruptor 时,若环形缓冲区 RingBuffer 已经满了,则拉取线程会阻塞,这样天然的可以避免无限拉取,同时避免 OOM 的问题。

伪代码类似:

1、定义 Disruptor

2、拉取线程将消息发送到 Disruptor Ringbuffer

3、消费消息

整体的消费者线程模型如下图:

5 平滑停服 + 定时任务补偿

当我们分析消费者线程模型时,无论我们使用哪种方式,假如服务器突然宕机、或者物理机断电,则会丢失消息。

笔者推荐两种方式:

1、平滑停服

平滑停服是指在停止应用程序时,尽量避免中断正在进行的请求或任务,尽量让正在进行的任务处理完成,并且不再接收新的任务,等所有任务执行完成后关闭应用。

在 Unix/Linux 系统中,可以使用 kill 命令发送信号给运行中的进程。

常见的信号有:

  • SIGTERM (15):请求进程终止,可以被捕捉和处理,用于优雅地停止进程。
  • SIGKILL (9):强制终止进程,不能被捕捉或忽略。
  • SIGQUIT (3):进程退出并生成核心转储(core dump)。

为了实现平滑停服,可以使用 Java 的 Runtime.getRuntime().addShutdownHook 方法注册一个关闭钩子(shutdown hook)。当 JVM 接收到SIGTERM信号时,关闭钩子会被执行,从而可以在应用程序停止前执行一些清理工作。

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        System.out.println("Shutdown hook triggered. Performing cleanup...");
        // 在这里执行清理工作,如关闭资源、保存状态等
}));

我们可以在钩子里,关闭拉取线程池 ,优雅关闭消费线程池等 ,这样可以尽量避免丢失消息。

2、定时任务补偿

使用 List 做消息队列,不可避免的会有消息丢失,所以我们需要用定时任务做补偿,每隔一段时间去业务表里查询业务状态机,若状态机不符合条件,则触发补偿策略。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
9天前
|
监控 NoSQL 安全
如何在 Redis 中正确使用多线程?
【10月更文挑战第16天】正确使用 Redis 多线程需要综合考虑多个因素,并且需要在实践中不断摸索和总结经验。通过合理的配置和运用,多线程可以为 Redis 带来性能上的提升,同时也要注意避免可能出现的问题,以保障系统的稳定和可靠运行。
23 2
|
9天前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
22 1
|
21天前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
56 6
|
3天前
|
安全
List并发线程安全问题
【10月更文挑战第21天】`List` 并发线程安全问题是多线程编程中一个非常重要的问题,需要我们认真对待和处理。只有通过不断地学习和实践,我们才能更好地掌握多线程编程的技巧和方法,提高程序的性能和稳定性。
|
4天前
|
并行计算 JavaScript 前端开发
单线程模型
【10月更文挑战第15天】
|
5天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
8 1
|
18天前
|
存储 运维 NoSQL
Redis为什么最开始被设计成单线程而不是多线程
总之,Redis采用单线程设计是基于对系统特性的深刻洞察和权衡的结果。这种设计不仅保持了Redis的高性能,还确保了其代码的简洁性、可维护性以及部署的便捷性,使之成为众多应用场景下的首选数据存储解决方案。
31 1
|
22天前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
22 2
|
26天前
|
NoSQL Redis 数据库
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
本文解释了Redis为什么采用单线程模型,以及为什么Redis单线程模型的效率和速度依然可以非常高,主要原因包括Redis操作主要访问内存、核心操作简单、单线程避免了线程竞争开销,以及使用了IO多路复用机制epoll。
38 0
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
|
14天前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。