我们都知道,PHP 有它自带的进程控制 pcntl,Swoole 中的 process 提供了更强大的功能,直接截取了官网的一张图。
下面我们模拟一个 TCP 服务器,演示一下基于 process 的多进程服务。
接下来,先来看我们服务器的代码部分。我们设置子进程数为 3 个,在下面这段代码中,主进程启动之后,会额外启动 3 个子进程,负责处理客户端连接以及请求操作,当子进程退出后,主进程会重新创建新的子进程。如果主进程退出,那么子进程在处理完当前请求之后也会退出。
Server
<?php class Server { private $mpid; //主进程id private $pids = []; //子进程数组 private $socket; //网络套间字 const Max_PROCESS = 3; //最大创建进程数 //主进程逻辑 public function run() { $process = new \Swoole\Process(function () { //获取主进程id $this->mpid = posix_getpid(); echo time() . " master process pid: {$this->mpid}" . PHP_EOL; //创建TCP服务器并获取套间字 $this->socket = stream_socket_server("tcp://127.0.0.1:9505", $errno, $errstr); if (!$this->socket) { exit("service start error:$errstr --- $errno"); } //启动子进程 for ($i = 1; $i <= self::Max_PROCESS; $i++) { $this->startWorkerProcess(); } echo "Waiting client connect" . PHP_EOL; //主进程等待子进程退出 必须是死循环 while (1) { if(count($this->pids)){ //回收结束运行的子进程,默认为阻塞,false 表示非阻塞模式,如果失败返回 false $ret = \Swoole\Process::wait(false); if ($ret) { echo time() . " worker process: {$ret['pid']} exit ,then new process..." . PHP_EOL; //新创建一个子进程 $this->startWorkerProcess(); //从数组中删除这个已不存在的子进程 pid $index=array_search($ret['pid'],$this->pids); unset($this->pids[$index]); } } sleep(1); // } }, false, false); //把当前进程升级为守护进程 \Swoole\Process::daemon(); $process->start(); } //创建子进程 public function startWorkerProcess() { $process = new \Swoole\Process(function (\Swoole\Process $work) { $this->acceptClient($work); }, false, false); $pid = $process->start(); $this->pids[] = $pid; } //接收客户端请求 public function acceptClient(&$worker) { //子进程等待客户端连接,不能退出 while (1) { //接收由 stream_socket_server()创建的的套间字连接 $conn = stream_socket_accept($this->socket, -1); //如果定义了连接回调的地址,就调用 if ($this->onConnect) { call_user_func($this->onConnect, $conn); } //开始循环读取客户端信息 $recv = ''; //实际接收数据 $buffer = ''; //缓冲数据 while (1) { //检查主进程是否正常,如果不正常,退出子进程 $this->checkmPid($worker); //读取客户端信息 $buffer = fread($conn, 20); //如果没有消息 if ($buffer === false || $buffer === '') { //如果设置了了解关闭的回调函数,那么执行关闭回调 if ($this->onClose) { call_user_func($this->onClose, $conn); } //等待下一个连接消息 break; } //消息结束的位置 $pos = strpos($buffer, "\n"); //没有结束符,说明还没有读完 if (false === $pos) { $recv .= $buffer; //拼接消息 } else { //消息读完了,处理消息 $recv .= trim(substr($buffer, 0, $pos + 1)); //如果定义了处理消息回调的函数,那就直接调用回调 if ($this->onMessage) { call_user_func($this->onMessage, $conn, $recv); } //如果接收到quit 表示退出,那么关闭这个链接,等待下一个客户端连接 if ($recv === 'quit') { echo 'client close' . PHP_EOL; fclose($conn); break; } } //清空消息 $recv = ''; } } } public function checkmPid(&$worker) { //说明主进程不存在,子进程此时是僵尸进程,需要退出 if (!\Swoole\Process::kill($this->mpid, 0)) { $worker->exit(); echo "worker: {$worker->pid} exit" . PHP_EOL; } } } $server = new Server(); //连接回调 $server->onConnect = function ($conn) { echo "onConnect -- accepted " . stream_socket_get_name($conn, true) . PHP_EOL; }; //接收消息回调 $server->onMessage = function ($conn, $msg) { echo "message is ------ $msg" . PHP_EOL; fwrite($conn, "received: " . $msg . "\n"); }; //关闭回调 $server->onClose = function ($conn) { echo "close---" . stream_socket_get_name($conn, true) . PHP_EOL; }; $server->run();
开启服务之后,你可以通过命令查看进程 pstree -p
主进程id 或者通过命令查看运行的更多信息 ps -ef | grep Server.php
, 可以看到对应进程的 id。
现在让我们创建一个简单的客户端连接服务器。
Client <?php go(function () { $client = new \Swoole\Coroutine\Client(SWOOLE_SOCK_TCP); // 尝试与指定 TCP 服务端建立连接(IP和端口号需要与服务端保持一致,超时时间为0.5秒) if ($client->connect("127.0.0.1", 9505, 0.5)) { // 建立连接后发送内容 $client->send("hello world\n"); // 打印接收到的消息 echo $client->recv().PHP_EOL; sleep(2); // 关闭连接 $client->close(); } else { echo "connect failed."; } });
创建一个协程客户端请求,服务端接收数据,服务端响应数据,客户端接收数据的整个过程。整理客户端 sleep 两秒之后才关闭连接。
然后我们来很粗糙的手动 kill 掉其中的一个子进程。父进程又重新创建一个子进程。
和预期一样,主进程又重新创建了一个新的子进程。