延迟队列介绍
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。
例如:滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。
Redis实现延迟队列
Redis
可以利用 zset
(有序列表)来实现,将消息序列化成一个字符串作为 zset
的 value
;
这个消息的到期处理时间作为 score
,利用多个线程轮询 zset
获取到期的任务进行处理。
多线程是为了保证可用性,万一挂了一个线程还有其他线程可以继续处理;
因为有多个线程,所以需要考虑并发争抢任务,确保任务不会多次执行。
代码实现
require_once("../RedisClient.php"); $client = RedisClient::getInstance(); //延时队列 function delay(string $message, int $timeout = 5) { global $client; $time = microtime(true) + $timeout; return $client->zadd('delay:', [$message => $time]); } //顺序消费延迟队列中的消息 function loop() { global $client; while (true) { //从延迟队列获取一条最近时间的消息 $message_data = $client->zrangebyscore('delay:', '-inf', microtime(true), ['withscores' => true, 'limit' => [0, 1]]); //延迟队列中无消息 if (!$message_data) { sleep(1); continue; } //提取消息数据 $message = key($message_data); //从延迟队列中删除刚获取的消息 $success = $client->zrem('delay:', $message); //多线程或多进程争抢消息时, //根据zrem返回值判断,当前实例有没有抢到任务 //抢到任务,做业务处理后返回 if ($success) { //do something.. echo sprintf("消费的消息,[%s]", $message) . PHP_EOL; } } } //delay('test1'); //delay('test2'); //delay('test2'); //loop(); # php queue.php 消费的消息,[mmm1] 消费的消息,[mmm2] 消费的消息,[mmm3]
进一步优化
细心的同学会发现上面算法代码中,有几处问题
- 同一个任务被多个进程取到后再使用
zrem
进行争抢,没有抢到的进程白白浪费了一次任务; - 取出条数和删除只能一条,且
zrangebyscore
和zrem
不是原子操作; - 消息取出后,执行了一部分逻辑,服务器突然重启了,剩下的逻辑没有执行完成该如何处理?
我们可以通过使用lua脚本,解决前面两个问题,至于第三个问题可以通过代码层面其他数据库事务解决。
require_once("../RedisClient.php"); use Predis\Command\ScriptCommand; $client = RedisClient::getInstance(); /** * 从消息队列中搜索符合条件的最近n条消息 * 返回消息内容并从消息队列中删除 * @param string queue_key 消息队列的key * @param int $min 搜索时间戳开始时间 * @param int $max 搜索时间戳结束时间 * @param int $offset 要跳过的消息数量 * @param int $limit 获取消息数量 * @return array 删除成功的消息的消息内容 * @ */ class getAndDeleteRecentMessageScript extends ScriptCommand { public function getKeysCount() { return 1; } public function getScript() { return <<<LUA -- 消息队列的redisKey local queue = KEYS[1] -- 搜索范围的最大/最小值,偏移量和取值数量 local min, max, offset, count = ARGV[1], ARGV[2], ARGV[3], ARGV[4] local message = false local messages = {} local queue_value = {} local insert = table.insert -- 获取最近n条消息并删除消息 queue_value = redis.call("ZRANGEBYSCORE",queue,min,max,"LIMIT",offset,count) for idx, message in pairs(queue_value) do if redis.call("ZREM",queue,message) then insert(messages, idx, message) end end -- 返回删除成功的消息 return messages LUA; } } $client->getProfile()->defineCommand('get_and_delete_recent_message','getAndDeleteRecentMessageScript'); //向延迟队列中写入10条数据 foreach(range(1,10) as $msg_id){ $success = delay("msg{$msg_id}"); if($success){ echo "写入消息[msg{$msg_id}], 成功" . PHP_EOL; } } //删除最近写入的 2条 $ret = $conn->get_and_delete_recent_message('delay:',0,microtime(true),3,2); var_export($ret);
后记
延时队列是一个实现“延时消息”的好方法,解决了业务问题。至于可达性、幂等性未来另述。