阿里云AMQP Python3使用

简介: 概述本文主要介绍如何使用Python3.6操作阿里云AMQP。阿里云的AMQP是完全兼容开源社区的AMQP,使用过程中只需要在创建连接阶段参考官方示例配置连接信息,之后的使用与开源社区AMQP使用完全一致,使用的SDK也是开源社区的SDK:pika。

概述

本文主要介绍如何使用Python3.6操作阿里云AMQP。阿里云的AMQP是完全兼容开源社区的AMQP,使用过程中只需要在创建连接阶段参考官方示例配置连接信息,之后的使用与开源社区AMQP使用完全一致,使用的SDK也是开源社区的SDK:pika。

Code Sample

1、计算username、password
# -*- coding: utf-8 -*
import base64
import hashlib
import hmac
from datetime import datetime

class AliyunCredentialsProvider:
    """
    Python3.6+适用,根据阿里云的 accessKey,accessSecret,UID算出amqp连接使用的username和password
    UID是资源ownerID,一般是接入点第一段
    """

    ACCESS_FROM_USER: int = 0

    def __init__(self, access_key: str, access_secret: str, uid: int, security_token: str = None) -> None:
        self.accessKey = access_key
        self.accessSecret = access_secret
        self.UID = uid
        self.securityToken = security_token

    def get_username(self) -> str:
        ak = self.accessKey
        ret = base64.b64encode(f'{self.ACCESS_FROM_USER}:{self.UID}:{ak}'.encode())
        if self.securityToken:
            ret = f'{ret}:{self.securityToken}'
        return str(ret, 'UTF-8')

    def get_password(self) -> str:
        now = datetime.now()
        timestamp = int(now.timestamp() * 1000)
        key = bytes(str(timestamp), 'UTF-8')
        message = bytes(self.accessSecret, 'UTF-8')

        digester = hmac.new(key, message, hashlib.sha1)
        signature1: str = digester.hexdigest()
        signature1 = signature1.upper()

        ret = base64.b64encode(f'{signature1}:{timestamp}'.encode())
        passoword = str(ret, 'UTF-8')
        return passoword
2、获取认证需要的参数
# -*- coding: utf-8 -*
import pika
from AMQP.AliyunCredentialsProvider3 import AliyunCredentialsProvider

# 接入点
host = "1848217816617278.mq-amqp.cn-hangzhou-a.aliyuncs.com";
# 默认端口
port = 5672;
# 资源隔离
virtualHost = "yutaoamqptest";
# 阿里云的accessKey
accessKey = "********";
# 阿里云的accessSecret
accessSecret = "********";
# 主账号id
resourceOwnerId = int(184********17278);

provider = AliyunCredentialsProvider(accessKey, accessSecret, resourceOwnerId)

def getConnectionParam():
    credentials = pika.PlainCredentials(provider.get_username(), provider.get_password(), erase_on_connect=True)
    return pika.ConnectionParameters(host, port, virtualHost, credentials)
3、发送Code

import pika

from AMQP import connection
connection = pika.BlockingConnection(connection.getConnectionParam()) # 建立连接

# Create a new channel with the next available channel number or pass in a channel number to use
channel = connection.channel()
# Declare queue, create if needed. This method creates or checks a queue.
# When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print("[x] Sent 'Hello World!'")
connection.close()
4、接收Code
import pika
from AMQP import connection

connection = pika.BlockingConnection(connection.getConnectionParam()) # 建立连接
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
5、项目目录结构

_

6、接收测试结果

_

参考链接

amqp-python-demo

