laravel的消息队列剖析

简介:

laravel的消息队列剖析

这篇来自于看到朋友转的58沈剑的一篇文章:1分钟实现“延迟消息”功能

在实际工作中也不止遇见过一次这个问题,我在想着以前是怎么处理的呢?我记得当初在上家公司的时候直接使用的是laravel的queue来实现的。当然,这里说的laravel的queue实际上也是基于redis的队列实现的。正好今天遇上这个问题,追下底层机制。

使用如下:http://d.laravel-china.org/docs/5.3/queues

// 创建10分钟后执行的任务

$job = (new ProcessPodcast($pocast))
            ->delay(Carbon::now()->addMinutes(10));

dispatch($job);
AI 代码解读
//启动队列命令
php artisan queue:work
AI 代码解读

分发部分

首先看dispatch这边做的事情:

dispatch函数首先就是调用

return app(Dispatcher::class)->dispatch($job);
// Illuminate\Contracts\Bus\Dispatcher
AI 代码解读

首先需要理解这里的Dispatcher::class 实际注入的是哪个类。

看到vendor/laravel/framework/src/Illuminate/Bus/BusServiceProvider.php:26,有

public function register()
{
    $this->app->singleton('Illuminate\Bus\Dispatcher', function ($app) {
        return new Dispatcher($app, function ($connection = null) use ($app) {
            return $app['Illuminate\Contracts\Queue\Factory']->connection($connection);
        });
    });

    $this->app->alias(
        'Illuminate\Bus\Dispatcher', 'Illuminate\Contracts\Bus\Dispatcher'
    );

    $this->app->alias(
        'Illuminate\Bus\Dispatcher', 'Illuminate\Contracts\Bus\QueueingDispatcher'
    );
}
AI 代码解读

所以最后是实例化了Illuminate\Bus\Dispatcher

看看它的dispatch函数做了啥?

public function dispatch($command)
{
    if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        return $this->dispatchToQueue($command);
    } else {
        return $this->dispatchNow($command);
    }
}
AI 代码解读

假设我们的dispatch是基于队列的(ShouldQueue)。那么就是走dispatchToQueue,最终,走的是pushCommandToQueue

protected function pushCommandToQueue($queue, $command)
{
    ...

    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }
    ...
}
AI 代码解读

这里的queue就是队列的范畴了,假设我们用的队列是redis。(队列的解析器就是singleton的时候传入的Cluster)。最终这里落入的是vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php:111的

public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    $this->getConnection()->zadd(
        $this->getQueue($queue).':delayed', $this->getTime() + $this->getSeconds($delay), $payload
    );

    return Arr::get(json_decode($payload, true), 'id');
}
AI 代码解读

这下就看清楚了:

laravel的延迟队列,使用的是zadd命令,往{$queue}:delayed,中插入一条job信息,它的score是执行时间。

(得到这条结论还真tmd是不容易)

队列监听部分

队列监听命令来自于: php artisan queue:work

命令行的入口就不追踪了,直接到vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php:29 类

protected function runWorker($connection, $queue)
{
    $this->worker->setCache($this->laravel['cache']->driver());

    return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}
AI 代码解读

这里的daemon和runNextJob是只跑一次还是持续跑的意思,我们当然假定是以daemon的形式在跑。

这里的worker是vendor/laravel/framework/src/Illuminate/Queue/Worker.php:78

public function daemon($connectionName, $queue, WorkerOptions $options)
{
    $lastRestart = $this->getTimestampOfLastQueueRestart();

    while (true) {
        $this->registerTimeoutHandler($options);

        if ($this->daemonShouldRun($options)) {
            $this->runNextJob($connectionName, $queue, $options);
        } else {
            $this->sleep($options->sleep);
        }

        if ($this->memoryExceeded($options->memory) ||
            $this->queueShouldRestart($lastRestart)) {
            $this->stop();
        }
    }
}
AI 代码解读

