OSS 触发器实用函数示例集锦-阿里云开发者社区

开发者社区> 阿里云Serverless Compute> 正文

OSS 触发器实用函数示例集锦

简介: OSS 触发器实用函数示例

自动解压

函数计算实现 oss 上传超大 zip 压缩文件的自动解压处理

利用函数计算对oss压缩文件做自动解压处理

姊妹篇(非oss触发器使用案例):
使用函数计算流式打包OSS文件

object 备份

oss 跨 region 备份或者 NAS 备份

下面代码代码示例,oss 触发器触发的函数, 自动将该 object 从本region复制到另外一个region, 或者说备份到nas

# -*- coding: utf-8 -*-
import oss2, json
import os
import logging

OSS_DEST_REGION = os.environ["OSS_DEST_REGION"]
OSS_DEST_BUCKET = os.environ["OSS_DEST_BUCKET"]
OSS_DEST_AK_ID = os.environ["OSS_DEST_AK_ID"]
OSS_DEST_AK_SK = os.environ["OSS_DEST_AK_SK"]
OSS_DEST_CALLBACK = os.environ["OSS_DEST_CALLBACK"]

# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

def handler(event, context):
  evt_lst = json.loads(event)
  print(evt_lst)
  creds = context.credentials
  auth=oss2.StsAuth(
    creds.access_key_id,
    creds.access_key_secret,
    creds.security_token)

  evt = evt_lst['events'][0]
  bucket_name = evt['oss']['bucket']['name']
  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
  bucket = oss2.Bucket(auth, endpoint, bucket_name)
  object_name = evt['oss']['object']['key']
  
  r = bucket.get_object(object_name)
  
  headers = None
  # copy object 去另外一个region
  
  ''' 如果有自定义的callback, 可以参考如下code
  call_back_event = {} # 用户自定义的参数,这里示例是一个空的dict
  callback_dict = {}
  callback_dict['callbackUrl'] = OSS_DEST_CALLBACK
  callback_body = '&'.join(['{0!s}={1!s}'.format(k, v) for k, v in call_back_event.items()])
  callback_dict['callbackBody'] = callback_body
  callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'

  callback_param = json.dumps(callback_dict).strip()
  base64_callback_body = base64.b64encode(bytes(callback_param, encoding='utf-8'))
  base64_callback_body = str(base64_callback_body, encoding='utf-8')
  headers = {'x-oss-callback': base64_callback_body}
  '''
  
  # 备份到另外一个region的bucket
  oss_dest_auth = oss2.Auth(OSS_DEST_AK_ID, OSS_DEST_AK_SK)
  oss_dest_bucket = oss2.Bucket(oss_dest_auth, OSS_DEST_REGION, OSS_DEST_BUCKET)
  oss_dest_bucket.put_object(filename, r, headers)
  
  # 备份到NAS
  # 写入函数计算挂载的nas,假设挂载的目录为 /mnt/nas_dir
  with open("/mnt/nas_dir","w") as f:
        f.write(r.read())

oss object 自动上传 FTP 备份

以下是这样的一个代码示例, 用户上传了一个文件到 bucket, 自动触发函数, 并且将这个文件的 md5 值传到某个服务用于校验, 同时将这个文件上传到 ftp 服务器

# -*- coding: utf-8 -*-
import oss2, json
import os
import logging
import requests
from hashlib import md5
import datetime, time
from ftplib import FTP

GMT_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
LOGGER = logging.getLogger()
MD5_HTTP_URL = "<http server url>"

# 将对应的ftp 信息改写正确
FTP_ADDR = "<ftp addr>"
FTP_PORT = 21
FTP_USER = "<ftp-user-name>"
FTP_PWD = "123456"
FTP_SAVE_ROOT_PATH = "/home/remote1/"


# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

def handler(event, context):
  evt_lst = json.loads(event)
  #print(evt_lst)
  creds = context.credentials
  auth=oss2.StsAuth(
    creds.access_key_id,
    creds.access_key_secret,
    creds.security_token)

  evt = evt_lst['events'][0]
  bucket_name = evt['oss']['bucket']['name']
  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
  bucket = oss2.Bucket(auth, endpoint, bucket_name)
  object_name = evt['oss']['object']['key']
  
  r = bucket.get_object(object_name)
  m = md5()
  while 1:
    data = r.read(4096)
    if not data:
      break
    m.update(data)
  md5_value = m.hexdigest()

  objectmeta = bucket.head_object(object_name)
  last_modified = objectmeta.headers['Last-Modified']
  
  # convert timeformat from GMT to YYYYMMDDHHmmss
  dt = datetime.datetime.strptime(last_modified, GMT_FORMAT)
  time_t = time.strftime("%Y%m%d%H%M%S", dt.utctimetuple())
  
  payload = {'md5': md5_value, 'time': time_t, 'filePath': object_name, 'bucketName':bucket_name}
  
  # LOGGER.info(payload)
  r = requests.get(MD5_HTTP_URL, params=payload)
  
  # 异常处理
  if r.status_code >= 400:
    LOGGER.error("upload md5 fail, payload = {} , detail = {}".format(payload, r.text))
    
  # upload file to ftp server
  ftp = FTP()
  #ftp.set_debuglevel(2)
  ftp.connect(FTP_ADDR, FTP_PORT)
  ftp.login(FTP_USER, FTP_PWD)
  
  # 生成对应的目录, object key中有/, 即有目录,这边也处理成有目录
  ftp.cwd(FTP_SAVE_ROOT_PATH)
  parent_dir = os.path.dirname(object_name)
  if parent_dir:
    try:
      ftp.mkd(parent_dir)
    except:
      pass 
  
  sever_will_savefile = FTP_SAVE_ROOT_PATH + object_name
  try:
    remote_stream=bucket.get_object(object_name)
    ftp.storbinary('STOR ' +  sever_will_savefile, remote_stream)
  except Exception as e:
    LOGGER.error("upload ftp server fail, savefile ={}, detail = {}".format(sever_will_savefile, str(e)))
  finally:
    ftp.close()

  return "OK"

持续更新中 ...

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
阿里云Serverless Compute
使用钉钉扫一扫加入圈子
+ 订阅

Serverless微服务应用实践分享

函数计算官网