相关文章
|
8天前
|
机器学习/深度学习 人工智能 分布式计算
我的阿里云社区年度总结报告:Python、人工智能与大数据领域的探索之旅
我的阿里云社区年度总结报告:Python、人工智能与大数据领域的探索之旅
83 35
|
1月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
83 2
|
1月前
|
弹性计算 安全 开发工具
灵码评测-阿里云提供的ECS python3 sdk做安全组管理
批量变更阿里云ECS安全组策略(批量变更)
|
2月前
|
机器学习/深度学习 自然语言处理 API
如何使用阿里云的语音合成服务(TTS)将文本转换为语音?本文详细介绍了从注册账号、获取密钥到编写Python代码调用TTS服务的全过程
如何使用阿里云的语音合成服务(TTS)将文本转换为语音?本文详细介绍了从注册账号、获取密钥到编写Python代码调用TTS服务的全过程。通过简单的代码示例,展示如何将文本转换为自然流畅的语音,适用于有声阅读、智能客服等场景。
605 3
|
5月前
|
弹性计算 API 开发工具
揭秘Python与阿里云API的神秘邂逅!流式处理的魔法之旅,一场颠覆想象的技术盛宴!
【8月更文挑战第15天】在数字世界的广阔舞台上,Python与阿里云API的相遇,就像是一场命中注定的邂逅。它们携手共舞,为我们带来了流式处理的魔法之旅。本文将揭开这场神秘邂逅的面纱,带你领略Python与阿里云API之间的奇妙互动。让我们一起踏上这场颠覆想象的技术盛宴,探索流式处理的无限可能!
95 7
|
5月前
|
弹性计算 JSON 开发工具
"一键玩转阿里云ECS!Python大神揭秘:如何自动化创建镜像并跨地域复制,让你的云资源部署秒变高效达人!"
【8月更文挑战第14天】本文介绍如何使用Python与阿里云SDK自动化管理ECS镜像,包括创建镜像及跨地域复制,以优化云资源部署。首先安装`aliyun-python-sdk-ecs`并配置阿里云凭证。接着,通过Python脚本实现镜像创建与复制功能,简化日常运维工作并增强灾难恢复能力。注意权限及费用问题。
114 2
|
5月前
|
关系型数据库 数据库 数据安全/隐私保护
"告别繁琐!Python大神揭秘:如何一键定制阿里云RDS备份策略,让数据安全与效率并肩飞,轻松玩转云端数据库!"
【8月更文挑战第14天】在云计算时代,数据库安全至关重要。阿里云RDS提供自动备份,但标准策略难以适应所有场景。传统手动备份灵活性差、管理成本高且恢复效率低。本文对比手动备份,介绍使用Python自定义阿里云RDS备份策略的方法,实现动态调整备份频率、集中管理和智能决策,提升备份效率与数据安全性。示例代码演示如何创建自动备份任务。通过自动化与智能化备份管理,支持企业数字化转型。
125 2
|
6月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如何使用Python和阿里云SDK读取OSS中的文件
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
分布式计算 数据可视化 大数据
阿里云大牛熬夜整理的Python大数据小抄,GitHub星标125K!
Python 是一种流行的编程语言,在大数据领域有广泛的应用。Python 拥有丰富的库和工具,可用于数据处理、分析和可视化。 在大数据处理方面,Python 可以与 Hadoop、Spark 等大数据框架集成,实现大规模数据的处理和分析。它也适用于数据清洗、数据转换、数据挖掘等任务。 此外,Python 的数据分析库如 Pandas、NumPy 和 Matplotlib 等,提供了强大的数据处理和可视化功能,使得数据分析变得更加简单和高效。
|
6月前
|
人工智能 分布式计算 数据挖掘
阿里云 MaxCompute MaxFrame 开启免费公测,统一 Python 开发生态
阿里云 MaxCompute MaxFrame 开启免费公测,统一 Python 开发生态。分布式计算框架 MaxFrame 支持 Python 编程接口并可直接复用 MaxCompute 弹性计算资源及海量数据,100%兼容 Pandas 且自动分布式,与 MaxCompute Notebook、镜像管理等功能共同构成了 MaxCompute 的 Python 开发生态。用户可以以更熟悉、高效、灵活的方式在 MaxCompute 上进行大规模数据分析处理、可视化数据探索分析以及科学计算、ML/AI 开发等工作。
269 7