Redis 延迟队列实现(基于PHP)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。例如:滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。

延迟队列介绍

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。

例如:滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。

Redis实现延迟队列

Redis 可以利用 zset (有序列表)来实现,将消息序列化成一个字符串作为 zsetvalue

这个消息的到期处理时间作为 score,利用多个线程轮询 zset 获取到期的任务进行处理。

多线程是为了保证可用性,万一挂了一个线程还有其他线程可以继续处理;

因为有多个线程,所以需要考虑并发争抢任务,确保任务不会多次执行

代码实现

require_once("../RedisClient.php");
$client = RedisClient::getInstance();
//延时队列
function delay(string $message, int $timeout = 5)
{
    global $client;
    $time = microtime(true) + $timeout;
    return $client->zadd('delay:', [$message => $time]);
}
//顺序消费延迟队列中的消息
function loop()
{
    global $client;
    while (true) {
        //从延迟队列获取一条最近时间的消息
        $message_data = $client->zrangebyscore('delay:', '-inf', microtime(true), ['withscores' => true, 'limit' => [0, 1]]);
        //延迟队列中无消息
        if (!$message_data) {
            sleep(1);
            continue;
        }
        //提取消息数据
        $message = key($message_data);
        //从延迟队列中删除刚获取的消息
        $success = $client->zrem('delay:', $message);
        //多线程或多进程争抢消息时,
        //根据zrem返回值判断,当前实例有没有抢到任务
        //抢到任务,做业务处理后返回
        if ($success) {
            //do something..
            echo sprintf("消费的消息,[%s]", $message) . PHP_EOL;
        }
    }
}
//delay('test1');
//delay('test2');
//delay('test2');
//loop();
# php queue.php 
消费的消息,[mmm1]
消费的消息,[mmm2]
消费的消息,[mmm3]


进一步优化

细心的同学会发现上面算法代码中,有几处问题

  • 同一个任务被多个进程取到后再使用 zrem 进行争抢,没有抢到的进程白白浪费了一次任务;
  • 取出条数和删除只能一条,且 zrangebyscorezrem 不是原子操作;
  • 消息取出后,执行了一部分逻辑,服务器突然重启了,剩下的逻辑没有执行完成该如何处理?

我们可以通过使用lua脚本,解决前面两个问题,至于第三个问题可以通过代码层面其他数据库事务解决。

require_once("../RedisClient.php");
use Predis\Command\ScriptCommand;
$client = RedisClient::getInstance();
/**
 * 从消息队列中搜索符合条件的最近n条消息
 * 返回消息内容并从消息队列中删除
 * @param string queue_key 消息队列的key
 * @param int $min      搜索时间戳开始时间
 * @param int $max      搜索时间戳结束时间
 * @param int $offset   要跳过的消息数量
 * @param int $limit    获取消息数量
 * @return array 删除成功的消息的消息内容
 * @
 */
class getAndDeleteRecentMessageScript extends ScriptCommand {
    public function getKeysCount()
    {
        return 1;
    }
    public function getScript()
    {
        return <<<LUA
-- 消息队列的redisKey
local queue = KEYS[1]
-- 搜索范围的最大/最小值,偏移量和取值数量
local min, max, offset, count = ARGV[1], ARGV[2], ARGV[3], ARGV[4] 
local message = false
local messages = {}
local queue_value = {}
local insert = table.insert
-- 获取最近n条消息并删除消息
queue_value = redis.call("ZRANGEBYSCORE",queue,min,max,"LIMIT",offset,count)
for idx, message in pairs(queue_value) do
    if redis.call("ZREM",queue,message) then
        insert(messages, idx, message)
    end
end
-- 返回删除成功的消息
return messages
LUA;
    }
}
$client->getProfile()->defineCommand('get_and_delete_recent_message','getAndDeleteRecentMessageScript');
//向延迟队列中写入10条数据
foreach(range(1,10) as $msg_id){
    $success = delay("msg{$msg_id}");
    if($success){
        echo "写入消息[msg{$msg_id}], 成功" . PHP_EOL;
    }
}
//删除最近写入的 2条
$ret = $conn->get_and_delete_recent_message('delay:',0,microtime(true),3,2);
var_export($ret);


后记

延时队列是一个实现“延时消息”的好方法,解决了业务问题。至于可达性、幂等性未来另述。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
47 5
|
2月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
59 6
|
2月前
|
存储 NoSQL PHP
PHP与Redis结合使用,提升数据存储性能
随着互联网应用的发展,PHP与Redis的结合成为提升数据存储性能的重要手段。PHP作为流行的服务器端语言,常用于网站开发;Redis作为高性能内存数据库,以其快速读写能力,有效优化数据访问速度,减轻数据库压力。两者结合通过缓存机制显著提升应用响应速度,支持高并发场景下的稳定性和可扩展性。
|
2月前
|
存储 NoSQL 关系型数据库
PHP 使用 Redis
10月更文挑战第22天
46 6
|
3月前
|
消息中间件 存储 NoSQL
如何用Redis实现延迟队列?
综上所述,通过Redis的有序集合和一些基本命令,我们可以轻松地构建出功能完善的延迟队列系统。根据具体需求,可以进一步优化和扩展,以满足高性能和高可靠性的业务需求。
77 1
|
2月前
|
前端开发 关系型数据库 MySQL
PHP与MySQL动态网站开发实战指南####
【10月更文挑战第21天】 本文将深入浅出地探讨如何使用PHP与MySQL构建一个动态网站,从环境搭建到项目部署,全程实战演示。无论你是编程新手还是希望巩固Web开发技能的老手,都能在这篇文章中找到实用的技巧和启发。我们将一起探索如何通过PHP处理用户请求,利用MySQL存储数据,并最终呈现动态内容给用户,打造属于自己的在线平台。 ####
66 0
|
1月前
|
存储 关系型数据库 MySQL
PHP与MySQL动态网站开发:从基础到实践####
本文将深入探讨PHP与MySQL的结合使用,展示如何构建一个动态网站。通过一系列实例和代码片段,我们将逐步了解数据库连接、数据操作、用户输入处理及安全防护等关键技术点。无论您是初学者还是有经验的开发者,都能从中获益匪浅。 ####
|
2月前
|
安全 关系型数据库 MySQL
PHP与MySQL动态网站开发实战指南####
——深入探索LAMP栈下的高效数据交互与处理技巧 ####
|
1月前
|
关系型数据库 MySQL PHP
php实现一个简单的MySQL分页
通过本文的详细步骤和代码示例,我们实现了一个简单的PHP MySQL分页功能。主要步骤包括计算总记录数、设置分页参数、查询当前页的数据以及生成分页链接。这种分页方式适用于大多数Web应用,能够有效提升用户体验和页面响应速度。
33 4
|
2月前
|
关系型数据库 MySQL PHP
PHP与MySQL动态网站开发实战指南####
深入探索PHP与MySQL的协同工作机制,本文旨在通过一系列实战案例,揭示构建高效、稳定且用户友好的动态网站的秘诀。从环境搭建到数据交互,再到最佳实践分享,本文为开发者提供了一条清晰的学习路径,助力其在LAMP(Linux, Apache, MySQL, PHP/Perl/Python)栈上实现技术飞跃。 ####
下一篇
开通oss服务