PHP+Laravel框架RabbitMQ简单使用(工作队列模式(竞争消费者模式))

简介: 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询。假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。

一、简介


网络异常,图片无法展示
|


工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式


假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询


假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。


举一个例子

一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,那怎么办呢?

公平分发

案例中发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量

因此我们可以使用 basicQos 方法,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。


二、使用Laravel的command来实现消息的生产和消费


生产者(老板) RabbitmqProducerCommand下的task方法

<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
//引入amqp扩展
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqProducerCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq_producer';//给生产者起个command名称
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     *  生产者消息代码
     * @return int
     */
    public function handle()
    {
        $this->task();
    }
    /**
     * Point-to-Point,点对点通信模型。PTP是基于队列(Queue)的,一个队列可以有多个生产者,和多个消费者。消息服务器按照收到消息的先后顺序,将消息放到队列中。队列中的每一条消息,只能由一个消费者进行消费,消费之后就会从队列中移除。
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 09:26
     * @throws \Exception
     */
    public function ptp()
    {
        //创建服务器连接
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        //连接信道
        //信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的
        //信道是建立在TCP上面的虚拟链接,也就是rabbitMQ在一个TCP上面建立成百上千的信道来达到多个线程处理。
        //注意是一个TCP 被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。
        $channel = $connection->channel();
        //channel->queue_declare通过信道创建一个是否是持久化的消息队列
        //queue第一个参数代表消息队列名称
        $channel->queue_declare('test', false, false, false, false);
        //往队列里要发送内容,待发送的内容
        $msg = new AMQPMessage('我是一个生产者消息');
        //通过信道来进行生产消息
        //而exchange是怎么知道消息应该推到哪个queue呢,这就要通过绑定queue与exchange时的routingkey了,通过代码进行绑定并且指定routingkey,下面有一张关系图,p(发布者) —> x(exchange) bindding(绑定关系也就是我们的routingkey) 红色代表着queue
        $channel->basic_publish($msg, '', 'test');
        echo " [x] Sent '我是一个生产者消息!'\n";
        //关闭信道
        $channel->close();
        //关闭连接
        $connection->close();
    }
    /**
     *  工作队列模式(竞争消费者模式)
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 09:39
     * @throws \Exception
     */
    public function task()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        //确保队列能够在 RabbitMQ 节点重启后继续存在,第三个参数设置为true
        $channel->queue_declare('task_queue', false, true, false, false);
        $argv = [];
        $data = implode(' ', array_slice($argv, 1));
        if (empty($data)) {
            $data = "Hello World!";
        }
        //消息标记为持久
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );
        $channel->basic_publish($msg, '', 'task_queue');
        echo ' [x] Sent ', $data, "\n";
        $channel->close();
        $connection->close();
    }
}
复制代码


消费者(工人) RabbitmqProducerCommand下的task方法


<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqConsumerCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq_consumer';//给消费者起个command名称
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     * @return int
     */
    public function handle()
    {
        $this->task();
    }
    /**
     * Point-to-Point,点对点通信模型。PTP是基于队列(Queue)的,一个队列可以有多个生产者,和多个消费者。消息服务器按照收到消息的先后顺序,将消息放到队列中。队列中的每一条消息,只能由一个消费者进行消费,消费之后就会从队列中移除。
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 09:26
     * @throws \Exception
     */
    public function ptp()
    {
        //创建服务器连接
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        //连接信道
        //信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的
        //信道是建立在TCP上面的虚拟链接,也就是rabbitMQ在一个TCP上面建立成百上千的信道来达到多个线程处理。
        //注意是一个TCP 被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。
        $channel = $connection->channel();
        //channel->queue_declare通过信道创建一个是否是持久化的消息队列
        //queue第一个参数代表消息队列名称
        $channel->queue_declare('test', false, false, false, false);
        echo " [*] Waiting for messages. To exit press CTRL+C\n";
        //进行监听消费者是否有消息,如果有进行输出消息内容
        $callback = function ($msg) {
            echo ' [x] Received ', $msg->body, "\n";
        };
        //通过信道进行消费消息
        $channel->basic_consume('test', '', false, true, false, false, $callback);
        //如果信道是打开状态
        while ($channel->is_open()) {
            //然后让信道一直处于监听等待状态
            $channel->wait();
        }
        //关闭信道
        $channel->close();
        //关闭连接
        $connection->close();
    }
    /**
     *  工作队列模式(竞争消费者模式)
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 09:39
     * @throws \Exception
     */
    public function task()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        $channel->queue_declare('task_queue', false, true, false, false);
        echo " [*] Waiting for messages. To exit press CTRL+C\n";
        $callback = function ($msg) {
            echo ' [x] Received ', $msg->body, "\n";
            sleep(substr_count($msg->body, '.'));
            echo " [x] Done\n";
            $msg->ack();
        };
        //公平调度
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
        while ($channel->is_open()) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}
