python编写api调用ceph对象网关

简介:
#_*_coding:utf-8_*_
#yum install python-boto
import  boto
import  boto.s3.connection
#pip install filechunkio
from  filechunkio  import   FileChunkIO
import  math
import   threading
import  os
import  Queue
class  Chunk( object ):
     num  =  0
     offset  =  0
     len  =  0
     def  __init__( self ,n,o,l):
         self .num = n
         self .offset = o
         self .length = l
 
 
 
class  CONNECTION( object ):
     def  __init__( self ,access_key,secret_key,ip,port,is_secure = False ,chrunksize = 8 << 20 ):  #chunksize最小8M否则上传过程会报错
         self .conn = boto.connect_s3(
         aws_access_key_id = access_key,
         aws_secret_access_key = secret_key,
         host = ip,port = port,
         is_secure = is_secure,
         calling_format = boto.s3.connection.OrdinaryCallingFormat()
         )
         self .chrunksize = chrunksize
         self .port = port
 
     #查询
     def  list_all( self ):
         all_buckets = self .conn.get_all_buckets()
         for  bucket  in  all_buckets:
             print  u '容器名: %s'  % (bucket.name)
             for  key  in  bucket. list ():
                 print  ' ' * 5 , "%-20s%-20s%-20s%-40s%-20s"  % (key.mode,key.owner. id ,key.size,key.last_modified.split( '.' )[ 0 ],key.name)
 
     def  list_single( self ,bucket_name):
         try :
             single_bucket  =  self .conn.get_bucket(bucket_name)
         except  Exception as e:
             print  'bucket %s is not exist'  % bucket_name
             return
         print  u '容器名: %s'  %  (single_bucket.name)
         for  key  in  single_bucket. list ():
             print  ' '  *  5 "%-20s%-20s%-20s%-40s%-20s"  %  (key.mode, key.owner. id , key.size, key.last_modified.split( '.' )[ 0 ], key.name)
 
     #普通小文件下载:文件大小<=8M
     def  dowload_file( self ,filepath,key_name,bucket_name):
         all_bucket_name_list  =  [i.name  for  in  self .conn.get_all_buckets()]
         if  bucket_name  not  in  all_bucket_name_list:
             print  'Bucket %s is not exist,please try again'  %  (bucket_name)
             return
         else :
             bucket  =  self .conn.get_bucket(bucket_name)
 
         all_key_name_list  =  [i.name  for  in  bucket.get_all_keys()]
         if  key_name  not  in  all_key_name_list:
             print  'File %s is not exist,please try again'  %  (key_name)
             return
         else :
             key  =  bucket.get_key(key_name)
 
         if  not  os.path.exists(os.path.dirname(filepath)):
             print  'Filepath %s is not exists, sure to create and try again'  %  (filepath)
             return
 
         if  os.path.exists(filepath):
             while  True :
                 d_tag  =  raw_input ( 'File %s already exists, sure you want to cover (Y/N)?'  %  (key_name)).strip()
                 if  d_tag  not  in  [ 'Y' 'N' or  len (d_tag)  = =  0 :
                     continue
                 elif  d_tag  = =  'Y' :
                     os.remove(filepath)
                     break
                 elif  d_tag  = =  'N' :
                     return
         os.mknod(filepath)
         try :
             key.get_contents_to_filename(filepath)
         except  Exception:
             pass
 
     # 普通小文件上传:文件大小<=8M
     def  upload_file( self ,filepath,key_name,bucket_name):
         try :
             bucket  =  self .conn.get_bucket(bucket_name)
         except  Exception as e:
             print  'bucket %s is not exist'  %  bucket_name
             tag  =  raw_input ( 'Do you want to create the bucket %s: (Y/N)?'  %  bucket_name).strip()
             while  tag  not  in  [ 'Y' 'N' ]:
                 tag  =  raw_input ( 'Please input (Y/N)' ).strip()
             if  tag  = =  'N' :
                 return
             elif  tag  = =  'Y' :
                 self .conn.create_bucket(bucket_name)
                 bucket  =  self .conn.get_bucket(bucket_name)
         all_key_name_list  =  [i.name  for  in  bucket.get_all_keys()]
         if  key_name  in  all_key_name_list:
             while  True :
                 f_tag  =  raw_input (u 'File already exists, sure you want to cover (Y/N)?: ' ).strip()
                 if  f_tag  not  in  [ 'Y' 'N' or  len (f_tag)  = =  0 :
                     continue
                 elif  f_tag  = =  'Y' :
                     break
                 elif  f_tag  = =  'N' :
                     return
         key = bucket.new_key(key_name)
         if  not  os.path.exists(filepath):
             print  'File %s does not exist, please make sure you want to upload file path and try again'  % (key_name)
             return
         try :
             f = file (filepath, 'rb' )
             data = f.read()
             key.set_contents_from_string(data)
         except  Exception:
             pass
 
     def  delete_file( self ,key_name,bucket_name):
         all_bucket_name_list  =  [i.name  for  in  self .conn.get_all_buckets()]
         if  bucket_name  not  in  all_bucket_name_list:
             print  'Bucket %s is not exist,please try again'  %  (bucket_name)
             return
         else :
             bucket  =  self .conn.get_bucket(bucket_name)
 
         all_key_name_list  =  [i.name  for  in  bucket.get_all_keys()]
         if  key_name  not  in  all_key_name_list:
             print  'File %s is not exist,please try again'  %  (key_name)
             return
         else :
             key  =  bucket.get_key(key_name)
 
         try :
             bucket.delete_key(key.name)
         except  Exception:
             pass
 
     def  delete_bucket( self ,bucket_name):
         all_bucket_name_list  =  [i.name  for  in  self .conn.get_all_buckets()]
         if  bucket_name  not  in  all_bucket_name_list:
             print  'Bucket %s is not exist,please try again'  %  (bucket_name)
             return
         else :
             bucket  =  self .conn.get_bucket(bucket_name)
 
         try :
             self .conn.delete_bucket(bucket.name)
         except  Exception:
             pass
 
 
     #队列生成
     def  init_queue( self ,filesize,chunksize):    #8<<20 :8*2**20
         chunkcnt = int (math.ceil(filesize * 1.0 / chunksize))
         q = Queue.Queue(maxsize = chunkcnt)
         for  in  range ( 0 ,chunkcnt):
             offset = chunksize * i
             length = min (chunksize,filesize - offset)
             c = Chunk(i + 1 ,offset,length)
             q.put(c)
         return  q
 
     #分片上传object
     def  upload_trunk( self ,filepath,mp,q, id ):
         while  not  q.empty():
             chunk = q.get()
             fp = FileChunkIO(filepath, 'r' ,offset = chunk.offset,bytes = chunk.length)
             mp.upload_part_from_file(fp,part_num = chunk.num)
             fp.close()
             q.task_done()
 
     #文件大小获取---->S3分片上传对象生成----->初始队列生成(--------------->文件切,生成切分对象)
     def  upload_file_multipart( self ,filepath,key_name,bucket_name,threadcnt = 8 ):
         filesize = os.stat(filepath).st_size
         try :
             bucket = self .conn.get_bucket(bucket_name)
         except  Exception as e:
             print  'bucket %s is not exist'  %  bucket_name
             tag = raw_input ( 'Do you want to create the bucket %s: (Y/N)?'  % bucket_name).strip()
             while  tag  not  in  [ 'Y' , 'N' ]:
                 tag = raw_input ( 'Please input (Y/N)' ).strip()
             if  tag  = =  'N' :
                 return
             elif  tag  = =  'Y' :
                 self .conn.create_bucket(bucket_name)
                 bucket  =  self .conn.get_bucket(bucket_name)
         all_key_name_list = [i.name  for  in  bucket.get_all_keys()]
         if  key_name   in  all_key_name_list:
             while  True :
                 f_tag = raw_input (u 'File already exists, sure you want to cover (Y/N)?: ' ).strip()
                 if  f_tag  not  in  [ 'Y' , 'N' or  len (f_tag)  = =  0 :
                     continue
                 elif  f_tag  = =  'Y' :
                     break
                 elif  f_tag  = =  'N' :
                     return
 
         mp = bucket.initiate_multipart_upload(key_name)
         q = self .init_queue(filesize, self .chrunksize)
         for  in  range ( 0 ,threadcnt):
             t = threading.Thread(target = self .upload_trunk,args = (filepath,mp,q,i))
             t.setDaemon( True )
             t.start()
         q.join()
         mp.complete_upload()
 
     #文件分片下载
     def  download_chrunk( self ,filepath,key_name,bucket_name,q, id ):
         while  not  q.empty():
             chrunk = q.get()
             offset = chrunk.offset
             length = chrunk.length
             bucket = self .conn.get_bucket(bucket_name)
             resp = bucket.connection.make_request( 'GET' ,bucket_name,key_name,headers = { 'Range' : "bytes=%d-%d"  % (offset,offset + length)})
             data = resp.read(length)
             fp = FileChunkIO(filepath, 'r+' ,offset = chrunk.offset,bytes = chrunk.length)
             fp.write(data)
             fp.close()
             q.task_done()
 
     def  download_file_multipart( self ,filepath,key_name,bucket_name,threadcnt = 8 ):
         all_bucket_name_list = [i.name  for  in  self .conn.get_all_buckets()]
         if  bucket_name  not  in  all_bucket_name_list:
             print  'Bucket %s is not exist,please try again'  % (bucket_name)
             return
         else :
             bucket = self .conn.get_bucket(bucket_name)
 
         all_key_name_list  =  [i.name  for  in  bucket.get_all_keys()]
         if  key_name  not  in  all_key_name_list:
             print  'File %s is not exist,please try again'  % (key_name)
             return
         else :
             key = bucket.get_key(key_name)
 
         if  not  os.path.exists(os.path.dirname(filepath)):
             print  'Filepath %s is not exists, sure to create and try again'  %  (filepath)
             return
 
         if  os.path.exists(filepath):
             while  True :
                 d_tag  =  raw_input ( 'File %s already exists, sure you want to cover (Y/N)?'  %  (key_name)).strip()
                 if  d_tag  not  in  [ 'Y' 'N' or  len (d_tag)  = =  0 :
                     continue
                 elif  d_tag  = =  'Y' :
                     os.remove(filepath)
                     break
                 elif  d_tag  = =  'N' :
                     return
         os.mknod(filepath)
         filesize = key.size
         q = self .init_queue(filesize, self .chrunksize)
         for  in  range ( 0 ,threadcnt):
             t = threading.Thread(target = self .download_chrunk,args = (filepath,key_name,bucket_name,q,i))
             t.setDaemon( True )
             t.start()
         q.join()
 
     def  generate_object_download_urls( self ,key_name,bucket_name,valid_time = 0 ):
         all_bucket_name_list  =  [i.name  for  in  self .conn.get_all_buckets()]
         if  bucket_name  not  in  all_bucket_name_list:
             print  'Bucket %s is not exist,please try again'  %  (bucket_name)
             return
         else :
             bucket  =  self .conn.get_bucket(bucket_name)
 
         all_key_name_list  =  [i.name  for  in  bucket.get_all_keys()]
         if  key_name  not  in  all_key_name_list:
             print  'File %s is not exist,please try again'  %  (key_name)
             return
         else :
             key  =  bucket.get_key(key_name)
 
         try :
             key.set_canned_acl( 'public-read' )
             download_url  =  key.generate_url(valid_time, query_auth = False , force_http = True )
             if  self .port ! =  80 :
                 x1 = download_url.split( '/' )[ 0 : 3 ]
                 x2 = download_url.split( '/' )[ 3 :]
                 s1 = u '/' .join(x1)
                 s2 = u '/' .join(x2)
 
                 s3 = ':%s/'  % ( str ( self .port))
                 download_url = s1 + s3 + s2
                 print  download_url
 
         except  Exception:
             pass
 
 
 
if  __name__  = =  '__main__' :
     #约定:
     #1:filepath指本地文件的路径(上传路径or下载路径),指的是绝对路径
     #2:bucket_name相当于文件在对象存储中的目录名或者索引名
     #3:key_name相当于文件在对象存储中对应的文件名或文件索引
 
     access_key  =  "65IY4EC1BSFYNH6SHWGW"
     secret_key  =  "viNfIftLHhrPt2MYK44DkWGvxZb82aYqLrCzGYLx"
     ip = '172.16.201.36'
     port = 8080
     conn = CONNECTION(access_key,secret_key,ip,port)
     #查看所有bucket以及其包含的文件
     #conn.list_all()
 
     #简单上传,用于文件大小<=8M
     # conn.upload_file('/etc/passwd','passwd','test_bucket01')
     #查看单一bucket下所包含的文件信息
     # conn.list_single('test_bucket01')
 
 
     #简单下载,用于文件大小<=8M
     # conn.dowload_file('/lhf_test/test01','passwd','test_bucket01')
     # conn.list_single('test_bucket01')
 
     #删除文件
     # conn.delete_file('passwd','test_bucket01')
     # conn.list_single('test_bucket01')
     #
     #删除bucket
     # conn.delete_bucket('test_bucket01')
     # conn.list_all()
 
     #切片上传(多线程),用于文件大小>8M,8M可修改,但不能小于8M,否则会报错切片太小
     # conn.upload_file_multipart('/etc/passwd','passwd_multi_upload','test_bucket01')
     # conn.list_single('test_bucket01')
 
     # 切片下载(多线程),用于文件大小>8M,8M可修改,但不能小于8M,否则会报错切片太小
     # conn.download_file_multipart('/lhf_test/passwd_multi_dowload','passwd_multi_upload','test_bucket01')
 
     #生成下载url
     #conn.generate_object_download_urls('passwd_multi_upload','test_bucket01')
     #conn.list_all()
目录
相关文章
|
1月前
|
人工智能 监控 负载均衡
一文详述:AI 网关与 API 网关到底有什么区别?
近年来,AI发展迅猛,大模型成为推动业务创新的关键力量。企业面临如何安全管理和部署AI应用的挑战,需设计既能满足当前需求又可适应未来发展的基础架构。AI网关应运而生,在集成、管理和优化AI应用中扮演重要角色。本文探讨AI网关与API网关的区别,分析AI系统为何需要专门网关,并提供选择合适AI网关的建议。AI网关不仅支持多种模型,还具备高级安全性和性能优化功能,有助于企业在复杂环境中灵活应用AI技术。
81 1
|
10天前
|
JSON 安全 API
如何使用Python开发API接口?
在现代软件开发中,API(应用程序编程接口)用于不同软件组件之间的通信和数据交换,实现系统互操作性。Python因其简单易用和强大功能,成为开发API的热门选择。本文详细介绍了Python开发API的基础知识、优势、实现方式(如Flask和Django框架)、实战示例及注意事项,帮助读者掌握高效、安全的API开发技巧。
37 3
如何使用Python开发API接口?
|
3天前
|
JSON API 数据格式
如何使用Python开发1688商品详情API接口?
本文介绍了如何使用Python开发1688商品详情API接口,获取商品的标题、价格、销量和评价等详细信息。主要内容包括注册1688开放平台账号、安装必要Python模块、了解API接口、生成签名、编写Python代码、解析返回数据以及错误处理和日志记录。通过这些步骤,开发者可以轻松地集成1688商品数据到自己的应用中。
17 1
|
11天前
|
前端开发 API 开发者
Python Web开发者必看!AJAX、Fetch API实战技巧,让前后端交互如丝般顺滑!
在Web开发中,前后端的高效交互是提升用户体验的关键。本文通过一个基于Flask框架的博客系统实战案例,详细介绍了如何使用AJAX和Fetch API实现不刷新页面查看评论的功能。从后端路由设置到前端请求处理,全面展示了这两种技术的应用技巧,帮助Python Web开发者提升项目质量和开发效率。
26 1
|
17天前
|
JSON API 数据格式
如何使用Python和Flask构建一个简单的RESTful API。Flask是一个轻量级的Web框架
本文介绍了如何使用Python和Flask构建一个简单的RESTful API。Flask是一个轻量级的Web框架,适合小型项目和微服务。文章从环境准备、创建基本Flask应用、定义资源和路由、请求和响应处理、错误处理等方面进行了详细说明,并提供了示例代码。通过这些步骤,读者可以快速上手构建自己的RESTful API。
25 2
|
1月前
|
存储 JSON API
Python| 如何使用 DALL·E 和 OpenAI API 生成图像(1)
Python| 如何使用 DALL·E 和 OpenAI API 生成图像(1)
41 7
Python| 如何使用 DALL·E 和 OpenAI API 生成图像(1)
|
7天前
|
安全 API 网络架构
Python中哪个框架最适合做API?
本文介绍了Python生态系统中几个流行的API框架,包括Flask、FastAPI、Django Rest Framework(DRF)、Falcon和Tornado。每个框架都有其独特的优势和适用场景。Flask轻量灵活,适合小型项目;FastAPI高性能且自动生成文档,适合需要高吞吐量的API;DRF功能强大,适合复杂应用;Falcon高性能低延迟,适合快速API开发;Tornado异步非阻塞,适合高并发场景。文章通过示例代码和优缺点分析,帮助开发者根据项目需求选择合适的框架。
23 0
|
30天前
|
JSON API 数据格式
使用Python和Flask构建简单的RESTful API
【10月更文挑战第12天】使用Python和Flask构建简单的RESTful API
42 1
|
1月前
|
数据采集 人工智能 自然语言处理
Python实时查询股票API的FinanceAgent框架构建股票(美股/A股/港股)AI Agent
金融领域Finance AI Agents方面的工作,发现很多行业需求和用户输入的 query都是和查询股价/行情/指数/财报汇总/金融理财建议相关。如果需要准确的 金融实时数据就不能只依赖LLM 来生成了。常规的方案包括 RAG (包括调用API )再把对应数据和prompt 一起拼接送给大模型来做文本生成。稳定的一些商业机构的金融数据API基本都是收费的,如果是以科研和demo性质有一些开放爬虫API可以使用。这里主要介绍一下 FinanceAgent,github地址 https://github.com/AI-Hub-Admin/FinanceAgent
|
1月前
|
API 数据库 网络架构
深入浅出:使用Python Flask实现RESTful API
【10月更文挑战第7天】在数字化时代,掌握如何高效构建和部署RESTful API是后端开发者的必备技能。本文将引导你了解如何使用Python Flask框架快速打造一个简单而强大的RESTful服务。从基础环境搭建到API设计原则,再到实际代码示例,我们将一步步揭开Flask框架的神秘面纱,让你轻松上手,并能够自信地处理更复杂的项目。

热门文章

最新文章