离线消息使用场景
- 场景一:客户端本身对离线消息的优先级比较低,只要保证最终能处理就可以。
- 场景二:客户端对于离线消息需要有限处理,且要求比较实时。
针对场景一,因为 MQTT 默认的工作模式即可支持,客户端上线后,离线消息不会立即推送,是按照固定时间间隔,固定数量的方式推送。这种工作模式的好处是可以降低客户端上线时的压力,优先处理在线消息,离线消息只要保证最终能处理即可。该模式不需要任何设定。
针对场景二,因为客户端需要自己控制离线消息优先处理,所以 MQTT 提供了主动拉的模式来供客户端获取离线消息。即在客户端上线后,客户端自己调用接口来拉取自己所需的指定数量的消息。此模式下,客户端自己控制拉取的时间间隔和条数。
主动拉取离线消息使用说明
具体步骤如下:
- 客户端启动后,以控制消息的形式发起拉取消息的指令,设置拉取条数和顺序。
- 客户端等待本地处理成功。
- 继续拉取下一批消息。
注意事项
- 客户端如果需要使用主动模式,请务必在连接建立后的第一个心跳周期内发起请求,否则系统会按照自动模式,即按照固定周期推送离线数据。
- 客户端每次最多拉取消息的数量为30条,客户端发起拉取请求的最大频率限制为5次/秒。
- 客户端需要自己控制拉取时机,因为消息从发起指令到推送到客户端,到客户端消费完成回应ACK都是异步过程。如果客户端拉取过快,很有可能拉到前一批还没有删除的消息,造成重复;或者拉取到重复的消息,因为前一次的消息还没有回复ACK。
拉取离线消息相关API
拉取离线消息:
发送 Topic:$SYS/getOfflineMsg
内容:JSONString
内容信息:
返回值
普通的 PubAck 报文。
示例程序
MQTT客户端使用 Demo
- public void testOfflineMsg() throws Exception {
- String broker = "tcp://XXXX:1883";
- String clientId = "GID_XXX@@@XXXX";
- final String topic = "XXXX/11";
- final String topicFilter[] = {topic};
- final int qos[] = {1};
- MemoryPersistence persistence = new MemoryPersistence();
- try {
- final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
- final MqttConnectOptions connOpts = new MqttConnectOptions();
- System.out.println("Connecting to broker: " + broker);
- connOpts.setServerURIs(new String[]{broker});
- connOpts.setCleanSession(false);
- connOpts.setAutomaticReconnect(true);
- /**
- * 客户端长链接需要设置心跳实际,建议100s以下,超时,服务端会断开连接
- */
- connOpts.setKeepAliveInterval(90);
- sampleClient.setCallback(new MqttCallbackExtended() {
- public void connectComplete(boolean reconnect, String serverURI) {
- System.out.println("connect success");
- sampleClient.subscribe(topicFilter, qos);
- }
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
- System.out.println("recv Msg from " + topic);
- }
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
- }
- });
- sampleClient.connect(connOpts);
- JSONObject object = new JSONObject();
- object.put("maxPushNum", 20);
- object.put("pushOrder", "DESC");
- sampleClient.publish("$SYS/getOfflineMsg", new MqttMessage(object.toJSONString().getBytes()));
- Thread.sleep(1000000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }