一套极简的MQTT使用接口EasyMqttClient

简介: 一套极简的MQTT使用接口EasyMqttClient

在物联网相关的应用开发中或多或少都会用到MQTT,以下这个开源项目是我基于杰杰大佬的mqttclient项目进行二次封装的接口:

https://github.com/Yangyuanxin/EasyMqttClient

640.png

杰杰大佬的mqttclient项目:

https://github.com/jiejieTop/mqttclient

640.png

在封装之前,通过内存泄露工具定位排查得知调用mqtt_releaseSALOF_LOG存在一些问题,好在mqtt_release的场景其实基本上不会用到,但还是有必要拿出来说说:


  • Bug1(platform_thread内存泄露)


mqttclient/platform/linux/platform_thread.c文件中的platform_thread_destroy函数并没有对线程封装函数中的thread所申请的内存进行释放,这样的后果是会在mqttclient调用mqtt_release函数的时候造成内存泄露。


  • Bug2(实际待解决-可通过屏蔽宏定义解决)


mqttclient/common/log模块存在内存泄露,可通过关闭mqtt_config.h文件中的MQTT_LOG_IS_SALOF解决。


  • Bug3(待解决-可暂时忽略)


在执行mqtt_release时,有机率出现core dump,但是mqtt_release场景在一般产品开发中并不常见,除非有特殊的需要。具体详见我提出的Issues:

https://github.com/jiejieTop/mqttclient/issues/60

除此之外,mqttclient用起来还是很爽的,不少开源项目和实际产品上都用了这套接口,非常稳定。


以下是我封装以后的接口就只有7个API,十分简单!分别是:

//MQTT初始化
EasyMqttClient_t *EasyMqttInit(EasyMqttAttr_t *Attr);
//MQTT反初始化
int EasyMqttUnInit(EasyMqttClient_t *Client);
//MQTT连接
int EasyMqttConnect(EasyMqttClient_t *Client);
//MQTT断开连接
int EasyMqttDisConnect(EasyMqttClient_t *Client);
//MQTT Topic订阅
int EasyMqttSubscribe(EasyMqttClient_t *Client, const char *Topic, enum EasyMqttQos_t Qos, 
                    void (*Cb)(const char *Topic,char* Data,unsigned short Len));
//MQTT 解除订阅
int EasyMqttUnsubscribe(EasyMqttClient_t *Client, const char *Topic);
//MQTT Topic发布
int EasyMqttPublish(EasyMqttClient_t *Client, const char *Topic, enum EasyMqttQos_t Qos, char *Data, unsigned short Len);

其中EasyMqttInit函数将以下这些琐碎的过程,例如设置URL、设置端口号等过程用结构体EasyMqttAttr封装到了一起:

typedef struct EasyMqttAttr
{
    char *Url;
    char *Port;
    char *ClientId;
    char *Username;
    char *Password;
}EasyMqttAttr_t;
//......................
mqtt_set_host;
mqtt_set_port;
mqtt_set_client_id;
mqtt_set_user_name;
mqtt_set_password;
mqtt_set_clean_session;
//调用的时候很简单
//1.
//2.定义一个结构体变量
example: 
EasyMqttClient_t *Client = NULL;
EasyMqttAttr_t Attr = 
{
   .Url      = "192.168.4.248",
   .Port     = "30157",
   .ClientId = "EasyMqttMqtt",
   .Username = "EasyMqtt",
   .Password = "123456"
};
//3.调用EasyMqttInit函数
Client = EasyMqttInit(Client, &Attr);
//to do
//实现你的MQTT连接、订阅、分布等逻辑
//to do end

另外,它还实现了对不同订阅Topic的回调函数进行分开处理,让开发的逻辑更加清晰,也易于调试和解决问题,这个实现的机制是基于一个结构体数组来实现的,如下所示:

struct TopicHandler_t
{
    //Topic
    const char *Topic;
    //Topic对应的回调函数
    void (*CallBack)(const char *Topic,char* Data,unsigned short Len);
};
//结构体数组表,最大支持处理Topic的个数为MAX_TOPIC,该值默认为64
struct TopicHandler_t Table[MAX_TOPIC];

当调用EasyMqttSubscribe Topic订阅函数订阅一个Topic时,就会将这个Topic和它的回调添加到这个表里。当mqttclient接收到不同的Topic时,则会查表调用不同Topic所对应的回调函数,具体逻辑如下所示:

//Topic回调触发
static void TopicHandlerCallBack(void* client, message_data_t* Msg)
{
    (void)client;
    int Index = 0;
    char *Topic = Msg->topic_name;
    unsigned short Len = Msg->message->payloadlen;
    char *Data  = (char *)Msg->message->payload;
    //上锁
    pthread_mutex_lock(&Mutex);
    //当接收到不同的Topic时,根据Topic找到对应的回调函数并进行调用
    for(Index = 0; Index < sizeof(Table)/sizeof(Table[0]); Index++)
    {
        if(0 == strcmp(Msg->topic_name,Table[Index].Topic))
        {
            Table[Index].CallBack(Topic,Data,Len);
            break;
        }
    }
    //解锁
    pthread_mutex_unlock(&Mutex);
}
//EasyMqttSubscribe Topic订阅函数
int EasyMqttSubscribe(EasyMqttClient_t *Client, const char *Topic, enum EasyMqttQos_t Qos, 
                    void (*Cb)(const char *Topic, char* Data, unsigned short Len))
{
    if(Index > MAX_TOPIC-1)
    {
        printf("Exceeds the maximum number of topics set:%d!\n", Index);
        return -1;
    }
    Table[Index].Topic = Topic;
    Table[Index].CallBack = Cb;
    Index++;
    return mqtt_subscribe(Client, Topic, (mqtt_qos_t)Qos, TopicHandlerCallBack);
}

具体使用方法可参考EasyMqtt.c中的EasyMqttTest函数。目前该项目仅在Linux项目上测试通过,后续将在不同的RTOS环境下进行测试。欢迎持续关注,也欢迎提Pr,共同让嵌入式MQTT应用开发变得更简单。


在Linux环境下使用本项目:


  • 1、克隆本项目
git clone https://github.com/Yangyuanxin/EasyMqttClient.git
  • 2、修改交叉编译工具链(默认为gcc)


如果你希望在嵌入式平台运行,则需要修改Makefile里的:

CROSS_COMPILE =

否则默认以gcc环境编译。


  • 3、编译
make
  • 4、执行
./a.out

其它环境:待测试。

往期精彩

基于HarmonyOS项目的手把手开源教程


分享GitHub上一些嵌入式相关的高星开源项目


开源:AliOS_Things_Developer_Kit开发板复活计划


一个超棒的开源解读项目【Linux内核揭秘】,一定不要错过啦!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
安全 物联网 测试技术
C++ 构建通用的MQTT接口:从理论到实践
C++ 构建通用的MQTT接口:从理论到实践
523 2
|
7天前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7天前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7天前
|
消息中间件 网络安全 开发工具
消息队列 MQ产品使用合集之使用grpc proxy,生产者心跳并没有发送至Default中,如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7天前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ产品使用合集之如何关闭客户端的日志记录
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7天前
|
消息中间件 监控 Oracle
消息队列 MQ产品使用合集之启动Namesrv节点时,遇到报错,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7天前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7天前
|
消息中间件 监控 Java
消息队列 MQ产品使用合集之如何查看推送是否被限制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7天前
|
消息中间件 存储 开发工具
消息队列 MQ产品使用合集之C++如何使用Paho MQTT库进行连接、发布和订阅消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4天前
|
消息中间件
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
RabbitMQ 死信消息队列 重复消费 basicAck basicNack