本文介绍 MQTT 客户端上传 Token、监听 Token 失效信息、监听 Token 非法信息三个接口的使用,并给出相关示例代码以供参考。
客户端上传 Token 凭证
发送 Topic:$SYS/uploadToken
内容:JSONString
内容信息:
返回值
普通的 PubAck 报文。客户端必须等到该响应才能进行下一步 Pub 或者 Sub 操作,否则服务端可能会鉴权失败导致连接断开。
客户端监听即将失效的 Token 信息
接收 Topic:$SYS/tokenExpireNotice
内容:JSONString
内容信息:
响应:
客户端收到 Token 即将失效的消息后,需要尽快处理重新申请 Token 的动作,以免造成收发消息失败。
客户端监听 Token 非法的通知
接收 Topic:$SYS/tokenInvalidNotice
内容:JSONString
内容信息:
响应:
服务端校验 Token 失效,会导致鉴权失败,服务端会主动断开链接。断开链接之前,服务端会给客户端推送失败的 code,客户端根据 code 即可判断原因。
注意:
- 服务端下推 Token 非法的通知后,会接着断开连接。正常情况下客户端应该能根据该通知判断是否需要申请 Token。
- 异常情况下,可能尚未收到通知,即已经断开连接。此时客户端需要判断原先的 Token 是否到期,如果确实接近到期时间,则先申请 Token 再重连。
示例程序
- public static void main(String[] args) throws MqttException, InterruptedException {
- String broker = "tcp://XXXX:1883";
- String clientId = "GID_XXX@@@XXXX";
- final String topic = "XXXX/XXXXX";
- MemoryPersistence persistence = new MemoryPersistence();
- final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
- final MqttConnectOptions connOpts = new MqttConnectOptions();
- System.out.println("Connecting to broker: " + broker);
- connOpts.setServerURIs(new String[] {broker});
- connOpts.setCleanSession(true);
- connOpts.setKeepAliveInterval(90);
- sampleClient.setCallback(new MqttCallbackExtended() {
- @Override
- public void connectComplete(boolean reconnect, String serverURI) {
- //连接成功,需重新上传Token
- JSONObject object = new JSONObject();
- object.put("token", "XXXX");//设置Token内容
- object.put("type", "RW");//设置Token类型,共有RW,R,W三种类型
- MqttMessage message = new MqttMessage(object.toJSONString().getBytes());
- message.setQos(1);
- try {
- sampleClient.publish("$SYS/uploadToken", message);
- sampleClient.subscribe(topic, 0);
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void connectionLost(Throwable throwable) {
- System.out.println("mqtt connection lost");
- throwable.printStackTrace();
- }
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
- if (topic.equals("$SYS/tokenInvalidNotice")) {
- //Token非法的通知
- } else if (topic.equals("$SYS/tokenExpireNotice")) {
- //Token即将失效的通知
- }
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
- }
- });
- sampleClient.connect(connOpts);
- JSONObject object = new JSONObject();
- object.put("token", "XXXX");//设置Token内容
- object.put("type", "RW");//设置Token类型,共有RW,R,W三种类型
- MqttMessage message = new MqttMessage(object.toJSONString().getBytes());
- message.setQos(1);
- MqttTopic pubTopic = sampleClient.getTopic("$SYS/uploadToken");
- MqttDeliveryToken token = pubTopic.publish(message);
- token.waitForCompletion();//同步等待上传结果
- //Token上传成功,在开始正常的业务消息收发
- sampleClient.subscribe(topic, 0);
- while (true) {
- sampleClient.publish(topic, message);
- Thread.sleep(5000);
- }
- }