Self-Healing RAG Systems: Engineering Practice from Fragile Pipelines to Closed-Loop Agents

Summary: Traditional RAG systems are fragile: real-world user queries easily produce answers that miss the point. Self-healing RAG improves robustness through a closed-loop architecture: HyDE optimizes retrieval, query decomposition handles compound questions, CRAG grades and filters documents, a cross-encoder reranks results, and dynamic learning accumulates experience, enabling continuous optimization and self-correction for reliable enterprise-grade applications.

RAG systems have a chronic problem in production: fragility. In a demo, carefully prepared questions make the results look impressive. Once the system goes live, however, user questions come in every shape. The vector database returns documents that are semantically similar but actually miss the point, and LLMs, eager to please, will happily spin a plausible-sounding answer out of a pile of noise.

So where does the problem lie? Standard RAG is a classic open-loop architecture: input → embed → retrieve → generate, one straight line from start to finish. Each stage assumes its upstream output is perfect, so once any step goes wrong, the error propagates all the way to the final result.

Building enterprise-grade RAG applications requires moving to a closed-loop system, i.e. self-healing RAG. The core idea is to give the system introspection: when it detects a problem, it corrects itself autonomously instead of passing the error on to the user.

Part 1: Automatic Retrieval

The first pitfall in RAG is actually the user. Nobody writes queries according to vector-search best practices: they use jargon and abbreviations, ask vague questions, or cram several things into one question. A self-healing system needs a "guardrail" at the input side that converts these raw queries into high-quality retrieval requests.

Strategy 1: Hypothetical Document Embeddings (HyDE)

Traditional retrieval matches a short question against long documents, e.g. using the few words "crag architecture" to search entire passages of technical documentation. This modality mismatch seriously hurts recall quality.

HyDE's idea is this: first have the LLM "fabricate" a hypothetical answer to the question, then use that hypothetical answer for the vector search. Because the hypothetical answer is closer in form to real documents, matching naturally works better.

The HyDE paper shows the approach working across many kinds of queries, with no modification needed to the underlying GPT-3 and Contriever/mContriever models.

For example:

User query: "How does the CRAG grader work?"

HyDE generates: "The CRAG grader works by evaluating the relevance of retrieved documents; it scores each document…" (fabricated content)

Vector search: search with the generated content rather than the original question

Code implementation (hyde.py):

from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.core.indices.query.query_transform import HyDEQueryTransform
from llama_index.core.query_engine import TransformQueryEngine
from llama_index.llms.openai import OpenAI

# 1. Configure the LLM used to generate hypothetical documents
Settings.llm = OpenAI(model="gpt-4-turbo", temperature=0.7)

def build_hyde_engine(index):
    # Initialize the HyDE transform
    # include_original=True searches with both the original query and the hypothetical document
    hyde = HyDEQueryTransform(include_original=True)

    # Create a standard query engine
    base_query_engine = index.as_query_engine(similarity_top_k=5)

    # Wrap it in a TransformQueryEngine
    # This middleware intercepts the query, generates the hypothetical document, then runs the search
    hyde_engine = TransformQueryEngine(base_query_engine, query_transform=hyde)

    return hyde_engine

# Usage example
# index = VectorStoreIndex.from_documents(docs)
# engine = build_hyde_engine(index)
# response = engine.query("Explain the self-correction mechanism in CRAG")

Strategy 2: Query Decomposition

When a user asks "Which performs better on coding tasks, Llama-3 or GPT-4?", naive retrieval will rarely find a single document that compares both models. Query decomposition splits such compound questions into atomic sub-queries, "Llama-3 coding ability" and "GPT-4 coding ability", retrieves for each separately, then merges the results.

Code implementation (query_decomposition.py):

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from typing import List

# Define the output structure
class SubQueries(BaseModel):
    """Set of sub-questions to retrieve for."""
    questions: List[str] = Field(description="List of atomic sub-questions.")

# Configure the planning LLM
llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)

system_prompt = """You are an expert researcher. Break down the user's complex query
into simple, atomic sub-queries that a search engine can answer."""

prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    ("human", "{query}")
])

# Build the chain
planner = prompt | llm.with_structured_output(SubQueries)

def plan_query(query: str):
    result = planner.invoke({"query": query})
    return result.questions

# Usage example
# sub_qs = plan_query("Compare Llama-3 and GPT-4 on coding benchmarks")
# print(sub_qs)
# Output:
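The planner above only produces the sub-queries; the merge step after per-sub-query retrieval can be sketched as below. `retrieve_for_subqueries`, `retrieve_fn`, and the toy corpus are illustrative stand-ins, not part of the pipeline above:

```python
from typing import Callable, List

def retrieve_for_subqueries(sub_queries: List[str],
                            retrieve_fn: Callable[[str], List[str]]) -> List[str]:
    """Run each sub-query through the retriever and merge unique results in order."""
    seen, merged = set(), []
    for sq in sub_queries:
        for doc in retrieve_fn(sq):
            if doc not in seen:
                seen.add(doc)
                merged.append(doc)
    return merged

# Toy retriever: maps each sub-query to its hits
fake_corpus = {
    "Llama-3 coding ability": ["doc_llama", "doc_shared"],
    "GPT-4 coding ability": ["doc_gpt4", "doc_shared"],
}
docs = retrieve_for_subqueries(list(fake_corpus), fake_corpus.get)
print(docs)  # ['doc_llama', 'doc_shared', 'doc_gpt4']
```

Deduplicating here matters because sub-queries about related entities often recall the same passages.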

Part 2: The Control Layer

The documents are back; how do we judge whether they can be trusted? CRAG's answer is to add a "grader" role to the pipeline that evaluates the relevance of every retrieved document. If the data quality is poor, the system does not plow ahead and generate an answer anyway; it triggers a fallback (such as a web search).

How the retrieval evaluator works: it assesses the relevance of the retrieved documents to the input, estimates a confidence score, and then triggers different follow-up actions depending on the result: the three states {Correct, Incorrect, Ambiguous} each map to a different processing path.
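The three-way routing can be sketched as a plain threshold over the evaluator's score. The numeric thresholds and the pure-Python shape below are illustrative assumptions (in CRAG the score comes from a trained evaluator, not a constant comparison):

```python
# Hypothetical three-way router over the evaluator's confidence score.
# Thresholds 0.7 / 0.3 are made up for illustration.
def route_by_confidence(score: float, upper: float = 0.7, lower: float = 0.3) -> str:
    """Map an evaluator confidence score to one of CRAG's three actions."""
    if score >= upper:
        return "correct"      # use the retrieved documents as-is
    if score <= lower:
        return "incorrect"    # discard them and fall back to web search
    return "ambiguous"        # combine retrieved documents with web results

print(route_by_confidence(0.9), route_by_confidence(0.1), route_by_confidence(0.5))
# correct incorrect ambiguous
```

The binary yes/no grader implemented below is a simplification of this three-state scheme.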

This kind of branching decision logic is best implemented as a graph, which is exactly what LangGraph is for.

The CRAG workflow:

  1. Retrieve: fetch the candidate documents
  2. Grade: an LLM judges each document as "relevant" or "irrelevant"
  3. Decide: if relevant, generate the answer directly; otherwise rewrite the query and search the web

Code implementation (corrective_rag.py):

from typing import List, TypedDict
from langchain_core.prompts import PromptTemplate
from langchain_core.documents import Document
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph, START
from pydantic import BaseModel, Field

# --- 1. State definition ---
class GraphState(TypedDict):
    question: str
    generation: str
    web_search: str  # 'Yes' or 'No' flag
    documents: List

# --- 2. Component initialization ---
grader_llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)
generator_llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)
web_tool = TavilySearchResults(max_results=3)

# Schema for the grader's structured output
class GradeResult(BaseModel):
    """Binary relevance score."""
    score: str = Field(description="'yes' if the document is relevant, else 'no'")

# --- 3. Node definitions ---

def grade_documents(state):
    """
    Core self-healing node: filter out low-quality documents
    """
    print("---CHECK RELEVANCE---")
    question = state["question"]
    documents = state["documents"]

    # Binary-classification structured output
    structured_llm = grader_llm.with_structured_output(GradeResult)

    prompt = PromptTemplate(
        template="""You are a grader assessing relevance.
        Doc: {context}
        Question: {question}
        Answer 'yes' or 'no'.""",
        input_variables=["context", "question"],
    )
    chain = prompt | structured_llm

    filtered_docs = []
    web_search = "No"

    for d in documents:
        grade = chain.invoke({"question": question, "context": d.page_content})
        if grade.score == 'yes':
            filtered_docs.append(d)
        else:
            # An irrelevant document triggers the fallback
            web_search = "Yes"

    return {"documents": filtered_docs, "question": question, "web_search": web_search}

def transform_query(state):
    """
    Self-correction: rewrite the query to improve web-search results
    """
    print("---TRANSFORM QUERY---")
    question = state["question"]
    # Simple rewrite chain
    prompt = PromptTemplate(template="Rewrite this for web search: {question}", input_variables=["question"])
    chain = prompt | generator_llm
    better_q = chain.invoke({"question": question}).content
    return {"question": better_q}

def web_search_node(state):
    print("---WEB SEARCH---")
    docs = web_tool.invoke({"query": state["question"]})
    # Append web results to the existing documents
    web_results = [Document(page_content=d["content"]) for d in docs]
    return {"documents": state["documents"] + web_results}

def generate(state):
    print("---GENERATE---")
    # Plug in a standard RAG generation chain here
    # generation = rag_chain.invoke(...)
    return {"generation": "Final Answer Placeholder"}

# --- 4. Graph construction ---
workflow = StateGraph(GraphState)

# Add nodes
workflow.add_node("retrieve", lambda x: {"documents": []})  # retrieval placeholder
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("transform_query", transform_query)
workflow.add_node("web_search_node", web_search_node)
workflow.add_node("generate", generate)

# Add edges
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")

def decide_to_generate(state):
    if state["web_search"] == "Yes":
        return "transform_query"
    return "generate"

workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {"transform_query": "transform_query", "generate": "generate"}
)
workflow.add_edge("transform_query", "web_search_node")
workflow.add_edge("web_search_node", "generate")
workflow.add_edge("generate", END)

app = workflow.compile()

Part 3: Automatic Ranking

The bi-encoder used for vector retrieval is fast but limited in precision. Once a document is compressed into a single vector, a lot of semantic detail is lost. The fix is to add a cross-encoder for second-stage reranking.

A cross-encoder takes the query and document together as one input and outputs a relevance score directly. Because this is computationally expensive, a two-stage strategy is standard:

  1. Coarse recall: the vector store quickly recalls the top 50 candidates
  2. Precise rerank: the cross-encoder rescores those 50 documents and keeps the top 5
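The two-stage pattern itself is independent of any particular model. Here is a toy sketch with stand-in scorers; `two_stage_search`, `cheap`, and `precise` are illustrative placeholders for the bi-encoder and cross-encoder roles:

```python
from typing import Callable, List

def two_stage_search(query: str, corpus: List[str],
                     cheap_score: Callable[[str, str], float],
                     precise_score: Callable[[str, str], float],
                     recall_k: int = 50, final_k: int = 5) -> List[str]:
    # Stage 1: coarse recall with the fast, lossy scorer (the bi-encoder's role)
    shortlist = sorted(corpus, key=lambda d: cheap_score(query, d), reverse=True)[:recall_k]
    # Stage 2: precise rerank of the shortlist with the expensive scorer
    return sorted(shortlist, key=lambda d: precise_score(query, d), reverse=True)[:final_k]

# Toy scorers: token overlap for recall, an exact-phrase bonus for precision
docs = ["rerank with cross encoder", "vector search basics", "cross encoder scoring"]
def cheap(q, d):
    return len(set(q.split()) & set(d.split()))
def precise(q, d):
    return cheap(q, d) + (1.0 if q in d else 0.0)

top = two_stage_search("cross encoder", docs, cheap, precise, recall_k=2, final_k=1)
print(top)
```

The expensive scorer only ever sees `recall_k` candidates, which is what keeps the latency of the precise stage bounded.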

Code implementation (reranker.py):

from sentence_transformers import CrossEncoder

class Reranker:
    def __init__(self):
        # Load a cross-encoder fine-tuned on MS MARCO
        self.model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

    def rerank(self, query, documents, top_k=5):
        # Build pairs: [[query, doc1], [query, doc2], ...]
        pairs = [[query, doc] for doc in documents]

        # Score in batch
        scores = self.model.predict(pairs)

        # Sort by score and keep the top_k
        results = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
        return [doc for doc, score in results[:top_k]]

Part 4: Automatic Learning

An advanced self-healing system doesn't just fix problems on the spot; it also learns from past mistakes so it doesn't fall into the same trap repeatedly. The mechanism is dynamic few-shot learning.

When the system produces a good answer (the user gives it a thumbs-up), the query-answer pair is stored in a dedicated "golden examples" store. When a similar question comes up later, these successful cases are retrieved and injected into the prompt, so the system's own past successes guide the new answer.

Code implementation (dynamic_prompting.py):

from llama_index.core import VectorStoreIndex, Document
from llama_index.core.prompts import PromptTemplate

class LearningManager:
    def __init__(self):
        self.good_examples = []
        self.index = None

    def add_good_example(self, query, answer):
        """Called when the user upvotes an answer"""
        doc = Document(text=f"Q: {query}\nA: {answer}")
        self.good_examples.append(doc)
        # Rebuild the index (in production, use a vector store that supports incremental updates)
        self.index = VectorStoreIndex.from_documents(self.good_examples)

    def get_dynamic_prompt(self, current_query):
        if not self.index:
            return ""

        # Retrieve similar past successes
        retriever = self.index.as_retriever(similarity_top_k=2)
        nodes = retriever.retrieve(current_query)

        examples_text = "\n\n".join([n.text for n in nodes])
        return f"Here are examples of how to answer correctly:\n{examples_text}"

# Use in the pipeline
# manager = LearningManager()
# few_shot_context = manager.get_dynamic_prompt(user_query)
# final_prompt = f"{few_shot_context}\n\nQuestion: {user_query}..."

Going Further: Automatic Optimization with DSPy

For a more programmatic approach to optimization, DSPy is a framework worth watching. It treats prompts as optimizable programs: it runs over a validation set and, guided by metrics such as accuracy, automatically rewrites the prompt and updates the few-shot examples.

import dspy

# 1. Define the RAG signature
class GenerateAnswer(dspy.Signature):
    """Answer the question with a short factual answer."""
    context = dspy.InputField()
    question = dspy.InputField()
    answer = dspy.OutputField()

# 2. Define the module
class RAG(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=3)
        self.generate = dspy.ChainOfThought(GenerateAnswer)

    def forward(self, question):
        context = self.retrieve(question).passages
        return self.generate(context=context, question=question)

# 3. Optimize
# MIPROv2 runs the pipeline, retrying and rewriting instructions on failures
# The goal is to maximize the chosen metric (exact match, semantic similarity, etc.)
optimizer = dspy.MIPROv2(metric=dspy.evaluate.SemanticF1())
optimized_rag = optimizer.compile(RAG(), trainset=training_data)

Full System Integration

All the components are ready: HyDE, query decomposition, CRAG, cross-encoder reranking, and dynamic prompting. Now let's wire them into one complete self-healing RAG system. This orchestration layer coordinates the whole flow: parsing the query, augmenting retrieval, validating context, refining relevance, learning from feedback, and finally generating a stable, reliable answer.

import os
import json
import asyncio
from typing import List, Dict, Any, Optional
from datetime import datetime

# Import the components built above
from hyde import build_hyde_engine, Settings
from query_decomposition import plan_query, SubQueries
from corrective_rag import app as crag_app, GraphState
from reranker import Reranker
from dynamic_prompting import LearningManager

# Core dependencies
from llama_index.core import VectorStoreIndex, Document, SimpleDirectoryReader
from llama_index.llms.openai import OpenAI
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from sentence_transformers import CrossEncoder

class SelfHealingRAGSystem:  
    """  
    Complete self-healing RAG system integrating all of the components above
    """  

    def __init__(self, openai_api_key: str = None):  
        """Initialize the RAG system"""
        # API key configuration
        if openai_api_key:  
            os.environ["OPENAI_API_KEY"] = openai_api_key  

        # Component setup
        print("🚀 Initializing Self-Healing RAG System...")  

        # Core LLM
        self.llm = OpenAI(model="gpt-4-turbo", temperature=0.3)  
        Settings.llm = self.llm  

        # Initialize the individual components
        self.reranker = Reranker()  
        self.learning_manager = LearningManager()  
        self.vector_index = None  
        self.hyde_engine = None  

        # Demo data
        self.sample_documents = self._create_sample_documents()  
        self._setup_vector_index()  

        # Statistics
        self.query_stats = {  
            "total_queries": 0,  
            "hyde_used": 0,  
            "decomposed_queries": 0,  
            "crag_activated": 0,  
            "reranked": 0,  
            "learning_applied": 0  
        }  

        print("✅ System initialized successfully!")  

    def _create_sample_documents(self) -> List[Document]:  
        """Create sample documents for the demo"""
        sample_texts = [  
            """Retrieval-Augmented Generation (RAG) is a technique that combines   
            pre-trained language models with external knowledge retrieval. RAG systems   
            retrieve relevant documents from a knowledge base and use them to generate   
            more accurate and factual responses.""",  

            """Corrective RAG (CRAG) introduces a self-correction mechanism that grades   
            retrieved documents for relevance. If documents are deemed irrelevant, the   
            system triggers alternative retrieval strategies like web search.""",  

            """HyDE (Hypothetical Document Embeddings) improves retrieval by generating   
            hypothetical documents that answer the query, then searching for real documents   
            similar to these hypothetical ones.""",  

            """Cross-encoder reranking provides more accurate document scoring compared   
            to bi-encoder similarity search. It processes query-document pairs together   
            to produce refined relevance scores.""",  

            """DSPy enables automatic prompt optimization by treating prompts as programs   
            that can be compiled and optimized against specific metrics like accuracy   
            or semantic similarity.""",  

            """Self-healing RAG systems implement feedback loops that learn from successful   
            query-answer pairs, storing them as examples for future similar queries to   
            improve performance over time.""",  

            """Query decomposition breaks complex multi-part questions into atomic   
            sub-queries that can be individually processed and then combined for   
            comprehensive answers.""",  

            """Vector databases enable semantic search by converting documents into   
            high-dimensional embeddings that capture semantic meaning rather than   
            just keyword matches."""  
        ]  

        return [Document(text=text, metadata={"id": i}) for i, text in enumerate(sample_texts)]  

    def _setup_vector_index(self):  
        """Build the vector index from the sample documents"""
        print("📚 Setting up vector index...")  
        self.vector_index = VectorStoreIndex.from_documents(self.sample_documents)  
        self.hyde_engine = build_hyde_engine(self.vector_index)  
        print("✅ Vector index ready!")  

    def enhanced_retrieve(self, query: str, use_hyde: bool = True, top_k: int = 5) -> List[Document]:
        """Enhanced retrieval with optional HyDE"""
        print(f"🔍 Retrieving documents for: '{query}'")

        if use_hyde:
            print("  🧠 Using HyDE for enhanced retrieval...")
            response = self.hyde_engine.query(query)
            # Extract source nodes from the HyDE response
            documents = response.source_nodes
            self.query_stats["hyde_used"] += 1
        else:
            print("  📖 Using standard retrieval...")
            retriever = self.vector_index.as_retriever(similarity_top_k=top_k)
            nodes = retriever.retrieve(query)
            documents = nodes

        # Convert to LangChain Documents, since the downstream CRAG and
        # reranker steps expect the .page_content attribute
        from langchain_core.documents import Document as LCDocument
        docs = []
        for node in documents:
            doc = LCDocument(
                page_content=node.text if hasattr(node, 'text') else str(node),
                metadata=node.metadata if hasattr(node, 'metadata') else {}
            )
            docs.append(doc)

        print(f"  ✅ Retrieved {len(docs)} documents")
        return docs

    def decompose_and_retrieve(self, query: str) -> tuple[List[str], List[Document]]:  
        """Decompose a complex query and retrieve for each sub-query"""
        print(f"🔧 Decomposing query: '{query}'")  

        try:  
            sub_queries = plan_query(query)  
            if len(sub_queries) > 1:  
                print(f"  📝 Decomposed into {len(sub_queries)} sub-queries:")  
                for i, sq in enumerate(sub_queries, 1):  
                    print(f"    {i}. {sq}")  

                # Retrieve for each sub-query
                all_docs = []  
                for sq in sub_queries:  
                    docs = self.enhanced_retrieve(sq, use_hyde=False, top_k=3)  
                    all_docs.extend(docs)  

                self.query_stats["decomposed_queries"] += 1  
                return sub_queries, all_docs  
            else:  
                print("  ➡️ Query doesn't need decomposition")  
                docs = self.enhanced_retrieve(query)  
                return [query], docs  
        except Exception as e:  
            print(f"  ⚠️ Error in decomposition: {e}")  
            docs = self.enhanced_retrieve(query)  
            return [query], docs  

    def apply_crag(self, query: str, documents: List[Document]) -> tuple[List[Document], str]:  
        """Apply CRAG-style filtering to the documents"""
        print("🔍 Applying CRAG (Corrective RAG)...")  

        try:  
            # Prepare the CRAG state
            state = GraphState(  
                question=query,  
                generation="",  
                web_search="No",  
                documents=documents  
            )  

            # Normally this would run the full CRAG graph;
            # it is simplified here for the demo
            filtered_docs = []
            for doc in documents[:3]:  # demo limit
                # Naive relevance check (use an LLM grader in practice)
                if any(keyword in doc.page_content.lower() for keyword in query.lower().split()):
                    filtered_docs.append(doc)  

            if len(filtered_docs) < len(documents):  
                self.query_stats["crag_activated"] += 1  
                print(f"  🚨 CRAG filtered {len(documents) - len(filtered_docs)} irrelevant documents")  

            return filtered_docs, "Documents filtered by CRAG"  

        except Exception as e:  
            print(f"  ⚠️ Error in CRAG: {e}")  
            return documents, "CRAG not applied due to error"  

    def apply_reranking(self, query: str, documents: List[Document], top_k: int = 3) -> List[Document]:  
        """Cross-encoder reranking"""
        print("🎯 Applying cross-encoder reranking...")  

        try:  
            # Extract the text for reranking
            doc_texts = [doc.page_content for doc in documents]  

            if len(doc_texts) > 1:  
                reranked_texts = self.reranker.rerank(query, doc_texts, top_k)  

                # Map the texts back to Document objects
                reranked_docs = []  
                for text in reranked_texts:  
                    for doc in documents:  
                        if doc.page_content == text:  
                            reranked_docs.append(doc)  
                            break  

                self.query_stats["reranked"] += 1  
                print(f"  ✅ Reranked to top {len(reranked_docs)} documents")  
                return reranked_docs  
            else:  
                print("  ➡️ Not enough documents for reranking")  
                return documents  

        except Exception as e:  
            print(f"  ⚠️ Error in reranking: {e}")  
            return documents  

    def apply_dynamic_prompting(self, query: str) -> str:  
        """Dynamic few-shot learning"""
        print("🧠 Applying dynamic prompting...")  

        try:  
            few_shot_context = self.learning_manager.get_dynamic_prompt(query)  
            if few_shot_context:  
                self.query_stats["learning_applied"] += 1  
                print("  ✅ Applied learned examples from previous successes")  
            else:  
                print("  ➡️ No relevant past examples found")  
            return few_shot_context  
        except Exception as e:  
            print(f"  ⚠️ Error in dynamic prompting: {e}")  
            return ""  

    def generate_answer(self, query: str, documents: List[Document], few_shot_context: str = "") -> str:  
        """Generate an answer from the retrieved documents"""
        print("✍️ Generating final answer...")  

        # Combine the document contents
        context = "\n\n".join([doc.page_content for doc in documents[:3]])  

        # Build the prompt, optionally including few-shot examples
        prompt_parts = []  
        if few_shot_context:  
            prompt_parts.append(few_shot_context)  

        prompt_parts.extend([  
            "Context:",  
            context,  
            f"\nQuestion: {query}",  
            "\nAnswer based on the provided context:"  
        ])  

        prompt = "\n".join(prompt_parts)  

        try:  
            response = self.llm.complete(prompt)  
            answer = response.text.strip()  
            print("  ✅ Answer generated successfully")  
            return answer  
        except Exception as e:  
            print(f"  ⚠️ Error generating answer: {e}")  
            return f"I apologize, but I encountered an error generating an answer: {e}"  

    def full_pipeline(self, query: str, user_feedback: bool = None, previous_answer: str = None) -> Dict[str, Any]:  
        """  
        Run the full self-healing RAG pipeline
        """  
        start_time = datetime.now()  
        print(f"\n🔄 Starting Self-Healing RAG Pipeline")  
        print(f"Query: '{query}'")  
        print("=" * 60)  

        self.query_stats["total_queries"] += 1  

        # Step 1: Query enhancement
        sub_queries, documents = self.decompose_and_retrieve(query)  

        # Step 2: Document validation (CRAG)
        filtered_docs, crag_status = self.apply_crag(query, documents)  

        # Step 3: Document reranking
        reranked_docs = self.apply_reranking(query, filtered_docs)  

        # Step 4: Dynamic prompting
        few_shot_context = self.apply_dynamic_prompting(query)  

        # Step 5: Answer generation
        answer = self.generate_answer(query, reranked_docs, few_shot_context)  

        # Step 6: Learning (when feedback is available)
        if user_feedback is True and previous_answer:  
            try:  
                self.learning_manager.add_good_example(query, previous_answer)  
                print("📚 Added successful example to learning system")  
            except Exception as e:  
                print(f"⚠️ Error adding to learning system: {e}")  

        end_time = datetime.now()  
        processing_time = (end_time - start_time).total_seconds()  

        result = {  
            "query": query,  
            "sub_queries": sub_queries,  
            "documents_found": len(documents),  
            "documents_filtered": len(filtered_docs),  
            "final_documents": len(reranked_docs),  
            "answer": answer,  
            "crag_status": crag_status,  
            "processing_time": processing_time,  
            "components_used": self._get_components_used()  
        }  

        print("\n" + "=" * 60)  
        print(f"✅ Pipeline completed in {processing_time:.2f} seconds")  
        print(f"📊 Documents: {len(documents)} → {len(filtered_docs)} → {len(reranked_docs)}")  

        return result  

    def _get_components_used(self) -> List[str]:  
        """List the components that have been used"""
        components = ["Vector Retrieval"]  

        if self.query_stats["hyde_used"] > 0:  
            components.append("HyDE")  
        if self.query_stats["decomposed_queries"] > 0:  
            components.append("Query Decomposition")  
        if self.query_stats["crag_activated"] > 0:  
            components.append("CRAG")  
        if self.query_stats["reranked"] > 0:  
            components.append("Cross-Encoder Reranking")  
        if self.query_stats["learning_applied"] > 0:  
            components.append("Dynamic Prompting")  

        return components  

    def get_system_stats(self) -> Dict[str, Any]:  
        """Get system statistics"""
        return {  
            "total_queries": self.query_stats["total_queries"],  
            "hyde_usage_rate": f"{(self.query_stats['hyde_used'] / max(1, self.query_stats['total_queries']) * 100):.1f}%",  
            "decomposition_rate": f"{(self.query_stats['decomposed_queries'] / max(1, self.query_stats['total_queries']) * 100):.1f}%",  
            "crag_activation_rate": f"{(self.query_stats['crag_activated'] / max(1, self.query_stats['total_queries']) * 100):.1f}%",  
            "reranking_rate": f"{(self.query_stats['reranked'] / max(1, self.query_stats['total_queries']) * 100):.1f}%",  
            "learning_rate": f"{(self.query_stats['learning_applied'] / max(1, self.query_stats['total_queries']) * 100):.1f}%",  
            "learned_examples": len(self.learning_manager.good_examples)  
        }  

def demo_interactive_session():  
    """Interactive demo"""
    print("""  
    🎯 Self-Healing RAG System Demo  
    ================================  

    This system demonstrates:  
    • HyDE: Hypothetical Document Embeddings  
    • Query Decomposition: Breaking complex queries  
    • CRAG: Corrective RAG with document grading  
    • Cross-Encoder Reranking: Precision ranking  
    • Dynamic Learning: Few-shot from success examples  

    """)  

    # Initialize the system
    system = SelfHealingRAGSystem()  

    # Demo queries
    demo_queries = [  
        "What is RAG and how does it work?",  
        "Compare HyDE and standard retrieval methods",  
        "How does CRAG improve retrieval quality and what are the benefits of cross-encoder reranking?",  
        "Explain the self-correction mechanisms in modern RAG systems",  
        "What are the advantages of DSPy optimization for prompts?"  
    ]  

    print("🔥 Running Demo Queries...")  
    print("=" * 50)  

    results = []  
    for i, query in enumerate(demo_queries, 1):  
        print(f"\n📋 Demo Query {i}/{len(demo_queries)}")  
        result = system.full_pipeline(query)  
        results.append(result)  

        print(f"\n💡 Answer:")  
        print(f"{result['answer']}")  
        print(f"\n📊 Components Used: {', '.join(result['components_used'])}")  

        # Simulate positive feedback for learning
        if i > 1:  # start adding feedback from the second query
            system.full_pipeline(query, user_feedback=True, previous_answer=result['answer'])  

    # Final statistics
    print("\n" + "=" * 60)  
    print("📈 SYSTEM PERFORMANCE STATISTICS")  
    print("=" * 60)  
    stats = system.get_system_stats()  
    for key, value in stats.items():  
        print(f"{key.replace('_', ' ').title()}: {value}")  

    return system, results  

if __name__ == "__main__":
    # Set your OpenAI API key
    # os.environ["OPENAI_API_KEY"] = "your-key-here"

    demo_interactive_session()

Conclusion

Going from classic RAG to self-healing RAG is essentially an upgrade from "retrieval" to "reasoning". HyDE and query decomposition make sure the right question gets asked; CRAG and the cross-encoder make sure the right documents get read; the automatic learning mechanism keeps the system from repeating the same mistakes. Put together, these give a RAG system a qualitative leap in generalization.

https://avoid.overfit.cn/post/d95478d7799646acbed0e0d2dc2c480d

Author: Subrata Samanta