复制代码


三、运行截图


执行生产消息 php artisan rabbitmq_producer


执行消费消息 hp artisan rabbitmq_consumer

网络异常,图片无法展示
|


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
中间件 PHP 调度
深入浅出PHP框架之Laravel的优雅
【10月更文挑战第3天】在PHP的世界里,Laravel以其优雅和简洁闻名。本文将带你走进Laravel的世界,探索它的魔法。我们将通过代码示例,一步步揭示Laravel的魅力。准备好,让我们一起开始这场奇妙的旅程吧!
|
JSON 自然语言处理 前端开发
【01】对APP进行语言包功能开发-APP自动识别地区ip后分配对应的语言功能复杂吗?-成熟app项目语言包功能定制开发-前端以uniapp-基于vue.js后端以laravel基于php为例项目实战-优雅草卓伊凡
【01】对APP进行语言包功能开发-APP自动识别地区ip后分配对应的语言功能复杂吗?-成熟app项目语言包功能定制开发-前端以uniapp-基于vue.js后端以laravel基于php为例项目实战-优雅草卓伊凡
642 72
【01】对APP进行语言包功能开发-APP自动识别地区ip后分配对应的语言功能复杂吗?-成熟app项目语言包功能定制开发-前端以uniapp-基于vue.js后端以laravel基于php为例项目实战-优雅草卓伊凡
|
缓存 安全 PHP
深入浅出PHP框架之Laravel的优雅与实用
【10月更文挑战第22天】在PHP的世界里,Laravel如同一股清流,以其优雅的设计和实用的功能赢得了广大开发者的喜爱。本文将带你走进Laravel的世界,探索其背后的设计哲学,以及如何利用Laravel构建高效、可维护的Web应用。从路由到模型,从控制器到视图,我们将一步步揭开Laravel的神秘面纱。
273 3
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
存储 前端开发 PHP
深入浅出PHP框架之Laravel的优雅
【9月更文挑战第31天】在编程世界里,PHP语言如同一位多才多艺的老匠人,而Laravel框架则是其手中的精工细作。本文将带你领略Laravel的魅力所在,从其优雅的设计哲学到实用的功能特性,再到如何通过实际代码示例掌握它的核心操作。让我们一起探索Laravel的世界,发现编程之美。
193 6
|
前端开发 中间件 PHP
PHP框架深度解析:Laravel的魔力与实战应用####
【10月更文挑战第31天】 本文作为一篇技术深度好文,旨在揭开PHP领域璀璨明星——Laravel框架的神秘面纱。不同于常规摘要的概括性介绍,本文将直接以一段引人入胜的技术剖析开场,随后通过具体代码示例和实战案例,逐步引导读者领略Laravel在简化开发流程、提升代码质量及促进团队协作方面的卓越能力。无论你是PHP初学者渴望深入了解现代开发范式,还是经验丰富的开发者寻求优化项目架构的灵感,本文都将为你提供宝贵的见解与实践指导。 ####
|
安全 前端开发 PHP
构建与验证表单:传统PHP与Laravel框架的比较分析——探索Web开发中表单处理的优化策略和最佳实践
【8月更文挑战第31天】在 Web 开发中,表单构建与数据验证至关重要。传统 PHP 方法需手动处理 HTML 表单和数据验证,而 Laravel 框架则提供了一种更现代、高效的解决方案。本文通过对比传统 PHP 和 Laravel 的方法,探讨表单构建与验证的最佳实践。Laravel 通过简洁的语法糖、内置的数据过滤和验证机制,显著提升了代码的安全性和可维护性,适用于大型项目或需要快速开发的场景。然而,在追求灵活性的小型项目中,直接使用 PHP 仍是不错的选择。了解两者的优劣,有助于开发者根据项目需求做出最佳决策。
188 0
|
缓存 安全 PHP
深入浅出PHP框架:Laravel的优雅之旅
【8月更文挑战第15天】 探索PHP世界里的瑰宝,Laravel框架以其优雅、简洁著称。本文将带你领略Laravel的核心魅力,从安装到构建应用,再到高级特性的应用,让你轻松驾驭这个强大的工具。无论你是PHP新手还是资深开发者,这篇文章都将成为你理解并使用Laravel的指南针。
147 2
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
前端开发 中间件 PHP