【Azure Developer】使用 Python SDK连接Azure Storage Account, 计算Blob大小代码示例

简介: 【Azure Developer】使用 Python SDK连接Azure Storage Account, 计算Blob大小代码示例

问题描述

在微软云环境中,使用python SDK连接存储账号(Storage Account)需要计算Blob大小?虽然Azure提供了一个专用工具Azure Storage Explorer可以统计出Blob的大小:

但是它也是只能一个Blob Container一个的统计,如果Container数量巨大,这将是一个繁琐的工作。而作为开发者,应该让代码来帮助完成。下文使用最快上手的Python代码来计算Blob中容量的大小。

 

完整代码
import os, uuid, datetime, threading
import logging
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
def calculateBlob(connect_string, count):
    try:
        blob_service_client = BlobServiceClient.from_connection_string(connect_string)
    except Exception as e:
        messages = str(count) + "Connect_String Error, Messages:" + e.args.__str__()
        print(messages)
        logging.info(messages)
    else:
        all_containers = blob_service_client.list_containers()
        for c in all_containers:
            count_name = c.name
            print(count_name)
            if count_name not in blobSize_Total:
                blobSize_Total[count_name] = 0
            if count_name not in blobSize_Daily:
                blobSize_Daily[count_name] = 0
            container_client = blob_service_client.get_container_client(count_name)
            generator = container_client.list_blobs()
            total_size_container = 0
            daily_size_container = 0
            for blob in generator:
                total_size_container += blob.size
                blob_create_time = blob.creation_time.strftime("%Y%m%d")
                if blob_create_time != now_date:
                    continue
                else:
                    # Calculate BlobSize in this month
                    daily_size_container += blob.size
                    # blobSize_Daily[count_name] += blob.size  # /(1024*1024)  # content_length - bytes
            blobSize_Total[count_name] += total_size_container / (1024 * 1024)
            blobSize_Daily[count_name] += daily_size_container / (1024 * 1024)
    return None
if __name__ == '__main__':
    # connect string
    Connection_String_List ="DefaultEndpointsProtocol=https;AccountName=<storagename>;AccountKey=<key>;EndpointSuffix=core.chinacloudapi.cn"
    # for i in Connection_String:
    start = datetime.datetime.now()
    print(start)
    # 定义全局变量 - blobSize_Daily & blobSize_Total
    blobSize_Daily = {}
    blobSize_Total = {}
    now_date = datetime.datetime.now().strftime("%Y%m%d")
    print("开始计算")
    calculateBlob(Connection_String_List, 1)
    print("计算完成")
    print("统计当前新增大小")
    print(blobSize_Daily)
    print("统计Blob总大小")
    print(blobSize_Total)
    end = datetime.datetime.now()
    print(end)

如运行是没有Azure blob模块,可以使用 pip install azure-storage-blob 安装。以上代码运行结果如下:

 

 

如果有多个Storage Account,可以考虑加入多线程的方式来运行,在代码中增加一个myThread类,然后在 __main__ 中把 calculateBlob(Connection_String_List, 1) 运行替换为 many_thread(Connection_String_List) 即可。

class myThread(threading.Thread):
    def __init__(self, threadID, name, connection_string):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.connection_string = connection_string
    def run(self):
        print("开始线程:" + self.name)
        calculateBlob(self.connection_string, self.threadID)
        print("退出线程:" + self.name)
def many_thread(Connection_String_List):
    threads = []
    for i in range(len(Connection_String_List)):  # 循环创建多个线程
        t = myThread(i, "Thread-" + str(i), Connection_String_List[i])
        threads.append(t)
    for t in threads:  # 循环启动线程 一个线程对应一个连接字符串
        t.start()
    for t in threads:
        t.join()

 

遇见问题

在多线程执行时,可能会遇见问题:("Connection broken: ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)", ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)),出现此问题大都是由于客户端使用了已经断开的连接导致所导致的。所以一定要仔细调试多线程关闭代码。是否是把还需要运行的线程给关闭了。导致了以上的错误消息。

 

