PHP+Laravel框架RabbitMQ简单使用(工作队列模式(竞争消费者模式))

简介: 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询。假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。

一、简介


网络异常,图片无法展示
|


工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式


假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询


假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。


举一个例子

一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,那怎么办呢?

公平分发

案例中发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量

因此我们可以使用 basicQos 方法,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。


二、使用Laravel的command来实现消息的生产和消费


生产者(老板) RabbitmqProducerCommand下的task方法

<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
//引入amqp扩展
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqProducerCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq_producer';//给生产者起个command名称
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     *  生产者消息代码
     * @return int
     */
    public function handle()
    {
        $this->task();
    }
    /**
     * Point-to-Point,点对点通信模型。PTP是基于队列(Queue)的,一个队列可以有多个生产者,和多个消费者。消息服务器按照收到消息的先后顺序,将消息放到队列中。队列中的每一条消息,只能由一个消费者进行消费,消费之后就会从队列中移除。
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 09:26
     * @throws \Exception
     */
    public function ptp()
    {
        //创建服务器连接
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        //连接信道
        //信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的
        //信道是建立在TCP上面的虚拟链接,也就是rabbitMQ在一个TCP上面建立成百上千的信道来达到多个线程处理。
        //注意是一个TCP 被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。
        $channel = $connection->channel();
        //channel->queue_declare通过信道创建一个是否是持久化的消息队列
        //queue第一个参数代表消息队列名称
        $channel->queue_declare('test', false, false, false, false);
        //往队列里要发送内容,待发送的内容
        $msg = new AMQPMessage('我是一个生产者消息');
        //通过信道来进行生产消息
        //而exchange是怎么知道消息应该推到哪个queue呢,这就要通过绑定queue与exchange时的routingkey了,通过代码进行绑定并且指定routingkey,下面有一张关系图,p(发布者) —> x(exchange) bindding(绑定关系也就是我们的routingkey) 红色代表着queue
        $channel->basic_publish($msg, '', 'test');
        echo " [x] Sent '我是一个生产者消息!'\n";
        //关闭信道
        $channel->close();
        //关闭连接
        $connection->close();
    }
    /**
     *  工作队列模式(竞争消费者模式)
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 09:39
     * @throws \Exception
     */
    public function task()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        //确保队列能够在 RabbitMQ 节点重启后继续存在,第三个参数设置为true
        $channel->queue_declare('task_queue', false, true, false, false);
        $argv = [];
        $data = implode(' ', array_slice($argv, 1));
        if (empty($data)) {
            $data = "Hello World!";
        }
        //消息标记为持久
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );
        $channel->basic_publish($msg, '', 'task_queue');
        echo ' [x] Sent ', $data, "\n";
        $channel->close();
        $connection->close();
    }
}
复制代码


消费者(工人) RabbitmqProducerCommand下的task方法


<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqConsumerCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq_consumer';//给消费者起个command名称
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     * @return int
     */
    public function handle()
    {
        $this->task();
    }
    /**
     * Point-to-Point,点对点通信模型。PTP是基于队列(Queue)的,一个队列可以有多个生产者,和多个消费者。消息服务器按照收到消息的先后顺序,将消息放到队列中。队列中的每一条消息,只能由一个消费者进行消费,消费之后就会从队列中移除。
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 09:26
     * @throws \Exception
     */
    public function ptp()
    {
        //创建服务器连接
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        //连接信道
        //信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的
        //信道是建立在TCP上面的虚拟链接,也就是rabbitMQ在一个TCP上面建立成百上千的信道来达到多个线程处理。
        //注意是一个TCP 被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。
        $channel = $connection->channel();
        //channel->queue_declare通过信道创建一个是否是持久化的消息队列
        //queue第一个参数代表消息队列名称
        $channel->queue_declare('test', false, false, false, false);
        echo " [*] Waiting for messages. To exit press CTRL+C\n";
        //进行监听消费者是否有消息,如果有进行输出消息内容
        $callback = function ($msg) {
            echo ' [x] Received ', $msg->body, "\n";
        };
        //通过信道进行消费消息
        $channel->basic_consume('test', '', false, true, false, false, $callback);
        //如果信道是打开状态
        while ($channel->is_open()) {
            //然后让信道一直处于监听等待状态
            $channel->wait();
        }
        //关闭信道
        $channel->close();
        //关闭连接
        $connection->close();
    }
    /**
     *  工作队列模式(竞争消费者模式)
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 09:39
     * @throws \Exception
     */
    public function task()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        $channel->queue_declare('task_queue', false, true, false, false);
        echo " [*] Waiting for messages. To exit press CTRL+C\n";
        $callback = function ($msg) {
            echo ' [x] Received ', $msg->body, "\n";
            sleep(substr_count($msg->body, '.'));
            echo " [x] Done\n";
            $msg->ack();
        };
        //公平调度
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
        while ($channel->is_open()) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}
