PhalApi+Gearman,接口MQ异步队列任务的完整开发教程

简介: PhalApi+Gearman,接口MQ异步队列任务的完整开发教程

MQ异步队列

在API接口同步请求过程中,不适合处理耗时过长、或者一直轮询的工作。此时,可以结合MQ异步队列任务进行后台处理。

MQ异步队列服务 - Gearman

关于异步队列服务有很多种,这里PhalApi选择使用了Gearman,它的特点是:开源、使用简单、支持多客户端开发语言。

  • Gearmana官网:gearman.org/
  • Gearman下载:gearman.org/download/
  • 安装PHP Gearman扩展:gearman.org/download/#p…

安装和启动Gearman服务

例如,在RHEL/Fedora/Linux/CentOS系统,可以执行:

yum install gearmand
复制代码

如果是Debain/Ubuntu,可以执行:

apt-get install gearman-job-server
复制代码

源代码编码安装的方式:

tar xzf gearmand-X.Y.tar.gz
cd gearmand-X.Y
./configure
make
make install
复制代码

其他操作系统,可以参考Getting Started with Gearman 进行安装。

在服务端本地安装好Gearman服务后,启动Gearman服务命令:

$ gearmand -d
复制代码

再检查一下是否正常运行:

$ ps -ef | grep gearman
gearmand  1149     1  0 Jan06 ?        00:11:56 /usr/sbin/gearmand -d
复制代码

MQ配置

修改 ./config/sys.php 配置文件,填写对应的Gearman服务IP和端口。默认端口是

// MQ服务 - Gearman
    'gearman' => array(
        'ip' => '127.0.0.1',   //消息服务器
        'port' => 4730         //消息服务器端口
    ),
复制代码

接着,在PhalApi框架中,注册Gearman客户端的DI服务,修改 ./config/di.php 文件,取消PHP代码注销:

// gearman异步队列
// 如何使用?请参考本地文档:/wiki/#/2x-mq
$di->gearmanClient = new \Base\Common\GearmanClient();
复制代码

MQ脚本测试

默认情况下,可以通过以下两个脚本,先进行快速测试。

进入PhalApi根目录,先手动启动服务端脚本:

$ php ./bin/mq/phalapi_pro_gearman_mq_server.php
Starting
Waiting for job...
复制代码

可以看到,服务启动正常,正在等待接收新数据。

接着,启动客户端测试脚本:

$ php ./bin/mq/phalapi_pro_gearman_mq_example.php
复制代码

这时候,你会看到在前面的服务端脚本会有显示和反应:

$ php ./bin/mq/phalapi_pro_gearman_mq_server.php
Starting
Waiting for job...
Domain领域层,已接收到新数据:app_key = APP_KEY,data = {"title":"test\u6d4b\u8bd5\u6570\u636e"}
复制代码

运行正常后,可以继续配置以下内容。

后台启动脚本和守护进程

手动单次测试完成后,就可以在服务器正式启动MQ服务。

进入根目录,启动MQ服务脚本:

$ chmod +x ./bin/mq/phalapi_pro_gearman_mq_server.sh
$ ./bin/mq/phalapi_pro_gearman_mq_server.sh
ok
$ ps -ef | grep mq_server
apps     22537     1  0 15:55 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22538     1  0 15:55 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22539     1  0 15:55 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22550 16696  0 15:55 pts/4    00:00:00 grep --color=auto mq_server
复制代码

从上往下,分别是添加执行权限、启动脚本和查看是否正常后台运行。可以看到,默认情况下,运行了3个消耗进程。

在随后的系统升级和发布时,也可以手动执行以下命令,对MQ服务进行重启:

$ chmod +x ./bin/mq/phalapi_pro_gearman_mq_server_restart.s
$ ./bin/mq/phalapi_pro_gearman_mq_server_restart.sh
restart ...
ok
$ ps -ef | grep mq_server
apps     22963     1  0 15:57 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22964     1  0 15:57 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22965     1  0 15:57 pts/4    00:00:00 php /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.php
apps     22976 16696  0 15:57 pts/4    00:00:00 grep --color=auto mq_server
复制代码

以上命令,依次是:添加执行权限、重启、确保重启成功。可以看到进程ID是刷新的了。

随后,为了保障MQ异步退出能继续恢复,可以在crontab添加每一分钟的守护进程。

$ crontab -e
# phalapi_pro_gearman_mq_server守护进程
*/1 * * * * /path/to/phalapi-pro/bin/mq/phalapi_pro_gearman_mq_server.sh
复制代码

如何开发?

