MajorRAG聊天问答系统实现分析(3/3)

本文涉及的产品
交互式建模 PAI-DSW,每月250计算时 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
模型训练 PAI-DLC,100CU*H 3个月
简介: 一个RAG项目,全文共三个部分:MajorRAG概述、MajorRAG文件内容提取实现分析、MajorRAG聊天问答系统实现分析。1)第一次做RAG,欢迎带着指导意见评论2)希望指出不足时可以附带替换方法博客地址:https://zhangcraigxg.github.io

MajorRAG 聊天问答系统实现分析

一、API入口和路由

1.1 路由定义

文件位置: app/routes/ChatRoutes.py

主要路由

路由 方法 功能 认证
/api/chat/sessions GET 获取历史会话列表 必需
/api/chat/sessions POST 创建新会话 必需
/api/chat/sessions/<id> PUT 更新会话标题 必需
/api/chat/sessions/<id> DELETE 删除会话 必需
/api/chat/sessions/<id>/content GET 获取会话内容 必需
/api/chat/sessions/<id>/content POST 流式问答入口 必需

1.2 流式响应入口

核心代码: ChatRoutes.py 第234-289行

@chat_bp.route('/sessions/<int:session_id>/content', methods=['POST'])
def add_chat_content(session_id: int):
    """
    实现Server-Sent Events (SSE)流式响应

    接收参数:
    - session_id: 会话ID
    - content: 用户输入内容(JSON格式)

    返回:SSE格式的流式响应
    """

    def generate_sse_response():
        """SSE生成器函数"""
        try:
            # 调用ChatService的add_chat_content生成器
            # 这个生成器会逐步yield每个处理步骤的数据
            for step_data in chat_service.add_chat_content(session_id, content):
                # 格式化为SSE格式: data: {json}\n\n
                sse_data = f"data: {json.dumps(step_data, ensure_ascii=False)}\n\n"
                yield sse_data

        except Exception as e:
            logger.error(f"SSE流式响应生成失败: {e}")
            error_data = {
   
                'step': 'error',
                'status': 'error',
                'message': f'处理失败: {str(e)}'
            }
            yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"

    return Response(
        generate_sse_response(),
        mimetype='text/event-stream',
        headers={
   
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Headers': 'Cache-Control'
        }
    )

二、RAG完整处理流程(9步)

2.1 流程概览

核心文件: app/service/ChatService.py(1415行代码)

聊天问答的核心在 ChatService.add_chat_content() 方法中实现,采用9步处理流程:

用户输入
  ↓
Step 0: 创建系统上下文
  ↓
Step 1: 保存用户消息到chat_content_history
  ↓
Step 1.1: 保存用户输入到chat_context
  ↓
Step 2: 意图分析(调用DeepSeek)
  ├─→ 返回JSON: plan、action、queries、rewrite
  │
  ├→ action="retrieve_vector"(知识库检索)
  │   ↓
  │   Step 3: 向量检索(Milvus + BGE-M3)
  │   ↓
  │   Step 4: BM25全文检索(OpenSearch)
  │   ↓
  │   Step 5: RRF结果合并
  │   ↓
  │   Step 6: 补齐元数据
  │   ↓
  │   Step 7: 生成答案(DeepSeek)
  │   ↓
  │   Step 8: 保存AI响应到chat_content_history
  │   ↓
  │   Step 9: 保存AI上下文到chat_context
  │
  ├→ action="answer_chat"(多轮对话)
  │   ↓
  │   调用ChatDeepSeekService进行多轮对话
  │   ↓
  │   Step 8: 保存AI响应
  │   ↓
  │   Step 9: 保存AI上下文
  │
  └→ action="clarify"(需要澄清)
      ↓
      保存澄清问题和上下文

2.2 详细步骤说明

Step 0: 创建系统上下文

方法: _step_0_check_and_create_system_context(session_id)

def _step_0_check_and_create_system_context(self, session_id: int):
    """
    为会话创建第一条系统提示词记录

    目的:
    - 确保意图识别有正确的系统提示词
    - 只在会话第一次使用时创建

    实现:
    1. 检查chat_context中是否已有type='routes'的记录
    2. 如果没有,创建系统提示词记录
    3. 保存到chat_context表
    """
    # 检查是否已存在系统上下文
    check_sql = """
        SELECT COUNT(*) FROM chat_context
        WHERE chat_id = :session_id AND type = 'routes'
    """

    if count == 0:
        # 构建系统提示词
        system_prompt = self.deepseek_service._build_intent_prompt()
        system_context = {
   
            "role": "system",
            "content": system_prompt
        }

        # 保存到chat_context
        insert_sql = """
            INSERT INTO chat_context (chat_id, context, type)
            VALUES (:chat_id, :context, :type)
        """

输出: 无流式输出


Step 1: 保存用户消息

方法: _step_1_save_user_message(session_id, content)

def _step_1_save_user_message(self, session_id: int, content: str):
    """
    保存用户消息到chat_content_history表

    消息格式(JSON):
    {
        "session_id": str(session_id),
        "user_id": "user",
        "content": content,
        "created_at": "2025-01-01T12:00:00"
    }
    """
    chat_content = {
   
        'session_id': str(session_id),
        'user_id': 'user',
        'content': content,
        'created_at': datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
    }

    insert_sql = """
        INSERT INTO chat_content_history (chat_id, chat_content)
        VALUES (:chat_id, :chat_content)
    """

    # 同时更新会话的chat_time
    update_sql = """
        UPDATE chat_history
        SET chat_time = NOW()
        WHERE id = :session_id
    """

    yield {
   
        'step': 1,
        'step_name': '保存用户消息',
        'status': 'completed',
        'message': '用户消息已保存'
    }

Step 1.1: 保存用户输入到chat_context

方法: _step_1_1_save_user_context(session_id, content)

def _step_1_1_save_user_context(self, session_id: int, content: str):
    """
    保存用户输入到chat_context表,用于后续的上下文对话

    上下文格式(JSON):
    {
        "role": "user",
        "content": content
    }

    type: 'chat'(区别于'routes'的系统提示词)
    """
    user_context = {
   
        "role": "user",
        "content": content
    }

    insert_sql = """
        INSERT INTO chat_context (chat_id, context, type)
        VALUES (:chat_id, :context, :type)
    """

输出: 无流式输出


Step 2: 意图分析(关键步骤)

方法: _step_2_query_understanding(session_id, content)

def _step_2_query_understanding(self, session_id: int, content: str):
    """
    调用MeanDeepSeekService进行意图识别

    返回JSON结构:
    {
        "intent": "chat" | "kb_retrieval" | "clarify",
        "confidence": 0.72,
        "queries": ["TL-SE2109", "端口隔离"],
        "rewrite": "TL-SE2109 端口隔离配置方法",
        "followup": null,
        "plan": {
            "action": "retrieve_vector" | "answer_chat" | "clarify",
            "args": {"queries": [...], "k": 6}
        }
    }
    """
    yield {
   
        'step': 2,
        'step_name': '意图分析',
        'status': 'processing',
        'message': '正在分析用户意图...'
    }

    # 调用MeanDeepSeekService
    mean_result = self.mean_deepseek_service.get_mean(session_id, content)

    # 解析JSON结果
    try:
        plan_data = json.loads(mean_result)
        action = plan_data.get('plan', {
   }).get('action', 'answer_chat')
        queries = plan_data.get('queries', [])
        rewrite = plan_data.get('rewrite', content)
    except json.JSONDecodeError:
        # JSON解析失败,使用默认值
        action = 'answer_chat'
        queries = [content]
        rewrite = content

    yield {
   
        'step': 2,
        'step_name': '意图分析',
        'status': 'completed',
        'message': f'意图分析完成,动作: {action}'
    }

    return action, queries, rewrite

流式输出:

# 开始
{
   'step': 2, 'step_name': '意图分析', 'status': 'processing', ...}

# 完成
{
   'step': 2, 'step_name': '意图分析', 'status': 'completed', ...}

Step 3: 向量检索

方法: _step_3_vector_search(queries, k, user_id)

文件位置: app/service/ChatService.py 第711-797行

