阿里云OSS文件上传下载与文件删除及检索示例
实践环境
运行环境:
Python 3.5.4
CentOS Linux release 7.4.1708 (Core)/Win10
需要安装以下类库:
pip3 install setuptools_rust==1.1.2
pip3 install Crypto==1.4.1 # Win10下,安装后,需要更改 site-packages下crypto包名称为Crypto
pip3 install cryptography==3.3.2 # 注意,如果不指定版本,安装oss2时会报错:error: can't find Rust compiler
pip3 install oss2==2.15.0
上传本地文件到阿里云OSS示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Upload a local file, or every file under a local directory, to Aliyun OSS.

On success the script prints the comma-separated OSS object keys that were
uploaded; on failure it prints a line starting with ``success:false,``.
"""
import os
import sys
import traceback

# Key prefix under which uploaded objects are stored inside the bucket.
DEFAULT_OBJECT_PREFIX = 'f2b/artifacts/web-admin-react'


def upload_files(bucket, target_dir_path, exclusion_list=None,
                 object_prefix=DEFAULT_OBJECT_PREFIX):
    """Upload every file found under ``target_dir_path`` to OSS.

    Args:
        bucket: Object exposing ``put_object(key, fileobj)`` (an
            ``oss2.Bucket`` in the script below).
        target_dir_path: Local directory that is walked recursively.
        exclusion_list: Optional iterable of paths, relative to
            ``target_dir_path`` and written with ``/`` separators, that must
            not be uploaded.
        object_prefix: Key prefix prepended to each file's relative path.

    Returns:
        The list of object keys that were uploaded, in walk order.

    Raises:
        Exception: Propagated from ``upload_file`` when an upload does not
            return status 200.
    """
    excluded = set(exclusion_list) if exclusion_list else set()
    target_dir_path = os.path.normpath(target_dir_path)
    oss_objects_path = []
    for root, _dirs, files in os.walk(target_dir_path):
        for file_name in files:
            target_file_path = os.path.normpath(os.path.join(root, file_name))
            # relpath strips only the leading directory. A plain
            # str.replace() would also remove the directory string wherever
            # it re-appears later in the path (e.g. target "a" would turn
            # "a/data/b.txt" into "dt/b.txt").
            relative_path = os.path.relpath(target_file_path, target_dir_path)
            # OSS keys always use forward slashes, also on Windows.
            relative_path = relative_path.replace('\\', '/')
            if relative_path in excluded:
                continue
            object_path = '%s/%s' % (object_prefix, relative_path)
            upload_file(bucket, target_file_path, object_path)
            oss_objects_path.append(object_path)
    return oss_objects_path


def upload_file(bucket, target_file_path, object_path):
    """Upload one local file to OSS.

    Args:
        bucket: Object exposing ``put_object(key, fileobj)``.
        target_file_path: Path of the local file; it is opened in binary mode.
        object_path: Full object key. It must not contain the bucket name.

    Raises:
        Exception: If the returned status is not 200.
    """
    with open(target_file_path, 'rb') as fileobj:
        res = bucket.put_object(object_path, fileobj)
    if res.status != 200:
        raise Exception('upload %s error,status:%s' % (target_file_path, res.status))


if __name__ == '__main__':
    try:
        # Imported here so that a missing oss2 package is reported through the
        # same "success:false" channel as every other failure.
        import oss2

        auth = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')
        # oss2.Bucket(auth, endpoint, bucket_name): endpoint is the endpoint of
        # the region the bucket lives in (for East China 1 / Hangzhou that is
        # https://oss-cn-hangzhou.aliyuncs.com); bucket_name is the bucket name.
        bucket = oss2.Bucket(auth, 'https://oss-cn-shenzhen.aliyuncs.com', 'exampleBucket')

        oss_objects_path = []  # object keys of the files uploaded successfully
        target_path = 'D:\\artifact-eb34ea94.tar.gz'
        if not os.path.exists(target_path):
            print('success:false,待上传路径(%s)不存在' % target_path)
            # Exit status stays 0: the outcome is reported on stdout.
            sys.exit(0)

        if os.path.isdir(target_path):
            oss_objects_path = upload_files(bucket, target_path)
        else:
            object_path = '%s/%s' % (DEFAULT_OBJECT_PREFIX, 'artifact-eb34ea94.tar.gz')
            upload_file(bucket, target_path, object_path)
            oss_objects_path.append(object_path)
        print(','.join(oss_objects_path))
    except Exception:
        print('success:false,%s' % traceback.format_exc())
参考链接:
https://help.aliyun.com/document_detail/88426.htm?spm=a2c4g.11186623.0.0.9e7e7dbbsOWOh6#t22317.html
https://help.aliyun.com/document_detail/31848.html
下载阿里云OSS文件对象到本地文件示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Download a single Aliyun OSS object to a local file."""
import traceback

# Endpoint of the region the bucket lives in, and the bucket name. For
# East China 1 (Hangzhou) the endpoint would be
# https://oss-cn-hangzhou.aliyuncs.com.
OSS_ENDPOINT = 'https://oss-cn-shenzhen.aliyuncs.com'
OSS_BUCKET_NAME = 'exampleBucket'

# Full key of the object to fetch; it must not contain the bucket name
# (e.g. testfolder/exampleobject.txt).
SOURCE_OBJECT_KEY = 'f2b/artifacts/cloud-f2b-web-admin-react/artifact-eb34ea94.tar.gz'

# Local file that receives the object, e.g. D:\\localpath\\examplefile.txt.
# An existing file at this path is overwritten; otherwise it is created.
DESTINATION_PATH = 'D:\\artifacts-17a86f.tar.gz'

if __name__ == '__main__':
    try:
        # Imported inside the try so that a missing oss2 package is reported
        # like any other failure.
        import oss2

        credentials = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')
        oss_bucket = oss2.Bucket(credentials, OSS_ENDPOINT, OSS_BUCKET_NAME)

        # bucket.get_object_to_file(object_key, local_path)
        response = oss_bucket.get_object_to_file(SOURCE_OBJECT_KEY, DESTINATION_PATH)
        if response.status != 200:
            print('success:false,download fail, unknow exception, status:%s' % response.status)
    except Exception:
        # Any failure (import, authentication, download) is reported here
        # together with its traceback.
        print('success:false,%s' % traceback.format_exc())
参考链接:
https://help.aliyun.com/document_detail/88442.html
列举指定前缀的所有文件
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""List every Aliyun OSS object whose key starts with a given prefix."""
import traceback

OSS_ENDPOINT = 'https://oss-cn-shenzhen.aliyuncs.com'
OSS_BUCKET_NAME = 'exampleBucket'

# Only objects whose key begins with this prefix are listed.
KEY_PREFIX = 'f2b/www/alpha/f2b/icec-cloud-f2b-mobile'

if __name__ == '__main__':
    try:
        # Imported inside the try so that a missing oss2 package is reported
        # like any other failure.
        import oss2

        credentials = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')
        oss_bucket = oss2.Bucket(credentials, OSS_ENDPOINT, OSS_BUCKET_NAME)

        matching_keys = []
        listing = oss2.ObjectIteratorV2(oss_bucket, prefix=KEY_PREFIX)
        for entry in listing:
            key = entry.key
            # Each key is echoed as soon as it is seen ...
            print(key)
            matching_keys.append(key)
        # ... and the full comma-separated list is printed at the end.
        print(','.join(matching_keys))
    except Exception:
        print('success:false,%s' % traceback.format_exc())
参考链接:
https://help.aliyun.com/document_detail/88458.html
批量删除OSS对象
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Delete Aliyun OSS objects in batches.

Usage: pass the object keys to delete as ONE comma-separated command-line
argument, e.g. ``python delete.py dir/a.txt,dir/b.txt``.
"""
import sys
import traceback

# API limit: a single batch-delete request may name at most 1000 objects.
MAX_KEYS_PER_REQUEST = 1000


def parse_object_keys(raw_keys):
    """Split a comma-separated string into a list of object keys.

    Empty entries (missing argument, doubled or trailing commas) are dropped
    so that no delete request is ever sent for an empty key.

    Args:
        raw_keys: Comma-separated object keys; may be the empty string.

    Returns:
        The non-empty keys, in their original order.
    """
    return [key for key in raw_keys.split(',') if key]


def iter_batches(keys, batch_size=MAX_KEYS_PER_REQUEST):
    """Yield consecutive slices of ``keys`` holding at most ``batch_size`` items.

    Args:
        keys: Sequence of object keys.
        batch_size: Maximum number of keys per yielded slice.

    Yields:
        Non-empty slices of ``keys``; nothing at all when ``keys`` is empty.
    """
    for start in range(0, len(keys), batch_size):
        yield keys[start:start + batch_size]


if __name__ == '__main__':
    try:
        # Imported inside the try so that a missing oss2 package is reported
        # like any other failure.
        import oss2

        auth = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')
        bucket = oss2.Bucket(auth, 'https://oss-cn-shenzhen.aliyuncs.com', 'exampleBucket')

        # sys.argv[1:2] is an empty list when no argument was given, so the
        # join yields '' instead of raising IndexError.
        oss_object_path_list = parse_object_keys(''.join(sys.argv[1:2]))

        for oss_objects_to_delete in iter_batches(oss_object_path_list):
            result = bucket.batch_delete_objects(oss_objects_to_delete)
            # Print the keys that were deleted successfully.
            print(result.deleted_keys)
            print('批量删除以下OSS对象成功')
            # Comma-separated so that the individual keys stay distinguishable.
            print(','.join(result.deleted_keys))
    except Exception:
        print('success:false,%s' % traceback.format_exc())
参考链接: