开发者社区> 问答> 正文

MQTT 获取离线消息怎么制作?



离线消息使用场景

  • 场景一:客户端本身对离线消息的优先级比较低,只要保证最终能处理就可以。
  • 场景二:客户端对于离线消息需要有限处理,且要求比较实时。

针对场景一,因为 MQTT 默认的工作模式即可支持,客户端上线后,离线消息不会立即推送,是按照固定时间间隔,固定数量的方式推送。这种工作模式的好处是可以降低客户端上线时的压力,优先处理在线消息,离线消息只要保证最终能处理即可。该模式不需要任何设定。
针对场景二,因为客户端需要自己控制离线消息优先处理,所以 MQTT 提供了主动拉的模式来供客户端获取离线消息。即在客户端上线后,客户端自己调用接口来拉取自己所需的指定数量的消息。此模式下,客户端自己控制拉取的时间间隔和条数。

主动拉取离线消息使用说明


具体步骤如下:
  1. 客户端启动后,以控制消息的形式发起拉取消息的指令,设置拉取条数和顺序。
  2. 客户端等待本地处理成功。
  3. 继续拉取下一批消息。


注意事项

  • 客户端如果需要使用主动模式,请务必在连接建立后的第一个心跳周期内发起请求,否则系统会按照自动模式,即按照固定周期推送离线数据。
  • 客户端每次最多拉取消息的数量为30条,客户端发起拉取请求的最大频率限制为5次/秒。
  • 客户端需要自己控制拉取时机,因为消息从发起指令到推送到客户端,到客户端消费完成回应ACK都是异步过程。如果客户端拉取过快,很有可能拉到前一批还没有删除的消息,造成重复;或者拉取到重复的消息,因为前一次的消息还没有回复ACK。


拉取离线消息相关API


拉取离线消息:
发送 Topic:$SYS/getOfflineMsg
内容:JSONString
内容信息:
名称类型说明
pushOrderString“DESC”或者”ASC”,分别代表从最新消息拉取还是从最早消息拉取
maxPushNumInteger一次最多拉取消息的条数,设置范围为1-30,超过会以上限计算

返回值
普通的 PubAck 报文。

示例程序



MQTT客户端使用 Demo

  1. public void testOfflineMsg() throws Exception {
  2.         String broker = "tcp://XXXX:1883";
  3.         String clientId = "GID_XXX@@@XXXX";
  4.         final String topic = "XXXX/11";
  5.         final String topicFilter[] = {topic};
  6.         final int qos[] = {1};
  7.         MemoryPersistence persistence = new MemoryPersistence();
  8.         try {
  9.             final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  10.             final MqttConnectOptions connOpts = new MqttConnectOptions();
  11.             System.out.println("Connecting to broker: " + broker);
  12.             connOpts.setServerURIs(new String[]{broker});
  13.             connOpts.setCleanSession(false);
  14.             connOpts.setAutomaticReconnect(true);
  15.             /**
  16.              * 客户端长链接需要设置心跳实际,建议100s以下,超时,服务端会断开连接
  17.              */
  18.             connOpts.setKeepAliveInterval(90);
  19.             sampleClient.setCallback(new MqttCallbackExtended() {
  20.                 public void connectComplete(boolean reconnect, String serverURI) {
  21.                     System.out.println("connect success");
  22.                       sampleClient.subscribe(topicFilter, qos);
  23.                 }
  24.                 public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  25.                     System.out.println("recv Msg from " + topic);
  26.                 }
  27.                 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  28.                     System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  29.                 }
  30.             });
  31.             sampleClient.connect(connOpts);
  32.             JSONObject object = new JSONObject();
  33.             object.put("maxPushNum", 20);
  34.             object.put("pushOrder", "DESC");
  35.             sampleClient.publish("$SYS/getOfflineMsg", new MqttMessage(object.toJSONString().getBytes()));
  36.             Thread.sleep(1000000);
  37.         } catch (Exception e) {
  38.             e.printStackTrace();
  39.         }
  40.     }

展开
收起
猫饭先生 2017-10-27 10:38:33 4069 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
RocketMQ Client-GO 介绍 立即下载
RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载