php+redis实现延迟队列(订单超时未支付。会员时间过期)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: php+redis实现延迟队列(订单超时未支付。会员时间过期)

基于redis有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等

我是在tp5框架上写的,实现起来很简单,对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,


1、命令行脚本 执行方法:php think delay-queue queuename(这是有序集的key)

namespace app\command;
use app\common\lib\delayqueue\DelayQueue;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Db;
class DelayQueueWorker extends Command
{
    const COMMAND_ARGV_1 = 'queue';
    protected function configure()
    {
        $this->setName('delay-queue')->setDescription('延迟队列任务进程');
        $this->addArgument(self::COMMAND_ARGV_1);
    }
    protected function execute(Input $input, Output $output)
    {
        $queue = $input->getArgument(self::COMMAND_ARGV_1);
        //参数1 延迟队列表名,对应与redis的有序集key名
        while (true) {
            DelayQueue::getInstance($queue)->perform();
            usleep(300000);
        }
    }
}

库类目录结构

image.png

config.php 里是redis连接参数配置

RedisHandler.php只实现有序集的操作,重连机制还没有实现

namespace app\common\lib\delayqueue;
class RedisHandler
{
    public $provider;
    private static $_instance = null;
    private function __construct() {
        $this->provider = new \Redis();
        //host port
        $config = require_once 'config.php';
        $this->provider->connect($config['redis_host'], $config['redis_port']);
    }
    final private function __clone() {}
    public static function getInstance() {
        if(!self::$_instance) {
            self::$_instance = new RedisHandler();
        }
        return self::$_instance;
    }
    /**
     * @param string $key 有序集key
     * @param number $score 排序值
     * @param string $value 格式化的数据
     * @return int
     */
    public function zAdd($key, $score, $value)
    {
        return $this->provider->zAdd($key, $score, $value);
    }
    /**
     * 获取有序集数据
     * @param $key
     * @param $start
     * @param $end
     * @param null $withscores
     * @return array
     */
    public function zRange($key, $start, $end, $withscores = null)
    {
        return $this->provider->zRange($key, $start, $end, $withscores);
    }
    /**
     * 删除有序集数据
     * @param $key
     * @param $member
     * @return int
     */
    public function zRem($key,$member)
    {
        return $this->provider->zRem($key,$member);
    }
}

延迟队列

namespace app\common\lib\delayqueue;
class DelayQueue
{
    private $prefix = 'delay_queue:';
    private $queue;
    private static $_instance = null;
    private function __construct($queue) {
        $this->queue = $queue;
    }
    final private function __clone() {}
    public static function getInstance($queue = '') {
        if(!self::$_instance) {
            self::$_instance = new DelayQueue($queue);
        }
        return self::$_instance;
    }
    /**
     * 添加任务信息到队列
     *
     * demo DelayQueue::getInstance('test')->addTask(
     *    'app\common\lib\delayqueue\job\Test',
     *    strtotime('2018-05-02 20:55:20'),
     *    ['abc'=>111]
     * );
     *
     * @param $jobClass
     * @param int $runTime 执行时间
     * @param array $args
     */
    public function addTask($jobClass, $runTime, $args = null)
    {
        $key = $this->prefix.$this->queue;
        $params = [
            'class' => $jobClass,
            'args'  => $args,
            'runtime' => $runTime,
        ];
        RedisHandler::getInstance()->zAdd(
            $key,
            $runTime,
            serialize($params)
        );
    }
    /**
     * 执行job
     * @return bool
     */
    public function perform()
    {
        $key = $this->prefix.$this->queue;
        //取出有序集第一个元素
        $result = RedisHandler::getInstance()->zRange($key, 0 ,0);
        if (!$result) {
            return false;
        }
        $jobInfo = unserialize($result[0]);
        print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);
        $jobClass = $jobInfo['class'];
        if(!@class_exists($jobClass)) {
            print_r($jobClass.' undefined'. PHP_EOL);
            RedisHandler::getInstance()->zRem($key, $result[0]);
            return false;
        }
        // 到时间执行
        if (time() >= $jobInfo['runtime']) {
            $job = new $jobClass;
            $job->setPayload($jobInfo['args']);
            $jobResult = $job->preform();
            if ($jobResult) {
                // 将任务移除
                RedisHandler::getInstance()->zRem($key, $result[0]);
                return true;
            }
        }
        return false;
    }
}

异步任务基类:

namespace app\common\lib\delayqueue;
class DelayJob
{
    protected $payload;
    public function preform ()
    {
        // todo
        return true;
    }
    public function setPayload($args = null)
    {
        $this->payload = $args;
    }
}

所有异步执行的任务都卸载job目录下,且要继承DelayJob,你可以实现任何你想延迟执行的任务

如:

namespace app\common\lib\delayqueue\job;
use app\common\lib\delayqueue\DelayJob;
class Test extends DelayJob
{
    public function preform()
    {
        // payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入
        print_r('test job'.PHP_EOL);
        return true;
    }
}

使用方法:


假设用户创建了一个订单,订单在10分钟后失效,那么在订单创建后加入:


DelayQueue::getInstance('close_order')->addTask(

    'app\common\lib\delayqueue\job\CloseOrder', // 自己实现的job

    strtotime('2018-05-02 20:55:20'), // 订单失效时间

    ['order_id'=>123456] // 传递给job的参数

);

close_order 是有序集的key


命令行启动进程


php think delay-queue close_order


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
NoSQL 算法 PHP
Redis 延迟队列实现(基于PHP)
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。 例如:滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。
299 0
|
存储 JSON NoSQL
redis延迟队列php,php redis延迟队列,redis延迟队列
redis延迟队列php,php redis延迟队列,redis延迟队列
104 0
|
消息中间件 安全 PHP
PHP+Laravel+RabbitMQ实现异步延迟消息队列(库存归还)
一、前言 需求:电商秒杀场景中,如果用户下单10分钟未支付,需要进行库存归还 本篇是用PHP+Laravel+RabbitMQ来实现异步延迟消息队列
|
NoSQL PHP Redis
Mac PHP安装Redis扩展
php安装redis的扩展 采用pecl命令进行安装; pecl命令,在使用brew 安装php时,已经为我们安装上了,这里我们直接使用即可。 我们先进入php的bin目录看下命令是否存在,对应路径如下: cd /opt/homebrew/Cellar/php@7.3/7.3.32 这里的7.3为我通过brew install [php@7.3]安装的php具体版本号,大家可以通过ls命令查看文件夹下是否存在pecl命令
1459 0
|
NoSQL PHP Apache
windows下PHP安装Redis扩展不成功的可能原因
windows下PHP安装Redis扩展不成功的可能原因
525 0
windows下PHP安装Redis扩展不成功的可能原因
|
NoSQL Linux PHP
php7安装redis6扩展
php7安装redis6扩展
191 0