ThinkPHP5-消息队列

简介: 在这个例子当中,我们是手动指定的 $jobHandlerClassName ,更合理的做法是先定义好消息名称与消费者类名的映射关系,然后由某个可以获取该映射关系的类来推送这个消息。这样,生产者只需要知道消息的名称,而无需指定哪个消费者类来处理。

首先我们看一下自己的TP5的框架中的 TP5\vendor\topthink ,这个文件中有没有think-queue这个文件夹,如果没有可以通过Composer安装,如果没有安装composer,请安装Composer

$ curl -sS https://getcomposer.org/installer | php
$ mv composer.phar /usr/local/bin/composer

Linux上安装 think-queue ,请先进入到框架的根目录再运行

composer require topthink/think-queue

这个时候再去看就会有 think-queue 这个文件夹了,确定一下看看是否安装成功,运行

php think queue:work -h

能出现以下 结果 就表示think-queue的 安装好了
在这里插入图片描述

配置

配置文件位于 application/extra/queue.php

公共配置

<?php
return [
       'connector'  => 'Redis',            // Redis 驱动
       'expire'     => 60,                // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 
       'default'    => 'default',        // 默认的队列名称
       'host'       => '127.0.0.1',        // redis 主机ip
       'port'       => 6379,            // redis 端口
       'password'   => '',                // redis 密码
       'select'     => 0,                // 使用哪一个 db,默认为 db0
       'timeout'    => 0,                // redis连接的超时时间
       'persistent' => false,            // 是否是长连接
     
   //    'connector' => 'Database',   // 数据库驱动
   //    'expire'    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
   //    'default'   => 'default',    // 默认的队列名称
   //    'table'     => 'jobs',       // 存储消息的表名,不带前缀
   //    'dsn'       => [],
 
   //    'connector'   => 'Topthink',    // ThinkPHP内部的队列通知服务平台 ,本文不作介绍
   //    'token'       => '',
   //    'project_id'  => '',
   //    'protocol'    => 'https',
   //    'host'        => 'qns.topthink.com',
   //    'port'        => 443,
   //    'api_version' => 1,
   //    'max_retries' => 3,
   //    'default'     => 'default',
 
   //    'connector'   => 'Sync',        // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
   ];
 

消息的创建与推送

我们在业务控制器中创建一个新的消息,并推送到 helloJobQueue 队列

新增 \application\index\controller\JobTest.php 控制器,在该控制器中添加 actionWithHelloJob 方法

<?php
/**
* 文件路径: \application\index\controller\JobTest.php
* 该控制器的业务代码中借助了thinkphp-queue 库,将一个消息推送到消息队列
*/
namespace app\index\controller;
  use think\Exception;
 
  use think\Queue;
 
  class JobTest {
  /**
   * 一个使用了队列的 action
   */
  public function actionWithHelloJob(){
      
      // 1.当前任务将由哪个类来负责处理。 
      //   当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
      $jobHandlerClassName  = 'application\index\job\Hello'; 
      // 2.当前任务归属的队列名称,如果为新队列,会自动创建
      $jobQueueName        = "helloJobQueue"; 
      // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
      //   ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对)
      $jobData             = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ;
      // 4.将该任务推送到消息队列,等待对应的消费者去执行
      $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );    
      // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
      if( $isPushed !== false ){  
          echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
      }else{
          echo 'Oops, something went wrong.';
      }
  }
 }

注意: 在这个例子当中,我们是手动指定的 $jobHandlerClassName ,更合理的做法是先定义好消息名称与消费者类名的映射关系,然后由某个可以获取该映射关系的类来推送这个消息。这样,生产者只需要知道消息的名称,而无需指定哪个消费者类来处理。

除了 Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );这种方式之外,还可以直接传入 Queue::push( $jobHandlerObject ,null , $jobQueueName ); 这时,需要在 $jobHandlerObject 中定义一个 handle() 方法,消息队列在执行到该任务时会自动反序列化该对象,并调用其 handle()方法。 该方式的缺点是无法传入自定义数据。

消息的消费与删除

编写 Hello 消费者类,用于处理 helloJobQueue 队列中的任务

新增 \application\index\job\Hello.php 消费者类,并编写其 fire() 方法

