概述
本文主要介绍如何使用Python3.6操作阿里云AMQP。阿里云的AMQP是完全兼容开源社区的AMQP,使用过程中只需要在创建连接阶段参考官方示例配置连接信息,之后的使用与开源社区AMQP使用完全一致,使用的SDK也是开源社区的SDK:pika。
Code Sample
1、计算username、password
# -*- coding: utf-8 -*
import base64
import hashlib
import hmac
from datetime import datetime
class AliyunCredentialsProvider:
"""
Python3.6+适用,根据阿里云的 accessKey,accessSecret,UID算出amqp连接使用的username和password
UID是资源ownerID,一般是接入点第一段
"""
ACCESS_FROM_USER: int = 0
def __init__(self, access_key: str, access_secret: str, uid: int, security_token: str = None) -> None:
self.accessKey = access_key
self.accessSecret = access_secret
self.UID = uid
self.securityToken = security_token
def get_username(self) -> str:
ak = self.accessKey
ret = base64.b64encode(f'{self.ACCESS_FROM_USER}:{self.UID}:{ak}'.encode())
if self.securityToken:
ret = f'{ret}:{self.securityToken}'
return str(ret, 'UTF-8')
def get_password(self) -> str:
now = datetime.now()
timestamp = int(now.timestamp() * 1000)
key = bytes(str(timestamp), 'UTF-8')
message = bytes(self.accessSecret, 'UTF-8')
digester = hmac.new(key, message, hashlib.sha1)
signature1: str = digester.hexdigest()
signature1 = signature1.upper()
ret = base64.b64encode(f'{signature1}:{timestamp}'.encode())
passoword = str(ret, 'UTF-8')
return passoword
2、获取认证需要的参数
# -*- coding: utf-8 -*
import pika
from AMQP.AliyunCredentialsProvider3 import AliyunCredentialsProvider
# 接入点
host = "1848217816617278.mq-amqp.cn-hangzhou-a.aliyuncs.com";
# 默认端口
port = 5672;
# 资源隔离
virtualHost = "yutaoamqptest";
# 阿里云的accessKey
accessKey = "********";
# 阿里云的accessSecret
accessSecret = "********";
# 主账号id
resourceOwnerId = int(184********17278);
provider = AliyunCredentialsProvider(accessKey, accessSecret, resourceOwnerId)
def getConnectionParam():
credentials = pika.PlainCredentials(provider.get_username(), provider.get_password(), erase_on_connect=True)
return pika.ConnectionParameters(host, port, virtualHost, credentials)
3、发送Code
import pika
from AMQP import connection
connection = pika.BlockingConnection(connection.getConnectionParam()) # 建立连接
# Create a new channel with the next available channel number or pass in a channel number to use
channel = connection.channel()
# Declare queue, create if needed. This method creates or checks a queue.
# When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print("[x] Sent 'Hello World!'")
connection.close()
4、接收Code
import pika
from AMQP import connection
connection = pika.BlockingConnection(connection.getConnectionParam()) # 建立连接
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
5、项目目录结构
6、接收测试结果
参考链接
amqp-python-demo