java.lang.NullPointerException
at com.aliyun.mns.client.impl.AbstractAction.execute(AbstractAction.java:94)
at com.aliyun.mns.client.CloudQueue.popMessage(CloudQueue.java:355)
at com.xxx.mns.receiver.MessageReceiver.receiveMessage(MessageReceiver.java:89)
at com.xxx.mns.worker.OrderWorker.work(OrderWorker.java:35)
at com.xxx.mns.MnsApplication.main(MnsApplication.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:54)
at java.lang.Thread.run(Thread.java:745)
主要核心代码参考
public class MessageReceiver {
public static final int WAIT_SECONDS = 30;
// if there are too many queues, a clear method could be involved after deleting the queue
protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();
protected Object lockObj;
protected String queueName;
protected CloudQueue cloudQueue;
protected int workerId;
public MessageReceiver(int id, MNSClient mnsClient, String queue) {
cloudQueue = mnsClient.getQueueRef(queue);
queueName = queue;
workerId = id;
synchronized (sLockObjMap) {
lockObj = sLockObjMap.get(queueName);
if (lockObj == null) {
lockObj = new Object();
sLockObjMap.put(queueName, lockObj);
}
}
}
public boolean setPolling() {
synchronized (lockObj) {
Boolean ret = sPollingMap.get(queueName);
if (ret == null || !ret) {
sPollingMap.put(queueName, true);
return true;
}
return false;
}
}
public void clearPolling() {
synchronized (lockObj) {
sPollingMap.put(queueName, false);
lockObj.notifyAll();
System.out.println("Everyone WakeUp and Work!");
}
}
public Message receiveMessage() {
boolean polling = false;
while (true) {
// synchronized (lockObj) {
// Boolean p = sPollingMap.get(queueName);
// if (p != null && p) {
// try {
// System.out.println("Thread" + workerId + " Have a nice sleep!");
// polling = false;
// lockObj.wait();
// } catch (InterruptedException e) {
// System.out.println("MessageReceiver Interrupted! QueueName is " + queueName);
// return null;
// }
// }
// }
Message message = null;
if (!polling) {
message = cloudQueue.popMessage();
if (message == null) {
polling = true;
continue;
}
} else {
if (setPolling()) {
System.out.println("Thread" + workerId + " Polling! " + Instant.now());
} else {
continue;
}
do {
System.out.println("Thread" + workerId + " KEEP Polling!" + Instant.now());
message = cloudQueue.popMessage(WAIT_SECONDS);
} while (message == null);
clearPolling();
}
return message;
}
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
从提供的错误信息来看,你的Java应用在调用阿里云MNS(Message Service)的CloudQueue.popMessage
方法时遇到了NullPointerException
(空指针异常)。这个异常通常意味着某个对象引用未被正确初始化。根据堆栈跟踪,问题可能出在以下几个方面:
cloudQueue
对象未正确初始化:确保在MessageReceiver
类的构造函数中,通过mnsClient.getQueueRef(queue)
获取到的CloudQueue
实例是有效的,且mnsClient
和queue
参数都不为null。
环境或配置问题:确认阿里云MNS客户端初始化时使用的凭证、区域等配置是否正确无误。错误的配置可能导致隐式地创建了无效的CloudQueue
实例。
并发访问问题:虽然你的代码中有并发控制的尝试(使用synchronized
关键字和锁对象lockObj
),但异常可能发生在锁之外的地方,或者是因为并发访问导致的其他状态不一致。检查是否有其他线程操作了cloudQueue
或其依赖对象,导致其状态变得不可预测。
MNS服务端问题:尽管较为少见,但也不能完全排除MNS服务端返回异常响应或服务不稳定导致的问题。可以通过阿里云控制台查看MNS服务的状态和日志,确认服务端是否正常。
针对这个问题,你可以尝试以下解决步骤:
验证初始化逻辑:在MessageReceiver
构造函数后添加日志输出,确认cloudQueue
是否被成功初始化。
增加空值检查:在调用popMessage
之前,增加对cloudQueue
的非空检查,以避免直接抛出NullPointerException。
if (cloudQueue != null) {
try {
cloudQueue.popMessage(WAIT_SECONDS);
} catch (Exception e) {
// 处理异常
}
} else {
log.error("CloudQueue is not initialized properly.");
}
审查并发逻辑:仔细检查多线程访问cloudQueue
及其相关变量的逻辑,确保同步机制正确无误,没有竞态条件。
查看MNS服务状态:登录阿里云控制台,检查MNS服务是否有任何警告或异常,确认服务正常运行。
如果以上步骤都无法定位问题,建议收集更详细的日志信息,包括但不限于MNS客户端初始化过程的日志,以及在调用popMessage
前后相关的变量状态,以便进一步分析。同时,也可以考虑升级阿里云MNS Java SDK到最新版本,以排除因使用旧版SDK可能存在的已知问题。