PHP+Laravel框架RabbitMQ简单使用(Pub/Sub模式)

简介: 一、简介publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。特点:每个消息可以有多个订阅者;客户端只有订阅后才能接收到消息;持久订阅和非持久订阅。注意:发布者和订阅者有时间依赖:接受者和发布者只有建立订阅关系才能收到消息;持久订阅:订阅关系建立后,消息就不会消失,不管订阅者是否都在线;非持久订阅:订阅者为了接受消息,必须一直在线。 当只有一个订阅者时约等于点对点模式

一、简介


publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。


特点:

  1. 每个消息可以有多个订阅者;
  2. 客户端只有订阅后才能接收到消息;
  3. 持久订阅和非持久订阅。


注意:

  1. 发布者和订阅者有时间依赖:接受者和发布者只有建立订阅关系才能收到消息;
  2. 持久订阅:订阅关系建立后,消息就不会消失,不管订阅者是否都在线;
  3. 非持久订阅:订阅者为了接受消息,必须一直在线。 当只有一个订阅者时约等于点对点模式


网络异常,图片无法展示
|

P 表示为生产者、 X 表示交换机、C1与C2 表示为消费者,红色表示队列。

通常情况下,一个条消息只要被消费一次就行了,那么什么情况下需要所有的消费者都对这条消息进行消费呢?最典型的情况就是需要在内存中对数据进行缓存,并需要实时进行更新。


例如,笔者做过一个违禁词系统,对用户输入的评论内容进行违禁词汇检测。这个违禁词系统,部署了在N台服务器上,为了提升检测性能,每台机器都会将违禁词库全量加载到内存中,词库的更新,是通过发送MQ消息来完成的。由于采用Pub/Sub模型,每台机器的consumer,都可以接收到这条消息,直接在内存中更新敏感词库即可。

我们现在希望将消息发布到我们的日志交换而不是无名的交换(emit_log.php)


二、使用Laravel的command来实现消息的生产和消费


发布者我们现在希望将消息发布到我们的日志交换而不是无名的交换 RabbitmqProducerCommand下的publish方法


<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
//引入amqp扩展
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqProducerCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq_producer';//给生产者起个command名称
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     *  生产者消息代码
     * @return int
     */
    public function handle()
    {
        $this->publish();
    }
    /**
     * 发布订阅模式的发布操作
     * publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 11:03
     */
    public function publish()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        //发布模式(fanout)
        $channel->exchange_declare('logs', 'fanout', false, false, false);
        $argv = [];
        $data = implode(' ', array_slice($argv, 1));
        if (empty($data)) {
            $data = "info: 我是发布者生产的消息消息!";
        }
        $msg = new AMQPMessage($data);
        $channel->basic_publish($msg, 'logs');
        echo ' [x] Sent ', $data, "\n";
        $channel->close();
        $connection->close();
    }
}
复制代码


订阅者RabbitmqProducerCommand下的subscribe方法


<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqConsumerCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq_consumer';//给消费者起个command名称
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     * @return int
     */
    public function handle()
    {
        $this->subscribe();
    }
    /**
     * 发布订阅模式 -订阅
     * publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 11:03
     */
    public function subscribe()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        $channel->exchange_declare('logs', 'fanout', false, false, false);
        list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
        $channel->queue_bind($queue_name, 'logs');
        echo " [*] Waiting for logs. To exit press CTRL+C\n";
        $callback = function ($msg) {
            echo ' 我是订阅者开始进行消费[x] ', $msg->body, "\n";
        };
        $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
        while ($channel->is_open()) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}
复制代码


三、运行截图


执行生产消息 php artisan rabbitmq_producer

执行消费消息 hp artisan rabbitmq_consumer


网络异常,图片无法展示
|


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
19天前
|
Cloud Native 关系型数据库 MySQL
PHP的演变之路:从初生之犊到成熟框架
【8月更文挑战第20天】本文将带你穿梭时空,探索PHP这一互联网基石语言的发展历程。我们将从PHP的起源谈起,见证它如何应对时代的挑战,逐步成长为支持现代Web开发的强力工具。文章不仅追溯PHP的历史轨迹,还将深入探讨其对开发者社区的影响以及在现代技术环境中的定位和未来趋势。
42 13
|
8天前
|
前端开发 中间件 PHP
|
8天前
|
缓存 中间件 PHP
Laravel 框架:优雅 PHP Web 开发的典范
【8月更文挑战第31天】
30 0
|
8天前
|
安全 Java 云计算
JSF 应用究竟何去何从?云端部署能否成为其全新突破点?快来一探究竟!
【8月更文挑战第31天】本文介绍了将JavaServer Faces(JSF)应用部署到云平台的过程。首先,根据成本、功能、可靠性和安全性选择合适的云平台。接着,展示了构建简单JSF应用的示例代码。最后,以AWS Elastic Beanstalk为例,详细说明了部署流程。部署至云端可提升应用的可用性、扩展性和安全性。
19 0
|
10天前
|
安全 中间件 网络安全
深入浅出PHP框架之Laravel的优雅云计算与网络安全:探索云服务、网络安全和信息安全的技术领域
【8月更文挑战第29天】在编程的世界里,PHP以其灵活性和易用性广受欢迎。本文将深入探讨PHP的一个流行框架——Laravel,揭示它如何以简洁、高雅的解决方案满足复杂的开发需求。我们将一起走进Laravel的世界,探索其背后的哲学,以及它如何让代码变得更加动人和富有韵律。
|
12天前
|
JavaScript PHP 开发者
PHP中的异常处理与自定义错误处理器构建高效Web应用:Node.js与Express框架实战指南
【8月更文挑战第27天】在PHP编程世界中,异常处理和错误管理是代码健壮性的关键。本文将深入探讨PHP的异常处理机制,并指导你如何创建自定义错误处理器,以便优雅地管理运行时错误。我们将一起学习如何使用try-catch块捕获异常,以及如何通过set_error_handler函数定制错误响应。准备好让你的代码变得更加可靠,同时提供更友好的错误信息给最终用户。
|
15天前
|
存储 关系型数据库 Linux
【Azure 应用服务】App Service For Linux 部署PHP Laravel 项目,如何修改首页路径为 wwwroot\public\index.php
【Azure 应用服务】App Service For Linux 部署PHP Laravel 项目,如何修改首页路径为 wwwroot\public\index.php
|
20天前
|
网络协议 API PHP
PhalApi:在宝塔一键安装部署PHP开源接口框架的教程
要在宝塔面板上一键安装部署PhalApi开源接口框架,首先进入宝塔软件商店,切换到“一键部署”选项,搜索“phalapi”并点击“一键部署”。安装时需填写接口域名、数据库名及密码,提交后等待安装完成。安装成功后可在宝塔面板中查看新站点和源代码目录,并通过DNS解析设置访问接口域名,如`http://myapi.phalapi.net/`。默认开启的调试模式便于测试,可通过修改`config/sys.php`中的`debug`值为`false`关闭。最后,在源代码中开发自己的PHP接口,PhalApi会自动生成在线接口文档,方便后续调用与维护。更多详细教程可参考官方文档。
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
195 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
109 0