Dify平台集成安全护栏最佳实践

简介: Dify平台提供低代码构建AI大模型应用的解决方案,支持云服务与私有化部署。本文介绍了在工作流和Agent中集成安全护栏的最佳实践,包括插件和扩展API两种方案。插件方式适用于工作流,一键安装实现输入输出防控;扩展API方式适用于Agent和工作流私有化部署场景,通过本地服务适配安全护栏API。文中还详细说明了操作步骤、前提条件及常见问题处理方法,帮助用户快速实现内容安全控制。

Dify平台是AI大模型应用低代码构建平台,提供云服务和私有化部署两种使用方式。本文介绍如何在工作流和Agent中集成安全护栏的最佳实践。

方案概览

本文分别针对在Agent和工作流2种应用,提供了插件和扩展API两种方案对安全护栏集成。

工作流-插件:通过本地插件扩展的方式,一键安装安全护栏插件实现对大模型输入和输出进行防控。

Agent-扩展API:通过API扩展的方式对内容审查API进行扩展,需要部署一套安全护栏与Dify内容审查API标准协议进行适配的服务,考虑到私有化场景的网络、安全等个性化需求,建议本地化进行服务部署,本文提供实现服务的代码示例作为参考。

工作流-扩展API:私有化部署和公共云存在一定差异,具体使用方式可以参考Agent-扩展API。

私有化

官网公共云

使用场景

流式

支持场景

对接工作

流式

支持场景

对接工作

插件

工作流

0代码一键集成

工作流

0代码一键集成

扩展API

Agent

workflow工作流

chatflow工作流

参考本文代码示例,本地部署转发服务

Agent

chatflow工作流

参考本文代码示例,部署公网转发服务,需域名备案。

前提条件

1.已开通阿里云 AI 安全护栏产品(必须)。https://help.aliyun.com/document_detail/2872706.html

2.在控制台配置安全护栏开关和检测规则(可选)。https://yundun.console.aliyun.com/?p=guardrail&protectConfig/testConfig/testConfig#/protectConfig/testConfig/testConfig

操作步骤

1.工作流本地插件集成安全护栏

以下以公共云workflow作为演示示例,Dify官网云上使用流程一致。

1.1.下载插件

1.2.安装插件

1.3.在工作流中使用插件

大模型输入风险检测

在LLM节点之前新增安全护栏插件节点,检测内容为输入变量为input,检测类型为输入。

大模型输出风险检测

在LLM节点下一步,选择工具=>AI安全护栏节点。

安全护栏插件节点配置,输入变量为大模型text变量,检测类型为输出。

1.4.在阿里云安全护栏控制台排查

输入输出检测项开关配置

可以针对输入输分别配置敏感数据检测、提示词攻击的检测开关

控制台地址:https://yundun.console.aliyun.com/?p=guardrail#/protectConfig/testConfig/testConfig

查看安全检测结果

观测结果,输出内容存在S2风险。

控制台地址:https://yundun.console.aliyun.com/?p=guardrail#/resultTest/resultQuery

1.5.示例效果

示例分别演示了大模型输出存在敏感数据风险和输出正常结果的例子,具体应用时可以按照业务需求来编排。

2.Agent扩展内容审查API集成安全护栏

以下以私有化部署作为演示示例,Dify官网云上使用流程一致。

2.1本地化部署转发服务

安全护栏API单次输入内容最大长度是2000个字符,因此当输入长度>2000字符需要做适配,本示例处理方法如下:

输入审查:将输入按照2000个字符切分成多段,并发调用安全护栏API。

输出审查:由于Dify每隔300字符左右发起一次内容审查API调用,处理上截取最近2000字符调用。

  • 代码示例
from fastapi import FastAPI, Body, HTTPException, Header
from pydantic import BaseModel
import base64
from collections.abc import Generator
from typing import Any
import hmac
import hashlib
from urllib.parse import quote
import requests
from datetime import datetime
from datetime import timezone
import uuid
import json
import re
import concurrent.futures
# 可以根据需要调用不同区域的服务,支持上海(cn-shanghai)、北京(cn-beijing)、杭州(cn-hangzhou)、深圳(cn-shenzhen)
SERVICE_URL = "https://green-cip.cn-shanghai.aliyuncs.com"
# 超过这个长度时对文本进行切分
MAX_LENGTH = 2000
# 调用安全护栏的输入检测和输出检测的ServiceCode
SERVICE_INPUT = "query_security_check"
SERVICE_OUTPUT = "response_security_check"
ENCODING = "UTF-8"
ISO8601_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
ALGORITHM = "HmacSHA1"
def format_iso8601_date():
    return datetime.now(timezone.utc).strftime(ISO8601_DATE_FORMAT)
