在物联网相关的应用开发中或多或少都会用到MQTT,以下这个开源项目是我基于杰杰大佬的mqttclient
项目进行二次封装的接口:
https://github.com/Yangyuanxin/EasyMqttClient
杰杰大佬的mqttclient
项目:
https://github.com/jiejieTop/mqttclient
在封装之前,通过内存泄露工具定位排查得知调用mqtt_release
和SALOF_LOG
存在一些问题,好在mqtt_release
的场景其实基本上不会用到,但还是有必要拿出来说说:
- Bug1(
platform_thread
内存泄露)
mqttclient/platform/linux/platform_thread.c
文件中的platform_thread_destroy
函数并没有对线程封装函数中的thread
所申请的内存进行释放,这样的后果是会在mqttclient
调用mqtt_release
函数的时候造成内存泄露。
- Bug2(实际待解决-可通过屏蔽宏定义解决)
mqttclient/common/log
模块存在内存泄露,可通过关闭mqtt_config.h
文件中的MQTT_LOG_IS_SALOF
解决。
- Bug3(待解决-可暂时忽略)
在执行mqtt_release时,有机率出现core dump
,但是mqtt_release
场景在一般产品开发中并不常见,除非有特殊的需要。具体详见我提出的Issues:
https://github.com/jiejieTop/mqttclient/issues/60
除此之外,mqttclient
用起来还是很爽的,不少开源项目和实际产品上都用了这套接口,非常稳定。
以下是我封装以后的接口就只有7个API,十分简单!分别是:
//MQTT初始化 EasyMqttClient_t *EasyMqttInit(EasyMqttAttr_t *Attr); //MQTT反初始化 int EasyMqttUnInit(EasyMqttClient_t *Client); //MQTT连接 int EasyMqttConnect(EasyMqttClient_t *Client); //MQTT断开连接 int EasyMqttDisConnect(EasyMqttClient_t *Client); //MQTT Topic订阅 int EasyMqttSubscribe(EasyMqttClient_t *Client, const char *Topic, enum EasyMqttQos_t Qos, void (*Cb)(const char *Topic,char* Data,unsigned short Len)); //MQTT 解除订阅 int EasyMqttUnsubscribe(EasyMqttClient_t *Client, const char *Topic); //MQTT Topic发布 int EasyMqttPublish(EasyMqttClient_t *Client, const char *Topic, enum EasyMqttQos_t Qos, char *Data, unsigned short Len);
其中EasyMqttInit函数将以下这些琐碎的过程,例如设置URL、设置端口号等过程用结构体EasyMqttAttr
封装到了一起:
typedef struct EasyMqttAttr { char *Url; char *Port; char *ClientId; char *Username; char *Password; }EasyMqttAttr_t; //...................... mqtt_set_host; mqtt_set_port; mqtt_set_client_id; mqtt_set_user_name; mqtt_set_password; mqtt_set_clean_session; //调用的时候很简单 //1. //2.定义一个结构体变量 example: EasyMqttClient_t *Client = NULL; EasyMqttAttr_t Attr = { .Url = "192.168.4.248", .Port = "30157", .ClientId = "EasyMqttMqtt", .Username = "EasyMqtt", .Password = "123456" }; //3.调用EasyMqttInit函数 Client = EasyMqttInit(Client, &Attr); //to do //实现你的MQTT连接、订阅、分布等逻辑 //to do end
另外,它还实现了对不同订阅Topic的回调函数进行分开处理,让开发的逻辑更加清晰,也易于调试和解决问题,这个实现的机制是基于一个结构体数组来实现的,如下所示:
struct TopicHandler_t { //Topic const char *Topic; //Topic对应的回调函数 void (*CallBack)(const char *Topic,char* Data,unsigned short Len); }; //结构体数组表,最大支持处理Topic的个数为MAX_TOPIC,该值默认为64 struct TopicHandler_t Table[MAX_TOPIC];
当调用EasyMqttSubscribe
Topic订阅函数订阅一个Topic时,就会将这个Topic和它的回调添加到这个表里。当mqttclient
接收到不同的Topic时,则会查表调用不同Topic所对应的回调函数,具体逻辑如下所示:
//Topic回调触发 static void TopicHandlerCallBack(void* client, message_data_t* Msg) { (void)client; int Index = 0; char *Topic = Msg->topic_name; unsigned short Len = Msg->message->payloadlen; char *Data = (char *)Msg->message->payload; //上锁 pthread_mutex_lock(&Mutex); //当接收到不同的Topic时,根据Topic找到对应的回调函数并进行调用 for(Index = 0; Index < sizeof(Table)/sizeof(Table[0]); Index++) { if(0 == strcmp(Msg->topic_name,Table[Index].Topic)) { Table[Index].CallBack(Topic,Data,Len); break; } } //解锁 pthread_mutex_unlock(&Mutex); } //EasyMqttSubscribe Topic订阅函数 int EasyMqttSubscribe(EasyMqttClient_t *Client, const char *Topic, enum EasyMqttQos_t Qos, void (*Cb)(const char *Topic, char* Data, unsigned short Len)) { if(Index > MAX_TOPIC-1) { printf("Exceeds the maximum number of topics set:%d!\n", Index); return -1; } Table[Index].Topic = Topic; Table[Index].CallBack = Cb; Index++; return mqtt_subscribe(Client, Topic, (mqtt_qos_t)Qos, TopicHandlerCallBack); }
具体使用方法可参考EasyMqtt.c
中的EasyMqttTest
函数。目前该项目仅在Linux项目上测试通过,后续将在不同的RTOS环境下进行测试。欢迎持续关注,也欢迎提Pr,共同让嵌入式MQTT应用开发变得更简单。
在Linux环境下使用本项目:
- 1、克隆本项目
git clone https://github.com/Yangyuanxin/EasyMqttClient.git
- 2、修改交叉编译工具链(默认为gcc)
如果你希望在嵌入式平台运行,则需要修改Makefile里的:
CROSS_COMPILE =
否则默认以gcc环境编译。
- 3、编译
make
- 4、执行
./a.out
其它环境:待测试。
往期精彩
开源:AliOS_Things_Developer_Kit开发板复活计划