MajorRAG 聊天问答系统实现分析
一、API入口和路由
1.1 路由定义
文件位置: app/routes/ChatRoutes.py
主要路由
| 路由 | 方法 | 功能 | 认证 |
|---|---|---|---|
/api/chat/sessions |
GET | 获取历史会话列表 | 必需 |
/api/chat/sessions |
POST | 创建新会话 | 必需 |
/api/chat/sessions/<id> |
PUT | 更新会话标题 | 必需 |
/api/chat/sessions/<id> |
DELETE | 删除会话 | 必需 |
/api/chat/sessions/<id>/content |
GET | 获取会话内容 | 必需 |
/api/chat/sessions/<id>/content |
POST | 流式问答入口 | 必需 |
1.2 流式响应入口
核心代码: ChatRoutes.py 第234-289行
@chat_bp.route('/sessions/<int:session_id>/content', methods=['POST'])
def add_chat_content(session_id: int):
"""
实现Server-Sent Events (SSE)流式响应
接收参数:
- session_id: 会话ID
- content: 用户输入内容(JSON格式)
返回:SSE格式的流式响应
"""
def generate_sse_response():
"""SSE生成器函数"""
try:
# 调用ChatService的add_chat_content生成器
# 这个生成器会逐步yield每个处理步骤的数据
for step_data in chat_service.add_chat_content(session_id, content):
# 格式化为SSE格式: data: {json}\n\n
sse_data = f"data: {json.dumps(step_data, ensure_ascii=False)}\n\n"
yield sse_data
except Exception as e:
logger.error(f"SSE流式响应生成失败: {e}")
error_data = {
'step': 'error',
'status': 'error',
'message': f'处理失败: {str(e)}'
}
yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
return Response(
generate_sse_response(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
}
)
二、RAG完整处理流程(9步)
2.1 流程概览
核心文件: app/service/ChatService.py(1415行代码)
聊天问答的核心在 ChatService.add_chat_content() 方法中实现,采用9步处理流程:
用户输入
↓
Step 0: 创建系统上下文
↓
Step 1: 保存用户消息到chat_content_history
↓
Step 1.1: 保存用户输入到chat_context
↓
Step 2: 意图分析(调用DeepSeek)
├─→ 返回JSON: plan、action、queries、rewrite
│
├→ action="retrieve_vector"(知识库检索)
│ ↓
│ Step 3: 向量检索(Milvus + BGE-M3)
│ ↓
│ Step 4: BM25全文检索(OpenSearch)
│ ↓
│ Step 5: RRF结果合并
│ ↓
│ Step 6: 补齐元数据
│ ↓
│ Step 7: 生成答案(DeepSeek)
│ ↓
│ Step 8: 保存AI响应到chat_content_history
│ ↓
│ Step 9: 保存AI上下文到chat_context
│
├→ action="answer_chat"(多轮对话)
│ ↓
│ 调用ChatDeepSeekService进行多轮对话
│ ↓
│ Step 8: 保存AI响应
│ ↓
│ Step 9: 保存AI上下文
│
└→ action="clarify"(需要澄清)
↓
保存澄清问题和上下文
2.2 详细步骤说明
Step 0: 创建系统上下文
方法: _step_0_check_and_create_system_context(session_id)
def _step_0_check_and_create_system_context(self, session_id: int):
"""
为会话创建第一条系统提示词记录
目的:
- 确保意图识别有正确的系统提示词
- 只在会话第一次使用时创建
实现:
1. 检查chat_context中是否已有type='routes'的记录
2. 如果没有,创建系统提示词记录
3. 保存到chat_context表
"""
# 检查是否已存在系统上下文
check_sql = """
SELECT COUNT(*) FROM chat_context
WHERE chat_id = :session_id AND type = 'routes'
"""
if count == 0:
# 构建系统提示词
system_prompt = self.deepseek_service._build_intent_prompt()
system_context = {
"role": "system",
"content": system_prompt
}
# 保存到chat_context
insert_sql = """
INSERT INTO chat_context (chat_id, context, type)
VALUES (:chat_id, :context, :type)
"""
输出: 无流式输出
Step 1: 保存用户消息
方法: _step_1_save_user_message(session_id, content)
def _step_1_save_user_message(self, session_id: int, content: str):
"""
保存用户消息到chat_content_history表
消息格式(JSON):
{
"session_id": str(session_id),
"user_id": "user",
"content": content,
"created_at": "2025-01-01T12:00:00"
}
"""
chat_content = {
'session_id': str(session_id),
'user_id': 'user',
'content': content,
'created_at': datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
}
insert_sql = """
INSERT INTO chat_content_history (chat_id, chat_content)
VALUES (:chat_id, :chat_content)
"""
# 同时更新会话的chat_time
update_sql = """
UPDATE chat_history
SET chat_time = NOW()
WHERE id = :session_id
"""
yield {
'step': 1,
'step_name': '保存用户消息',
'status': 'completed',
'message': '用户消息已保存'
}
Step 1.1: 保存用户输入到chat_context
方法: _step_1_1_save_user_context(session_id, content)
def _step_1_1_save_user_context(self, session_id: int, content: str):
"""
保存用户输入到chat_context表,用于后续的上下文对话
上下文格式(JSON):
{
"role": "user",
"content": content
}
type: 'chat'(区别于'routes'的系统提示词)
"""
user_context = {
"role": "user",
"content": content
}
insert_sql = """
INSERT INTO chat_context (chat_id, context, type)
VALUES (:chat_id, :context, :type)
"""
输出: 无流式输出
Step 2: 意图分析(关键步骤)
方法: _step_2_query_understanding(session_id, content)
def _step_2_query_understanding(self, session_id: int, content: str):
"""
调用MeanDeepSeekService进行意图识别
返回JSON结构:
{
"intent": "chat" | "kb_retrieval" | "clarify",
"confidence": 0.72,
"queries": ["TL-SE2109", "端口隔离"],
"rewrite": "TL-SE2109 端口隔离配置方法",
"followup": null,
"plan": {
"action": "retrieve_vector" | "answer_chat" | "clarify",
"args": {"queries": [...], "k": 6}
}
}
"""
yield {
'step': 2,
'step_name': '意图分析',
'status': 'processing',
'message': '正在分析用户意图...'
}
# 调用MeanDeepSeekService
mean_result = self.mean_deepseek_service.get_mean(session_id, content)
# 解析JSON结果
try:
plan_data = json.loads(mean_result)
action = plan_data.get('plan', {
}).get('action', 'answer_chat')
queries = plan_data.get('queries', [])
rewrite = plan_data.get('rewrite', content)
except json.JSONDecodeError:
# JSON解析失败,使用默认值
action = 'answer_chat'
queries = [content]
rewrite = content
yield {
'step': 2,
'step_name': '意图分析',
'status': 'completed',
'message': f'意图分析完成,动作: {action}'
}
return action, queries, rewrite
流式输出:
# 开始
{
'step': 2, 'step_name': '意图分析', 'status': 'processing', ...}
# 完成
{
'step': 2, 'step_name': '意图分析', 'status': 'completed', ...}
Step 3: 向量检索
方法: _step_3_vector_search(queries, k, user_id)
文件位置: app/service/ChatService.py 第711-797行
def _step_3_vector_search(self, queries: list, k: int = 6,
user_id: str = 'default') -> Generator:
"""
向量检索流程:
1. 使用BGE-M3模型编码query为1024维向量
2. 在Milvus中执行COSINE相似度搜索
3. 添加user_id过滤条件确保数据隔离
4. 返回topK个相似的chunks
参数:
- queries: 关键词列表(来自意图分析)
- k: 每个query返回的结果数量
- user_id: 用户ID(用于数据隔离)
返回:
[
{
'content_id': int,
'file_id': int,
'title_id': int,
'content': str,
'content_type': str,
'type': str,
'score': float, # 相似度分数
'sources': [{'type': 'vector', 'rank': 1, ...}]
},
...
]
"""
yield {
'step': 3,
'step_name': '向量检索',
'status': 'processing',
'message': f'正在进行向量检索,查询数: {len(queries)}'
}
all_vector_results = []
for query in queries:
# 1. 使用BGE模型进行向量化
query_vector = self.vector_manager.model.encode(
query,
normalize_embeddings=True,
convert_to_tensor=False
).tolist() # 转换为list格式
# 2. 确保collection已加载
self.vector_manager.collection.load()
# 3. 在Milvus中搜索
search_params = {
"metric_type": "COSINE", # 余弦相似度
"params": {
"nprobe": 10} # 搜索聚类数
}
# 4. 添加user_id过滤条件
expr = f'user_id == "{user_id}"'
search_results = self.vector_manager.collection.search(
data=[query_vector],
anns_field="vector",
param=search_params,
limit=k,
expr=expr, # 用户隔离
output_fields=["user_id", "file_id", "title_id", "content_id",
"type", "content", "content_type"]
)
# 5. 处理搜索结果
for hits in search_results:
for hit in hits:
result = {
'content_id': hit.entity.get('content_id'),
'file_id': hit.entity.get('file_id'),
'title_id': hit.entity.get('title_id'),
'content': hit.entity.get('content'),
'content_type': hit.entity.get('content_type'),
'type': hit.entity.get('type'),
'score': hit.score
}
all_vector_results.append(result)
yield {
'step': 3,
'step_name': '向量检索',
'status': 'completed',
'message': f'向量检索完成,找到 {len(all_vector_results)} 个结果'
}
return all_vector_results
向量管理器: utils/bge/Vector_bge_1024_Manager.py
class Vector_bge_1024_Manager:
def __init__(self):
# 模型配置
self.model_name = "BAAI/bge-m3"
self.dimension = 1024
# 加载BGE模型
self.model = SentenceTransformer(self.model_name)
# 连接Milvus
self.collection_name = "major_bge_1024_collection"
self.collection = Collection(self.collection_name)
流式输出:
# 开始
{
'step': 3, 'status': 'processing', 'message': '正在进行向量检索...'}
# 完成
{
'step': 3, 'status': 'completed', 'message': '向量检索完成,找到 6 个结果'}
Step 4: BM25全文检索
方法: _step_4_bm25_search(rewrite, k, user_id)
文件位置: app/service/ChatService.py 第799-937行
def _step_4_bm25_search(self, rewrite: str, k: int = 12,
user_id: str = 'default') -> Generator:
"""
BM25全文检索流程:
1. 使用OpenSearch进行BM25搜索
2. 搜索所有内容索引(major_pdf, major_word, major_excel等)
3. 使用ik_max_word分析器进行中文分词
4. 添加user_id过滤确保数据隔离
5. 返回topK个相关文档
参数:
- rewrite: 重写的查询语句(来自意图分析)
- k: 返回的结果数量
- user_id: 用户ID(用于数据隔离)
返回:
[
{
'content_id': int,
'file_id': int,
'title_id': int,
'content': str,
'type': str,
'score': float, # BM25分数
'sources': [{'type': 'bm25', 'rank': 1, ...}]
},
...
]
"""
yield {
'step': 4,
'step_name': 'BM25检索',
'status': 'processing',
'message': '正在进行全文检索...'
}
# 要搜索的索引列表
search_indices = [
'major_pdf',
'major_word',
'major_excel',
'major_ppt',
'major_txt'
]
# 构建BM25搜索查询
search_query = {
"query": {
"bool": {
"must": [
{
"term": {
"user_id": user_id # 用户隔离
}
}
],
"should": [
{
"match": {
"content": {
"query": rewrite,
"analyzer": "ik_max_word", # 中文分词
"boost": 1.0
}
}
}
],
"minimum_should_match": 1
}
},
"size": k,
"_source": ["user_id", "file_id", "title_id", "content_id",
"type", "content"],
"sort": [
{
"_score": {
"order": "desc"}}
]
}
all_bm25_results = []
# 在所有内容索引中搜索
for index_name in search_indices:
try:
response = client.search(index=index_name, body=search_query)
hits = response.get('hits', {
}).get('hits', [])
for hit in hits:
source = hit.get('_source', {
})
result = {
'content_id': source.get('content_id'),
'file_id': source.get('file_id'),
'title_id': source.get('title_id'),
'content': source.get('content'),
'type': source.get('type'),
'score': hit.get('_score', 0.0)
}
all_bm25_results.append(result)
except Exception as e:
logger.warning(f"索引 {index_name} 搜索失败: {e}")
continue
yield {
'step': 4,
'step_name': 'BM25检索',
'status': 'completed',
'message': f'全文检索完成,找到 {len(all_bm25_results)} 个结果'
}
return all_bm25_results
OpenSearch配置: config/db.yaml
opensearch:
host: localhost
port: 9200
user: admin
password: admin
indices:
- major_files # 文件索引(不用于内容检索)
- major_pdf # PDF文档索引
- major_word # Word文档索引
- major_excel # Excel文档索引
- major_ppt # PPT文档索引
- major_txt # TXT文档索引
流式输出:
# 开始
{
'step': 4, 'status': 'processing', 'message': '正在进行全文检索...'}
# 完成
{
'step': 4, 'status': 'completed', 'message': '全文检索完成,找到 12 个结果'}
Step 5: RRF结果合并
方法: _step_5_merge_results(vector_results, bm25_results, k)
文件位置: app/service/ChatService.py 第939-1043行
def _step_5_merge_results(self, vector_results, bm25_results, k: int = 10):
"""
使用RRF(Reciprocal Rank Fusion)合并两种检索结果
RRF算法:
- 公式:RRF分数 = 1 / (rank + rrf_constant)
- rrf_constant通常取60(论文推荐值)
- 优势:不依赖于原始分数,只依赖排名
参数:
- vector_results: 向量检索结果
- bm25_results: BM25检索结果
- k: 最终返回的结果数量
返回:
[
{
'content_id': int,
'file_id': int,
'content': str,
'rrf_score': float, # RRF合并分数
'final_rank': int, # 最终排名
'sources': [ # 来源信息
{'type': 'vector', 'rank': 1, 'score': 0.85, 'rrf_score': 0.016},
{'type': 'bm25', 'rank': 3, 'score': 12.5, 'rrf_score': 0.015}
]
},
...
]
"""
yield {
'step': 5,
'step_name': '结果合并',
'status': 'processing',
'message': '正在合并检索结果...'
}
rrf_constant = 60
merged_scores = {
} # {content_id: rrf_score}
result_details = {
} # {content_id: result_with_sources}
# 处理向量检索结果
for rank, result in enumerate(vector_results):
content_id = result.get('content_id')
# 计算RRF分数
rrf_score = 1.0 / (rank + 1 + rrf_constant)
if content_id not in merged_scores:
merged_scores[content_id] = 0.0
result_details[content_id] = result.copy()
result_details[content_id]['sources'] = []
# 累加RRF分数
merged_scores[content_id] += rrf_score
# 记录来源
result_details[content_id]['sources'].append({
'type': 'vector',
'rank': rank + 1,
'score': result.get('score', 0.0),
'rrf_score': rrf_score
})
# 处理BM25结果(同样计算RRF分数)
for rank, result in enumerate(bm25_results):
content_id = result.get('content_id')
# 计算RRF分数
rrf_score = 1.0 / (rank + 1 + rrf_constant)
if content_id not in merged_scores:
merged_scores[content_id] = 0.0
result_details[content_id] = result.copy()
result_details[content_id]['sources'] = []
# 累加RRF分数
merged_scores[content_id] += rrf_score
# 记录来源
result_details[content_id]['sources'].append({
'type': 'bm25',
'rank': rank + 1,
'score': result.get('score', 0.0),
'rrf_score': rrf_score
})
# 按RRF分数排序取前K个
sorted_results = sorted(
merged_scores.items(),
key=lambda x: x[1],
reverse=True
)
final_results = []
for i, (content_id, rrf_score) in enumerate(sorted_results[:k]):
result = result_details[content_id].copy()
result['final_rank'] = i + 1
result['rrf_score'] = rrf_score
final_results.append(result)
yield {
'step': 5,
'step_name': '结果合并',
'status': 'completed',
'message': f'结果合并完成,返回前 {len(final_results)} 个结果'
}
return final_results
RRF示例:
假设有两个结果:
- 向量检索:content_id=123 排名第1
- BM25检索:content_id=123 排名第3
RRF计算:
- vector RRF = 1/(1+60) = 0.0164
- bm25 RRF = 1/(3+60) = 0.0159
- 总RRF = 0.0164 + 0.0159 = 0.0323
如果content_id=456在向量检索中排名第5,在BM25中没有出现:
- vector RRF = 1/(5+60) = 0.0154
- 总RRF = 0.0154
则content_id=123的最终排名会高于content_id=456
流式输出:
# 开始
{
'step': 5, 'status': 'processing', 'message': '正在合并检索结果...'}
# 完成
{
'step': 5, 'status': 'completed', 'message': '结果合并完成,返回前 10 个结果'}
Step 6: 元数据补齐
方法: _step_6_answer_completion(final_results)
文件位置: app/service/ChatService.py 第1045-1168行
def _step_6_answer_completion(self, final_results):
"""
从MySQL数据库补齐元数据:
1. 获取file_id对应的file_type
2. 根据file_type确定查询的表名
3. 从对应的content表查询image_path, title_content, original_filename
4. 补充到最终结果中
参数:
- final_results: RRF合并后的结果列表
返回:
[
{
'content_id': int,
'file_id': int,
'content': str,
'rrf_score': float,
'image_path': str, # 新增:图片路径
'title_content': str, # 新增:标题内容
'original_filename': str # 新增:原始文件名
},
...
]
"""
yield {
'step': 6,
'step_name': '补齐元数据',
'status': 'processing',
'message': '正在补齐元数据...'
}
# 1) 收集所有file_id
file_ids = set()
for result in final_results:
file_id = result.get('file_id')
if file_id is not None:
file_ids.add(file_id)
if not file_ids:
return final_results
# 2) 从files表查询file_type
placeholders = ','.join([':id' + str(i) for i in range(len(file_ids))])
query_sql = f"""
SELECT id as file_id, file_type
FROM files
WHERE id IN ({placeholders})
"""
params = {
f'id{i}': fid for i, fid in enumerate(file_ids)}
result = self.mysql_manager.execute_query(query_sql, params)
# 构建file_type映射
file_type_mapping = {
}
for row in result:
file_type_mapping[row[0]] = row[1]
# 3) 根据file_type查询对应的content表
enhanced_results = []
for result in final_results:
enhanced_result = result.copy()
file_id = result.get('file_id')
content_id = result.get('content_id')
if file_id in file_type_mapping and content_id:
file_type = file_type_mapping[file_id]
table_name = self._get_table_name_from_file_type(file_type)
if table_name:
# 查询元数据
metadata_sql = f"""
SELECT image_path, title_content, original_filename
FROM {table_name}
WHERE content_id = :content_id
LIMIT 1
"""
metadata_result = self.mysql_manager.execute_query(
metadata_sql,
{
'content_id': content_id}
)
if metadata_result:
metadata_row = metadata_result[0]
enhanced_result['image_path'] = metadata_row[0]
enhanced_result['title_content'] = metadata_row[1]
enhanced_result['original_filename'] = metadata_row[2]
enhanced_results.append(enhanced_result)
yield {
'step': 6,
'step_name': '补齐元数据',
'status': 'completed',
'message': '元数据补齐完成'
}
return enhanced_results
def _get_table_name_from_file_type(self, file_type: str) -> str:
"""
文件类型到表名映射
"""
file_type_to_table = {
'pdf': 'pdf_content',
'ppt': 'ppt_content',
'pptx': 'ppt_content',
'doc': 'word_content',
'docx': 'word_content',
'xls': 'excel_content',
'xlsx': 'excel_content',
'txt': 'txt_content',
'md': 'txt_content'
}
return file_type_to_table.get(file_type.lower())
流式输出:
# 开始
{
'step': 6, 'status': 'processing', 'message': '正在补齐元数据...'}
# 完成
{
'step': 6, 'status': 'completed', 'message': '元数据补齐完成'}
Step 7: 答案生成
方法: _step_7_generate_answer(session_id, content, mysql_final_results)
def _step_7_generate_answer(self, session_id: int, content: str,
mysql_final_results: list):
"""
基于检索结果生成答案
调用AnswerDeepSeekService生成块级JSON格式的答案
"""
yield {
'step': 7,
'step_name': '生成答案',
'status': 'processing',
'message': '正在生成AI回复...'
}
# 调用AnswerDeepSeekService
for answer_step in self.answer_deepseek_service.generate_answer(
session_id, content, mysql_final_results
):
yield answer_step
yield {
'step': 7,
'step_name': '生成答案',
'status': 'completed',
'message': '答案生成完成'
}
流式输出:
# 开始
{
'step': 7, 'status': 'processing', 'message': '正在生成AI回复...'}
# 块级推送(持续多次)
{
'step': 8, 'status': 'processing', 'message': '{"t":"block_start",...}'}
{
'step': 8, 'status': 'processing', 'message': '{"t":"delta",...}'}
...
# 完成
{
'step': 7, 'status': 'completed', 'message': '答案生成完成'}
Step 8: 保存AI响应
方法: _step_8_save_ai_response(session_id, ai_answer)
def _step_8_save_ai_response(self, session_id: int, ai_answer: str):
"""
保存AI响应到chat_content_history表
消息格式(JSON):
{
"session_id": str(session_id),
"user_id": "assistant",
"content": ai_answer,
"created_at": "2025-01-01T12:00:00"
}
"""
chat_content = {
'session_id': str(session_id),
'user_id': 'assistant',
'content': ai_answer,
'created_at': datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
}
insert_sql = """
INSERT INTO chat_content_history (chat_id, chat_content)
VALUES (:chat_id, :chat_content)
"""
# 同时更新会话的chat_time
update_sql = """
UPDATE chat_history
SET chat_time = NOW()
WHERE id = :session_id
"""
输出: 无流式输出
Step 9: 保存AI上下文
方法: _step_9_save_ai_context(session_id, ai_answer)
def _step_9_save_ai_context(self, session_id: int, ai_answer: str):
"""
保存AI上下文到chat_context表
处理流程:
1. 从块级JSON中提取纯文本内容
2. 格式化为上下文JSON
3. 保存到chat_context,type='chat'
上下文格式(JSON):
{
"role": "assistant",
"content": "提取的纯文本内容"
}
"""
# 从块级JSON中提取内容
extracted_content = self._extract_content_from_stream_json(ai_answer)
ai_context = {
"role": "assistant",
"content": extracted_content
}
insert_sql = """
INSERT INTO chat_context (chat_id, context, type)
VALUES (:chat_id, :context, :type)
"""
输出: 无流式输出
三、LLM模型和调用方式
3.1 模型配置
配置文件: config/model.yaml
# 三个独立的DeepSeek模型配置
# 1. 意图识别模型(temperature=0.3,更保守)
intent_models:
deepseek:
api_key: "sk-xxx"
base_url: "https://api.deepseek.com/v1"
model_name: "deepseek-chat"
max_tokens: 4096
temperature: 0.3 # 更保守,保证意图识别准确
stream: true
# 2. 对话模型(temperature=0.7,更灵活)
chat_models:
deepseek:
api_key: "sk-xxx"
base_url: "https://api.deepseek.com/v1"
model_name: "deepseek-chat"
max_tokens: 4096
temperature: 0.7 # 更灵活,支持创造性对话
stream: true
# 3. 答案生成模型(temperature=0.3,保证准确性)
answer_models:
deepseek:
api_key: "sk-xxx"
base_url: "https://api.deepseek.com/v1"
model_name: "deepseek-chat"
max_tokens: 4096
temperature: 0.3 # 保证答案准确性
stream: true
3.2 三个核心服务类
A. MeanDeepSeekService(意图识别)
文件位置: app/service/models/MeanDeepSeekService.py
class MeanDeepSeekService:
def __init__(self):
"""从config/model.yaml加载intent_models配置"""
self.api_key = config['intent_models']['deepseek']['api_key']
self.base_url = config['intent_models']['deepseek']['base_url']
self.model_name = config['intent_models']['deepseek']['model_name']
self.max_tokens = config['intent_models']['deepseek']['max_tokens']
self.temperature = config['intent_models']['deepseek']['temperature']
self.stream = config['intent_models']['deepseek']['stream']
def get_mean(self, session_id: int, user_input: str) -> str:
"""
意图识别和动作规划
流程:
1. 获取chat_context中的所有上下文消息
2. 调用DeepSeek API进行流式识别
3. 收集响应内容(JSON格式的plan)
4. 返回JSON字符串给ChatService
返回JSON格式:
{
"intent": "chat" | "kb_retrieval" | "clarify",
"confidence": 0.72,
"queries": ["关键词1", "关键词2"],
"rewrite": "重写的查询语句",
"followup": null,
"plan": {
"action": "retrieve_vector" | "answer_chat" | "clarify",
"args": {"queries": [...], "k": 6}
}
}
"""
# 1. 获取上下文消息
messages = self._get_messages_from_context(session_id)
# 2. 添加当前用户输入
messages.append({
"role": "user",
"content": user_input
})
# 3. 调用DeepSeek API
data = {
'model': self.model_name,
'messages': messages,
'max_tokens': self.max_tokens,
'temperature': self.temperature,
'stream': self.stream
}
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
response = requests.post(
f'{self.base_url}/chat/completions',
json=data,
headers=headers,
stream=True
)
# 4. 处理SSE流式响应
collected_content = ""
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data_str = line[6:] # 移除'data: '前缀
if data_str.strip() == '[DONE]':
break
try:
data_obj = json.loads(data_str)
choices = data_obj.get('choices', [])
if choices:
delta = choices[0].get('delta', {
})
content = delta.get('content', '')
collected_content += content
except json.JSONDecodeError:
continue
return collected_content # 完整的JSON字符串
def _build_intent_prompt(self) -> str:
"""
构建意图识别系统提示词
"""
prompt = """你是一个"意图路由与动作计划生成器",任务是:
1. 从用户输入和历史对话中识别用户意图(intent):
- "chat":闲聊、解释、不需要知识库的对话
- "kb_retrieval":需要从知识库检索信息
- "clarify":问题模糊,需要澄清
2. 给出信心分数(confidence):0.0–1.0
3. 输出queries(最小必要关键词,1–5个)用于向量化
- 提取核心概念、型号、关键词
- 去除无用词("怎么"、"如何"等)
4. 输出rewrite(一句话的自然语言检索串)用于BM25
- 保留完整语义
- 适合全文搜索
5. 输出plan,包含action和args:
- action可选:
* "retrieve_vector":执行知识库检索
* "answer_chat":直接对话回答
* "clarify":请求用户澄清
- args参数:{"queries": [...], "k": 6}
输出必须是严格JSON格式,不要包含任何解释或额外文本。
示例输出:
{
"intent": "kb_retrieval",
"confidence": 0.95,
"queries": ["TL-SE2109", "端口隔离"],
"rewrite": "TL-SE2109 端口隔离配置方法",
"followup": null,
"plan": {
"action": "retrieve_vector",
"args": {"queries": ["TL-SE2109", "端口隔离"], "k": 6}
}
}
"""
return prompt
B. ChatDeepSeekService(多轮对话)
文件位置: app/service/models/ChatDeepSeekService.py
class ChatDeepSeekService:
def __init__(self):
"""从config/model.yaml加载chat_models配置"""
self.api_key = config['chat_models']['deepseek']['api_key']
self.base_url = config['chat_models']['deepseek']['base_url']
self.model_name = config['chat_models']['deepseek']['model_name']
self.max_tokens = config['chat_models']['deepseek']['max_tokens']
self.temperature = config['chat_models']['deepseek']['temperature']
self.stream = config['chat_models']['deepseek']['stream']
def chat(self, session_id: int, user_input: str, mode: str = 'with_kb'):
"""
多轮对话
用于非检索相关的问题回答
支持多轮对话,带有知识库上下文或纯聊天
参数:
- session_id: 会话ID
- user_input: 用户输入
- mode: 'with_kb'(带知识库)或 'pure_chat'(纯聊天)
返回:块级JSON流式数据
"""
# 1. 获取历史上下文
history_messages = self._get_messages_from_context(session_id)
# 2. 构建聊天提示词
format_prompt = self._build_chat_prompt(user_input)
# 3. 调用DeepSeek进行聊天
for chat_block in self._call_deepseek_for_chat(
history_messages, format_prompt
):
yield chat_block
def _build_chat_prompt(self, user_input: str) -> str:
"""
构建聊天输出格式提示词
"""
prompt = f"""用户问题:{user_input}
输出JSON格式要求:
1)输出格式必须是json格式,不要使用```json```代码块包裹
2)你的回答需要严格按照以下示例构造json:
[
{
{"t":"block_start", "id":"b1", "kind":"text"}},
{
{"t":"delta", "id":"b1", "text":"文本内容\\n"}},
{
{"t":"block_end", "id":"b1"}},
{
{"t":"block_start", "id":"b2", "kind":"table", "columns":["列1","列2"]}},
{
{"t":"delta", "id":"b2", "row":["值1","值2"]}},
{
{"t":"delta", "id":"b2", "row":["值3","值4"]}},
{
{"t":"block_end", "id":"b2"}},
{
{"t":"block_start", "id":"b3", "kind":"image", "alt":"图片描述"}},
{
{"t":"block_update", "id":"b3", "url":"images/xxx.jpg", "width":960, "height":540}},
{
{"t":"block_end", "id":"b3"}}
]
3)kind类型说明:
- "text":文本块,用delta推送text内容
- "table":表格块,需要先定义columns,然后用delta推送每一行row
- "image":图片块,用block_update推送url、width、height
4)每个内容块都要有block_start和block_end配对
5)所有JSON对象必须在同一行,不要换行
6)id必须唯一,建议使用b1, b2, b3...格式
"""
return prompt
def _call_deepseek_for_chat(self, history_messages: list, prompt: str):
"""
调用DeepSeek进行聊天,返回块级JSON
"""
messages = history_messages + [{
"role": "user", "content": prompt}]
data = {
'model': self.model_name,
'messages': messages,
'max_tokens': self.max_tokens,
'temperature': self.temperature,
'stream': self.stream
}
# 调用API并处理流式响应(同AnswerDeepSeekService)
# 返回块级JSON格式的数据
# ...
C. AnswerDeepSeekService(答案生成)
文件位置: app/service/models/AnswerDeepSeekService.py
class AnswerDeepSeekService:
def __init__(self):
"""从config/model.yaml加载answer_models配置"""
self.api_key = config['answer_models']['deepseek']['api_key']
self.base_url = config['answer_models']['deepseek']['base_url']
self.model_name = config['answer_models']['deepseek']['model_name']
self.max_tokens = config['answer_models']['deepseek']['max_tokens']
self.temperature = config['answer_models']['deepseek']['temperature']
self.stream = config['answer_models']['deepseek']['stream']
def generate_answer(self, session_id: int, user_input: str,
mysql_final_results: List[Dict]):
"""
基于检索结果生成答案
参数:
- session_id: 会话ID
- user_input: 用户问题
- mysql_final_results: 检索到的参考资料列表
返回:块级JSON格式的答案
"""
# 1. 获取历史上下文
history_messages = self._get_messages_from_context(session_id)
# 2. 构建答案生成提示词
prompt = self._build_answer_prompt(user_input, mysql_final_results)
# 3. 调用DeepSeek生成答案
for answer_block in self._call_deepseek_for_answer(
history_messages, prompt
):
yield answer_block
def _build_answer_prompt(self, user_input: str,
mysql_final_results: List[Dict]) -> str:
"""
构建答案生成提示词
"""
prompt = f"""你是一个"基于检索证据作答"的中文助手。
请基于以下参考资料,针对用户的问题提供准确、详细的回答:
用户问题:{user_input}
参考资料:
{json.dumps(mysql_final_results, ensure_ascii=False, indent=2)}
要求:
1. 首先基于参考资料逐条分析:
- 该条参考资料的展示:
* 若是文本:用Markdown的### 标签开始
* 若是图片:用url:image_path格式
- 该条参考资料的出处:
格式:引用【original_filename】标题:title_content
- content和用户问题的关联
2. 然后基于用户问题和逐条分析的结果,给出归纳后的答案:
- 归纳后的答案
- 归纳后的答案出处,按文档、标题分组
- 如果参考资料中没有相关信息,明确说明
3. 输出必须是JSON格式,格式同ChatDeepSeekService的块级JSON
示例输出:
[
{
{"t":"block_start", "id":"b1", "kind":"text"}},
{
{"t":"delta", "id":"b1", "text":"### 参考资料分析\\n"}},
{
{"t":"delta", "id":"b1", "text":"1. 引用【设备手册.pdf】标题:端口配置\\n"}},
{
{"t":"delta", "id":"b1", "text":" 配置步骤如下...\\n"}},
{
{"t":"block_end", "id":"b1"}},
{
{"t":"block_start", "id":"b2", "kind":"text"}},
{
{"t":"delta", "id":"b2", "text":"### 归纳答案\\n"}},
{
{"t":"delta", "id":"b2", "text":"端口隔离的配置方法...\\n"}},
{
{"t":"block_end", "id":"b2"}},
{
{"t":"block_start", "id":"b3", "kind":"image", "alt":"配置界面"}},
{
{"t":"block_update", "id":"b3", "url":"images/config.jpg", "width":800, "height":600}},
{
{"t":"block_end", "id":"b3"}}
]
"""
return prompt
def _call_deepseek_for_answer(self, history_messages: list, prompt: str):
"""
调用DeepSeek生成答案,返回块级JSON
流程:
1. 调用API获取流式响应
2. 解析SSE格式数据
3. 识别完整的JSON块
4. 逐块推送给前端
"""
messages = history_messages + [{
"role": "user", "content": prompt}]
data = {
'model': self.model_name,
'messages': messages,
'max_tokens': self.max_tokens,
'temperature': self.temperature,
'stream': self.stream
}
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
response = requests.post(
f'{self.base_url}/chat/completions',
json=data,
headers=headers,
stream=True
)
# 处理块级推送逻辑
collected_content = ""
json_buffer = ""
waiting_for_complete_json = False
brace_count = 0
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data_str = line[6:]
if data_str.strip() == '[DONE]':
break
try:
data_obj = json.loads(data_str)
choices = data_obj.get('choices', [])
if choices:
delta = choices[0].get('delta', {
})
content = delta.get('content', '')
if content:
collected_content += content
# 处理块级推送逻辑
for char in content:
json_buffer += char
if char == '{':
if not waiting_for_complete_json:
waiting_for_complete_json = True
brace_count += 1
elif char == '}':
brace_count -= 1
if waiting_for_complete_json and brace_count == 0:
# 完整的JSON块接收完毕,可以推送
try:
json_start = json_buffer.rfind('{')
if json_start != -1:
json_block = json_buffer[json_start:]
json.loads(json_block) # 验证JSON
# 推送给前端
yield {
'step': 8,
'step_name': '渲染答案',
'status': 'processing',
'message': json_block
}
except (json.JSONDecodeError, ValueError):
pass
waiting_for_complete_json = False
except json.JSONDecodeError:
continue
# 返回完整内容用于保存
return collected_content
四、聊天历史管理
4.1 数据库表结构
chat_history(会话表)
用途: 存储用户的所有会话
CREATE TABLE chat_history (
id BIGINT AUTO_INCREMENT PRIMARY KEY, -- 会话ID
user_id VARCHAR(100) NOT NULL, -- 用户ID
chat_title VARCHAR(255) NOT NULL, -- 会话标题
chat_time DATETIME DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP, -- 最后聊天时间
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, -- 创建时间
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP, -- 更新时间
status TINYINT(1) DEFAULT 1, -- 状态(0:删除,1:正常)
INDEX idx_user_id (user_id),
INDEX idx_chat_time (chat_time),
INDEX idx_status (status)
);
chat_content_history(会话消息表)
用途: 存储每条用户和AI的消息
CREATE TABLE chat_content_history (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
chat_id BIGINT NOT NULL, -- 关联的会话ID
chat_content TEXT NOT NULL, -- 会话内容(JSON格式)
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_chat_id (chat_id),
INDEX idx_created_at (created_at),
FOREIGN KEY (chat_id) REFERENCES chat_history(id) ON DELETE CASCADE
);
chat_content格式(JSON):
{
"session_id": "123",
"user_id": "user" | "assistant",
"content": "消息内容",
"created_at": "2025-01-01T12:00:00"
}
chat_context(上下文表)
用途: 存储用于LLM调用的上下文消息
CREATE TABLE chat_context (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
chat_id BIGINT NOT NULL, -- 关联的会话ID
context TEXT NOT NULL, -- 上下文(JSON格式)
type VARCHAR(50) NOT NULL, -- 类型: 'routes'或'chat'
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_chat_id (chat_id),
INDEX idx_type (type),
INDEX idx_created_at (created_at),
FOREIGN KEY (chat_id) REFERENCES chat_history(id) ON DELETE CASCADE
);
context格式(JSON):
type='routes'(系统提示词):
{
"role": "system",
"content": "你是一个意图路由与动作计划生成器..."
}
type='chat'(用户消息和AI回复):
{
"role": "user" | "assistant",
"content": "消息内容"
}
4.2 会话管理API
文件位置: app/routes/ChatRoutes.py
获取会话列表
@chat_bp.route('/sessions', methods=['GET'])
def get_chat_sessions():
"""
获取用户所有会话,按chat_time倒序
返回:
[
{
"id": 1,
"chat_title": "新会话",
"chat_time": "2025-01-01 12:00:00",
"created_at": "2025-01-01 10:00:00"
},
...
]
"""
sql = """
SELECT id, chat_title, chat_time, created_at
FROM chat_history
WHERE user_id = :user_id AND status = 1
ORDER BY chat_time DESC
"""
创建新会话
@chat_bp.route('/sessions', methods=['POST'])
def create_chat_session():
"""
创建新会话
参数(可选):
- chat_title: 会话标题(如果为空,自动生成)
自动生成规则:
- 查询用户的会话列表
- 生成唯一标题:"新会话"、"新会话1"、"新会话2"等
返回:
{
"success": true,
"session_id": 123,
"chat_title": "新会话"
}
"""
# 如果chat_title为空,生成唯一标题
if not chat_title:
chat_title = self._generate_unique_title(user_id)
insert_sql = """
INSERT INTO chat_history (user_id, chat_title, chat_time)
VALUES (:user_id, :chat_title, NOW())
"""
更新会话标题
@chat_bp.route('/sessions/<int:session_id>', methods=['PUT'])
def update_chat_session(session_id: int):
"""
更新会话标题
参数:
- chat_title: 新的标题
返回:
{
"success": true,
"message": "会话标题已更新"
}
"""
update_sql = """
UPDATE chat_history
SET chat_title = :chat_title, updated_at = NOW()
WHERE id = :session_id AND user_id = :user_id
"""
删除会话
@chat_bp.route('/sessions/<int:session_id>', methods=['DELETE'])
def delete_chat_session(session_id: int):
"""
删除会话(软删除,设置status=0)
流程:
1. 设置status=0(软删除)
2. 级联删除chat_content_history中的内容(外键CASCADE)
3. 级联删除chat_context中的上下文(外键CASCADE)
4. 如果用户没有其他会话,自动创建一个新会话
返回:
{
"success": true,
"message": "会话已删除"
}
"""
# 软删除会话
update_sql = """
UPDATE chat_history
SET status = 0, updated_at = NOW()
WHERE id = :session_id AND user_id = :user_id
"""
# 检查用户是否还有其他会话
check_sql = """
SELECT COUNT(*) FROM chat_history
WHERE user_id = :user_id AND status = 1
"""
# 如果没有其他会话,创建一个新会话
if count == 0:
create_chat_session()
获取会话内容
@chat_bp.route('/sessions/<int:session_id>/content', methods=['GET'])
def get_chat_content(session_id: int):
"""
获取会话内容
返回格式化的消息列表,解析JSON格式
返回:
[
{
"session_id": "123",
"user_id": "user",
"content": "用户问题",
"created_at": "2025-01-01T12:00:00"
},
{
"session_id": "123",
"user_id": "assistant",
"content": "AI回答",
"created_at": "2025-01-01T12:00:05"
},
...
]
"""
query_sql = """
SELECT chat_content, created_at
FROM chat_content_history
WHERE chat_id = :session_id
ORDER BY created_at ASC
"""
# 解析JSON格式的chat_content
messages = []
for row in result:
chat_content = json.loads(row[0])
messages.append(chat_content)
return messages
4.3 上下文管理
获取上下文消息(用于LLM调用):
def _get_messages_from_context(self, session_id: int) -> list:
"""
从chat_context表按created_at顺序获取所有上下文
返回格式化的messages列表,用于DeepSeek API调用
返回:
[
{"role": "system", "content": "系统提示词"},
{"role": "user", "content": "用户问题1"},
{"role": "assistant", "content": "AI回答1"},
{"role": "user", "content": "用户问题2"},
...
]
"""
sql = """
SELECT context
FROM chat_context
WHERE chat_id = :session_id
ORDER BY created_at ASC
"""
messages = []
for row in result:
context_data = json.loads(row[0]) # 解析JSON
messages.append(context_data)
return messages
4.4 内容提取机制
从块级JSON中提取内容用于上下文保存:
文件位置: app/service/ChatService.py 第1263-1413行
def _extract_content_from_stream_json(self, ai_answer: str) -> str:
"""
从流式JSON数组中提取纯文本内容
处理逻辑:
1. 识别block_start和block_end
2. 提取不同type的内容:
- 文本块: 从delta中的text字段
- 表格块: 构建表格文本描述
- 图片块: 构建图片文本描述
3. 拼接所有内容
参数:
- ai_answer: 块级JSON字符串
返回:
提取的纯文本内容(用于保存到chat_context)
"""
try:
# 解析JSON数组
json_data = json.loads(ai_answer)
except json.JSONDecodeError:
return ai_answer
extracted_content = []
# 当前块的临时变量
current_text_content = ""
current_table_columns = []
current_table_rows = []
current_image_info = {
}
for item in json_data:
t_type = item.get('t')
kind = item.get('kind')
if t_type == 'block_start':
# 块开始,初始化临时变量
if kind == 'text':
current_text_content = ""
elif kind == 'table':
current_table_columns = item.get('columns', [])
current_table_rows = []
elif kind == 'image':
current_image_info = {
'alt': item.get('alt', ''),
'url': ''
}
elif t_type == 'delta':
# 增量数据
if kind == 'text' or item.get('text'):
text_content = item.get('text', '')
current_text_content += text_content
elif item.get('row'):
current_table_rows.append(item.get('row', []))
elif t_type == 'block_update':
# 更新块(主要用于图片)
if 'url' in item:
current_image_info['url'] = item.get('url', '')
elif t_type == 'block_end':
# 块结束,保存内容
if current_text_content:
extracted_content.append(current_text_content.strip())
current_text_content = ""
elif current_table_columns or current_table_rows:
table_text = self._build_table_description(
current_table_columns,
current_table_rows
)
extracted_content.append(table_text)
current_table_columns = []
current_table_rows = []
elif current_image_info.get('alt') or current_image_info.get('url'):
image_text = self._build_image_description(current_image_info)
extracted_content.append(image_text)
current_image_info = {
}
# 拼接所有内容
final_content = "\n\n".join(extracted_content)
return final_content
def _build_table_description(self, columns: list, rows: list) -> str:
"""
构建表格的文本描述
格式:
表格:
列名:列1 | 列2 | 列3
值1 | 值2 | 值3
值4 | 值5 | 值6
"""
if not columns and not rows:
return ""
table_lines = ["表格:"]
if columns:
table_lines.append("列名:" + " | ".join(columns))
for row in rows:
table_lines.append(" | ".join([str(cell) for cell in row]))
return "\n".join(table_lines)
def _build_image_description(self, image_info: dict) -> str:
"""
构建图片的文本描述
格式:
图片:配置界面 (images/config.jpg)
"""
alt = image_info.get('alt', '')
url = image_info.get('url', '')
if alt and url:
return f"图片:{alt} ({url})"
elif alt:
return f"图片:{alt}"
elif url:
return f"图片:({url})"
else:
return ""
五、流式响应的实现
5.1 SSE(Server-Sent Events)流式框架
优势:
- 单向推送(服务器到客户端)
- 自动重连
- 简单的文本协议
- 浏览器原生支持
格式:
data: {json}\n\n
实现: ChatRoutes.py 第234-289行
def generate_sse_response():
"""
SSE生成器函数
流程:
1. 调用ChatService.add_chat_content(生成器)
2. 逐步yield每个处理步骤的数据
3. 格式化为SSE格式
4. 错误处理
"""
try:
for step_data in chat_service.add_chat_content(session_id, content):
# 格式化为SSE格式: data: {json}\n\n
sse_data = f"data: {json.dumps(step_data, ensure_ascii=False)}\n\n"
yield sse_data
except Exception as e:
logger.error(f"SSE流式响应生成失败: {e}")
error_data = {
'step': 'error',
'status': 'error',
'message': f'处理失败: {str(e)}'
}
yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
return Response(
generate_sse_response(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
}
)
5.2 流式数据结构
基础步骤事件格式
{
'step': int, # 步骤号(0-9)
'step_name': str, # 步骤名称
'status': str, # 'processing' | 'completed' | 'error'
'message': str # 状态消息
}
示例:
# Step 2 开始
{
'step': 2,
'step_name': '意图分析',
'status': 'processing',
'message': '正在分析用户意图...'
}
# Step 2 完成
{
'step': 2,
'step_name': '意图分析',
'status': 'completed',
'message': '意图分析完成,动作: retrieve_vector'
}
块级流式推送格式(Step 7-8)
对于答案生成和聊天回复,使用块级JSON格式:
文本块:
{
"t":"block_start", "id":"b1", "kind":"text"}
{
"t":"delta", "id":"b1", "text":"文本内容第一部分\n"}
{
"t":"delta", "id":"b1", "text":"文本内容第二部分\n"}
{
"t":"block_end", "id":"b1"}
表格块:
{
"t":"block_start", "id":"b2", "kind":"table", "columns":["列1","列2","列3"]}
{
"t":"delta", "id":"b2", "row":["值1","值2","值3"]}
{
"t":"delta", "id":"b2", "row":["值4","值5","值6"]}
{
"t":"block_end", "id":"b2"}
图片块:
{
"t":"block_start", "id":"b3", "kind":"image", "alt":"配置界面截图"}
{
"t":"block_update", "id":"b3", "url":"images/config.jpg", "width":960, "height":540}
{
"t":"block_end", "id":"b3"}
5.3 前端接收示例
const eventSource = new EventSource(`/api/chat/sessions/${
sessionId}/content`);
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
if (data.step === 8 && data.status === 'processing') {
// 块级JSON推送
const block = JSON.parse(data.message);
if (block.t === 'block_start') {
// 创建新块容器
createBlock(block.id, block.kind);
}
else if (block.t === 'delta') {
// 追加内容到现有块
if (block.text) {
appendText(block.id, block.text);
}
else if (block.row) {
appendTableRow(block.id, block.row);
}
}
else if (block.t === 'block_update') {
// 更新块属性(如图片URL)
updateBlock(block.id, block);
}
else if (block.t === 'block_end') {
// 块结束,标记完成
finalizeBlock(block.id);
}
}
else {
// 步骤进度更新
updateProgress(data.step, data.status, data.message);
}
};
eventSource.onerror = function(event) {
console.error('SSE连接错误', event);
eventSource.close();
};
六、完整流程示意图
用户提问: "TL-SE2109的端口隔离怎么配置"
│
↓
POST /api/chat/sessions/{id}/content
│
↓ (返回SSE流)
Step 0: 创建系统上下文
├─→ 检查chat_context是否有type='routes'记录
├─→ 如果没有,创建系统提示词
└─→ 无流式输出
Step 1: 保存用户消息到chat_content_history
├─→ INSERT INTO chat_content_history
└─→ SSE: {step: 1, status: 'completed', ...}
Step 1.1: 保存用户输入到chat_context (role: user)
├─→ INSERT INTO chat_context with type='chat'
└─→ 无流式输出
Step 2: 意图分析
├─→ SSE: {step: 2, status: 'processing', ...}
├─→ 调用 MeanDeepSeekService.get_mean()
│ └─→ POST https://api.deepseek.com/v1/chat/completions
│ ├─→ model: deepseek-chat
│ ├─→ messages: [系统提示词, 历史消息, 当前查询]
│ ├─→ temperature: 0.3
│ └─→ stream: true
├─→ 收集流式响应,返回JSON:
│ {
│ "intent": "kb_retrieval",
│ "confidence": 0.95,
│ "queries": ["TL-SE2109", "端口隔离"],
│ "rewrite": "TL-SE2109 端口隔离配置方法",
│ "plan": {
│ "action": "retrieve_vector",
│ "args": {"k": 6}
│ }
│ }
└─→ SSE: {step: 2, status: 'completed', ...}
Step 3: 向量检索
├─→ SSE: {step: 3, status: 'processing', ...}
├─→ 使用BGE-M3编码queries为1024维向量
│ └─→ model.encode(query, normalize_embeddings=True)
├─→ 在Milvus中执行COSINE搜索
│ ├─→ collection: major_bge_1024_collection
│ ├─→ metric_type: COSINE
│ ├─→ expr: 'user_id == "current_user"' (数据隔离)
│ └─→ limit: 6
├─→ 返回6个相似chunks with sources=['vector']
└─→ SSE: {step: 3, status: 'completed', message: '找到 6 个结果'}
Step 4: 全文检索(BM25)
├─→ SSE: {step: 4, status: 'processing', ...}
├─→ 在OpenSearch中搜索
│ ├─→ 索引: major_pdf, major_word, major_excel, major_ppt, major_txt
│ ├─→ 查询: "TL-SE2109 端口隔离配置方法"
│ ├─→ 分析器: ik_max_word (中文分词)
│ ├─→ must: {term: {user_id: "current_user"}} (数据隔离)
│ └─→ size: 12
├─→ 返回12个相关docs with sources=['bm25']
└─→ SSE: {step: 4, status: 'completed', message: '找到 12 个结果'}
Step 5: RRF结果合并
├─→ SSE: {step: 5, status: 'processing', ...}
├─→ 计算RRF分数: 1/(rank + 1 + 60)
│ ├─→ 向量结果: rank 1-6
│ └─→ BM25结果: rank 1-12
├─→ 按RRF分数排序
├─→ 返回前10个合并结果
└─→ SSE: {step: 5, status: 'completed', message: '返回前 10 个结果'}
Step 6: 元数据补齐
├─→ SSE: {step: 6, status: 'processing', ...}
├─→ 从files表获取file_type
│ └─→ SELECT id, file_type FROM files WHERE id IN (...)
├─→ 根据file_type确定content表
│ └─→ pdf → pdf_content, word → word_content等
├─→ 查询image_path, title_content, original_filename
│ └─→ SELECT ... FROM {table_name} WHERE content_id = ...
└─→ SSE: {step: 6, status: 'completed', ...}
Step 7: 答案生成
├─→ SSE: {step: 7, status: 'processing', message: '正在生成AI回复...'}
├─→ 调用 AnswerDeepSeekService.generate_answer()
│ └─→ POST https://api.deepseek.com/v1/chat/completions
│ ├─→ model: deepseek-chat
│ ├─→ messages: [历史消息, 答案生成提示词+检索结果]
│ ├─→ temperature: 0.3
│ └─→ stream: true
├─→ 收集块级JSON并逐块推送:
│ └─→ SSE: {step: 8, status: 'processing', message: '{"t":"block_start",...}'}
│ └─→ SSE: {step: 8, status: 'processing', message: '{"t":"delta",...}'}
│ └─→ SSE: {step: 8, status: 'processing', message: '{"t":"delta",...}'}
│ └─→ ...(持续推送)
│ └─→ SSE: {step: 8, status: 'processing', message: '{"t":"block_end",...}'}
└─→ SSE: {step: 7, status: 'completed', message: '答案生成完成'}
Step 8: 保存AI响应到chat_content_history
├─→ 格式化为标准JSON
│ {
│ 'session_id': '123',
│ 'user_id': 'assistant',
│ 'content': '生成的完整答案(块级JSON)',
│ 'created_at': '2025-01-01T12:00:00'
│ }
├─→ INSERT INTO chat_content_history
├─→ UPDATE chat_history SET chat_time = NOW()
└─→ 无流式输出
Step 9: 保存AI上下文到chat_context
├─→ 从块级JSON中提取纯文本内容
│ └─→ _extract_content_from_stream_json()
├─→ 格式化为上下文JSON
│ {
│ "role": "assistant",
│ "content": "提取的纯文本内容"
│ }
├─→ INSERT INTO chat_context with type='chat'
└─→ 无流式输出
最终结果:
├─→ 前端收到完整的答案渲染(包含文本、表格、图片等)
├─→ 用户可进行后续提问(基于完整的历史上下文)
└─→ 所有数据持久化到数据库(chat_content_history和chat_context)
七、关键文件总结
| 文件位置 | 功能 | 核心类/方法 |
|---|---|---|
app/routes/ChatRoutes.py |
API路由定义 | add_chat_content() - SSE流式入口 |
app/service/ChatService.py |
核心RAG逻辑(1415行) | add_chat_content() - 9步处理流程 |
app/service/models/MeanDeepSeekService.py |
意图识别 | get_mean() - 调用DeepSeek进行意图分析 |
app/service/models/ChatDeepSeekService.py |
多轮对话 | chat() - 非检索类问答 |
app/service/models/AnswerDeepSeekService.py |
答案生成 | generate_answer() - 基于检索结果生成答案 |
utils/bge/Vector_bge_1024_Manager.py |
向量管理 | BGE-M3模型,1024维向量,Milvus存储 |
utils/OpenSearchManager.py |
全文检索 | BM25搜索,中文分词,多索引支持 |
utils/MysqlManager.py |
MySQL连接 | SQLAlchemy ORM,会话管理 |
config/model.yaml |
LLM配置 | 3个DeepSeek模型配置 |
config/db.yaml |
数据库配置 | MySQL、Milvus、OpenSearch连接配置 |
install/db.sql |
数据库初始化 | 3个chat表结构定义 |
八、架构特点总结
1. 双检索融合(Hybrid Search)
- 向量检索:语义相似度(COSINE),基于BGE-M3模型
- 全文检索:BM25倒排索引,支持中文分词(ik_max_word)
- 合并方式:RRF(Reciprocal Rank Fusion)互惠排名融合
2. 多轮对话支持
- 完整上下文历史保存在
chat_context表 - 支持引用前置答案进行追问
- 灵活的意图识别和动作规划
3. 数据隔离
- 所有检索添加
user_id过滤条件 - 向量库(Milvus)和全文索引(OpenSearch)都支持多用户隔离
- 会话数据按
user_id严格隔离
4. 流式实时性
- SSE实时推送处理步骤(0-9步)
- 块级JSON支持前端逐块渲染
- 支持文本、表格、图片等多种内容类型
5. 灵活的提示词工程
- 三层提示词设计:
- 系统提示词(意图识别)
- 聊天输出格式提示词
- 答案生成提示词
- 动态调整
temperature参数控制生成特性:- 意图识别:0.3(更保守)
- 对话模型:0.7(更灵活)
- 答案生成:0.3(更准确)
- 支持自定义的输出格式和内容结构
6. 智能意图路由
- 自动识别三种意图:
chat:闲聊/解释kb_retrieval:知识库检索clarify:需要澄清
- 三种动作规划:
retrieve_vector:执行检索流程answer_chat:直接对话clarify:请求澄清
7. 完整的上下文管理
- 两个上下文存储:
chat_content_history:完整消息历史(用于展示)chat_context:LLM调用上下文(用于推理)
- 自动提取块级JSON中的纯文本用于上下文保存
- 支持长对话历史
8. 可扩展的输出格式
- 块级JSON格式支持:
- 文本块(
kind: text) - 表格块(
kind: table) - 图片块(
kind: image)
- 文本块(
- 易于扩展新的内容类型
- 前端可灵活渲染不同类型的内容
分析完成日期: 2025-11-12