用 Redis 做一个可靠的延迟队列

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 用 Redis 做一个可靠的延迟队列

我们先看看以下业务场景:

  1. 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
  2. 新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信?

上述场景最简单直接的解决方案是定时扫表。我们假设 10 分钟未支付则关闭订单、定时任务设置为 5 分钟一次,那么一个订单最晚会在 15 分钟关闭。高达 5 分钟的误差是业务难以接受的。另一方面频繁的扫表可能消耗过多数据库资源,影响线上交易吞吐量。

此外还有朋友使用 Redis 的过期通知、时间轮、Java 的 DelayQueue 等方式实现延时任务。我们在之前的文章中讨论过他们的缺陷:比如使用 Redis 过期通知不保证准时、发送即忘不保证送达,时间轮缺乏持久化机制容易丢失等。

总结一下,我们对于延时队列的要求有下列几条(从重要到不重要排列):

  1. 持久化: 服务重启或崩溃不能丢失任务
  2. 确认重试机制: 任务处理失败或超时应该有重试
  3. 定时尽量精确

最合适的解决方案是使用 Pulsa、RocketMQ 等专业消息队列的延时投递功能。不过引入新的中间件通常存在各种非技术方面的麻烦。Redis 作为广泛使用的中间件,何不用 Redis 来制作延时队列呢?

使用有序集合结构实现延时队列的方法已经广为人知,无非是将消息作为有序集合的 member 投递时间戳作为 score,使用 zrangebyscore 命令搜索已到投递时间的消息然后将其发给消费者。

除了基本的延时投递之外我们的消息队列具有下列优势:

  1. 提供 ACK 和重试机制
  2. 只需要 Redis 和消费者即可运行,无需其它组件
  3. 提供 At-Least-One 投递语义、并保证消息不会并发消费

本文的完整代码实现在hdt3213/delayqueue,可以直接 go get github.com/hdt3213/delayqueue 完成安装。

具体使用也非常简单,只需要注册处理消息的回调函数并调用 start() 即可:

package main
import (
 "github.com/go-redis/redis/v8"
 "github.com/hdt3213/delayqueue"
 "strconv"
 "time"
)
func main() {
 redisCli := redis.NewClient(&redis.Options{
  Addr: "127.0.0.1:6379",
 })
 queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
  // 注册处理消息的回调函数
        // 返回 true 表示已成功消费,返回 false 消息队列会重新投递次消息
  return true
 })
 // 发送延时消息
 for i := 0; i < 10; i++ {
  err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
  if err != nil {
   panic(err)
  }
 }
 // 启动消费协程
 done := queue.StartConsume()
 // 阻塞等待消费协程退出
 <-done
}

由于数据存储在 redis 中所以我们最多能保证在 redis 无故障且消息队列相关 key 未被外部篡改的情况下不会丢失消息。

原理详解

消息队列涉及几个关键的 redis 数据结构:

  • msgKey: 为了避免两条内容完全相同的消息造成意外的影响,我们将每条消息放到一个字符串类型的键中,并分配一个 UUID 作为它的唯一标识。其它数据结构中只存储 UUID 而不存储完整的消息内容。每个 msg 拥有一个独立的 key 而不是将所有消息放到一个哈希表是为了利用 TTL 机制避免
  • pendingKey: 有序集合类型,member 为消息 ID, score 为投递时间的 unix 时间戳。
  • readyKey: 列表类型,需要投递的消息 ID。
  • unAckKey: 有序集合类型,member 为消息 ID, score 为重试时间的 unix 时间戳。
  • retryKey: 列表类型,已到重试时间的消息 ID
  • garbageKey: 集合类型,用于暂存已达重试上线的消息 ID
  • retryCountKey: 哈希表类型,键为消息 ID, 值为剩余的重试次数

流程如下图所示:

微信图片_20220907154432.png

由于我们允许分布式地部署多个消费者,每个消费者都在定时执行 lua 脚本,所以多个消费者可能处于上述流程中不同状态,我们无法预知(或控制)上图中五个操作发生的先后顺序,也无法控制有多少实例正在执行同一个操作。

