simps/mqtt:适用于PHP的 MQTT 协议解析和协程客户端

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Simps 的第一个版本 MQTT 库 就是参考了 Workerman 的实现,使其能够使用 Swoole 的协程能力,同时也修复了一些问题

MQTT 是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,作为一种低开销、低带宽占用的即时通讯协议,已经成为物联网的重要组成部分


Swoole 也给 PHP 提供了开发物联网项目的能力,只需要设置一个 open_mqtt_protocol 选项,启用后就会解析 MQTT 包头,在 Worker 进程的 onReceive 事件每次都会返回一个完整的 MQTT 数据包


当然其他的也有,例如 Workerman 之前提供的 异步 mqtt 客户端库 ,还有其他的开源库,这里就不一一介绍了


Simps 的第一个版本 MQTT 库 就是参考了 Workerman 的实现,使其能够使用 Swoole 的协程能力,同时也修复了一些问题


第一个版本的实现是放在了框架当中,限制了一些用户的使用。于是又开始了重构,将 MQTT 独立为一个 library ,方便用户使用的同时也丰富了 PHP 生态,让 PHP 程序员不再局限于 Web 开发


在第一个版本发布之后,Simps 的交流群中也有不少用户询问 MQTT 的问题,Swoole 也修复了一些相关的 Bug,现在使用 PHP + Swoole 去开发物联网相关的项目应该是如虎添翼


同时第一个版本的 MQTT 库,只支持 MQTT 3.x,不支持 MQTT 5.0,在 GitHub 上也没有找到相关支持的类库,所以在重构了 3.x 版本之后,也支持了一下 MQTT 5.0


也许这是第一个支持 MQTT v5.0 协议的 PHP library...


支持 MQTT 协议 3.1、3.1.1 和 5.0 版本,支持 QoS 0、QoS 1、QoS 2,那么它来了,使用 composer 来安装


composer require simps/mqtt

安装成功之后我们来看一下订阅和发布的使用,以 MQTT5.0 为例


订阅


首先应该是订阅,订阅成功之后才能收到对应主题的发布消息,创建一个subscribe.php写入以下内容

include __DIR__ . '/vendor/autoload.php';
use Simps\MQTT\Hex\ReasonCode;
use Swoole\Coroutine;
use Simps\MQTT\Client;
use Simps\MQTT\Types;
$config = [
    'host' => 'broker.emqx.io',
    'port' => 1883,
    'time_out' => 5,
    'user_name' => 'user001',
    'password' => 'hLXQ9ubnZGzkzf',
    'client_id' => Client::genClientID(),
    'keep_alive' => 10,
    'properties' => [
        'session_expiry_interval' => 60,
        'receive_maximum' => 200,
        'topic_alias_maximum' => 200,
    ],
    'protocol_level' => 5,
];
Coroutine\run(function () use ($config) {
    $client = new Client($config, ['open_mqtt_protocol' => true, 'package_max_length' => 2 * 1024 * 1024]);
    while (!$data = $client->connect()) {
        Coroutine::sleep(3);
        $client->connect();
    }
    $topics['simps-mqtt/user001/get'] = [
        'qos' => 1,
        'no_local' => true,
        'retain_as_published' => true,
        'retain_handling' => 2,
    ];
    $timeSincePing = time();
    $res = $client->subscribe($topics);
    // 订阅的结果
    var_dump($res);
    while (true) {
        $buffer = $client->recv();
        if ($buffer && $buffer !== true) {
            $timeSincePing = time();
            // 收到的数据包
            var_dump($buffer);
        }
        if (isset($config['keep_alive']) && $timeSincePing < (time() - $config['keep_alive'])) {
            $buffer = $client->ping();
            if ($buffer) {
                echo 'send ping success' . PHP_EOL;
                $timeSincePing = time();
            } else {
                $client->close();
                break;
            }
        }
        // QoS1 发布回复
        if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {
            $client->send(
                [
                    'type' => Types::PUBACK,
                    'message_id' => $buffer['message_id'],
                    'code' => ReasonCode::SUCCESS
                ]
            );
        }
    }
});


执行php subscribe.php,就会得到这样的输出


array(3) {
  ["type"]=>
  int(9)
  ["message_id"]=>
  int(1)
  ["codes"]=>
  array(1) {
    [0]=>
    int(1)
  }
}

表示订阅成功,codes 对应的是对应订阅主题的 QoS 等级


发布


订阅成功之后,创建一个publish.php来测试发布

include __DIR__ . '/vendor/autoload.php';
use Swoole\Coroutine;
use Simps\MQTT\Client;
$config = [
    'host' => 'broker.emqx.io',
    'port' => 1883,
    'time_out' => 5,
    'user_name' => 'user002',
    'password' => 'adIJS1D482sd',
    'client_id' => Client::genClientID(),
    'keep_alive' => 20,
    'properties' => [
        'session_expiry_interval' => 60,
        'receive_maximum' => 200,
        'topic_alias_maximum' => 200,
    ],
    'protocol_level' => 5,
];
Coroutine\run(function () use ($config) {
    $client = new Client($config, ['open_mqtt_protocol' => true, 'package_max_length' => 2 * 1024 * 1024]);
    while (!$client->connect()) {
        Coroutine::sleep(3);
        $client->connect();
    }
    while (true) {
        $response = $client->publish(
            'simps-mqtt/user001/get',
            '{"time":' . time() . '}',
            1,
            0,
            0,
            ['topic_alias' => 1]
        );
        var_dump($response);
        Coroutine::sleep(3);
    }
});


