1. 前言
1.1 AI 时代内容安全的重要性
随着大模型技术的发展,企业越来越依赖这些模型来进行业务处理。然而,数据安全成为了不容忽视的问题。主要有两方面的隐患:
- AI 生成内容的不可控性:LLM 的回答可能产生涉黄、涉暴等内容,为业务和企业引入内容安全风险
- 个人/企业敏感信息泄漏:国内外大模型厂商林立,而且也有越来越多的 AI 中转代理软件和服务,用户使用过程中存在信息泄漏风险
本文介绍的敏感信息拦截插件旨在为解决这两个问题提供方案,一方面实现 AI 生成内容的安全防护,另一方面可以防止敏感信息泄露。同时也提供了内容还原的机制,可以实现敏感信息不泄漏的同时,用户使用体验也不受影响。
1.2 什么是 AI Gateway
AI Gateway 的比较准确的定义是 AI Native 的 API Gateway,是基于 API Gateway 的能⼒来满⾜ AI Native 的需求。例如:
- 将传统的 QPS 限流扩展到 Token 限流。
- 将传统的负载均衡/重试/fallback 能力延伸,支持对接多个大模型厂商 API,提高整体稳定性。
- 扩展可观测能力,支持不同模型之间效果的对比 A/B Test,以及对话上下⽂链路 Tracing 等。
Higress[1]是阿⾥云开源的⼀款 AI Native 的 API Gateway,本文介绍的插件,也是基于此理念,本身可以作为一个通用的 Higress 网关插件,用在多种场景。但也面向 AI 场景做了优化,例如支持 SSE 协议,实现流式的内容过滤。
Higress 支持多种语言编写 Wasm 插件,插件更新采用热插拔机制对流量无损,可以很方便地热更新插件逻辑,无需重启或升级网关。这里使用了本人最擅长的 Rust 语言来开发这个敏感信息拦截插件。
2. 插件使用简介和示例
2.1 功能简介
插件的核心处理逻辑如上所示,主要针对请求/返回中的敏感信息进行拦截和替换,以保护数据安全。
- 拦截:当检测到请求或返回数据中包含敏感词时,插件会直接拦截并返回预设错误消息:
- 直接拦截:在处理数据范围内出现敏感词时直接拦截,并返回预设错误信息。
- 内置敏感词库和自定义敏感词:支持系统内置敏感词库和自定义敏感词。
系统内置敏感词目前使用了开源项目 senstive-word[2]中的词库。
- 替换:将请求数据中的敏感词替换为脱敏字符串,传递给后端服务。部分脱敏数据在后端服务返回后可进行还原:
- 脱敏字符串:将请求数据中的敏感词替换为脱敏字符串。
- 保证敏感数据不出域:保证敏感数据不会泄露到外部。
- 部分数据可还原:部分脱敏数据在后端服务返回后可进行还原。
- 自定义规则:支持标准正则和 GROK 规则,替换字符串支持变量替换。
插件的处理数据范围:
- 对于 openai 协议: 请求/返回对话内容。
- 对于通用的 json 协议: 只处理指定字段。
- 对于非 json 协议: 整个请求/返回 body。
2.2 使用示例
deny_message: "提问或回答中包含敏感信息,已被屏蔽" deny_words: - "张三" replace_roles: - regex: "%{MOBILE}" type: "replace" value: "***********" # 手机号 13800138000 -> "***********" - regex: "%{EMAILLOCALPART}@%{HOSTNAME:domain}" type: "replace" restore: true value: "****@$domain" # 电子邮箱 admin@gmail.com -> ****@gmail.com - regex: "%{IP}" type: "replace" restore: true value: "***.***.***.***" # ip 192.168.0.1 -> ***.***.***.*** - regex: "%{CHINAID}" type: "replace" value: "****" # 身份证号 110000000000000000 -> ****
注意这里的 %{EMAILLOCALPART} 使用到了 GROK 规则表达式[3],这是一种预定义的正则匹配方式,常见的匹配方式有:
- 手机号匹配:%{MOBILE}
- IP 地址匹配:%{IP}
- 中国居民身份证匹配:%{CHINAID}
- 电子邮箱匹配:%{EMAILLOCALPART}@%{HOSTNAME}
敏感信息拦截
- 请求拦截
- 用户请求内容 张三怎么样
- 插件返回内容 提问或回答中包含敏感信息,已被屏蔽
- 返回拦截
- 用户请求内容 XX的最大股东是谁
- 后端返回内容 XX的最大股东是张三
- 插件返回内容 提问或回答中包含敏感信息,已被屏蔽
敏感信息替换
- 用户请求内容 帮我整理这个用户的信息形成表格,姓名是张三,手机号是13800138000
- 敏感词替换后后端收到内容 帮我整理这个用户的信息形成表格,姓名是张三,手机号是***********
敏感信息替换后还原
- 用户请求内容 用 sendmail 给 test@gmail.com 发送一封内容为 测试 的邮件
- 敏感词替换后请求大模型内容 用 sendmail 给 ****@gmail.com 发送一封标题为 "测试标题",内容为 "测试内容" 的邮件
- 大模型返回内容 echo "测试内容" | sendmail -s "测试标题" ****@gmail.com
- 敏感词还原后返回用户内容 echo "测试内容" | sendmail -s "测试标题" test@gmail.com
3. 插件实现过程简介
3.1 如何使用 Rust 开发 Higress 插件
这里简单介绍一下如何用 Rust 开发一个 Higress 插件。
准备工作
确保您的系统上已安装 Rust 和 Cargo(Rust 的包管理工具)。如果尚未安装,请执行以下命令:
curl https://sh.rustup.rs -sSf | sh
由于我们正在处理最新的技术特性,基本的 Rust 安装是不够的。还需要安装 Rust 的 Nightly 工具链和 WASM 目标平台的支持:
rustup toolchain install nightly rustup target add wasm32-wasi
创建库
git clone https://github.com/alibaba/higress.git cd higress/plugins/wasm-rust/extensions cargo new --lib demo-wasm
这将在 higress/plugins/wasm-rust/extensionsdemo-wasm 目录中创建一个模板库项目。在 src/目录中会找到 lib.rs 文件,以及一个 Cargo.toml 文件,该文件告诉 Cargo 如何构建您的项目。
设置 Crate 类型
生成的库由 Envoy 的 C++ 代码加载,因此无需包含任何 Rust 特定的信息。为此,我们将设置 crate 类型为 cdylib 以生成更小的二进制文件。打开 Cargo.toml 文件,在 [lib] 部分添加:
[lib] crate-type = ["cdylib"]
添加依赖项
插件需要基于 higress 提供的 SDK 和修改后的 proxy-wasm-rust-sdk。将它添加到 Cargo.toml 作为依赖项:
[dependencies] higress-wasm-rust = { path = "../../", version = "0.1.0" } proxy-wasm = { git="https://github.com/higress-group/proxy-wasm-rust-sdk", branch="main", version="0.2.2" }
开始编码
编辑 src/lib.rs 文件。
use higress_wasm_rust::log::Log; use higress_wasm_rust::plugin_wrapper::{HttpContextWrapper, RootContextWrapper}; use higress_wasm_rust::rule_matcher::{on_configure, RuleMatcher, SharedRuleMatcher}; use multimap::MultiMap; use proxy_wasm::traits::{Context, HttpContext, RootContext}; use proxy_wasm::types::{Bytes, ContextType, DataAction, HeaderAction, LogLevel}; use serde::Deserialize; use std::cell::RefCell; use std::ops::DerefMut; use std::rc::Rc; proxy_wasm::main! {{ proxy_wasm::set_log_level(LogLevel::Trace); proxy_wasm::set_root_context(|_|Box::new(DemoWasmRoot::new())); }} const PLUGIN_NAME: &str = "demo-wasm"; // 核心代码逻辑 #[derive(Default, Debug, Deserialize, Clone)] struct DemoWasmConfig { // 配置文件结构体 } struct DemoWasm { // 每个请求对应的插件实例 log: Log, config: Option<DemoWasmConfig>, } impl Context for DemoWasm {} impl HttpContext for DemoWasm {} impl HttpContextWrapper<DemoWasmConfig> for DemoWasm { fn on_config(&mut self, config: &DemoWasmConfig) { // 获取config self.log.info(&format!("on_config {:?}", config)); self.config = Some(config.clone()) } fn on_http_request_complete_headers( &mut self, headers: &MultiMap<String, String>, ) -> HeaderAction { // 请求header获取完成回调 self.log .info(&format!("on_http_request_complete_headers {:?}", headers)); HeaderAction::Continue } fn on_http_response_complete_headers( &mut self, headers: &MultiMap<String, String>, ) -> HeaderAction { // 返回header获取完成回调 self.log .info(&format!("on_http_response_complete_headers {:?}", headers)); HeaderAction::Continue } fn cache_request_body(&self) -> bool { // 是否缓存请求body true } fn cache_response_body(&self) -> bool { // 是否缓存返回body true } fn on_http_request_complete_body(&mut self, req_body: &Bytes) -> DataAction { // 请求body获取完成回调 self.log.info(&format!( "on_http_request_complete_body {}", String::from_utf8(req_body.clone()).unwrap_or("".to_string()) )); DataAction::Continue } fn on_http_response_complete_body(&mut self, res_body: &Bytes) -> DataAction { // 返回body获取完成回调 self.log.info(&format!( "on_http_response_complete_body {}", String::from_utf8(res_body.clone()).unwrap_or("".to_string()) )); DataAction::Continue } } // 核心代码逻辑结束 struct DemoWasmRoot { log: Log, rule_matcher: SharedRuleMatcher<DemoWasmConfig>, } impl DemoWasmRoot { fn new() -> Self { DemoWasmRoot { log: Log::new(PLUGIN_NAME.to_string()), rule_matcher: Rc::new(RefCell::new(RuleMatcher::default())), } } } impl Context for DemoWasmRoot {} impl RootContext for DemoWasmRoot { fn on_configure(&mut self, _plugin_configuration_size: usize) -> bool { on_configure( self, _plugin_configuration_size, self.rule_matcher.borrow_mut().deref_mut(), &self.log, ) } fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> { self.create_http_context_use_wrapper(context_id) } fn get_type(&self) -> Option<ContextType> { Some(ContextType::HttpContext) } } impl RootContextWrapper<DemoWasmConfig> for DemoWasmRoot { fn rule_matcher(&self) -> &SharedRuleMatcher<DemoWasmConfig> { &self.rule_matcher } fn create_http_context_wrapper( &self, _context_id: u32, ) -> Option<Box<dyn HttpContextWrapper<DemoWasmConfig>>> { Some(Box::new(DemoWasm { config: None, log: Log::new(PLUGIN_NAME.to_string()), })) } }
编译 WASM 模块
现在,我们需要将 Rust 代码编译成 WASM 模块:
cargo build --target wasm32-wasi --release
这会在 target/目录下生成 .wasm 二进制文件,然后我们可以复制出来:
cp target/wasm32-wasi/release/demo-wasm.wasm ./plugin.wasm
构建 WASM 镜像及部署
dockerfile。
FROM scratch COPY plugin.wasm plugin.wasm
用以上 dockerfile 打包为镜像并推送。
docker build -t registry.cn-hangzhou.aliyuncs.com/demo/demo-wasm:1.0.0 . docker push registry.cn-hangzhou.aliyuncs.com/demo/demo-wasm:1.0.0
在 higress 控制台的插件配置 -> 添加插件 -> 镜像地址中填入刚才的镜像地址 registry.cn-hangzhou.aliyuncs.com/demo/demo-wasm:1.0.0 及其他参数。即完成插件部署。
3.2 敏感词插件实现原理
敏感词插件通过解析请求和响应数据,使用配置文件定义敏感词列表以及处理规则来识别敏感词。插件的核心逻辑如下:
数据解析
插件在 on_http_request_complete_body 中按照配置支持的协议对请求进行解析,并提取出需要处理的内容。
fn on_http_request_complete_body(&mut self, req_body: &Bytes) -> DataAction { if self.config.is_none() { return DataAction::Continue; } let config = self.config.as_ref().unwrap(); let mut req_body = match String::from_utf8(req_body.clone()) { Ok(r) => r, Err(_) => return DataAction::Continue, }; if config.deny_openai { if let Ok(r) = serde_json::from_str(req_body.as_str()) { let req: Req = r; // openai 协议 return DataAction::Continue; } } if !config.deny_jsonpath.is_empty() { if let Ok(r) = serde_json::from_str(req_body.as_str()) { // jsonpath配置 return DataAction::Continue; } } if config.deny_raw { // raw原始数据 return DataAction::Continue; } DataAction::Continue }
敏感词拦截
插件会检查请求数据是否包含系统内置或自定义的敏感词,如果包含则根据配置直接拦截请求并返回错误信息。
使用 jieba 自定义词库方式对敏感词进行初始化,最终从 double-array trie 中对敏感词进行匹配拦截。
fn check(&self, message: &str) -> bool { for word in self.jieba.cut(message, true) { if self.words.contains(word) { return true; } } false } fn check_message(&self, message: &str) -> bool { if let Some(config) = &self.config { config.deny_words.check(message) || (config.system_deny && SYSTEM.deny_word.check(message)) } else { false } }
敏感词替换
对于需要替换的敏感词,插件会按照定义好的规则进行替换,这些规则支持正则表达式和 GROK 模式。
fn grok_to_pattern(&self, pattern: &str) -> (String, bool) { let mut ok = true; let mut ret = pattern.to_string(); for _c in self.grok_regex.captures_iter(pattern) { if _c.is_err() { ok = false; continue; } let c = _c.unwrap(); if let (Some(full), Some(name)) = (c.get(0), c.name("pattern")) { if let Some(p) = self.grok_patterns.get(name.as_str()) { if let Some(alias) = c.name("alias") { ret = ret.replace(full.as_str(), &format!("(?P<{}>{})", alias.as_str(), p)); } else { ret = ret.replace(full.as_str(), p); } } else { ok = false; } } } (ret, ok) } fn replace_request_msg(&mut self, message: &str) -> String { let config = self.config.as_ref().unwrap(); let mut msg = message.to_string(); for rule in &config.replace_roles { msg = rule.regex.replace_all(&msg, &rule.value).to_string(); } msg }
数据恢复
如果被替换词只对应一个原始词,插件可以在响应中将脱敏后的数据恢复为原始数据。
# 替换部分: for _m in rule.regex.find_iter(&msg) { if _m.is_err() { continue; } let m = _m.unwrap(); let from_word = m.as_str(); let to_word = match rule.type_ { Type::Hash => { let digest = md5::compute(from_word.as_bytes()); format!("{:x}", digest) } Type::Replace => rule.regex.replace(from_word, &rule.value).to_string(), }; replace_pair.push((from_word.to_string(), to_word.clone())); if rule.restore && !to_word.is_empty() { match self.mask_map.entry(to_word) { std::collections::hash_map::Entry::Occupied(mut e) => { e.insert(None); } std::collections::hash_map::Entry::Vacant(e) => { e.insert(Some(from_word.to_string())); } } } } for (from_word, to_word) in replace_pair { msg = msg.replace(&from_word, &to_word); } #恢复部分: if let Ok(body_str) = std::str::from_utf8(&body) { let mut new_str = body_str.to_string(); if self.is_openai { let messages = self.process_sse_message(body_str); for message in messages { let mut new_message = message.clone(); for (from_word, to_word) in self.mask_map.iter() { if let Some(to) = to_word { new_message = new_message.replace(from_word, to); } } if new_message != message { new_str = new_str.replace( &json!(message).to_string(), &json!(new_message).to_string(), ); } } } else { for (from_word, to_word) in self.mask_map.iter() { if let Some(to) = to_word { new_str = new_str.replace(from_word, to); } } } if new_str != body_str { self.replace_http_response_body(new_str.as_bytes()); } }
4. 总结
本⽂对敏感信息拦截插件的使用方式和实现原理进行了简单介绍,它能够自动检测并处理请求和响应中的敏感词,有效防止敏感信息泄露。通过对不同数据范围的支持和灵活的配置选项,该插件能够适应各种应用场景,确保数据的安全性和合规性。希望对你有帮助!
插件的实现已经提交 PR 给 Higress 开源社区,可以到这里查看完整的代码实现链接
也欢迎⼤家提出宝贵建议,可以直接在上⾯ PR 中评论,或者在 Higress 社区交流群(钉钉群号:30735012403)⾥⼀起沟通。
相关链接:
[1] Higress
[2] senstive-word
[3] GROK 规则表达式
作者:刘毅杰,棱镜七彩信息科技有限公司研发,Higress Member