def _step_3_vector_search(self, queries: list, k: int = 6,
                          user_id: str = 'default') -> Generator:
    """
    向量检索流程:
    1. 使用BGE-M3模型编码query为1024维向量
    2. 在Milvus中执行COSINE相似度搜索
    3. 添加user_id过滤条件确保数据隔离
    4. 返回topK个相似的chunks

    参数:
    - queries: 关键词列表(来自意图分析)
    - k: 每个query返回的结果数量
    - user_id: 用户ID(用于数据隔离)

    返回:
    [
        {
            'content_id': int,
            'file_id': int,
            'title_id': int,
            'content': str,
            'content_type': str,
            'type': str,
            'score': float,  # 相似度分数
            'sources': [{'type': 'vector', 'rank': 1, ...}]
        },
        ...
    ]
    """
    yield {
   
        'step': 3,
        'step_name': '向量检索',
        'status': 'processing',
        'message': f'正在进行向量检索,查询数: {len(queries)}'
    }

    all_vector_results = []

    for query in queries:
        # 1. 使用BGE模型进行向量化
        query_vector = self.vector_manager.model.encode(
            query,
            normalize_embeddings=True,
            convert_to_tensor=False
        ).tolist()  # 转换为list格式

        # 2. 确保collection已加载
        self.vector_manager.collection.load()

        # 3. 在Milvus中搜索
        search_params = {
   
            "metric_type": "COSINE",  # 余弦相似度
            "params": {
   "nprobe": 10}   # 搜索聚类数
        }

        # 4. 添加user_id过滤条件
        expr = f'user_id == "{user_id}"'

        search_results = self.vector_manager.collection.search(
            data=[query_vector],
            anns_field="vector",
            param=search_params,
            limit=k,
            expr=expr,  # 用户隔离
            output_fields=["user_id", "file_id", "title_id", "content_id",
                          "type", "content", "content_type"]
        )

        # 5. 处理搜索结果
        for hits in search_results:
            for hit in hits:
                result = {
   
                    'content_id': hit.entity.get('content_id'),
                    'file_id': hit.entity.get('file_id'),
                    'title_id': hit.entity.get('title_id'),
                    'content': hit.entity.get('content'),
                    'content_type': hit.entity.get('content_type'),
                    'type': hit.entity.get('type'),
                    'score': hit.score
                }
                all_vector_results.append(result)

    yield {
   
        'step': 3,
        'step_name': '向量检索',
        'status': 'completed',
        'message': f'向量检索完成,找到 {len(all_vector_results)} 个结果'
    }

    return all_vector_results

向量管理器: utils/bge/Vector_bge_1024_Manager.py

class Vector_bge_1024_Manager:
    def __init__(self):
        # 模型配置
        self.model_name = "BAAI/bge-m3"
        self.dimension = 1024

        # 加载BGE模型
        self.model = SentenceTransformer(self.model_name)

        # 连接Milvus
        self.collection_name = "major_bge_1024_collection"
        self.collection = Collection(self.collection_name)

流式输出:

# 开始
{
   'step': 3, 'status': 'processing', 'message': '正在进行向量检索...'}

# 完成
{
   'step': 3, 'status': 'completed', 'message': '向量检索完成,找到 6 个结果'}

Step 4: BM25全文检索

方法: _step_4_bm25_search(rewrite, k, user_id)

文件位置: app/service/ChatService.py 第799-937行

def _step_4_bm25_search(self, rewrite: str, k: int = 12,
                        user_id: str = 'default') -> Generator:
    """
    BM25全文检索流程:
    1. 使用OpenSearch进行BM25搜索
    2. 搜索所有内容索引(major_pdf, major_word, major_excel等)
    3. 使用ik_max_word分析器进行中文分词
    4. 添加user_id过滤确保数据隔离
    5. 返回topK个相关文档

    参数:
    - rewrite: 重写的查询语句(来自意图分析)
    - k: 返回的结果数量
    - user_id: 用户ID(用于数据隔离)

    返回:
    [
        {
            'content_id': int,
            'file_id': int,
            'title_id': int,
            'content': str,
            'type': str,
            'score': float,  # BM25分数
            'sources': [{'type': 'bm25', 'rank': 1, ...}]
        },
        ...
    ]
    """
    yield {
   
        'step': 4,
        'step_name': 'BM25检索',
        'status': 'processing',
        'message': '正在进行全文检索...'
    }

    # 要搜索的索引列表
    search_indices = [
        'major_pdf',
        'major_word',
        'major_excel',
        'major_ppt',
        'major_txt'
    ]

    # 构建BM25搜索查询
    search_query = {
   
        "query": {
   
            "bool": {
   
                "must": [
                    {
   
                        "term": {
   
                            "user_id": user_id  # 用户隔离
                        }
                    }
                ],
                "should": [
                    {
   
                        "match": {
   
                            "content": {
   
                                "query": rewrite,
                                "analyzer": "ik_max_word",  # 中文分词
                                "boost": 1.0
                            }
                        }
                    }
                ],
                "minimum_should_match": 1
            }
        },
        "size": k,
        "_source": ["user_id", "file_id", "title_id", "content_id",
                   "type", "content"],
        "sort": [
            {
   "_score": {
   "order": "desc"}}
        ]
    }

    all_bm25_results = []

    # 在所有内容索引中搜索
    for index_name in search_indices:
        try:
            response = client.search(index=index_name, body=search_query)
            hits = response.get('hits', {
   }).get('hits', [])

            for hit in hits:
                source = hit.get('_source', {
   })
                result = {
   
                    'content_id': source.get('content_id'),
                    'file_id': source.get('file_id'),
                    'title_id': source.get('title_id'),
                    'content': source.get('content'),
                    'type': source.get('type'),
                    'score': hit.get('_score', 0.0)
                }
                all_bm25_results.append(result)

        except Exception as e:
            logger.warning(f"索引 {index_name} 搜索失败: {e}")
            continue

    yield {
   
        'step': 4,
        'step_name': 'BM25检索',
        'status': 'completed',
        'message': f'全文检索完成,找到 {len(all_bm25_results)} 个结果'
    }

    return all_bm25_results

OpenSearch配置: config/db.yaml

opensearch:
  host: localhost
  port: 9200
  user: admin
  password: admin
  indices:
    - major_files   # 文件索引(不用于内容检索)
    - major_pdf     # PDF文档索引
    - major_word    # Word文档索引
    - major_excel   # Excel文档索引
    - major_ppt     # PPT文档索引
    - major_txt     # TXT文档索引

流式输出:

# 开始
{
   'step': 4, 'status': 'processing', 'message': '正在进行全文检索...'}

# 完成
{
   'step': 4, 'status': 'completed', 'message': '全文检索完成,找到 12 个结果'}

Step 5: RRF结果合并

方法: _step_5_merge_results(vector_results, bm25_results, k)

文件位置: app/service/ChatService.py 第939-1043行

def _step_5_merge_results(self, vector_results, bm25_results, k: int = 10):
    """
    使用RRF(Reciprocal Rank Fusion)合并两种检索结果

    RRF算法:
    - 公式:RRF分数 = 1 / (rank + rrf_constant)
    - rrf_constant通常取60(论文推荐值)
    - 优势:不依赖于原始分数,只依赖排名

    参数:
    - vector_results: 向量检索结果
    - bm25_results: BM25检索结果
    - k: 最终返回的结果数量

    返回:
    [
        {
            'content_id': int,
            'file_id': int,
            'content': str,
            'rrf_score': float,      # RRF合并分数
            'final_rank': int,        # 最终排名
            'sources': [              # 来源信息
                {'type': 'vector', 'rank': 1, 'score': 0.85, 'rrf_score': 0.016},
                {'type': 'bm25', 'rank': 3, 'score': 12.5, 'rrf_score': 0.015}
            ]
        },
        ...
    ]
    """
    yield {
   
        'step': 5,
        'step_name': '结果合并',
        'status': 'processing',
        'message': '正在合并检索结果...'
    }

    rrf_constant = 60
    merged_scores = {
   }      # {content_id: rrf_score}
    result_details = {
   }     # {content_id: result_with_sources}

    # 处理向量检索结果
    for rank, result in enumerate(vector_results):
        content_id = result.get('content_id')

        # 计算RRF分数
        rrf_score = 1.0 / (rank + 1 + rrf_constant)

        if content_id not in merged_scores:
            merged_scores[content_id] = 0.0
            result_details[content_id] = result.copy()
            result_details[content_id]['sources'] = []

        # 累加RRF分数
        merged_scores[content_id] += rrf_score

        # 记录来源
        result_details[content_id]['sources'].append({
   
            'type': 'vector',
            'rank': rank + 1,
            'score': result.get('score', 0.0),
            'rrf_score': rrf_score
        })

    # 处理BM25结果(同样计算RRF分数)
    for rank, result in enumerate(bm25_results):
        content_id = result.get('content_id')

        # 计算RRF分数
        rrf_score = 1.0 / (rank + 1 + rrf_constant)

        if content_id not in merged_scores:
            merged_scores[content_id] = 0.0
            result_details[content_id] = result.copy()
            result_details[content_id]['sources'] = []

        # 累加RRF分数
        merged_scores[content_id] += rrf_score

        # 记录来源
        result_details[content_id]['sources'].append({
   
            'type': 'bm25',
            'rank': rank + 1,
            'score': result.get('score', 0.0),
            'rrf_score': rrf_score
        })

    # 按RRF分数排序取前K个
    sorted_results = sorted(
        merged_scores.items(),
        key=lambda x: x[1],
        reverse=True
    )

    final_results = []
    for i, (content_id, rrf_score) in enumerate(sorted_results[:k]):
        result = result_details[content_id].copy()
        result['final_rank'] = i + 1
        result['rrf_score'] = rrf_score
        final_results.append(result)

    yield {
   
        'step': 5,
        'step_name': '结果合并',
        'status': 'completed',
        'message': f'结果合并完成,返回前 {len(final_results)} 个结果'
    }

    return final_results