代码的意思是每隔 3 秒给订阅的主题simps-mqtt/user001/get发布一次消息

打开一个新的终端窗口,执行php publish.php就会得到输出:


array(4) {
  ["type"]=>
  int(4)
  ["message_id"]=>
  int(1)
  ["code"]=>
  int(0)
  ["message"]=>
  string(7) "Success"
}


这里增加了 message,为了用户可读,不需要去查找对应的 code 含义是什么

返回到订阅的窗口,就会看到所打印的发布信息


array(8) {
  ["type"]=>
  int(3)
  ["topic"]=>
  string(0) ""
  ["message"]=>
  string(19) "{"time":1608017156}"
  ["dup"]=>
  int(1)
  ["qos"]=>
  int(1)
  ["retain"]=>
  int(0)
  ["message_id"]=>
  int(4)
  ["properties"]=>
  array(1) {
    ["topic_alias"]=>
    int(1)
  }
}

这样一个简单的发布订阅功能就实现了

在这个库中还有一些值得优化和还未完成的部分,如还没有支持 MQTT5 的Auth type,以及部分的properties还未支持

想参与的同学可以提交 PR,如果有问题也可以提交 Issue,让我们共同去建设 PHP 的生态

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
17天前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
73 1
|
1月前
|
运维 数据库连接 PHP
PHP中的异常处理机制深度解析####
本文深入探讨了PHP中异常处理机制的工作原理,通过实例分析展示了如何有效地使用try-catch语句来捕获和处理运行时错误。我们将从基础概念出发,逐步深入到高级应用技巧,旨在帮助开发者更好地理解和利用这一强大的工具,以提高代码的稳定性和可维护性。 ####
|
1月前
|
PHP 开发者 UED
PHP中的异常处理机制解析####
本文深入探讨了PHP中的异常处理机制,通过实例解析try-catch语句的用法,并对比传统错误处理方式,揭示其在提升代码健壮性与可维护性方面的优势。文章还简要介绍了自定义异常类的创建及其应用场景,为开发者提供实用的技术参考。 ####
|
1月前
|
PHP 开发者 容器
PHP命名空间深度解析及其最佳实践####
本文深入探讨了PHP中引入命名空间的重要性与实用性,通过实例讲解了如何定义、使用及别名化命名空间,旨在帮助开发者有效避免代码冲突,提升项目的模块化与可维护性。同时,文章还涉及了PHP-FIG标准,引导读者遵循最佳实践,优化代码结构,促进团队协作效率。 ####
31 1
|
1月前
|
PHP 开发者 容器
PHP命名空间深度解析:避免命名冲突与提升代码组织####
本文深入探讨了PHP中命名空间的概念、用途及最佳实践,揭示其在解决全局命名冲突、提高代码可维护性方面的重要性。通过生动实例和详尽分析,本文将帮助开发者有效利用命名空间来优化大型项目结构,确保代码的清晰与高效。 ####
32 1
|
2月前
|
监控 网络协议 网络性能优化
网络通信的核心选择:TCP与UDP协议深度解析
在网络通信领域,TCP(传输控制协议)和UDP(用户数据报协议)是两种基础且截然不同的传输层协议。它们各自的特点和适用场景对于网络工程师和开发者来说至关重要。本文将深入探讨TCP和UDP的核心区别,并分析它们在实际应用中的选择依据。
67 3
|
2月前
|
编译器 PHP 开发者
PHP 8新特性解析与实战应用####
随着PHP 8的发布,这一经典编程语言迎来了诸多令人瞩目的新特性和性能优化。本文将深入探讨PHP 8中的几个关键新功能,包括命名参数、JIT编译器、新的字符串处理函数以及错误处理改进等。通过实际代码示例,展示如何在现有项目中有效利用这些新特性来提升代码的可读性、维护性和执行效率。无论你是PHP新手还是经验丰富的开发者,本文都将为你提供实用的技术洞察和最佳实践指导。 ####
36 1
|
2月前
|
数据库连接 PHP 开发者
PHP中的异常处理机制深度解析####
本文深入探讨了PHP中异常处理的核心概念、使用场景及最佳实践,旨在帮助开发者更高效地管理和响应运行时错误。通过实例演示和理论分析,揭示try-catch块的运作原理,以及如何自定义异常类以增强代码的可读性和可维护性。文章还对比了传统错误处理方式与异常处理的优势,为读者提供了在复杂项目中实施健壮错误管理策略的指导。 ####
|
2月前
|
数据采集 JavaScript 网络安全
为什么PHP爬虫抓取失败?解析cURL常见错误原因
豆瓣电影评分是电影市场的重要参考,通过网络爬虫技术可以高效采集评分数据,帮助电影制作和发行方优化策略。本文介绍使用PHP cURL库和代理IP技术抓取豆瓣电影评分的方法,解决反爬机制、网络设置和数据解析等问题,提供详细代码示例和优化建议。
为什么PHP爬虫抓取失败?解析cURL常见错误原因
|
23天前
|
传感器
Modbus协议深入解析
Modbus协议是由Modicon公司(现施耐德电气)于1979年发明的串行通信协议,主要用于工业自动化系统中的PLC通信。本文深入解析了Modbus协议的主从模式、数据类型(线圈、离散输入、保持寄存器、输入寄存器)、帧结构和通信过程,并介绍了其应用场景和重要性。
22 0

推荐镜像

更多
下一篇
开通oss服务