PHP使用topthink/think-queue消息队列实例

简介: PHP使用topthink/think-queue消息队列实例

常住队列消费命令

sudo nohup php7.2 think queue:work --daemon --queue createAdminLogQueue --tries 2 > out.file 2>&1 &

sudo php7.2 think queue:listen --queue createAdminLogQueue

单次队列消费命令

sudo php7.2 think queue:work --daemon --queue createAdminLogQueue

队列添加php代码快

// 当前队列归属的队列名称
        $jobHandlerClassName = 'app\hook\adminLog\job\AdminLogCreateQueueJob';
        //队列名称
        $jobQueueName = "createAdminLogQueue";
        // 插入队列
        $isPushed = Queue::push($jobHandlerClassName, $data, $jobQueueName);
        if( $isPushed == false ){
            \Log::error("createAdminLogQueue创建队列失败".$data, []);
        }

使用tp5勾子实现think-queue消息队列实例,实现后台操作日志到添加到数据库

前提:thinkphp5框架基础上,已包含topthink/think-queue消息队列依赖包,可以用composer下载,这里不懂可以百度,就不说你。

1、创建admin_op_log数据表(字段不要更改)

CREATE TABLE `admin_op_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `op_time` varchar(20) NOT NULL,
  `admin_id` int(11) NOT NULL,
  `employee_no` varchar(30) NOT NULL,
  `op_type` tinyint(4) NOT NULL COMMENT '操作类型:1-新增 2-修改 3-删除',
  `op_object` varchar(50) NOT NULL COMMENT '操作对象',
  `op_object_id` int(11) NOT NULL DEFAULT '0' COMMENT '操作对象id',
  `op_info` varchar(2000) NOT NULL,
  `op_status` tinyint(4) unsigned zerofill NOT NULL DEFAULT '0000' COMMENT '操作状态 0-成功 1-失败',
  `op_ip` varchar(50) NOT NULL,
  `sys` varchar(10) DEFAULT '343' COMMENT '平台判断',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3981 DEFAULT CHARSET=utf8mb4

2、配置(配置文件位于 application/extra/queue.php)

return [
    'connector' => 'database'  //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动
];

3、创建接口。

//添加操作日志
$context = AdminLog::getInstance()->newAdminLogContext(AdminLogContext::ADMIN_OP_TYPE_UPDATE, $this::ADMIN_OP_TABLE_NAME, AdminLogContext::ADMIN_OP_STATUS_SUCCESS, $authorId, $opInfo);
\think\facade\Hook::listen('admin_log', $context);

4、增加任务,创建任务类,这里为了区分模块单独创建了一个模块和类。

application/hook/adminLog/controller/AdminLog.php

<?php
namespace app\hook\adminLog\controller;
use app\hook\adminLog\context\AdminLogContext;
use app\lucky\admin\controller\AdminBase;
use app\lucky\common\InstanceTrait;
use think\App;
use think\Queue;
use think\Request;
class AdminLog extends AdminBase
{
    use InstanceTrait;
    public $sys = '';
    public function __construct(App $app = null, Request $request = null)
    {
        parent::__construct($app, $request);
        $this->sys = \session('sys') ? \session('sys') : 343;
    }
    /**
     * 获取newAdminLogContext
     */
    public function newAdminLogContext($operateType, $operateObject, $operateStatus, $operateObjectId, $operateInfo)
    {
        $operateInfo = json_encode($operateInfo);
        return new AdminLogContext([
            'operateUserId' => $this->loginInfo['admin_id'],
            'operateUserEmployeeNo' => $this->loginInfo['employee_no'],
            'operateType' => $operateType,
            'operateObject' => $operateObject,
            'operateObjectId' => $operateObjectId,
            'operateInfo' => $operateInfo,
            'operateStatus' => $operateStatus,
            'operateIp' => $this->requestIp
        ]);
    }
    /**
     * @description 日志添加
     */
    public function run(AdminLogContext $context)
    {
        $data = [
            "admin_id" => $context->operateUserId,
            "employee_no" => $context->operateUserEmployeeNo,
            "op_type" => $context->operateType,
            "op_object" => $context->operateObject,
            "op_object_id" => $context->operateObjectId,
            "op_info" => $context->operateInfo,
            "op_status" => $context->operateStatus,
            "op_ip" => $context->operateIp,
            "op_time" => time(),
            "sys" => $this->sys
        ];
        // 当前队列归属的队列名称
        $jobHandlerClassName = 'app\hook\adminLog\job\AdminLogCreateQueueJob';
        //队列名称
        $jobQueueName = "createAdminLogQueue";
        // 插入队列
        $isPushed = Queue::push($jobHandlerClassName, $data, $jobQueueName);
        if( $isPushed == false ){
            \Log::error("createAdminLogQueue创建队列失败".$data, []);
        }
    }
}

application/hook/adminLog/ job/AdminLogCreateQueueJob.php

<?php
namespace app\hook\adminLog\job;
use app\hook\adminLog\service\AdminOpLogService;
use think\queue\Job;
use think\facade\Log;
class AdminLogCreateQueueJob
{
    // php think queue:work --queue BlogViewSyncJob
    //消费队列
    public function perform($data)
    {
        $adminOpLogService = new AdminOpLogService();
        $createFlge =$adminOpLogService->create($data);
        if (!$createFlge){
            \Log::error("createAdminLogQueue消费队列失败", []);
        }
        return $createFlge;
    }
    /**
     * fire是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param array|mixed $data 发布任务时自定义的数据
     */
    public function fire(Job $job, $data)
    {
        //消费队列
        $isJobDone = $this->perform($data);
        if ($isJobDone) {
            //如果任务执行成功, 记得删除任务
            $job->delete();
        } else {
            //检查任务重试3次数后删除队列
            if ($job->attempts() > 3) {
                $job->delete();
            }
        }
    }
}

application/hook/adminLog/context/AdminLogContext.php

<?php
namespace app\hook\adminLog\context;
class AdminLogContext extends \app\common\Context
{
    //操作类型:1-新增 2-修改 3-删除
    const ADMIN_OP_TYPE_CREATE = 1;
    const ADMIN_OP_TYPE_UPDATE = 2;
    const ADMIN_OP_TYPE_DELETE = 3;
    //操作状态 0-成功 1-失败'
    const ADMIN_OP_STATUS_SUCCESS = 0;
    const ADMIN_OP_STATUS_FAIL = 1;
    //操作类型:1-新增 2-修改 3-删除
    public $operateType;
    //操作人ID
    public $operateUserId;
    //操作人EmployeeNo
    public $operateUserEmployeeNo;
    //操作对象 表名
    public $operateObject;
    //操作对象id 表名id
    public $operateObjectId;
    //操作信息
    public $operateInfo;
    //操作状态 0-成功 1-失败'
    public $operateStatus;
    //操作人IP
    public $operateIp;
    //操作时间
    public $operateTime = '';
}

application/hook/adminLog/model/AdminOpLogModel.php

<?php
namespace app\hook\adminLog\model;
use think\Db;
class AdminOpLogModel
{
    /**
     * 新增操作记录
     * @param $data
     */
    public function insert($data)
    {
        return DB::table("admin_op_log")->insert($data);
    }
}

application/hook/adminLog/service/AdminOpLogService.php

<?php
namespace app\hook\adminLog\service;
use app\hook\adminLog\model\AdminOpLogModel;
use think\Exception;
use think\facade\Log;
class AdminOpLogService
{
    public $model = '';
    public function __construct()
    {
        $this->model = new AdminOpLogModel();
    }
    /**
     * 添加
     * @param array $data 数据数组
     * @return bool 返回类型
     */
    public function create($data)
    {
        try {
            return $this->model->insert($data);
        }catch (Exception $e) {
            \Log::error("添加操作日志失败(admin_op_log表插入失败)" . $e->getMessage(), []);
            return false;
        }
    }
}

5、在控制台监听任务并执行消费任务

cd到项目下执行命令 C:\wamp\www\test>php think queue:listen
执行全部 sudo php think queue:listen --queue createAdminLogQueue
执行一次 sudo php think queue:work --queue createAdminLogQueue

thinkqueue 后台运行常驻程序

进入项目路径,在目录下执行命令

在后台运行两条进程,常驻内存,不断的处理任务消息队列任务,如果要用指定版本php7.2表示使用7.2版本来执行,默认用php就可以来

sudo nohup php think queue:work --daemon --queue jobQueue --tries 2 >  out.file  2>&1  &
sudo nohup php7.2 think queue:work --daemon --queue jobQueueSlow --tries 2 >  out2.file  2>&1  &


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
2月前
|
设计模式 算法 数据库连接
PHP中的设计模式:提高代码的可维护性与扩展性本文旨在探讨PHP中常见的设计模式及其应用,帮助开发者编写出更加灵活、可维护和易于扩展的代码。通过深入浅出的解释和实例演示,我们将了解如何使用设计模式解决实际开发中的问题,并提升代码质量。
在软件开发过程中,设计模式是一套经过验证的解决方案模板,用于处理常见的软件设计问题。PHP作为流行的服务器端脚本语言,也有其特定的设计模式应用。本文将重点介绍几种PHP中常用的设计模式,包括单例模式、工厂模式和策略模式,并通过实际代码示例展示它们的具体用法。同时,我们还将讨论如何在实际项目中合理选择和应用这些设计模式,以提升代码的可维护性和扩展性。
61 4
|
8天前
|
PHP
PHP的pcntl多进程用法实例
PHP使用PCNTL系列的函数也能做到多进程处理一个事务。
|
1月前
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP开发领域,设计模式是解决常见问题的高效方案集合。它们不是具体的代码,而是一种编码和设计经验的总结。单例模式作为设计模式中的一种,确保了一个类仅有一个实例,并提供一个全局访问点。本文将深入探讨单例模式的基本概念、实现方式及其在PHP中的应用。
单例模式在PHP中的应用广泛,尤其在处理数据库连接、日志记录等场景时,能显著提高资源利用率和执行效率。本文从单例模式的定义出发,详细解释了其在PHP中的不同实现方法,并探讨了使用单例模式的优势与注意事项。通过对示例代码的分析,读者将能够理解如何在PHP项目中有效应用单例模式。
|
2月前
|
设计模式 数据库连接 PHP
PHP中的设计模式:如何提高代码的可维护性与扩展性在软件开发领域,PHP 是一种广泛使用的服务器端脚本语言。随着项目规模的扩大和复杂性的增加,保持代码的可维护性和可扩展性变得越来越重要。本文将探讨 PHP 中的设计模式,并通过实例展示如何应用这些模式来提高代码质量。
设计模式是经过验证的解决软件设计问题的方法。它们不是具体的代码,而是一种编码和设计经验的总结。在PHP开发中,合理地使用设计模式可以显著提高代码的可维护性、复用性和扩展性。本文将介绍几种常见的设计模式,包括单例模式、工厂模式和观察者模式,并通过具体的例子展示如何在PHP项目中应用这些模式。
|
2月前
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP的编程实践中,设计模式是解决常见软件设计问题的最佳实践。单例模式作为设计模式中的一种,确保一个类只有一个实例,并提供全局访问点,广泛应用于配置管理、日志记录和测试框架等场景。本文将深入探讨单例模式的原理、实现方式及其在PHP中的应用,帮助开发者更好地理解和运用这一设计模式。
在PHP开发中,单例模式通过确保类仅有一个实例并提供一个全局访问点,有效管理和访问共享资源。本文详细介绍了单例模式的概念、PHP实现方式及应用场景,并通过具体代码示例展示如何在PHP中实现单例模式以及如何在实际项目中正确使用它来优化代码结构和性能。
45 2
|
4月前
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 数据可视化 Ubuntu
php laravel5.5使用rabbitmq消息队列
php laravel5.5使用rabbitmq消息队列
49 0
|
6月前
|
网络安全 PHP
[网络安全/CTF] BUUCTF极客大挑战2019PHP解题详析(Dirsearch使用实例+php反序列化)
[网络安全/CTF] BUUCTF极客大挑战2019PHP解题详析(Dirsearch使用实例+php反序列化)
159 0
|
6月前
|
消息中间件 网络协议 Java
RabbitMQ消息队列基础详解与安装实例
RabbitMQ消息队列基础详解与安装实例
295 0
|
消息中间件 数据可视化 Ubuntu
php laravel5.5使用rabbitmq消息队列
php laravel5.5使用rabbitmq消息队列
72 0