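Dify is a low-code platform for building applications on large language models (LLMs). It is available as a cloud service and as a private (self-hosted) deployment. This article describes best practices for integrating AI Guardrails into Dify workflows and Agents.
Solution overview
This article covers two Dify application types, Agent and workflow, and provides two ways to integrate AI Guardrails: a plugin and an API extension.
Workflow with plugin: install the AI Guardrails plugin as a local plugin in one click to check LLM input and output.
Agent with API extension: extend Dify's content moderation API through an API extension. This requires deploying a forwarding service that adapts AI Guardrails to the standard protocol of Dify's content moderation API. Because private deployments have their own network and security requirements, we recommend deploying this service locally. This article provides sample code for the service as a reference.
Workflow with API extension: private deployments and the public cloud differ somewhat in what they support. For usage, refer to the Agent with API extension approach.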
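| Integration method | Private deployment: streaming | Private deployment: supported applications | Private deployment: integration work | Dify public cloud: streaming | Dify public cloud: supported applications | Dify public cloud: integration work |
|---|---|---|---|---|---|---|
| Plugin | No | Workflow | No code, one-click integration | No | Workflow | No code, one-click integration |
| API extension | Yes | Agent, workflow, chatflow | Deploy a local forwarding service based on the sample code in this article | Yes | Agent, chatflow | Deploy a forwarding service reachable from the public internet based on the sample code in this article; the domain name requires ICP filing |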
Prerequisites
1. Alibaba Cloud AI Guardrails is activated (required). https://help.aliyun.com/document_detail/2872706.html
2. The AI Guardrails switches and detection rules are configured in the console (optional). https://yundun.console.aliyun.com/?p=guardrail&protectConfig/testConfig/testConfig#/protectConfig/testConfig/testConfig
Procedure
1. Integrate AI Guardrails into a workflow with the local plugin
The following demonstration uses a workflow on Dify's official public cloud. The steps are the same in a private deployment.
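1.1. Download the plugin
1.2. Install the plugin
1.3. Use the plugin in a workflow
LLM input risk detection
Add an AI Guardrails plugin node before the LLM node. Set the content to check to the input variable input, and set the detection type to input.
LLM output risk detection
As the next step after the LLM node, select Tools => AI Guardrails node.
In the AI Guardrails plugin node, set the input variable to the LLM's text variable, and set the detection type to output.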
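1.4. Check the configuration and results in the Alibaba Cloud AI Guardrails console
Detection switches for input and output
You can turn sensitive data detection and prompt attack detection on or off separately for input and for output.
Console URL: https://yundun.console.aliyun.com/?p=guardrail#/protectConfig/testConfig/testConfig
View detection results
In this example, the detection result shows that the output content carries an S2 risk.
Console URL: https://yundun.console.aliyun.com/?p=guardrail#/resultTest/resultQuery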
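1.5. Example results
The examples show one case where the LLM output carries a sensitive data risk and one case where the output is normal. In a real application, orchestrate the workflow according to your business requirements.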
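2. Integrate AI Guardrails into an Agent with the content moderation API extension
The following demonstration uses a private deployment. The procedure on Dify's official public cloud is the same.
2.1. Deploy the forwarding service locally
The AI Guardrails API accepts at most 2,000 characters per call, so input longer than 2,000 characters must be adapted. This example handles it as follows:
Input moderation: split the input into segments of at most 2,000 characters and call the AI Guardrails API concurrently. The sample code cuts at 1,950 characters and prefers sentence boundaries.
Output moderation: Dify calls the content moderation API roughly every 300 characters, so each call checks only the most recent 2,000 characters.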
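The input-side strategy can be tried in isolation before the real API is wired in. The sketch below splits text into fixed 2,000-character segments and checks them in parallel. `fake_check` is a hypothetical stand-in for one AI Guardrails call, and the marker word and the `"high"` label are placeholders chosen for illustration, not values returned by the real API.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_LENGTH = 2000  # per-call character limit stated above


def chunk(text: str, size: int = MAX_LENGTH) -> list[str]:
    """Split text into consecutive segments of at most `size` characters."""
    return [text[i:i + size] for i in range(0, len(text), size)]


def fake_check(segment: str) -> str:
    """Hypothetical stand-in for one guardrail call: flags a marker word."""
    return "high" if "forbidden" in segment else "none"


def check_input(text: str) -> bool:
    """Return True if any segment is flagged; segments are checked in parallel."""
    segments = chunk(text) or [""]
    with ThreadPoolExecutor(max_workers=5) as pool:
        levels = list(pool.map(fake_check, segments))
    return any(level != "none" for level in levels)
```

A fixed-width cut can separate a risky phrase across two segments, which is one reason to prefer cutting at sentence boundaries where possible.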
- Sample code
from fastapi import FastAPI, Body, HTTPException, Header
from pydantic import BaseModel
import base64
import hmac
import hashlib
from urllib.parse import quote
import requests
from datetime import datetime
from datetime import timezone
import uuid
import json
import re
import concurrent.futures

# Choose the service region as needed. Supported regions: Shanghai (cn-shanghai),
# Beijing (cn-beijing), Hangzhou (cn-hangzhou), Shenzhen (cn-shenzhen)
SERVICE_URL = "https://green-cip.cn-shanghai.aliyuncs.com"
# Text longer than this is split
MAX_LENGTH = 2000
# ServiceCode values for AI Guardrails input detection and output detection
SERVICE_INPUT = "query_security_check"
SERVICE_OUTPUT = "response_security_check"

ENCODING = "UTF-8"
ISO8601_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
ALGORITHM = "HmacSHA1"

# Full-width and half-width punctuation treated as a sentence boundary
SENTENCE_END = re.compile(r"[。!?;:.!?;:]+")


def format_iso8601_date():
    return datetime.now(timezone.utc).strftime(ISO8601_DATE_FORMAT)


def percent_encode(value):
    if value is None:
        return ""
    return (
        quote(value.encode(ENCODING), safe="~").replace("+", "%20").replace("*", "%2A")
    )


def create_signature(string_to_sign, secret):
    secret = secret + "&"
    signature = hmac.new(
        secret.encode(ENCODING), string_to_sign.encode(ENCODING), hashlib.sha1
    ).digest()
    return base64.b64encode(signature).decode(ENCODING)


def create_string_to_sign(http_method, parameters):
    sorted_keys = sorted(parameters.keys())
    canonicalized_query_string = ""
    for key in sorted_keys:
        canonicalized_query_string += (
            "&" + percent_encode(key) + "=" + percent_encode(parameters[key])
        )
    string_to_sign = (
        http_method
        + "&"
        + percent_encode("/")
        + "&"
        + percent_encode(canonicalized_query_string[1:])
    )
    return string_to_sign


def split_text(text: str, max_length: int = 1950) -> list[str]:
    """Split text into segments of at most max_length, cutting at sentence boundaries where possible."""
    segments = []
    while len(text) > max_length:
        # Take the substring within the current maximum length
        chunk = text[:max_length]
        # Find the last sentence-ending punctuation mark in the chunk
        matches = list(SENTENCE_END.finditer(chunk))
        if matches:
            cut_point = matches[-1].end()  # Include the punctuation mark
        else:
            cut_point = max_length  # No boundary found: force a cut
        segments.append(text[:cut_point])
        text = text[cut_point:]
    if text:
        segments.append(text)
    return segments


def request(content_segment, check_type, aliyun_access_key, aliyun_access_secret):
    print(datetime.now(), f" [{check_type} request content]-> {content_segment}")
    # Build the request parameters
    parameters = {
        "Action": "TextModerationPlus",
        "Version": "2022-03-02",
        "AccessKeyId": aliyun_access_key,
        "Timestamp": format_iso8601_date(),
        "SignatureMethod": "HMAC-SHA1",
        "SignatureVersion": "1.0",
        "SignatureNonce": str(uuid.uuid4()),
        "Format": "JSON",
        "Service": (SERVICE_INPUT if check_type == "input" else SERVICE_OUTPUT),
        "ServiceParameters": json.dumps(
            {"content": content_segment}, ensure_ascii=False
        ),
    }
    string_to_sign = create_string_to_sign("POST", parameters)
    signature = create_signature(string_to_sign, aliyun_access_secret)
    parameters["Signature"] = signature
    # Send the request (the timeout keeps a stalled call from blocking forever)
    response = requests.post(SERVICE_URL, data=parameters, timeout=10)
    body = response.json()
    print(datetime.now(), " [response body]-> ", body)
    if response.status_code != 200:
        raise Exception(
            f"response http status_code not 200. status_code: {response.status_code}, body: {body}"
        )
    if body.get("Code") != 200:
        raise Exception(
            f"response code not 200. code: {body.get('Code')}, body: {body}"
        )
    return body


app = FastAPI()


class InputData(BaseModel):
    point: str
    params: dict = {}


@app.post("/api/dify/receive")
async def dify_receive(data: InputData = Body(...), authorization: str = Header(None)):
    """Receive API query data from Dify."""
    print(data)
    # A missing Authorization header is rejected the same way as a wrong scheme
    auth_scheme, _, api_key = (authorization or "").partition(" ")
    if auth_scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Unauthorized")
    # Decode the API key into AccessKey ID and AccessKey secret
    try:
        decoded_bytes = base64.b64decode(api_key)
        decoded_str = decoded_bytes.decode("utf-8")
        ak, sk = decoded_str.split(":", 1)
    except Exception as e:
        # Decoding failed: reject the request
        raise HTTPException(status_code=401, detail=f"Base64 Decode AK/SK fail: {e}")
    point = data.point
    if point == "ping":
        return {"result": "pong"}
    if point == "app.moderation.input":
        return handle_app_moderation_input(params=data.params, ak=ak, sk=sk)
    elif point == "app.moderation.output":
        return handle_app_moderation_output(params=data.params, ak=ak, sk=sk)
    raise HTTPException(status_code=400, detail="Not implemented")


def handle_app_moderation_input(params: dict, ak: str, sk: str):
    app_id = params.get("app_id")
    inputs = params.get("inputs", {})
    query = params.get("query") or ""
    contents = (
        [query] if len(query) <= MAX_LENGTH else split_text(query, MAX_LENGTH - 50)
    )
    # Check all segments concurrently
    bodys = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(request, seg, "input", ak, sk) for seg in contents]
        for future in concurrent.futures.as_completed(futures):
            bodys.append(future.result())
    risk_levels = [r.get("Data", {}).get("RiskLevel", "") for r in bodys]
    sensitive_levels = [r.get("Data", {}).get("SensitiveLevel", "") for r in bodys]
    attack_levels = [r.get("Data", {}).get("AttackLevel", "") for r in bodys]
    suggestion = "Pass"
    # The reply can be customized per scenario
    output_response = "Your content violates our usage policy."
    # Block if any segment carries a risk
    if any(r != "" and r != "none" for r in risk_levels):
        suggestion = "Block"
        output_response = "Your content involves content security."
    elif any(s != "" and s != "S0" for s in sensitive_levels):
        suggestion = "Block"
        output_response = "Your content involves sensitive data."
    elif any(a != "" and a != "none" for a in attack_levels):
        suggestion = "Block"
        output_response = "Your content involves prompt attack."
    flagged = False
    action = "direct_output"
    if suggestion != "Pass":
        flagged = True
        # action = "overridden"
        # query = "[Query has been overridden due to sensitive information.]"
    response = {"flagged": flagged, "action": action}
    if flagged:
        if action == "direct_output":
            response["preset_response"] = output_response
        elif action == "overridden":
            response["inputs"] = inputs
            response["query"] = query
    print(response)
    return response


def handle_app_moderation_output(params: dict, ak: str, sk: str):
    app_id = params.get("app_id")
    text = params.get("text", "")
    print(len(text))
    # Only the most recent MAX_LENGTH characters are checked
    if len(text) > MAX_LENGTH:
        content = text[-MAX_LENGTH:]
    else:
        content = text
    # Single call: the output window always fits in one request
    body = request(content, "output", ak, sk)
    risk_level = body.get("Data", {}).get("RiskLevel", "")
    sensitive_level = body.get("Data", {}).get("SensitiveLevel", "")
    attack_level = body.get("Data", {}).get("AttackLevel", "")
    suggestion = "Pass"
    # The reply can be customized per scenario
    output_response = "Your content violates our usage policy."
    # Block if any check reports a risk
    if risk_level != "" and risk_level != "none":
        suggestion = "Block"
        output_response = "Your content involves content security."
    elif sensitive_level != "" and sensitive_level != "S0":
        suggestion = "Block"
        output_response = "Your content involves sensitive data."
    elif attack_level != "" and attack_level != "none":
        suggestion = "Block"
        output_response = "Your content involves prompt attack."
    flagged = False
    action = "direct_output"
    if suggestion != "Pass":
        flagged = True
        # action = "overridden"
    response = {"flagged": flagged, "action": action}
    replace_content = text
    keywords_list = []
    # Walk every entry in the Result field
    for result in body.get("Data", {}).get("Result", []):
        # RiskWords is a comma-separated string
        risk_words_str = result.get("RiskWords", "")
        if risk_words_str:
            risk_words_split = [kw.strip() for kw in risk_words_str.split(",")]
            keywords_list.extend(risk_words_split)
        # Entries that hit a custom keyword library
        customized_hits = result.get("CustomizedHit", [])
        for hit in customized_hits:
            keywords_str = hit.get("KeyWords", "")
            if keywords_str:
                # Split on commas and trim surrounding whitespace
                keywords_split = [kw.strip() for kw in keywords_str.split(",")]
                keywords_list.extend(keywords_split)
    for result in body.get("Data", {}).get("SensitiveResult", []):
        sensitive_data_list = result.get("SensitiveData", [])
        keywords_list.extend(sensitive_data_list)
    # Replace longer keywords first so shorter ones cannot partially overwrite them
    for keyword in sorted(set(keywords_list), key=len, reverse=True):
        if keyword:
            replace_content = replace_content.replace(keyword, "*" * len(keyword))
    if flagged:
        if action == "direct_output":
            response["preset_response"] = output_response
        elif action == "overridden":
            response["text"] = replace_content
    print(response)
    return response


if __name__ == "__main__":
    import uvicorn

    # Choose any open port. Auto-reload is not enabled here because uvicorn
    # only supports it when the app is passed as an import string.
    uvicorn.run(app, host="0.0.0.0", port=8000)
- Startup script (assumes the sample code is saved as main.py)
pip install fastapi uvicorn requests
uvicorn main:app --reload --host 0.0.0.0
- References
AI Guardrails API documentation: https://help.aliyun.com/document_detail/2875414.html
Dify content moderation API extension documentation: https://docs.dify.ai/zh-hans/guides/tools/extensions/api-based/moderation#app-moderation-output-%E6%89%A9%E5%B1%95%E7%82%B9
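Once the service is running, a quick connectivity check can mimic the ping that the sample code answers with {"result": "pong"}. The following is a minimal sketch using only the standard library. The helper names `build_ping_request` and `ping` are hypothetical, and the URL in the usage note assumes the service listens on uvicorn's default port 8000 on the local machine.

```python
import json
import urllib.request


def build_ping_request(endpoint: str, api_key: str) -> urllib.request.Request:
    """Build the POST request; nothing is sent until urlopen() is called."""
    body = json.dumps({"point": "ping"}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
    )


def ping(endpoint: str, api_key: str) -> dict:
    """Send the ping and return the decoded JSON reply."""
    request = build_ping_request(endpoint, api_key)
    with urllib.request.urlopen(request, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `ping("http://127.0.0.1:8000/api/dify/receive", api_key)` should return the pong reply, provided `api_key` is the Base64 string the service expects. The sample service decodes the key before it handles the ping, so a malformed key yields a 401 response.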
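Switching between content replacement and direct refusal
- For output moderation, the sample code supports content replacement (action = "overridden"): every matched keyword or piece of sensitive data is replaced with * (asterisks).
- Returning action = "direct_output" instead replies directly with the preset refusal message. As listed above, the sample code sets action = "direct_output"; uncomment the line action = "overridden" to enable content replacement.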
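The difference between the two actions is easiest to see in the response body. The sketch below isolates that part of the sample code into two small functions. The names `mask` and `build_output_response` are hypothetical; the response fields (`flagged`, `action`, `preset_response`, `text`) are the ones used in the sample code.

```python
def mask(text: str, keywords: list[str]) -> str:
    """Replace each keyword with asterisks, longest first to avoid partial overlaps."""
    for keyword in sorted(set(keywords), key=len, reverse=True):
        if keyword:
            text = text.replace(keyword, "*" * len(keyword))
    return text


def build_output_response(flagged: bool, action: str,
                          preset_response: str, masked_text: str) -> dict:
    """Build the moderation reply for either action."""
    response = {"flagged": flagged, "action": action}
    if flagged:
        if action == "direct_output":
            response["preset_response"] = preset_response  # refuse outright
        elif action == "overridden":
            response["text"] = masked_text  # return the masked text instead
    return response
```

With `direct_output` the user sees only the preset message, while `overridden` keeps the answer readable with the risky spans masked.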
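2.2. Add the API extension
- On the Settings > API Extensions page, add an API extension.
- API Endpoint: enter the reachable address of the deployed forwarding service. For the sample code, the path is /api/dify/receive.
- API-key: enter the Base64 encoding of the Alibaba Cloud AccessKey ID and AccessKey secret joined by a colon (:). Pseudocode:
- base64({aliyun_accessKey_id}:{aliyun_accessKey_secret})
import base64

# AccessKey ID and AccessKey secret
access_key_id = ""
access_key_secret = ""

# Join with a colon and Base64-encode
auth_str = f"{access_key_id}:{access_key_secret}"
encoded_auth = base64.b64encode(auth_str.encode('utf-8')).decode('utf-8')
print(encoded_auth)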
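To confirm that a generated key is well formed, it can be decoded the same way the forwarding service does: Base64-decode, then split on the first colon. The round trip below is a sketch; the function names are hypothetical and the credentials in the usage note are dummy values.

```python
import base64


def encode_api_key(access_key_id: str, access_key_secret: str) -> str:
    """Join the two values with a colon and Base64-encode the result."""
    raw = f"{access_key_id}:{access_key_secret}"
    return base64.b64encode(raw.encode("utf-8")).decode("utf-8")


def decode_api_key(api_key: str) -> tuple[str, str]:
    """Reverse the encoding; only the first colon separates ID and secret."""
    decoded = base64.b64decode(api_key).decode("utf-8")
    access_key_id, access_key_secret = decoded.split(":", 1)
    return access_key_id, access_key_secret
```

For example, `decode_api_key(encode_api_key("my-id", "my-secret"))` returns `("my-id", "my-secret")`. Because the split uses only the first colon, a secret that itself contains a colon still survives the round trip.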
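2.3. Configure the API extension in the Agent
- In the lower-right corner of the Agent page, select Manage to configure content moderation.
- Select API Extension.
- Select the API extension you just created for the AI Guardrails endpoint.
- Turn on the input and output moderation switches as needed.
- For output, Dify accumulates about 300 characters between content moderation calls.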
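The interaction between the roughly 300-character cadence and the 2,000-character limit can be simulated offline. The sketch below assumes that each moderation call carries the full text generated so far, which is what the tail truncation in the sample code implies, and that the interval is exactly 300 characters. Both are simplifying assumptions, and the function names are hypothetical.

```python
MAX_LENGTH = 2000  # per-call limit of the AI Guardrails API


def tail_window(text: str, limit: int = MAX_LENGTH) -> str:
    """Keep only the most recent `limit` characters."""
    return text[-limit:]


def simulate_checks(total_chars: int, step: int = 300) -> list[int]:
    """Return the size of the window sent on each simulated moderation call."""
    sizes = []
    generated = 0
    while generated < total_chars:
        generated = min(generated + step, total_chars)
        sizes.append(len(tail_window("x" * generated)))
    return sizes
```

For a 2,500-character answer the window grows until it reaches 2,000 characters and then stays there, so text that has scrolled out of the window is not rechecked on later calls.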
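2.4. Example results
The following shows API-based content moderation in an Agent.
The following shows API-based content moderation in a workflow on a private deployment.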
FAQ
Plugin installation fails on a private deployment
Method 1: Disable signature verification
cd ${dify_path}/docker
# Stop the services
docker compose down
# Edit .env and set FORCE_VERIFYING_SIGNATURE=false
vi .env
# Restart the services
docker compose up -d
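If the .env change needs to be scripted rather than made in an editor, a small text transformation is enough. The sketch below is one possible approach. The function name `set_env_value` is hypothetical, and it assumes simple KEY=value lines without quoting or inline comments.

```python
def set_env_value(env_text: str, key: str, value: str) -> str:
    """Set KEY=value in .env-style text, appending the key if it is absent."""
    lines = env_text.splitlines()
    replaced = False
    for index, line in enumerate(lines):
        # Commented-out lines start with "#" and are left untouched
        if line.strip().startswith(f"{key}="):
            lines[index] = f"{key}={value}"
            replaced = True
    if not replaced:
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"
```

Reading the file, passing its content through `set_env_value(text, "FORCE_VERIFYING_SIGNATURE", "false")`, and writing the result back has the same effect as the manual edit.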
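Method 2: Use the signed plugin
- Download the signed plugin and the public key used for signing.
- Place the public key where the plugin daemon can access it.
For example, create a public_keys directory under docker/volumes/plugin_daemon and copy the public key file into it: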
mkdir docker/volumes/plugin_daemon/public_keys
cp dify_sign.public.pem docker/volumes/plugin_daemon/public_keys
- Modify docker-compose.yaml
services:
  plugin_daemon:
    environment:
      FORCE_VERIFYING_SIGNATURE: true
      THIRD_PARTY_SIGNATURE_VERIFICATION_ENABLED: true
      THIRD_PARTY_SIGNATURE_VERIFICATION_PUBLIC_KEYS: /app/storage/public_keys/dify_sign.public.pem
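Note that docker/volumes/plugin_daemon is mounted at /app/storage inside the plugin_daemon container. Make sure the path set in THIRD_PARTY_SIGNATURE_VERIFICATION_PUBLIC_KEYS is the path inside the container, not the path on the host.
- Restart the containers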
cd docker
docker compose down
docker compose up -d
After the services restart, third-party signature verification is enabled in the current Community Edition environment.
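The host-to-container path mapping used for the public key is a common source of mistakes, so it can help to compute the container path rather than type it by hand. The sketch below encodes the mount described in this section (docker/volumes/plugin_daemon on the host, /app/storage in the container). The function name `to_container_path` is hypothetical.

```python
from pathlib import PurePosixPath

HOST_ROOT = PurePosixPath("docker/volumes/plugin_daemon")  # host side of the mount
CONTAINER_ROOT = PurePosixPath("/app/storage")             # container side of the mount


def to_container_path(host_path: str) -> str:
    """Translate a host path under the mount into its in-container path.

    Raises ValueError if the path is not under the mounted directory.
    """
    relative = PurePosixPath(host_path).relative_to(HOST_ROOT)
    return str(CONTAINER_ROOT / relative)
```

The value returned for the copied key file is what belongs in THIRD_PARTY_SIGNATURE_VERIFICATION_PUBLIC_KEYS.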