MQTT是机器对机器(M2M)/物联网(IoT)连接协议。它被设计为一个极其轻量级的发布/订阅消息传输协议。对于需要较小代码占用空间和/或网络带宽非常宝贵的远程连接非常有用,是专为受限设备和低带宽、高延迟或不可靠的网络而设计。这些原则也使该协议成为新兴的“机器到机器”(M2M)或物联网(IoT)世界的连接设备,以及带宽和电池功率非常高的移动应用的理想选择。例如,它已被用于通过卫星链路与代理通信的传感器、与医疗服务提供者的拨号连接,以及一系列家庭自动化和小型设备场景。它也是移动应用的理想选择,因为它体积小,功耗低,数据包最小,并且可以有效地将信息分配给一个或多个接收器。
添加MQTT依赖库
3.1的库:https://pub.flutter-io.cn/packages/mqtt_client
5.0的库:https://pub.flutter-io.cn/packages/mqtt5_client
我们使用3.1的库
dependencies: flutter: sdk: flutter flutter_localizations: sdk: flutter ... mqtt_client: ^9.6.3
使用MQTT前,最好先熟悉一下MQTT相关的概念,比如Topic、clean session、qos、retain。
配置
创建
/// 服务器地址,这里使用EMQ提供的免费mqtt测试服务器。 /// https://www.emqx.com/zh/mqtt/public-mqtt5-broker final String brokerUrl = "broker-cn.emqx.io"; /// 设备id final String clientId = "deviced_111"; /// 端口号 final int port = 1883; /// 创建Mqtt实例 var _client = MqttServerClient.withPort(brokerUrl, clientId, port);
参数配置
/// 是否打印mqtt日志信息 _client.logging(on: true); /// 设置端口号。创建时已经指定端口号就不需要设置。 _client.port = port; /// 设置协议版本,默认是3.1,根据服务器需要的版本来设置 /// _client.setProtocolV31(); _client.setProtocolV311(); /// 保持连接ping-pong周期。默认不设置时关闭。 _client.keepAlivePeriod = 60; /// 连接成功回调 _client.onConnected = onConnected; /// 连接断开回调 _client.onDisconnected = onDisconnected; /// 取消订阅回调 _client.onUnsubscribed = onUnsubscribed; /// 订阅成功回调 _client.onSubscribed = onSubscribed; /// 订阅失败回调 _client.onSubscribeFail = onSubscribeFail; /// ping pong响应回调 _client.pongCallback = pong; void onConnected() { print("连接成功...."); } void onDisconnected() { print("连接断开"); } void onUnsubscribed(String topic) { print("取消订阅 $topic"); } void onSubscribed(String topic) { print("订阅 $topic 成功"); } void onSubscribeFail(String topic) { print("订阅主题: $topic 失败"); } void pong() { print("Ping的响应"); }
SSL证书配置
如果服务器需要使用ssl,我们需要提前下载好证书文件,放在assets目录中。
_configSSL() async{ /// 证书路径 var certPath = "assets/xxxx.cer"; /// 开启安全设置 _client.secure = true; /// 创建SecurityContext final mqttContext = SecurityContext.defaultContext; /// 加载SSL证书 var byteData = await rootBundle.load(certPath); /// 将证书的buffer数据添加到context中 mqttContext.setClientAuthoritiesBytes(byteData.buffer.asUint8List()); /// client指定context. _client.securityContext = mqttContext; }
连接设置
/// 创建连接配置 final connMessage = MqttConnectMessage() /// 设置wiil遗嘱主题 .withWillTopic("willTopic") /// 设置will遗嘱消息 .withWillMessage("willMessage") /// 开启Clean Session.每次连接都是一个新的Session. .startClean() /// 设置will消息的qos模式。 .withWillQos(MqttQos.atLeastOnce); _client.connectionMessage = connMessage;
连接
_connect() async { try{ _client.connect(); }catch (e) { print("连接异常: $e"); _client.disconnect(); } } /// 检查连接状态 if (_client.connectionStatus.state == MqttConnectionState.connected) { print("已经连接...."); } else { print("连接失败啦 状态===${_client.connectionStatus}"); _client.disconnect(); return; } /// 连接成功,可以继续操作 print("连接成功.....");
发送和接收消息
接收主题消息
订阅
想要接收消息,必须先订阅对应的主题
/// 需要订阅的主题 var scribeTopic = "topic/test"; 订阅主题,并设置qos _client.subscribe(scribeTopic, MqttQos.atLeastOnce);
监听消息
MqttClient.updates
_client.updates.listen((event) { var recvMessage = event[0].payload as MqttPublishMessage; /// 二进制格式的消息 bytes = recvMessage.payload.message; /// 字符串格式的消息 message = Utf8Decoder().convert(recvMessage.payload.message); print("原始数据-----:${recvMessage.payload.message}"); /// 转换成字符串 print( "接收到了主题${event[0].topic}的消息: ${Utf8Decoder().convert(recvMessage.payload.message)}"); });
发送消息
使用MqttClientPayloadBuilder
构造消息内容,然后使用MqttServerClient.publishMessage()
发送消息。
发送明文
var builder = MqttClientPayloadBuilder(); builder.addUTF8String("This is a message"); _client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload);
发送字节数组
var builder = MqttClientPayloadBuilder(); var data = [11,12,103,33]; builder.payload.addAll(data); _client.publishMessage("消息的主题", MqttQos.atLeastOnce, builder.payload);
发送JSON数据
var builder = MqttClientPayloadBuilder(); var data = [11,12,103,33]; var jsonData = json.encode({"temperature": 23.5, "humidity": 32}); builder.addUTF8String(jsonData); _client.publishMessage("消息的主题", MqttQos.atLeastOnce, builder.payload);
关闭
使用disconnect
关闭连接,一般在dispose中关闭。
@override void dispose() { super.dispose(); if (_client != null) { _client.disconnect(); _client = null; } }