附录一:多线程计算Blob的完整代码

import os, uuid, datetime, threading
import logging
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
def calculateBlob(connect_string, count):
    try:
        blob_service_client = BlobServiceClient.from_connection_string(connect_string)
    except Exception as e:
        messages = str(count) + "Connect_String Error, Messages:" + e.args.__str__()
        print(messages)
        logging.info(messages)
    else:
        all_containers = blob_service_client.list_containers()
        for c in all_containers:
            count_name = c.name
            print(count_name)
            if count_name not in blobSize_Total:
                blobSize_Total[count_name] = 0
            if count_name not in blobSize_Daily:
                blobSize_Daily[count_name] = 0
            container_client = blob_service_client.get_container_client(count_name)
            generator = container_client.list_blobs()
            total_size_container = 0
            daily_size_container = 0
            for blob in generator:
                total_size_container += blob.size
                blob_create_time = blob.creation_time.strftime("%Y%m%d")
                if blob_create_time != now_date:
                    continue
                else:
                    # Calculate BlobSize in this month
                    daily_size_container += blob.size
                    # blobSize_Daily[count_name] += blob.size  # /(1024*1024)  # content_length - bytes
            blobSize_Total[count_name] += total_size_container / (1024 * 1024)
            blobSize_Daily[count_name] += daily_size_container / (1024 * 1024)
    return None
class myThread(threading.Thread):
    def __init__(self, threadID, name, connection_string):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.connection_string = connection_string
    def run(self):
        print("开始线程:" + self.name)
        calculateBlob(self.connection_string, self.threadID)
        print("退出线程:" + self.name)
def many_thread(Connection_String_List):
    threads = []
    for i in range(len(Connection_String_List)):  # 循环创建多个个线程
        t = myThread(i, "Thread-" + str(i), Connection_String_List[i])
        threads.append(t)
    for t in threads:  # 循环启动线程 - 一个线程对应一个连接字符串
        t.start()
    for t in threads:
        t.join()
if __name__ == '__main__':
    # connect string
    Connection_String_List =  ['DefaultEndpointsProtocol=https;AccountName=<your storage account 1>;AccountKey=<Key 1>;EndpointSuffix=core.chinacloudapi.cn', 'DefaultEndpointsProtocol=https;AccountName=<your storage account 2>;AccountKey=<Key 2>;EndpointSuffix=core.chinacloudapi.cn']
    # for i in Connection_String:
    start = datetime.datetime.now()
    print(start)
    # 定义全局变量 - blobSize_Daily & blobSize_Total
    blobSize_Daily = {}
    blobSize_Total = {}
    now_date = datetime.datetime.now().strftime("%Y%m%d")
    many_thread(Connection_String_List)
    print("Main Thread End")
    print(blobSize_Daily)
    print(blobSize_Total)
    end = datetime.datetime.now()
    print(end)

运行效果:

 

 

 

参考资料

快速入门:使用 Python v12 SDK 管理 blobhttps://docs.azure.cn/zh-cn/storage/blobs/storage-quickstart-blobs-python

Python 列表(List) : https://www.runoob.com/python/python-lists.html

BlobServiceClient Class : https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobserviceclient?view=azure-python

 

