Thinkphp5+FastAdmin配置workerman消息推送(多线程)

简介: Thinkphp5+FastAdmin配置workerman消息推送(多线程)

环境:linux系统、fastadmin(tp5内核)、php7.4.3,workerman1.0

1.首先删除vordor文件夹

2.安装扩展,下载workerman

//安装扩展
yum install php-process
//下载workerman
composer require topthink/think-worker

3.在application创建server.php

代码:

<?php
define('APP_PATH', __DIR__ . '/../application/');
 //定义监听控制器
define('BIND_MODULE','push/Workerman');
// 加载框架引导文件
require __DIR__ . '/../thinkphp/start.php';

4.在application目录创建push目录

application/push/controller/Worker.php

注:如果想监听多个端口,只需要在application/push/controller中再创建一个文件,把端口改一下即可,另外在application创建再创建一个server.php

代码:

<?php
namespace app\push\controller; 
use think\worker\Server; 
use Workerman\Lib\Timer;
use think\Db;
class Worker extends Server{    
    protected $socket = 'http://0.0.0.0:2348'; //linux服务器端口
    protected static $heartbeat_time=55;
      /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
    {
        if($data=="ping"&&$data==0){
        }else{
        //接收的参数
        }
        $connection->send("ping");
        $connection->lastMessageTime=time();
    }
    /**
     * 每个进程启动
     * @param $worker
     */
    public function onWorkerStart($worker){
    //查看是否有新的充值或提现订单,有就推送给所有用户
        Timer::add(3, function()use($worker){
            $time_now=time();
            $hasNewDepositOrder   =   Db::name('worker')->where('is_push',0)->order('id desc')->count('id');
            // $system_listener    =   Db::name('worker')->cache(true)->order('id desc')->select();
            if($hasNewDepositOrder){
                $depositOrderInfo   =   Db::name('worker')->where('is_push',0)->order('id desc')->find();
                $data   =   ['creatTime'=>date('Y-m-d H:i:s'),'name'=>$depositOrderInfo['name'],'tel'=>$depositOrderInfo['tel']];
                foreach($worker->connections as $connection) {
                    if(empty($connection->lastMessageTime)){
                        $connection->lastMessageTime =   $time_now;
                    }
                    if($time_now-$connection->lastMessageTime > self::$heartbeat_time){
                        $connection->close();
                    }
                    $connection->send(json_encode($data));
                }
              Db::name('worker')->where('id',$depositOrderInfo['id'])->update(['is_push'=>1]);
            }else{
                foreach($worker->connections as $connection) {
                    if(empty($connection->lastMessageTime)){
                        $connection->lastMessageTime = $time_now;
                        continue;
                    }
                    if($time_now-$connection->lastMessageTime > self::$heartbeat_time){      //连接超时
                        $connection->close();
                    }
                }
            }
        });
    }
}

5.找到/vendor/topthink/think-worker/src里面的Server.php

6.使用命令进入到application目录中,执行命令:php server.php start

注:如果想要监听多个端口:需要找到$this->worker = new Worker();

改成$this->worker = new Worker($this->socket);

整体代码如下:

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2014 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: liu21st <liu21st@gmail.com>
// +----------------------------------------------------------------------
namespace think\worker;
use Workerman\Worker;
/**
 * Worker控制器扩展类
 */
abstract class Server
{
    protected $worker;
    protected $socket    = '';
    protected $protocol  = 'http';
    protected $host      = '0.0.0.0';
    protected $port      = '2346';
    protected $processes = 4;
    /**
     * 架构函数
     * @access public
     */
    public function __construct()
    {
        // 实例化 Websocket 服务
        $this->worker = new Worker($this->socket);//this->socket
        // 设置进程数
        $this->worker->count = $this->processes;
        // 初始化
        $this->init();
        // 设置回调
        foreach (['onWorkerStart', 'onConnect', 'onMessage', 'onClose', 'onError', 'onBufferFull', 'onBufferDrain', 'onWorkerStop', 'onWorkerReload'] as $event) {
            if (method_exists($this, $event)) {
                $this->worker->$event = [$this, $event];
            }
        }
        // Run worker
        Worker::runAll();
    }
    protected function init()
    {
    }
}


相关文章
|
5月前
|
Java 调度 Spring
SpringBoot实现多线程定时任务动态定时任务配置文件配置定时任务
SpringBoot实现多线程定时任务动态定时任务配置文件配置定时任务
558 0
|
2月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
82 7
|
3月前
|
Java Spring
spring boot 中默认最大线程连接数,线程池数配置查看
spring boot 中默认最大线程连接数,线程池数配置查看
246 4
|
3月前
|
安全 算法 Java
Java面试题:如何诊断和解决Java应用程序中的内存泄漏问题?如何实现一个线程安全的计数器?如何合理配置线程池以应对不同的业务场景?
Java面试题:如何诊断和解决Java应用程序中的内存泄漏问题?如何实现一个线程安全的计数器?如何合理配置线程池以应对不同的业务场景?
27 0
|
3月前
|
Java Redis 数据安全/隐私保护
Redis14----Redis的java客户端-jedis的连接池,jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,最好用jedis连接池代替jedis,配置端口,密码
Redis14----Redis的java客户端-jedis的连接池,jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,最好用jedis连接池代替jedis,配置端口,密码
|
5月前
|
分布式计算 Java Hadoop
NameNode 处理线程配置(心跳并发)
NameNode线程池处理客户端和数据节点请求,如读写文件及心跳、块报告。通过调整`dfs.namenode.handler.count`(默认10,示例设为21)在`hdfs-site.xml`中可控制并发处理能力。线程数过多或过少都可能影响性能,需平衡资源使用并进行基准测试以确定最佳值。合理线程数可通过公式`int(math.log(N) * 20)`计算,N为服务器数量。例如,3台服务器的计算结果为21。
|
4月前
|
消息中间件
RabbitMQ配置多线程消费
RabbitMQ配置多线程消费
253 0
|
5月前
|
关系型数据库 MySQL Java
实时计算 Flink版产品使用合集之同步MySQL数据到Hologres时,配置线程池的大小该考虑哪些
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Java 应用服务中间件
Springboot启动的时候初始化的线程池默认配置tomcat
Springboot启动的时候初始化的线程池默认配置tomcat
144 1
|
5月前
|
域名解析 安全 Java
SpringBoot启动的时候初始化的线程池默认配置tomcat
SpringBoot启动的时候初始化的线程池默认配置tomcat
42 1