使用supervisor管理消费进程
上面的方式一次只能处理一个任务,配合supervisor可以以守护进程的模式不断的处理任务
supervisor配置
[supervisord]
logfile=/www/supervisor/log/supervisord.log ; 日志文件路径,可根据需要修改
logfile_maxbytes=50MB ; 单个日志文件的最大大小
logfile_backups=10 ; 保留的日志文件数量
loglevel=info ; 日志等级
pidfile=/www/supervisor/var/supervisord.pid ; supervisord pid 文件,可根据需要更改
nodaemon=false ; start in foreground if true; default false
minfds=1024 ; min. avail startup file descriptors; default 1024
minprocs=200 ; min. avail process descriptors;default 200
[unix_http_server]
file=/www/supervisor/var/supervisor.sock ; the path to the socket file
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///www/supervisor/var/supervisor.sock ; 用于supervisorctl,路径需要与unix_http_server中的sock文件相同
[include]
files = /www/supervisor/etc/*.conf ; 包含的其他配置文件
配置文件这里我保存在/www/supervisor/supervisord.conf,基本使用默认配置,改动的地方主要为log,pid,sock文件的路径,以及增加其他配置文件的路径,用于加载不同进程的配置文件。默认的配置文件,可以使用 echo_supervisord_conf 命令查看,以及 echo_supervisord_conf > file_to_save 保存默认配置文件到指定文件。
队列消费进程配置
以下配置文件保存在 /www/supervisor/etc/queue_worker.conf
[program:queue_worker] ;项目名称
directory = /www/tp5 ; 程序的启动目录
command = php think queue:work --queue addData --daemon ; 启动命令
process_name=%(program_name)s_%(process_num)02d
numprocs = 3 ; 开启的进程数量
autostart = true ; 在 supervisord 启动的时候也自动启动
startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true ; 程序异常退出后自动重启
startretries = 3 ; 启动失败自动重试次数,默认是 3
user = www ; 用哪个用户启动
redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 50MB ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20 ; stdout 日志文件备份数
; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /www/supervisor/log/queue_worker.log
loglevel=info
启动supervisor
命令行执行以下命令,即可以启动supervisor,命令中指定了supervisor配置文件的路径
supervisord -c /www/supervisor/supervisord.conf
使用supervisorctl 查看运行状态
supervisorctl -c /www/supervisor/supervisord.conf queue_worker:queue_worker_00 RUNNING pid 12270, uptime 0:00:08 queue_worker:queue_worker_01 RUNNING pid 12271, uptime 0:00:08 queue_worker:queue_worker_02 RUNNING pid 12269, uptime 0:00:08
可以看到配置的3个进程已经正常启动。
此时,如果执行 kill 12270,关闭queue_worker:queue_worker_00,会看到supervisor会自动重启对应进程。也可以配和官方的命令:
php think queue:restart
达到重启队列的效果。
处理supervisor重启
当修改了supervisor配置后,更新配置时会重启相关的进程。这就可能导致在任务处理进行到一半的时候,被supervisor杀掉然后重启。当然,可以选择在没有任务的时候进行重启来避免这种情况。下面给出一种通过信号量的方式处理这中情况的方法。
在supervisor需要重启管理的进程时,默认会向进程发送TERM信号,如果无法关闭进程,则超过配置的时间(默认是10秒),会发送KILL信号强制关闭进程。我们可以通过修改supervisor的默认超时时间,并且处理TERM信号的方式,使进程安全的退出。
首先,我们需要修改supervisor中的配置,修改处理TERM信号的超时时间,修改为我们预计的单个任务最大执行时间,这里假设任务最大执行时间为1分钟
stopwaitsecs=60 ; max num secs to wait b4 SIGKILL (default 10)
然后,修改我们的php代码,在任务处理结束后,增加调用信号量分发方法
public function fire(Job job,job, job,data) { Db::table('task')->insert(['task_data' => json_encode(data)\]); job->delete(); // 增加信号量分发方法 pcntl_signal_dispatch(); }
然后,当没有任务时,会sleep一段时间后再获取是否有任务需要处理,这时也需要进行信号量分发。thinkphp-queue有对应的hook可以处理这种情况,在tags.php配置文件下增加以下配置
'worker_before_sleep' => ['app\\common\\behavior\\DispatchSignalBehavior']
对应的处理方法
class DispatchSignalBehavior { public function run() { pcntl_signal_dispatch(); } }
最后,我们增加对TERM信号的处理方法,在tags.php中添加以下配置
'worker_daemon_start' => ['app\\common\\behavior\\SignalBehavior']
对应的处理方法
class SignalBehavior { public function run(¶ms) { pcntl\_signal(SIGTERM, function (signo) { exit; }); } }
处理方法很简单,直接退出脚本就行。
这样,当supervisor关闭进程时,就不会中断当前所执行的任务了。