因此我们需要保证上图中五个操作满足三个条件:

  1. 都是原子性的
  2. 不会重复处理同一条消息
  3. 操作前后消息队列始终处于正确的状态

只要满足这三个条件,我们就可以部署多个实例且不需要使用分布式锁等技术来进行状态同步。

是不是听起来有点吓人?😂 其实简单的很,让我们一起来详细看看吧~

pending2ReadyScript

pending2ReadyScript 使用 zrangebyscore 扫描已到投递时间的消息ID并把它们移动到 ready 中:

-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 从 pending key 中找出已到投递时间的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 将他们放入 ready key 中
for _,v in ipairs(msgs) do
 table.insert(args2, v) 
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 从 pending key 中删除已投递的消息

ready2UnackScript

ready2UnackScript 从 ready 或者 retry 中取出一条消息发送给消费者并放入 unack 中,类似于 RPopLPush:

-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg

unack2RetryScript

unack2RetryScript 从 retry 中找出所有已到重试时间的消息并把它们移动到 unack 中:

-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 找到已到重试时间的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查询剩余重试次数
for i,v in ipairs(retryCounts) do
 local k = msgs[i]
 if tonumber(v) > 0 then -- 剩余次数大于 0
  redis.call("HIncrBy", KEYS[2], k, -1) -- 减少剩余重试次数
  redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
 else -- 剩余重试次数为 0
  redis.call("HDel", KEYS[2], k) -- 删除重试次数记录
  redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后续删除
 end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 将已处理的消息从 unack key 中删除

因为 redis 要求 lua 脚本必须在执行前在 KEYS 参数中声明自己要访问的 key, 而我们将每个 msg 有一个独立的 key,我们在执行 unack2RetryScript 之前是不知道哪些 msg key 需要被删除。所以 lua 脚本只将需要删除的消息记在 garbage key 中,脚本执行完后再通过 del 命令将他们删除:

func (q *DelayQueue) garbageCollect() error {
 ctx := context.Background()
 msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
 if err != nil {
  return fmt.Errorf("smembers failed: %v", err)
 }
 if len(msgIds) == 0 {
  return nil
 }
 // allow concurrent clean
 msgKeys := make([]string, 0, len(msgIds))
 for _, idStr := range msgIds {
  msgKeys = append(msgKeys, q.genMsgKey(idStr))
 }
 err = q.redisCli.Del(ctx, msgKeys...).Err()
 if err != nil && err != redis.Nil {
  return fmt.Errorf("del msgs failed: %v", err)
 }
 err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
 if err != nil && err != redis.Nil {
  return fmt.Errorf("remove from garbage key failed: %v", err)
 }
 return nil
}

之前提到的 lua 脚本都是原子性执行的,不会有其它命令插入其中。 gc 函数由 3 条 redis 命令组成,在执行过程中可能会有其它命令插入执行过程中,不过考虑到一条消息进入垃圾回收流程之后不会复活所以不需要保证 3 条命令原子性。

ack

ack 只需要将消息彻底删除即可:

func (q *DelayQueue) ack(idStr string) error {
 ctx := context.Background()
 err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
 if err != nil {
  return fmt.Errorf("remove from unack failed: %v", err)
 }
 // msg key has ttl, ignore result of delete
 _ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
 q.redisCli.HDel(ctx, q.retryCountKey, idStr)
 return nil
}

否定确认只需要将 unack key 中消息的重试时间改为现在,随后执行的 unack2RetryScript 会立即将它移动到 retry key

func (q *DelayQueue) nack(idStr string) error {
 ctx := context.Background()
 // update retry time as now, unack2Retry will move it to retry immediately
 err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
  Member: idStr,
  Score:  float64(time.Now().Unix()),
 }).Err()
 if err != nil {
  return fmt.Errorf("negative ack failed: %v", err)
 }
 return nil
}

consume

消息队列的核心逻辑是每秒执行一次的 consume 函数,它负责调用上述脚本将消息转移到正确的集合中并回调 consumer 来消费消息:

