注意:
本文给出的实例均基于 Eclipse Paho C SDK 实现,该 SDK 下载请参见
MQTT 接入准备。如使用其他第三方的客户端,请适当修改。
1. 资源申请
使用 MQ 提供的 MQTT 服务,首先需要核实应用中使用的 Topic 资源是否已经申请,如果没有请先去控制台申请 Topic,Group ID 等资源。
申请资源时需要根据需求选择对应的 Region,例如,MQTT 需要使用华北 2 的接入点,那么 Topic 等资源就在华北 2 申请,资源申请具体请参见
申请 MQ 资源。
注意:MQTT 使用的多级子 Topic 不需要申请,代码里直接使用即可,没有限制。
2. C 客户端依赖
Paho MQTT C 客户端 SDK 从官网下载源码后,参考文档进行编译安装。可能需要安装以下开发库和工具。
- openssl:用于签名计算。
- cmake:用于编译示例程序,实际也可以根据习惯选用其他编译工具。
3. MQTT 收发消息
本段示例代码演示如何使用 C 客户端收发 MQTT 消息,其中编译工具使用CMake。
CMakeLists.txt 文件
- cmake_minimum_required(VERSION 3.7)
- project(mqttdemo)
- INCLUDE_DIRECTORIES(
- .
- ${CMAKE_SOURCE_DIR}/src
- ${CMAKE_BINARY_DIR}
- )
- SET(CMAKE_BUILD_TYPE "Debug")
- SET(CMAKE_CXX_FLAGS_DEBUG "$ENV{CXXFLAGS} -O0 -Wall -g2 -ggdb")
- SET(CMAKE_CXX_FLAGS_RELEASE "$ENV{CXXFLAGS} -O3 -Wall")
- set(CMAKE_C_STANDARD 99)
- add_executable(mqttDemo mqttDemo.c)
- TARGET_LINK_LIBRARIES(mqttDemo crypto)
- TARGET_LINK_LIBRARIES(mqttDemo paho-mqtt3a)
收发消息程序:
- #include "MQTTAsync.h"
- #include <signal.h>
- #include <memory.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <openssl/hmac.h>
- #include <openssl/bio.h>
- volatile int connected = 0;
- char *topic;
- char *userName;
- char *passWord;
- int messageDeliveryComplete(void *context, MQTTAsync_token token) {
- /* not expecting any messages */
- printf("send message %d success\n", token);
- return 1;
- }
- int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *m) {
- /* not expecting any messages */
- printf("recv message from %s ,body is %s\n", topicName, (char *) m->payload);
- return 1;
- }
- void onConnectFailure(void *context, MQTTAsync_failureData *response) {
- connected = 0;
- printf("connect failed, rc %d\n", response ? response->code : -1);
- MQTTAsync client = (MQTTAsync) context;
- }
- void onSubcribe(void *context, MQTTAsync_successData *response) {
- printf("subscribe success \n");
- }
- void onConnect(void *context, MQTTAsync_successData *response) {
- connected = 1;
- printf("connect success \n");
- MQTTAsync client = (MQTTAsync) context;
- //do sub when connect success
- MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
- sub_opts.onSuccess = onSubcribe;
- int rc = 0;
- if ((rc = MQTTAsync_subscribe(client, topic, 1, ⊂_opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to subscribe, return code %d\n", rc);
- }
- }
- void onDisconnect(void *context, MQTTAsync_successData *response) {
- connected = 0;
- printf("connect lost \n");
- }
- void onPublishFailure(void *context, MQTTAsync_failureData *response) {
- printf("Publish failed, rc %d\n", response ? -1 : response->code);
- }
- int success = 0;
- void onPublish(void *context, MQTTAsync_successData *response) {
- printf("send success %d\n", ++success);
- }
- void connectionLost(void *context, char *cause) {
- connected = 0;
- MQTTAsync client = (MQTTAsync) context;
- MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- int rc = 0;
- printf("Connecting\n");
- conn_opts.keepAliveInterval = 10;
- conn_opts.cleansession = 1;
- conn_opts.username = userName;
- conn_opts.password = passWord;
- conn_opts.onSuccess = onConnect;
- conn_opts.onFailure = onConnectFailure;
- conn_opts.context = client;
- conn_opts.ssl = NULL;
- if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to start connect, return code %d\n", rc);
- exit(EXIT_FAILURE);
- }
- }
- int main(int argc, char **argv) {
- MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
- MQTTAsync client;
- //MQTT使用的Topic,其中第一级父Topic需要在MQ控制台先申请
- topic = "XXXXX";
- //MQTT的接入域名,在MQ控制台购买付费实例后即分配该域名
- char *host = "XXX.mqtt.aliyuncs.com";
- //MQTT客户端分组Id,需要先在MQ控制台申请
- char *groupId = "GID_XXXXXX";
- //MQTT客户端设备Id,用户自行生成,需要保证对于所有TCP连接全局唯一
- char *deviceId = "XXXXXXX";
- //帐号AK
- char *accessKey = "XXXXXXX";
- //帐号SK
- char *secretKey = "XXXXXXX";
- //服务端口,使用MQTT协议时设置1883,其他协议参考文档选择合适端口
- int port = 1883;
- //QoS,消息传输级别,参考文档选择合适的值
- int qos = 1;
- //cleanSession,是否设置持久会话,如果需要离线消息则cleanSession必须是false,QoS必须是1
- int cleanSession = 0;
- int rc = 0;
- char tempData[100];
- int len = 0;
- HMAC(EVP_sha1(), secretKey, strlen(secretKey), groupId, strlen(groupId), tempData, &len);
- char resultData[100];
- int passWordLen = EVP_EncodeBlock((unsigned char *) resultData, tempData, len);
- resultData[passWordLen] = '\0';
- printf("passWord is %s", resultData);
- userName = accessKey;
- passWord = resultData;
- //1.create client
- MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
- create_opts.sendWhileDisconnected = 0;
- create_opts.maxBufferedMessages = 10;
- char url[100];
- sprintf(url, "%s:%d", host, port);
- char clientIdUrl[64];
- sprintf(clientIdUrl, "%s@@@%s", groupId, deviceId);
- rc = MQTTAsync_createWithOptions(&client, url, clientIdUrl, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
- rc = MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL);
- //2.connect to server
- MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- conn_opts.keepAliveInterval = 90;
- conn_opts.cleansession = cleanSession;
- conn_opts.username = userName;
- conn_opts.password = passWord;
- conn_opts.onSuccess = onConnect;
- conn_opts.onFailure = onConnectFailure;
- conn_opts.context = client;
- conn_opts.ssl = NULL;
- conn_opts.automaticReconnect = 1;
- conn_opts.connectTimeout = 3;
- if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to start connect, return code %d\n", rc);
- exit(EXIT_FAILURE);
- }
- //3.publish msg
- MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
- pub_opts.onSuccess = onPublish;
- pub_opts.onFailure = onPublishFailure;
- for (int i = 0; i < 1000; i++) {
- do {
- char data[100];
- sprintf(data, "hello mqtt demo");
- rc = MQTTAsync_send(client, topic, strlen(data), data, qos, 0, &pub_opts);
- sleep(1);
- } while (rc != MQTTASYNC_SUCCESS);
- }
- sleep(1000);
- disc_opts.onSuccess = onDisconnect;
- if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to start disconnect, return code %d\n", rc);
- exit(EXIT_FAILURE);
- }
- while (connected)
- sleep(1);
- MQTTAsync_destroy(&client);
- return EXIT_SUCCESS;
- }