def percent_encode(value):
    if value is None:
        return ""
    return (
        quote(value.encode(ENCODING), safe="~").replace("+", "%20").replace("*", "%2A")
    )
def create_signature(string_to_sign, secret):
    secret = secret + "&"
    signature = hmac.new(
        secret.encode(ENCODING), string_to_sign.encode(ENCODING), hashlib.sha1
    ).digest()
    return base64.b64encode(signature).decode(ENCODING)
def create_string_to_sign(http_method, parameters):
    sorted_keys = sorted(parameters.keys())
    canonicalized_query_string = ""
    for key in sorted_keys:
        canonicalized_query_string += (
            "&" + percent_encode(key) + "=" + percent_encode(parameters[key])
        )
    string_to_sign = (
        http_method
        + "&"
        + percent_encode("/")
        + "&"
        + percent_encode(canonicalized_query_string[1:])
    )
    return string_to_sign
def split_text(text: str, max_length: int = 1950) -> list[str]:
    """将文本按 max_length 分段,尽量保留完整句子(识别多种标点)"""
    segments = []
    while len(text) > max_length:
        # 提取当前最大长度范围内的子串
        chunk = text[:max_length]
        # 使用正则查找最后一个句号、感叹号、问号等断句符号的位置
        match = None
        for pattern in [r"[。!?;:\.?!]+"]:  # 匹配多种结束符号
            matches = list(re.finditer(pattern, chunk))
            if matches:
                match = matches[-1]  # 取最后一个匹配项
        if match:
            cut_point = match.end()  # 包含标点符号
        else:
            cut_point = max_length  # 找不到就强制截断
        segments.append(text[:cut_point])
        text = text[cut_point:]
    if text:
        segments.append(text)
    return segments
def request(content_segment, type, aliyun_access_key, aliyun_access_secret):
    print(datetime.now(), f" [{type} request content]-> {content_segment}")
    # 3.1 构造请求参数
    parameters = {
        "Action": "TextModerationPlus",
        "Version": "2022-03-02",
        "AccessKeyId": aliyun_access_key,
        "Timestamp": format_iso8601_date(),
        "SignatureMethod": "HMAC-SHA1",
        "SignatureVersion": "1.0",
        "SignatureNonce": str(uuid.uuid4()),
        "Format": "JSON",
        "Service": (
            SERVICE_INPUT if type == "input" else SERVICE_OUTPUT
        ),
        "ServiceParameters": json.dumps(
            {"content": content_segment}, ensure_ascii=False
        ),
    }
    string_to_sign = create_string_to_sign("POST", parameters)
    signature = create_signature(string_to_sign, aliyun_access_secret)
    parameters["Signature"] = signature
    # 3.2 发送请求
    response = requests.post(SERVICE_URL, data=parameters)
    body = response.json()
    print(datetime.now(), " [response body]-> ", body)
    if response.status_code != 200:
        raise Exception(
            f"response http status_code not 200. status_code: {response.status_code}, body: {body}"
        )
    if body.get("Code") != 200:
        raise Exception(
            f"response code not 200. code: {body.get('Code')}, body: {body}"
        )
    return body
app = FastAPI()
class InputData(BaseModel):
    point: str
    params: dict = {}
@app.post("/api/dify/receive")
async def dify_receive(data: InputData = Body(...), authorization: str = Header(None)):
    """
    Receive API query data from Dify.
    """
    print(data)
    auth_scheme, _, api_key = authorization.partition(" ")
    if auth_scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Unauthorized")
    # api_key decode
    try:
        decoded_bytes = base64.b64decode(api_key)
        decoded_str = decoded_bytes.decode("utf-8")
        ak, sk = decoded_str.split(":", 1)
    except Exception as e:
        # 如果库调用失败,抛出异常
        raise HTTPException(status_code=401, detail=f"Base64 Decode AK/SK fail: {e}")
    point = data.point
    if point == "ping":
        return {"result": "pong"}
    if point == "app.moderation.input":
        return handle_app_moderation_input(params=data.params, ak=ak, sk=sk)
    elif point == "app.moderation.output":
        return handle_app_moderation_output(params=data.params, ak=ak, sk=sk)
    raise HTTPException(status_code=400, detail="Not implemented")
