PhalApi+Gearman,接口MQ异步队列任务的完整开发教程

简介: PhalApi+Gearman,接口MQ异步队列任务的完整开发教程

MQ异步队列

在API接口同步请求过程中,不适合处理耗时过长、或者一直轮询的工作。此时,可以结合MQ异步队列任务进行后台处理。

MQ异步队列服务 - Gearman

关于异步队列服务有很多种,这里PhalApi选择使用了Gearman,它的特点是:开源、使用简单、支持多客户端开发语言。

  • Gearmana官网:gearman.org/
  • Gearman下载:gearman.org/download/
  • 安装PHP Gearman扩展:gearman.org/download/#p…

安装和启动Gearman服务

例如,在RHEL/Fedora/Linux/CentOS系统,可以执行:

yum install gearmand
复制代码

如果是Debain/Ubuntu,可以执行:

apt-get install gearman-job-server
复制代码

源代码编码安装的方式:

tar xzf gearmand-X.Y.tar.gz
cd gearmand-X.Y
./configure
make
make install
复制代码

其他操作系统,可以参考Getting Started with Gearman 进行安装。

在服务端本地安装好Gearman服务后,启动Gearman服务命令:

$ gearmand -d
复制代码

再检查一下是否正常运行:

$ ps -ef | grep gearman
gearmand  1149     1  0 Jan06 ?        00:11:56 /usr/sbin/gearmand -d
复制代码

MQ配置

修改 ./config/sys.php 配置文件,填写对应的Gearman服务IP和端口。默认端口是

// MQ服务 - Gearman
    'gearman' => array(
        'ip' => '127.0.0.1',   //消息服务器
        'port' => 4730         //消息服务器端口
    ),
复制代码

接着,在PhalApi框架中,注册Gearman客户端的DI服务,修改 ./config/di.php 文件,取消PHP代码注销:

// gearman异步队列
// 如何使用?请参考本地文档:/wiki/#/2x-mq
$di->gearmanClient = new \Base\Common\GearmanClient();
复制代码

MQ脚本测试

默认情况下,可以通过以下两个脚本,先进行快速测试。

进入PhalApi根目录,先手动启动服务端脚本:

$ php ./bin/mq/phalapi_pro_gearman_mq_server.php
Starting
Waiting for job...
复制代码

可以看到,服务启动正常,正在等待接收新数据。

接着,启动客户端测试脚本:

$ php ./bin/mq/phalapi_pro_gearman_mq_example.php
复制代码

这时候,你会看到在前面的服务端脚本会有显示和反应:

$ php ./bin/mq/phalapi_pro_gearman_mq_server.php
Starting
Waiting for job...
Domain领域层,已接收到新数据:app_key = APP_KEY,data = {"title":"test\u6d4b\u8bd5\u6570\u636e"}
复制代码

运行正常后,可以继续配置以下内容。

后台启动脚本和守护进程

手动单次测试完成后,就可以在服务器正式启动MQ服务。

进入根目录,启动MQ服务脚本:

$ chmod +x ./bin/mq/phalapi_pro_gearman_mq_server.sh
$ ./bin/mq/phalapi_pro_gearman_mq_server.sh
ok
$ ps -ef | grep mq_server
apps     22537     1  0 15:55 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22538     1  0 15:55 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22539     1  0 15:55 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22550 16696  0 15:55 pts/4    00:00:00 grep --color=auto mq_server
复制代码

从上往下,分别是添加执行权限、启动脚本和查看是否正常后台运行。可以看到,默认情况下,运行了3个消耗进程。

在随后的系统升级和发布时,也可以手动执行以下命令,对MQ服务进行重启:

$ chmod +x ./bin/mq/phalapi_pro_gearman_mq_server_restart.s
$ ./bin/mq/phalapi_pro_gearman_mq_server_restart.sh
restart ...
ok
$ ps -ef | grep mq_server
apps     22963     1  0 15:57 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22964     1  0 15:57 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22965     1  0 15:57 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22976 16696  0 15:57 pts/4    00:00:00 grep --color=auto mq_server
复制代码

以上命令,依次是:添加执行权限、重启、确保重启成功。可以看到进程ID是刷新的了。

随后,为了保障MQ异步退出能继续恢复,可以在crontab添加每一分钟的守护进程。

