python编写api调用ceph对象网关

简介:
#_*_coding:utf-8_*_
#yum install python-boto
import  boto
import  boto.s3.connection
#pip install filechunkio
from  filechunkio  import   FileChunkIO
import  math
import   threading
import  os
import  Queue
class  Chunk( object ):
     num  =  0
     offset  =  0
     len  =  0
     def  __init__( self ,n,o,l):
         self .num = n
         self .offset = o
         self .length = l
 
 
 
class  CONNECTION( object ):
     def  __init__( self ,access_key,secret_key,ip,port,is_secure = False ,chrunksize = 8 << 20 ):  #chunksize最小8M否则上传过程会报错
         self .conn = boto.connect_s3(
         aws_access_key_id = access_key,
         aws_secret_access_key = secret_key,
         host = ip,port = port,
         is_secure = is_secure,
         calling_format = boto.s3.connection.OrdinaryCallingFormat()
         )
         self .chrunksize = chrunksize
         self .port = port
 
     #查询
     def  list_all( self ):
         all_buckets = self .conn.get_all_buckets()
         for  bucket  in  all_buckets:
             print  u '容器名: %s'  % (bucket.name)
             for  key  in  bucket. list ():
                 print  ' ' * 5 , "%-20s%-20s%-20s%-40s%-20s"  % (key.mode,key.owner. id ,key.size,key.last_modified.split( '.' )[ 0 ],key.name)
 
     def  list_single( self ,bucket_name):
         try :
             single_bucket  =  self .conn.get_bucket(bucket_name)
         except  Exception as e:
             print  'bucket %s is not exist'  % bucket_name
             return
         print  u '容器名: %s'  %  (single_bucket.name)
         for  key  in  single_bucket. list ():
             print  ' '  *  5 "%-20s%-20s%-20s%-40s%-20s"  %  (key.mode, key.owner. id , key.size, key.last_modified.split( '.' )[ 0 ], key.name)
 
     #普通小文件下载:文件大小<=8M
     def  dowload_file( self ,filepath,key_name,bucket_name):
         all_bucket_name_list  =  [i.name  for  in  self .conn.get_all_buckets()]
         if  bucket_name  not  in  all_bucket_name_list:
             print  'Bucket %s is not exist,please try again'  %  (bucket_name)
             return
         else :
             bucket  =  self .conn.get_bucket(bucket_name)
 
         all_key_name_list  =  [i.name  for  in  bucket.get_all_keys()]
         if  key_name  not  in  all_key_name_list:
             print  'File %s is not exist,please try again'  %  (key_name)
             return
         else :
             key  =  bucket.get_key(key_name)
 
         if  not  os.path.exists(os.path.dirname(filepath)):
             print  'Filepath %s is not exists, sure to create and try again'  %  (filepath)
             return
 
         if  os.path.exists(filepath):
             while  True :
                 d_tag  =  raw_input ( 'File %s already exists, sure you want to cover (Y/N)?'  %  (key_name)).strip()
                 if  d_tag  not  in  [ 'Y' 'N' or  len (d_tag)  = =  0 :
                     continue
                 elif  d_tag  = =  'Y' :
                     os.remove(filepath)
                     break
                 elif  d_tag  = =  'N' :
                     return
         os.mknod(filepath)
         try :
             key.get_contents_to_filename(filepath)
         except  Exception:
             pass
 
     # 普通小文件上传:文件大小<=8M
     def  upload_file( self ,filepath,key_name,bucket_name):
         try :
             bucket  =  self .conn.get_bucket(bucket_name)
         except  Exception as e:
             print  'bucket %s is not exist'  %  bucket_name
             tag  =  raw_input ( 'Do you want to create the bucket %s: (Y/N)?'  %  bucket_name).strip()
             while  tag  not  in  [ 'Y' 'N' ]:
                 tag  =  raw_input ( 'Please input (Y/N)' ).strip()
             if  tag  = =  'N' :
                 return
             elif  tag  = =  'Y' :
                 self .conn.create_bucket(bucket_name)
                 bucket  =  self .conn.get_bucket(bucket_name)
         all_key_name_list  =  [i.name  for  in  bucket.get_all_keys()]
         if  key_name  in  all_key_name_list:
             while  True :
                 f_tag  =  raw_input (u 'File already exists, sure you want to cover (Y/N)?: ' ).strip()
                 if  f_tag  not  in  [ 'Y' 'N' or  len (f_tag)  = =  0 :
                     continue
                 elif  f_tag  = =  'Y' :
                     break
                 elif  f_tag  = =  'N' :
                     return
         key = bucket.new_key(key_name)
         if  not  os.path.exists(filepath):
             print  'File %s does not exist, please make sure you want to upload file path and try again'  % (key_name)
             return
         try :
             f = file (filepath, 'rb' )
             data = f.read()
             key.set_contents_from_string(data)
         except  Exception:
             pass
 
     def  delete_file( self ,key_name,bucket_name):
         all_bucket_name_list  =  [i.name  for  in  self .conn.get_all_buckets()]
         if  bucket_name  not  in  all_bucket_name_list:
             print  'Bucket %s is not exist,please try again'  %  (bucket_name)
             return
         else :
             bucket  =  self .conn.get_bucket(bucket_name)
 
         all_key_name_list  =  [i.name  for  in  bucket.get_all_keys()]
         if  key_name  not  in  all_key_name_list:
             print  'File %s is not exist,please try again'  %  (key_name)
             return
         else :
             key  =  bucket.get_key(key_name)
 
         try :
             bucket.delete_key(key.name)
         except  Exception:
             pass
 
     def  delete_bucket( self ,bucket_name):
         all_bucket_name_list  =  [i.name  for  in  self .conn.get_all_buckets()]
         if  bucket_name  not  in  all_bucket_name_list:
             print  'Bucket %s is not exist,please try again'  %  (bucket_name)
             return
         else :
             bucket  =  self .conn.get_bucket(bucket_name)
 
         try :
             self .conn.delete_bucket(bucket.name)
         except  Exception:
             pass
 
 
     #队列生成
     def  init_queue( self ,filesize,chunksize):    #8<<20 :8*2**20
         chunkcnt = int (math.ceil(filesize * 1.0 / chunksize))
         q = Queue.Queue(maxsize = chunkcnt)
         for  in  range ( 0 ,chunkcnt):
             offset = chunksize * i
             length = min (chunksize,filesize - offset)
             c = Chunk(i + 1 ,offset,length)
             q.put(c)
         return  q
 
     #分片上传object
     def  upload_trunk( self ,filepath,mp,q, id ):
         while  not  q.empty():
             chunk = q.get()
             fp = FileChunkIO(filepath, 'r' ,offset = chunk.offset,bytes = chunk.length)
             mp.upload_part_from_file(fp,part_num = chunk.num)
             fp.close()
             q.task_done()
 
     #文件大小获取---->S3分片上传对象生成----->初始队列生成(--------------->文件切,生成切分对象)
     def  upload_file_multipart( self ,filepath,key_name,bucket_name,threadcnt = 8 ):
         filesize = os.stat(filepath).st_size
         try :
             bucket = self .conn.get_bucket(bucket_name)
         except  Exception as e:
             print  'bucket %s is not exist'  %  bucket_name
             tag = raw_input ( 'Do you want to create the bucket %s: (Y/N)?'  % bucket_name).strip()
             while  tag  not  in  [ 'Y' , 'N' ]:
                 tag = raw_input ( 'Please input (Y/N)' ).strip()
             if  tag  = =  'N' :
                 return
             elif  tag  = =  'Y' :
                 self .conn.create_bucket(bucket_name)
                 bucket  =  self .conn.get_bucket(bucket_name)
         all_key_name_list = [i.name  for  in  bucket.get_all_keys()]
         if  key_name   in  all_key_name_list:
             while  True :
                 f_tag = raw_input (u 'File already exists, sure you want to cover (Y/N)?: ' ).strip()
                 if  f_tag  not  in  [ 'Y' , 'N' or  len (f_tag)  = =  0 :
                     continue
                 elif  f_tag  = =  'Y' :
                     break
                 elif  f_tag  = =  'N' :
                     return
 
         mp = bucket.initiate_multipart_upload(key_name)
         q = self .init_queue(filesize, self .chrunksize)
         for  in  range ( 0 ,threadcnt):
             t = threading.Thread(target = self .upload_trunk,args = (filepath,mp,q,i))
             t.setDaemon( True )
             t.start()
         q.join()
         mp.complete_upload()
 
     #文件分片下载
     def  download_chrunk( self ,filepath,key_name,bucket_name,q, id ):
         while  not  q.empty():
             chrunk = q.get()
             offset = chrunk.offset
             length = chrunk.length
             bucket = self .conn.get_bucket(bucket_name)
             resp = bucket.connection.make_request( 'GET' ,bucket_name,key_name,headers = { 'Range' : "bytes=%d-%d"  % (offset,offset + length)})
             data = resp.read(length)
             fp = FileChunkIO(filepath, 'r+' ,offset = chrunk.offset,bytes = chrunk.length)
             fp.write(data)
             fp.close()
             q.task_done()
 
     def  download_file_multipart( self ,filepath,key_name,bucket_name,threadcnt = 8 ):
         all_bucket_name_list = [i.name  for  in  self .conn.get_all_buckets()]
         if  bucket_name  not  in  all_bucket_name_list:
             print  'Bucket %s is not exist,please try again'  % (bucket_name)
             return
         else :
             bucket = self .conn.get_bucket(bucket_name)
 
         all_key_name_list  =  [i.name  for  in  bucket.get_all_keys()]
         if  key_name  not  in  all_key_name_list:
             print  'File %s is not exist,please try again'  % (key_name)
             return
         else :
             key = bucket.get_key(key_name)
 
         if  not  os.path.exists(os.path.dirname(filepath)):
             print  'Filepath %s is not exists, sure to create and try again'  %  (filepath)
             return
 
         if  os.path.exists(filepath):
             while  True :
                 d_tag  =  raw_input ( 'File %s already exists, sure you want to cover (Y/N)?'  %  (key_name)).strip()
                 if  d_tag  not  in  [ 'Y' 'N' or  len (d_tag)  = =  0 :
                     continue
                 elif  d_tag  = =  'Y' :
                     os.remove(filepath)
                     break
                 elif  d_tag  = =  'N' :
                     return
         os.mknod(filepath)
         filesize = key.size
         q = self .init_queue(filesize, self .chrunksize)
         for  in  range ( 0 ,threadcnt):
             t = threading.Thread(target = self .download_chrunk,args = (filepath,key_name,bucket_name,q,i))
             t.setDaemon( True )
             t.start()
         q.join()
 
     def  generate_object_download_urls( self ,key_name,bucket_name,valid_time = 0 ):
         all_bucket_name_list  =  [i.name  for  in  self .conn.get_all_buckets()]
         if  bucket_name  not  in  all_bucket_name_list:
             print  'Bucket %s is not exist,please try again'  %  (bucket_name)
             return
         else :
             bucket  =  self .conn.get_bucket(bucket_name)
 
         all_key_name_list  =  [i.name  for  in  bucket.get_all_keys()]
         if  key_name  not  in  all_key_name_list:
             print  'File %s is not exist,please try again'  %  (key_name)
             return
         else :
             key  =  bucket.get_key(key_name)
 
         try :
             key.set_canned_acl( 'public-read' )
             download_url  =  key.generate_url(valid_time, query_auth = False , force_http = True )
             if  self .port ! =  80 :
                 x1 = download_url.split( '/' )[ 0 : 3 ]
                 x2 = download_url.split( '/' )[ 3 :]
                 s1 = u '/' .join(x1)
                 s2 = u '/' .join(x2)
 
                 s3 = ':%s/'  % ( str ( self .port))
                 download_url = s1 + s3 + s2
                 print  download_url
 
         except  Exception:
             pass
 
 
 
if  __name__  = =  '__main__' :
     #约定:
     #1:filepath指本地文件的路径(上传路径or下载路径),指的是绝对路径
     #2:bucket_name相当于文件在对象存储中的目录名或者索引名
     #3:key_name相当于文件在对象存储中对应的文件名或文件索引
 
     access_key  =  "65IY4EC1BSFYNH6SHWGW"
     secret_key  =  "viNfIftLHhrPt2MYK44DkWGvxZb82aYqLrCzGYLx"
     ip = '172.16.201.36'
     port = 8080
     conn = CONNECTION(access_key,secret_key,ip,port)
     #查看所有bucket以及其包含的文件
     #conn.list_all()
 
     #简单上传,用于文件大小<=8M
     # conn.upload_file('/etc/passwd','passwd','test_bucket01')
     #查看单一bucket下所包含的文件信息
     # conn.list_single('test_bucket01')
 
 
     #简单下载,用于文件大小<=8M
     # conn.dowload_file('/lhf_test/test01','passwd','test_bucket01')
     # conn.list_single('test_bucket01')
 
     #删除文件
     # conn.delete_file('passwd','test_bucket01')
     # conn.list_single('test_bucket01')
     #
     #删除bucket
     # conn.delete_bucket('test_bucket01')
     # conn.list_all()
 
     #切片上传(多线程),用于文件大小>8M,8M可修改,但不能小于8M,否则会报错切片太小
     # conn.upload_file_multipart('/etc/passwd','passwd_multi_upload','test_bucket01')
     # conn.list_single('test_bucket01')
 
     # 切片下载(多线程),用于文件大小>8M,8M可修改,但不能小于8M,否则会报错切片太小
     # conn.download_file_multipart('/lhf_test/passwd_multi_dowload','passwd_multi_upload','test_bucket01')
 
     #生成下载url
     #conn.generate_object_download_urls('passwd_multi_upload','test_bucket01')
     #conn.list_all()
舒琪
+关注
目录
打赏
0
0
0
0
6
分享
相关文章
淘宝商品详情API的调用流程(python请求示例以及json数据示例返回参考)
JSON数据示例:需要提供一个结构化的示例,展示商品详情可能包含的字段,如商品标题、价格、库存、描述、图片链接、卖家信息等。考虑到稳定性,示例应基于淘宝开放平台的标准响应格式。
[oeasy]python083_类_对象_成员方法_method_函数_function_isinstance
本文介绍了Python中类、对象、成员方法及函数的概念。通过超市商品分类的例子,形象地解释了“类型”的概念,如整型(int)和字符串(str)是两种不同的数据类型。整型对象支持数字求和,字符串对象支持拼接。使用`isinstance`函数可以判断对象是否属于特定类型,例如判断变量是否为整型。此外,还探讨了面向对象编程(OOP)与面向过程编程的区别,并简要介绍了`type`和`help`函数的用法。最后总结指出,不同类型的对象有不同的运算和方法,如字符串有`find`和`index`方法,而整型没有。更多内容可参考文末提供的蓝桥、GitHub和Gitee链接。
38 11
Python 高级编程与实战:构建 RESTful API
本文深入探讨了使用 Python 构建 RESTful API 的方法,涵盖 Flask、Django REST Framework 和 FastAPI 三个主流框架。通过实战项目示例,详细讲解了如何处理 GET、POST 请求,并返回相应数据。学习这些技术将帮助你掌握构建高效、可靠的 Web API。
1688平台API接口实战:Python实现店铺全量商品数据抓取
本文介绍如何使用Python通过1688开放平台的API接口自动化抓取店铺所有商品数据。首先,开发者需在1688开放平台完成注册并获取App Key和App Secret,申请“商品信息查询”权限。接着,利用`alibaba.trade.product.search4trade`接口,构建请求参数、生成MD5签名,并通过分页机制获取全量商品数据。文中详细解析了响应结构、存储优化及常见问题处理方法,还提供了竞品监控、库存预警等应用场景示例和完整代码。
实战指南:通过1688开放平台API获取商品详情数据(附Python代码及避坑指南)
1688作为国内最大的B2B供应链平台,其API为企业提供合法合规的JSON数据源,直接获取批发价、SKU库存等核心数据。相比爬虫方案,官方API避免了反爬严格、数据缺失和法律风险等问题。企业接入1688商品API需完成资质认证、创建应用、签名机制解析及调用接口四步。应用场景包括智能采购系统、供应商评估模型和跨境选品分析。提供高频问题解决方案及安全合规实践,确保数据安全与合法使用。立即访问1688开放平台,解锁B2B数据宝藏!
Python爬虫与1688图片搜索API接口:深度解析与显著收益
在电子商务领域,数据是驱动业务决策的核心。阿里巴巴旗下的1688平台作为全球领先的B2B市场,提供了丰富的API接口,特别是图片搜索API(`item_search_img`),允许开发者通过上传图片搜索相似商品。本文介绍如何结合Python爬虫技术高效利用该接口,提升搜索效率和用户体验,助力企业实现自动化商品搜索、库存管理优化、竞品监控与定价策略调整等,显著提高运营效率和市场竞争力。
161 3
如何利用Python爬虫淘宝商品详情高级版(item_get_pro)API接口及返回值解析说明
本文介绍了如何利用Python爬虫技术调用淘宝商品详情高级版API接口(item_get_pro),获取商品的详细信息,包括标题、价格、销量等。文章涵盖了环境准备、API权限申请、请求构建和返回值解析等内容,强调了数据获取的合规性和安全性。
Python如何显示对象的某个属性的所有值
本文介绍了如何在Python中使用`getattr`和`hasattr`函数来访问和检查对象的属性。通过这些工具,可以轻松遍历对象列表并提取特定属性的所有值,适用于数据处理和分析任务。示例包括获取对象列表中所有书籍的作者和检查动物对象的名称属性。
58 2
京东商品详情 API 接口指南(Python 篇)
本简介介绍如何使用Python抓取京东商品详情数据。首先,需搭建开发环境并安装必要的库(如requests、BeautifulSoup和lxml),了解京东反爬虫机制,确定商品ID获取方式。通过发送HTTP请求并解析HTML,可提取价格、优惠券、视频链接等信息。此方法适用于电商数据分析、竞品分析、购物助手及内容创作等场景,帮助用户做出更明智的购买决策,优化营销策略。
|
4月前
|
Python内存管理:掌握对象的生命周期与垃圾回收机制####
本文深入探讨了Python中的内存管理机制,特别是对象的生命周期和垃圾回收过程。通过理解引用计数、标记-清除及分代收集等核心概念,帮助开发者优化程序性能,避免内存泄漏。 ####
97 3