<?php
  /**
   * 文件路径: \application\index\job\Hello.php
   * 这是一个消费者类,用于处理 helloJobQueue 队列中的任务
   */
  namespace app\index\job;
 
  use think\queue\Job;
 
  class Hello {
      
      /**
       * fire方法是消息队列默认调用的方法
       * @param Job            $job      当前的任务对象
       * @param array|mixed    $data     发布任务时自定义的数据
       */
      public function fire(Job $job,$data){
          // 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行.
          $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
          if(!isJobStillNeedToBeDone){
              $job->delete();
              return;
          }
        
          $isJobDone = $this->doHelloJob($data);
        
          if ($isJobDone) {
              //如果任务执行成功, 记得删除任务
              $job->delete();
              print("<info>Hello Job has been done and deleted"."</info>\n");
          }else{
              if ($job->attempts() > 3) {
                  //通过这个方法可以检查这个任务已经重试了几次了
                  print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
                  $job->delete();
                  // 也可以重新发布这个任务
                  //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                  //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
              }
          }
      }
 
       /**
       * 有些消息在到达消费者时,可能已经不再需要执行了
       * @param array|mixed    $data     发布任务时自定义的数据
       * @return boolean                 任务执行的结果
       */
      private function checkDatabaseToSeeIfJobNeedToBeDone($data){
          return true;
      }
 
      /**
       * 根据消息中的数据进行实际的业务处理
       * @param array|mixed    $data     发布任务时自定义的数据
       * @return boolean                 任务执行的结果
       */
      private function doHelloJob($data) {
          // 根据消息中的数据进行实际的业务处理...
        
          print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");
          print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");
          print("<info>Hello Job is Done!"."</info> \n");
          
          return true;
      }
  }

至此,所有的代码都已准备完毕,在运行消息队列之前,我们先看一下现在的目录结构:
在这里插入图片描述

发布任务
在浏览器中访问 http://your.project.domain/index/job_test/actionWithHelloJob ,可以看到消息推送成功。

这个时候去Linux中链接redis,查看redis的队列任务,就可以看到有数据在里面

连接redis:/usr/local/redis/bin/redis-cli -h 12.131.12.12 -p 6379

redis对列的命令,显示helloJobQueue对列中的数据:LRANGE queues:helloJobQueue 0 -1

显示对列中有几条数据:Llen queues:helloJobQueue
在这里插入图片描述

看到这样就说明已经加入到队列了,截图是加入了好多条

处理任务

切换当前终端窗口的目录到项目根目录下,执行

php think queue:work --queue helloJobQueue

可以看到执行的结果类似如下:
在这里插入图片描述

​由于php think queue:work --queue helloJobQueue这个命令只能在TP5框架的根目录才能运行成功,所以,shell脚本要先cd到框架的根目录,具体见下面的shell脚本截图

至此,我们成功地经历了一个消息的 创建 -> 推送 -> 消费 -> 删除 的基本流程

但是!!! 但是!!! 但是!!!,重要的事说3遍

虽然现在是可以将消息的创建 -> 推送 -> 消费 -> 删除 的基本流程全部跑通,但是每次执行 php think queue:work --queue helloJobQueue 这个命令都是进行了一次,也就是说,在对列 helloJobQueue 中有5条要处理的数据,每次执行 php think queue:work --queue helloJobQueue 都是只执行了一条数据,还有4条数据没有处理,我们要的是执行一次可以直接将对列中的数据全部处理掉,于是,我们想到定时任务去处理

首先我们写两个shell脚本

1.monitorHandleQueue.sh,作用是检查队列的进程是否在运行

pid=$(ps -ef| grep handleQueue |grep -v grep | awk ' NR==1 {print $2}')
 
if [ -z $pid ]
 then
   sh /home/wwwroot/default/www/thinkphp5/handleQueue.sh &>/dev/null 2>&1
fi

mysqld是进程名称

检查进程是否存在,如果不存在启动handleQueue.sh脚本,注意:monitorHandleQueue.sh脚本中的启动handleQueue.sh的路径写自己的,NR==1表示只取第一个进程,|grep -v grep 过滤掉自己的进程

2.handleQueue.sh 脚本

cd /home/wwwroot/default/www/thinkphp5
while [ 2 > 0 ]
 do
  len=`/usr/local/redis/bin/redis-cli -h 1.1.1.1 -p 6379 Llen queues:helloJobQueue`
  if [ $((len + 0 )) -gt 0 ];then
        php think queue:work --queue helloJobQueue
  else
        sleep 3
        php think queue:work --queue helloJobQueue
  fi
done

此脚本中的cd /home/wwwroot/default/www/thinkphp5,一定要切换到框架的根目录,解释一下handleQueue.sh脚本的逻辑:先切换到框架的根目录,while判断2大于0为真,所以会一直执行,连接到redis,获取队列的长度,if判断,如果队列的长度大于0直接执行队列,否则就停3秒再执行队列,很简单,写了很长时间,还有一点要注意,shell脚本最好不要在编辑器编辑,直接在Linux上编辑,因为如果在编辑器上编辑上传到Linux上会产生意想不到的问题(我在这里耽误了很长时间),找不到问题所在就直接在Linux上编写好了,省的麻烦