这里的代码就值得我们自己写deamon的时候来参考了,它考虑了timeout,考虑了memory的情况。

而runNextJob的命令实际上就很清晰了

public function runNextJob($connectionName, $queue, WorkerOptions $options)
{
    ...
        $job = $this->getNextJob(
            $this->manager->connection($connectionName), $queue
        );

        ...
            return $this->process(
                $connectionName, $job, $options
            );
    ...
}
AI 代码解读

这里的Manager对应的是QueueManager, 这个类内部会创建一个connector(vendor/laravel/framework/src/Illuminate/Queue/Connectors/RedisConnector.php:30)

public function connect(array $config)
{
    return new RedisQueue(
        $this->redis, $config['queue'],
        Arr::get($config, 'connection', $this->connection),
        Arr::get($config, 'retry_after', 60)
    );
}
AI 代码解读

看到这里就明白了,最后还是掉落到RedisQueue中。 很好,和我们前面的任务分发终于对上了,圈子差不多画完了,我们可以看到曙光了。

追到RedisQueue里面,看它的pop行为。

public function pop($queue = null)
{
    $original = $queue ?: $this->default;

    $queue = $this->getQueue($queue);

    $this->migrateExpiredJobs($queue.':delayed', $queue);

    if (! is_null($this->expire)) {
        $this->migrateExpiredJobs($queue.':reserved', $queue);
    }

    list($job, $reserved) = $this->getConnection()->eval(
        LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire
    );

    if ($reserved) {
        return new RedisJob($this->container, $this, $job, $reserved, $original);
    }
}
AI 代码解读

这段就是精华了。它做了什么事情呢?

先看migrateExpiredJobs:

public function migrateExpiredJobs($from, $to)
{
    $this->getConnection()->eval(
        LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->getTime()
    );
}
AI 代码解读

这里的eval就是对应redis的eval操作,https://redis.io/commands/eval,2是说明后面有两个key,最后一个getTime()获取的是arg。
下面就看lua脚本了。

public static function migrateExpiredJobs()
{
    return <<<'LUA'
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
    redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end
return true
LUA;
}
AI 代码解读

结合起来看就是:

  • 使用zrangebyscore 和zremrangebyrank 从{queue}:delayed 队列中,从-inf到now的任务拿出来。
  • 用rpush的方式存入到默认queue中(后续就是放入到{queue}:reserved )

这个zrangebyscore就是判断延迟任务是否应该执行的操作了。

然后就进行的是

list($job, $reserved) = $this->getConnection()->eval(
    LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire
);
AI 代码解读

这里的LuaScripts::pop()如下:

public static function pop()
{
    return <<<'LUA'
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
end
return {job, reserved}
LUA;
}
AI 代码解读

做了下面操作:

  • 把默认队列中的任务lpop出来
  • 将他的attempts次数+1
  • zadd 存入{queue}:reserved 队列,score为now+60(默认的过期时间)

最后,我就返回这个job,这里结束了getNextJob的过程

process过程就是调用了一下:vendor/laravel/framework/src/Illuminate/Queue/Worker.php:187

public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        $this->raiseBeforeJobEvent($connectionName, $job);

        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, $job, (int) $options->maxTries
        );

        // Here we will fire off the job and let it process. We will catch any exceptions so
        // they can be reported to the developers logs, etc. Once the job is finished the
        // proper events will be fired to let any listeners know this job has finished.
        $job->fire();

        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $e) {
        $this->handleJobException($connectionName, $job, $options, $e);
    } catch (Throwable $e) {
        $this->handleJobException(
            $connectionName, $job, $options, new FatalThrowableError($e)
        );
    }
}
AI 代码解读

$this->events->fire(new Events\JobProcessing(
    $connectionName, $job
));
AI 代码解读

这里的raiseBeforeJobEvent和raiseAfterJobEvent又是使用event和listener的形式来做处理的。这里的$this->events是vendor/laravel/framework/src/Illuminate/Events/Dispatcher.php:197