RRF示例:

假设有两个结果:
- 向量检索:content_id=123 排名第1
- BM25检索:content_id=123 排名第3

RRF计算:
- vector RRF = 1/(1+60) = 0.0164
- bm25 RRF = 1/(3+60) = 0.0159
- 总RRF = 0.0164 + 0.0159 = 0.0323

如果content_id=456在向量检索中排名第5,在BM25中没有出现:
- vector RRF = 1/(5+60) = 0.0154
- 总RRF = 0.0154

则content_id=123的最终排名会高于content_id=456

流式输出:

# 开始
{
   'step': 5, 'status': 'processing', 'message': '正在合并检索结果...'}

# 完成
{
   'step': 5, 'status': 'completed', 'message': '结果合并完成,返回前 10 个结果'}

Step 6: 元数据补齐

方法: _step_6_answer_completion(final_results)

文件位置: app/service/ChatService.py 第1045-1168行

def _step_6_answer_completion(self, final_results):
    """
    从MySQL数据库补齐元数据:
    1. 获取file_id对应的file_type
    2. 根据file_type确定查询的表名
    3. 从对应的content表查询image_path, title_content, original_filename
    4. 补充到最终结果中

    参数:
    - final_results: RRF合并后的结果列表

    返回:
    [
        {
            'content_id': int,
            'file_id': int,
            'content': str,
            'rrf_score': float,
            'image_path': str,           # 新增:图片路径
            'title_content': str,        # 新增:标题内容
            'original_filename': str     # 新增:原始文件名
        },
        ...
    ]
    """
    yield {
   
        'step': 6,
        'step_name': '补齐元数据',
        'status': 'processing',
        'message': '正在补齐元数据...'
    }

    # 1) 收集所有file_id
    file_ids = set()
    for result in final_results:
        file_id = result.get('file_id')
        if file_id is not None:
            file_ids.add(file_id)

    if not file_ids:
        return final_results

    # 2) 从files表查询file_type
    placeholders = ','.join([':id' + str(i) for i in range(len(file_ids))])
    query_sql = f"""
        SELECT id as file_id, file_type
        FROM files
        WHERE id IN ({placeholders})
    """

    params = {
   f'id{i}': fid for i, fid in enumerate(file_ids)}
    result = self.mysql_manager.execute_query(query_sql, params)

    # 构建file_type映射
    file_type_mapping = {
   }
    for row in result:
        file_type_mapping[row[0]] = row[1]

    # 3) 根据file_type查询对应的content表
    enhanced_results = []

    for result in final_results:
        enhanced_result = result.copy()

        file_id = result.get('file_id')
        content_id = result.get('content_id')

        if file_id in file_type_mapping and content_id:
            file_type = file_type_mapping[file_id]
            table_name = self._get_table_name_from_file_type(file_type)

            if table_name:
                # 查询元数据
                metadata_sql = f"""
                    SELECT image_path, title_content, original_filename
                    FROM {table_name}
                    WHERE content_id = :content_id
                    LIMIT 1
                """

                metadata_result = self.mysql_manager.execute_query(
                    metadata_sql,
                    {
   'content_id': content_id}
                )

                if metadata_result:
                    metadata_row = metadata_result[0]
                    enhanced_result['image_path'] = metadata_row[0]
                    enhanced_result['title_content'] = metadata_row[1]
                    enhanced_result['original_filename'] = metadata_row[2]

        enhanced_results.append(enhanced_result)

    yield {
   
        'step': 6,
        'step_name': '补齐元数据',
        'status': 'completed',
        'message': '元数据补齐完成'
    }

    return enhanced_results

def _get_table_name_from_file_type(self, file_type: str) -> str:
    """
    文件类型到表名映射
    """
    file_type_to_table = {
   
        'pdf': 'pdf_content',
        'ppt': 'ppt_content',
        'pptx': 'ppt_content',
        'doc': 'word_content',
        'docx': 'word_content',
        'xls': 'excel_content',
        'xlsx': 'excel_content',
        'txt': 'txt_content',
        'md': 'txt_content'
    }
    return file_type_to_table.get(file_type.lower())

流式输出:

# 开始
{
   'step': 6, 'status': 'processing', 'message': '正在补齐元数据...'}

# 完成
{
   'step': 6, 'status': 'completed', 'message': '元数据补齐完成'}

Step 7: 答案生成

方法: _step_7_generate_answer(session_id, content, mysql_final_results)

def _step_7_generate_answer(self, session_id: int, content: str,
                            mysql_final_results: list):
    """
    基于检索结果生成答案

    调用AnswerDeepSeekService生成块级JSON格式的答案
    """
    yield {
   
        'step': 7,
        'step_name': '生成答案',
        'status': 'processing',
        'message': '正在生成AI回复...'
    }

    # 调用AnswerDeepSeekService
    for answer_step in self.answer_deepseek_service.generate_answer(
        session_id, content, mysql_final_results
    ):
        yield answer_step

    yield {
   
        'step': 7,
        'step_name': '生成答案',
        'status': 'completed',
        'message': '答案生成完成'
    }

流式输出:

# 开始
{
   'step': 7, 'status': 'processing', 'message': '正在生成AI回复...'}

# 块级推送(持续多次)
{
   'step': 8, 'status': 'processing', 'message': '{"t":"block_start",...}'}
{
   'step': 8, 'status': 'processing', 'message': '{"t":"delta",...}'}
...

# 完成
{
   'step': 7, 'status': 'completed', 'message': '答案生成完成'}

Step 8: 保存AI响应

方法: _step_8_save_ai_response(session_id, ai_answer)

def _step_8_save_ai_response(self, session_id: int, ai_answer: str):
    """
    保存AI响应到chat_content_history表

    消息格式(JSON):
    {
        "session_id": str(session_id),
        "user_id": "assistant",
        "content": ai_answer,
        "created_at": "2025-01-01T12:00:00"
    }
    """
    chat_content = {
   
        'session_id': str(session_id),
        'user_id': 'assistant',
        'content': ai_answer,
        'created_at': datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
    }

    insert_sql = """
        INSERT INTO chat_content_history (chat_id, chat_content)
        VALUES (:chat_id, :chat_content)
    """

    # 同时更新会话的chat_time
    update_sql = """
        UPDATE chat_history
        SET chat_time = NOW()
        WHERE id = :session_id
    """

输出: 无流式输出


Step 9: 保存AI上下文

方法: _step_9_save_ai_context(session_id, ai_answer)

def _step_9_save_ai_context(self, session_id: int, ai_answer: str):
    """
    保存AI上下文到chat_context表

    处理流程:
    1. 从块级JSON中提取纯文本内容
    2. 格式化为上下文JSON
    3. 保存到chat_context,type='chat'

    上下文格式(JSON):
    {
        "role": "assistant",
        "content": "提取的纯文本内容"
    }
    """
    # 从块级JSON中提取内容
    extracted_content = self._extract_content_from_stream_json(ai_answer)

    ai_context = {
   
        "role": "assistant",
        "content": extracted_content
    }

    insert_sql = """
        INSERT INTO chat_context (chat_id, context, type)
        VALUES (:chat_id, :context, :type)
    """