$ crontab -e
# phalapi_pro_gearman_mq_server守护进程
*/1 * * * * /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.sh
复制代码

如何开发?

当你需要添加一个新的主题时,可以先修改
./bin/mq/phalapi_pro_gearman_mq_server.php,复制并修改以下代码片段,添加新的主题,例如:

// 示例:接口请求后的更多异步处理
$gmworker->addFunction("api_request", "gearman_client_api_request");
// 示例:接口请求后的更多异步处理
function gearman_client_api_request($job)
{
    // 获取提交的日志数据
    $workload= $job->workload();
    $workload = json_decode($workload, true);
    $appKey = $workload['appKey'];
    $data = $workload['data'];
    \PhalApi\DI()->logger->debug('已接收到新的MQ队列数据#' . $finishTimes , ['app_key' => $workload['appKey'], 'data'=>$data]);
    if (empty($appKey) || empty($data)) {
        echo "empty app_app or data ...\n";
        return false;
    }
    try {
        // TODO:开始你的业务处理(特别是耗时的操作)……
        $domain = new \Base\Domain\MQ();
        $domain->afterApiRequestMQ($appKey, $data);
    } catch (Exception $ex) {
        echo "Failed to deal with data: ".$ex->getMessage(), json_encode($data), PHP_EOL;
        throw $ex;
    }
    // Return what we want to send back to the client.
    return true;
}
复制代码

接着,在前台或客户端或API接口请求时,将队列数据压入,例如:

$gearmanCient = \PhalApi\DI()->gearmanClient;
$theme = 'api_request'; // 主题。必须对应
$appKey = 'APP_KEY';
$data = array('title' => 'test测试数据');
$gearmanCient->pushMQ($theme, $appKey, $data);
复制代码

当你熟悉时,用一行代码即可:

\PhalApi\DI()->gearmanClient->pushMQ('api_request', 'APP_KEY', array('title' => 'test测试数据'));
复制代码

最后,就是根据需要,在Domain层编写你的业务逻辑。参考 ./src/base/Domain/MQ.php 文件:

<?php
namespace Base\Domain;
/**
 * MQ异步队列的领域层处理
 * @author dogstar 20221019
 */
class MQ {
  // 示例:接口后的更多处理逻辑
  public function afterApiRequestMQ($appKey, $data) {
    echo 'Domain领域层,已接收到新数据:app_key = ' . $appKey . ',data = ' . json_encode($data) . PHP_EOL;
    // 继续你的业务处理……
  }
}
复制代码

至此,快速开发和使用已介绍完毕。

如何查看MQ日记?

为了和API接口日记分开,MQ脚本本身的日志存放在目录:
runtime/mq/phalapi_pro_gearman_mq_server.log;可以自己定时清理。

如果是PHP业务层的日记,则存放在runtime/mq/目录下,例如:

$ less ./runtime/mq/log/202210/20221019.log 
2022-10-19 15:26:29|DEBUG|已接收到新的MQ队列数据|{"app_key":"APP_KEY","data":{"title":"test测试数据"}}
2022-10-19 15:27:07|DEBUG|已接收到新的MQ队列数据|{"app_key":"APP_KEY","data":{"title":"test测试数据"}}
2022-10-19 15:29:42|DEBUG|已接收到新的MQ队列数据|{"app_key":"APP_KEY","data":{"title":"test测试数据"}}
复制代码

相关代码文件位置

本次涉及的文件和配置有:

# 配置
./config/sys.php
# PHP代码
./src/base/Domain/MQ.php
# 脚本
$ tree ./bin/mq 
./bin/mq
├── phalapi_pro_gearman_mq_example.php
├── phalapi_pro_gearman_mq_server.php
├── phalapi_pro_gearman_mq_server.sh
└── phalapi_pro_gearman_mq_server_restart.sh
复制代码

其他使用,可参考PhalApi开源接口框架和Gearman官方文档,进行完整的开发。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
54 1
|
1月前
|
消息中间件 监控 Java
RocketMQ 同步发送、异步发送和单向发送,如何选择?
本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。
219 4
|
5月前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
135 6
|
2月前
|
消息中间件 Kafka 数据安全/隐私保护
RabbitMQ异步通信详解
RabbitMQ异步通信详解
93 16
|
2月前
|
消息中间件 JSON Java
|
2月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
2月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
87 0
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
93 2
|
4月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决