def handle_app_moderation_input(params: dict, ak: str, sk: str):
    app_id = params.get("app_id")
    inputs = params.get("inputs", {})
    query = params.get("query")
    contents = (
        [query] if len(query) <= MAX_LENGTH else split_text(query, MAX_LENGTH - 50)
    )
    # 并发执行
    bodys = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(request, seg, "input", ak, sk) for seg in contents]
        for future in concurrent.futures.as_completed(futures):
            bodys.append(future.result())
    risk_levels = [r.get("Data", {}).get("RiskLevel", "") for r in bodys]
    sensitive_levels = [r.get("Data", {}).get("SensitiveLevel", "") for r in bodys]
    attack_levels = [r.get("Data", {}).get("AttackLevel", "") for r in bodys]
    suggestion = "Pass"
    # 可以根据不同的场景返回不同的回答内容
    output_response = "Your content violates our usage policy."
    # 只要有一个有风险就为Block
    if any(r != "" and r != "none" for r in risk_levels):
        suggestion = "Block"
        output_response = "Your content involves content security."
    elif any(a != "" and a != "S0" for a in sensitive_levels):
        suggestion = "Block"
        output_response = "Your content involves sensitive data."
    elif any(s != "" and s != "none" for s in attack_levels):
        suggestion = "Block"
        output_response = "Your content involves prompt attack."
    flagged = False
    action = "direct_output"
    if suggestion != "Pass":
        flagged = True
        # action = "overridden"
        # query = "[Query has been overridden due to sensitive information.]"
    response = {"flagged": flagged, "action": action}
    if flagged:
        if action == "direct_output":
            response["preset_response"] = output_response
        elif action == "overridden":
            response["inputs"] = inputs
            response["query"] = query
    print(response)
    return response
def handle_app_moderation_output(params: dict, ak: str, sk: str):
    app_id = params.get("app_id")
    text = params.get("text", "")
    print(len(text))
    if len(text) > MAX_LENGTH:
        content = text[-MAX_LENGTH:]
    else:
        content = text
    # 并发执行
    body = request(content, "output", ak, sk)
    risk_level = body.get("Data", {}).get("RiskLevel", "")
    sensitive_level = body.get("Data", {}).get("SensitiveLevel", "")
    attack_level = body.get("Data", {}).get("AttackLevel", "")
    suggestion = "Pass"
    # 可以根据不同的场景返回不同的回答内容
    output_response = "Your content violates our usage policy."
    # 只要有一个有风险就为Block
    if risk_level != "" and risk_level != "none":
        suggestion = "Block"
        output_response = "Your content involves content security."
    elif sensitive_level != "" and sensitive_level != "S0":
        suggestion = "Block"
        output_response = "Your content involves sensitive data."
    elif attack_level != "" and attack_level != "none":
        suggestion = "Block"
        output_response = "Your content involves prompt attack."
    flagged = False
    action = "direct_output"
    if suggestion != "Pass":
        flagged = True
        #action = "overridden"
    response = {"flagged": flagged, "action": action}
    replace_content = text
    keywords_list = []
    # 遍历 Result 字段中的每个结果
    for result in body.get("Data", {}).get("Result", []):
        # 提取 RiskWords(逗号分隔字符串)
        risk_words_str = result.get("RiskWords", "")
        if risk_words_str:
            risk_words_split = [kw.strip() for kw in risk_words_str.split(",")]
            keywords_list.extend(risk_words_split)
        # 获取命中自定义库的部分
        customized_hits = result.get("CustomizedHit", [])
        for hit in customized_hits:
            keywords_str = hit.get("KeyWords", "")
            if keywords_str:
                # 使用逗号分割字符串,并去除前后空格
                keywords_split = [kw.strip() for kw in keywords_str.split(",")]
                keywords_list.extend(keywords_split)
    for result in body.get("Data", {}).get("SensitiveResult", []):
        sensitive_data_list = result.get("SensitiveData", [])
        keywords_list.extend(sensitive_data_list)
    
    # 按长度排序避免部分覆盖
    for keyword in sorted(set(keywords_list), key=len, reverse=True): 
        if keyword:
            replace_content = replace_content.replace(keyword, "*" * len(keyword))
    if flagged:
        if action == "direct_output":
            response["preset_response"] = output_response
        elif action == "overridden":
            response["text"] = replace_content
    print(response)
    return response
if __name__ == "__main__":
    import uvicorn
    # 开放端口可以根据自定义选择
    uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
  • 启动脚本
pip install fastapi uvicorn
uvicorn main:app --reload --host 0.0.0.0
  • 参考文档  

安全护栏API文档:https://help.aliyun.com/document_detail/2875414.html