复制代码


三、运行截图


执行生产消息 php artisan rabbitmq_producer


执行消费消息 hp artisan rabbitmq_consumer

网络异常,图片无法展示
|


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
设计模式 数据库连接 PHP
PHP中的设计模式:提升代码的可维护性与扩展性在软件开发过程中,设计模式是开发者们经常用到的工具之一。它们提供了经过验证的解决方案,可以帮助我们解决常见的软件设计问题。本文将介绍PHP中常用的设计模式,以及如何利用这些模式来提高代码的可维护性和扩展性。我们将从基础的设计模式入手,逐步深入到更复杂的应用场景。通过实际案例分析,读者可以更好地理解如何在PHP开发中应用这些设计模式,从而写出更加高效、灵活和易于维护的代码。
本文探讨了PHP中常用的设计模式及其在实际项目中的应用。内容涵盖设计模式的基本概念、分类和具体使用场景,重点介绍了单例模式、工厂模式和观察者模式等常见模式。通过具体的代码示例,展示了如何在PHP项目中有效利用设计模式来提升代码的可维护性和扩展性。文章还讨论了设计模式的选择原则和注意事项,帮助开发者在不同情境下做出最佳决策。
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
537 3
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
380 4
|
设计模式 监控 中间件
深入理解PHP中的中间件模式
【10月更文挑战第20天】探索PHP编程世界中的“交通枢纽”——中间件模式。从代码层面剖析如何实现请求和响应的高效管理,以及如何在开发中应用这一模式来增强项目的扩展性和维护性。
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
1139 6
|
中间件 PHP 开发者
深入理解PHP中的中间件模式
【10月更文挑战第6天】在PHP开发中,中间件模式是一种优雅的处理请求和响应的方式。本文将带你探索中间件模式的概念、实现及其在PHP框架中的应用,同时提供实用的代码示例来加深理解。
250 4
|
设计模式 数据库连接 PHP
PHP中的设计模式:如何提高代码的可维护性与扩展性在软件开发领域,PHP 是一种广泛使用的服务器端脚本语言。随着项目规模的扩大和复杂性的增加,保持代码的可维护性和可扩展性变得越来越重要。本文将探讨 PHP 中的设计模式,并通过实例展示如何应用这些模式来提高代码质量。
设计模式是经过验证的解决软件设计问题的方法。它们不是具体的代码,而是一种编码和设计经验的总结。在PHP开发中,合理地使用设计模式可以显著提高代码的可维护性、复用性和扩展性。本文将介绍几种常见的设计模式,包括单例模式、工厂模式和观察者模式,并通过具体的例子展示如何在PHP项目中应用这些模式。
|
设计模式 存储 安全
PHP中单例模式的深入解析与实践指南
在PHP开发领域,设计模式是构建高效、可维护代码的重要工具。本文聚焦于单例模式——一种确保类仅有一个实例,并提供全局访问点的模式。我们将从理论出发,探讨单例模式的基本概念、应用场景,并通过实际案例分析其在PHP中的实现技巧。最后,讨论单例模式的优势、潜在缺陷及如何在实际项目中合理运用。
|
设计模式 缓存 中间件
深入理解PHP中的中间件模式
【9月更文挑战第12天】本文旨在通过浅显易懂的语言和实际代码示例,引导读者了解PHP中如何实现和使用中间件模式,以及这一设计模式如何优化我们的应用程序结构。文章将逐步介绍中间件的概念、在PHP中的应用实例,以及如何自定义中间件来解决实际问题。