前言
之前公司需要一个内部的通讯软件,就叫我做一个。通讯软件嘛,就离不开通讯了,然后我就想到了长连接。这里本人用的是GatewayWorker框架。
什么是GatewayWorker框架?
GatewayWorker是基于Workerman开发的一套TCP长连接的应用框架,实现了单发、群发、广播等接口,内置了mysql类库,GatewayWorker分为Gateway进程和Worker进程,支持分布式部署,能够支持大量的连接数。
GatewayWorker的工作原理
1、启动所有进程(GatewayWorker、business、register) 2、GatewayWorker和business进程启动后向register请求注册 3、register服务收到注册请求后,把所有Gateway的通讯地址保存在内存中同时把内存中所有的Gateway的通讯地址发给business 4、business进程得到所有的Gateway内部通讯地址后进行连接GatewayWorker 5、如果有新的GatewayWorker服务进行register,则将新的Gateway内部通讯地址列表将广播给所有buiness并建立连接 6、如果有GatewayWorker下线,则Register服务会收到通知,会将该GatewayWorker内部通讯地址删除,然后广播新的内部通讯地址列表给所有business 7、此时GatewayWorker与buiness已经建立起长连接 8、客户端的事件及接受的数据全部由GatewayWorker转发给business进行处理。
目录结构
├── Applications // 项目应用目录 │ └── YourAppGateway // 建立一个存放workman的目录,名字随意 │ ├── Events.php // 处理主逻辑业务的文件,管理onConnect onMessage onClose 等方法 │ ├── start_gateway.php // gateway进程启动脚本、配置服务注册地址、端口号、进程数等参数 │ ├── start_businessworker.php // 用户进程的启动脚本 │ └── start_register.php // 注册服务的启动脚本 │ ├── start.php // 全局启动脚本,此脚本会依次加载Applications/YourAppGateway/start*.php对所有脚本进行启动 │ └── vendor // GatewayWorker框架和Workerman框架源码目录
GatewayWorker实现
以宝塔为例
1.安装composer
登录SSH终端,使用以下命令下载Composer的安装脚本:
php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');"
运行下面的命令来安装Composer:
php composer-setup.php --install-dir=/usr/local/bin --filename=composer
检查composer版本
composer -v //检查composer版本
2.安装workerman
在项目根目录打开宝塔终端,输入以下命令安装workman
composer require topthink/think-worker
3.安装GatewayWorker
在项目根目录打开宝塔终端,输入以下命令安装GatewayWorker
composer require workerman/gateway-worker
4.实现代码
可以选择官方提供的demo 链接:http://www.workerman.net/download/spenddebtFinancialewsgyftplan/Gateway/Worker.zip
或者使用我根据demo改编而来的
先在项目应用目录(一般是Applications)下新建一个文件存储以下四个进程文件
start_gateway.php
<?php use \Workerman\Worker; use \Workerman\WebServer; use \GatewayWorker\Gateway; use \GatewayWorker\BusinessWorker; use \Workerman\Autoloader; // 自动加载类 require_once __DIR__ . '/../../vendor/autoload.php'; // gateway 进程,这里使用Text协议,可以用telnet测试 $gateway = new Gateway("websocket://0.0.0.0:8283"); // gateway名称,status方便查看 $gateway->name = 'YourAppGateway'; // gateway进程数 $gateway->count = 200; // 本机ip,分布式部署时使用内网ip $gateway->lanIp = '127.0.0.1'; // 内部通讯起始端口,假如$gateway->count=4,起始端口为4000 // 则一般会使用4000 4001 4002 4003 4个端口作为内部通讯端口 $gateway->startPort = 2900; // 服务注册地址、端口 $gateway->registerAddress = '127.0.0.1:1237'; // 心跳间隔 //$gateway->pingInterval = 10; // 心跳数据 //$gateway->pingData = '{"type":"ping"}'; /* // 当客户端连接上来时,设置连接的onWebSocketConnect,即在websocket握手时的回调 $gateway->onConnect = function($connection) { $connection->onWebSocketConnect = function($connection , $http_header) { // 可以在这里判断连接来源是否合法,不合法就关掉连接 // $_SERVER['HTTP_ORIGIN']标识来自哪个站点的页面发起的websocket链接 if($_SERVER['HTTP_ORIGIN'] != 'http://kedou.workerman.net') { $connection->close(); } // onWebSocketConnect 里面$_GET $_SERVER是可用的 // var_dump($_GET, $_SERVER); }; }; */ // 如果不是在根目录启动,则运行runAll方法 if(!defined('GLOBAL_START')) { Worker::runAll(); }
start_businessworker.php
<?php use Workerman\Worker; use Workerman\WebServer; use GatewayWorker\Gateway; use GatewayWorker\BusinessWorker; use Workerman\Autoloader; // 自动加载类 require_once __DIR__ . '/../../vendor/autoload.php'; // bussinessWorker 进程 $worker = new BusinessWorker(); // worker名称 $worker->name = 'YourAppBusinessWorker'; // bussinessWorker进程数量 $worker->count = 200; // 服务注册地址、端口 $worker->registerAddress = '127.0.0.1:1237'; // 如果不是在根目录启动,则运行runAll方法 if(!defined('GLOBAL_START')) { Worker::runAll(); }
start_register.php
<?php use \Workerman\Worker; use \GatewayWorker\Register; // 自动加载类 require_once __DIR__ . '/../../vendor/autoload.php'; // register 必须是text协议 1237为端口 $register = new Register('text://0.0.0.0:1237'); // 如果不是在根目录启动,则运行runAll方法 if(!defined('GLOBAL_START')) { Worker::runAll(); }
Events.php
<?php use \GatewayWorker\Lib\Gateway; /** * 主逻辑 * 主要是处理 onConnect onMessage onClose 三个方法 */ class Events { /** * 当客户端连接时触发 * * @param int $client_id 连接id */ public static function onConnect($client_id) { echo "【新的客户端链接】:client_id:".$client_id.PHP_EOL; // 向当前client_id发送数据 Gateway::sendToClient($client_id, ""); // 向所有人发送 $data=[ 'client_id'=>$client_id, 'message'=>'欢迎'.$client_id.'登录!', 'data'=>[] ]; Gateway::sendToAll(json_encode($data)); // Gateway::sendToAll("$client_id login\r\n"); } /** * 当客户端发来消息时触发 * @param int $client_id 连接id * @param mixed $message 具体消息 */ public static function onMessage($client_id, $message){ $data=[ 'client_id'=>$client_id, 'message'=>$client_id.'说:'.$result['message'], 'data'=>$message ]; Gateway::sendToAll(json_encode($data)); // 向所有人发送 // Gateway::sendToAll("$client_id said $message\r\n"); } /** * 当用户断开连接时触发 * @param int $client_id 连接id */ public static function onClose($client_id) { // 向所有人发送 // GateWay::sendToAll("$client_id 退出了!\r\n"); } }
随后在项目的根目录下新建一个启动文件
start_all_workman.php
<?php ini_set('display_errors', 'on'); use Workerman\Worker; if(strpos(strtolower(PHP_OS), 'win') === 0) { exit("start.php not support windows, please use start_for_win.bat\n"); } // 检查扩展 if(!extension_loaded('pcntl')) { exit("Please install pcntl extension. See http://doc3.workerman.net/appendices/install-extension.html\n"); } if(!extension_loaded('posix')) { exit("Please install posix extension. See http://doc3.workerman.net/appendices/install-extension.html\n"); } // 标记是全局启动 define('GLOBAL_START', 1); require_once __DIR__ . '/vendor/autoload.php'; // 加载所有Applications/*/start.php,以便启动所有服务 foreach(glob(__DIR__.'/application/此处请改成你自己命名存放workman的目录名/start*.php') as $start_file) { require_once $start_file; } // 运行所有服务 Worker::runAll();
注意开启端口后记得去放行端口!!!除了宝塔放行以外,你的服务器(阿里云/腾讯云等等)也记得要去放行!!!
启动workman
在项目根目录下打开终端,输入php start_all_workman.php start -d ,开启守护进程,如果出现一下页面即成功开启
如想关闭workman进程则输入php start_all_workman.php stop 进行关闭
GatewayWorker使用
如果你的网站使用的是Https协议的话,WebSocket必须使用wss协议
但是wss协议不支持IP:端口的形式,而是只能写域名+url
所以为了解决使用https协议而WebSocket不能连接的问题,可以使用Nginx进行反向代理
在网站配置文件的server下加入以下代码
location /connectWorkman(名字随你取,别跟其他反向代理重名就行) { proxy_pass http://127.0.0.1:8283; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "Upgrade"; proxy_set_header X-Real-IP $remote_addr; }
前端使用(uniapp)
init() { SocketTask = uni.connectSocket({ url: 'wss://chat.gdpaimaihui.com/auction', //正式 header: { 'content-type': 'application/json' }, success: function(res) { console.log('WebSocket连接创建', res); }, fail: function(err) { uni.showToast({ title: '网络异常!', icon: 'none' }); console.log(err); } }); //websocket监听事件 SocketTask.onOpen((res) => { socketOpen = true canReconnect = true console.log('监听 WebSocket 连接打开事件。', res); //websocket连接后可以启动个定时器,每隔一段时间进行心跳一次,以防心跳停止断开连接 this.timer = setInterval(() => { SocketTask.send({ data: '心跳', success() { // console.log('发送心跳成功'); } }) }, 2000) }); SocketTask.onError((onError) => { console.log('监听 WebSocket 错误。错误信息', onError); socketOpen = false; if (canReconnect) { this.reconnect() canReconnect = false } }); SocketTask.onMessage((res) => { console.log('监听WebSocket接受到服务器的消息事件。服务器返回的消息', res); }); }, //重新连接 reconnect() { if (!socketOpen) { let count = 0; reconnectInterval = setInterval(() => { console.log("正在尝试重连") uni.showToast({ title: '正在尝试重连', icon: 'none' }) this.init(); count++ console.log(); //重连一定次数后就不再重连 if (count >= reconnectTimes) { clearInterval(reconnectInterval) uni.showToast({ title: '网络异常或服务器错误', icon: 'none' }) } }, reconnectDelay) } }
上述为之前给公司做内部通讯软件时个人整理内容,水平有限,如有错误之处,望各位园友不吝赐教!如果觉得不错,请点击推荐和关注!谢谢~๑•́₃•̀๑ [鲜花][鲜花][鲜花]