Dify内容审查API扩展文档:https://docs.dify.ai/zh-hans/guides/tools/extensions/api-based/moderation#app-moderation-output-%E6%89%A9%E5%B1%95%E7%82%B9

调整内容替换 or 直接拒答

  • 输出内容审核的示例代码中默认是内容替换,将命中的关键词或者敏感内容数据全部替换为*(星号)
  • 可以通过调整返回的action=direct_output切换到直接回复拒答内容

2.2新增API扩展

  • 设置-API扩展页面,新增 API 扩展

  • API Endpoint:填写部署转发服务脚本后的可访问地址
  • API-key:填写的是阿里云AK/SK通过:拼接后的Base64字符串,伪代码如下:
  • base64({aliyun_accessKey_id}:{aliyun_accessKey_secret})
import base64
# AccessKeyId 和 AccessKeySecret
access_key_id = ""
access_key_secret = ""
# 拼接并编码
auth_str = f"{access_key_id}:{access_key_secret}"
encoded_auth = base64.b64encode(auth_str.encode('utf-8')).decode('utf-8')
print(encoded_auth)

2.3 在Agent配置API扩展

  • 在Agent页面的右下角选择管理去配置内容审查

  • 选择API 扩展
  • 选择刚才新建的安全护栏接口的API 扩展
  • 根据需要打开输入和输出内容的开关

  • 在输出时dify会累计约300个字符做一次内容审查

2.4示例效果

以下是API内容审查在Agent中的效果示例

以下是API内容审查在私有化版workflow中的效果示例

常见问题

私有化版本安装时提示失败

方法一:禁用签名校验

cd ${dify_path}/docker
# 停止
docker compose down
vi .env
# 将FORCE_VERIFYING_SIGNATURE修改为false
FORCE_VERIFYING_SIGNATURE=false
# 重启
docker compose up -d

方法二:使用签名版

官方文档:https://docs.dify.ai/zh-hans/plugins/publish-plugins/signing-plugins-for-third-party-signature-verification

  1. 下载签名版和签名所用的公钥
  2. 将用公钥放在插件守护程序可以访问的位置。

例如,在 docker/volumes/plugin_daemon 下创建 public_keys 目录,并将公钥文件复制到对应路径:

mkdir docker/volumes/plugin_daemon/public_keys
cp dify_sign.public.pem docker/volumes/plugin_daemon/public_keys
  1. 修改docker-compose.yaml
services:
  plugin_daemon:
    environment:
      FORCE_VERIFYING_SIGNATURE: true
      THIRD_PARTY_SIGNATURE_VERIFICATION_ENABLED: true
      THIRD_PARTY_SIGNATURE_VERIFICATION_PUBLIC_KEYS: /app/storage/public_keys/dify_sign.public.pem

请注意,docker/volumes/plugin_daemonplugin_daemon 容器中被挂载到 /app/storage。确保在 THIRD_PARTY_SIGNATURE_VERIFICATION_PUBLIC_KEYS 中指定的路径对应于容器内的路径。

  1. 重启容器
cd docker
docker compose down
docker compose up -d

重启服务后,第三方签名验证功能将在当前社区版环境中启用。

