PHP+Laravel+RabbitMQ实现异步延迟消息队列(库存归还)

简介: 一、前言需求:电商秒杀场景中,如果用户下单10分钟未支付,需要进行库存归还本篇是用PHP+Laravel+RabbitMQ来实现异步延迟消息队列

一、前言


需求:电商秒杀场景中,如果用户下单10分钟未支付,需要进行库存归还

本篇是用PHP+Laravel+RabbitMQ来实现异步延迟消息队列


二、场景



  • 在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列
  • 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户
  • 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人
  • 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时
  • ...

很多场景下我们都需要延迟队列。

本文以 RabbitMQ 为例来和大家聊一聊延迟队列的玩法。

使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,

这种方案较简单。

三、安装RabbitMQ延迟队列插件


官网插件下载地址

网络异常,图片无法展示
|


我这里直接下载了最新版本,你们根据自己的rabbitmq版本号进行下载


网络异常,图片无法展示
|


把下载好的文件移动到rabbitmq的插件plugins下,以我自己的Mac为例子,放到了如下路径


网络异常,图片无法展示
|


然后执行安装插件指令,如下

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码


网络异常,图片无法展示
|


最后重启rabbitmq服务,并刷新查看exchanges交换机有没有该插件

网络异常,图片无法展示
|


如上图则延迟消息队列插件安装完成


四、在Laravel框架中进行使用


新建rabbitmq服务类,包含延迟消息队列生产消息,和消费消息,如下

网络异常,图片无法展示
|


核心代码如下:

<?php
namespace App\Http\Controllers\Service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class RabbitmqServer
{
    private $host = "127.0.0.1";
    private $port = 5672;
    private $user = "guest";
    private $password = "guest";
    private $msg;
    private $channel;
    private $connection;
    //  过期时间
    const TIMEOUT_5_S = 5;     // 5s
    const TIMEOUT_10_S = 10;    // 10s
    private $exchange_logs = "logs";
    private $exchange_direct = "direct";
    private $exchange_delayed = "delayed";
    private $queue_delayed = "delayedQueue";
    const EXCHANGETYPE_FANOUT = "fanout";
    const EXCHANGETYPE_DIRECT = "direct";
    const EXCHANGETYPE_DELAYED = "x-delayed-message";
    public function __construct($type = false)
    {
        $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
        $this->channel = $this->connection->channel();
        // 声明Exchange
        $this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false, false, false, new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT]));
        $this->channel->queue_declare($this->queue_delayed, false, true, false, false);
        $this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed, $this->queue_delayed);
    }
    /**
     * delay creat message
     */
    public function createMessageDelay($msg, $time)
    {
        $delayConfig = [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'application_headers' => new AMQPTable(['x-delay' => $time * 1000])
        ];
        $msg = new AMQPMessage($msg, $delayConfig);
        return $msg;
    }
    /**
     * delay send message
     */
    public function sendDelay($msg, $time = self::TIMEOUT_10_S)
    {
        $msg = $this->createMessageDelay($msg, $time);;
        $this->channel->basic_publish($msg, $this->exchange_delayed, $this->queue_delayed);
        $this->channel->close();
        $this->connection->close();
    }
    /**
     * delay consum
     */
    public function consumDelay()
    {
        $callback = function ($msg) {
            echo ' [x] ', $msg->body, "\n";
            $this->channel->basic_ack($msg->delivery_info['delivery_tag'], false);
        };
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback);
        echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
        $this->channel->close();
        $this->connection->close();
    }
}
复制代码


比如新建QueueController控制器,进行测试生产消息放到延迟消息队列中

网络异常,图片无法展示
|


代码如下:

<?php
namespace App\Http\Controllers\Api\v1;
use App\Http\Controllers\Controller;
use App\Http\Controllers\Service\RabbitmqServer;
use App\Jobs\Queue;
use Illuminate\Http\Request;
class QueueController extends Controller
{
    //
    public function index(Request $request)
    {
        //比如说现在是下订单操作
        //需求:如果用户10分钟之内不支付订单就要取消订单,并且库存归还
        $msg = $request->post();
        $Rabbit = new RabbitmqServer("x-delayed-message");
        //第一个参数发送的消息,第二个参数延迟多少秒
        $Rabbit->sendDelay(json_encode($msg),5);
    }
}
复制代码


至此通过接口调试工具进行模拟生产消息即可

消息生产完毕要进行消费,这里使用的是Laravel的任务调度,代码如下

网络异常,图片无法展示
|


<?php
namespace App\Console\Commands;
use App\Http\Controllers\Service\RabbitmqServer;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqConsumerCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq_consumer';//给消费者起个command名称
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     * @return int
     */
    public function handle()
    {
        $Rabbit = new RabbitmqServer("x-delayed-message");
        $Rabbit->consumDelay();
    }
}
复制代码


五、执行生产消息和消费消息


用postman模拟生产消息,效果如下:

网络异常,图片无法展示
|


然后消费消息,用一下命令,如果延迟5秒执行消费则成功

网络异常,图片无法展示
|


至此,就完成了rabbitmq异步延迟消息队列


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
中间件 PHP 调度
深入浅出PHP框架之Laravel的优雅
【10月更文挑战第3天】在PHP的世界里,Laravel以其优雅和简洁闻名。本文将带你走进Laravel的世界,探索它的魔法。我们将通过代码示例,一步步揭示Laravel的魅力。准备好,让我们一起开始这场奇妙的旅程吧!
|
8月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
6月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
466 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
6月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
376 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
JSON 自然语言处理 前端开发
【01】对APP进行语言包功能开发-APP自动识别地区ip后分配对应的语言功能复杂吗?-成熟app项目语言包功能定制开发-前端以uniapp-基于vue.js后端以laravel基于php为例项目实战-优雅草卓伊凡
【01】对APP进行语言包功能开发-APP自动识别地区ip后分配对应的语言功能复杂吗?-成熟app项目语言包功能定制开发-前端以uniapp-基于vue.js后端以laravel基于php为例项目实战-优雅草卓伊凡
663 72
【01】对APP进行语言包功能开发-APP自动识别地区ip后分配对应的语言功能复杂吗?-成熟app项目语言包功能定制开发-前端以uniapp-基于vue.js后端以laravel基于php为例项目实战-优雅草卓伊凡
|
消息中间件 监控 Java
RocketMQ 同步发送、异步发送和单向发送,如何选择?
本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。
2463 4
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
1007 91
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
486 98
|
11月前
|
前端开发 API PHP
PHP网编程:guzzle的鉴权和异步操作实践。
Guzzle是一个强大且灵活的HTTP客户端库,它可以方便地发送HTTP请求,并且其对异步请求和各类鉴权方式的支持使其成为处理HTTP请求的理想工具。你需要了解和掌握Guzzle的异步操作并发请求和鉴权方式,以便在实际的开发中得心应手地处理HTTP请求。
321 13
|
PHP 计算机视觉 UED
Buzz库:PHP图像处理中的异步图像下载和保存
Buzz库:PHP图像处理中的异步图像下载和保存

相关产品

  • 云消息队列 MQ