开发者社区> 问答> 正文

阿里云消息服务使用示例代码报空指针

java.lang.NullPointerException

at com.aliyun.mns.client.impl.AbstractAction.execute(AbstractAction.java:94)
at com.aliyun.mns.client.CloudQueue.popMessage(CloudQueue.java:355)
at com.xxx.mns.receiver.MessageReceiver.receiveMessage(MessageReceiver.java:89)
at com.xxx.mns.worker.OrderWorker.work(OrderWorker.java:35)
at com.xxx.mns.MnsApplication.main(MnsApplication.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:54)
at java.lang.Thread.run(Thread.java:745)

主要核心代码参考
public class MessageReceiver {

public static final int WAIT_SECONDS = 30;

// if there are too many queues, a clear method could be involved after deleting the queue
protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();

protected Object lockObj;
protected String queueName;
protected CloudQueue cloudQueue;
protected int workerId;

public MessageReceiver(int id, MNSClient mnsClient, String queue) {
    cloudQueue = mnsClient.getQueueRef(queue);
    queueName = queue;
    workerId = id;

    synchronized (sLockObjMap) {
        lockObj = sLockObjMap.get(queueName);
        if (lockObj == null) {
            lockObj = new Object();
            sLockObjMap.put(queueName, lockObj);
        }
    }
}

public boolean setPolling() {
    synchronized (lockObj) {
        Boolean ret = sPollingMap.get(queueName);
        if (ret == null || !ret) {
            sPollingMap.put(queueName, true);
            return true;
        }
        return false;
    }
}

public void clearPolling() {
    synchronized (lockObj) {
        sPollingMap.put(queueName, false);
        lockObj.notifyAll();
        System.out.println("Everyone WakeUp and Work!");
    }
}

public Message receiveMessage() {
    boolean polling = false;
    while (true) {

// synchronized (lockObj) {
// Boolean p = sPollingMap.get(queueName);
// if (p != null && p) {
// try {
// System.out.println("Thread" + workerId + " Have a nice sleep!");
// polling = false;
// lockObj.wait();
// } catch (InterruptedException e) {
// System.out.println("MessageReceiver Interrupted! QueueName is " + queueName);
// return null;
// }
// }
// }

        Message message = null;
        if (!polling) {
            message = cloudQueue.popMessage();
            if (message == null) {
                polling = true;
                continue;
            }
        } else {
            if (setPolling()) {
                System.out.println("Thread" + workerId + " Polling! " + Instant.now());
            } else {
                continue;
            }
            do {
                System.out.println("Thread" + workerId + " KEEP Polling!" + Instant.now());
                message = cloudQueue.popMessage(WAIT_SECONDS);
            } while (message == null);
            clearPolling();
        }
        return message;
    }
}

}

展开
收起
hbwhypw 2017-05-04 16:41:48 3800 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
消息服务在Serverless中的应用 立即下载
阿里云通信战略新品发布 ——国际/港澳台消息服务 立即下载
阿里云通信战略新品发布—国际/港澳台消息服务 立即下载