相关文章
|
23天前
|
数据可视化 Java BI
将 Spring 微服务与 BI 工具集成:最佳实践
本文探讨了 Spring 微服务与商业智能(BI)工具集成的潜力与实践。随着微服务架构和数据分析需求的增长,Spring Boot 和 Spring Cloud 提供了构建可扩展、弹性服务的框架,而 BI 工具则增强了数据可视化与实时分析能力。文章介绍了 Spring 微服务的核心概念、BI 工具在企业中的作用,并深入分析了两者集成带来的优势,如实时数据处理、个性化报告、数据聚合与安全保障。同时,文中还总结了集成过程中的最佳实践,包括事件驱动架构、集中配置管理、数据安全控制、模块化设计与持续优化策略,旨在帮助企业构建高效、智能的数据驱动系统。
将 Spring 微服务与 BI 工具集成:最佳实践
|
3月前
|
人工智能 运维 API
Dify开发者必看:如何破解MCP集成与Prompt迭代难题?
Dify 是一个面向AI时代的开源大语言模型(LLM)应用开发平台,致力于让复杂的人工智能应用构建变得简单高效,目前已在全球范围内形成显著影响力,其 GitHub 仓库 Star 数截至 2025 年 6 月已突破 100,000+,目前,Dify 已经成为 LLMOps 领域增长最快的开源项目之一。
|
4月前
|
运维 安全 关系型数据库
【产品升级】Dataphin V5.1版本发布:跨云数据集成、指标管理、平台运维带来重大更新!
V5.1版本新增多项功能:对接AWS生态(支持Amazon EMR、Redshift等),强化研发技术支撑(如API认证升级、全量任务隔离),完善运营消费链路(新增业务指标管理、指标关系图),提升平台综合能力(自定义菜单、缩短升级停机时间)。这些功能助力企业实现高效数据治理与分析,未来还将拓展智能化与国际化支持。
294 0
|
3月前
|
人工智能 搜索推荐 API
AI-Compass DeepSearch深度搜索生态:集成阿里ZeroSearch、字节DeerFlow、MindSearch等前沿平台,实现超越传统关键词匹配的智能信息检索革命
AI-Compass DeepSearch深度搜索生态:集成阿里ZeroSearch、字节DeerFlow、MindSearch等前沿平台,实现超越传统关键词匹配的智能信息检索革命
AI-Compass DeepSearch深度搜索生态:集成阿里ZeroSearch、字节DeerFlow、MindSearch等前沿平台,实现超越传统关键词匹配的智能信息检索革命
|
2月前
|
供应链 监控 搜索推荐
35页PPT|零售行业自助数据分析方法论:指标体系构建平台集成、会员与商品精细化运营实践
在零售行业环境剧变的背景下,传统“人找货”模式正被“货找人”取代。消费者需求日益个性化,购买路径多元化,企业亟需构建统一的指标体系,借助BI平台实现数据驱动的精细化运营。本文从指标体系构建、平台集成到会员与商品运营实践,系统梳理零售经营分析的方法论,助力企业实现敏捷决策与业务闭环。
35页PPT|零售行业自助数据分析方法论:指标体系构建平台集成、会员与商品精细化运营实践
|
3月前
|
机器学习/深度学习 人工智能 监控
CI/CD与模型监控平台集成MLOps系统实现的全面路径
MLOps是机器学习模型在生产环境中持续优化、部署和维护的关键。通过CI/CD流水线和模型监控平台的结合,可以大大提高模型开发和运维的效率,实现高效、稳定的模型服务。随着AI技术的快速发展,MLOps将在企业级AI应用中发挥越来越重要的作用。
CI/CD与模型监控平台集成MLOps系统实现的全面路径
|
人工智能 运维 API
Dify 开发者必看:如何破解 MCP 集成与 Prompt 迭代难题?
Dify 是面向 AI 时代的开源大语言模型应用开发平台,GitHub Star 数超 10 万,为 LLMOps 领域增长最快项目之一。然而其在 MCP 协议集成、Prompt 敏捷调整及运维配置管理上存在短板。Nacos 3.0 作为阿里巴巴开源的注册配置中心,升级支持 MCP 动态管理、Prompt 实时变更与 Dify 环境变量托管,显著提升 Dify 应用的灵活性与运维效率。通过 Nacos,Dify 可动态发现 MCP 服务、按需路由调用,实现 Prompt 无感更新和配置白屏化运维,大幅降低 AI 应用开发门槛与复杂度。
617 20
|
7月前
|
人工智能 网络协议 Java
RuoYi AI:1人搞定AI中台!开源全栈式AI开发平台,快速集成大模型+RAG+支付等模块
RuoYi AI 是一个全栈式 AI 开发平台,支持本地 RAG 方案,集成多种大语言模型和多媒体功能,适合企业和个人开发者快速搭建个性化 AI 应用。
1635 77
RuoYi AI:1人搞定AI中台!开源全栈式AI开发平台,快速集成大模型+RAG+支付等模块
|
3月前
|
人工智能 JavaScript 安全
一文教你高效集成Qwen Code与ModelGate千万免费Toknn模型网关平台
本文详解如何高效集成Qwen Code与ModelGate模型网关平台,涵盖环境搭建、API配置、代码生成等关键步骤,助你实现智能编程与多模型管理,大幅提升AI开发效率。
|
6月前
|
人工智能 运维 关系型数据库
云服务API与MCP深度集成,RDS MCP最佳实践
近日,阿里云数据库RDS发布开源RDS MCP Server,将复杂的技术操作转化为自然语言交互,实现"对话即运维"的流畅体验。通过将RDS OpenAPI能力封装为MCP协议工具,用户只需像聊天一样描述需求,即可完成数据库实例创建、性能调优、故障排查等专业操作。本文介绍了RDS MCP(Model Context Protocol)的最佳实践及其应用,0代码,两步即可轻松完成RDS实例选型与创建,快来体验!
云服务API与MCP深度集成,RDS MCP最佳实践

热门文章

最新文章