4步实现状态机驱动的MQTT客户端,快速接入OneNet (1)

本文涉及的产品
性能测试 PTS,5000VUM额度
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 本文介绍了基于状态机驱动的MQTT客户端快速接入OneNet平台的实现方法,通过4步完成模块设计。文章以开源项目`Sparrow`为基础,引入`OneNetMqtt`业务模块,采用事件驱动模型和双层状态机设计,实现设备状态管理、消息处理及定时任务等功能。模块分为三层:`OneNetManager`负责核心逻辑,`OneNetDevice`管理设备信息,`OneNetDriver`处理Socket与MQTT通信。验证结果显示设备连接、数据上报及下线功能正常,稳定性良好。该设计简化了复杂条件判断,增强了系统灵活性与可扩展性,适用于实际项目参考。文末提供源码获取方式,助力读者实践与学习。

4步实现状态机驱动的MQTT客户端,快速接入OneNet (1)

[TOC]

引言

  开源项目 Sparrow 的基础框架搭建已接近完成,中间件的基础功能大多已经具备。为了验证该框架的实用性,在工程中引入了业务模块 OneNetMqtt。从模块命名可以推断其主要功能是通过 MQTT 协议连接 OneNet 平台。
  最初接触 OneNet 还是在大学期间,当时的毕业设计基于 OneNet 实现了环境数据采集系统。由于当时的个人水平限制,并未采用 MQTT协议实现,功能上体现的效果也不尽预期。现在重新构建此功能,弥补了旧时自身能力的不足,新的实现过程更为高效,连接和数据传输都相当稳定。本篇大致介绍一下功能和主要模块,后续根据需要补充。


注:文末提供本文源码获取方式。文章不定时更新,喜欢本公众号系列文章,可以星标公众号,避免遗漏干货文章。源码开源,如果对您有帮助,帮忙分享、点赞加收藏喔!

概述

  当前模块的实现引入了事件驱动模型的设计思想,由具体的事件触发,并结合当前模块对应的状态来决定执行的操作和响应的行为,确保能够根据实时情况动态调整,维持准确的运行状态。

  从业务架构的角度,OneNetMqtt 模块被划分为三层,每层专注于特定的功能:

  • OneNetManager: 作为业务管理模块,负责处理核心业务逻辑。它持有多个 OneNet 设备对象,管理设备的激活与注销,启动心跳定时器,并维护设备的状态转换等任务,以响应外部事件。
  • OneNetDevice: 设备模块。主要用于存储和管理每个设备的相关信息,确保设备数据的完整性和准确性。
  • OneNetDriver: 作为设备驱动模块。主要负责 Socket 状态的管理以及 MQTT 数据的编解码工作。这保证了与 OneNet 平台之间通信的稳定性和效率。

如下是代码结构:

Sparrow/Components/Business$ tree -L 2
.
├── CMakeLists.txt
└── OneNetMqtt
    ├── CMakeLists.txt
    ├── OneNetCommon.h
    ├── OneNetDevice.cpp
    ├── OneNetDevice.h
    ├── OneNetDriver.cpp
    ├── OneNetDriver.h
    ├── OneNetHub.cpp
    ├── OneNetHub.h
    ├── OneNetManager.cpp
    ├── OneNetManager.h
    └── main_onenet.cpp

需求分析

 通过分析物联网设备通信的特点,可以将功能需求概括为:

  1. 设备状态管理
  • 准确追踪设备的连接状态
    • Socket连接状态实时监控
    • MQTT协议状态准确跟踪
    • 支持状态查询和统计
  • 支持优雅的状态切换
    • 状态转换过程可控
    • 异常状态自动恢复
    • 状态切换日志记录
  • 异常情况自动恢复
    • 网络断开自动重连
    • 协议异常自动处理
    • 资源自动释放
  1. 消息处理机制
  • 支持异步消息处理
    • 消息队列管理
    • 优先级处理
    • 超时处理
  • 消息分发准确可靠
    • 基于状态的消息路由
    • 消息过滤机制
    • 错误消息处理
  • 便于扩展新消息类型
    • 消息类型注册机制
    • 处理函数动态绑定
    • 向后兼容支持
  1. 定时任务处理
  • 心跳保活机制
    • 可配置的心跳间隔
    • 心跳超时检测
    • 断线重连策略
  • 数据定时上报
    • 支持多种上报周期
    • 数据缓存机制
    • 失败重试机制