相关文章
|
1月前
|
SQL 数据库 开发者
Python中使用Flask-SQLAlchemy对数据库的增删改查简明示例
这样我们就对Flask-SQLAlchemy进行了一次简明扼要的旅程,阐述了如何定义模型,如何创建表,以及如何进行基本的数据库操作。希望你在阅读后能对Flask-SQLAlchemy有更深入的理解,这将为你在Python世界中从事数据库相关工作提供极大的便利。
192 77
|
3月前
|
XML JSON API
淘宝商品详情API的调用流程(python请求示例以及json数据示例返回参考)
JSON数据示例:需要提供一个结构化的示例,展示商品详情可能包含的字段,如商品标题、价格、库存、描述、图片链接、卖家信息等。考虑到稳定性,示例应基于淘宝开放平台的标准响应格式。
|
25天前
|
API 开发工具 网络架构
【Azure Service Bus】使用Python SDK创建Service Bus Namespace资源(中国区)
本文介绍了如何使用Python SDK创建Azure Service Bus Namespace资源。首先,通过Microsoft Entra ID注册应用获取Client ID、Client Secret和Tenant ID,完成中国区Azure认证。接着,初始化ServiceBusManagementClient对象,并调用`begin_create_or_update`方法创建资源。
81 29
|
29天前
|
人工智能 数据库连接 API
掌握Python的高级用法:技巧、技术和实用性示例
本文分享了Python的高级用法,包括生成器、装饰器、上下文管理器、元类和并发编程等。生成器通过`yield`实现懒加载序列;装饰器用于增强函数功能,如添加日志或性能分析;上下文管理器借助`with`语句管理资源;元类动态定制类行为;并发编程利用`threading`和`asyncio`库提升任务执行效率。掌握这些高级概念可优化代码质量,解决复杂问题,提高程序性能与可维护性。
|
2月前
|
人工智能 API 开发工具
【AI大模型】使用Python调用DeepSeek的API,原来SDK是调用这个,绝对的一分钟上手和使用
本文详细介绍了如何使用Python调用DeepSeek的API,从申请API-Key到实现代码层对话,手把手教你快速上手。DeepSeek作为领先的AI大模型,提供免费体验机会,帮助开发者探索其语言生成能力。通过简单示例代码与自定义界面开发,展示了API的实际应用,让对接过程在一分钟内轻松完成,为项目开发带来更多可能。
|
3月前
|
API 开发工具 Python
|
3月前
|
JSON 监控 API
python语言采集淘宝商品详情数据,json数据示例返回
通过淘宝开放平台的API接口,开发者可以轻松获取商品详情数据,并利用这些数据进行商品分析、价格监控、库存管理等操作。本文提供的示例代码和JSON数据解析方法,可以帮助您快速上手淘宝商品数据的采集与处理。
|
3月前
|
存储 XML 开发工具
【Azure Storage Account】利用App Service作为反向代理, 并使用.NET Storage Account SDK实现上传/下载操作
本文介绍了如何在Azure上使用App Service作为反向代理,以自定义域名访问Storage Account。主要内容包括: 1. **设置反向代理**:通过配置`applicationhost.xdt`和`web.config`文件,启用IIS代理功能并设置重写规则。 2. **验证访问**:测试原生URL和自定义域名的访问效果,确保两者均可正常访问Storage Account。 3. **.NET SDK连接**:使用共享访问签名(SAS URL)初始化BlobServiceClient对象,实现通过自定义域名访问存储服务。
|
4月前
|
Go Python
Python中的round函数详解及使用示例
`round()`函数是Python内置的用于四舍五入数字的工具。它接受一个数字(必需)和可选的小数位数参数,返回最接近的整数或指定精度的浮点数。本文详细介绍其用法、参数及示例,涵盖基本操作、负数处理、特殊情况及应用建议,帮助你更好地理解和运用该函数。
334 2
|
4月前
|
API 开发工具 Python
【Azure Developer】编写Python SDK代码实现从China Azure中VM Disk中创建磁盘快照Snapshot
本文介绍如何使用Python SDK为中国区微软云(China Azure)中的虚拟机磁盘创建快照。通过Azure Python SDK的Snapshot Class,指定`location`和`creation_data`参数,使用`Copy`选项从现有磁盘创建快照。代码示例展示了如何配置Default Azure Credential,并设置特定于中国区Azure的`base_url`和`credential_scopes`。参考资料包括官方文档和相关API说明。

推荐镜像

更多