当你需要添加一个新的主题时,可以先修改
./bin/mq/phalapi_pro_gearman_mq_server.php,复制并修改以下代码片段,添加新的主题,例如:

// 示例:接口请求后的更多异步处理
$gmworker->addFunction("api_request", "gearman_client_api_request");
// 示例:接口请求后的更多异步处理
function gearman_client_api_request($job)
{
    // 获取提交的日志数据
    $workload= $job->workload();
    $workload = json_decode($workload, true);
    $appKey = $workload['appKey'];
    $data = $workload['data'];
    \PhalApi\DI()->logger->debug('已接收到新的MQ队列数据#' . $finishTimes , ['app_key' => $workload['appKey'], 'data'=>$data]);
    if (empty($appKey) || empty($data)) {
        echo "empty app_app or data ...\n";
        return false;
    }
    try {
        // TODO:开始你的业务处理(特别是耗时的操作)……
        $domain = new \Base\Domain\MQ();
        $domain->afterApiRequestMQ($appKey, $data);
    } catch (Exception $ex) {
        echo "Failed to deal with data: ".$ex->getMessage(), json_encode($data), PHP_EOL;
        throw $ex;
    }
    // Return what we want to send back to the client.
    return true;
}
复制代码

接着,在前台或客户端或API接口请求时,将队列数据压入,例如:

$gearmanCient = \PhalApi\DI()->gearmanClient;
$theme = 'api_request'; // 主题。必须对应
$appKey = 'APP_KEY';
$data = array('title' => 'test测试数据');
$gearmanCient->pushMQ($theme, $appKey, $data);
复制代码

当你熟悉时,用一行代码即可:

\PhalApi\DI()->gearmanClient->pushMQ('api_request', 'APP_KEY', array('title' => 'test测试数据'));
复制代码

最后,就是根据需要,在Domain层编写你的业务逻辑。参考 ./src/base/Domain/MQ.php 文件:

<?php
namespace Base\Domain;
/**
 * MQ异步队列的领域层处理
 * @author dogstar 20221019
 */
class MQ {
  // 示例:接口后的更多处理逻辑
  public function afterApiRequestMQ($appKey, $data) {
    echo 'Domain领域层,已接收到新数据:app_key = ' . $appKey . ',data = ' . json_encode($data) . PHP_EOL;
    // 继续你的业务处理……
  }
}
复制代码

至此,快速开发和使用已介绍完毕。

如何查看MQ日记?

为了和API接口日记分开,MQ脚本本身的日志存放在目录:
runtime/mq/phalapi_pro_gearman_mq_server.log;可以自己定时清理。

如果是PHP业务层的日记,则存放在runtime/mq/目录下,例如:

$ less ./runtime/mq/log/202210/20221019.log 
2022-10-19 15:26:29|DEBUG|已接收到新的MQ队列数据|{"app_key":"APP_KEY","data":{"title":"test测试数据"}}
2022-10-19 15:27:07|DEBUG|已接收到新的MQ队列数据|{"app_key":"APP_KEY","data":{"title":"test测试数据"}}
2022-10-19 15:29:42|DEBUG|已接收到新的MQ队列数据|{"app_key":"APP_KEY","data":{"title":"test测试数据"}}
复制代码

相关代码文件位置

本次涉及的文件和配置有:

# 配置
./config/sys.php
# PHP代码
./src/base/Domain/MQ.php
# 脚本
$ tree ./bin/mq 
./bin/mq
├── phalapi_pro_gearman_mq_example.php
├── phalapi_pro_gearman_mq_server.php
├── phalapi_pro_gearman_mq_server.sh
└── phalapi_pro_gearman_mq_server_restart.sh
复制代码

其他使用,可参考PhalApi开源接口框架和Gearman官方文档,进行完整的开发。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 Java
SpringBoot RabbitMQ死信队列
SpringBoot RabbitMQ死信队列
66 0
|
3月前
|
消息中间件
|
5月前
|
消息中间件 存储 负载均衡
Rabbitmq direct模式保证一个队列只对应一个消费者
Rabbitmq direct模式保证一个队列只对应一个消费者
112 0
|
3月前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
【1月更文挑战第12天】用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
230 1
|
2月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
63 0
|
1月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
42 1
|
2月前
|
消息中间件 监控 数据挖掘
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
48 0
|
2月前
|
消息中间件 Docker 容器
docker构建rabbitmq并配置延迟队列插件
docker构建rabbitmq并配置延迟队列插件
35 0
|
2月前
|
消息中间件
rabbitmq动态创建队列
rabbitmq动态创建队列
36 0
|
3月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
343 1

热门文章

最新文章