开发者社区 问答 正文

即使timeToWait设置为1000毫秒,同步Paho MQTTClient仍挂在发布上

我们有一个Spring Boot应用程序,可将IoT传感器数据发布到AWS IoT。在负载下,我们的阻塞MqttClient无限期阻塞:

"taskScheduler-72" - Thread t@313
    java.lang.Thread.State: WAITING
    at java.lang.Object.wait(Native Method)
    - waiting on <7e7ec8a6> (a java.lang.Object)
    at java.lang.Object.wait(Object.java:502)
    at org.eclipse.paho.client.mqttv3.internal.Token.waitForResponse(Token.java:143)
    at org.eclipse.paho.client.mqttv3.internal.Token.waitForCompletion(Token.java:108)
    at org.eclipse.paho.client.mqttv3.MqttToken.waitForCompletion(MqttToken.java:67)
    at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:570)
    at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:562)
    at 
    ...

我们正在使用QOS 1:

try {
        mqttClient.publish(topic, payload, 1, false);
    } catch (MqttException e) {
        throw new PublishingException(e);
    }

我们正在使用spring-integration-mqtt:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>4.2.1.RELEASE</version>
  </dependency>

我尝试将paho客户端更新为最新版本:

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.2</version>
  <scope>compile</scope>
</dependency>

并设置timeToWait,据我理解,如果发布请求在1000毫秒内未收到确认,则应该抛出MqttException:

mqttClient.setTimeToWait(1000);

为什么timeToWait不起作用?我们应该切换到AsyncMqttClient吗?

展开
收起
垚tutu 2019-12-12 09:21:40 2498 分享 版权
阿里云 AI 助理回答

在您的场景中,timeToWait 参数设置理论上应该限制等待消息确认的时间,但请注意,这个参数仅在同步操作模式下有效,并且它定义的是客户端在断开连接之前等待响应的最长时间。如果在设置了 timeToWait 后仍然遇到阻塞问题,可能的原因有:

  1. 版本兼容性问题:尽管您已经尝试更新了 Paho 客户端到最新版本,但请确保 Spring Integration Mqtt 4.2.1.RELEASE 版本与新版本的 Paho 客户端之间没有兼容性问题。有时候,库之间的依赖关系和版本不匹配可能导致预期行为不一致。

  2. 配置未生效:确保 mqttClient.setTimeToWait(1000); 这行代码在实际执行发布操作之前已经被正确调用,并且该 mqttClient 实例是被正确初始化和使用的那个实例。

  3. 网络或服务端问题:如果 AWS IoT 服务端处理缓慢或者网络延迟高,客户端可能会因为无法在指定时间内收到确认而一直等待。这种情况下,即使设置了 timeToWait,客户端也可能因为底层网络问题而看起来像是“不起作用”。

  4. 异步考虑:使用 AsyncMqttClient 可以避免阻塞主线程,这对于高性能应用来说是一个更好的选择。AsyncMqttClient 允许你在发送消息时提供一个回调函数来处理成功或失败的情况,这样你的主应用程序逻辑不会因为等待确认而阻塞。

  5. Spring Integration 配置:检查你的 Spring Integration 配置是否支持并正确配置了超时处理。Spring Integration 提供了多种方式来处理异步消息和错误情况,确保你充分利用这些机制。

综上所述,如果阻塞仍然是一个问题,切换到 AsyncMqttClient 并采用异步编程模型可能是更优解。这不仅能够提高应用的响应性和吞吐量,还能更好地处理像你所遇到的这类阻塞情况。同时,不要忘记审查和测试所有相关配置,确保它们按照预期工作。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答