阿里云存储服务OSSAPI(python)
Re阿里云存储服务OSSAPI(python)
几个例子
SDK 自带的 oss_sample.py 中已经包含了一些主要操作的例子,可以参考借鉴。下面给出几个常见操作的范例:
list buckets 罗列所有存储空间
# List every bucket that belongs to the account and print each entry.
from oss_api import *
from oss_xml_handler import *

HOST = 'storage.aliyun.com'
ACCESS_ID = ''
SECRET_ACCESS_KEY = ''

oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)

res = oss.list_all_my_buckets()
# Floor division keeps the "status is 2xx" check exact on Python 2 and 3.
if 2 == (res.status // 100):
    http_body = res.read()
    bucket_list = GetServiceXml(http_body)
    for bucket in bucket_list.list():
        print(bucket)
else:
    print('ERROR')
list objects 罗列存储空间中所有对象
# List the objects stored in one bucket; the bucket name is the first
# command-line argument.
from oss_api import *
from oss_xml_handler import *
import sys

HOST = 'storage.aliyun.com'
ACCESS_ID = ''
SECRET_ACCESS_KEY = ''
# The original sample used bucket_name without ever defining it.
bucket_name = sys.argv[1]

oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)

res = oss.list_bucket(bucket_name)
# Floor division keeps the "status is 2xx" check exact on Python 2 and 3.
if 2 == (res.status // 100):
    data = res.read()
    h = GetBucketXml(data)
    (file_list, common_list) = h.list()
    for each in file_list:
        print(each)
else:
    # Report failures the same way the other samples do instead of
    # exiting silently.
    print('Fail\n%s' % res.read())
create a bucket 创建一个存储空间
# Create a bucket whose name is given as the first command-line argument.
from oss_api import *
from oss_xml_handler import *
import sys

HOST = 'storage.aliyun.com'
ACCESS_ID = ''
SECRET_ACCESS_KEY = ''

oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
new_bucket = sys.argv[1]
res = oss.put_bucket(new_bucket)
# Floor division keeps the "status is 2xx" check exact on Python 2 and 3.
if 2 == (res.status // 100):
    print('Succeed')
else:
    print('Fail\n%s' % res.read())
修改指定存储空间访问控制权限
因为新创建的存储空间默认权限为 private,如果需要其他访问权限(例如 public-read),需要自己设置。
# Set the access-control permission of the bucket named by the first
# command-line argument to 'public-read'.
from oss_api import *
from oss_xml_handler import *
import sys

HOST = 'storage.aliyun.com'
ACCESS_ID = ''
SECRET_ACCESS_KEY = ''
bucket_name = sys.argv[1]

oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
# put_bucket is called with the ACL as its second argument.
res = oss.put_bucket(bucket_name, 'public-read')

# Floor division keeps the "status is 2xx" check exact on Python 2 and 3.
if 2 == (res.status // 100):
    print('Succeed')
else:
    print('Fail\n%s' % res.read())
上传一个文件到存储空间
# Upload a local file (second argument) into a bucket (first argument),
# storing it under the object name '1.jpg'.
from oss_api import *
from oss_xml_handler import *
import sys  # required for sys.argv; the original sample forgot this import

HOST = 'storage.aliyun.com'
ACCESS_ID = ''
SECRET_ACCESS_KEY = ''
bucket_name = sys.argv[1]
file_name = sys.argv[2]

oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)

# 'image/jpeg' is the registered media type for JPEG files ('image/jpg' is not).
res = oss.put_object_from_file(bucket_name, '1.jpg', file_name, 'image/jpeg')
# Floor division keeps the "status is 2xx" check exact on Python 2 and 3.
if 2 == (res.status // 100):
    print('Succeed')
else:
    print('Fail\n%s' % res.read())
从存储空间下载指定文件
# Download object '1.jpg' from bucket 'hust' into the local file ./new.JPEG.
from oss_api import *
from oss_xml_handler import *

HOST = 'storage.aliyun.com'
ACCESS_ID = ''
SECRET_ACCESS_KEY = ''

oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
res = oss.get_object_to_file('hust', '1.jpg', './new.JPEG')
# Floor division keeps the "status is 2xx" check exact on Python 2 and 3.
if 2 == (res.status // 100):
    print('Succeed')
else:
    print('Fail\n%s' % res.read())
从存储空间删除指定文件
API 没有像 delete_bucket() 那样直接提供 delete_object() 操作,但是用 object_operation() 效果也是一样的;实际上 delete_bucket() 也只是以 DELETE 方式调用 bucket_operation() 方法。
# Delete object '1.jpg' from bucket 'hust' by issuing a DELETE through the
# generic object_operation() call.
from oss_api import *
from oss_xml_handler import *

HOST = 'storage.aliyun.com'
ACCESS_ID = ''
SECRET_ACCESS_KEY = ''

oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
res = oss.object_operation('DELETE', 'hust', '1.jpg')
# Floor division keeps the "status is 2xx" check exact on Python 2 and 3.
if 2 == (res.status // 100):
    print('Succeed!')
else:
    print(res.read())
合并对象(object)为对象组(object group)
# Combine existing objects of bucket 'hust' into the object group 'group.msh'.
# This sample targets Python 2 (it relies on the `unicode` type).
from oss_api import *
from oss_xml_handler import *

HOST = 'storage.aliyun.com'
ACCESS_ID = ''
SECRET_ACCESS_KEY = ''

# Each entry is used below as [part number, object name, ETag].
obs_msg_list = [[0, '800.png', '43B0177C78BF904970F7B1C9005001F3'],
                [1, 'new.JPEG', '5C6E71FDF26A82FBD42BC5D31787FE20']]

# NOTE(review): the XML tag names were stripped from the published sample and
# are reconstructed here from the OSS "Post Object Group" request format
# (CreateFileGroup / Part / PartNumber / PartName / ETag) -- confirm against
# the API reference before relying on them.
xml_string = r'<CreateFileGroup>'
for part in obs_msg_list:
    # Object names are sent as UTF-8 byte strings.
    if isinstance(part[1], unicode):
        file_path = part[1].encode('utf-8')
    else:
        file_path = part[1]
    xml_string += r'<Part>'
    xml_string += r'<PartNumber>' + str(part[0]) + r'</PartNumber>'
    xml_string += r'<PartName>' + str(file_path) + r'</PartName>'
    # The ETag is upper-cased and wrapped in double quotes.
    xml_string += r'<ETag>"' + str(part[2]).upper() + r'"</ETag>'
    xml_string += r'</Part>'
xml_string += r'</CreateFileGroup>'

oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
res = oss.post_object_group('hust', 'group.msh', xml_string)
# Floor division keeps the "status is 2xx" check exact on Python 2 and 3.
if 2 == (res.status // 100):
    print('Successed!')
else:
    print(res.read())
一开始我试图自己拼写 XML,结果老是返回 InvalidXMLFormat,估计还是编码的问题。
-------------------------
Re阿里云存储服务OSSAPI(python)
来个复杂点的
下面对上面的操作进行了封装,提供了一个简单的命令行操作界面,代码写得有点乱。
#coding=utf8
# Small interactive shell over the OSS API: ls / cd / put / get / delete / quit.
# This sample targets Python 2 (it relies on raw_input).
from oss_api import *
from oss_xml_handler import *
import os
import sys

HOST = 'storage.aliyun.com'
ACCESS_ID = ''
SECRET_ACCESS_KEY = ''


class OssFS:
    '''Thin wrapper around OssAPI used by the command loop below.

    The bucket names are fetched once, at construction time, and kept in
    self.buckets.
    '''

    def __init__(self, oHost, oId='', oKey=''):
        '''Connect to oHost with the given credentials and load the bucket list.'''
        self.oHost = oHost
        self.oId = oId
        self.oKey = oKey
        self.oss = OssAPI(oHost, oId, oKey)
        self.buckets = []
        res = self.oss.list_all_my_buckets()
        http_body = res.read()
        # Floor division keeps the "status is 2xx" check exact on Python 2 and 3.
        if 2 == (res.status // 100):
            bucket_list = GetServiceXml(http_body)
            for bucket in bucket_list.list():
                # Each entry is a pair; only its first element is kept
                # (presumably the bucket name -- confirm against GetServiceXml).
                (h1, h2) = bucket
                self.buckets.append(str(h1))
        else:
            print(http_body)

    def show_buckets(self):
        '''Show all buckets'''
        for each in self.buckets:
            print(each)

    def sizetoKMGT(self, snum):
        '''Convert a size in bytes to a string in Bytes/KB/MB/GB/TB.

        A size of 0 is reported as 'obj grp'.
        '''
        if 0 == int(snum):
            return 'obj grp'
        lsize = ['Bytes', 'KB', 'MB', 'GB', 'TB']
        i = 0
        snum = int(snum) * 1.0
        # Stop at the largest known unit so the index never runs past 'TB'.
        while snum > 1024 and i < len(lsize) - 1:
            snum = snum / 1024.0
            i += 1
        return '%.2f %s' % (snum, lsize[i])

    def show_objects(self, bucket='', path=''):
        '''Show all objects in the bucket (path is currently unused).'''
        res = self.oss.list_bucket(bucket)
        http_body = res.read()
        if 2 == (res.status // 100):
            h = GetBucketXml(http_body)
            (file_list, common_list) = h.list()
            for each in file_list:
                (fname, ctime, etag, fsize, owner, owner2, fstyle) = each
                print('%s\t %s\t %s' % (str(fname), self.sizetoKMGT(fsize), etag))
        else:
            print(http_body)

    def file_upload(self, bucket='', path='', file_name=''):
        '''Upload a local file to the bucket under the same name.

        Returns True on success, False otherwise (path is currently unused).
        '''
        res = self.oss.put_object_from_file(bucket, file_name, file_name)
        if 2 == (res.status // 100):
            print('Success to upload the file: ' + file_name)
            return True
        print(res.read())
        return False

    def is_file(self, bucket, file_name):
        '''Return True when an object named file_name is listed in the bucket.'''
        res = self.oss.list_bucket(bucket)
        http_body = res.read()
        if 2 == (res.status // 100):
            h = GetBucketXml(http_body)
            (file_list, common_list) = h.list()
            for each in file_list:
                if str(each[0]) == file_name:
                    return True
        return False

    def file_download(self, bucket, file_name):
        '''Download the file(file_name) from the bucket'''
        # The call goes through self.oss; OssFS itself has no such method.
        res = self.oss.get_object_to_file(bucket, file_name, file_name)
        if 2 == (res.status // 100):
            print('Get file: ' + file_name)
        else:
            print(res.read())

    def file_delete(self, bucket, file_name):
        '''Delete the file(file_name) from the bucket; return True on success.'''
        res = self.oss.object_operation('DELETE', bucket, file_name)
        if 2 == (res.status // 100):
            print('Success to delete the file: ' + file_name)
            return True
        print(res.read())
        return False


if __name__ == '__main__':
    # isRoot is True while at the bucket list and False once inside a bucket.
    isRoot = True
    uBucket = ''
    uPath = ''
    uFs = OssFS(HOST, ACCESS_ID, SECRET_ACCESS_KEY)

    print('===============================================')
    while True:
        strCmd = raw_input('Input your command:')
        # split() tolerates repeated spaces; an empty line is an unknown command.
        cmd = strCmd.split()
        if not cmd:
            print('Unable to recognize your command!')
            continue

        if 'ls' == cmd[0]:
            if 1 == len(cmd):
                if isRoot:
                    uFs.show_buckets()
                else:
                    uFs.show_objects(uBucket, uPath)
            elif ['local'] == cmd[1:]:
                for each in os.listdir('.'):
                    print(each)
            else:
                print('>>>ERROR: BAD option! (Try \'ls local\')')

        elif 'cd' == cmd[0]:
            if len(cmd) != 2:
                print('>>>ERROR: BAD option! (Bucket name or file name need)')
            elif '.' == cmd[1]:
                pass
            elif '..' == cmd[1]:
                if isRoot:
                    print('Root already')
                else:
                    isRoot = True
            elif not isRoot:
                print('>>>ERROR BAD bucket, you are in bucket already')
            elif cmd[1] not in uFs.buckets:
                print('>>>ERROR BAD bucket %s is not in your buckets' % cmd[1])
            else:
                # Only remember the bucket once it is known to exist.
                uBucket = cmd[1]
                isRoot = False

        elif 'put' == cmd[0]:
            if isRoot:
                print('>>>ERROR: BAD PATH (you must cd a bucket first )')
            elif len(cmd) != 2:
                print('>>>ERROR: BAD option! (put a file one time)')
            elif os.path.isfile(cmd[1]):
                uFs.file_upload(uBucket, uPath, cmd[1])
            else:
                print('>>>ERROR: BAD file name (file not exist!)')

        elif 'get' == cmd[0]:
            if len(cmd) != 2:
                print('>>>ERROR: BAD option')
            elif isRoot:
                print('>>>ERROR: BAD get, access a bucket first')
            elif not uFs.is_file(uBucket, cmd[1]):
                print('%s is not in your bucket %s' % (cmd[1], uBucket))
            else:
                # NOTE: an already existing local file of that name is not
                # checked for.
                uFs.file_download(uBucket, cmd[1])

        elif 'delete' == cmd[0]:
            if isRoot:
                print('>>>BAD delete, I will not let you delete your bucket!')
            elif len(cmd) != 2:
                print('>>>BAD options')
            elif not uFs.is_file(uBucket, cmd[1]):
                print('%s is not in your bucket %s' % (cmd[1], uBucket))
            else:
                uFs.file_delete(uBucket, cmd[1])

        elif 'help' == cmd[0]:
            # Accepted, but no help text is implemented.
            pass

        elif 'quit' == cmd[0]:
            break

        else:
            print('Unable to recognize your command!')
    print('===============================================')
赞0
踩0