OSS 触发器实用函数示例集锦

本文涉及的产品
简介: OSS 触发器实用函数示例

自动解压

函数计算实现 oss 上传超大 zip 压缩文件的自动解压处理

利用函数计算对oss压缩文件做自动解压处理

姊妹篇(非oss触发器使用案例):
使用函数计算流式打包OSS文件

object 备份

oss 跨 region 备份或者 NAS 备份

下面代码代码示例,oss 触发器触发的函数, 自动将该 object 从本region复制到另外一个region, 或者说备份到nas

# -*- coding: utf-8 -*-
import oss2, json
import os
import logging

OSS_DEST_REGION = os.environ["OSS_DEST_REGION"]
OSS_DEST_BUCKET = os.environ["OSS_DEST_BUCKET"]
OSS_DEST_AK_ID = os.environ["OSS_DEST_AK_ID"]
OSS_DEST_AK_SK = os.environ["OSS_DEST_AK_SK"]
OSS_DEST_CALLBACK = os.environ["OSS_DEST_CALLBACK"]

# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

def handler(event, context):
  evt_lst = json.loads(event)
  print(evt_lst)
  creds = context.credentials
  auth=oss2.StsAuth(
    creds.access_key_id,
    creds.access_key_secret,
    creds.security_token)

  evt = evt_lst['events'][0]
  bucket_name = evt['oss']['bucket']['name']
  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
  bucket = oss2.Bucket(auth, endpoint, bucket_name)
  object_name = evt['oss']['object']['key']
  
  r = bucket.get_object(object_name)
  
  headers = None
  # copy object 去另外一个region
  
  ''' 如果有自定义的callback, 可以参考如下code
  call_back_event = {} # 用户自定义的参数,这里示例是一个空的dict
  callback_dict = {}
  callback_dict['callbackUrl'] = OSS_DEST_CALLBACK
  callback_body = '&'.join(['{0!s}={1!s}'.format(k, v) for k, v in call_back_event.items()])
  callback_dict['callbackBody'] = callback_body
  callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'

  callback_param = json.dumps(callback_dict).strip()
  base64_callback_body = base64.b64encode(bytes(callback_param, encoding='utf-8'))
  base64_callback_body = str(base64_callback_body, encoding='utf-8')
  headers = {'x-oss-callback': base64_callback_body}
  '''
  
  # 备份到另外一个region的bucket
  oss_dest_auth = oss2.Auth(OSS_DEST_AK_ID, OSS_DEST_AK_SK)
  oss_dest_bucket = oss2.Bucket(oss_dest_auth, OSS_DEST_REGION, OSS_DEST_BUCKET)
  oss_dest_bucket.put_object(filename, r, headers)
  
  # 备份到NAS
  # 写入函数计算挂载的nas,假设挂载的目录为 /mnt/nas_dir
  with open("/mnt/nas_dir","w") as f:
        f.write(r.read())

oss object 自动上传 FTP 备份

以下是这样的一个代码示例, 用户上传了一个文件到 bucket, 自动触发函数, 并且将这个文件的 md5 值传到某个服务用于校验, 同时将这个文件上传到 ftp 服务器

# -*- coding: utf-8 -*-
import oss2, json
import os
import logging
import requests
from hashlib import md5
import datetime, time
from ftplib import FTP

GMT_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
LOGGER = logging.getLogger()
MD5_HTTP_URL = "<http server url>"

# 将对应的ftp 信息改写正确
FTP_ADDR = "<ftp addr>"
FTP_PORT = 21
FTP_USER = "<ftp-user-name>"
FTP_PWD = "123456"
FTP_SAVE_ROOT_PATH = "/home/remote1/"


# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

def handler(event, context):
  evt_lst = json.loads(event)
  #print(evt_lst)
  creds = context.credentials
  auth=oss2.StsAuth(
    creds.access_key_id,
    creds.access_key_secret,
    creds.security_token)

  evt = evt_lst['events'][0]
  bucket_name = evt['oss']['bucket']['name']
  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
  bucket = oss2.Bucket(auth, endpoint, bucket_name)
  object_name = evt['oss']['object']['key']
  
  r = bucket.get_object(object_name)
  m = md5()
  while 1:
    data = r.read(4096)
    if not data:
      break
    m.update(data)
  md5_value = m.hexdigest()

  objectmeta = bucket.head_object(object_name)
  last_modified = objectmeta.headers['Last-Modified']
  
  # convert timeformat from GMT to YYYYMMDDHHmmss
  dt = datetime.datetime.strptime(last_modified, GMT_FORMAT)
  time_t = time.strftime("%Y%m%d%H%M%S", dt.utctimetuple())
  
  payload = {'md5': md5_value, 'time': time_t, 'filePath': object_name, 'bucketName':bucket_name}
  
  # LOGGER.info(payload)
  r = requests.get(MD5_HTTP_URL, params=payload)
  
  # 异常处理
  if r.status_code >= 400:
    LOGGER.error("upload md5 fail, payload = {} , detail = {}".format(payload, r.text))
    
  # upload file to ftp server
  ftp = FTP()
  #ftp.set_debuglevel(2)
  ftp.connect(FTP_ADDR, FTP_PORT)
  ftp.login(FTP_USER, FTP_PWD)
  
  # 生成对应的目录, object key中有/, 即有目录,这边也处理成有目录
  ftp.cwd(FTP_SAVE_ROOT_PATH)
  parent_dir = os.path.dirname(object_name)
  if parent_dir:
    try:
      ftp.mkd(parent_dir)
    except:
      pass 
  
  sever_will_savefile = FTP_SAVE_ROOT_PATH + object_name
  try:
    remote_stream=bucket.get_object(object_name)
    ftp.storbinary('STOR ' +  sever_will_savefile, remote_stream)
  except Exception as e:
    LOGGER.error("upload ftp server fail, savefile ={}, detail = {}".format(sever_will_savefile, str(e)))
  finally:
    ftp.close()

  return "OK"

持续更新中 ...

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
5天前
|
存储 SQL 关系型数据库
MySQL 进阶使用【函数、索引、视图、存储过程、存储函数、触发器】(2)
MySQL 进阶使用【函数、索引、视图、存储过程、存储函数、触发器】
|
5天前
|
存储 SQL 关系型数据库
MySQL 进阶使用【函数、索引、视图、存储过程、存储函数、触发器】(1)
MySQL 进阶使用【函数、索引、视图、存储过程、存储函数、触发器】
|
5天前
|
存储 关系型数据库 MySQL
MySQL 进阶使用【函数、索引、视图、存储过程、存储函数、触发器】(4)
MySQL 进阶使用【函数、索引、视图、存储过程、存储函数、触发器】
|
5天前
|
存储 SQL 关系型数据库
MySQL 进阶使用【函数、索引、视图、存储过程、存储函数、触发器】(3)
MySQL 进阶使用【函数、索引、视图、存储过程、存储函数、触发器】
|
6天前
|
机器学习/深度学习 人工智能 分布式计算
人工智能平台PAI产品使用合集之在easy_rec中,将model_dir设置为oss地址时,oss相关配置需要加载在环境中,有完整的示例吗
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
6天前
|
运维 Serverless API
Serverless 应用引擎产品使用合集之哪里添加函数计算触发器能够触发某个函数work
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
30天前
|
运维 监控
zabbix 触发器之count函数
zabbix监控中使用count函数,可以设置连续几次都异常才发出告警,确认多次以减少了很多误告警,提高了运维效率,这样一来,只要发出告警基本上就已经确定发生故障了。
|
1月前
|
人工智能 运维 Serverless
Serverless 应用引擎产品使用之阿里函数计算中3.0的函数只加自定义域名没有任何触发器的意义如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
1月前
|
弹性计算 监控 Serverless
Serverless 应用引擎操作报错合集之阿里函数计算中调用zip-oss-fc函数返回时候出现错误代码如何解决
Serverless 应用引擎(SAE)是阿里云提供的Serverless PaaS平台,支持Spring Cloud、Dubbo、HSF等主流微服务框架,简化应用的部署、运维和弹性伸缩。在使用SAE过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
Serverless 应用引擎操作报错合集之阿里函数计算中调用zip-oss-fc函数返回时候出现错误代码如何解决
|
11月前
|
存储 SQL 算法
MySQL高级第一篇(共四篇)之索引、视图、存储过程和函数、触发器
MySQL官方对索引的定义为:索引(index)是帮助MySQL高效获取数据的数据结构(有序)。在数据之外,数据库系统还维护着满足特定查找算法的数据结构,这些数据结构以某种方式引用(指向)数据, 这样就可以在这些数据结构上实现高级查找算法,这种数据结构就是索引。如下面的示意图所示 :
246 0

热门文章

最新文章