设计方案

 基于上述分析,以下是设计方案的大致流程:

  1. 状态机设计
  • ① 定义双层状态结构
    • 第一层负责Socket连接状态管理
    • 第二层负责MQTT协议状态管理
    • 定义状态间的转换关系
  • ② 实现状态转换表
    • 使用表驱动方式管理状态转换
    • 支持状态通配符
    • 实现状态转换回调
  • ③ 设计消息分发机制
    • 基于观察者模式的消息通知
    • 支持多接收者订阅
    • 实现消息过滤功能
  1. 设备管理实现
  • ① 设备生命周期管理
    • 创建和销毁设备实例
    • 管理设备连接状态
    • 处理设备配置信息
  • ② 资源自动回收
    • 使用智能指针管理内存
    • 定时清理无效连接
    • 释放系统资源
  • ③ 异常状态处理
    • 检测异常状态
    • 实现恢复策略
    • 记录错误信息

详细设计

主要通过状态机和观察者模式实现设备管理,详细实现如下:

  1. 双层状态定义
     首先定义清晰的状态枚举,包括Socket层和MQTT层两个维度:
// 一级状态(Socket层)
enum EOneNetMgrLev1State {
   
    LEV1_ONENET_MGR_ANY,            // 任意状态
    LEV1_ONENET_MGR_IDLE,           // 初始状态
    LEV1_ONENET_MGR_CONNECTING,     // 连接中
    LEV1_ONENET_MGR_CONNECTED,      // 已连接
    LEV1_ONENET_MGR_DISCONNECTED,   // 已断开
    LEV1_ONENET_MGR_BUTT
};

// 二级状态(MQTT层)
enum EOneNetMgrLev2State {
   
    LEV2_ONENET_MGR_ANY,            // 任意状态
    LEV2_ONENET_MGR_BUTT
};

 这种双层状态设计的优势在于:

  • 清晰分离网络层和协议层状态
  • 支持ANY状态用于通配处理
  • 便于扩展新的状态定义
  1. 状态转换表实现
     使用状态转换表来管理所有可能的状态转换,每个表项定义了:源状态、目标状态、触发消息和处理函数:
vector<StateTransition<EOneNetMgrLev1State, EOneNetMgrLev2State, ESprSigId, OneNetManager, SprMsg>> 
OneNetManager::mStateTable = {
   
    // 空闲状态处理连接请求
    {
    LEV1_ONENET_MGR_IDLE, LEV2_ONENET_MGR_ANY,
      SIG_ID_ONENET_MGR_ACTIVE_DEVICE_CONNECT,
      &OneNetManager::MsgRespondActiveDeviceConnect },

    // 已连接状态处理心跳
    {
    LEV1_ONENET_MGR_CONNECTED, LEV2_ONENET_MGR_ANY,
      SIG_ID_ONENET_MGR_PING_TIMER_EVENT,
      &OneNetManager::MsgRespondMqttPingTimerEvent },

    // ... 其他状态转换定义
};

 状态转换表的设计考虑了以下几点:

  • 使用模板实现通用状态转换结构
  • 支持状态和消息的通配匹配
  • 每个状态转换绑定具体的处理函数
  1. 智能消息分发机制
     实现了一个高效的消息分发器,能根据当前状态和消息类型自动匹配到对应的处理函数:
int32_t OneNetManager::ProcessMsg(const SprMsg& msg)
{
   
    SPR_LOGD("Recv msg: %s on <%s : %s>\n", GetSigName(msg.GetMsgId()),
              GetLev1StateString(mCurLev1State), GetLev2StateString(mCurLev2State));

    // 查找匹配的状态处理函数
    auto stateEntry = std::find_if(mStateTable.begin(), mStateTable.end(),
        [this, &msg](const StateTransitionType& entry) {
   
            return ((entry.lev1State == mCurLev1State || entry.lev1State == LEV1_ONENET_MGR_ANY) &&
                   (entry.lev2State == mCurLev2State || entry.lev2State == LEV2_ONENET_MGR_ANY) &&
                   (entry.sigId     == msg.GetMsgId() || entry.sigId     == SIG_ID_ANY));
        });

    if (stateEntry != mStateTable.end()) {
   
        (this->*(stateEntry->callback))(msg);
    }

    return 0;
}
  1. 定时器管理
     实现定时器管理机制,用于处理心跳保活和数据上报等周期性任务:
void OneNetManager::MsgRespondMqttConnAck(const SprMsg& msg)
{
   
    // 获取设备的心跳间隔
    int32_t keepAliveInSec = mOneDeviceMap[mCurActiveDevice]->GetKeepAliveIntervalInSec();
    if (keepAliveInSec <= 0) {
   
        SPR_LOGW("Invalid keep alive interval: %d, set default: %d", 
                 keepAliveInSec, DEFAULT_PING_TIMER_INTERVAL);
        keepAliveInSec = DEFAULT_PING_TIMER_INTERVAL;
    }

    // 启动定时器
    StartTimerToPingOneNet(keepAliveInSec * 1000);
    StartTimerToReportData(DEFAULT_DATA_REPORT_INTERVAL * 1000);
}

void OneNetManager::MsgRespondMqttPingTimerEvent(const SprMsg& msg)
{
   
    // 检查设备连接状态
    if (mCurLev1State != LEV1_ONENET_MGR_CONNECTED) {
   
        SPR_LOGD("Device not connect, stop ping timer\n");
        mEnablePingTimer = false;
        UnregisterTimer(SIG_ID_ONENET_MGR_PING_TIMER_EVENT);
        return;
    }

    // 发送心跳消息
    NotifyMsgToOneNetDevice(mCurActiveDevice, msg);
}
  1. 错误处理机制
     设计了统一的错误处理宏,简化错误处理代码:
#define CHECK_ONENET_RET_VALIDITY(__expr) do {      \
    int32_t __ret = (__expr);                       \
    if (__ret == -1) {                              \
        return __ret;                               \
    }                                               \
} while(0)

