我们有一个Spring Boot应用程序,可将IoT传感器数据发布到AWS IoT。在负载下,我们的阻塞MqttClient无限期阻塞:
"taskScheduler-72" - Thread t@313
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Native Method)
- waiting on <7e7ec8a6> (a java.lang.Object)
at java.lang.Object.wait(Object.java:502)
at org.eclipse.paho.client.mqttv3.internal.Token.waitForResponse(Token.java:143)
at org.eclipse.paho.client.mqttv3.internal.Token.waitForCompletion(Token.java:108)
at org.eclipse.paho.client.mqttv3.MqttToken.waitForCompletion(MqttToken.java:67)
at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:570)
at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:562)
at
...
我们正在使用QOS 1:
try {
mqttClient.publish(topic, payload, 1, false);
} catch (MqttException e) {
throw new PublishingException(e);
}
我们正在使用spring-integration-mqtt:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>4.2.1.RELEASE</version>
</dependency>
我尝试将paho客户端更新为最新版本:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
<scope>compile</scope>
</dependency>
并设置timeToWait,据我理解,如果发布请求在1000毫秒内未收到确认,则应该抛出MqttException:
mqttClient.setTimeToWait(1000);
为什么timeToWait不起作用?我们应该切换到AsyncMqttClient吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在您的场景中,timeToWait
参数设置理论上应该限制等待消息确认的时间,但请注意,这个参数仅在同步操作模式下有效,并且它定义的是客户端在断开连接之前等待响应的最长时间。如果在设置了 timeToWait
后仍然遇到阻塞问题,可能的原因有:
版本兼容性问题:尽管您已经尝试更新了 Paho 客户端到最新版本,但请确保 Spring Integration Mqtt 4.2.1.RELEASE 版本与新版本的 Paho 客户端之间没有兼容性问题。有时候,库之间的依赖关系和版本不匹配可能导致预期行为不一致。
配置未生效:确保 mqttClient.setTimeToWait(1000);
这行代码在实际执行发布操作之前已经被正确调用,并且该 mqttClient
实例是被正确初始化和使用的那个实例。
网络或服务端问题:如果 AWS IoT 服务端处理缓慢或者网络延迟高,客户端可能会因为无法在指定时间内收到确认而一直等待。这种情况下,即使设置了 timeToWait
,客户端也可能因为底层网络问题而看起来像是“不起作用”。
异步考虑:使用 AsyncMqttClient 可以避免阻塞主线程,这对于高性能应用来说是一个更好的选择。AsyncMqttClient 允许你在发送消息时提供一个回调函数来处理成功或失败的情况,这样你的主应用程序逻辑不会因为等待确认而阻塞。
Spring Integration 配置:检查你的 Spring Integration 配置是否支持并正确配置了超时处理。Spring Integration 提供了多种方式来处理异步消息和错误情况,确保你充分利用这些机制。
综上所述,如果阻塞仍然是一个问题,切换到 AsyncMqttClient 并采用异步编程模型可能是更优解。这不仅能够提高应用的响应性和吞吐量,还能更好地处理像你所遇到的这类阻塞情况。同时,不要忘记审查和测试所有相关配置,确保它们按照预期工作。