1. 背景
我们要在 Higress 网关中编写 WebAssembly(wasm)插件,使得在 http 请求的各个阶段(requestHeader,requestBody,responseHeader,responseBody)能够将相应的请求或返回捕获进行业务逻辑的处理。具体到本比赛,主要需要实现的是缓存对大模型的请求(openai 接口的形式)在本地(或云数据库),并设计语义级别的缓存命中逻辑来实现降低响应请求且减少 token 费用的目的。
AI Cache 示例
以上图为例,本比赛主要的问题可以归纳为:(1)如何根据 Query 字符串生成合适的 Query 向量 ⇒ 向量生成器选型。(2)如何根据 Query 向量进行语义级别的查找,快速找到合适的缓存向量 ⇒ 缓存命中逻辑设计。(3)如何管理大量的缓存⇒向量数据库选型及重复初始化逻辑。
实际上 Redis 也具备 Vector Store 能力,这里的 Cache Store 和 Vector Store 是可以合并的。不过本 Demo 将二者分开了,Cache Store 使用 Redis,Vector Store 使用阿里云 DashVector 服务。
2. 网关环境搭建
首先我们需要在线上搭建网关环境以供测试和评测使用,本文也提供了本地搭建网关环境的方法供读者参考。注意这里的线上搭建环境是以赛题介绍为基础展开的,若读者已经搭建好线上的开源网关或企业网关可跳过本节。
2.1 搭建线上企业版 Higress 环境
赛题支持开源 Higress 和企业版 Higress 两种不同的配置,本文以企业版 Higress 为例进行展示。
2.1.1 申请免费试用并创建相应资源
2.1.2 创建服务 (通义千问的地址、Redis 服务、Dashscope 服务)
2.1.3 创建路由转发给通义千问
2.1.4 开启 Higress 插件市场中的 AI Proxy 插件
网关需要 AI Proxy 插件作为处理 AI 请求的支撑,我们可以采用插件市场中已有的 ai-proxy 插件。从源码编译的命令和上次如下所示。最后,配置环节需要提供大模型服务商的 api 和 token key 等,注意比赛需要使用通义千问的 qwen_long 模型。
2.1.5 编译上传 AI Cache 插件* (注意可能需要修改 “ai-cache” 名称防止和插件市场已有插件重复)
本比赛的核心在于自定义 AI Cache 插件以实现更鲁棒的缓存逻辑。首先还是以 Higress 源码提供的基础代码为例进行编译和上传,配置环节需要提供 redis 服务的名称。注意此步骤会在后续迭代代码中反复使用。
git clone https://github.com/alibaba/higress.git cd higress/plugins/wasm-go PLUGIN_NAME=ai-cache EXTRA_TAGS=proxy_wasm_version_0_2_100 make build
2.2 本地测试环境搭建和代码更新逻辑
由于线上环境的测试的成本较高,我们也可以采用 Higress + LobeChat 快速搭建私人 GPT 助理[1]的方式起两个 Docker 容器进行本地测试,参考 dockerfile 如下:
version: '3.9' networks: higress-net: external: false services: higress: image: registry.cn-hangzhou.aliyuncs.com/ztygw/aio-redis:1.4.1-rc.1 environment: - GATEWAY_COMPONENT_LOG_LEVEL=misc:error,wasm:debug # 重要,开启日志 - CONFIG_TEMPLATE=ai-proxy - DEFAULT_AI_SERVICE=qwen - DASHSCOPE_API_KEY= [YOUR_KEY] networks: - higress-net ports: - "9080:8080/tcp" - "9001:8001/tcp" volumes: - 本地data目录:/data - 本地log目录:/var/log/higress/ # 重要,方便在容器restrat之后查看日志 restart: always lobechat: image: lobehub/lobe-chat environment: - CODE=123456ed - OPENAI_API_KEY=unused - OPENAI_PROXY_URL=http://higress:8080/v1 networks: - higress-net ports: - "3210:3210/tcp" restart: always
主要更改了 Higress 的 image,environment 以及 volumes 的配置,启动和重启就是 docker compose up -d docker compose restart。
进一步地,我们需要了解本地代码编写的逻辑如何能反馈到测试环境中。和线上网关环境直接上传编译后的二进制 wasm 插件不同的是,这里需要采用的是:本地编写代码 ⇒ 本地编译 wasm 插件 ⇒ Docker 打包镜像并上传 ⇒ 修改本地测试环境配置中的镜像版本 ⇒ 开始测试并打印日志的流程。具体参考代码如下:
cd ${workspaceFolder}/higress/plugins/wasm-go PLUGIN_NAME=ai-cache EXTRA_TAGS=proxy_wasm_version_0_2_100 make build // 修改版本号(version.txt) export cur_version=$(cat ${workspaceFolder}/version.txt) && docker build -t [YOUR IMAGE_BASE_URL]:$cur_version -f Dockerfile . && docker push [YOUR_IMAGE_BASE_URL]:$cur_version // 修改本地测试环境配置中的镜像版本 sudo bash -c \"sed -i 's|oci://registry.cn-hangzhou.aliyuncs.com/XXX:[0-9]*\\\\.[0-9]*\\\\.[0-9]*|oci://registry.cn-hangzhou.aliyuncs.com/XXX:$(cat version.txt)|g' data/wasmplugins/ai-cache-1.0.0.yaml\
3. 文本向量请求逻辑及缓存命中逻辑编写
在本节中,我们将通过一个简单的示例来说明如何在网关中编写请求外部服务的缓存逻辑。
当查询到达时,与 Redis 中存储的键进行匹配(`redisSearchHandler`)。如果完全一致,则直接返回结果(`handleCacheHit`)。 如果不匹配,则请求 `text_embedding` 接口将查询转换为 `query_embedding`(`fetchAndProcessEmbeddings`)。 使用 `query_embedding` 与向量数据库中的向量进行 ANN 搜索,返回最接近的键,并通过阈值进行过滤(`performQueryAndRespond`)。 如果返回结果为空或距离大于阈值,则丢弃结果,本轮缓存未命中,最后将 `query_embedding` 存入向量数据库(`uploadQueryEmbedding`)。 如果距离小于阈值,则再次调用 Redis 对最相似的键进行匹配(`redisSearchHandler`)。 在响应阶段,请求 Redis 新增键值对,键为查询的问题,值为LLM 返回结果。
可以看到,除了 Redis 服务外,我们还需要请求文本向量化服务和向量数据库服务,这里我们分别选取向量生成器:阿里灵积通用文本向量接口[2]和向量数据库:阿里向量检索服务 DashVector[3]作为服务商。
注意:由于 wasm 插件不支持协程等特性,调用外部服务需遵循:如何在插件中请求外部服务[4]。
3.1 外部服务声明和注册
为了实现思路 1-5 的连续外部服务调用。在 Higress 相关的配置上,我们首先需要声明外部服务:
DashVectorClient wrapper.HttpClient `yaml:"-" json:"-"` DashScopeClient wrapper.HttpClient `yaml:"-" json:"-"` redisClient wrapper.RedisClient `yaml:"-" json:"-"`
并且在 ParseConfig 函数中注册外部服务:
c.DashVectorInfo.DashVectorClient = wrapper.NewClusterClient(wrapper.DnsCluster{ ServiceName: c.DashVectorInfo.DashVectorServiceName, Port: 443, Domain: c.DashVectorInfo.DashVectorAuthApiEnd, }) c.DashVectorInfo.DashScopeClient = wrapper.NewClusterClient(wrapper.DnsCluster{ ServiceName: c.DashVectorInfo.DashScopeServiceName, Port: 443, Domain: "dashscope.aliyuncs.com", })
这里的 ParseConfig 函数是在 http 请求的各个阶段回调函数(requestHeader,requestBody,responseHeader,responseBody)之前的注册函数。
3.2 AI Cache 配置文件
在增加了上述外部服务的基础上,对应的 AI Cache 的配置文件也需要进行修改,此处对应 1.1.5 节的配置。示例配置如下:
Dash: dashScopeKey: "YOUR_DASHSCOPE_KEY" # 这个是文本向量的key dashScopeServiceName: "qwen" # 重要,需要和scope对应的服务名匹配 dashVectorCollection: "YOUR_CLUSTER_NAME" dashVectorEnd: "YOUR_VECTOR_END" dashVectorKey: "YOUR_DASHVECTOR_KEY" # 这个是DASHVECTOR的key dashVectorServiceName: "DashVector.dns" # 重要,需要新建一个vector对应的DNS服务 sessionID: "XXX" # 可用可不用,主要用于重复初始化逻辑 redis: # 重要 serviceName: "redis.static" timeout: 2000
3.3 连续 callback 实现连续服务调用
基于上述思路,实现的核心代码如下。其中的核心难点仍在于如何实现服务间的连续调用问题,以 onHttpRequestBody 函数为例,代码需要实现并发逻辑,而不是简单的顺序逻辑。因此,主函数代码必须返回 types.Action,即声明是阻塞还是继续执行。当前逻辑要求在处理完缓存命中逻辑后才能继续操作,因此主函数需要返回 types.Pause。最后,根据我们的处理逻辑,在调用外部服务的回调函数中,根据是否命中缓存执行 proxywasm.ResumeHttpRequest() 或直接返回 proxywasm.SendHttpResponse()。
// ===================== 以下是主要逻辑 ===================== // 主handler函数,根据key从redis中获取value ,如果不命中,则首先调用文本向量化接口向量化query,然后调用向量搜索接口搜索最相似的出现过的key,最后再次调用redis获取结果 // 可以把所有handler单独提取为文件,这里为了方便读者复制就和主逻辑放在一个文件中了 // // 1. query 进来和 redis 中存的 key 匹配 (redisSearchHandler) ,若完全一致则直接返回 (handleCacheHit) // 2. 否则请求 text_embdding 接口将 query 转换为 query_embedding (fetchAndProcessEmbeddings) // 3. 用 query_embedding 和向量数据库中的向量做 ANN search,返回最接近的 key ,并用阈值过滤 (performQueryAndRespond) // 4. 若返回结果为空或大于阈值,舍去,本轮 cache 未命中, 最后将 query_embedding 存入向量数据库 (uploadQueryEmbedding) // 5. 若小于阈值,则再次调用 redis对 most similar key 做匹配。(redisSearchHandler) // 7. 在 response 阶段请求 redis 新增key/LLM返回结果 func redisSearchHandler(key string, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool, ifUseEmbedding bool) error { err := config.redisClient.Get(config.CacheKeyPrefix+key, func(response resp.Value) { if err := response.Error(); err == nil && !response.IsNull() { log.Warnf("cache hit, key:%s", key) handleCacheHit(key, response, stream, ctx, config, log) } else { log.Warnf("cache miss, key:%s", key) if ifUseEmbedding { handleCacheMiss(key, err, response, ctx, config, log, key, stream) } else { proxywasm.ResumeHttpRequest() return } } }) return err } // 简单处理缓存命中的情况, 从redis中获取到value后,直接返回 func handleCacheHit(key string, response resp.Value, stream bool, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log) { log.Warnf("cache hit, key:%s", key) ctx.SetContext(CacheKeyContextKey, nil) if !stream { proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "application/json; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnResponseTemplate, response.String())), -1) } else { proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "text/event-stream; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnStreamResponseTemplate, response.String())), -1) } } // 处理缓存未命中的情况,调用fetchAndProcessEmbeddings函数向量化query func handleCacheMiss(key string, err error, response resp.Value, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, queryString string, stream bool) { if err != nil { log.Warnf("redis get key:%s failed, err:%v", key, err) } if response.IsNull() { log.Warnf("cache miss, key:%s", key) } fetchAndProcessEmbeddings(key, ctx, config, log, queryString, stream) } // 调用文本向量化接口向量化query, 向量化成功后调用processFetchedEmbeddings函数处理向量化结果 func fetchAndProcessEmbeddings(key string, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, queryString string, stream bool) { Emb_url, Emb_requestBody, Emb_headers := ConstructTextEmbeddingParameters(&config, log, []string{queryString}) config.DashVectorInfo.DashScopeClient.Post( Emb_url, Emb_headers, Emb_requestBody, func(statusCode int, responseHeaders http.Header, responseBody []byte) { // log.Infof("statusCode:%d, responseBody:%s", statusCode, string(responseBody)) log.Infof("Successfully fetched embeddings for key: %s", key) if statusCode != 200 { log.Errorf("Failed to fetch embeddings, statusCode: %d, responseBody: %s", statusCode, string(responseBody)) ctx.SetContext(QueryEmbeddingKey, nil) proxywasm.ResumeHttpRequest() } else { processFetchedEmbeddings(key, responseBody, ctx, config, log, stream) } }, 10000) } // 先将向量化的结果存入上下文ctx变量,其次发起向量搜索请求 func processFetchedEmbeddings(key string, responseBody []byte, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool) { text_embedding_raw, _ := ParseTextEmbedding(responseBody) text_embedding := text_embedding_raw.Output.Embeddings[0].Embedding // ctx.SetContext(CacheKeyContextKey, text_embedding) ctx.SetContext(QueryEmbeddingKey, text_embedding) ctx.SetContext(CacheKeyContextKey, key) performQueryAndRespond(key, text_embedding, ctx, config, log, stream) } // 调用向量搜索接口搜索最相似的key,搜索成功后调用redisSearchHandler函数获取最相似的key的结果 func performQueryAndRespond(key string, text_embedding []float64, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool) { vector_url, vector_request, vector_headers, err := ConstructEmbeddingQueryParameters(config, text_embedding) if err != nil { log.Errorf("Failed to perform query, err: %v", err) proxywasm.ResumeHttpRequest() return } config.DashVectorInfo.DashVectorClient.Post( vector_url, vector_headers, vector_request, func(statusCode int, responseHeaders http.Header, responseBody []byte) { log.Infof("statusCode:%d, responseBody:%s", statusCode, string(responseBody)) query_resp, err_query := ParseQueryResponse(responseBody) if err_query != nil { log.Errorf("Failed to parse response: %v", err) proxywasm.ResumeHttpRequest() return } if len(query_resp.Output) < 1 { log.Warnf("query response is empty") uploadQueryEmbedding(ctx, config, log, key, text_embedding) return } most_similar_key := query_resp.Output[0].Fields["query"].(string) log.Infof("most similar key:%s", most_similar_key) most_similar_score := query_resp.Output[0].Score if most_similar_score < 0.1 { ctx.SetContext(CacheKeyContextKey, nil) redisSearchHandler(most_similar_key, ctx, config, log, stream, false) } else { log.Infof("the most similar key's score is too high, key:%s, score:%f", most_similar_key, most_similar_score) uploadQueryEmbedding(ctx, config, log, key, text_embedding) proxywasm.ResumeHttpRequest() return } }, 100000) } // 未命中cache,则将新的query embedding和对应的key存入向量数据库 func uploadQueryEmbedding(ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, key string, text_embedding []float64) error { vector_url, vector_body, err := ConsturctEmbeddingInsertParameters(&config, log, text_embedding, key) if err != nil { log.Errorf("Failed to construct embedding insert parameters: %v", err) proxywasm.ResumeHttpRequest() return nil } err = config.DashVectorInfo.DashVectorClient.Post( vector_url, [][2]string{ {"Content-Type", "application/json"}, {"dashvector-auth-token", config.DashVectorInfo.DashVectorKey}, }, vector_body, func(statusCode int, responseHeaders http.Header, responseBody []byte) { if statusCode != 200 { log.Errorf("Failed to upload query embedding: %s", responseBody) } else { log.Infof("Successfully uploaded query embedding for key: %s", key) } proxywasm.ResumeHttpRequest() }, 10000, ) if err != nil { log.Errorf("Failed to upload query embedding: %v", err) proxywasm.ResumeHttpRequest() return nil } return nil } // ===================== 以上是主要逻辑 =====================
此外,该逻辑只能在返回值为 types.Action 的函数中使用,例如 onHttpResponseBody 这样的流式处理函数无法以类似方式处理。尽管可以确保请求被发送出去,但由于没有阻塞操作,无法调用回调函数。如果有需要,可以参考 wasm-go/pkg/wrapper/http_wrapper.go,添加信号变量进行修改。
4. 总结
本文的完整代码已发布在 GitHub[5]。本文提供的思路仅为抛砖引玉,如何在实际场景中解决复杂的缓存需求仍需各位的智慧。祝大家在比赛中取得理想的成绩!
相关链接:[1] Higress + LobeChat 快速搭建私人 GPT 助理
[4] 如何在插件中请求外部服务
[5] GitHub
作者介绍:杨贝宁,爱丁堡大学博士在读,研究方向为向量数据库