java.lang.NullPointerException
at com.aliyun.mns.client.impl.AbstractAction.execute(AbstractAction.java:94)
at com.aliyun.mns.client.CloudQueue.popMessage(CloudQueue.java:355)
at com.xxx.mns.receiver.MessageReceiver.receiveMessage(MessageReceiver.java:89)
at com.xxx.mns.worker.OrderWorker.work(OrderWorker.java:35)
at com.xxx.mns.MnsApplication.main(MnsApplication.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:54)
at java.lang.Thread.run(Thread.java:745)
主要核心代码参考
/**
 * Pulls messages from one MNS queue on behalf of a single worker.
 *
 * <p>All receivers created for the same queue name share one monitor object and one
 * "polling" flag, so that at most one of them performs the long-polling
 * {@code popMessage(WAIT_SECONDS)} call at a time. The other receivers sleep on the shared
 * monitor until the poller has obtained a message, then retry with the non-blocking
 * {@code popMessage()}.
 */
public class MessageReceiver {
    /** Wait time, in seconds, handed to {@code CloudQueue.popMessage(int)} when long polling. */
    public static final int WAIT_SECONDS = 30;

    // if there are too many queues, a clear method could be involved after deleting the queue
    /** Queue name -> shared monitor object. Accessed only while synchronized on the map itself. */
    protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();

    /** Queue name -> whether some receiver currently holds the long-polling role for that queue. */
    protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();

    /** Monitor shared by every receiver of {@link #queueName}. */
    protected Object lockObj;
    protected String queueName;
    protected CloudQueue cloudQueue;
    /** Identifier used only in the console output of this receiver. */
    protected int workerId;

    /**
     * Creates a receiver bound to {@code queue} and registers (or reuses) the monitor object
     * shared by all receivers of that queue.
     *
     * @param id        worker identifier printed in log lines
     * @param mnsClient client used to obtain the queue reference
     * @param queue     name of the queue to receive from
     */
    public MessageReceiver(int id, MNSClient mnsClient, String queue) {
        cloudQueue = mnsClient.getQueueRef(queue);
        queueName = queue;
        workerId = id;
        // sLockObjMap is a plain HashMap, so look-up and insertion must happen under one lock.
        synchronized (sLockObjMap) {
            lockObj = sLockObjMap.get(queueName);
            if (lockObj == null) {
                lockObj = new Object();
                sLockObjMap.put(queueName, lockObj);
            }
        }
    }

    /**
     * Tries to claim the long-polling role for this receiver's queue.
     *
     * @return {@code true} if the flag was unset or {@code false} and has now been set to
     *         {@code true}; {@code false} if another receiver already holds the role
     */
    public boolean setPolling() {
        synchronized (lockObj) {
            Boolean ret = sPollingMap.get(queueName);
            if (ret == null || !ret) {
                sPollingMap.put(queueName, true);
                return true;
            }
            return false;
        }
    }

    /**
     * Releases the long-polling role for this receiver's queue and wakes every receiver that
     * is waiting on the shared monitor.
     */
    public void clearPolling() {
        synchronized (lockObj) {
            sPollingMap.put(queueName, false);
            lockObj.notifyAll();
            System.out.println("Everyone WakeUp and Work!");
        }
    }

    /**
     * Blocks until a message has been obtained from the queue.
     *
     * <p>The receiver first tries the non-blocking {@code popMessage()}. When that yields
     * {@code null} it tries to become the queue's single long-poller; if another receiver
     * already holds that role, this one sleeps on the shared monitor until it is woken by
     * {@link #clearPolling()} and then starts over with the non-blocking call.
     *
     * <p>Exceptions thrown by {@code CloudQueue.popMessage} are not caught here and propagate
     * to the caller. The polling flag is nevertheless always released, so one failing
     * receiver cannot lock the other receivers of the queue out of polling.
     *
     * @return the received message, or {@code null} if the thread was interrupted while
     *         waiting for the current poller (the thread's interrupt flag is restored)
     */
    public Message receiveMessage() {
        boolean polling = false;
        while (true) {
            if (!polling) {
                Message message = cloudQueue.popMessage();
                if (message != null) {
                    return message;
                }
                // Nothing available right now: switch to long-polling mode on the next pass.
                polling = true;
                continue;
            }

            if (!setPolling()) {
                // Another receiver is already long polling. Sleep until it finishes instead of
                // spinning on setPolling(), then retry with the cheap non-blocking pop.
                if (!awaitPollerFinished()) {
                    return null;
                }
                polling = false;
                continue;
            }

            System.out.println("Thread" + workerId + " Polling! " + Instant.now());
            try {
                Message message;
                do {
                    System.out.println("Thread" + workerId + " KEEP Polling!" + Instant.now());
                    message = cloudQueue.popMessage(WAIT_SECONDS);
                } while (message == null);
                return message;
            } finally {
                // Must run even when popMessage throws; otherwise the flag stays true forever
                // and no receiver of this queue can ever take over the polling role.
                clearPolling();
            }
        }
    }

    /**
     * Waits on the shared monitor while another receiver holds the polling role.
     *
     * <p>The flag is re-checked under the monitor before waiting, so a {@code notifyAll()}
     * issued between the failed {@link #setPolling()} and this call cannot be missed. A single
     * {@code wait()} is sufficient: an early or spurious wake-up only causes one extra
     * non-blocking pop in the caller.
     *
     * @return {@code true} to carry on receiving; {@code false} if the wait was interrupted
     */
    private boolean awaitPollerFinished() {
        synchronized (lockObj) {
            Boolean busy = sPollingMap.get(queueName);
            if (busy != null && busy) {
                try {
                    System.out.println("Thread" + workerId + " Have a nice sleep!");
                    lockObj.wait();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller's own shutdown logic.
                    Thread.currentThread().interrupt();
                    System.out.println("MessageReceiver Interrupted! QueueName is " + queueName);
                    return false;
                }
            }
        }
        return true;
    }
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。