【Azure Developer】使用 Python SDK连接Azure Storage Account, 计算Blob大小代码示例

简介: 【Azure Developer】使用 Python SDK连接Azure Storage Account, 计算Blob大小代码示例

问题描述

在微软云环境中,使用python SDK连接存储账号(Storage Account)需要计算Blob大小?虽然Azure提供了一个专用工具Azure Storage Explorer可以统计出Blob的大小:

但是它也是只能一个Blob Container一个的统计,如果Container数量巨大,这将是一个繁琐的工作。而作为开发者,应该让代码来帮助完成。下文使用最快上手的Python代码来计算Blob中容量的大小。

 

完整代码
import os, uuid, datetime, threading
import logging
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
def calculateBlob(connect_string, count):
    try:
        blob_service_client = BlobServiceClient.from_connection_string(connect_string)
    except Exception as e:
        messages = str(count) + "Connect_String Error, Messages:" + e.args.__str__()
        print(messages)
        logging.info(messages)
    else:
        all_containers = blob_service_client.list_containers()
        for c in all_containers:
            count_name = c.name
            print(count_name)
            if count_name not in blobSize_Total:
                blobSize_Total[count_name] = 0
            if count_name not in blobSize_Daily:
                blobSize_Daily[count_name] = 0
            container_client = blob_service_client.get_container_client(count_name)
            generator = container_client.list_blobs()
            total_size_container = 0
            daily_size_container = 0
            for blob in generator:
                total_size_container += blob.size
                blob_create_time = blob.creation_time.strftime("%Y%m%d")
                if blob_create_time != now_date:
                    continue
                else:
                    # Calculate BlobSize in this month
                    daily_size_container += blob.size
                    # blobSize_Daily[count_name] += blob.size  # /(1024*1024)  # content_length - bytes
            blobSize_Total[count_name] += total_size_container / (1024 * 1024)
            blobSize_Daily[count_name] += daily_size_container / (1024 * 1024)
    return None
if __name__ == '__main__':
    # connect string
    Connection_String_List ="DefaultEndpointsProtocol=https;AccountName=<storagename>;AccountKey=<key>;EndpointSuffix=core.chinacloudapi.cn"
    # for i in Connection_String:
    start = datetime.datetime.now()
    print(start)
    # 定义全局变量 - blobSize_Daily & blobSize_Total
    blobSize_Daily = {}
    blobSize_Total = {}
    now_date = datetime.datetime.now().strftime("%Y%m%d")
    print("开始计算")
    calculateBlob(Connection_String_List, 1)
    print("计算完成")
    print("统计当前新增大小")
    print(blobSize_Daily)
    print("统计Blob总大小")
    print(blobSize_Total)
    end = datetime.datetime.now()
    print(end)

如运行是没有Azure blob模块,可以使用 pip install azure-storage-blob 安装。以上代码运行结果如下:

 

 

如果有多个Storage Account,可以考虑加入多线程的方式来运行,在代码中增加一个myThread类,然后在 __main__ 中把 calculateBlob(Connection_String_List, 1) 运行替换为 many_thread(Connection_String_List) 即可。

class myThread(threading.Thread):
    def __init__(self, threadID, name, connection_string):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.connection_string = connection_string
    def run(self):
        print("开始线程:" + self.name)
        calculateBlob(self.connection_string, self.threadID)
        print("退出线程:" + self.name)
def many_thread(Connection_String_List):
    threads = []
    for i in range(len(Connection_String_List)):  # 循环创建多个线程
        t = myThread(i, "Thread-" + str(i), Connection_String_List[i])
        threads.append(t)
    for t in threads:  # 循环启动线程 一个线程对应一个连接字符串
        t.start()
    for t in threads:
        t.join()

 

遇见问题

在多线程执行时,可能会遇见问题:("Connection broken: ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)", ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)),出现此问题大都是由于客户端使用了已经断开的连接导致所导致的。所以一定要仔细调试多线程关闭代码。是否是把还需要运行的线程给关闭了。导致了以上的错误消息。

 

附录一:多线程计算Blob的完整代码

import os, uuid, datetime, threading
import logging
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
def calculateBlob(connect_string, count):
    try:
        blob_service_client = BlobServiceClient.from_connection_string(connect_string)
    except Exception as e:
        messages = str(count) + "Connect_String Error, Messages:" + e.args.__str__()
        print(messages)
        logging.info(messages)
    else:
        all_containers = blob_service_client.list_containers()
        for c in all_containers:
            count_name = c.name
            print(count_name)
            if count_name not in blobSize_Total:
                blobSize_Total[count_name] = 0
            if count_name not in blobSize_Daily:
                blobSize_Daily[count_name] = 0
            container_client = blob_service_client.get_container_client(count_name)
            generator = container_client.list_blobs()
            total_size_container = 0
            daily_size_container = 0
            for blob in generator:
                total_size_container += blob.size
                blob_create_time = blob.creation_time.strftime("%Y%m%d")
                if blob_create_time != now_date:
                    continue
                else:
                    # Calculate BlobSize in this month
                    daily_size_container += blob.size
                    # blobSize_Daily[count_name] += blob.size  # /(1024*1024)  # content_length - bytes
            blobSize_Total[count_name] += total_size_container / (1024 * 1024)
            blobSize_Daily[count_name] += daily_size_container / (1024 * 1024)
    return None
class myThread(threading.Thread):
    def __init__(self, threadID, name, connection_string):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.connection_string = connection_string
    def run(self):
        print("开始线程:" + self.name)
        calculateBlob(self.connection_string, self.threadID)
        print("退出线程:" + self.name)
def many_thread(Connection_String_List):
    threads = []
    for i in range(len(Connection_String_List)):  # 循环创建多个个线程
        t = myThread(i, "Thread-" + str(i), Connection_String_List[i])
        threads.append(t)
    for t in threads:  # 循环启动线程 - 一个线程对应一个连接字符串
        t.start()
    for t in threads:
        t.join()
if __name__ == '__main__':
    # connect string
    Connection_String_List =  ['DefaultEndpointsProtocol=https;AccountName=<your storage account 1>;AccountKey=<Key 1>;EndpointSuffix=core.chinacloudapi.cn', 'DefaultEndpointsProtocol=https;AccountName=<your storage account 2>;AccountKey=<Key 2>;EndpointSuffix=core.chinacloudapi.cn']
    # for i in Connection_String:
    start = datetime.datetime.now()
    print(start)
    # 定义全局变量 - blobSize_Daily & blobSize_Total
    blobSize_Daily = {}
    blobSize_Total = {}
    now_date = datetime.datetime.now().strftime("%Y%m%d")
    many_thread(Connection_String_List)
    print("Main Thread End")
    print(blobSize_Daily)
    print(blobSize_Total)
    end = datetime.datetime.now()
    print(end)

运行效果:

 

 

 

参考资料

快速入门:使用 Python v12 SDK 管理 blobhttps://docs.azure.cn/zh-cn/storage/blobs/storage-quickstart-blobs-python

Python 列表(List) : https://www.runoob.com/python/python-lists.html

BlobServiceClient Class : https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobserviceclient?view=azure-python

 

相关文章
|
20天前
|
NoSQL Unix 网络安全
【Azure Cache for Redis】Python Django-Redis连接Azure Redis服务遇上(104, 'Connection reset by peer')
【Azure Cache for Redis】Python Django-Redis连接Azure Redis服务遇上(104, 'Connection reset by peer')
【Azure Cache for Redis】Python Django-Redis连接Azure Redis服务遇上(104, 'Connection reset by peer')
|
20天前
|
存储 Java API
【Azure 存储服务】Java Storage SDK 调用 uploadWithResponse 代码示例(询问ChatGTP得代码原型后人力验证)
【Azure 存储服务】Java Storage SDK 调用 uploadWithResponse 代码示例(询问ChatGTP得代码原型后人力验证)
|
16天前
|
数据采集 开发工具 Python
海康威视工业相机SDK+Python+PyQt开发数据采集系统(支持软件触发、编码器触发)
该系统基于海康威视工业相机SDK,使用Python与PyQt开发,支持Gige与USB相机设备的搜索及双相机同时显示。系统提供软件触发与编码器触发模式,并可在数据采集过程中实时保存图像。此外,用户可以调节曝光时间和增益,并进行信息输入,这些信息将被保存至配置文件以便下次自动加载。参数调节与实时预览等功能进一步增强了系统的实用性。
40 1
|
20天前
|
存储 API 开发工具
【Azure Storage Blob】如何通过.NET Azure Storage Blobs SDK获取到Blob的MD5值呢?
【Azure Storage Blob】如何通过.NET Azure Storage Blobs SDK获取到Blob的MD5值呢?
|
20天前
|
网络安全 开发工具 Python
【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误
【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误
|
20天前
|
API 开发工具 网络架构
【Azure Developer】使用Python SDK去Azure Container Instance服务的Execute命令的疑问解释
【Azure Developer】使用Python SDK去Azure Container Instance服务的Execute命令的疑问解释
【Azure Developer】使用Python SDK去Azure Container Instance服务的Execute命令的疑问解释
|
20天前
|
开发工具 iOS开发 容器
【Azure Blob】关闭Blob 匿名访问,iOS Objective-C SDK连接Storage Account报错
【Azure Blob】关闭Blob 匿名访问,iOS Objective-C SDK连接Storage Account报错
|
20天前
|
机器学习/深度学习 开发工具 Python
【Azure 应用服务】使用Python Azure SDK 来获取 App Service的访问限制信息(Access Restrictions)
【Azure 应用服务】使用Python Azure SDK 来获取 App Service的访问限制信息(Access Restrictions)
|
3天前
|
存储 人工智能 数据挖掘
Python编程入门:从基础到实战
【9月更文挑战第10天】本文将引导你进入Python编程的世界,从基本语法到实际项目应用,逐步深入。我们将通过简单的例子和代码片段,帮助你理解并掌握Python编程的精髓。无论你是编程新手还是有一定经验的开发者,都能在这篇文章中找到有价值的信息。让我们一起开始Python编程之旅吧!
|
3天前
|
机器学习/深度学习 数据挖掘 开发者
探索Python编程:从基础到进阶的旅程
【9月更文挑战第10天】本文是一篇深入浅出的技术感悟文章,通过作者自身的学习经历,向读者展示了如何从Python编程的基础入门逐步深入到高级应用。文章不仅分享了实用的代码示例,还提供了学习资源和建议,旨在鼓励初学者坚持学习,不断探索编程世界的奥秘。