【Azure Developer】使用 Python SDK连接Azure Storage Account, 计算Blob大小代码示例

简介: 【Azure Developer】使用 Python SDK连接Azure Storage Account, 计算Blob大小代码示例

问题描述

在微软云环境中,使用python SDK连接存储账号(Storage Account)需要计算Blob大小?虽然Azure提供了一个专用工具Azure Storage Explorer可以统计出Blob的大小:

但是它也是只能一个Blob Container一个的统计,如果Container数量巨大,这将是一个繁琐的工作。而作为开发者,应该让代码来帮助完成。下文使用最快上手的Python代码来计算Blob中容量的大小。

 

完整代码
import os, uuid, datetime, threading
import logging
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
def calculateBlob(connect_string, count):
    try:
        blob_service_client = BlobServiceClient.from_connection_string(connect_string)
    except Exception as e:
        messages = str(count) + "Connect_String Error, Messages:" + e.args.__str__()
        print(messages)
        logging.info(messages)
    else:
        all_containers = blob_service_client.list_containers()
        for c in all_containers:
            count_name = c.name
            print(count_name)
            if count_name not in blobSize_Total:
                blobSize_Total[count_name] = 0
            if count_name not in blobSize_Daily:
                blobSize_Daily[count_name] = 0
            container_client = blob_service_client.get_container_client(count_name)
            generator = container_client.list_blobs()
            total_size_container = 0
            daily_size_container = 0
            for blob in generator:
                total_size_container += blob.size
                blob_create_time = blob.creation_time.strftime("%Y%m%d")
                if blob_create_time != now_date:
                    continue
                else:
                    # Calculate BlobSize in this month
                    daily_size_container += blob.size
                    # blobSize_Daily[count_name] += blob.size  # /(1024*1024)  # content_length - bytes
            blobSize_Total[count_name] += total_size_container / (1024 * 1024)
            blobSize_Daily[count_name] += daily_size_container / (1024 * 1024)
    return None
if __name__ == '__main__':
    # connect string
    Connection_String_List ="DefaultEndpointsProtocol=https;AccountName=<storagename>;AccountKey=<key>;EndpointSuffix=core.chinacloudapi.cn"
    # for i in Connection_String:
    start = datetime.datetime.now()
    print(start)
    # 定义全局变量 - blobSize_Daily & blobSize_Total
    blobSize_Daily = {}
    blobSize_Total = {}
    now_date = datetime.datetime.now().strftime("%Y%m%d")
    print("开始计算")
    calculateBlob(Connection_String_List, 1)
    print("计算完成")
    print("统计当前新增大小")
    print(blobSize_Daily)
    print("统计Blob总大小")
    print(blobSize_Total)
    end = datetime.datetime.now()
    print(end)

如运行是没有Azure blob模块,可以使用 pip install azure-storage-blob 安装。以上代码运行结果如下:

 

 

如果有多个Storage Account,可以考虑加入多线程的方式来运行,在代码中增加一个myThread类,然后在 __main__ 中把 calculateBlob(Connection_String_List, 1) 运行替换为 many_thread(Connection_String_List) 即可。

class myThread(threading.Thread):
    def __init__(self, threadID, name, connection_string):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.connection_string = connection_string
    def run(self):
        print("开始线程:" + self.name)
        calculateBlob(self.connection_string, self.threadID)
        print("退出线程:" + self.name)
def many_thread(Connection_String_List):
    threads = []
    for i in range(len(Connection_String_List)):  # 循环创建多个线程
        t = myThread(i, "Thread-" + str(i), Connection_String_List[i])
        threads.append(t)
    for t in threads:  # 循环启动线程 一个线程对应一个连接字符串
        t.start()
    for t in threads:
        t.join()

 

遇见问题

在多线程执行时,可能会遇见问题:("Connection broken: ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)", ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)),出现此问题大都是由于客户端使用了已经断开的连接导致所导致的。所以一定要仔细调试多线程关闭代码。是否是把还需要运行的线程给关闭了。导致了以上的错误消息。

 

附录一:多线程计算Blob的完整代码

import os, uuid, datetime, threading
import logging
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
def calculateBlob(connect_string, count):
    try:
        blob_service_client = BlobServiceClient.from_connection_string(connect_string)
    except Exception as e:
        messages = str(count) + "Connect_String Error, Messages:" + e.args.__str__()
        print(messages)
        logging.info(messages)
    else:
        all_containers = blob_service_client.list_containers()
        for c in all_containers:
            count_name = c.name
            print(count_name)
            if count_name not in blobSize_Total:
                blobSize_Total[count_name] = 0
            if count_name not in blobSize_Daily:
                blobSize_Daily[count_name] = 0
            container_client = blob_service_client.get_container_client(count_name)
            generator = container_client.list_blobs()
            total_size_container = 0
            daily_size_container = 0
            for blob in generator:
                total_size_container += blob.size
                blob_create_time = blob.creation_time.strftime("%Y%m%d")
                if blob_create_time != now_date:
                    continue
                else:
                    # Calculate BlobSize in this month
                    daily_size_container += blob.size
                    # blobSize_Daily[count_name] += blob.size  # /(1024*1024)  # content_length - bytes
            blobSize_Total[count_name] += total_size_container / (1024 * 1024)
            blobSize_Daily[count_name] += daily_size_container / (1024 * 1024)
    return None
class myThread(threading.Thread):
    def __init__(self, threadID, name, connection_string):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.connection_string = connection_string
    def run(self):
        print("开始线程:" + self.name)
        calculateBlob(self.connection_string, self.threadID)
        print("退出线程:" + self.name)
def many_thread(Connection_String_List):
    threads = []
    for i in range(len(Connection_String_List)):  # 循环创建多个个线程
        t = myThread(i, "Thread-" + str(i), Connection_String_List[i])
        threads.append(t)
    for t in threads:  # 循环启动线程 - 一个线程对应一个连接字符串
        t.start()
    for t in threads:
        t.join()
if __name__ == '__main__':
    # connect string
    Connection_String_List =  ['DefaultEndpointsProtocol=https;AccountName=<your storage account 1>;AccountKey=<Key 1>;EndpointSuffix=core.chinacloudapi.cn', 'DefaultEndpointsProtocol=https;AccountName=<your storage account 2>;AccountKey=<Key 2>;EndpointSuffix=core.chinacloudapi.cn']
    # for i in Connection_String:
    start = datetime.datetime.now()
    print(start)
    # 定义全局变量 - blobSize_Daily & blobSize_Total
    blobSize_Daily = {}
    blobSize_Total = {}
    now_date = datetime.datetime.now().strftime("%Y%m%d")
    many_thread(Connection_String_List)
    print("Main Thread End")
    print(blobSize_Daily)
    print(blobSize_Total)
    end = datetime.datetime.now()
    print(end)

运行效果:

 

 

 

参考资料

快速入门:使用 Python v12 SDK 管理 blobhttps://docs.azure.cn/zh-cn/storage/blobs/storage-quickstart-blobs-python

Python 列表(List) : https://www.runoob.com/python/python-lists.html

BlobServiceClient Class : https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobserviceclient?view=azure-python

 

相关文章
|
4月前
|
存储 人工智能 开发工具
AI助理化繁为简,速取代码参数——使用python SDK 处理OSS存储的图片
只需要通过向AI助理提问的方式输入您的需求,即可瞬间获得核心流程代码及参数,缩短学习路径、提升开发效率。
1484 4
AI助理化繁为简,速取代码参数——使用python SDK 处理OSS存储的图片
|
2月前
|
JavaScript API C#
【Azure Developer】Python代码调用Graph API将外部用户添加到组,结果无效,也无错误信息
根据Graph API文档,在单个请求中将多个成员添加到组时,Python代码示例中的`members@odata.bind`被错误写为`members@odata_bind`,导致用户未成功添加。
50 10
|
2月前
|
弹性计算 安全 开发工具
灵码评测-阿里云提供的ECS python3 sdk做安全组管理
批量变更阿里云ECS安全组策略(批量变更)
|
2月前
|
API Python
【Azure Developer】分享一段Python代码调用Graph API创建用户的示例
分享一段Python代码调用Graph API创建用户的示例
68 11
|
4月前
|
机器人 Shell Linux
【Azure Bot Service】部署Python ChatBot代码到App Service中
本文介绍了使用Python编写的ChatBot在部署到Azure App Service时遇到的问题及解决方案。主要问题是应用启动失败,错误信息为“Failed to find attribute &#39;app&#39; in &#39;app&#39;”。解决步骤包括:1) 修改`app.py`文件,添加`init_func`函数;2) 配置`config.py`,添加与Azure Bot Service认证相关的配置项;3) 设置App Service的启动命令为`python3 -m aiohttp.web -H 0.0.0.0 -P 8000 app:init_func`。
|
3月前
|
中间件 Docker Python
【Azure Function】FTP上传了Python Function文件后,无法在门户页面加载函数的问题
通过FTP上传Python Function至Azure云后,出现函数列表无法加载的问题。经排查,发现是由于`requirements.txt`中的依赖包未被正确安装。解决方法为:在本地安装依赖包到`.python_packages/lib/site-packages`目录,再将该目录内容上传至云上的`wwwroot`目录,并重启应用。最终成功加载函数列表。
|
4月前
|
Linux Python
【Azure Function】Python Function部署到Azure后报错No module named '_cffi_backend'
ERROR: Error: No module named '_cffi_backend', Cannot find module. Please check the requirements.txt file for the missing module.
110 2
|
4月前
|
JavaScript 前端开发 开发工具
【Azure Developer】使用JavaScript通过SDK进行monitor-query的client认证报错问题
AADSTS90002: Tenant 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' not found. Check to make sure you have the correct tenant ID and are signing into the correct cloud. Check with your subscription administrator, this may happen if there are no active subscriptions for the tenant.
|
2月前
|
人工智能 数据可视化 数据挖掘
探索Python编程:从基础到高级
在这篇文章中,我们将一起深入探索Python编程的世界。无论你是初学者还是有经验的程序员,都可以从中获得新的知识和技能。我们将从Python的基础语法开始,然后逐步过渡到更复杂的主题,如面向对象编程、异常处理和模块使用。最后,我们将通过一些实际的代码示例,来展示如何应用这些知识解决实际问题。让我们一起开启Python编程的旅程吧!
|
2月前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。

热门文章

最新文章