上面的shell脚本是我第一次写,碰到了很多问题

  1. [ 2 > 0 ]格式为[空格判断表达式空格]
  2. len=/usr/local/redis/bin/redis-cli -h 47.101.54.26 -p 6379 Llen queues:helloJobQueue,等于号的左右两边不能有空格,反引号 `` 不知道怎么输出,在1是我左边的那个键
  3. if 的格式,if [ $((len + 0 )) -gt 0 ];then 变量大于0,大于号用 -gt 表示

创建定时任务

*/1 * * * *  /home/wwwroot/default/www/thinkphp5/monitorHandleQueue.sh &>/dev/null 2>&1

当然也可以不写shell脚本

启动队列监听服务
Work 模式

php think queue:work \
--daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
--queue  helloJobQueue  //要处理的队列的名称
--delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
--memory 128 \      //该进程允许使用的内存上限,以 M 为单位
--sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
--tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0

Listen 模式

php think queue:listen \
--queue  helloJobQueue \   //监听的队列的名称
--delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--memory 128 \       //该进程允许使用的内存上限,以 M 为单位
--sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
--tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
--timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位

可以看到 listen 模式下,不包含 --deamon 参数

done! 可以交差啦!

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
目录
相关文章
|
10月前
|
搜索推荐 5G 网络性能优化
|
JSON 数据可视化 前端开发
可视化表单&试卷搭建平台技术详解
可视化表单&试卷搭建平台技术详解
227 0
|
2月前
|
运维 安全 算法
服务器 CPU 占用忽高忽低?排查这 6 个隐藏进程,90% 的异常都能解决
服务器运维中,CPU占用忽高忽低常由隐藏进程引发,影响服务稳定性。本文介绍六大需排查的隐藏进程:异常编译、挖矿程序、内存泄漏、网络请求异常、日志轮转问题及恶意软件。通过排查工具如top、ps、netstat等定位问题进程,并提供针对性解决方法,帮助开发者快速稳定服务器性能。
483 0
|
1月前
|
存储 弹性计算 安全
阿里云服务器2核8G与4核16G配置选购参考:价格、性能及适用场景
2核8G、4核16G配置是很多个人和企业用户搭建网站和中小型数据库系统等场景时首选的云服务器配置,现在购买2核8G配置最低年付价格只要652.32元/1年,4核16G配置短期租用可以选择月付,现在89元即可购买到经济型e实例4核16G10M带宽配置1个月,年付选择通用算力型u1实例价格为1196.64元/1年起。本文为大家解析阿里云服务器2核8G与4核16G配置的租用费用,热门实例适用场景,以供参考和选择。
|
1月前
|
弹性计算 运维 负载均衡
阿里云轻量应用服务器产品介绍、收费标准以及搭建个人博客教程参考
本文为大家介绍阿里云轻量应用服务器的产品优势、应用场景、使用须知、地域与网络连通性、与云服务器ECS的区别以及使用轻量应用服务器搭建WordPress个人博客的图文教程,以供大家了解和使用轻量应用服务器。
|
3月前
|
人工智能 Cloud Native 数据管理
邀您参加 KubeCon China 2025 分论坛 | 阿里云 AI 基础设施技术沙龙
KubeCon + CloudNativeCon China 2025 将于6月10-11日在香港合和酒店举办,由CNCF与Linux基金会联合主办。阿里云开发者将在大会上分享多个技术议题,涵盖AI模型分发、Argo工作流、Fluid数据管理等领域。大会前还有阿里云AI基础设施技术沙龙,聚焦AI基础设施及云原生技术实战经验。欢迎扫码报名参与!
331 65
|
4月前
|
数据采集 Web App开发 监控
如何用Pyppeteer打造高并发无头浏览器采集方案
本文从电商行业数据采集痛点出发,结合 Pyppeteer 高并发无头浏览器技术,打造可配置代理的高效采集方案。通过爬虫代理突破 IP 限制,模拟真实用户行为,实现 Amazon 特价商品数据的稳定抓取与分析。代码示例详细展示了代理集成、并发控制及数据处理流程,实验验证效率提升超 4 倍。该方案助力商业决策、竞品分析,并支持技术扩展与创新应用。
166 13
如何用Pyppeteer打造高并发无头浏览器采集方案
|
8月前
|
API 开发者 Python
Pygame Zero(pgzrun)详解(简介、使用方法、坐标系、目录结构、语法参数、安装、实例解释)
Pygame Zero(pgzrun)详解(简介、使用方法、坐标系、目录结构、语法参数、安装、实例解释)
1102 17
|
人工智能 机器人 API
一键打造你的专属钉钉AI助手
【8月更文挑战第7天】一键打造你的专属钉钉AI助手
697 15
一键打造你的专属钉钉AI助手