使用supervisor管理消费队列等进程

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 使用supervisor管理消费队列等进程

使用supervisor管理消费进程

上面的方式一次只能处理一个任务,配合supervisor可以以守护进程的模式不断的处理任务


supervisor配置

[supervisord]

logfile=/www/supervisor/log/supervisord.log ; 日志文件路径,可根据需要修改

logfile_maxbytes=50MB ; 单个日志文件的最大大小

logfile_backups=10 ; 保留的日志文件数量

loglevel=info ; 日志等级

pidfile=/www/supervisor/var/supervisord.pid ; supervisord pid 文件,可根据需要更改

nodaemon=false ; start in foreground if true; default false

minfds=1024 ; min. avail startup file descriptors; default 1024

minprocs=200 ; min. avail process descriptors;default 200


[unix_http_server]

file=/www/supervisor/var/supervisor.sock ; the path to the socket file


[rpcinterface:supervisor]

supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface


[supervisorctl]

serverurl=unix:///www/supervisor/var/supervisor.sock ; 用于supervisorctl,路径需要与unix_http_server中的sock文件相同


[include]

files = /www/supervisor/etc/*.conf ; 包含的其他配置文件

配置文件这里我保存在/www/supervisor/supervisord.conf,基本使用默认配置,改动的地方主要为log,pid,sock文件的路径,以及增加其他配置文件的路径,用于加载不同进程的配置文件。默认的配置文件,可以使用 echo_supervisord_conf 命令查看,以及 echo_supervisord_conf > file_to_save 保存默认配置文件到指定文件。


队列消费进程配置

以下配置文件保存在 /www/supervisor/etc/queue_worker.conf


[program:queue_worker] ;项目名称

directory = /www/tp5 ; 程序的启动目录

command = php think queue:work --queue addData --daemon ; 启动命令

process_name=%(program_name)s_%(process_num)02d

numprocs = 3 ; 开启的进程数量

autostart = true ; 在 supervisord 启动的时候也自动启动

startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了

autorestart = true ; 程序异常退出后自动重启

startretries = 3 ; 启动失败自动重试次数,默认是 3

user = www ; 用哪个用户启动

redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false

stdout_logfile_maxbytes = 50MB ; stdout 日志文件大小,默认 50MB

stdout_logfile_backups = 20 ; stdout 日志文件备份数

; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件)

stdout_logfile = /www/supervisor/log/queue_worker.log

loglevel=info

启动supervisor

命令行执行以下命令,即可以启动supervisor,命令中指定了supervisor配置文件的路径


supervisord -c /www/supervisor/supervisord.conf

使用supervisorctl 查看运行状态

supervisorctl -c /www/supervisor/supervisord.conf
queue_worker:queue_worker_00 RUNNING pid 12270, uptime 0:00:08
queue_worker:queue_worker_01 RUNNING pid 12271, uptime 0:00:08
queue_worker:queue_worker_02 RUNNING pid 12269, uptime 0:00:08

可以看到配置的3个进程已经正常启动。


此时,如果执行 kill 12270,关闭queue_worker:queue_worker_00,会看到supervisor会自动重启对应进程。也可以配和官方的命令:


php think queue:restart

达到重启队列的效果。


处理supervisor重启


当修改了supervisor配置后,更新配置时会重启相关的进程。这就可能导致在任务处理进行到一半的时候,被supervisor杀掉然后重启。当然,可以选择在没有任务的时候进行重启来避免这种情况。下面给出一种通过信号量的方式处理这中情况的方法。


在supervisor需要重启管理的进程时,默认会向进程发送TERM信号,如果无法关闭进程,则超过配置的时间(默认是10秒),会发送KILL信号强制关闭进程。我们可以通过修改supervisor的默认超时时间,并且处理TERM信号的方式,使进程安全的退出。


首先,我们需要修改supervisor中的配置,修改处理TERM信号的超时时间,修改为我们预计的单个任务最大执行时间,这里假设任务最大执行时间为1分钟


stopwaitsecs=60 ; max num secs to wait b4 SIGKILL (default 10)

然后,修改我们的php代码,在任务处理结束后,增加调用信号量分发方法

public function fire(Job job,job, job,data)
{
Db::table('task')->insert(['task_data' => json_encode(data)\]); job->delete();
// 增加信号量分发方法
pcntl_signal_dispatch();
}

然后,当没有任务时,会sleep一段时间后再获取是否有任务需要处理,这时也需要进行信号量分发。thinkphp-queue有对应的hook可以处理这种情况,在tags.php配置文件下增加以下配置

'worker_before_sleep' => ['app\\common\\behavior\\DispatchSignalBehavior']

对应的处理方法

class DispatchSignalBehavior
{
public function run()
{
pcntl_signal_dispatch();
}
}

最后,我们增加对TERM信号的处理方法,在tags.php中添加以下配置

'worker_daemon_start' => ['app\\common\\behavior\\SignalBehavior']

对应的处理方法

class SignalBehavior
{
public function run(&params)  {  pcntl\_signal(SIGTERM, function (signo) {
exit;
});
}
}

处理方法很简单,直接退出脚本就行。

这样,当supervisor关闭进程时,就不会中断当前所执行的任务了。



相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
3月前
|
Prometheus 监控 Cloud Native
使用supervisor守护Prometheus进程
使用supervisor守护Prometheus进程
|
15天前
|
算法 调度 UED
深入理解操作系统:进程调度与优先级队列
【10月更文挑战第31天】在计算机科学的广阔天地中,操作系统扮演着枢纽的角色,它不仅管理着硬件资源,还为应用程序提供了运行的环境。本文将深入浅出地探讨操作系统的核心概念之一——进程调度,以及如何通过优先级队列来优化资源分配。我们将从基础理论出发,逐步过渡到实际应用,最终以代码示例巩固知识点,旨在为读者揭开操作系统高效管理的神秘面纱。
|
2月前
|
存储 算法 前端开发
深入理解操作系统:进程调度与优先级队列算法
【9月更文挑战第25天】在操作系统的复杂世界中,进程调度是维持系统稳定运行的核心机制之一。本文将深入探讨进程调度的基本概念,分析不同的进程调度算法,并着重介绍优先级队列算法的原理和实现。通过简洁明了的语言,我们将一起探索如何优化进程调度,提高操作系统的效率和响应速度。无论你是计算机科学的初学者还是希望深化理解的专业人士,这篇文章都将为你提供有价值的见解。
|
3月前
|
算法 调度 UED
探索操作系统核心:进程调度与优先级队列
【8月更文挑战第31天】在计算机的心脏——操作系统中,进程调度是维持系统运行的关键机制。本文将深入浅出地介绍进程调度的概念,并通过一个简单的优先级队列算法示例,展示如何在操作系统设计中实现基本的进程管理。我们将从理论到实践,逐步揭示如何通过代码构建一个模拟的进程调度系统,帮助读者理解这一复杂但至关重要的操作系统特性。
|
3月前
|
数据采集 Java Python
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
|
3月前
使用supervisor守护freeswitch进程
使用supervisor守护freeswitch进程
|
3月前
|
Python
Python IPC深度探索:解锁跨进程通信的无限可能,以管道与队列为翼,让你的应用跨越边界,无缝协作,震撼登场
【8月更文挑战第3天】Python IPC大揭秘:解锁进程间通信新姿势,让你的应用无界连接
27 0
|
4月前
|
SQL 自然语言处理 网络协议
【Linux开发实战指南】基于TCP、进程数据结构与SQL数据库:构建在线云词典系统(含注册、登录、查询、历史记录管理功能及源码分享)
TCP(Transmission Control Protocol)连接是互联网上最常用的一种面向连接、可靠的、基于字节流的传输层通信协议。建立TCP连接需要经过著名的“三次握手”过程: 1. SYN(同步序列编号):客户端发送一个SYN包给服务器,并进入SYN_SEND状态,等待服务器确认。 2. SYN-ACK:服务器收到SYN包后,回应一个SYN-ACK(SYN+ACKnowledgment)包,告诉客户端其接收到了请求,并同意建立连接,此时服务器进入SYN_RECV状态。 3. ACK(确认字符):客户端收到服务器的SYN-ACK包后,发送一个ACK包给服务器,确认收到了服务器的确
192 1
|
4月前
|
安全 API Python
`multiprocessing`是Python的一个标准库,用于支持生成进程,并通过管道和队列、信号量、锁和条件变量等同步原语进行进程间通信(IPC)。
`multiprocessing`是Python的一个标准库,用于支持生成进程,并通过管道和队列、信号量、锁和条件变量等同步原语进行进程间通信(IPC)。
|
4月前
|
Ubuntu Unix Linux
如何使用 Supervisor 管理你的进程
**Supervisor** 是一款Python编写的进程管理工具,用于类Unix系统,确保应用服务持续运行。常用命令包括:`reload`(重新加载配置)、`status`(查看进程状态)、`shutdown`(关闭所有进程)、`start`和`stop`(控制单个进程)。在CentOS上安装Supervisor用`yum install -y supervisor`,配置文件通常在`/etc/supervisord.conf`。
72 0

热门文章

最新文章

相关实验场景

更多