输出: 无流式输出


三、LLM模型和调用方式

3.1 模型配置

配置文件: config/model.yaml

# 三个独立的DeepSeek模型配置

# 1. 意图识别模型(temperature=0.3,更保守)
intent_models:
  deepseek:
    api_key: "sk-xxx"
    base_url: "https://api.deepseek.com/v1"
    model_name: "deepseek-chat"
    max_tokens: 4096
    temperature: 0.3      # 更保守,保证意图识别准确
    stream: true

# 2. 对话模型(temperature=0.7,更灵活)
chat_models:
  deepseek:
    api_key: "sk-xxx"
    base_url: "https://api.deepseek.com/v1"
    model_name: "deepseek-chat"
    max_tokens: 4096
    temperature: 0.7      # 更灵活,支持创造性对话
    stream: true

# 3. 答案生成模型(temperature=0.3,保证准确性)
answer_models:
  deepseek:
    api_key: "sk-xxx"
    base_url: "https://api.deepseek.com/v1"
    model_name: "deepseek-chat"
    max_tokens: 4096
    temperature: 0.3      # 保证答案准确性
    stream: true

3.2 三个核心服务类

A. MeanDeepSeekService(意图识别)

文件位置: app/service/models/MeanDeepSeekService.py

class MeanDeepSeekService:
    def __init__(self):
        """从config/model.yaml加载intent_models配置"""
        self.api_key = config['intent_models']['deepseek']['api_key']
        self.base_url = config['intent_models']['deepseek']['base_url']
        self.model_name = config['intent_models']['deepseek']['model_name']
        self.max_tokens = config['intent_models']['deepseek']['max_tokens']
        self.temperature = config['intent_models']['deepseek']['temperature']
        self.stream = config['intent_models']['deepseek']['stream']

    def get_mean(self, session_id: int, user_input: str) -> str:
        """
        意图识别和动作规划

        流程:
        1. 获取chat_context中的所有上下文消息
        2. 调用DeepSeek API进行流式识别
        3. 收集响应内容(JSON格式的plan)
        4. 返回JSON字符串给ChatService

        返回JSON格式:
        {
            "intent": "chat" | "kb_retrieval" | "clarify",
            "confidence": 0.72,
            "queries": ["关键词1", "关键词2"],
            "rewrite": "重写的查询语句",
            "followup": null,
            "plan": {
                "action": "retrieve_vector" | "answer_chat" | "clarify",
                "args": {"queries": [...], "k": 6}
            }
        }
        """
        # 1. 获取上下文消息
        messages = self._get_messages_from_context(session_id)

        # 2. 添加当前用户输入
        messages.append({
   
            "role": "user",
            "content": user_input
        })

        # 3. 调用DeepSeek API
        data = {
   
            'model': self.model_name,
            'messages': messages,
            'max_tokens': self.max_tokens,
            'temperature': self.temperature,
            'stream': self.stream
        }

        headers = {
   
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        response = requests.post(
            f'{self.base_url}/chat/completions',
            json=data,
            headers=headers,
            stream=True
        )

        # 4. 处理SSE流式响应
        collected_content = ""
        for line in response.iter_lines():
            if line:
                line = line.decode('utf-8')
                if line.startswith('data: '):
                    data_str = line[6:]  # 移除'data: '前缀

                    if data_str.strip() == '[DONE]':
                        break

                    try:
                        data_obj = json.loads(data_str)
                        choices = data_obj.get('choices', [])
                        if choices:
                            delta = choices[0].get('delta', {
   })
                            content = delta.get('content', '')
                            collected_content += content
                    except json.JSONDecodeError:
                        continue

        return collected_content  # 完整的JSON字符串

    def _build_intent_prompt(self) -> str:
        """
        构建意图识别系统提示词
        """
        prompt = """你是一个"意图路由与动作计划生成器",任务是:

1. 从用户输入和历史对话中识别用户意图(intent):
   - "chat":闲聊、解释、不需要知识库的对话
   - "kb_retrieval":需要从知识库检索信息
   - "clarify":问题模糊,需要澄清

2. 给出信心分数(confidence):0.0–1.0

3. 输出queries(最小必要关键词,1–5个)用于向量化
   - 提取核心概念、型号、关键词
   - 去除无用词("怎么"、"如何"等)

4. 输出rewrite(一句话的自然语言检索串)用于BM25
   - 保留完整语义
   - 适合全文搜索

5. 输出plan,包含action和args:
   - action可选:
     * "retrieve_vector":执行知识库检索
     * "answer_chat":直接对话回答
     * "clarify":请求用户澄清
   - args参数:{"queries": [...], "k": 6}

输出必须是严格JSON格式,不要包含任何解释或额外文本。

示例输出:
{
    "intent": "kb_retrieval",
    "confidence": 0.95,
    "queries": ["TL-SE2109", "端口隔离"],
    "rewrite": "TL-SE2109 端口隔离配置方法",
    "followup": null,
    "plan": {
        "action": "retrieve_vector",
        "args": {"queries": ["TL-SE2109", "端口隔离"], "k": 6}
    }
}
"""
        return prompt

B. ChatDeepSeekService(多轮对话)

文件位置: app/service/models/ChatDeepSeekService.py

class ChatDeepSeekService:
    def __init__(self):
        """从config/model.yaml加载chat_models配置"""
        self.api_key = config['chat_models']['deepseek']['api_key']
        self.base_url = config['chat_models']['deepseek']['base_url']
        self.model_name = config['chat_models']['deepseek']['model_name']
        self.max_tokens = config['chat_models']['deepseek']['max_tokens']
        self.temperature = config['chat_models']['deepseek']['temperature']
        self.stream = config['chat_models']['deepseek']['stream']

    def chat(self, session_id: int, user_input: str, mode: str = 'with_kb'):
        """
        多轮对话

        用于非检索相关的问题回答
        支持多轮对话,带有知识库上下文或纯聊天

        参数:
        - session_id: 会话ID
        - user_input: 用户输入
        - mode: 'with_kb'(带知识库)或 'pure_chat'(纯聊天)

        返回:块级JSON流式数据
        """
        # 1. 获取历史上下文
        history_messages = self._get_messages_from_context(session_id)

        # 2. 构建聊天提示词
        format_prompt = self._build_chat_prompt(user_input)

        # 3. 调用DeepSeek进行聊天
        for chat_block in self._call_deepseek_for_chat(
            history_messages, format_prompt
        ):
            yield chat_block

    def _build_chat_prompt(self, user_input: str) -> str:
        """
        构建聊天输出格式提示词
        """
        prompt = f"""用户问题:{user_input}

输出JSON格式要求:
1)输出格式必须是json格式,不要使用```json```代码块包裹
2)你的回答需要严格按照以下示例构造json:

[
  {
   {"t":"block_start", "id":"b1", "kind":"text"}},
  {
   {"t":"delta", "id":"b1", "text":"文本内容\\n"}},
  {
   {"t":"block_end", "id":"b1"}},

  {
   {"t":"block_start", "id":"b2", "kind":"table", "columns":["列1","列2"]}},
  {
   {"t":"delta", "id":"b2", "row":["值1","值2"]}},
  {
   {"t":"delta", "id":"b2", "row":["值3","值4"]}},
  {
   {"t":"block_end", "id":"b2"}},

  {
   {"t":"block_start", "id":"b3", "kind":"image", "alt":"图片描述"}},
  {
   {"t":"block_update", "id":"b3", "url":"images/xxx.jpg", "width":960, "height":540}},
  {
   {"t":"block_end", "id":"b3"}}
]

3)kind类型说明:
   - "text":文本块,用delta推送text内容
   - "table":表格块,需要先定义columns,然后用delta推送每一行row
   - "image":图片块,用block_update推送url、width、height

4)每个内容块都要有block_start和block_end配对
5)所有JSON对象必须在同一行,不要换行
6)id必须唯一,建议使用b1, b2, b3...格式
"""
        return prompt

    def _call_deepseek_for_chat(self, history_messages: list, prompt: str):
        """
        调用DeepSeek进行聊天,返回块级JSON
        """
        messages = history_messages + [{
   "role": "user", "content": prompt}]

        data = {
   
            'model': self.model_name,
            'messages': messages,
            'max_tokens': self.max_tokens,
            'temperature': self.temperature,
            'stream': self.stream
        }

        # 调用API并处理流式响应(同AnswerDeepSeekService)
        # 返回块级JSON格式的数据
        # ...

C. AnswerDeepSeekService(答案生成)

文件位置: app/service/models/AnswerDeepSeekService.py

class AnswerDeepSeekService:
    def __init__(self):
        """从config/model.yaml加载answer_models配置"""
        self.api_key = config['answer_models']['deepseek']['api_key']
        self.base_url = config['answer_models']['deepseek']['base_url']
        self.model_name = config['answer_models']['deepseek']['model_name']
        self.max_tokens = config['answer_models']['deepseek']['max_tokens']
        self.temperature = config['answer_models']['deepseek']['temperature']
        self.stream = config['answer_models']['deepseek']['stream']

    def generate_answer(self, session_id: int, user_input: str,
                       mysql_final_results: List[Dict]):
        """
        基于检索结果生成答案

        参数:
        - session_id: 会话ID
        - user_input: 用户问题
        - mysql_final_results: 检索到的参考资料列表

        返回:块级JSON格式的答案
        """
        # 1. 获取历史上下文
        history_messages = self._get_messages_from_context(session_id)

        # 2. 构建答案生成提示词
        prompt = self._build_answer_prompt(user_input, mysql_final_results)

        # 3. 调用DeepSeek生成答案
        for answer_block in self._call_deepseek_for_answer(
            history_messages, prompt
        ):
            yield answer_block

    def _build_answer_prompt(self, user_input: str,
                            mysql_final_results: List[Dict]) -> str:
        """
        构建答案生成提示词
        """
        prompt = f"""你是一个"基于检索证据作答"的中文助手。

请基于以下参考资料,针对用户的问题提供准确、详细的回答:

用户问题:{user_input}

参考资料:
{json.dumps(mysql_final_results, ensure_ascii=False, indent=2)}

要求:
1. 首先基于参考资料逐条分析:
   - 该条参考资料的展示:
     * 若是文本:用Markdown的### 标签开始
     * 若是图片:用url:image_path格式
   - 该条参考资料的出处:
     格式:引用【original_filename】标题:title_content
   - content和用户问题的关联

2. 然后基于用户问题和逐条分析的结果,给出归纳后的答案:
   - 归纳后的答案
   - 归纳后的答案出处,按文档、标题分组
   - 如果参考资料中没有相关信息,明确说明

3. 输出必须是JSON格式,格式同ChatDeepSeekService的块级JSON

示例输出:
[
  {
   {"t":"block_start", "id":"b1", "kind":"text"}},
  {
   {"t":"delta", "id":"b1", "text":"### 参考资料分析\\n"}},
  {
   {"t":"delta", "id":"b1", "text":"1. 引用【设备手册.pdf】标题:端口配置\\n"}},
  {
   {"t":"delta", "id":"b1", "text":"   配置步骤如下...\\n"}},
  {
   {"t":"block_end", "id":"b1"}},

  {
   {"t":"block_start", "id":"b2", "kind":"text"}},
  {
   {"t":"delta", "id":"b2", "text":"### 归纳答案\\n"}},
  {
   {"t":"delta", "id":"b2", "text":"端口隔离的配置方法...\\n"}},
  {
   {"t":"block_end", "id":"b2"}},

  {
   {"t":"block_start", "id":"b3", "kind":"image", "alt":"配置界面"}},
  {
   {"t":"block_update", "id":"b3", "url":"images/config.jpg", "width":800, "height":600}},
  {
   {"t":"block_end", "id":"b3"}}
]
"""
        return prompt

    def _call_deepseek_for_answer(self, history_messages: list, prompt: str):
        """
        调用DeepSeek生成答案,返回块级JSON

        流程:
        1. 调用API获取流式响应
        2. 解析SSE格式数据
        3. 识别完整的JSON块
        4. 逐块推送给前端
        """
        messages = history_messages + [{
   "role": "user", "content": prompt}]

        data = {
   
            'model': self.model_name,
            'messages': messages,
            'max_tokens': self.max_tokens,
            'temperature': self.temperature,
            'stream': self.stream
        }

        headers = {
   
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        response = requests.post(
            f'{self.base_url}/chat/completions',
            json=data,
            headers=headers,
            stream=True
        )

        # 处理块级推送逻辑
        collected_content = ""
        json_buffer = ""
        waiting_for_complete_json = False
        brace_count = 0

        for line in response.iter_lines():
            if line:
                line = line.decode('utf-8')
                if line.startswith('data: '):
                    data_str = line[6:]

                    if data_str.strip() == '[DONE]':
                        break

                    try:
                        data_obj = json.loads(data_str)
                        choices = data_obj.get('choices', [])
                        if choices:
                            delta = choices[0].get('delta', {
   })
                            content = delta.get('content', '')

                            if content:
                                collected_content += content

                                # 处理块级推送逻辑
                                for char in content:
                                    json_buffer += char

                                    if char == '{':
                                        if not waiting_for_complete_json:
                                            waiting_for_complete_json = True
                                        brace_count += 1
                                    elif char == '}':
                                        brace_count -= 1
                                        if waiting_for_complete_json and brace_count == 0:
                                            # 完整的JSON块接收完毕,可以推送
                                            try:
                                                json_start = json_buffer.rfind('{')
                                                if json_start != -1:
                                                    json_block = json_buffer[json_start:]
                                                    json.loads(json_block)  # 验证JSON

                                                    # 推送给前端
                                                    yield {
   
                                                        'step': 8,
                                                        'step_name': '渲染答案',
                                                        'status': 'processing',
                                                        'message': json_block
                                                    }
                                            except (json.JSONDecodeError, ValueError):
                                                pass
                                            waiting_for_complete_json = False

                    except json.JSONDecodeError:
                        continue

        # 返回完整内容用于保存
        return collected_content

四、聊天历史管理

4.1 数据库表结构

chat_history(会话表)

用途: 存储用户的所有会话

CREATE TABLE chat_history (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,           -- 会话ID
    user_id VARCHAR(100) NOT NULL,                  -- 用户ID
    chat_title VARCHAR(255) NOT NULL,               -- 会话标题
    chat_time DATETIME DEFAULT CURRENT_TIMESTAMP
              ON UPDATE CURRENT_TIMESTAMP,          -- 最后聊天时间
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,  -- 创建时间
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
               ON UPDATE CURRENT_TIMESTAMP,         -- 更新时间
    status TINYINT(1) DEFAULT 1,                    -- 状态(0:删除,1:正常)

    INDEX idx_user_id (user_id),
    INDEX idx_chat_time (chat_time),
    INDEX idx_status (status)
);

chat_content_history(会话消息表)

用途: 存储每条用户和AI的消息

CREATE TABLE chat_content_history (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    chat_id BIGINT NOT NULL,                        -- 关联的会话ID
    chat_content TEXT NOT NULL,                     -- 会话内容(JSON格式)
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
               ON UPDATE CURRENT_TIMESTAMP,

    INDEX idx_chat_id (chat_id),
    INDEX idx_created_at (created_at),
    FOREIGN KEY (chat_id) REFERENCES chat_history(id) ON DELETE CASCADE
);

chat_content格式(JSON):

{
   
    "session_id": "123",
    "user_id": "user" | "assistant",
    "content": "消息内容",
    "created_at": "2025-01-01T12:00:00"
}

chat_context(上下文表)

用途: 存储用于LLM调用的上下文消息

CREATE TABLE chat_context (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    chat_id BIGINT NOT NULL,                        -- 关联的会话ID
    context TEXT NOT NULL,                          -- 上下文(JSON格式)
    type VARCHAR(50) NOT NULL,                      -- 类型: 'routes'或'chat'
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
               ON UPDATE CURRENT_TIMESTAMP,

    INDEX idx_chat_id (chat_id),
    INDEX idx_type (type),
    INDEX idx_created_at (created_at),
    FOREIGN KEY (chat_id) REFERENCES chat_history(id) ON DELETE CASCADE
);

context格式(JSON):

type='routes'(系统提示词):

{
   
    "role": "system",
    "content": "你是一个意图路由与动作计划生成器..."
}

type='chat'(用户消息和AI回复):

{
   
    "role": "user" | "assistant",
    "content": "消息内容"
}

4.2 会话管理API

文件位置: app/routes/ChatRoutes.py

获取会话列表

@chat_bp.route('/sessions', methods=['GET'])
def get_chat_sessions():
    """
    获取用户所有会话,按chat_time倒序

    返回:
    [
        {
            "id": 1,
            "chat_title": "新会话",
            "chat_time": "2025-01-01 12:00:00",
            "created_at": "2025-01-01 10:00:00"
        },
        ...
    ]
    """
    sql = """
        SELECT id, chat_title, chat_time, created_at
        FROM chat_history
        WHERE user_id = :user_id AND status = 1
        ORDER BY chat_time DESC
    """

创建新会话

@chat_bp.route('/sessions', methods=['POST'])
def create_chat_session():
    """
    创建新会话

    参数(可选):
    - chat_title: 会话标题(如果为空,自动生成)

    自动生成规则:
    - 查询用户的会话列表
    - 生成唯一标题:"新会话"、"新会话1"、"新会话2"等

    返回:
    {
        "success": true,
        "session_id": 123,
        "chat_title": "新会话"
    }
    """
    # 如果chat_title为空,生成唯一标题
    if not chat_title:
        chat_title = self._generate_unique_title(user_id)

    insert_sql = """
        INSERT INTO chat_history (user_id, chat_title, chat_time)
        VALUES (:user_id, :chat_title, NOW())
    """

更新会话标题

@chat_bp.route('/sessions/<int:session_id>', methods=['PUT'])
def update_chat_session(session_id: int):
    """
    更新会话标题

    参数:
    - chat_title: 新的标题

    返回:
    {
        "success": true,
        "message": "会话标题已更新"
    }
    """
    update_sql = """
        UPDATE chat_history
        SET chat_title = :chat_title, updated_at = NOW()
        WHERE id = :session_id AND user_id = :user_id
    """

删除会话

@chat_bp.route('/sessions/<int:session_id>', methods=['DELETE'])
def delete_chat_session(session_id: int):
    """
    删除会话(软删除,设置status=0)

    流程:
    1. 设置status=0(软删除)
    2. 级联删除chat_content_history中的内容(外键CASCADE)
    3. 级联删除chat_context中的上下文(外键CASCADE)
    4. 如果用户没有其他会话,自动创建一个新会话

    返回:
    {
        "success": true,
        "message": "会话已删除"
    }
    """
    # 软删除会话
    update_sql = """
        UPDATE chat_history
        SET status = 0, updated_at = NOW()
        WHERE id = :session_id AND user_id = :user_id
    """

    # 检查用户是否还有其他会话
    check_sql = """
        SELECT COUNT(*) FROM chat_history
        WHERE user_id = :user_id AND status = 1
    """

    # 如果没有其他会话,创建一个新会话
    if count == 0:
        create_chat_session()

获取会话内容

@chat_bp.route('/sessions/<int:session_id>/content', methods=['GET'])
def get_chat_content(session_id: int):
    """
    获取会话内容

    返回格式化的消息列表,解析JSON格式

    返回:
    [
        {
            "session_id": "123",
            "user_id": "user",
            "content": "用户问题",
            "created_at": "2025-01-01T12:00:00"
        },
        {
            "session_id": "123",
            "user_id": "assistant",
            "content": "AI回答",
            "created_at": "2025-01-01T12:00:05"
        },
        ...
    ]
    """
    query_sql = """
        SELECT chat_content, created_at
        FROM chat_content_history
        WHERE chat_id = :session_id
        ORDER BY created_at ASC
    """

    # 解析JSON格式的chat_content
    messages = []
    for row in result:
        chat_content = json.loads(row[0])
        messages.append(chat_content)

    return messages

4.3 上下文管理

获取上下文消息(用于LLM调用):

def _get_messages_from_context(self, session_id: int) -> list:
    """
    从chat_context表按created_at顺序获取所有上下文

    返回格式化的messages列表,用于DeepSeek API调用

    返回:
    [
        {"role": "system", "content": "系统提示词"},
        {"role": "user", "content": "用户问题1"},
        {"role": "assistant", "content": "AI回答1"},
        {"role": "user", "content": "用户问题2"},
        ...
    ]
    """
    sql = """
        SELECT context
        FROM chat_context
        WHERE chat_id = :session_id
        ORDER BY created_at ASC
    """

    messages = []
    for row in result:
        context_data = json.loads(row[0])  # 解析JSON
        messages.append(context_data)

    return messages

4.4 内容提取机制

从块级JSON中提取内容用于上下文保存:

文件位置: app/service/ChatService.py 第1263-1413行

def _extract_content_from_stream_json(self, ai_answer: str) -> str:
    """
    从流式JSON数组中提取纯文本内容

    处理逻辑:
    1. 识别block_start和block_end
    2. 提取不同type的内容:
       - 文本块: 从delta中的text字段
       - 表格块: 构建表格文本描述
       - 图片块: 构建图片文本描述
    3. 拼接所有内容

    参数:
    - ai_answer: 块级JSON字符串

    返回:
    提取的纯文本内容(用于保存到chat_context)
    """
    try:
        # 解析JSON数组
        json_data = json.loads(ai_answer)
    except json.JSONDecodeError:
        return ai_answer

    extracted_content = []

    # 当前块的临时变量
    current_text_content = ""
    current_table_columns = []
    current_table_rows = []
    current_image_info = {
   }

    for item in json_data:
        t_type = item.get('t')
        kind = item.get('kind')

        if t_type == 'block_start':
            # 块开始,初始化临时变量
            if kind == 'text':
                current_text_content = ""
            elif kind == 'table':
                current_table_columns = item.get('columns', [])
                current_table_rows = []
            elif kind == 'image':
                current_image_info = {
   
                    'alt': item.get('alt', ''),
                    'url': ''
                }

        elif t_type == 'delta':
            # 增量数据
            if kind == 'text' or item.get('text'):
                text_content = item.get('text', '')
                current_text_content += text_content
            elif item.get('row'):
                current_table_rows.append(item.get('row', []))

        elif t_type == 'block_update':
            # 更新块(主要用于图片)
            if 'url' in item:
                current_image_info['url'] = item.get('url', '')

        elif t_type == 'block_end':
            # 块结束,保存内容
            if current_text_content:
                extracted_content.append(current_text_content.strip())
                current_text_content = ""

            elif current_table_columns or current_table_rows:
                table_text = self._build_table_description(
                    current_table_columns,
                    current_table_rows
                )
                extracted_content.append(table_text)
                current_table_columns = []
                current_table_rows = []

            elif current_image_info.get('alt') or current_image_info.get('url'):
                image_text = self._build_image_description(current_image_info)
                extracted_content.append(image_text)
                current_image_info = {
   }

    # 拼接所有内容
    final_content = "\n\n".join(extracted_content)
    return final_content

def _build_table_description(self, columns: list, rows: list) -> str:
    """
    构建表格的文本描述

    格式:
    表格:
    列名:列1 | 列2 | 列3
    值1 | 值2 | 值3
    值4 | 值5 | 值6
    """
    if not columns and not rows:
        return ""

    table_lines = ["表格:"]

    if columns:
        table_lines.append("列名:" + " | ".join(columns))

    for row in rows:
        table_lines.append(" | ".join([str(cell) for cell in row]))

    return "\n".join(table_lines)

def _build_image_description(self, image_info: dict) -> str:
    """
    构建图片的文本描述

    格式:
    图片:配置界面 (images/config.jpg)
    """
    alt = image_info.get('alt', '')
    url = image_info.get('url', '')

    if alt and url:
        return f"图片:{alt} ({url})"
    elif alt:
        return f"图片:{alt}"
    elif url:
        return f"图片:({url})"
    else:
        return ""

五、流式响应的实现

5.1 SSE(Server-Sent Events)流式框架

优势:

  • 单向推送(服务器到客户端)
  • 自动重连
  • 简单的文本协议
  • 浏览器原生支持

格式:

data: {json}\n\n

实现: ChatRoutes.py 第234-289行

def generate_sse_response():
    """
    SSE生成器函数

    流程:
    1. 调用ChatService.add_chat_content(生成器)
    2. 逐步yield每个处理步骤的数据
    3. 格式化为SSE格式
    4. 错误处理
    """
    try:
        for step_data in chat_service.add_chat_content(session_id, content):
            # 格式化为SSE格式: data: {json}\n\n
            sse_data = f"data: {json.dumps(step_data, ensure_ascii=False)}\n\n"
            yield sse_data

    except Exception as e:
        logger.error(f"SSE流式响应生成失败: {e}")
        error_data = {
   
            'step': 'error',
            'status': 'error',
            'message': f'处理失败: {str(e)}'
        }
        yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"

return Response(
    generate_sse_response(),
    mimetype='text/event-stream',
    headers={
   
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Headers': 'Cache-Control'
    }
)

5.2 流式数据结构

基础步骤事件格式

{
   
    'step': int,              # 步骤号(0-9)
    'step_name': str,          # 步骤名称
    'status': str,             # 'processing' | 'completed' | 'error'
    'message': str             # 状态消息
}

示例:

# Step 2 开始
{
   
    'step': 2,
    'step_name': '意图分析',
    'status': 'processing',
    'message': '正在分析用户意图...'
}

# Step 2 完成
{
   
    'step': 2,
    'step_name': '意图分析',
    'status': 'completed',
    'message': '意图分析完成,动作: retrieve_vector'
}

块级流式推送格式(Step 7-8)

对于答案生成和聊天回复,使用块级JSON格式:

文本块:

{
   "t":"block_start", "id":"b1", "kind":"text"}
{
   "t":"delta", "id":"b1", "text":"文本内容第一部分\n"}
{
   "t":"delta", "id":"b1", "text":"文本内容第二部分\n"}
{
   "t":"block_end", "id":"b1"}

表格块:

{
   "t":"block_start", "id":"b2", "kind":"table", "columns":["列1","列2","列3"]}
{
   "t":"delta", "id":"b2", "row":["值1","值2","值3"]}
{
   "t":"delta", "id":"b2", "row":["值4","值5","值6"]}
{
   "t":"block_end", "id":"b2"}

图片块:

{
   "t":"block_start", "id":"b3", "kind":"image", "alt":"配置界面截图"}
{
   "t":"block_update", "id":"b3", "url":"images/config.jpg", "width":960, "height":540}
{
   "t":"block_end", "id":"b3"}

5.3 前端接收示例

const eventSource = new EventSource(`/api/chat/sessions/${
     sessionId}/content`);

eventSource.onmessage = function(event) {
   
    const data = JSON.parse(event.data);

    if (data.step === 8 && data.status === 'processing') {
   
        // 块级JSON推送
        const block = JSON.parse(data.message);

        if (block.t === 'block_start') {
   
            // 创建新块容器
            createBlock(block.id, block.kind);
        }
        else if (block.t === 'delta') {
   
            // 追加内容到现有块
            if (block.text) {
   
                appendText(block.id, block.text);
            }
            else if (block.row) {
   
                appendTableRow(block.id, block.row);
            }
        }
        else if (block.t === 'block_update') {
   
            // 更新块属性(如图片URL)
            updateBlock(block.id, block);
        }
        else if (block.t === 'block_end') {
   
            // 块结束,标记完成
            finalizeBlock(block.id);
        }
    }
    else {
   
        // 步骤进度更新
        updateProgress(data.step, data.status, data.message);
    }
};

eventSource.onerror = function(event) {
   
    console.error('SSE连接错误', event);
    eventSource.close();
};

六、完整流程示意图

用户提问: "TL-SE2109的端口隔离怎么配置"
    │
    ↓
POST /api/chat/sessions/{id}/content
    │
    ↓ (返回SSE流)
Step 0: 创建系统上下文
    ├─→ 检查chat_context是否有type='routes'记录
    ├─→ 如果没有,创建系统提示词
    └─→ 无流式输出

Step 1: 保存用户消息到chat_content_history
    ├─→ INSERT INTO chat_content_history
    └─→ SSE: {step: 1, status: 'completed', ...}

Step 1.1: 保存用户输入到chat_context (role: user)
    ├─→ INSERT INTO chat_context with type='chat'
    └─→ 无流式输出

Step 2: 意图分析
    ├─→ SSE: {step: 2, status: 'processing', ...}
    ├─→ 调用 MeanDeepSeekService.get_mean()
    │   └─→ POST https://api.deepseek.com/v1/chat/completions
    │       ├─→ model: deepseek-chat
    │       ├─→ messages: [系统提示词, 历史消息, 当前查询]
    │       ├─→ temperature: 0.3
    │       └─→ stream: true
    ├─→ 收集流式响应,返回JSON:
    │   {
    │     "intent": "kb_retrieval",
    │     "confidence": 0.95,
    │     "queries": ["TL-SE2109", "端口隔离"],
    │     "rewrite": "TL-SE2109 端口隔离配置方法",
    │     "plan": {
    │       "action": "retrieve_vector",
    │       "args": {"k": 6}
    │     }
    │   }
    └─→ SSE: {step: 2, status: 'completed', ...}

Step 3: 向量检索
    ├─→ SSE: {step: 3, status: 'processing', ...}
    ├─→ 使用BGE-M3编码queries为1024维向量
    │   └─→ model.encode(query, normalize_embeddings=True)
    ├─→ 在Milvus中执行COSINE搜索
    │   ├─→ collection: major_bge_1024_collection
    │   ├─→ metric_type: COSINE
    │   ├─→ expr: 'user_id == "current_user"' (数据隔离)
    │   └─→ limit: 6
    ├─→ 返回6个相似chunks with sources=['vector']
    └─→ SSE: {step: 3, status: 'completed', message: '找到 6 个结果'}

Step 4: 全文检索(BM25)
    ├─→ SSE: {step: 4, status: 'processing', ...}
    ├─→ 在OpenSearch中搜索
    │   ├─→ 索引: major_pdf, major_word, major_excel, major_ppt, major_txt
    │   ├─→ 查询: "TL-SE2109 端口隔离配置方法"
    │   ├─→ 分析器: ik_max_word (中文分词)
    │   ├─→ must: {term: {user_id: "current_user"}} (数据隔离)
    │   └─→ size: 12
    ├─→ 返回12个相关docs with sources=['bm25']
    └─→ SSE: {step: 4, status: 'completed', message: '找到 12 个结果'}

Step 5: RRF结果合并
    ├─→ SSE: {step: 5, status: 'processing', ...}
    ├─→ 计算RRF分数: 1/(rank + 1 + 60)
    │   ├─→ 向量结果: rank 1-6
    │   └─→ BM25结果: rank 1-12
    ├─→ 按RRF分数排序
    ├─→ 返回前10个合并结果
    └─→ SSE: {step: 5, status: 'completed', message: '返回前 10 个结果'}

Step 6: 元数据补齐
    ├─→ SSE: {step: 6, status: 'processing', ...}
    ├─→ 从files表获取file_type
    │   └─→ SELECT id, file_type FROM files WHERE id IN (...)
    ├─→ 根据file_type确定content表
    │   └─→ pdf → pdf_content, word → word_content等
    ├─→ 查询image_path, title_content, original_filename
    │   └─→ SELECT ... FROM {table_name} WHERE content_id = ...
    └─→ SSE: {step: 6, status: 'completed', ...}

Step 7: 答案生成
    ├─→ SSE: {step: 7, status: 'processing', message: '正在生成AI回复...'}
    ├─→ 调用 AnswerDeepSeekService.generate_answer()
    │   └─→ POST https://api.deepseek.com/v1/chat/completions
    │       ├─→ model: deepseek-chat
    │       ├─→ messages: [历史消息, 答案生成提示词+检索结果]
    │       ├─→ temperature: 0.3
    │       └─→ stream: true
    ├─→ 收集块级JSON并逐块推送:
    │   └─→ SSE: {step: 8, status: 'processing', message: '{"t":"block_start",...}'}
    │   └─→ SSE: {step: 8, status: 'processing', message: '{"t":"delta",...}'}
    │   └─→ SSE: {step: 8, status: 'processing', message: '{"t":"delta",...}'}
    │   └─→ ...(持续推送)
    │   └─→ SSE: {step: 8, status: 'processing', message: '{"t":"block_end",...}'}
    └─→ SSE: {step: 7, status: 'completed', message: '答案生成完成'}

Step 8: 保存AI响应到chat_content_history
    ├─→ 格式化为标准JSON
    │   {
    │     'session_id': '123',
    │     'user_id': 'assistant',
    │     'content': '生成的完整答案(块级JSON)',
    │     'created_at': '2025-01-01T12:00:00'
    │   }
    ├─→ INSERT INTO chat_content_history
    ├─→ UPDATE chat_history SET chat_time = NOW()
    └─→ 无流式输出

Step 9: 保存AI上下文到chat_context
    ├─→ 从块级JSON中提取纯文本内容
    │   └─→ _extract_content_from_stream_json()
    ├─→ 格式化为上下文JSON
    │   {
    │     "role": "assistant",
    │     "content": "提取的纯文本内容"
    │   }
    ├─→ INSERT INTO chat_context with type='chat'
    └─→ 无流式输出

最终结果:
    ├─→ 前端收到完整的答案渲染(包含文本、表格、图片等)
    ├─→ 用户可进行后续提问(基于完整的历史上下文)
    └─→ 所有数据持久化到数据库(chat_content_history和chat_context)

七、关键文件总结

文件位置 功能 核心类/方法
app/routes/ChatRoutes.py API路由定义 add_chat_content() - SSE流式入口
app/service/ChatService.py 核心RAG逻辑(1415行) add_chat_content() - 9步处理流程
app/service/models/MeanDeepSeekService.py 意图识别 get_mean() - 调用DeepSeek进行意图分析
app/service/models/ChatDeepSeekService.py 多轮对话 chat() - 非检索类问答
app/service/models/AnswerDeepSeekService.py 答案生成 generate_answer() - 基于检索结果生成答案
utils/bge/Vector_bge_1024_Manager.py 向量管理 BGE-M3模型,1024维向量,Milvus存储
utils/OpenSearchManager.py 全文检索 BM25搜索,中文分词,多索引支持
utils/MysqlManager.py MySQL连接 SQLAlchemy ORM,会话管理
config/model.yaml LLM配置 3个DeepSeek模型配置
config/db.yaml 数据库配置 MySQL、Milvus、OpenSearch连接配置
install/db.sql 数据库初始化 3个chat表结构定义

八、架构特点总结

1. 双检索融合(Hybrid Search)

  • 向量检索:语义相似度(COSINE),基于BGE-M3模型
  • 全文检索:BM25倒排索引,支持中文分词(ik_max_word)
  • 合并方式:RRF(Reciprocal Rank Fusion)互惠排名融合

2. 多轮对话支持

  • 完整上下文历史保存在 chat_context
  • 支持引用前置答案进行追问
  • 灵活的意图识别和动作规划

3. 数据隔离

  • 所有检索添加 user_id 过滤条件
  • 向量库(Milvus)和全文索引(OpenSearch)都支持多用户隔离
  • 会话数据按 user_id 严格隔离

4. 流式实时性

  • SSE实时推送处理步骤(0-9步)
  • 块级JSON支持前端逐块渲染
  • 支持文本、表格、图片等多种内容类型

5. 灵活的提示词工程

  • 三层提示词设计:
    1. 系统提示词(意图识别)
    2. 聊天输出格式提示词
    3. 答案生成提示词
  • 动态调整 temperature 参数控制生成特性:
    • 意图识别:0.3(更保守)
    • 对话模型:0.7(更灵活)
    • 答案生成:0.3(更准确)
  • 支持自定义的输出格式和内容结构

6. 智能意图路由

  • 自动识别三种意图:
    • chat:闲聊/解释
    • kb_retrieval:知识库检索
    • clarify:需要澄清
  • 三种动作规划:
    • retrieve_vector:执行检索流程
    • answer_chat:直接对话
    • clarify:请求澄清

7. 完整的上下文管理

  • 两个上下文存储:
    • chat_content_history:完整消息历史(用于展示)
    • chat_context:LLM调用上下文(用于推理)
  • 自动提取块级JSON中的纯文本用于上下文保存
  • 支持长对话历史

8. 可扩展的输出格式

  • 块级JSON格式支持:
    • 文本块(kind: text
    • 表格块(kind: table
    • 图片块(kind: image
  • 易于扩展新的内容类型
  • 前端可灵活渲染不同类型的内容

分析完成日期: 2025-11-12

目录
相关文章
|
25天前
刚刚参加了一个MCP赛事,奖金还可以,搭友们可以去试试看
社区8月比赛未获奖有点失落,但发现通义灵码×蚂蚁百宝箱MCP赛事正火热进行!参赛即有机会赢取丰厚奖金,激励满满,令人眼前一亮。已跃跃欲试,搭友们快来一起冲榜夺奖吧!https://tianchi.aliyun.com/competition/entrance/532442
|
25天前
|
人工智能 前端开发 关系型数据库
MajorRAG 概述(1/3)
一个RAG项目,全文共三个部分:MajorRAG概述、MajorRAG文件内容提取实现分析、MajorRAG聊天问答系统实现分析。 1)第一次做RAG,欢迎带着指导意见评论 2)希望指出不足时可以附带替换方法
111 1
|
25天前
|
存储 关系型数据库 MySQL
MajorRAG文件内容提取实现分析(2/3)
一个RAG项目,全文共三个部分:MajorRAG概述、MajorRAG文件内容提取实现分析、MajorRAG聊天问答系统实现分析。 1)第一次做RAG,欢迎带着指导意见评论 2)希望指出不足时可以附带替换方法 博客地址:https://zhangcraigxg.github.io
106 1
|
27天前
|
自然语言处理 语音技术 Apache
阶跃星辰发布首个开源 LLM 级音频编辑大模型 Step-Audio-EditX
阶跃星辰发布全球首个开源LLM级音频编辑大模型Step-Audio-EditX,支持零样本TTS、多语言方言及情感、风格、副语言特征精准控制,采用统一LLM框架,实现文本驱动音频创作。
490 88
|
1月前
|
人工智能 搜索推荐 API
蚂蚁百宝箱联手深铁打造全国首个地铁 AI 智能体「深铁宝」:你的全能城市向导来啦~
蚂蚁百宝箱联合深铁集团、深圳通推出全国首个“公共出行+城市服务”AI智能体「深铁宝」,上线于深圳地铁、深圳通及支付宝APP,实现一句话直达、秒级响应的智慧出行体验,涵盖出行规划、乘车码快捷调取、周边生活服务推荐等一站式功能,助力城市交通与服务数字化升级。
266 30
|
23天前
|
编解码 人工智能 文字识别
【Github热门项目】DeepSeek-OCR项目上线即突破7k+星!突破10倍无损压缩,重新定义文本-视觉信息处理
DeepSeek-OCR开源即获7k+星,首创“上下文光学压缩”技术,仅用100视觉token超越传统OCR模型256token性能,压缩比达10-20倍,精度仍超97%。30亿参数实现单卡日处理20万页,显著降低大模型长文本输入成本,重新定义高效文档理解新范式。
【Github热门项目】DeepSeek-OCR项目上线即突破7k+星!突破10倍无损压缩,重新定义文本-视觉信息处理
|
安全 JavaScript Docker
Agent Skills技术协议与开源实现,让大模型拥有“即插即用”技能
Anthropic推出Agent Skills协议,通过模块化技能封装提升大模型智能体的专业能力。ModelScope开源项目MS-Agent已实现该协议,支持技能的动态加载、自主执行与安全沙箱运行,推动智能体能力的可组合与可扩展发展。
479 28
|
27天前
|
JavaScript 搜索推荐 开发者
ChatPPT+魔搭社区:MCP 2.0全面升级!
ChatPPT MCP2.0正式发布,联合魔搭ModelScope推出云端智能体服务,支持生成、编辑、演讲、动画等全链路功能,开放Streamable HTTP协议与本地Stdio双模式,已接入20+平台,服务300+开发者。
493 11
ChatPPT+魔搭社区:MCP 2.0全面升级!
|
19天前
|
人工智能 文字识别 物联网
ModelScope魔搭社区发布月报 -- 25年11月
魔搭ModelScope三周年庆!见证开源大模型从追赶到领跑,11月硬核更新不断:Qwen3-VL、MiniMax-M2等新模态齐发,AIGC生态爆发,OCR、语音、Agent全面进化。11月22日杭州AI开源生态大会,不见不散!
302 4
|
13天前
|
数据采集 人工智能 自然语言处理
Meta SAM3开源:让图像分割,听懂你的话
Meta发布并开源SAM 3,首个支持文本或视觉提示的统一图像视频分割模型,可精准分割“红色条纹伞”等开放词汇概念,覆盖400万独特概念,性能达人类水平75%–80%,推动视觉分割新突破。
838 59
Meta SAM3开源:让图像分割,听懂你的话

热门文章

最新文章