这里就是触发了一个Events\JobProcessing事件,我们现在要找到对应的lister:

答案是在QueueManager中定义的

/**
 * Register an event listener for the before job event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function before($callback)
{
    $this->app['events']->listen(Events\JobProcessing::class, $callback);
}

/**
 * Register an event listener for the after job event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function after($callback)
{
    $this->app['events']->listen(Events\JobProcessed::class, $callback);
}
AI 代码解读

换句话说,我们希望监听一个job开始和结束的时候,我们可以使用QueueManager的before,after来监听。比如发个邮件,唱唱小曲啥的。

那么这里我们,从{queue}:reserved中获取了job之后(这里的job是RedisJob),我们是什么时候触发的delete呢?是在

$job->fire();
AI 代码解读

这个fire是RedisJob(vendor/laravel/framework/src/Illuminate/Queue/Jobs/RedisJob.php)但继承来自vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php:72, 经过调用CallQueuedHandler,最终会落到
vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php:154

public function deleteReserved($queue, $job)
{
    $this->getConnection()->zrem($this->getQueue($queue).':reserved', $job);
}
AI 代码解读

这里就是将job从{queue}:reserved 队列中删除。

至此,整个队列及延迟机制就处理完了。

实际

我们实际监听一下redis就可以验证结果:

// 使用dispatch
1489802272.491060 [0 127.0.0.1:63798] "SELECT" "0"
1489802272.491513 [0 127.0.0.1:63798] "ZADD" "queues:default:delayed" "1489802332" "{\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"data\":{\"commandName\":\"App\\\\Jobs\\\\DelayTestJob\",\"command\":\"O:21:\\\"App\\\\Jobs\\\\DelayTestJob\\\":4:{s:6:\\\"\\u0000*\\u0000job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:5:\\\"delay\\\";O:13:\\\"Carbon\\\\Carbon\\\":3:{s:4:\\\"date\\\";s:26:\\\"2017-03-18 01:58:52.000000\\\";s:13:\\\"timezone_type\\\";i:3;s:8:\\\"timezone\\\";s:3:\\\"UTC\\\";}}\"},\"id\":\"q7ss6fRgCbMNHhCv6gOXX0Or7B43blU9\",\"attempts\":1}"


// 1分钟后
1489802333.957500 [0 127.0.0.1:63792] "EVAL" "local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])\nif(next(val) ~= nil) then\n    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)\n    for i = 1, #val, 100 do\n        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))\n    end\nend\nreturn true" "2" "queues:default:delayed" "queues:default" "1489802333"
1489802333.957563 [0 lua] "zrangebyscore" "queues:default:delayed" "-inf" "1489802333"
1489802333.957586 [0 lua] "zremrangebyrank" "queues:default:delayed" "0" "0"
1489802333.958628 [0 lua] "rpush" "queues:default" "{\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"data\":{\"commandName\":\"App\\\\Jobs\\\\DelayTestJob\",\"command\":\"O:21:\\\"App\\\\Jobs\\\\DelayTestJob\\\":4:{s:6:\\\"\\u0000*\\u0000job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:5:\\\"delay\\\";O:13:\\\"Carbon\\\\Carbon\\\":3:{s:4:\\\"date\\\";s:26:\\\"2017-03-18 01:58:52.000000\\\";s:13:\\\"timezone_type\\\";i:3;s:8:\\\"timezone\\\";s:3:\\\"UTC\\\";}}\"},\"id\":\"q7ss6fRgCbMNHhCv6gOXX0Or7B43blU9\",\"attempts\":1}"
1489802333.959572 [0 127.0.0.1:63792] "EVAL" "local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])\nif(next(val) ~= nil) then\n    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)\n    for i = 1, #val, 100 do\n        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))\n    end\nend\nreturn true" "2" "queues:default:reserved" "queues:default" "1489802333"
1489802333.959672 [0 lua] "zrangebyscore" "queues:default:reserved" "-inf" "1489802333"
1489802333.959866 [0 127.0.0.1:63792] "EVAL" "local job = redis.call('lpop', KEYS[1])\nlocal reserved = false\nif(job ~= false) then\n    reserved = cjson.decode(job)\n    reserved['attempts'] = reserved['attempts'] + 1\n    reserved = cjson.encode(reserved)\n    redis.call('zadd', KEYS[2], ARGV[1], reserved)\nend\nreturn {job, reserved}" "2" "queues:default" "queues:default:reserved" "1489802343"
1489802333.959938 [0 lua] "lpop" "queues:default"
1489802333.959965 [0 lua] "zadd" "queues:default:reserved" "1489802343" "{\"id\":\"q7ss6fRgCbMNHhCv6gOXX0Or7B43blU9\",\"attempts\":2,\"data\":{\"command\":\"O:21:\\\"App\\\\Jobs\\\\DelayTestJob\\\":4:{s:6:\\\"\\u0000*\\u0000job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:5:\\\"delay\\\";O:13:\\\"Carbon\\\\Carbon\\\":3:{s:4:\\\"date\\\";s:26:\\\"2017-03-18 01:58:52.000000\\\";s:13:\\\"timezone_type\\\";i:3;s:8:\\\"timezone\\\";s:3:\\\"UTC\\\";}}\",\"commandName\":\"App\\\\Jobs\\\\DelayTestJob\"},\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\"}"
1489802333.963223 [0 127.0.0.1:63792] "ZREM" "queues:default:reserved" "{\"id\":\"q7ss6fRgCbMNHhCv6gOXX0Or7B43blU9\",\"attempts\":2,\"data\":{\"command\":\"O:21:\\\"App\\\\Jobs\\\\DelayTestJob\\\":4:{s:6:\\\"\\u0000*\\u0000job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:5:\\\"delay\\\";O:13:\\\"Carbon\\\\Carbon\\\":3:{s:4:\\\"date\\\";s:26:\\\"2017-03-18 01:58:52.000000\\\";s:13:\\\"timezone_type\\\";i:3;s:8:\\\"timezone\\\";s:3:\\\"UTC\\\";}}\",\"commandName\":\"App\\\\Jobs\\\\DelayTestJob\"},\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\"}"
AI 代码解读

精简下路径就是:

// 第一步:先往delayed队列中插入job
1489802272.491513 [0 127.0.0.1:63798] "ZADD" "queues:default:delayed" "1489802332" {job}

// 第二步,将delayed队列中到期的job取出,并且rpush进default队列
1489802333.957563 [0 lua] "zrangebyscore" "queues:default:delayed" "-inf" "1489802333"
1489802333.957586 [0 lua] "zremrangebyrank" "queues:default:delayed" "0" "0"
1489802333.958628 [0 lua] "rpush" "queues:default" {job}

// 第三步,从default队列中lpop出job
1489802333.959938 [0 lua] "lpop" "queues:default"

// 第四步,zadd到default:reserved
1489802333.959965 [0 lua] "zadd" "queues:default:reserved" "1489802343" {job}

// 第五步,程序处理这个job

// 第六步,讲job从default:reserved中删除
1489802333.963223 [0 127.0.0.1:63792] "ZREM" "queues:default:reserved" {job}
AI 代码解读

符合预期。

总结

laravel这边的延迟队列使用了三个队列。

  • queue:default:delayed // 存储延迟任务
  • queue:default // 存储“生”任务,就是未处理任务
  • queue:default:reserved // 存储待处理任务

任务在三个队列中进行轮转,最后一定进入到queue:default:reserved,并且成功后把任务从这个队列中删除。

其间还使用了lua脚本,所以至少laravel5.3(本文的laravel环境)在无lua脚本支持的redis版本是跑不了的。

它用三个队列把所有的步骤给原子了,所以并没有使用multi等操作。也是防止了锁的使用把。每一步操作失败了,都会有后续的步骤继续帮忙完成,记录等行为的。


本文转自轩脉刃博客园博客,原文链接:http://www.cnblogs.com/yjf512/p/6571941.html,如需转载请自行联系原作者

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
0
0
0
56
分享
相关文章
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
1315 0
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
AI大模型进阶系列(03) prompt 工程指南 | 实战核心技术有哪些?
本文深入讲解了AI大模型中的prompt工程。文章分析了role角色(system、user、assistant)的意义,message多轮会话记忆机制,以及prompt的核心三要素(上下文背景、输入内容、输出指示)。同时介绍了多种提示优化技术,如少样本提示、CoT链式思考、prompt chaining链式提示、思维树ToT提示等,还展示了让AI生成提示词的方法,为实际应用提供了全面指导。
🧠 用 AI 提升你的编程效率 —— 在 PyCharm 中体验通义灵码
通义灵码是一款基于大模型的智能编程辅助工具,现已上线PyCharm插件V2.5+版本。它能根据自然语言描述、注释或上下文生成高质量代码,支持多语言(Python、Java等),提供代码补全、优化建议、单元测试生成及异常排查等功能。集成魔搭MCP市场3000+服务,具备编程智能体模式与长期记忆能力,助开发者提升效率。适用初学者、资深开发者及团队协作场景。小红书、B站、抖音、微博均有相关资源分享。 小红书: http://xhslink.com/a/SvabuxSObf3db bilibili:https://b23.tv/1HJAdIx 抖音: https://v.douyin.com/1DAG
544 3
网络安全视角:从地域到账号的阿里云日志审计实践
日志审计的必要性在于其能够帮助企业和组织落实法律要求,打破信息孤岛和应对安全威胁。选择 SLS 下日志审计应用,一方面是选择国家网络安全专用认证的日志分析产品,另一方面可以快速帮助大型公司统一管理多组地域、多个账号的日志数据。除了在日志服务中存储、查看和分析日志外,还可通过报表分析和告警配置,主动发现潜在的安全威胁,增强云上资产安全。
649 45
CSS进阶-CSS动画关键帧
【6月更文挑战第15天】CSS的`@keyframes`创建细腻动画,定义样式变化阶段以增强网页互动性。通过`animation`属性应用动画,如`fadeIn`示例。常见问题包括动画结束状态、卡顿和浏览器兼容性,解决办法涉及优化关键帧、使用硬件加速和添加前缀。进阶技巧包括多步骤动画和控制播放状态。例如,背景色渐变动画展示了颜色随时间变化的效果。学习和实践关键帧动画,提升Web开发技能。
368 7
网络抓包分析【IP,ICMP,ARP】以及 IP数据报,MAC帧,ICMP报和ARP报的数据报格式
本文详细介绍了如何使用网络抓包工具Wireshark进行网络抓包分析,包括以太网v2 MAC帧、IP数据报、ICMP报文和ARP报文的格式,以及不同网络通信的过程。文章通过抓包分析展示了IP数据报、ICMP数据报和ARP数据报的具体信息,包括MAC地址、IP地址、ICMP类型和代码、以及ARP的硬件类型、协议类型、操作类型等。通过这些分析,可以更好地理解网络协议的工作机制和数据传输过程。
网络抓包分析【IP,ICMP,ARP】以及 IP数据报,MAC帧,ICMP报和ARP报的数据报格式
MyBatis-Plus整合SpringBoot及使用
务必记住,随着MyBatis-Plus版本的更新,一些具体的配置和使用方式可能会有所变动。在实际开发过程中,建议参考MyBatis-Plus的官方文档,以获取最新和详细的指导。
236 1
央视《赢在AI+》正式发布!首场路演将于2024云栖大会亮相
刚刚,在中央广播电视总台举办的央视频金秋创新活动发布会上,聚焦AI领域的大型纪实创投节目——《赢在AI+》正式启动。同时,节目正式吹响集结令,向广大创业者、投资人发出邀请,成为AI领域的创新先锋!
510 21
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问