func (q *DelayQueue) consume() error {
 // 执行 pending2ready,将已到时间的消息转移到 ready
 err := q.pending2Ready()
 if err != nil {
  return err
 }
 // 循环调用 ready2Unack 拉取消息进行消费
 var fetchCount uint
 for {
  idStr, err := q.ready2Unack()
  if err == redis.Nil { // consumed all
   break
  }
  if err != nil {
   return err
  }
  fetchCount++
  ack, err := q.callback(idStr)
  if err != nil {
   return err
  }
  if ack {
   err = q.ack(idStr)
  } else {
   err = q.nack(idStr)
  }
  if err != nil {
   return err
  }
  if fetchCount >= q.fetchLimit {
   break
  }
 }
 // 将 nack 或超时的消息放入重试队列
 err = q.unack2Retry()
 if err != nil {
  return err
 }
    // 清理已达到最大重试次数的消息
 err = q.garbageCollect()
 if err != nil {
  return err
 }
 // 消费重试队列
 fetchCount = 0
 for {
  idStr, err := q.retry2Unack()
  if err == redis.Nil { // consumed all
   break
  }
  if err != nil {
   return err
  }
  fetchCount++
  ack, err := q.callback(idStr)
  if err != nil {
   return err
  }
  if ack {
   err = q.ack(idStr)
  } else {
   err = q.nack(idStr)
  }
  if err != nil {
   return err
  }
  if fetchCount >= q.fetchLimit {
   break
  }
 }
 return nil
}

至此一个简单可靠的延时队列就做好了,何不赶紧开始试用呢😘😋?

相关文章
|
6月前
|
存储 监控 NoSQL
使用Redis实现延迟消息发送功能
使用 Redis 的密码认证功能,为实例设置密码以防止未授权访问。为消息提供适当加密,确保消息内容在网络传输过程中不被窃取或篡改。
254 16
|
编解码 NoSQL Java
使用Spring Boot + Redis 队列实现视频文件上传及FFmpeg转码的技术分享
【8月更文挑战第30天】在当前的互联网应用中,视频内容的处理与分发已成为不可或缺的一部分。对于视频平台而言,高效、稳定地处理用户上传的视频文件,并对其进行转码以适应不同设备的播放需求,是提升用户体验的关键。本文将围绕使用Spring Boot结合Redis队列技术来实现视频文件上传及FFmpeg转码的过程,分享一系列技术干货。
820 4
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
253 6
|
消息中间件 存储 NoSQL
如何用Redis实现延迟队列?
综上所述,通过Redis的有序集合和一些基本命令,我们可以轻松地构建出功能完善的延迟队列系统。根据具体需求,可以进一步优化和扩展,以满足高性能和高可靠性的业务需求。
356 1
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。
332 1
|
NoSQL Linux Redis
Redis性能优化问题之想确认Redis延迟变大是否因为fork耗时导致的,如何解决
Redis性能优化问题之想确认Redis延迟变大是否因为fork耗时导致的,如何解决
|
NoSQL Redis
Redis性能优化问题之为什么配置为 appendfsync everysec 的 AOF 也可能导致 Redis 延迟变大
Redis性能优化问题之为什么配置为 appendfsync everysec 的 AOF 也可能导致 Redis 延迟变大
|
监控 NoSQL Redis
Redis性能优化问题之配置 Redis 的自动碎片整理功能,如何解决
Redis性能优化问题之配置 Redis 的自动碎片整理功能,如何解决
|
缓存 NoSQL Java
面试官:Redis如何实现延迟任务?
延迟任务是计划任务,用于在未来特定时间执行。常见应用场景包括定时通知、异步处理、缓存管理、计划任务、订单处理、重试机制、提醒和数据采集。Redis虽无内置延迟任务功能,但可通过过期键通知、ZSet或Redisson实现。然而,这种方法精度有限,稳定性较差,适合轻量级需求。Redisson的RDelayedQueue提供更简单的延迟队列实现。
699 9
|
存储 NoSQL API
【小小思考】Redis实现去重任务队列
【2月更文挑战第1天】思考一下如何用Redis实现去重的任务队列,主要有List 、List + Set/Hash/Bloom Filter、ZSet、Lua和开源库等方式。
554 1