- 在基本设置区域,选择请求处理程序类型为处理事件请求。
- 在函数代码区域,根据实际业务选择运行环境,设置代码上传方式为使用示例代码方式,并选择对象存储OSS触发函数。
- 在触发器配置区域,选择触发器类型为对象存储OSS,指定Bucket名称为
osstest-20
、文件前缀为img
、文件后缀为png
,触发事件选择oss:ObjectCreated:PutObject
和oss:ObjectCreated:PostObject
事件,角色名称为AliyunOSSEventNotificationRole
。
首次创建函数时,您可以根据页面提示授予事件源访问函数计算权限。更多内容,请参见授予事件源访问函数计算的权限。
关于OSS触发器事件的详细信息,请参见OSS触发器概述。
2、通过函数调用内容安全图片审核增强版服务。
官方提供了一种通过函数计算来触发内容安全增量扫描的方案,但是该方案没有处理文件的示例,业务方的需求是把检测到的违规文件直接禁止公共读,需要置为PRIVATE权限,加完处理后的示例代码如下:
# -*- coding: utf-8 -*-importjson, uuidimportconfigparserfromaliyunsdkcore.clientimportAcsClientfromaliyunsdkcore.requestimportCommonRequestimportoss2# 需要自行根据检测内容确定好bucket路径及object名称defoss_acl_private(bucket_name, object_name): auth=oss2.Auth("accessKeyId", "accessSecret") bucket=oss2.Bucket(auth, 'https://oss-cn-shanghai.aliyuncs.com', bucket_name) # yourObjectName填写Object完整路径,完整路径中不能包含Bucket名称。bucket.put_object_acl(object_name, oss2.OBJECT_ACL_PRIVATE) defhandler(event, context): config=configparser.ConfigParser() config.read("config.ini", encoding="GB18030") # AccessKey ID和AccessKey Secret写入的配置文件。evt=json.loads(event) evt=evt['events'][0] bucket_name=evt['oss']['bucket']['name'] object_name=evt['oss']['object']['key'] ossRegionId=evt['region'] client=AcsClient(config.get("config", "accessKeyId"), config.get("config", "accessSecret"), 'cn-shanghai') request=CommonRequest() request.set_read_timeout(6000) # 读超时时间配置request.set_connect_timeout(3000) # 连接超时时间配置request.set_accept_format('json') request.set_method('POST') request.set_protocol_type('https') # https | httprequest.set_domain('green-cip.cn-shanghai.aliyuncs.com') request.set_version('2022-03-02') request.set_action_name('ImageModeration') request.add_query_param("Service", "baselineCheck") request.add_query_param("ServiceParameters", { # 'imageUrl': 'https://www.aliyun.com/1.jpg','dataId': str(uuid.uuid4()), 'ossObjectName': object_name, 'ossRegionId': ossRegionId, 'ossBucketName': bucket_name, }) response=client.do_action_with_exception(request) # print(str(response, encoding='utf-8'))response_json=json.loads(response) ifresponse_json.get("Code") ==200: foriteminresponse_json.get("Data", {}).get("Result", []): print('item', item) ifitem["Label"] =="nonLabel": print("nonLabel 无命中标签") elif"pornographic"initem["Label"] andint(item["Confidence"]) >80: print("疑似含有色情内容") # oss_acl_private(bucket_name, object_name)elif"political"initem["Label"] andint(item["Confidence"]) >80: print("疑似含有涉政内容") # oss_acl_private(bucket_name, object_name)elif"violent"initem["Label"] andint(item["Confidence"]) >80: print("疑似含有暴恐内容") # oss_acl_private(bucket_name, object_name)elif"fraud"initem["Label"] andint(item["Confidence"]) >80: print("疑似含有风险内容") else: print("请求失败,错误码:", response_json.get("Code"))