开发者社区> 问答> 正文

如何查找流量回执消息(2)



编写样例代码

  1. #!/usr/bin/env python
  2. # coding=utf8
  3. import sys
  4. from aliyunsdkcore.acs_exception.exceptions import ServerException
  5. from aliyunsdkcore.client import AcsClient
  6. from aliyunsdkdybaseapi.request.v20170525.QueryTokenForMnsQueueRequest import QueryTokenForMnsQueueRequest
  7. import datetime
  8. from mns.account import Account
  9. from mns.queue import *
  10. try:
  11.     import json
  12. except ImportError:
  13.     import simplejson as json
  14. """
  15. 云通信基础能力业务回执消息消费示例,供参考。
  16. Created on 2017-06-13
  17. """
  18. reload(sys)
  19. sys.setdefaultencoding('utf8')
  20. #暂不支持多region,默认需配置成cn-hangzhou
  21. regionid = "cn-hangzhou"
  22. # ACCESS_KEY_ID/ACCESS_KEY_SECRET 根据实际申请的账号信息进行替换
  23. accesskeyid = "yourAccessKeyId"
  24. accesskeysecret = "yourAccessKeySecret"
  25. endpoint = "http://1943695596114318.mns.cn-hangzhou.aliyuncs.com"
  26. # 短信回执:SmsReport,短息上行:SmsUp,语音呼叫:VoiceReport,流量直冲:FlowReport
  27. msgtype = "FlowReport"
  28. # 在云通信页面开通相应业务消息后,就能在页面上获得对应的queueName
  29. qname = "Alicom-Queue-1093695199815616-FlowReport"
  30. acs_client = AcsClient(accesskeyid, accesskeysecret, regionid)
  31. # 云通信业务token存在失效时间,需动态更新。
  32. class Token():
  33.     def __init__(self, token=None, tmp_access_id=None, tmp_access_key=None, expire_time=None):
  34.         self.__token = token
  35.         self.__tmp_access_id = tmp_access_id
  36.         self.__tmp_access_key = tmp_access_key
  37.         self.__expire_time = expire_time
  38.     def get_token(self):
  39.         return self.__token
  40.     def set_token(self, token):
  41.         self.__token = token
  42.     def get_tmp_access_id(self):
  43.         return self.__tmp_access_id
  44.     def set_tmp_access_id(self, tmp_access_id):
  45.         self.__tmp_access_id = tmp_access_id
  46.     def get_tmp_access_key(self):
  47.         return self.__tmp_access_key
  48.     def set_tmp_access_key(self, tmp_access_key):
  49.         self.__tmp_access_key = tmp_access_key
  50.     def get_expire_time(self):
  51.         return self.__expire_time
  52.     def set_expire_time(self, expire_time):
  53.         self.__expire_time = expire_time
  54.     def is_refresh(self):
  55.         # 失效时间与当前系统时间比较,提前2分钟刷新token
  56.         now = datetime.datetime.now()
  57.         expire = datetime.datetime.strptime(self.__expire_time, "%Y-%m-%d %H:%M:%S")
  58.         # intval = (expire - now).seconds
  59.         # print "token生效剩余时长(秒):" + str(intval)
  60.         if (expire - now).seconds < 120:
  61.             return 1
  62.         return 0
  63.     def refresh(self):
  64.         print "start refresh token..."
  65.         request = QueryTokenForMnsQueueRequest()
  66.         request.set_MessageType(msgtype)
  67.         response = acs_client.do_action_with_exception(request)
  68.         # print response
  69.         if response is None:
  70.             raise ServerException("GET_TOKEN_FAIL", "获取token时无响应")
  71.         response_body = json.loads(response)
  72.         if response_body.get("Code") != "OK":
  73.             raise ServerException("GET_TOKEN_FAIL", "获取token失败")
  74.         self.__tmp_access_key = response_body.get("MessageTokenDTO").get("AccessKeySecret")
  75.         self.__tmp_access_id = response_body.get("MessageTokenDTO").get("AccessKeyId")
  76.         self.__expire_time = response_body.get("MessageTokenDTO").get("ExpireTime")
  77.         self.__token = response_body.get("MessageTokenDTO").get("SecurityToken")
  78.         print "finsh refresh token..."
  79. # 初始化 my_account, my_queue
  80. token = Token()
  81. token.refresh()
  82. my_account = Account(endpoint, token.get_tmp_access_id(), token.get_tmp_access_key(), token.get_token())
  83. my_queue = my_account.get_queue(qname)
  84. # my_queue.set_encoding(False)
  85. # 循环读取删除消息直到队列空
  86. # receive message请求使用long polling方式,通过wait_seconds指定长轮询时间为3秒
  87. ## long polling 解析:
  88. ### 当队列中有消息时,请求立即返回;
  89. ### 当队列中没有消息时,请求在MNS服务器端挂3秒钟,在这期间,有消息写入队列,请求会立即返回消息,3秒后,请求返回队列没有消息;
  90. wait_seconds = 3
  91. print "%sReceive And Delete Message From Queue%s\nQueueName:%s\nWaitSeconds:%s\n" % (
  92.     10 * "=", 10 * "=", qname, wait_seconds)
  93. while True:
  94.     # 读取消息
  95.     try:
  96.         if token.is_refresh() == 1:
  97.             # 刷新token
  98.             token.refresh()
  99.             my_account.mns_client.close_connection();
  100.             my_account = Account(endpoint, token.get_tmp_access_id(), token.get_tmp_access_key(), token.get_token())
  101.             my_queue = my_account.get_queue(qname)
  102.             # my_queue.set_encoding(False)
  103.         recv_msg = my_queue.receive_message(wait_seconds)
  104.         print "Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
  105.             recv_msg.receipt_handle, recv_msg.message_body, recv_msg.message_id)
  106.     except MNSExceptionBase, e:
  107.         if e.type == "QueueNotExist":
  108.             print "Queue not exist, please create queue before receive message."
  109.             sys.exit(0)
  110.         elif e.type == "MessageNotExist":
  111.             print "Queue is empty! sleep 10s"
  112.             time.sleep(10)
  113.             continue
  114.         print "Receive Message Fail! Exception:%s\n" % e
  115.         continue
  116.     # 删除消息
  117.     try:
  118.         my_queue.delete_message(recv_msg.receipt_handle)
  119.         print "Delete Message Succeed!  ReceiptHandle:%s" % recv_msg.receipt_handle
  120.     except MNSExceptionBase, e:
  121.         print "Delete Message Fail! Exception:%s\n" % e

展开
收起
猫饭先生 2017-10-25 14:01:08 1596 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载