PHP+Laravel框架RabbitMQ简单使用(Pub/Sub模式)

简介: 一、简介publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。特点:每个消息可以有多个订阅者;客户端只有订阅后才能接收到消息;持久订阅和非持久订阅。注意:发布者和订阅者有时间依赖:接受者和发布者只有建立订阅关系才能收到消息;持久订阅:订阅关系建立后,消息就不会消失,不管订阅者是否都在线;非持久订阅:订阅者为了接受消息,必须一直在线。 当只有一个订阅者时约等于点对点模式

一、简介


publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。


特点:

  1. 每个消息可以有多个订阅者;
  2. 客户端只有订阅后才能接收到消息;
  3. 持久订阅和非持久订阅。


注意:

  1. 发布者和订阅者有时间依赖:接受者和发布者只有建立订阅关系才能收到消息;
  2. 持久订阅:订阅关系建立后,消息就不会消失,不管订阅者是否都在线;
  3. 非持久订阅:订阅者为了接受消息,必须一直在线。 当只有一个订阅者时约等于点对点模式


网络异常,图片无法展示
|

P 表示为生产者、 X 表示交换机、C1与C2 表示为消费者,红色表示队列。

通常情况下,一个条消息只要被消费一次就行了,那么什么情况下需要所有的消费者都对这条消息进行消费呢?最典型的情况就是需要在内存中对数据进行缓存,并需要实时进行更新。


例如,笔者做过一个违禁词系统,对用户输入的评论内容进行违禁词汇检测。这个违禁词系统,部署了在N台服务器上,为了提升检测性能,每台机器都会将违禁词库全量加载到内存中,词库的更新,是通过发送MQ消息来完成的。由于采用Pub/Sub模型,每台机器的consumer,都可以接收到这条消息,直接在内存中更新敏感词库即可。

我们现在希望将消息发布到我们的日志交换而不是无名的交换(emit_log.php)


二、使用Laravel的command来实现消息的生产和消费


发布者我们现在希望将消息发布到我们的日志交换而不是无名的交换 RabbitmqProducerCommand下的publish方法


<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
//引入amqp扩展
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqProducerCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq_producer';//给生产者起个command名称
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     *  生产者消息代码
     * @return int
     */
    public function handle()
    {
        $this->publish();
    }
    /**
     * 发布订阅模式的发布操作
     * publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 11:03
     */
    public function publish()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        //发布模式(fanout)
        $channel->exchange_declare('logs', 'fanout', false, false, false);
        $argv = [];
        $data = implode(' ', array_slice($argv, 1));
        if (empty($data)) {
            $data = "info: 我是发布者生产的消息消息!";
        }
        $msg = new AMQPMessage($data);
        $channel->basic_publish($msg, 'logs');
        echo ' [x] Sent ', $data, "\n";
        $channel->close();
        $connection->close();
    }
}
复制代码


订阅者RabbitmqProducerCommand下的subscribe方法


<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqConsumerCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq_consumer';//给消费者起个command名称
    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';
    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     * @return int
     */
    public function handle()
    {
        $this->subscribe();
    }
    /**
     * 发布订阅模式 -订阅
     * publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。
     * Author: 李硕
     * Date: 2022/5/7
     * Time: 11:03
     */
    public function subscribe()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        $channel->exchange_declare('logs', 'fanout', false, false, false);
        list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
        $channel->queue_bind($queue_name, 'logs');
        echo " [*] Waiting for logs. To exit press CTRL+C\n";
        $callback = function ($msg) {
            echo ' 我是订阅者开始进行消费[x] ', $msg->body, "\n";
        };
        $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
        while ($channel->is_open()) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}
复制代码


三、运行截图


执行生产消息 php artisan rabbitmq_producer

执行消费消息 hp artisan rabbitmq_consumer


网络异常,图片无法展示
|


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
16天前
|
SQL 安全 PHP
PHP 自发布以来一直在 Web 开发领域占据重要地位,PHP 8 更是带来了属性、刚性类型等新特性。
【10月更文挑战第1天】PHP 自问世以来,凭借其易用性和灵活性,在 Web 开发领域迅速崛起。从简单的网页脚本语言逐步演进为支持面向对象编程的现代语言,尤其自 PHP 5.3 引入命名空间后,代码组织和维护变得更加高效。PHP 7 的性能优化和 PHP 8 的新特性(如属性和刚性类型)进一步巩固了其地位。框架如 Laravel、Symfony、Yii2 和 CodeIgniter 等简化了开发流程,提高了效率和安全性。
35 2
|
1月前
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP的编程实践中,设计模式是解决常见软件设计问题的最佳实践。单例模式作为设计模式中的一种,确保一个类只有一个实例,并提供全局访问点,广泛应用于配置管理、日志记录和测试框架等场景。本文将深入探讨单例模式的原理、实现方式及其在PHP中的应用,帮助开发者更好地理解和运用这一设计模式。
在PHP开发中,单例模式通过确保类仅有一个实例并提供一个全局访问点,有效管理和访问共享资源。本文详细介绍了单例模式的概念、PHP实现方式及应用场景,并通过具体代码示例展示如何在PHP中实现单例模式以及如何在实际项目中正确使用它来优化代码结构和性能。
35 2
|
2月前
|
安全 前端开发 PHP
构建与验证表单:传统PHP与Laravel框架的比较分析——探索Web开发中表单处理的优化策略和最佳实践
【8月更文挑战第31天】在 Web 开发中,表单构建与数据验证至关重要。传统 PHP 方法需手动处理 HTML 表单和数据验证,而 Laravel 框架则提供了一种更现代、高效的解决方案。本文通过对比传统 PHP 和 Laravel 的方法,探讨表单构建与验证的最佳实践。Laravel 通过简洁的语法糖、内置的数据过滤和验证机制,显著提升了代码的安全性和可维护性,适用于大型项目或需要快速开发的场景。然而,在追求灵活性的小型项目中,直接使用 PHP 仍是不错的选择。了解两者的优劣,有助于开发者根据项目需求做出最佳决策。
32 0
|
1月前
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。
|
2月前
|
缓存 中间件 PHP
Laravel 框架:优雅 PHP Web 开发的典范
【8月更文挑战第31天】
77 0
|
2月前
|
安全 Java 云计算
JSF 应用究竟何去何从?云端部署能否成为其全新突破点?快来一探究竟!
【8月更文挑战第31天】本文介绍了将JavaServer Faces(JSF)应用部署到云平台的过程。首先,根据成本、功能、可靠性和安全性选择合适的云平台。接着,展示了构建简单JSF应用的示例代码。最后,以AWS Elastic Beanstalk为例,详细说明了部署流程。部署至云端可提升应用的可用性、扩展性和安全性。
40 0
|
2月前
|
JavaScript PHP 开发者
PHP中的异常处理与自定义错误处理器构建高效Web应用:Node.js与Express框架实战指南
【8月更文挑战第27天】在PHP编程世界中,异常处理和错误管理是代码健壮性的关键。本文将深入探讨PHP的异常处理机制,并指导你如何创建自定义错误处理器,以便优雅地管理运行时错误。我们将一起学习如何使用try-catch块捕获异常,以及如何通过set_error_handler函数定制错误响应。准备好让你的代码变得更加可靠,同时提供更友好的错误信息给最终用户。
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
213 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
118 0
|
消息中间件 Ubuntu Shell
ubuntu安装rabbitmq教程 避坑
ubuntu安装rabbitmq教程 避坑
460 0