OSS 触发器实用函数示例集锦

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
函数计算FC,每月15万CU 3个月
简介: OSS 触发器实用函数示例

自动解压

函数计算实现 oss 上传超大 zip 压缩文件的自动解压处理

利用函数计算对oss压缩文件做自动解压处理

姊妹篇(非oss触发器使用案例):
使用函数计算流式打包OSS文件

object 备份

oss 跨 region 备份或者 NAS 备份

下面代码代码示例,oss 触发器触发的函数, 自动将该 object 从本region复制到另外一个region, 或者说备份到nas

# -*- coding: utf-8 -*-
import oss2, json
import os
import logging

OSS_DEST_REGION = os.environ["OSS_DEST_REGION"]
OSS_DEST_BUCKET = os.environ["OSS_DEST_BUCKET"]
OSS_DEST_AK_ID = os.environ["OSS_DEST_AK_ID"]
OSS_DEST_AK_SK = os.environ["OSS_DEST_AK_SK"]
OSS_DEST_CALLBACK = os.environ["OSS_DEST_CALLBACK"]

# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

def handler(event, context):
  evt_lst = json.loads(event)
  print(evt_lst)
  creds = context.credentials
  auth=oss2.StsAuth(
    creds.access_key_id,
    creds.access_key_secret,
    creds.security_token)

  evt = evt_lst['events'][0]
  bucket_name = evt['oss']['bucket']['name']
  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
  bucket = oss2.Bucket(auth, endpoint, bucket_name)
  object_name = evt['oss']['object']['key']
  
  r = bucket.get_object(object_name)
  
  headers = None
  # copy object 去另外一个region
  
  ''' 如果有自定义的callback, 可以参考如下code
  call_back_event = {} # 用户自定义的参数,这里示例是一个空的dict
  callback_dict = {}
  callback_dict['callbackUrl'] = OSS_DEST_CALLBACK
  callback_body = '&'.join(['{0!s}={1!s}'.format(k, v) for k, v in call_back_event.items()])
  callback_dict['callbackBody'] = callback_body
  callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'

  callback_param = json.dumps(callback_dict).strip()
  base64_callback_body = base64.b64encode(bytes(callback_param, encoding='utf-8'))
  base64_callback_body = str(base64_callback_body, encoding='utf-8')
  headers = {'x-oss-callback': base64_callback_body}
  '''
  
  # 备份到另外一个region的bucket
  oss_dest_auth = oss2.Auth(OSS_DEST_AK_ID, OSS_DEST_AK_SK)
  oss_dest_bucket = oss2.Bucket(oss_dest_auth, OSS_DEST_REGION, OSS_DEST_BUCKET)
  oss_dest_bucket.put_object(filename, r, headers)
  
  # 备份到NAS
  # 写入函数计算挂载的nas,假设挂载的目录为 /mnt/nas_dir
  with open("/mnt/nas_dir","w") as f:
        f.write(r.read())

oss object 自动上传 FTP 备份

以下是这样的一个代码示例, 用户上传了一个文件到 bucket, 自动触发函数, 并且将这个文件的 md5 值传到某个服务用于校验, 同时将这个文件上传到 ftp 服务器

# -*- coding: utf-8 -*-
import oss2, json
import os
import logging
import requests
from hashlib import md5
import datetime, time
from ftplib import FTP

GMT_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
LOGGER = logging.getLogger()
MD5_HTTP_URL = "<http server url>"

# 将对应的ftp 信息改写正确
FTP_ADDR = "<ftp addr>"
FTP_PORT = 21
FTP_USER = "<ftp-user-name>"
FTP_PWD = "123456"
FTP_SAVE_ROOT_PATH = "/home/remote1/"


# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

def handler(event, context):
  evt_lst = json.loads(event)
  #print(evt_lst)
  creds = context.credentials
  auth=oss2.StsAuth(
    creds.access_key_id,
    creds.access_key_secret,
    creds.security_token)

  evt = evt_lst['events'][0]
  bucket_name = evt['oss']['bucket']['name']
  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
  bucket = oss2.Bucket(auth, endpoint, bucket_name)
  object_name = evt['oss']['object']['key']
  
  r = bucket.get_object(object_name)
  m = md5()
  while 1:
    data = r.read(4096)
    if not data:
      break
    m.update(data)
  md5_value = m.hexdigest()

  objectmeta = bucket.head_object(object_name)
  last_modified = objectmeta.headers['Last-Modified']
  
  # convert timeformat from GMT to YYYYMMDDHHmmss
  dt = datetime.datetime.strptime(last_modified, GMT_FORMAT)
  time_t = time.strftime("%Y%m%d%H%M%S", dt.utctimetuple())
  
  payload = {'md5': md5_value, 'time': time_t, 'filePath': object_name, 'bucketName':bucket_name}
  
  # LOGGER.info(payload)
  r = requests.get(MD5_HTTP_URL, params=payload)
  
  # 异常处理
  if r.status_code >= 400:
    LOGGER.error("upload md5 fail, payload = {} , detail = {}".format(payload, r.text))
    
  # upload file to ftp server
  ftp = FTP()
  #ftp.set_debuglevel(2)
  ftp.connect(FTP_ADDR, FTP_PORT)
  ftp.login(FTP_USER, FTP_PWD)
  
  # 生成对应的目录, object key中有/, 即有目录,这边也处理成有目录
  ftp.cwd(FTP_SAVE_ROOT_PATH)
  parent_dir = os.path.dirname(object_name)
  if parent_dir:
    try:
      ftp.mkd(parent_dir)
    except:
      pass 
  
  sever_will_savefile = FTP_SAVE_ROOT_PATH + object_name
  try:
    remote_stream=bucket.get_object(object_name)
    ftp.storbinary('STOR ' +  sever_will_savefile, remote_stream)
  except Exception as e:
    LOGGER.error("upload ftp server fail, savefile ={}, detail = {}".format(sever_will_savefile, str(e)))
  finally:
    ftp.close()

  return "OK"

持续更新中 ...

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
2月前
|
存储 运维 Java
函数计算产品使用问题之怎么配置定时触发器来调用Java函数
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
2月前
|
存储 运维 Serverless
函数计算产品使用问题之OSS触发器是否可以只设置文件前缀
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
3月前
|
Java Serverless 数据库连接
函数计算操作报错合集之调用打包的OSS函数时发生报错,该怎么办
Serverless 应用引擎(SAE)是阿里云提供的Serverless PaaS平台,支持Spring Cloud、Dubbo、HSF等主流微服务框架,简化应用的部署、运维和弹性伸缩。在使用SAE过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
3月前
|
运维 Serverless 数据处理
函数计算产品使用问题之在对象存储服务(OSS)上创建ZIP包解压触发器后,触发器未按预期执行,一般是什么导致的
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
4月前
|
运维 Serverless KVM
函数计算产品使用问题之如何处理冷启动时间过长的问题
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
4月前
|
监控 Serverless 持续交付
函数计算产品使用问题之如何使用定时触发器预热函数
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
4月前
|
JSON Serverless 对象存储
函数计算产品使用问题之如何创建一个同时具有HTTP触发器和OSS触发器的函数
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
3月前
|
存储 SQL 数据库
MySQL设计规约问题之为什么要避免使用存储过程、触发器和函数
MySQL设计规约问题之为什么要避免使用存储过程、触发器和函数
|
4月前
|
监控 Serverless 持续交付
函数计算产品使用问题之如何使用定时触发器预热函数
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
4月前
|
运维 Serverless 数据处理
函数计算产品使用问题之OSS触发器是否只支持事件处理函数
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。