#define CHECK_ONENET_POINTER(__p, __err) do {                   \
    if ((__p) == nullptr) {                                     \
        SPR_LOGE("INVALID POINTER: %s is nullptr!\n", (#__p));  \
        return __err;                                           \
    }                                                           \
} while(0)
  1. 模块插件化编程机制
      首先OneNetMqtt为业务模块而非核心模块,在设计时希望将其做成可动态加载卸载的模块。即根据项目需要动态配置是否支持此业务功能。插件化编程机制在之前的文章4步实现C/C++插件化编程也有体现,这里只列举模块入口。
    ```c++
    // The entry of OneNet business plugin
    extern "C" void PluginEntry(std::map& observers, SprContext& ctx)
    {
    if (observers.find(MODULE_ONENET_DRIVER) != observers.end() && observers[MODULE_ONENET_DRIVER]) {

     SPR_LOGD("OneNet driver module has been loaded!\n");
     return;
    

    }

    if (observers.find(MODULE_ONENET_MANAGER) != observers.end() && observers[MODULE_ONENET_MANAGER]) {

     SPR_LOGD("OneNet manager module has been loaded!\n");
     return;
    

    }

    auto pOneDrv = new (std::nothrow) OneNetDriver(MODULE_ONENET_DRIVER, "OneDrv");
    auto pOneMgr = new (std::nothrow) OneNetManager(MODULE_ONENET_MANAGER, "OneMgr");
    gpOneNetHub = new (std::nothrow) OneNetHub("OneNetMqtt", pOneMgr);

    pOneDrv->Initialize();
    pOneMgr->Initialize();
    gpOneNetHub->InitializeHub();
    observers[MODULE_ONENET_DRIVER] = pOneDrv;
    observers[MODULE_ONENET_MANAGER] = pOneMgr;
    SPR_LOGD("Load plug-in OneNet modules\n");
    }

// The exit of OneNet business plugin
extern "C" void PluginExit(std::map& observers, SprContext& ctx)
{
if (gpOneNetHub) {
delete gpOneNetHub;
gpOneNetHub = nullptr;
}

auto it = observers.find(MODULE_ONENET_DRIVER);
if (it != observers.end() && it->second) {
    delete it->second;
    it->second = nullptr;
    observers.erase(it);
}

it = observers.find(MODULE_ONENET_MANAGER);
if (it != observers.end() && it->second) {
    delete it->second;
    it->second = nullptr;
    observers.erase(it);
}

SPR_LOGD("Unload plug-in OneNet modules\n");

}


## 验证
1.**验证设备成功连接**    

* 激活指令:
```shell
$ echo Active PC_TEST_02 > /tmp/sparrowsrv
  • 终端日志打印:
    12-12 22:04:37.614  78478 OneNetMgr    D:  235 OneNetManager Init
    ...
    12-12 22:04:42.179  78478 OneNetMgr    D:  464 Debug Active Device [PC_TEST_02]
    12-12 22:04:42.260  78478 OneNetMgr    D:  393 Enable report timer, interval: 10000ms
    12-12 22:05:02.260  78478 OneNetMgr    D:  408 Notify module device: PC_TEST_02, msg: SIG_ID_ONENET_MGR_DATA_REPORT_TIMER_EVENT
    
  • Onenet平台:

设备状态

设备数据

从日志上看PC_TEST_02设备已经被正常激活,且数据正常上报。OneNet平台看数据上报周期正常(10s)上报一次。

2.验证设备主动下线

  • 下线指令:

    $ echo Deactive > /tmp/sparrowsrv
    
  • 终端日志打印:

    12-12 22:10:15.630  78478 OneNetMgr    D:  472 Debug Deactive Device
    12-12 22:10:15.630  78478 OneNetMgr    D:  634 Recv msg: SIG_ID_ONENET_MGR_DEACTIVE_DEVICE_DISCONNECT on <LEV1_ONENET_MGR_CONNECTED : LEV2_ONENET_MGR_ANY>
    12-12 22:10:15.630  78478 OneNetMgr    D:  322 Lev1 state changed: LEV1_ONENET_MGR_CONNECTED -> LEV1_ONENET_MGR_DISCONNECTED
    12-12 22:10:15.630  78478 OneNetMgr    D:  408 Notify module device: PC_TEST_02, msg: SIG_ID_ONENET_MGR_DEACTIVE_DEVICE_DISCONNECT
    
  • Onenet平台:

总结

  • 事件驱动模型设计思想在嵌入式大型项目中应用广泛,它能够有效减少复杂的if-else条件判断语句,从而简化代码结构。此外,这种模型有助于清晰地表达业务逻辑,使系统行为与实际业务需求更加紧密对齐。
  • 当然在编码实现时,也不应机械套用设计思想,而应在深刻理解业务需求的基础上,灵活运用事件驱动模型的理念或模式。确保代码不仅结构清晰、逻辑严谨,还能准确表达业务流程和状态转换。
  • 在实现该业务模块时,鉴于其定位为业务模块而非核心组件,并为了探索插件化编程的优势,最终决定采用插件形式进行开发。这种方式不仅保持了系统的灵活性和可扩展性,还便于未来对该模块的独立更新和维护。
  • 通过OneNetMqtt模块的实现,验证了Sparrow中消息分发、RPC、定时器、配置以及插件化框架功能,目前看稳定性还可以,可供实际项目应用参考。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
11月前
|
消息中间件 Java Spring
RocketMQ-JAVA客户端不同版本接入方式
RocketMQ4.0 RocketMQ5.0 JAVA接入 spring springboot
RocketMQ-JAVA客户端不同版本接入方式
|
10月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ产品使用合集之如何关闭客户端的日志记录
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
9月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
9月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
9月前
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
8月前
|
安全 网络性能优化
MQTT 客户端 MQTT.fx 使用说明
MQTT 客户端 MQTT.fx 使用说明
703 0
|
9月前
|
消息中间件 JavaScript Linux
消息队列 MQ操作报错合集之客户端在启动时遇到了连接错误,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
9月前
|
消息中间件 存储 负载均衡
消息队列 MQ使用问题之如何在grpc客户端中设置负载均衡器
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
10月前
|
消息中间件 Java 测试技术
消息队列 MQ操作报错合集之设置了setKeepAliveInterval(1)但仍然出现客户端未连接,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
156 2
|
10月前
|
消息中间件 Serverless 网络性能优化
消息队列 MQ产品使用合集之客户端和服务器之间的保活心跳检测间隔是怎么设置的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 云消息队列 MQ
  • 下一篇
    oss创建bucket