16_LLM交互式调试:用Streamlit构建可视化工具

简介: 在大语言模型(LLM)的应用开发过程中,调试一直是一个复杂且具有挑战性的任务。传统的调试方法往往依赖于静态日志、断点调试和反复的命令行交互,这种方式在处理LLM这类黑盒模型时显得尤为低效。随着2025年LLM技术的普及和应用场景的多样化,开发人员迫切需要一种更加直观、高效的调试方式。

引言:LLM调试的新范式

在大语言模型(LLM)的应用开发过程中,调试一直是一个复杂且具有挑战性的任务。传统的调试方法往往依赖于静态日志、断点调试和反复的命令行交互,这种方式在处理LLM这类黑盒模型时显得尤为低效。随着2025年LLM技术的普及和应用场景的多样化,开发人员迫切需要一种更加直观、高效的调试方式。

交互式调试工具正是在这一背景下应运而生。它们通过可视化界面,让开发人员能够实时观察模型的输入输出、调整参数配置、对比不同提示的效果,并快速迭代优化方案。这种动态调试的独特反馈循环,极大地缩短了从问题发现到解决方案验证的时间,显著提升了LLM应用开发的效率和质量。

Streamlit作为一个专为数据科学和机器学习设计的Python库,凭借其简洁的API、实时更新能力和丰富的交互组件,成为构建LLM交互式调试工具的理想选择。2025年最新版本的Streamlit不仅在性能上有了显著提升,还新增了许多专为AI应用设计的功能,使其成为连接开发者与LLM的强大桥梁。

本文将全面介绍如何使用Streamlit构建LLM交互式调试工具,从基础概念到高级应用,从简单的提示工程到复杂的多模型比较,帮助读者掌握这一现代LLM开发的关键技能。通过本文的学习,你将能够:

  • 理解LLM交互式调试的核心概念和优势
  • 掌握Streamlit的基础使用方法和高级特性
  • 构建多种类型的LLM调试界面
  • 实现提示工程的可视化优化
  • 设计多模型对比和性能监控工具
  • 开发复杂的交互式调试工作流

让我们一起探索LLM交互式调试的世界,开启高效开发的新篇章!

第1章:LLM调试的挑战与交互式工具的价值

1.1 LLM应用开发中的调试痛点

大语言模型的独特性质给传统调试方法带来了前所未有的挑战。与传统软件不同,LLM的输出具有高度的不确定性和上下文依赖性,这使得问题定位和解决方案验证变得异常困难。

1. 黑盒模型的不透明性

LLM本质上是复杂的神经网络系统,包含数十亿甚至数千亿个参数。虽然我们可以获取模型的输入和输出,但模型内部的决策过程对开发者来说仍然是一个黑盒。这种不透明性使得调试过程更多地依赖于试错而非系统性分析。

2. 提示工程的复杂性

提示(Prompt)是与LLM交互的主要方式,一个好的提示可以显著提升模型的输出质量。然而,提示工程是一项需要经验和创造力的工作,需要考虑提示的结构、指令的明确性、上下文的完整性等多个因素。在没有可视化工具的情况下,调整和优化提示往往效率低下。

3. 参数调优的盲目性

LLM提供了多种控制输出的参数,如temperature、top_p、top_k等。这些参数的微小变化可能导致输出结果的显著差异。传统的参数调优通常采用手动修改代码、重新运行程序的方式,无法实时观察参数变化对输出的影响。

4. 多模型对比的复杂性

在实际应用中,我们常常需要在多个模型之间进行选择和比较。不同模型在同一任务上的表现可能存在较大差异,需要综合考虑输出质量、响应速度、成本等多个因素。没有可视化工具的支持,这种比较工作将变得非常繁琐。

5. 上下文管理的挑战

LLM的许多应用场景需要处理多轮对话,管理对话历史和上下文窗口。在这种情况下,需要跟踪对话状态、控制上下文长度,并确保模型能够正确理解对话的连续性。这些问题在传统调试环境中很难有效处理。

1.2 交互式调试工具的核心价值

面对这些挑战,交互式调试工具提供了全新的解决方案,为LLM应用开发带来了以下核心价值:

1. 实时反馈与快速迭代

交互式调试工具最大的优势在于提供了即时反馈机制。开发者可以在界面上直接修改提示、调整参数,并立即看到模型的响应结果。这种实时反馈大大缩短了迭代周期,使得优化过程更加高效。

2. 可视化参数影响

通过交互式界面,开发者可以直观地观察不同参数设置对输出结果的影响。例如,可以通过滑动条实时调整temperature参数,观察输出随机性的变化;或者通过下拉菜单选择不同的解码策略,比较生成质量的差异。

3. 系统化提示工程

交互式工具使得提示工程更加系统化和科学化。开发者可以保存和比较不同的提示模板,分析哪些结构和指令能够产生更好的效果。这种结构化的方法有助于积累最佳实践,提升提示设计的质量。

4. 多维度性能监控

现代LLM调试工具不仅关注输出质量,还提供了性能指标的实时监控。开发者可以跟踪模型的响应时间、token消耗、成本估算等关键指标,帮助在质量和效率之间找到最佳平衡点。

5. 协作与知识共享

交互式调试工具通常支持会话保存、配置导出等功能,便于团队成员之间的协作和知识共享。这对于大型团队开发LLM应用尤为重要,可以避免重复工作,加速整体开发进度。

1.3 2025年LLM调试工具的发展趋势

随着LLM技术的快速发展,交互式调试工具也在不断演进。2025年,这些工具呈现出以下几个明显的发展趋势:

1. 集成化开发环境

现代LLM调试工具正在向全功能的集成开发环境(IDE)方向发展。它们不仅提供调试功能,还集成了模型管理、版本控制、部署发布等完整的开发流程。这种一体化的解决方案大大简化了LLM应用的开发和管理过程。

2. 智能化辅助功能

借助AI自身的能力,调试工具正在引入智能化辅助功能。例如,自动提示优化建议、参数推荐、错误模式识别等。这些功能能够帮助开发者更快地发现和解决问题,提升整体开发效率。

3. 多模态调试支持

随着多模态LLM的普及,调试工具也在扩展对图像、音频等非文本输入的支持。现代调试环境允许开发者在同一界面上处理和调试不同类型的输入,为复杂的多模态应用开发提供了便利。

4. 分布式与云原生架构

为了支持大规模模型的调试需求,现代工具正在采用分布式和云原生架构。这使得工具能够高效处理大模型的推理请求,并支持多用户同时协作调试。

5. 安全与隐私增强

随着LLM应用在敏感领域的广泛应用,调试工具也在加强安全和隐私保护功能。2025年的先进工具提供了端到端加密、数据隔离、访问控制等安全特性,确保调试过程中的数据安全。

在这一背景下,Streamlit凭借其简洁的API、强大的扩展性和活跃的社区支持,成为构建LLM交互式调试工具的首选框架之一。接下来,我们将详细介绍Streamlit的基础使用方法,为构建调试工具打下坚实基础。

第2章:Streamlit基础入门

2.1 Streamlit简介与核心特性

Streamlit是一个开源的Python库,专为快速构建数据应用和机器学习可视化界面而设计。自2019年发布以来,Streamlit以其简洁的API和强大的功能迅速赢得了数据科学家和机器学习工程师的青睐。2025年最新版本的Streamlit在性能、功能和易用性方面都有了显著提升,成为构建交互式应用的理想选择。

核心特性:

  1. 纯Python开发
    Streamlit允许开发者使用纯Python代码构建完整的Web应用,无需编写HTML、CSS或JavaScript。这大大降低了Web应用开发的门槛,使数据科学家能够专注于核心逻辑而非前端技术。

  2. 声明式编程模型
    Streamlit采用声明式的编程范式,开发者只需描述想要展示的内容,而不需要关心底层的渲染细节。这种方式使得代码更加简洁、易于理解和维护。

  3. 实时更新
    Streamlit的一个关键特性是支持热重载(hot reload)。当开发者修改代码并保存时,应用会自动刷新并显示最新的变化,提供即时反馈,显著提升开发效率。

  4. 丰富的交互组件
    Streamlit提供了丰富的内置组件,包括文本输入、滑块、下拉菜单、按钮等,使开发者能够轻松构建交互式界面。2025年版本还新增了专门为AI应用设计的高级组件。

  5. 强大的可视化支持
    Streamlit与主流数据可视化库(如Matplotlib、Plotly、Altair等)无缝集成,允许开发者轻松创建各种类型的图表和可视化效果。

  6. 灵活的布局系统
    最新版本的Streamlit提供了更加灵活的布局选项,包括多列布局、标签页、侧边栏等,使开发者能够创建结构复杂的应用界面。

  7. 部署简单
    Streamlit应用可以通过多种方式轻松部署,包括本地服务器、云服务(如Streamlit Cloud、AWS、GCP等)以及容器化部署。这使得分享和发布应用变得异常简单。

2.2 环境搭建与基本配置

在开始使用Streamlit构建LLM调试工具之前,我们需要先搭建开发环境并进行基本配置。以下是详细的步骤指南:

1. 安装Streamlit

Streamlit可以通过pip轻松安装。建议在虚拟环境中安装,以避免依赖冲突:

# 创建并激活虚拟环境
python -m venv streamlit_env
source streamlit_env/bin/activate  # Windows用户使用 streamlit_env\Scripts\activate

# 安装Streamlit 2025最新版本
pip install streamlit --upgrade

安装完成后,可以通过以下命令验证安装是否成功:

streamlit --version

2. 安装必要的依赖

为了构建LLM调试工具,我们还需要安装一些额外的依赖库,包括:

# 安装Hugging Face Transformers用于加载和使用LLM
pip install transformers

# 安装PyTorch或TensorFlow作为深度学习后端
pip install torch  # 或 pip install tensorflow

# 安装其他可能需要的库
pip install pandas numpy matplotlib plotly

3. 基本配置

Streamlit提供了多种配置选项,可以通过配置文件或命令行参数进行设置。主要的配置选项包括:

  • 服务器配置:端口号、主机地址、最大文件大小等
  • 应用外观:主题、布局、菜单选项等
  • 缓存设置:缓存大小、过期时间等

可以通过创建.streamlit/config.toml文件来自定义配置:

[server]
port = 8501
enableCORS = true

[theme]
primaryColor = "#F39C12"
backgroundColor = "#FFFFFF"
secondaryBackgroundColor = "#F0F2F6"
textColor = "#333333"
font = "sans serif"

[cache]
enable = true
size = 250

4. 创建第一个Streamlit应用

现在,让我们创建一个简单的Streamlit应用来验证环境配置是否正确:

# app.py
import streamlit as st

# 设置页面标题和布局
st.set_page_config(page_title="LLM调试工具", layout="wide")

# 添加标题和介绍
st.title("LLM交互式调试工具")
st.write("欢迎使用2025年最新版的LLM交互式调试工具!")

# 添加简单的交互组件
name = st.text_input("请输入您的名字:")
if name:
    st.write(f"您好,{name}!让我们一起探索LLM的奇妙世界。")

# 添加一个滑块组件
temperature = st.slider("调整Temperature参数", min_value=0.0, max_value=2.0, value=1.0, step=0.1)
st.write(f"当前Temperature设置为:{temperature}")

保存文件后,可以通过以下命令运行应用:

streamlit run app.py

运行后,Streamlit会自动打开默认浏览器,并显示我们创建的应用界面。

2.3 Streamlit核心组件详解

Streamlit提供了丰富的内置组件,这些组件是构建交互式界面的基础。下面我们详细介绍一些在LLM调试工具中最常用的核心组件:

1. 文本输入组件

文本输入组件用于获取用户输入的文本信息,在LLM调试中常用于输入提示文本:

# 基本文本输入
prompt = st.text_input("输入您的提示:", "请解释量子计算的基本原理")

# 多行文本输入,适合较长的提示或上下文
long_prompt = st.text_area(
    "输入详细提示:",
    "请详细解释量子计算的基本原理,包括量子比特、叠加态和纠缠等核心概念。",
    height=200
)

2. 数值输入组件

数值输入组件用于调整LLM的各种参数,如temperature、max_tokens等:

# 滑块组件,适合需要范围选择的参数
temperature = st.slider(
    "Temperature",
    min_value=0.0,
    max_value=2.0,
    value=1.0,
    step=0.1,
    help="控制生成文本的随机性,值越高越随机,值越低越确定"
)

# 数值输入框,适合需要精确输入的参数
max_tokens = st.number_input(
    "最大Token数",
    min_value=10,
    max_value=4000,
    value=500,
    step=50,
    help="限制生成文本的最大长度"
)

# 选择框,适合从预定义选项中选择
model_name = st.selectbox(
    "选择模型",
    ["gpt-3.5-turbo", "gpt-4", "llama-3", "gemma-7b"],
    help="选择要使用的LLM模型"
)

3. 布尔值和按钮组件

这些组件用于控制应用的行为,如触发模型推理、清除结果等:

# 复选框,用于启用或禁用某些功能
use_cache = st.checkbox("启用缓存", value=True)

# 单选按钮,用于从互斥选项中选择
engine = st.radio(
    "选择推理引擎",
    ["vLLM", "TensorRT", "ONNX Runtime"],
    horizontal=True
)

# 按钮,用于触发操作
if st.button("生成回复"):
    # 在这里调用LLM进行推理
    st.write("正在生成回复...")

# 下载按钮,用于下载结果
if st.download_button(
    label="下载结果",
    data="生成的文本内容",
    file_name="llm_output.txt",
    mime="text/plain"
):
    st.success("下载成功!")

4. 容器和布局组件

这些组件用于组织应用的结构,创建清晰的界面布局:

# 侧边栏,常用于放置参数设置
with st.sidebar:
    st.header("模型参数设置")
    temperature = st.slider("Temperature", 0.0, 2.0, 1.0)
    max_tokens = st.number_input("最大Token数", 10, 4000, 500)

# 多列布局,用于并排显示内容
col1, col2 = st.columns(2)

with col1:
    st.header("输入区域")
    prompt = st.text_area("输入提示")
    if st.button("生成"):
        # 生成逻辑
        pass

with col2:
    st.header("输出结果")
    st.write("生成的内容将显示在这里")

# 标签页,用于分类展示不同功能
tab1, tab2, tab3 = st.tabs(["提示工程", "参数调优", "模型比较"])

with tab1:
    st.header("提示工程优化")
    prompt_template = st.text_area("提示模板")
    # 提示工程相关功能...

with tab2:
    st.header("模型参数调优")
    # 参数调优相关功能...

with tab3:
    st.header("多模型比较")
    # 模型比较相关功能...

# 扩展容器,用于条件显示内容
expander = st.expander("高级设置")
with expander:
    st.write("这里是高级设置选项...")
    custom_model_path = st.text_input("自定义模型路径")
    api_key = st.text_input("API密钥", type="password")

### 2.4 Streamlit高级功能与最佳实践

Streamlit提供了许多高级功能,可以帮助开发者构建更加复杂和高效的应用。下面介绍一些在LLM调试工具开发中特别有用的高级功能和最佳实践:

**1. 缓存机制**

缓存是Streamlit中一个非常重要的功能,对于LLM应用尤为关键,可以显著提升应用的响应速度:

```python
# 使用缓存装饰器缓存LLM推理结果
@st.cache_data(max_entries=50, ttl=3600)
def generate_response(prompt, model_name, temperature, max_tokens):
    # 这里是LLM推理逻辑
    import time
    time.sleep(1)  # 模拟推理延迟
    return f"这是{model_name}对提示的响应:{prompt}..."

# 使用缓存装饰器缓存模型加载
@st.cache_resource
# 注意:在实际应用中,模型加载通常放在会话状态中管理
# 这里只是演示缓存装饰器的用法
def load_model(model_name):
    # 这里是模型加载逻辑
    import time
    time.sleep(3)  # 模拟模型加载延迟
    return f"模型 {model_name} 已加载"

2. 会话状态管理

会话状态(Session State)是Streamlit 1.0+版本引入的重要功能,用于在用户与应用交互的过程中保存状态信息:

# 初始化会话状态
if "messages" not in st.session_state:
    st.session_state.messages = []

if "current_model" not in st.session_state:
    st.session_state.current_model = "gpt-3.5-turbo"

if "conversation_history" not in st.session_state:
    st.session_state.conversation_history = []

# 更新会话状态
st.session_state.current_model = model_name

# 添加消息到会话状态
if st.button("发送"):
    st.session_state.messages.append({
   
        "role": "user",
        "content": prompt
    })
    # 生成回复并添加到会话状态
    response = generate_response(prompt, model_name, temperature, max_tokens)
    st.session_state.messages.append({
   
        "role": "assistant",
        "content": response
    })

3. 自定义组件

对于复杂的交互需求,Streamlit支持使用React开发自定义组件:

# 安装streamlit-component-lib
# pip install streamlit-component-lib

# 使用自定义组件
from streamlit_component_lib import token_viewer

tokens = token_viewer("这是一段需要分词查看的文本")
st.write(f"分词结果: {tokens}")

4. 多进程与异步处理

对于长时间运行的任务(如LLM推理),可以使用多进程或异步处理来避免阻塞UI:

import streamlit as st
import asyncio
import time

# 异步函数示例
async def async_generate(prompt, model):
    # 模拟异步LLM调用
    await asyncio.sleep(2)
    return f"异步生成结果: {prompt}"

# 在Streamlit中使用异步
async def run_async_tasks():
    result = await async_generate(st.session_state.prompt, st.session_state.model)
    st.session_state.result = result
    st.session_state.is_generating = False

if st.button("异步生成"):
    st.session_state.is_generating = True
    st.session_state.result = "生成中..."
    # 运行异步任务
    asyncio.run(run_async_tasks())

if "result" in st.session_state:
    st.write(st.session_state.result)

5. 性能优化最佳实践

  • 懒加载模型:只在需要时加载模型,避免应用启动时的长时间等待
  • 分批处理:对于大型文本或数据集,采用分批处理的方式
  • 合理使用缓存:缓存模型加载和常用推理结果,但注意缓存失效策略
  • 优化UI响应:使用占位符和进度条提供更好的用户体验
  • 避免重复计算:将复杂计算结果存储在会话状态中

2.5 小结与最佳实践总结

Streamlit作为一个专为数据科学和机器学习设计的Python库,为构建LLM交互式调试工具提供了强大的支持。通过本章的学习,我们掌握了Streamlit的基础使用方法、核心组件和高级功能,为构建功能完善的调试工具打下了坚实基础。

Streamlit应用开发的最佳实践总结:

  1. 明确应用结构:在开始编码前,先规划好应用的结构和功能模块
  2. 关注用户体验:提供清晰的指引、进度反馈和错误处理
  3. 优化性能:合理使用缓存、异步处理和懒加载等技术
  4. 保持代码简洁:遵循Streamlit的声明式编程模型,保持代码简洁易读
  5. 持续测试:利用Streamlit的热重载功能,实时测试和调整应用

第3章 LLM调试基础知识

3.1 LLM调试的挑战与重要性

大型语言模型(LLM)的调试与传统软件调试有显著不同,面临着独特的挑战。这些挑战源于LLM的特性:参数规模庞大、推理过程复杂、行为难以预测。在实际应用中,开发者经常遇到模型响应不准确、幻觉生成、上下文理解错误等问题,而这些问题的排查和解决往往比传统编程更具挑战性。

LLM调试的主要挑战:

  1. 黑盒性质:LLM的推理过程对用户来说是黑盒,难以直接观察中间状态
  2. 参数规模:现代LLM通常包含数十亿甚至数万亿参数,无法直接检查每个参数的影响
  3. 上下文敏感性:模型的输出高度依赖于输入提示和上下文
  4. 非确定性:由于随机采样等机制,即使输入相同,输出也可能不同
  5. 幻觉生成:模型可能生成看似合理但实际上不准确或不存在的信息

调试的重要性:

有效的调试不仅可以提高LLM应用的准确性和可靠性,还能帮助开发者更好地理解模型的行为模式,从而优化提示设计和系统架构。在生产环境中,良好的调试工具和流程对于确保LLM应用的质量和用户体验至关重要。

3.2 LLM常见问题类型与诊断方法

在LLM应用开发过程中,开发者经常遇到以下几类问题:

1. 提示理解问题

症状:模型似乎误解了提示的意图,或者对关键指令无响应

诊断方法

  • 简化提示,测试模型是否能正确理解基本指令
  • 调整提示的指令顺序,将最重要的指令放在前面
  • 添加明确的格式指示(如"请以JSON格式输出")
  • 使用角色设定(如"你是一个专业的翻译助手")

2. 生成质量问题

症状:生成内容质量低、逻辑不通、重复冗余

诊断方法

  • 调整生成参数(如temperature、top_p等)
  • 提供详细的示例(few-shot learning)
  • 增加输出格式的约束
  • 检查模型的知识截止日期,考虑补充最新信息

3. 幻觉问题

症状:模型生成不准确或虚构的信息

诊断方法

  • 添加事实核查的指令
  • 限制模型在不确定时明确表示不知道
  • 增加外部知识检索机制
  • 使用更专注于事实准确性的模型变体

4. 上下文理解问题

症状:模型无法正确处理长对话历史或多轮交互

诊断方法

  • 优化上下文窗口的使用,确保关键信息在有效上下文中
  • 实施对话摘要机制,保留重要信息
  • 检查token限制,避免截断重要内容
  • 分段处理长文本输入

5. 安全性与合规性问题

症状:模型生成敏感内容或不当回应

诊断方法

  • 添加安全指南和伦理约束
  • 实施输入过滤和输出审核
  • 使用专门的安全微调模型
  • 建立明确的内容策略和边界

3.3 调试指标与评估方法

评估LLM输出质量和调试效果需要考虑多个维度。以下是常用的评估指标和方法:

1. 准确性指标

  • 事实准确性:输出内容与已知事实的符合程度
  • 任务完成度:模型是否完成了给定任务的所有要求
  • 一致性:在相似场景下,模型输出的一致性程度

2. 质量指标

  • 相关性:输出与输入提示的相关程度
  • 连贯性:输出内容在逻辑和语法上的连贯程度
  • 创造性:在需要创新的任务中,输出的新颖性和创新性

3. 效率指标

  • 响应时间:从输入提示到生成回复的时间
  • 资源消耗:模型运行所需的计算资源
  • token使用效率:在完成任务的前提下使用的token数量

4. 主观评估方法

  • 人类评估:由领域专家对模型输出进行打分和评价
  • A/B测试:比较不同提示或参数配置的效果
  • 用户反馈:收集终端用户对模型输出的反馈

5. 自动化评估工具

  • BLEU/ROUGE:评估生成文本与参考文本的相似度
  • 困惑度(Perplexity):评估模型对文本序列的预测能力
  • 自定义评分脚本:针对特定任务开发的自动化评估脚本

3.4 调试策略与最佳实践

有效的LLM调试需要系统性的策略和方法。以下是一些经过实践检验的调试策略:

1. 增量调试法

  • 步骤分解:将复杂任务分解为简单的子任务,逐一调试
  • 最小化问题:创建最小可重现的问题示例,排除无关因素
  • 对比测试:对比不同提示、参数或模型的输出差异

2. 提示工程调试

  • 指令优化:调整指令的表述方式、顺序和强调重点
  • 上下文管理:优化对话历史的组织和重要信息的呈现
  • 示例设计:精心设计few-shot示例,确保覆盖目标场景

3. 参数调优

  • 系统扫描:系统性地测试不同参数组合的效果
  • 参数敏感度分析:识别对输出影响最大的关键参数
  • 参数组合优化:找到特定任务的最佳参数组合

4. 模型理解与适应

  • 模型边界测试:了解模型的能力边界和局限性
  • 模型特性利用:基于模型特性设计更有效的提示策略
  • 多模型协作:根据不同任务选择最适合的模型,或结合多个模型的优势

5. 持续改进策略

  • 错误模式分析:识别模型输出中的常见错误模式
  • 反馈循环建立:建立用户反馈收集和模型调整的闭环
  • 知识库建设:积累有效的提示模板、参数配置和调试经验

3.5 小结

本章介绍了LLM调试的基础知识,包括调试的挑战与重要性、常见问题类型与诊断方法、调试指标与评估方法以及调试策略与最佳实践。这些基础知识将为我们后续构建交互式调试工具提供理论支持。

第4章 构建基础提示工程调试工具

4.1 工具设计思路与架构

在本节中,我们将构建一个基础的提示工程调试工具,帮助开发者可视化测试不同提示和参数对模型输出的影响。这个工具将包含以下核心功能:

  1. 提示编辑与预览:提供友好的界面编辑提示文本
  2. 参数调整:允许用户调整常见的生成参数(temperature、top_p等)
  3. 模型选择:支持选择不同的LLM模型进行测试
  4. 结果可视化:清晰展示模型输出结果
  5. 对比测试:支持比较不同提示或参数配置的效果

架构设计:

  • 前端:使用Streamlit构建交互式界面
  • 后端:集成LLM API或本地模型推理
  • 数据流:用户输入 → 参数处理 → 模型调用 → 结果展示

工具的整体架构和数据流向可以用以下ASCII图表表示:

用户界面
  │
  ├── 提示输入区
  │   │
  │   ├── 文本编辑器
  │   └── 提示模板库
  │
  ├── 参数控制面板
  │   │
  │   ├── 温度(temperature)
  │   ├── 最大token数(max_tokens)
  │   └── 其他参数(top_p, frequency_penalty等)
  │
  ├── 模型选择器
  │
  └── 结果展示区
      │
      ├── 模型输出
      ├── token分析
      └── 性能指标
          ↓
模型接口
  │
  ├── API集成层
  │
  └── 本地模型接口

4.2 基础工具实现代码

下面是一个基于Streamlit的基础提示工程调试工具的完整实现代码:

import streamlit as st
import openai
import time
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Dict, Any, Optional

# 设置页面配置
st.set_page_config(
    page_title="LLM提示工程调试工具",
    page_icon="🔍",
    layout="wide"
)

# 页面标题
st.title("🔍 LLM提示工程调试工具")

# 侧边栏 - 模型配置
with st.sidebar:
    st.header("模型配置")

    # 模型选择
    model_name = st.selectbox(
        "选择模型",
        options=["gpt-3.5-turbo", "gpt-4", "text-davinci-003", "本地模型"],
        index=0
    )

    # API密钥输入
    if model_name != "本地模型":
        api_key = st.text_input("API密钥", type="password")

    # 分隔线
    st.divider()

    # 生成参数配置
    st.header("生成参数")
    temperature = st.slider("温度 (Temperature)", min_value=0.0, max_value=2.0, value=0.7, step=0.1,
                           help="控制输出的随机性,较低的值使输出更加确定,较高的值增加创造性")
    max_tokens = st.slider("最大Token数", min_value=50, max_value=2000, value=500, step=50,
                          help="限制生成的最大token数量")
    top_p = st.slider("Top P", min_value=0.0, max_value=1.0, value=1.0, step=0.1,
                     help="控制采样的多样性,较低的值会使生成更加保守")
    frequency_penalty = st.slider("频率惩罚", min_value=-2.0, max_value=2.0, value=0.0, step=0.1,
                                 help="降低重复使用相同词语的可能性")
    presence_penalty = st.slider("存在惩罚", min_value=-2.0, max_value=2.0, value=0.0, step=0.1,
                                help="增加使用新主题的可能性")

# 主内容区
col1, col2 = st.columns(2)

with col1:
    st.header("📝 提示输入")

    # 提示模板选择器
    template_options = [
        "自定义提示",
        "分类任务",
        "问答任务",
        "摘要任务",
        "翻译任务"
    ]
    template_choice = st.selectbox("选择提示模板", template_options)

    # 根据选择加载预设模板
    if template_choice == "自定义提示":
        prompt = st.text_area("提示内容", height=300, placeholder="请输入你的提示...")
    elif template_choice == "分类任务":
        prompt = st.text_area("提示内容", height=300, 
                            value="请将以下文本分类为正面、负面或中性:\n\n文本:{}\n\n分类结果:")
    elif template_choice == "问答任务":
        prompt = st.text_area("提示内容", height=300, 
                            value="请回答以下问题。如果信息不足,请明确表示不知道。\n\n问题:{}\n\n回答:")
    elif template_choice == "摘要任务":
        prompt = st.text_area("提示内容", height=300, 
                            value="请为以下文本生成一个简洁的摘要,长度不超过50个词:\n\n文本:{}\n\n摘要:")
    elif template_choice == "翻译任务":
        prompt = st.text_area("提示内容", height=300, 
                            value="请将以下文本从中文翻译成英文:\n\n文本:{}\n\n翻译结果:")

    # 输入变量(如果模板中有{}")
    if "{}" in prompt:
        user_input = st.text_input("输入变量", placeholder="请输入要插入到模板中的内容...")
        if user_input:
            final_prompt = prompt.format(user_input)
        else:
            final_prompt = prompt
    else:
        final_prompt = prompt
        st.info("提示:您可以在提示中使用{}作为占位符,然后在下方输入框中提供具体内容")

    # 显示最终提示
    with st.expander("查看最终提示"):
        st.code(final_prompt, language="text")

    # 提交按钮
    submit_button = st.button("🚀 生成响应", use_container_width=True)

with col2:
    st.header("📊 结果展示")

    # 初始化会话状态
    if "response" not in st.session_state:
        st.session_state.response = ""
    if "response_time" not in st.session_state:
        st.session_state.response_time = 0
    if "token_usage" not in st.session_state:
        st.session_state.token_usage = {
   }

    # 调用模型并显示结果
    if submit_button:
        if not final_prompt.strip():
            st.error("请输入提示内容")
        elif model_name != "本地模型" and not api_key:
            st.error("请输入API密钥")
        else:
            # 显示加载状态
            with st.spinner("正在生成响应..."):
                start_time = time.time()

                try:
                    # 模拟模型调用过程
                    # 在实际应用中,这里应该是真实的API调用
                    # 这里使用模拟是为了演示目的
                    time.sleep(2)  # 模拟API延迟

                    # 模拟不同模型的响应
                    if model_name == "gpt-3.5-turbo":
                        response_content = f"这是{model_name}对提示的响应。提示内容为:{final_prompt[:50]}..."
                    elif model_name == "gpt-4":
                        response_content = f"这是{model_name}对提示的详细响应。基于您的输入,我分析如下:\n\n1. 您的请求主要涉及:{final_prompt[:30]}...\n\n2. 相关考虑因素包括:\n   - 上下文理解\n   - 参数影响\n   - 输出质量优化\n\n3. 建议采取的方法:\n   - 进一步明确提示意图\n   - 根据需要调整生成参数\n   - 考虑使用更专业的指令格式"
                    else:
                        response_content = f"这是{model_name}对提示的响应。基于您提供的参数设置,我生成了以下内容。"

                    # 计算响应时间
                    st.session_state.response_time = round(time.time() - start_time, 2)

                    # 模拟token使用情况
                    st.session_state.token_usage = {
   
                        "prompt_tokens": len(final_prompt.split()),
                        "completion_tokens": len(response_content.split()),
                        "total_tokens": len(final_prompt.split()) + len(response_content.split())
                    }

                    # 存储响应
                    st.session_state.response = response_content

                except Exception as e:
                    st.error(f"生成响应时出错: {str(e)}")

    # 显示响应结果
    if st.session_state.response:
        # 响应内容
        st.subheader("响应内容")
        st.markdown(st.session_state.response)

        # 性能指标
        st.subheader("性能指标")
        metrics_col1, metrics_col2 = st.columns(2)
        with metrics_col1:
            st.info(f"响应时间: **{st.session_state.response_time}** 秒")
        with metrics_col2:
            st.info(f"总Token数: **{st.session_state.token_usage.get('total_tokens', 0)}**")

        # 详细token使用情况
        with st.expander("Token使用详情"):
            st.json(st.session_state.token_usage)

# 底部信息
st.divider()
st.markdown("提示:这是一个演示工具。在实际应用中,请替换为真实的API调用或本地模型推理。")

4.3 功能解析与使用指南

让我们详细解析上面实现的基础提示工程调试工具的主要功能和使用方法:

1. 界面结构

工具采用双列布局:

  • 左侧:提示输入区域,包含提示编辑器、模板选择和输入变量
  • 右侧:结果展示区域,显示模型输出和性能指标
  • 侧边栏:模型配置和生成参数控制

2. 核心功能

模型配置

用户可以在侧边栏选择不同的模型,输入API密钥,并调整多种生成参数:

  • 温度(Temperature):控制输出的随机性,值越低越确定,值越高越有创造性
  • 最大Token数:限制生成内容的长度
  • Top P:控制采样的多样性,较低的值使生成更加保守
  • 频率惩罚:降低重复使用相同词语的可能性
  • 存在惩罚:增加使用新主题的可能性

提示编辑与模板

工具提供了多种预设的提示模板,适用于不同类型的任务:

  • 自定义提示:完全自由的提示编辑
  • 分类任务:用于文本分类任务的模板
  • 问答任务:用于问答任务的模板
  • 摘要任务:用于文本摘要任务的模板
  • 翻译任务:用于文本翻译任务的模板

此外,还支持使用占位符{}在模板中插入变量,实现更灵活的提示构建。

结果展示

生成结果区域显示模型的响应内容以及相关的性能指标:

  • 响应内容:模型的输出文本
  • 响应时间:从提交请求到获取结果的时间
  • Token使用情况:输入提示、生成内容和总使用的token数量

3. 使用流程

  1. 在侧边栏选择模型并输入API密钥
  2. 调整生成参数(或使用默认值)
  3. 在提示输入区选择提示模板或输入自定义提示
  4. 如果使用模板,输入需要插入的变量内容
  5. 点击"生成响应"按钮
  6. 在结果展示区查看模型输出和性能指标

4.4 代码优化与扩展思路

上述实现提供了一个基础的提示工程调试工具,但还有许多优化和扩展的空间。以下是一些优化思路:

1. 实际API集成

将模拟的API调用替换为实际的模型调用,例如使用OpenAI的API:

# 使用OpenAI API的实际实现
def generate_with_openai(prompt, model, temperature, max_tokens, top_p, freq_penalty, pres_penalty, api_key):
    openai.api_key = api_key

    if model in ["gpt-3.5-turbo", "gpt-4"]:
        response = openai.ChatCompletion.create(
            model=model,
            messages=[{
   "role": "user", "content": prompt}],
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            frequency_penalty=freq_penalty,
            presence_penalty=pres_penalty
        )
        return response['choices'][0]['message']['content'], response['usage']
    else:  # text-davinci-003
        response = openai.Completion.create(
            model=model,
            prompt=prompt,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            frequency_penalty=freq_penalty,
            presence_penalty=pres_penalty
        )
        return response['choices'][0]['text'], response['usage']

2. 本地模型支持

添加对本地运行模型的支持,例如使用Hugging Face Transformers:

# 本地模型加载与推理
@st.cache_resource
def load_local_model(model_path):
    from transformers import AutoModelForCausalLM, AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForCausalLM.from_pretrained(model_path, device_map="auto")
    return model, tokenizer

def generate_with_local_model(prompt, model, tokenizer, temperature, max_tokens):
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=max_tokens,
        temperature=temperature,
        do_sample=temperature > 0
    )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    # 提取生成的部分(去掉提示)
    generated_part = response[len(prompt):].strip()
    return generated_part

3. 会话历史管理

实现多轮对话支持,保存和管理会话历史:

# 初始化会话历史
if "messages" not in st.session_state:
    st.session_state.messages = []

# 添加用户消息
if submit_button:
    st.session_state.messages.append({
   "role": "user", "content": final_prompt})
    # 生成回复
    # ...
    # 添加助手回复
    st.session_state.messages.append({
   "role": "assistant", "content": response_content})

# 显示会话历史
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

4. 提示库管理

添加保存、加载和管理提示模板的功能:

# 保存提示模板
if st.button("保存为模板"):
    template_name = st.text_input("模板名称", key="save_template_name")
    if template_name:
        if "saved_templates" not in st.session_state:
            st.session_state.saved_templates = {
   }
        st.session_state.saved_templates[template_name] = prompt
        st.success(f"模板 '{template_name}' 已保存")

# 加载保存的模板
if "saved_templates" in st.session_state and st.session_state.saved_templates:
    template_to_load = st.selectbox("加载保存的模板", list(st.session_state.saved_templates.keys()), key="load_template")
    if st.button("加载模板"):
        prompt = st.session_state.saved_templates[template_to_load]

5. 更高级的可视化

添加token分析、困惑度曲线等高级可视化功能:

# Token分布可视化
def visualize_token_distribution(text):
    words = text.split()
    word_lengths = [len(word) for word in words[:50]]  # 取前50个词

    fig, ax = plt.subplots(figsize=(10, 4))
    ax.hist(word_lengths, bins=20)
    ax.set_xlabel('词长度')
    ax.set_ylabel('频率')
    ax.set_title('Token长度分布')

    st.pyplot(fig)

# 在结果展示区添加可视化
if st.session_state.response:
    with st.expander("Token分析"):
        visualize_token_distribution(st.session_state.response)

4.5 小结

本章介绍了如何使用Streamlit构建一个基础的提示工程调试工具,包括工具设计思路、完整实现代码、功能解析和使用指南,以及代码优化与扩展思路。这个基础工具提供了提示编辑、参数调整、模型选择和结果展示等核心功能,可以帮助开发者更高效地进行提示工程工作。

第5章 高级LLM调试功能实现

5.1 参数敏感性分析工具

参数敏感性分析是LLM调试中的重要环节,它可以帮助开发者理解不同参数对模型输出的影响程度。在本节中,我们将实现一个参数敏感性分析工具,允许用户系统性地测试不同参数组合对模型输出的影响。

5.1.1 参数敏感性分析的原理

参数敏感性分析通过控制变量法,在保持其他参数不变的情况下,系统性地改变某一参数的值,观察模型输出的变化趋势。这有助于开发者找到最佳参数配置,理解参数的影响边界,并为不同类型的任务建立参数调优指南。

5.1.2 实现代码

下面是一个基于Streamlit的参数敏感性分析工具的实现:

import streamlit as st
import numpy as np
import matplotlib.pyplot as plt
import time
import pandas as pd
from typing import List, Dict, Any, Optional

# 设置页面配置
st.set_page_config(
    page_title="LLM参数敏感性分析工具",
    page_icon="📊",
    layout="wide"
)

# 页面标题
st.title("📊 LLM参数敏感性分析工具")

# 侧边栏 - 基本配置
with st.sidebar:
    st.header("基本配置")

    # 模型选择
    model_name = st.selectbox(
        "选择模型",
        options=["gpt-3.5-turbo", "gpt-4", "text-davinci-003"],
        index=0
    )

    # API密钥输入
    api_key = st.text_input("API密钥", type="password")

    # 分隔线
    st.divider()

    st.header("敏感性分析参数")

    # 选择要分析的参数
    param_to_analyze = st.selectbox(
        "参数敏感性分析",
        options=["Temperature", "Top P", "频率惩罚", "存在惩罚"],
        index=0
    )

    # 参数范围设置
    if param_to_analyze == "Temperature":
        param_min, param_max = st.slider(
            "温度范围",
            min_value=0.0, 
            max_value=2.0, 
            value=(0.0, 1.0),
            step=0.1
        )
        param_steps = int((param_max - param_min) / 0.1) + 1
    elif param_to_analyze == "Top P":
        param_min, param_max = st.slider(
            "Top P范围",
            min_value=0.0, 
            max_value=1.0, 
            value=(0.5, 1.0),
            step=0.1
        )
        param_steps = int((param_max - param_min) / 0.1) + 1
    elif param_to_analyze == "频率惩罚":
        param_min, param_max = st.slider(
            "频率惩罚范围",
            min_value=-2.0, 
            max_value=2.0, 
            value=(-1.0, 1.0),
            step=0.2
        )
        param_steps = int((param_max - param_min) / 0.2) + 1
    else:  # 存在惩罚
        param_min, param_max = st.slider(
            "存在惩罚范围",
            min_value=-2.0, 
            max_value=2.0, 
            value=(-1.0, 1.0),
            step=0.2
        )
        param_steps = int((param_max - param_min) / 0.2) + 1

    # 固定参数设置
    st.subheader("固定参数")

    # 根据分析参数,设置其他参数的固定值
    if param_to_analyze != "Temperature":
        fixed_temperature = st.slider("固定温度", min_value=0.0, max_value=2.0, value=0.7, step=0.1)
    else:
        fixed_temperature = None

    if param_to_analyze != "Top P":
        fixed_top_p = st.slider("固定Top P", min_value=0.0, max_value=1.0, value=1.0, step=0.1)
    else:
        fixed_top_p = None

    if param_to_analyze != "频率惩罚":
        fixed_frequency_penalty = st.slider("固定频率惩罚", min_value=-2.0, max_value=2.0, value=0.0, step=0.1)
    else:
        fixed_frequency_penalty = None

    if param_to_analyze != "存在惩罚":
        fixed_presence_penalty = st.slider("固定存在惩罚", min_value=-2.0, max_value=2.0, value=0.0, step=0.1)
    else:
        fixed_presence_penalty = None

    fixed_max_tokens = st.slider("固定最大Token数", min_value=50, max_value=2000, value=500, step=50)

    # 任务类型选择
    task_type = st.selectbox(
        "任务类型",
        options=["创意写作", "技术问答", "内容摘要", "代码生成"],
        index=0
    )

# 主内容区
col1, col2 = st.columns(2)

with col1:
    st.header("📝 提示输入")

    # 根据任务类型预设提示
    if task_type == "创意写作":
        default_prompt = "请写一首关于人工智能的短诗,表达科技与人文的融合。"
    elif task_type == "技术问答":
        default_prompt = "请解释什么是大语言模型,它的工作原理是什么?"
    elif task_type == "内容摘要":
        default_prompt = "请为以下文本生成一个简洁的摘要:\n\n大语言模型(LLM)是一种基于深度学习的人工智能系统,能够理解和生成人类语言。它们通过在海量文本数据上训练,学习语言的统计规律和语义关系。现代大语言模型如GPT-4、Claude等,包含数十亿甚至数万亿个参数,能够执行各种自然语言处理任务,包括文本生成、问答、翻译、摘要等。随着技术的发展,大语言模型在准确性、创造性和实用性方面不断提升,正在各行各业得到广泛应用。"
    else:  # 代码生成
        default_prompt = "请编写一个Python函数,用于计算斐波那契数列的第n项,使用递归和动态规划两种方法。"

    prompt = st.text_area("提示内容", height=300, value=default_prompt)

    # 分析按钮
    analyze_button = st.button("🔍 开始参数敏感性分析", use_container_width=True)

    # 显示参数网格信息
    st.info(f"将生成 {param_steps} 个不同参数值的测试用例")

with col2:
    st.header("📈 分析结果")

    # 初始化会话状态
    if "sensitivity_results" not in st.session_state:
        st.session_state.sensitivity_results = []
    if "sensitivity_data" not in st.session_state:
        st.session_state.sensitivity_data = None

    # 执行参数敏感性分析
    if analyze_button:
        if not prompt.strip():
            st.error("请输入提示内容")
        elif not api_key:
            st.error("请输入API密钥")
        else:
            # 显示加载状态
            progress_bar = st.progress(0)
            status_text = st.empty()

            # 清空之前的结果
            st.session_state.sensitivity_results = []

            # 生成参数值序列
            param_values = np.linspace(param_min, param_max, param_steps)

            try:
                # 遍历所有参数值
                for i, param_value in enumerate(param_values):
                    # 更新状态
                    progress = (i + 1) / len(param_values)
                    progress_bar.progress(progress)
                    status_text.text(f"正在测试 {param_to_analyze} = {param_value:.2f} ({i+1}/{len(param_values)})")

                    # 设置当前测试的参数组合
                    if param_to_analyze == "Temperature":
                        current_params = {
   
                            "temperature": param_value,
                            "top_p": fixed_top_p,
                            "frequency_penalty": fixed_frequency_penalty,
                            "presence_penalty": fixed_presence_penalty,
                            "max_tokens": fixed_max_tokens
                        }
                    elif param_to_analyze == "Top P":
                        current_params = {
   
                            "temperature": fixed_temperature,
                            "top_p": param_value,
                            "frequency_penalty": fixed_frequency_penalty,
                            "presence_penalty": fixed_presence_penalty,
                            "max_tokens": fixed_max_tokens
                        }
                    elif param_to_analyze == "频率惩罚":
                        current_params = {
   
                            "temperature": fixed_temperature,
                            "top_p": fixed_top_p,
                            "frequency_penalty": param_value,
                            "presence_penalty": fixed_presence_penalty,
                            "max_tokens": fixed_max_tokens
                        }
                    else:  # 存在惩罚
                        current_params = {
   
                            "temperature": fixed_temperature,
                            "top_p": fixed_top_p,
                            "frequency_penalty": fixed_frequency_penalty,
                            "presence_penalty": param_value,
                            "max_tokens": fixed_max_tokens
                        }

                    # 模拟模型调用(在实际应用中替换为真实API调用)
                    start_time = time.time()
                    time.sleep(1)  # 模拟API延迟

                    # 模拟生成的响应
                    simulated_response = f"这是在参数 {param_to_analyze} = {param_value:.2f} 下生成的响应。基于您的提示,我生成了相关内容。"

                    # 计算响应时间和其他指标
                    response_time = round(time.time() - start_time, 2)
                    token_count = len(simulated_response.split())

                    # 保存结果
                    st.session_state.sensitivity_results.append({
   
                        "param_value": param_value,
                        "response": simulated_response,
                        "response_time": response_time,
                        "token_count": token_count,
                        **current_params
                    })

                # 创建DataFrame用于可视化
                results_data = []
                for result in st.session_state.sensitivity_results:
                    results_data.append({
   
                        param_to_analyze: result["param_value"],
                        "响应时间(秒)": result["response_time"],
                        "Token数量": result["token_count"]
                    })

                st.session_state.sensitivity_data = pd.DataFrame(results_data)

                # 清除状态文本
                status_text.text("")
                st.success("参数敏感性分析完成!")

            except Exception as e:
                st.error(f"分析过程中出错: {str(e)}")

    # 显示分析结果
    if st.session_state.sensitivity_data is not None:
        # 显示结果表格
        with st.expander("查看详细结果数据"):
            st.dataframe(st.session_state.sensitivity_data)

        # 可视化响应时间随参数变化的趋势
        st.subheader(f"{param_to_analyze} 对响应时间的影响")
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(st.session_state.sensitivity_data[param_to_analyze], 
                st.session_state.sensitivity_data["响应时间(秒)"], 
                marker='o', linestyle='-', color='blue')
        ax.set_xlabel(param_to_analyze)
        ax.set_ylabel('响应时间(秒)')
        ax.set_title(f'{param_to_analyze} 与响应时间的关系')
        ax.grid(True, linestyle='--', alpha=0.7)
        st.pyplot(fig)

        # 可视化Token数量随参数变化的趋势
        st.subheader(f"{param_to_analyze} 对生成长度的影响")
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(st.session_state.sensitivity_data[param_to_analyze], 
                st.session_state.sensitivity_data["Token数量"], 
                marker='s', linestyle='-', color='green')
        ax.set_xlabel(param_to_analyze)
        ax.set_ylabel('Token数量')
        ax.set_title(f'{param_to_analyze} 与生成长度的关系')
        ax.grid(True, linestyle='--', alpha=0.7)
        st.pyplot(fig)

        # 响应示例展示
        st.subheader("不同参数值的响应示例")

        # 获取最小、中间和最大参数值的响应
        if len(st.session_state.sensitivity_results) >= 3:
            min_result = min(st.session_state.sensitivity_results, key=lambda x: x["param_value"])
            max_result = max(st.session_state.sensitivity_results, key=lambda x: x["param_value"])
            mid_idx = len(st.session_state.sensitivity_results) // 2
            mid_result = st.session_state.sensitivity_results[mid_idx]

            # 显示不同参数值的响应
            tabs = st.tabs([
                f"{param_to_analyze} = {min_result['param_value']:.2f}",
                f"{param_to_analyze} = {mid_result['param_value']:.2f}",
                f"{param_to_analyze} = {max_result['param_value']:.2f}"
            ])

            with tabs[0]:
                st.markdown(min_result["response"])
                st.caption(f"响应时间: {min_result['response_time']}秒 | Token数量: {min_result['token_count']}")

            with tabs[1]:
                st.markdown(mid_result["response"])
                st.caption(f"响应时间: {mid_result['response_time']}秒 | Token数量: {mid_result['token_count']}")

            with tabs[2]:
                st.markdown(max_result["response"])
                st.caption(f"响应时间: {max_result['response_time']}秒 | Token数量: {max_result['token_count']}")

# 底部信息
st.divider()
st.markdown("提示:这是一个演示工具。在实际应用中,请替换为真实的API调用或本地模型推理。")

5.1.3 功能解析与使用指南

这个参数敏感性分析工具提供了以下核心功能:

1. 参数配置

  • 模型选择:选择要使用的LLM模型
  • 参数选择:选择要分析的参数(Temperature、Top P、频率惩罚、存在惩罚)
  • 参数范围:设置要分析的参数范围和步长
  • 固定参数:设置其他参数的固定值
  • 任务类型:选择预设的任务类型,自动加载相应的提示模板

2. 分析过程

  • 工具会自动生成参数值序列,覆盖用户指定的参数范围
  • 对于每个参数值,调用模型生成响应
  • 记录每个参数值下的响应内容、响应时间和token数量
  • 实时显示分析进度

3. 结果可视化

  • 详细结果表格:以表格形式展示所有测试结果
  • 趋势图表:显示参数值与响应时间、生成长度之间的关系
  • 响应示例对比:对比不同参数值下的生成结果

使用流程:

  1. 在侧边栏选择模型并输入API密钥
  2. 选择要分析的参数类型和范围
  3. 设置其他参数的固定值
  4. 选择任务类型或自定义提示
  5. 点击"开始参数敏感性分析"按钮
  6. 查看分析结果,包括趋势图表和响应示例

5.1.4 应用场景

参数敏感性分析工具适用于以下场景:

  • 模型调优:为特定任务找到最佳参数配置
  • 模型理解:深入了解不同参数对模型行为的影响
  • 成本优化:在生成质量和计算成本之间找到平衡点
  • 研究分析:研究模型参数与输出特性之间的关系

5.2 多模型对比工具

在实际应用中,开发者经常需要比较不同模型在相同任务上的表现。多模型对比工具可以帮助开发者系统地评估不同模型的优缺点,为模型选择提供数据支持。

5.2.1 多模型对比的原理

多模型对比通过在相同的输入提示和参数设置下,比较不同模型的输出结果,从而评估各模型在特定任务上的表现。对比维度通常包括生成质量、响应速度、token效率等多个方面。

5.2.2 实现代码

下面是一个基于Streamlit的多模型对比工具的实现:

import streamlit as st
import time
import pandas as pd
import matplotlib.pyplot as plt
from typing import List, Dict, Any

# 设置页面配置
st.set_page_config(
    page_title="LLM多模型对比工具",
    page_icon="⚖️",
    layout="wide"
)

# 页面标题
st.title("⚖️ LLM多模型对比工具")

# 侧边栏 - 配置
with st.sidebar:
    st.header("模型配置")

    # 可选择的模型列表
    available_models = [
        "gpt-3.5-turbo", 
        "gpt-4", 
        "text-davinci-003", 
        "本地模型1", 
        "本地模型2"
    ]

    # 模型选择
    selected_models = st.multiselect(
        "选择要对比的模型",
        options=available_models,
        default=["gpt-3.5-turbo", "gpt-4"]
    )

    # API密钥输入
    api_keys = {
   }
    for model in selected_models:
        if model != "本地模型1" and model != "本地模型2":
            api_keys[model] = st.text_input(f"{model} API密钥", type="password", key=f"key_{model}")

    # 分隔线
    st.divider()

    # 生成参数
    st.header("生成参数")
    temperature = st.slider("温度 (Temperature)", min_value=0.0, max_value=2.0, value=0.7, step=0.1)
    max_tokens = st.slider("最大Token数", min_value=50, max_value=2000, value=500, step=50)
    top_p = st.slider("Top P", min_value=0.0, max_value=1.0, value=1.0, step=0.1)
    frequency_penalty = st.slider("频率惩罚", min_value=-2.0, max_value=2.0, value=0.0, step=0.1)
    presence_penalty = st.slider("存在惩罚", min_value=-2.0, max_value=2.0, value=0.0, step=0.1)

# 主内容区
col1, col2 = st.columns(2)

with col1:
    st.header("📝 提示输入")

    # 任务类型选择
    task_type = st.selectbox(
        "任务类型",
        options=["创意写作", "技术问答", "内容摘要", "代码生成", "翻译任务"],
        index=0
    )

    # 根据任务类型预设提示
    if task_type == "创意写作":
        default_prompt = "请写一个关于未来城市的短篇故事,包含科技与人文的融合。"
    elif task_type == "技术问答":
        default_prompt = "请详细解释Transformer架构的工作原理,以及它如何改进了自然语言处理模型的性能。"
    elif task_type == "内容摘要":
        default_prompt = "请为以下文本生成一个简洁但全面的摘要:\n\n人工智能(AI)已经成为当今科技领域最热门的话题之一。随着计算能力的提升和算法的改进,AI技术在各个领域都取得了巨大的进步。特别是在自然语言处理(NLP)领域,大型语言模型(LLM)的出现彻底改变了人机交互的方式。这些模型通过在海量文本数据上训练,学习了语言的统计规律和语义关系,能够生成高质量的文本内容,回答复杂问题,进行多轮对话等。随着技术的不断发展,AI在医疗、金融、教育等领域的应用也越来越广泛,为人们的生活和工作带来了极大的便利。然而,AI技术的快速发展也带来了一些挑战,如数据隐私、算法偏见、就业影响等。如何在推动技术进步的同时,确保AI的安全、公平和可持续发展,是当前AI研究和应用中需要认真考虑的问题。"
    elif task_type == "代码生成":
        default_prompt = "请编写一个Python程序,实现快速排序算法,并添加详细的注释说明每个步骤的作用。"
    else:  # 翻译任务
        default_prompt = "请将以下英文段落翻译成中文:\n\nArtificial intelligence is transforming how we live and work. Large language models like GPT-4 are capable of generating human-like text, answering complex questions, and even writing code. As these models continue to improve, they are being integrated into various applications across industries, from customer service to content creation. However, there are still challenges to overcome, such as ensuring these models remain accurate, unbiased, and aligned with human values."

    prompt = st.text_area("提示内容", height=300, value=default_prompt)

    # 对比按钮
    compare_button = st.button("⚡ 开始模型对比", use_container_width=True)

    # 显示选择的模型数量
    st.info(f"将对比 {len(selected_models)} 个模型的表现")

with col2:
    st.header("📊 对比结果")

    # 初始化会话状态
    if "comparison_results" not in st.session_state:
        st.session_state.comparison_results = {
   }

    # 执行模型对比
    if compare_button:
        if not prompt.strip():
            st.error("请输入提示内容")
        elif not selected_models:
            st.error("请至少选择一个模型")
        else:
            # 检查API密钥
            missing_keys = []
            for model in selected_models:
                if model != "本地模型1" and model != "本地模型2" and model not in api_keys:
                    missing_keys.append(model)

            if missing_keys:
                st.error(f"请为以下模型输入API密钥: {', '.join(missing_keys)}")
            else:
                # 显示加载状态
                progress_bar = st.progress(0)
                status_text = st.empty()

                # 清空之前的结果
                st.session_state.comparison_results = {
   }

                try:
                    # 遍历所有选中的模型
                    for i, model in enumerate(selected_models):
                        # 更新状态
                        progress = (i + 1) / len(selected_models)
                        progress_bar.progress(progress)
                        status_text.text(f"正在测试模型: {model} ({i+1}/{len(selected_models)})")

                        # 模拟模型调用(在实际应用中替换为真实API调用)
                        start_time = time.time()
                        time.sleep(2)  # 模拟API延迟

                        # 根据模型类型模拟不同的响应
                        if model == "gpt-3.5-turbo":
                            simulated_response = f"这是{model}对提示的响应。我理解了您的要求,并根据您提供的提示生成了相应内容。这个响应展示了我的基础能力,包括理解复杂指令和生成连贯内容的能力。\n\n{prompt[:50]}..."
                        elif model == "gpt-4":
                            simulated_response = f"这是{model}对提示的详细响应。基于您的输入,我进行了深入分析并生成了以下内容:\n\n1. 针对您的核心需求,我提供了详细的解释和示例\n2. 我考虑了各种相关因素和边界情况\n3. 我使用了结构化的方式呈现信息,确保内容清晰易读\n4. 我提供了实用的建议和进一步的思考方向\n\n{prompt[:30]}..."
                        elif model == "text-davinci-003":
                            simulated_response = f"这是{model}对提示的响应。根据您的要求,我生成了以下内容。这个模型虽然相对较旧,但仍然能够处理各种文本生成任务。\n\n{prompt[:40]}..."
                        elif model == "本地模型1":
                            simulated_response = f"这是本地模型1对提示的响应。作为一个优化的本地模型,我能够在保护隐私的同时提供良好的性能。虽然在某些复杂任务上可能不如云端模型,但在延迟和成本方面具有优势。\n\n{prompt[:35]}..."
                        else:  # 本地模型2
                            simulated_response = f"这是本地模型2对提示的响应。这个模型经过特殊优化,适合在资源受限的环境中运行。虽然响应可能不如大型云端模型全面,但在速度和资源消耗方面表现出色。\n\n{prompt[:35]}..."

                        # 计算响应时间和其他指标
                        response_time = round(time.time() - start_time, 2)
                        token_count = len(simulated_response.split())

                        # 保存结果
                        st.session_state.comparison_results[model] = {
   
                            "response": simulated_response,
                            "response_time": response_time,
                            "token_count": token_count,
                            "temperature": temperature,
                            "max_tokens": max_tokens,
                            "top_p": top_p,
                            "frequency_penalty": frequency_penalty,
                            "presence_penalty": presence_penalty
                        }

                    # 清除状态文本
                    status_text.text("")
                    st.success("模型对比完成!")

                except Exception as e:
                    st.error(f"对比过程中出错: {str(e)}")

    # 显示对比结果
    if st.session_state.comparison_results:
        # 创建对比数据DataFrame
        comparison_data = []
        for model, result in st.session_state.comparison_results.items():
            comparison_data.append({
   
                "模型": model,
                "响应时间(秒)": result["response_time"],
                "Token数量": result["token_count"],
                "生成效率(Token/秒)": round(result["token_count"] / result["response_time"], 2)
            })

        comparison_df = pd.DataFrame(comparison_data)

        # 显示对比表格
        st.subheader("性能对比")
        st.dataframe(comparison_df)

        # 可视化响应时间对比
        st.subheader("响应时间对比")
        fig, ax = plt.subplots(figsize=(10, 5))
        bars = ax.bar(comparison_df["模型"], comparison_df["响应时间(秒)"], color='skyblue')
        ax.set_xlabel('模型')
        ax.set_ylabel('响应时间(秒)')
        ax.set_title('不同模型的响应时间对比')
        ax.grid(True, linestyle='--', alpha=0.7, axis='y')

        # 在柱状图上添加数值标签
        for bar in bars:
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height + 0.05,
                    f'{height:.2f}', ha='center', va='bottom')

        st.pyplot(fig)

        # 可视化生成效率对比
        st.subheader("生成效率对比")
        fig, ax = plt.subplots(figsize=(10, 5))
        bars = ax.bar(comparison_df["模型"], comparison_df["生成效率(Token/秒)"], color='lightgreen')
        ax.set_xlabel('模型')
        ax.set_ylabel('生成效率(Token/秒)')
        ax.set_title('不同模型的生成效率对比')
        ax.grid(True, linestyle='--', alpha=0.7, axis='y')

        # 在柱状图上添加数值标签
        for bar in bars:
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height + 0.5,
                    f'{height:.2f}', ha='center', va='bottom')

        st.pyplot(fig)

        # 响应内容对比
        st.subheader("响应内容对比")
        tabs = st.tabs([model for model in st.session_state.comparison_results.keys()])

        for i, (model, result) in enumerate(st.session_state.comparison_results.items()):
            with tabs[i]:
                st.markdown("### 响应内容")
                st.markdown(result["response"])

                st.markdown("### 性能指标")
                metrics_col1, metrics_col2 = st.columns(2)
                with metrics_col1:
                    st.info(f"响应时间: **{result['response_time']}** 秒")
                with metrics_col2:
                    st.info(f"Token数量: **{result['token_count']}**")

                st.markdown("### 使用参数")
                param_df = pd.DataFrame({
   
                    "参数": ["温度", "最大Token数", "Top P", "频率惩罚", "存在惩罚"],
                    "值": [
                        result["temperature"], 
                        result["max_tokens"], 
                        result["top_p"], 
                        result["frequency_penalty"], 
                        result["presence_penalty"]
                    ]
                })
                st.dataframe(param_df, use_container_width=True, hide_index=True)

        # 用户评分功能
        st.subheader("模型性能评分")
        st.info("请根据您的需求对模型性能进行评分,帮助进一步优化模型选择")

        ratings = []
        for model in st.session_state.comparison_results.keys():
            col1, col2 = st.columns([1, 2])
            with col1:
                st.write(model)
            with col2:
                rating = st.slider("评分 (1-10)", 1, 10, 5, key=f"rating_{model}")
                ratings.append({
   "模型": model, "评分": rating})

        if ratings:
            rating_df = pd.DataFrame(ratings)

            st.subheader("评分结果")
            fig, ax = plt.subplots(figsize=(10, 5))
            bars = ax.bar(rating_df["模型"], rating_df["评分"], color='lightcoral')
            ax.set_xlabel('模型')
            ax.set_ylabel('评分 (1-10)')
            ax.set_title('模型性能评分对比')
            ax.set_ylim(0, 11)
            ax.grid(True, linestyle='--', alpha=0.7, axis='y')

            # 在柱状图上添加数值标签
            for bar in bars:
                height = bar.get_height()
                ax.text(bar.get_x() + bar.get_width()/2., height + 0.1,
                        f'{height}', ha='center', va='bottom')

            st.pyplot(fig)

# 底部信息
st.divider()
st.markdown("提示:这是一个演示工具。在实际应用中,请替换为真实的API调用或本地模型推理。")

5.2.3 功能解析与使用指南

这个多模型对比工具提供了以下核心功能:

1. 模型选择与配置

  • 多模型选择:可以同时选择多个模型进行对比
  • API密钥管理:为不同模型提供对应的API密钥输入框
  • 统一参数设置:所有模型使用相同的生成参数,确保公平对比

2. 对比过程

  • 对每个选中的模型,使用相同的提示和参数配置调用模型
  • 实时显示对比进度
  • 收集每个模型的响应内容和性能指标

3. 结果展示

  • 性能对比表格:以表格形式展示各模型的性能指标
  • 可视化图表:直观展示响应时间和生成效率的对比
  • 响应内容对比:以标签页形式展示各模型的响应内容
  • 用户评分功能:允许用户根据需求对模型性能进行评分,并可视化展示评分结果

使用流程:

  1. 在侧边栏选择要对比的模型
  2. 为需要API的模型输入相应的API密钥
  3. 设置统一的生成参数
  4. 在提示输入区选择任务类型或输入自定义提示
  5. 点击"开始模型对比"按钮
  6. 查看对比结果,包括性能对比、响应内容对比和用户评分

5.2.4 应用场景

多模型对比工具适用于以下场景:

  • 模型选型:在项目开始前,评估不同模型的性能表现,选择最适合的模型
  • 成本效益分析:比较不同模型的性能与成本,找到最佳平衡点
  • 研究评估:对新模型进行系统性评估,与基准模型进行对比
  • A/B测试:测试不同版本模型的改进效果

5.3 批处理测试工具

在实际应用中,开发者通常需要在多个测试用例上批量评估模型的性能。批处理测试工具可以自动化这个过程,帮助开发者高效地进行大规模测试。

5.3.1 批处理测试的原理

批处理测试通过从预设的测试集或CSV文件中加载多个测试用例,然后自动在每个测试用例上运行模型,收集和分析结果。这种方法可以更全面地评估模型在不同场景下的性能,发现模型的优势和局限性。

5.3.2 实现代码

下面是一个基于Streamlit的批处理测试工具的实现:

import streamlit as st
import pandas as pd
import time
import matplotlib.pyplot as plt
import numpy as np
from typing import List, Dict, Any, Optional

# 设置页面配置
st.set_page_config(
    page_title="LLM批处理测试工具",
    page_icon="📋",
    layout="wide"
)

# 页面标题
st.title("📋 LLM批处理测试工具")

# 侧边栏 - 配置
with st.sidebar:
    st.header("测试配置")

    # 模型选择
    model_name = st.selectbox(
        "选择模型",
        options=["gpt-3.5-turbo", "gpt-4", "text-davinci-003", "本地模型"],
        index=0
    )

    # API密钥输入
    if model_name != "本地模型":
        api_key = st.text_input("API密钥", type="password")

    # 分隔线
    st.divider()

    # 生成参数
    st.header("生成参数")
    temperature = st.slider("温度 (Temperature)", min_value=0.0, max_value=2.0, value=0.7, step=0.1)
    max_tokens = st.slider("最大Token数", min_value=50, max_value=2000, value=500, step=50)
    top_p = st.slider("Top P", min_value=0.0, max_value=1.0, value=1.0, step=0.1)
    frequency_penalty = st.slider("频率惩罚", min_value=-2.0, max_value=2.0, value=0.0, step=0.1)
    presence_penalty = st.slider("存在惩罚", min_value=-2.0, max_value=2.0, value=0.0, step=0.1)

# 主内容区
col1, col2 = st.columns(2)

with col1:
    st.header("📊 测试用例管理")

    # 测试用例输入方式选择
    input_method = st.radio(
        "测试用例输入方式",
        options=["使用预设测试集", "上传CSV文件", "手动添加测试用例"],
        index=0
    )

    # 预设测试集
    if input_method == "使用预设测试集":
        testset_type = st.selectbox(
            "预设测试集",
            options=["通用问答测试集", "编程问题测试集", "创意写作测试集", "翻译测试集"],
            index=0
        )

        # 预设测试集内容
        if testset_type == "通用问答测试集":
            sample_tests = [
                {
   "id": 1, "prompt": "什么是人工智能?它有哪些主要应用领域?", "expected": "人工智能是模拟人类智能的计算机科学分支。主要应用包括自然语言处理、计算机视觉、机器人技术、推荐系统等。"},
                {
   "id": 2, "prompt": "解释一下区块链技术的基本原理。", "expected": "区块链是一种分布式账本技术,通过密码学链接数据块,实现去中心化、不可篡改的数据存储。"},
                {
   "id": 3, "prompt": "什么是气候变化?它对地球有哪些影响?", "expected": "气候变化是指长期气候模式的显著变化,主要由人类活动排放的温室气体引起。影响包括全球变暖、极端天气事件增加、海平面上升等。"},
                {
   "id": 4, "prompt": "简述量子计算的基本原理及其潜在应用。", "expected": "量子计算基于量子力学原理,利用量子比特的叠加和纠缠特性进行计算。潜在应用包括密码学破解、优化问题解决、药物发现等。"},
                {
   "id": 5, "prompt": "什么是机器学习?它与人工智能的关系是什么?", "expected": "机器学习是人工智能的一个分支,研究如何使计算机系统通过经验自动改进。机器学习是实现人工智能的重要方法之一。"}
            ]
        elif testset_type == "编程问题测试集":
            sample_tests = [
                {
   "id": 1, "prompt": "编写一个Python函数,计算两个数的最大公约数。", "expected": "使用欧几里得算法实现。"},
                {
   "id": 2, "prompt": "如何在Python中实现快速排序算法?", "expected": "选择一个基准元素,将数组分为小于和大于基准的两部分,递归排序。"},
                {
   "id": 3, "prompt": "编写一个函数,判断一个字符串是否为回文。", "expected": "比较字符串与其反转是否相等。"},
                {
   "id": 4, "prompt": "如何实现一个简单的二叉树遍历?", "expected": "包括前序、中序、后序和层序遍历方法。"},
                {
   "id": 5, "prompt": "编写一个Python程序,读取CSV文件并计算某一列的平均值。", "expected": "使用csv模块或pandas库读取并处理数据。"}
            ]
        elif testset_type == "创意写作测试集":
            sample_tests = [
                {
   "id": 1, "prompt": "写一首关于自然的短诗。", "expected": "表达对自然美的欣赏。"},
                {
   "id": 2, "prompt": "为一个名为'未来世界'的科幻故事写一段开头。", "expected": "设定未来场景,引入故事元素。"},
                {
   "id": 3, "prompt": "写一段描述秋天景色的文字。", "expected": "生动描绘秋天的视觉、听觉和感觉特征。"},
                {
   "id": 4, "prompt": "创作一个简短的童话,主角是一只聪明的狐狸。", "expected": "包含问题、解决过程和寓意。"},
                {
   "id": 5, "prompt": "为一款新的智能手机写一段吸引人的广告语。", "expected": "突出产品特点和用户价值。"}
            ]
        else:  # 翻译测试集
            sample_tests = [
                {
   "id": 1, "prompt": "将'人工智能正在改变我们的生活方式'翻译成英文。", "expected": "Artificial intelligence is changing our way of life."},
                {
   "id": 2, "prompt": "将'Climate change is a global challenge'翻译成中文。", "expected": "气候变化是一个全球性挑战。"},
                {
   "id": 3, "prompt": "将'科技创新推动社会进步'翻译成英文。", "expected": "Technological innovation drives social progress."},
                {
   "id": 4, "prompt": "将'Knowledge is power'翻译成中文。", "expected": "知识就是力量。"},
                {
   "id": 5, "prompt": "将'人工智能伦理问题需要认真考虑'翻译成英文。", "expected": "AI ethical issues need to be carefully considered."}
            ]

        # 显示预设测试集
        st.subheader("测试用例预览")
        test_df = pd.DataFrame(sample_tests)
        st.dataframe(test_df, use_container_width=True, hide_index=True)

    # 上传CSV文件
    elif input_method == "上传CSV文件":
        uploaded_file = st.file_uploader("上传测试用例CSV文件", type=["csv"])

        if uploaded_file is not None:
            try:
                test_df = pd.read_csv(uploaded_file)
                # 检查必要的列
                required_columns = ["prompt"]
                if not all(col in test_df.columns for col in required_columns):
                    st.error("CSV文件必须包含 'prompt' 列")
                else:
                    # 如果没有ID列,自动添加
                    if "id" not in test_df.columns:
                        test_df["id"] = range(1, len(test_df) + 1)
                    # 如果没有期望输出列,添加空列
                    if "expected" not in test_df.columns:
                        test_df["expected"] = ""

                    st.subheader("测试用例预览")
                    st.dataframe(test_df, use_container_width=True, hide_index=True)

                    # 保存到会话状态
                    st.session_state.test_df = test_df
            except Exception as e:
                st.error(f"读取CSV文件时出错: {str(e)}")

    # 手动添加测试用例
    else:
        st.subheader("添加测试用例")

        # 初始化测试用例列表
        if "manual_tests" not in st.session_state:
            st.session_state.manual_tests = []

        # 添加新测试用例
        new_prompt = st.text_area("提示内容", key="new_prompt")
        new_expected = st.text_input("期望输出(可选)", key="new_expected")

        if st.button("添加测试用例"):
            if not new_prompt.strip():
                st.error("提示内容不能为空")
            else:
                # 添加新测试用例
                test_id = len(st.session_state.manual_tests) + 1
                st.session_state.manual_tests.append({
   
                    "id": test_id,
                    "prompt": new_prompt,
                    "expected": new_expected
                })
                st.success("测试用例已添加")
                # 清空输入框
                st.session_state.new_prompt = ""
                st.session_state.new_expected = ""

        # 显示已添加的测试用例
        if st.session_state.manual_tests:
            test_df = pd.DataFrame(st.session_state.manual_tests)
            st.subheader("已添加的测试用例")
            st.dataframe(test_df, use_container_width=True, hide_index=True)

            # 删除测试用例功能
            col1, col2 = st.columns([1, 1])
            with col1:
                delete_id = st.number_input("要删除的测试用例ID", min_value=1, max_value=len(st.session_state.manual_tests), step=1)
            with col2:
                if st.button("删除测试用例"):
                    st.session_state.manual_tests = [test for test in st.session_state.manual_tests if test["id"] != delete_id]
                    # 重新编号
                    for i, test in enumerate(st.session_state.manual_tests):
                        test["id"] = i + 1
                    st.success(f"测试用例 {delete_id} 已删除")

    # 开始测试按钮
    if (input_method == "使用预设测试集" or 
        (input_method == "上传CSV文件" and "test_df" in st.session_state) or 
        (input_method == "手动添加测试用例" and st.session_state.manual_tests)):

        # 确定测试用例
        if input_method == "使用预设测试集":
            test_cases = sample_tests
        elif input_method == "上传CSV文件":
            test_cases = st.session_state.test_df.to_dict('records')
        else:
            test_cases = st.session_state.manual_tests

        st.info(f"共有 {len(test_cases)} 个测试用例")
        run_test_button = st.button("🚀 开始批处理测试", use_container_width=True)
    else:
        run_test_button = None

with col2:
    st.header("📈 测试结果")

    # 初始化会话状态
    if "batch_results" not in st.session_state:
        st.session_state.batch_results = []

    # 执行批处理测试
    if run_test_button:
        if model_name != "本地模型" and "api_key" not in locals():
            st.error("请输入API密钥")
        else:
            # 显示加载状态
            progress_bar = st.progress(0)
            status_text = st.empty()

            # 清空之前的结果
            st.session_state.batch_results = []

            try:
                # 遍历所有测试用例
                for i, test_case in enumerate(test_cases):
                    # 更新状态
                    progress = (i + 1) / len(test_cases)
                    progress_bar.progress(progress)
                    status_text.text(f"正在测试用例 {test_case['id']}/{len(test_cases)}")

                    # 模拟模型调用(在实际应用中替换为真实API调用)
                    start_time = time.time()
                    time.sleep(1.5)  # 模拟API延迟

                    # 模拟生成的响应
                    simulated_response = f"这是对测试用例 {test_case['id']} 的响应。基于提示:'{test_case['prompt'][:30]}...',我生成了相应内容。"

                    # 计算响应时间和其他指标
                    response_time = round(time.time() - start_time, 2)
                    token_count = len(simulated_response.split())

                    # 简单的相似度评分(模拟)
                    if "expected" in test_case and test_case["expected"]:
                        similarity_score = round(np.random.uniform(0.5, 0.95), 2)  # 模拟相似度评分
                    else:
                        similarity_score = None

                    # 保存结果
                    st.session_state.batch_results.append({
   
                        "test_id": test_case["id"],
                        "prompt": test_case["prompt"],
                        "expected": test_case.get("expected", ""),
                        "response": simulated_response,
                        "response_time": response_time,
                        "token_count": token_count,
                        "similarity_score": similarity_score
                    })

                # 清除状态文本
                status_text.text("")
                st.success("批处理测试完成!")

            except Exception as e:
                st.error(f"测试过程中出错: {str(e)}")

    # 显示测试结果
    if st.session_state.batch_results:
        # 创建结果DataFrame
        results_data = []
        for result in st.session_state.batch_results:
            results_data.append({
   
                "测试用例ID": result["test_id"],
                "响应时间(秒)": result["response_time"],
                "Token数量": result["token_count"],
                "相似度评分": result["similarity_score"] if result["similarity_score"] is not None else "N/A"
            })

        results_df = pd.DataFrame(results_data)

        # 显示结果概览
        st.subheader("测试结果概览")

        # 计算总体指标
        total_tests = len(st.session_state.batch_results)
        avg_response_time = round(sum(r["response_time"] for r in st.session_state.batch_results) / total_tests, 2)
        avg_token_count = round(sum(r["token_count"] for r in st.session_state.batch_results) / total_tests, 2)

        # 计算平均相似度(如果有)
        similarity_scores = [r["similarity_score"] for r in st.session_state.batch_results if r["similarity_score"] is not None]
        avg_similarity = round(sum(similarity_scores) / len(similarity_scores), 2) if similarity_scores else "N/A"

        # 显示总体指标
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("测试用例总数", total_tests)
        with col2:
            st.metric("平均响应时间", f"{avg_response_time}秒")
        with col3:
            st.metric("平均Token数量", avg_token_count)
        with col4:
            st.metric("平均相似度评分", avg_similarity)

        # 显示详细结果表格
        with st.expander("查看详细结果"):
            st.dataframe(results_df, use_container_width=True)

        # 可视化响应时间分布
        st.subheader("响应时间分布")
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.hist([r["response_time"] for r in st.session_state.batch_results], bins=20, color='skyblue', edgecolor='black', alpha=0.7)
        ax.axvline(avg_response_time, color='red', linestyle='dashed', linewidth=2, label=f'平均值: {avg_response_time}秒')
        ax.set_xlabel('响应时间(秒)')
        ax.set_ylabel('频率')
        ax.set_title('响应时间分布')
        ax.legend()
        ax.grid(True, linestyle='--', alpha=0.7)
        st.pyplot(fig)

        # 可视化Token数量分布
        st.subheader("Token数量分布")
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.hist([r["token_count"] for r in st.session_state.batch_results], bins=20, color='lightgreen', edgecolor='black', alpha=0.7)
        ax.axvline(avg_token_count, color='red', linestyle='dashed', linewidth=2, label=f'平均值: {avg_token_count}')
        ax.set_xlabel('Token数量')
        ax.set_ylabel('频率')
        ax.set_title('Token数量分布')
        ax.legend()
        ax.grid(True, linestyle='--', alpha=0.7)
        st.pyplot(fig)

        # 如果有相似度评分,可视化相似度分布
        if similarity_scores:
            st.subheader("相似度评分分布")
            fig, ax = plt.subplots(figsize=(10, 5))
            ax.hist(similarity_scores, bins=20, color='lightcoral', edgecolor='black', alpha=0.7)
            ax.axvline(avg_similarity, color='blue', linestyle='dashed', linewidth=2, label=f'平均值: {avg_similarity}')
            ax.set_xlabel('相似度评分')
            ax.set_ylabel('频率')
            ax.set_title('相似度评分分布')
            ax.legend()
            ax.grid(True, linestyle='--', alpha=0.7)
            st.pyplot(fig)

        # 详细查看测试用例结果
        st.subheader("测试用例详细结果")
        test_id_to_view = st.selectbox("选择测试用例ID查看详情", 
                                      options=[r["test_id"] for r in st.session_state.batch_results])

        # 查找选择的测试用例结果
        selected_result = next(r for r in st.session_state.batch_results if r["test_id"] == test_id_to_view)

        # 显示详细信息
        st.markdown("### 测试用例详情")

        col1, col2 = st.columns(2)
        with col1:
            st.markdown(f"**提示内容:**")
            st.markdown(selected_result["prompt"])

            if selected_result["expected"]:
                st.markdown(f"**期望输出:**")
                st.markdown(selected_result["expected"])

        with col2:
            st.markdown(f"**实际响应:**")
            st.markdown(selected_result["response"])

            st.markdown("**性能指标:**")
            metrics_data = {
   
                "响应时间": f"{selected_result['response_time']}秒",
                "Token数量": selected_result["token_count"]
            }
            if selected_result["similarity_score"] is not None:
                metrics_data["相似度评分"] = selected_result["similarity_score"]

            for key, value in metrics_data.items():
                st.info(f"{key}: **{value}**")

        # 导出结果功能
        st.subheader("导出结果")
        export_option = st.radio("选择导出格式", options=["CSV", "JSON"], index=0)

        if st.button("导出结果"):
            # 准备导出数据
            export_data = []
            for result in st.session_state.batch_results:
                export_data.append({
   
                    "test_id": result["test_id"],
                    "prompt": result["prompt"],
                    "expected": result["expected"],
                    "response": result["response"],
                    "response_time": result["response_time"],
                    "token_count": result["token_count"],
                    "similarity_score": result["similarity_score"]
                })

            if export_option == "CSV":
                export_df = pd.DataFrame(export_data)
                st.download_button(
                    label="下载CSV文件",
                    data=export_df.to_csv(index=False).encode('utf-8'),
                    file_name="llm_test_results.csv",
                    mime="text/csv"
                )
            else:  # JSON
                import json
                st.download_button(
                    label="下载JSON文件",
                    data=json.dumps(export_data, ensure_ascii=False, indent=2).encode('utf-8'),
                    file_name="llm_test_results.json",
                    mime="application/json"
                )

# 底部信息
st.divider()
st.markdown("提示:这是一个演示工具。在实际应用中,请替换为真实的API调用或本地模型推理。")

5.3.3 功能解析与使用指南

这个批处理测试工具提供了以下核心功能:

1. 测试用例管理

  • 多种输入方式:支持使用预设测试集、上传CSV文件或手动添加测试用例
  • 测试用例预览:直观展示所有测试用例的内容
  • 手动测试用例管理:支持添加和删除手动输入的测试用例

2. 批处理测试

  • 自动化测试:自动遍历所有测试用例并调用模型
  • 实时进度显示:展示测试进度和当前测试的用例
  • 结果收集:收集每个测试用例的响应内容和性能指标

3. 结果分析与可视化

  • 总体指标统计:计算并显示平均响应时间、平均token数量等总体指标
  • 详细结果表格:以表格形式展示所有测试用例的结果
  • 分布图表:展示响应时间、token数量和相似度评分的分布情况
  • 单例详情:可以选择特定测试用例查看详细结果
  • 结果导出:支持将测试结果导出为CSV或JSON格式

使用流程:

  1. 在侧边栏选择模型并输入API密钥
  2. 设置生成参数
  3. 选择测试用例输入方式(预设测试集、上传CSV或手动添加)
  4. 配置或添加测试用例
  5. 点击"开始批处理测试"按钮
  6. 查看测试结果,包括总体指标、分布图表和详细结果
  7. 可选:导出测试结果

5.3.4 应用场景

批处理测试工具适用于以下场景:

  • 模型评估:在多个测试用例上全面评估模型性能
  • 回归测试:在模型更新或参数调整后,验证性能是否符合预期
  • 基准测试:为不同模型或配置建立性能基准
  • 问题诊断:识别模型在特定类型任务上的弱点
  • 质量保证:在模型部署前进行全面的质量检查

5.4 小结

本章介绍了三个高级LLM调试功能的实现:参数敏感性分析工具、多模型对比工具和批处理测试工具。这些工具可以帮助开发者更深入地理解模型行为,系统地评估不同参数和模型的性能表现,以及高效地进行大规模测试。

参数敏感性分析工具通过控制变量法,帮助开发者找到最佳参数配置;多模型对比工具可以在相同条件下比较不同模型的表现,为模型选择提供数据支持;批处理测试工具则实现了大规模测试的自动化,提高测试效率和覆盖面。

在实际应用中,这些工具可以结合使用,形成完整的LLM调试和优化流程。例如,先用多模型对比工具选择最适合的基础模型,再用参数敏感性分析工具找到最佳参数配置,最后用批处理测试工具验证优化后的模型在多个测试用例上的表现。

第6章 交互式对话调试工具

6.1 交互式对话调试的原理与设计

交互式对话调试是LLM应用开发中的重要环节,特别是对于聊天机器人、客户服务系统等交互式应用。与单轮提示调试不同,交互式对话需要考虑对话历史的影响、上下文理解、对话流畅性等多个维度。

6.1.1 交互式对话调试的核心要素

  • 对话历史管理:记录和管理多轮对话历史,支持查看、编辑和删除特定轮次
  • 上下文窗口分析:监控和分析上下文窗口的使用情况,包括token消耗
  • 对话状态跟踪:记录和可视化对话的进展状态和主题变化
  • 干预机制:允许开发者在对话过程中进行干预,如修改系统提示、插入特定指令等
  • 多维度评估:从相关性、连贯性、准确性等多个维度评估对话质量

6.1.2 设计思路

我们将构建一个基于Streamlit的交互式对话调试工具,具有以下特点:

  1. 直观的对话界面:模拟真实的聊天界面,清晰展示用户和助手的交互过程
  2. 强大的历史管理:支持查看和编辑任意轮次的对话内容
  3. 实时的token分析:显示当前上下文的token使用情况和成本估算
  4. 灵活的系统配置:支持动态调整系统提示和生成参数
  5. 对话状态可视化:使用图表展示对话的主题变化和token消耗趋势
  6. 可扩展性:支持保存/加载对话记录,便于进一步分析和分享

6.2 实现代码

下面是交互式对话调试工具的完整实现代码:

import streamlit as st
import time
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import json
from typing import List, Dict, Any, Optional, Tuple
import re

# 设置页面配置
st.set_page_config(
    page_title="LLM交互式对话调试工具",
    page_icon="💬",
    layout="wide"
)

# 页面标题
st.title("💬 LLM交互式对话调试工具")

# 初始化会话状态
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []
if "token_usage" not in st.session_state:
    st.session_state.token_usage = []
if "total_tokens" not in st.session_state:
    st.session_state.total_tokens = 0
if "message_count" not in st.session_state:
    st.session_state.message_count = 0
if "system_prompt" not in st.session_state:
    st.session_state.system_prompt = "你是一个有帮助的AI助手,能够回答各种问题并提供详细的解释。请保持友好、专业的语气。"
if "debug_mode" not in st.session_state:
    st.session_state.debug_mode = False

# 侧边栏 - 配置
with st.sidebar:
    st.header("模型配置")

    # 模型选择
    model_name = st.selectbox(
        "选择模型",
        options=["gpt-3.5-turbo", "gpt-4", "text-davinci-003", "本地模型"],
        index=0
    )

    # API密钥输入
    if model_name != "本地模型":
        api_key = st.text_input("API密钥", type="password")

    # 分隔线
    st.divider()

    # 系统提示
    st.header("系统提示")
    system_prompt_input = st.text_area(
        "系统提示内容", 
        value=st.session_state.system_prompt, 
        height=150,
        key="system_prompt_input"
    )

    # 更新系统提示
    if st.button("更新系统提示", use_container_width=True):
        st.session_state.system_prompt = system_prompt_input
        st.success("系统提示已更新")

    # 分隔线
    st.divider()

    # 生成参数
    st.header("生成参数")
    temperature = st.slider("温度 (Temperature)", min_value=0.0, max_value=2.0, value=0.7, step=0.1)
    max_tokens = st.slider("最大Token数", min_value=50, max_value=2000, value=500, step=50)
    top_p = st.slider("Top P", min_value=0.0, max_value=1.0, value=1.0, step=0.1)
    frequency_penalty = st.slider("频率惩罚", min_value=-2.0, max_value=2.0, value=0.0, step=0.1)
    presence_penalty = st.slider("存在惩罚", min_value=-2.0, max_value=2.0, value=0.0, step=0.1)

    # 分隔线
    st.divider()

    # 调试选项
    st.header("调试选项")
    debug_mode = st.checkbox("开启调试模式", value=st.session_state.debug_mode)
    if debug_mode != st.session_state.debug_mode:
        st.session_state.debug_mode = debug_mode

    # 上下文窗口分析
    if st.button("分析当前上下文", use_container_width=True):
        if st.session_state.chat_history:
            st.info("上下文分析功能将在发送消息后显示详细信息")
        else:
            st.warning("暂无对话历史可分析")

    # 分隔线
    st.divider()

    # 工具操作
    st.header("工具操作")

    # 清除对话历史
    if st.button("清除对话历史", use_container_width=True, type="secondary"):
        st.session_state.chat_history = []
        st.session_state.token_usage = []
        st.session_state.total_tokens = 0
        st.session_state.message_count = 0
        st.success("对话历史已清除")

    # 导出对话历史
    if st.button("导出对话历史", use_container_width=True):
        if st.session_state.chat_history:
            # 准备导出数据
            export_data = {
   
                "system_prompt": st.session_state.system_prompt,
                "model": model_name,
                "parameters": {
   
                    "temperature": temperature,
                    "max_tokens": max_tokens,
                    "top_p": top_p,
                    "frequency_penalty": frequency_penalty,
                    "presence_penalty": presence_penalty
                },
                "chat_history": st.session_state.chat_history,
                "token_usage": st.session_state.token_usage,
                "total_tokens": st.session_state.total_tokens
            }

            # 提供下载
            st.download_button(
                label="下载JSON文件",
                data=json.dumps(export_data, ensure_ascii=False, indent=2).encode('utf-8'),
                file_name="chat_history.json",
                mime="application/json"
            )
        else:
            st.warning("暂无对话历史可导出")

    # 导入对话历史
    uploaded_file = st.file_uploader("导入对话历史", type=["json"])
    if uploaded_file is not None:
        try:
            # 读取上传的文件
            import_data = json.load(uploaded_file)

            # 验证数据格式
            required_keys = ["chat_history"]
            if not all(key in import_data for key in required_keys):
                st.error("导入失败:文件格式不正确")
            else:
                # 导入对话历史
                st.session_state.chat_history = import_data["chat_history"]
                st.session_state.token_usage = import_data.get("token_usage", [])
                st.session_state.total_tokens = import_data.get("total_tokens", 0)
                st.session_state.message_count = len(st.session_state.chat_history)

                # 如果有系统提示,也导入
                if "system_prompt" in import_data:
                    st.session_state.system_prompt = import_data["system_prompt"]

                st.success("对话历史已导入")
                # 重新加载页面以更新界面
                st.rerun()
        except Exception as e:
            st.error(f"导入失败: {str(e)}")

# 主内容区 - 对话界面和分析面板
col1, col2 = st.columns([3, 2])

with col1:
    st.header("💭 对话界面")

    # 显示对话历史
    chat_container = st.container()

    # 系统提示显示(仅在调试模式下)
    if st.session_state.debug_mode and st.session_state.system_prompt:
        with chat_container:
            st.info(f"**系统提示:**\n{st.session_state.system_prompt}")

    # 显示对话消息
    with chat_container:
        for i, message in enumerate(st.session_state.chat_history):
            # 用户消息
            if message["role"] == "user":
                user_col, edit_col = st.columns([10, 1])
                with user_col:
                    st.chat_message("user").markdown(message["content"])
                with edit_col:
                    # 编辑按钮
                    if st.button("📝", key=f"edit_user_{i}", use_container_width=True):
                        # 存储当前编辑状态
                        st.session_state.editing_message = i
                        st.session_state.editing_content = message["content"]
                        # 触发重新运行以显示编辑界面
                        st.rerun()

            # 助手消息
            elif message["role"] == "assistant":
                assistant_col, edit_col = st.columns([10, 1])
                with assistant_col:
                    st.chat_message("assistant").markdown(message["content"])
                with edit_col:
                    # 编辑按钮
                    if st.button("📝", key=f"edit_assistant_{i}", use_container_width=True):
                        # 存储当前编辑状态
                        st.session_state.editing_message = i
                        st.session_state.editing_content = message["content"]
                        # 触发重新运行以显示编辑界面
                        st.rerun()

            # 显示token使用信息(仅在调试模式下)
            if st.session_state.debug_mode and i < len(st.session_state.token_usage):
                st.caption(f"💬 Token用量: 输入 {st.session_state.token_usage[i].get('prompt_tokens', 0)} | 输出 {st.session_state.token_usage[i].get('completion_tokens', 0)}")

    # 编辑消息界面
    if "editing_message" in st.session_state:
        with st.form("edit_message_form"):
            st.write(f"正在编辑消息 #{st.session_state.editing_message + 1} ({st.session_state.chat_history[st.session_state.editing_message]['role']})")
            new_content = st.text_area("消息内容", value=st.session_state.editing_content, height=200)

            col1, col2, col3 = st.columns(3)
            with col1:
                save_button = st.form_submit_button("保存更改")
            with col2:
                cancel_button = st.form_submit_button("取消", type="secondary")
            with col3:
                delete_button = st.form_submit_button("删除消息", type="destructive")

        if save_button:
            # 更新消息内容
            st.session_state.chat_history[st.session_state.editing_message]["content"] = new_content
            # 清除编辑状态
            del st.session_state.editing_message
            del st.session_state.editing_content
            st.success("消息已更新")
            st.rerun()
        elif cancel_button:
            # 清除编辑状态
            del st.session_state.editing_message
            del st.session_state.editing_content
            st.rerun()
        elif delete_button:
            # 删除消息
            del st.session_state.chat_history[st.session_state.editing_message]
            # 如果有对应的token使用记录,也删除
            if st.session_state.editing_message < len(st.session_state.token_usage):
                del st.session_state.token_usage[st.session_state.editing_message]
            # 更新消息计数
            st.session_state.message_count = len(st.session_state.chat_history)
            # 重新计算总token数
            st.session_state.total_tokens = sum(
                usage.get('prompt_tokens', 0) + usage.get('completion_tokens', 0) 
                for usage in st.session_state.token_usage
            )
            # 清除编辑状态
            del st.session_state.editing_message
            del st.session_state.editing_content
            st.success("消息已删除")
            st.rerun()

    # 输入新消息
    else:
        with st.form("chat_form", clear_on_submit=True):
            user_input = st.text_area("输入消息", height=100, key="user_input")
            col1, col2 = st.columns([1, 1])
            with col1:
                submit_button = st.form_submit_button("发送消息", use_container_width=True)
            with col2:
                continue_button = st.form_submit_button("继续上一轮回答", use_container_width=True)

        # 发送新消息
        if submit_button and user_input.strip():
            if model_name != "本地模型" and "api_key" not in locals():
                st.error("请输入API密钥")
            else:
                # 添加用户消息到历史
                st.session_state.chat_history.append({
   
                    "role": "user",
                    "content": user_input.strip(),
                    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
                })
                st.session_state.message_count += 1

                # 显示加载状态
                with st.spinner("模型正在生成响应..."):
                    # 模拟模型调用(在实际应用中替换为真实API调用)
                    start_time = time.time()
                    time.sleep(2)  # 模拟API延迟

                    # 生成模拟响应
                    if len(st.session_state.chat_history) > 1:
                        # 生成一个基于用户输入的响应
                        simulated_response = f"感谢您的提问!根据您的输入:'{user_input[:50]}...',我为您提供以下回复:\n\n这是对您最新问题的详细回答。在对话的这个阶段,我已经理解了您的需求,并能够提供更有针对性的信息。\n\n1. 针对您的主要问题,我提供了核心观点\n2. 我进一步解释了相关概念和背景\n3. 我给出了实用的建议和解决方案\n4. 我还考虑了可能的后续步骤或替代方案\n\n如果您有任何后续问题或需要进一步澄清,请随时告诉我!"
                    else:
                        # 第一次对话的响应
                        simulated_response = f"您好!我是您的AI助手。感谢您的提问:'{user_input[:50]}...'\n\n我很乐意帮助您解答这个问题。根据我的理解,您想了解关于这个主题的信息。我会为您提供详细、准确且有用的回答。\n\n如果您有任何其他问题或需要更多信息,请随时告诉我。我会继续为您提供帮助!"

                    # 计算响应时间和token使用(模拟)
                    response_time = round(time.time() - start_time, 2)
                    prompt_tokens = len(user_input.split()) * 2  # 粗略估算
                    completion_tokens = len(simulated_response.split()) * 2  # 粗略估算

                    # 保存token使用信息
                    st.session_state.token_usage.append({
   
                        "prompt_tokens": prompt_tokens,
                        "completion_tokens": completion_tokens,
                        "total_tokens": prompt_tokens + completion_tokens,
                        "response_time": response_time
                    })

                    # 更新总token数
                    st.session_state.total_tokens += prompt_tokens + completion_tokens

                    # 添加助手消息到历史
                    st.session_state.chat_history.append({
   
                        "role": "assistant",
                        "content": simulated_response,
                        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
                    })
                    st.session_state.message_count += 1

                    # 触发重新运行以显示新消息
                    st.rerun()

        # 继续上一轮回答
        elif continue_button:
            if not st.session_state.chat_history or st.session_state.chat_history[-1]["role"] != "assistant":
                st.error("请先发送消息并获得回答后再继续")
            else:
                if model_name != "本地模型" and "api_key" not in locals():
                    st.error("请输入API密钥")
                else:
                    # 生成继续回答的提示
                    continue_prompt = "请继续详细解释之前的回答,提供更多细节和例子。"
                    st.session_state.chat_history.append({
   
                        "role": "user",
                        "content": continue_prompt,
                        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
                    })
                    st.session_state.message_count += 1

                    # 显示加载状态
                    with st.spinner("模型正在继续生成回答..."):
                        # 模拟模型调用
                        start_time = time.time()
                        time.sleep(1.5)  # 模拟API延迟

                        # 生成模拟的继续回答
                        simulated_response = "基于我之前的回答,我想进一步补充以下内容:\n\n**更多细节说明:**\n- 深入分析主题的各个方面\n- 提供更多具体的例子和应用场景\n- 解释可能的例外情况和边界条件\n\n**实用建议:**\n- 如何在实际环境中应用这些概念\n- 常见的陷阱和避免方法\n- 进一步学习和探索的资源\n\n希望这些额外的信息对您有所帮助。如果您对某个特定方面还有疑问,请随时告诉我!"

                        # 计算响应时间和token使用(模拟)
                        response_time = round(time.time() - start_time, 2)
                        prompt_tokens = len(continue_prompt.split()) * 2  # 粗略估算
                        completion_tokens = len(simulated_response.split()) * 2  # 粗略估算

                        # 保存token使用信息
                        st.session_state.token_usage.append({
   
                            "prompt_tokens": prompt_tokens,
                            "completion_tokens": completion_tokens,
                            "total_tokens": prompt_tokens + completion_tokens,
                            "response_time": response_time
                        })

                        # 更新总token数
                        st.session_state.total_tokens += prompt_tokens + completion_tokens

                        # 添加助手消息到历史
                        st.session_state.chat_history.append({
   
                            "role": "assistant",
                            "content": simulated_response,
                            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
                        })
                        st.session_state.message_count += 1

                        # 触发重新运行以显示新消息
                        st.rerun()

with col2:
    st.header("📊 对话分析")

    # 总体统计信息
    st.subheader("总体统计")

    col1, col2 = st.columns(2)
    with col1:
        st.metric("消息总数", st.session_state.message_count)
        st.metric("总Token消耗", st.session_state.total_tokens)
    with col2:
        # 估算成本(假设价格)
        if model_name == "gpt-3.5-turbo":
            input_cost_per_1k = 0.0015
            output_cost_per_1k = 0.002
        elif model_name == "gpt-4":
            input_cost_per_1k = 0.03
            output_cost_per_1k = 0.06
        else:
            input_cost_per_1k = 0
            output_cost_per_1k = 0

        # 计算输入和输出token总数
        total_input_tokens = sum(usage.get('prompt_tokens', 0) for usage in st.session_state.token_usage)
        total_output_tokens = sum(usage.get('completion_tokens', 0) for usage in st.session_state.token_usage)

        # 估算总成本
        estimated_cost = (total_input_tokens / 1000 * input_cost_per_1k) + (total_output_tokens / 1000 * output_cost_per_1k)

        st.metric("平均每轮Token", round(st.session_state.total_tokens / len(st.session_state.token_usage), 2) if st.session_state.token_usage else 0)
        st.metric("估算成本 (USD)", f"${estimated_cost:.4f}")

    # Token使用趋势图
    if st.session_state.token_usage:
        st.subheader("Token使用趋势")

        # 准备数据
        rounds = list(range(1, len(st.session_state.token_usage) + 1))
        input_tokens = [usage.get('prompt_tokens', 0) for usage in st.session_state.token_usage]
        output_tokens = [usage.get('completion_tokens', 0) for usage in st.session_state.token_usage]

        # 创建图表
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(rounds, input_tokens, marker='o', label='输入Token', color='blue')
        ax.plot(rounds, output_tokens, marker='s', label='输出Token', color='green')
        ax.plot(rounds, [sum(x) for x in zip(input_tokens, output_tokens)], marker='^', label='总Token', color='red')

        ax.set_xlabel('对话轮次')
        ax.set_ylabel('Token数量')
        ax.set_title('Token使用趋势')
        ax.legend()
        ax.grid(True, linestyle='--', alpha=0.7)

        st.pyplot(fig)

    # 上下文窗口分析
    st.subheader("上下文窗口分析")

    if st.session_state.chat_history:
        # 计算当前上下文的token数(粗略估算)
        current_context_tokens = 0
        context_content = []

        for message in st.session_state.chat_history:
            tokens_estimate = len(message["content"].split()) * 2  # 粗略估算
            current_context_tokens += tokens_estimate
            context_content.append(f"{message['role']}: {tokens_estimate} tokens")

        # 添加系统提示的token数
        system_prompt_tokens = len(st.session_state.system_prompt.split()) * 2
        current_context_tokens += system_prompt_tokens
        context_content.insert(0, f"system_prompt: {system_prompt_tokens} tokens")

        # 显示上下文分析结果
        st.info(f"当前上下文总Token数: **{current_context_tokens}**")

        # 上下文组成明细
        with st.expander("查看上下文组成明细"):
            for item in context_content:
                st.write(item)

        # 上下文窗口警告
        if model_name == "gpt-3.5-turbo":
            context_limit = 4096
        elif model_name == "gpt-4":
            context_limit = 8192
        else:
            context_limit = float('inf')

        if current_context_tokens > context_limit * 0.8:
            st.warning(f"⚠️ 上下文接近模型限制 ({current_context_tokens}/{context_limit} tokens)")
        elif current_context_tokens > context_limit:
            st.error(f"❌ 上下文超出模型限制 ({current_context_tokens}/{context_limit} tokens)")
    else:
        st.info("开始对话后将显示上下文分析")

    # 对话主题分析(模拟)
    st.subheader("对话主题分析")

    if st.session_state.chat_history:
        # 模拟主题分析结果
        # 在实际应用中,可以使用NLP技术进行主题提取和分析
        topics = [
            {
   "topic": "用户问题概述", "relevance": np.random.uniform(0.7, 1.0), "occurrence": np.random.randint(1, len(st.session_state.chat_history) // 2 + 1)},
            {
   "topic": "核心概念解释", "relevance": np.random.uniform(0.6, 0.9), "occurrence": np.random.randint(1, len(st.session_state.chat_history) // 2 + 1)},
            {
   "topic": "实用建议提供", "relevance": np.random.uniform(0.5, 0.8), "occurrence": np.random.randint(1, len(st.session_state.chat_history) // 3 + 1)},
            {
   "topic": "进一步解释", "relevance": np.random.uniform(0.4, 0.7), "occurrence": np.random.randint(1, len(st.session_state.chat_history) // 4 + 1)},
        ]

        # 显示主题分析
        fig, ax = plt.subplots(figsize=(10, 4))

        # 准备数据
        topic_names = [t["topic"] for t in topics]
        relevance_scores = [t["relevance"] for t in topics]
        occurrences = [t["occurrence"] for t in topics]

        # 创建双轴图表
        ax1 = ax
        ax2 = ax.twinx()

        # 绘制相关性柱状图
        bars = ax1.bar(topic_names, relevance_scores, alpha=0.7, color='skyblue', label='相关性')
        ax1.set_xlabel('主题')
        ax1.set_ylabel('相关性分数', color='blue')
        ax1.tick_params(axis='y', labelcolor='blue')
        ax1.set_ylim(0, 1.1)

        # 绘制出现次数折线图
        ax2.plot(topic_names, occurrences, marker='o', color='red', label='出现次数')
        ax2.set_ylabel('出现次数', color='red')
        ax2.tick_params(axis='y', labelcolor='red')
        ax2.set_ylim(0, max(occurrences) * 1.2)

        # 设置标题
        ax.set_title('对话主题分析')

        # 调整布局
        plt.xticks(rotation=45, ha='right')
        fig.tight_layout()

        st.pyplot(fig)
    else:
        st.info("开始对话后将显示主题分析")

    # 响应时间分析
    if st.session_state.token_usage:
        st.subheader("响应时间分析")

        # 准备数据
        rounds = list(range(1, len(st.session_state.token_usage) + 1))
        response_times = [usage.get('response_time', 0) for usage in st.session_state.token_usage]
        avg_response_time = sum(response_times) / len(response_times)

        # 创建图表
        fig, ax = plt.subplots(figsize=(10, 4))
        ax.plot(rounds, response_times, marker='o', linestyle='-', color='orange')
        ax.axhline(avg_response_time, color='red', linestyle='--', label=f'平均值: {avg_response_time:.2f}秒')

        ax.set_xlabel('对话轮次')
        ax.set_ylabel('响应时间 (秒)')
        ax.set_title('响应时间趋势')
        ax.legend()
        ax.grid(True, linestyle='--', alpha=0.7)

        st.pyplot(fig)

    # 调试信息
    if st.session_state.debug_mode:
        st.subheader("调试信息")

        # 显示当前会话状态摘要
        with st.expander("查看会话状态摘要"):
            st.write(f"对话历史长度: {len(st.session_state.chat_history)}")
            st.write(f"Token使用记录数: {len(st.session_state.token_usage)}")
            st.write(f"最后消息时间戳: {st.session_state.chat_history[-1]['timestamp'] if st.session_state.chat_history else '无'}")

# 底部信息
st.divider()
st.markdown("提示:这是一个演示工具。在实际应用中,请替换为真实的API调用或本地模型推理。")

6.3 功能解析与使用指南

这个交互式对话调试工具提供了全面的对话管理和分析功能,帮助开发者深入理解和优化LLM对话应用。

6.3.1 核心功能解析

1. 对话界面与管理

  • 直观的对话流:模拟真实的聊天界面,清晰区分用户和助手消息
  • 消息编辑与删除:支持对任意历史消息进行编辑、删除操作
  • 继续回答功能:允许在当前回答基础上要求模型提供更多细节
  • 对话历史导入/导出:支持将对话历史保存为JSON文件或从文件导入

2. 模型配置与控制

  • 模型选择:支持多种模型选项,包括云端API和本地模型
  • 系统提示管理:动态更新和应用系统提示
  • 参数调整:实时配置生成参数,包括温度、最大Token数等
  • 调试模式:提供详细的调试信息和过程可视化

3. 实时分析与监控

  • Token使用分析:实时显示和跟踪Token消耗情况
  • 成本估算:根据不同模型的定价估算使用成本
  • 上下文窗口监控:警告上下文接近或超出模型限制
  • 响应时间分析:监控和可视化模型响应时间
  • 主题分析:模拟对话主题分布和相关性分析

6.3.2 使用流程

基本使用流程:

  1. 初始配置

    • 在侧边栏选择模型并输入API密钥
    • 调整系统提示和生成参数
    • 可选:开启调试模式获取更详细信息
  2. 进行对话

    • 在消息输入框中输入问题或指令
    • 点击"发送消息"按钮
    • 查看模型回复,可点击"继续上一轮回答"获取更多细节
  3. 分析与调试

    • 观察右侧面板的实时分析数据
    • 监控Token使用和上下文窗口状态
    • 查看响应时间和主题分布
  4. 历史管理

    • 点击消息旁的编辑按钮修改历史消息
    • 导出对话历史保存分析结果
    • 导入之前保存的对话继续分析

6.3.3 高级使用技巧

1. 对话优化策略

  • 系统提示调优:尝试不同的系统提示,观察对整个对话流程的影响
  • 参数敏感度测试:在对话中间调整参数,比较不同参数配置下的响应质量
  • 上下文管理:当上下文接近限制时,考虑通过编辑历史消息精简内容

2. 错误诊断与修复

  • 消息编辑:当模型理解偏差时,编辑历史消息重新引导对话方向
  • 断点分析:通过删除后续消息,定位对话中的问题起始点
  • 提示工程:针对特定问题,尝试不同的提问方式和指令格式

3. 性能优化

  • Token效率分析:监控每轮对话的Token消耗,优化提示语言
  • 响应时间监控:识别可能的性能瓶颈
  • 成本控制:通过参数调整(如减少max_tokens)平衡质量和成本

6.4 应用场景

交互式对话调试工具适用于以下场景:

  • 聊天机器人开发:调试和优化多轮对话流程
  • 客服系统设计:测试不同场景下的对话质量和准确性
  • 教育辅导应用:确保对话的连贯性和教育价值
  • 内容创作助手:优化创作过程中的人机协作流程
  • 研究分析:研究LLM在不同对话场景中的表现和局限性

6.5 小结

本章介绍了交互式对话调试工具的设计原理、实现代码和使用方法。这个工具提供了全面的对话管理、模型控制和实时分析功能,可以帮助开发者高效地调试和优化LLM对话应用。

与单轮提示调试不同,交互式对话调试需要考虑更多维度的因素,如对话历史、上下文窗口、主题一致性等。通过这个工具,开发者可以系统性地分析这些因素对对话质量的影响,从而优化对话流程和用户体验。

第7章 Token级调试和分析工具

7.1 Token级调试的原理与重要性

Token级调试是LLM应用开发中的进阶技术,它允许开发者深入到模型处理的最基本单位——Token的层面进行分析和优化。通过理解模型如何处理和生成每个Token,开发者可以更精确地定位问题、优化性能并改进输出质量。

7.1.1 Token级调试的核心价值

  • 精确问题定位:识别模型在特定Token位置的理解偏差或生成问题
  • 性能瓶颈分析:发现哪些Token或序列导致处理延迟或资源消耗过高
  • 输出质量优化:通过分析Token生成概率分布,优化输出的准确性和多样性
  • 上下文窗口管理:精确控制和优化上下文窗口中各部分内容的Token分配
  • 成本精细化控制:深入了解Token消耗模式,实现更精确的成本优化

7.1.2 Token级调试的关键维度

  • Token分割分析:了解输入文本如何被分割为Token序列
  • Token重要性评估:确定哪些Token对模型理解和生成过程贡献最大
  • 生成过程可视化:观察模型生成每个Token的概率分布和置信度
  • 注意力权重分析:理解模型在处理每个Token时的注意力分配
  • 上下文影响测量:量化不同上下文中Token对输出的影响

7.2 实现代码

下面是一个完整的Token级调试和分析工具的实现代码:

import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import re
from collections import Counter, defaultdict
import tiktoken  # OpenAI的tokenizer库
from typing import List, Dict, Any, Optional, Tuple

# 设置页面配置
st.set_page_config(
    page_title="LLM Token级调试和分析工具",
    page_icon="🔍",
    layout="wide"
)

# 页面标题
st.title("🔍 LLM Token级调试和分析工具")

# 初始化会话状态
if "token_analysis_results" not in st.session_state:
    st.session_state.token_analysis_results = None
if "selected_model" not in st.session_state:
    st.session_state.selected_model = "gpt-3.5-turbo"

# 支持的模型列表及其对应的tokenizer
MODEL_TOKENIZERS = {
   
    "gpt-3.5-turbo": "cl100k_base",
    "gpt-4": "cl100k_base",
    "text-davinci-003": "p50k_base",
    "gpt-2": "gpt2"
}

# 侧边栏 - 配置
with st.sidebar:
    st.header("基本配置")

    # 模型选择
    model_name = st.selectbox(
        "选择模型(用于tokenizer)",
        options=list(MODEL_TOKENIZERS.keys()),
        index=0,
        key="model_select"
    )

    # 更新会话状态中的模型选择
    if model_name != st.session_state.selected_model:
        st.session_state.selected_model = model_name
        # 清除之前的分析结果
        st.session_state.token_analysis_results = None

    # 分隔线
    st.divider()

    # 分析选项
    st.header("分析选项")

    # 启用的分析选项
    enable_tokenization = st.checkbox("Token分割分析", value=True)
    enable_statistics = st.checkbox("Token统计分析", value=True)
    enable_context_analysis = st.checkbox("上下文窗口分析", value=True)
    enable_cost_estimation = st.checkbox("成本估算", value=True)

    # 分隔线
    st.divider()

    # 模型参数(用于分析和估算)
    st.header("模型参数")

    # 上下文窗口大小
    context_window_size = st.slider(
        "上下文窗口大小 (Token)", 
        min_value=512, 
        max_value=100000,
        value=8192 if model_name in ["gpt-4"] else 4096,
        step=100
    )

    # 成本设置
    st.subheader("成本参数 (USD/1K Token)")
    col1, col2 = st.columns(2)
    with col1:
        input_cost_per_1k = st.number_input(
            "输入成本", 
            min_value=0.0, 
            value=0.03 if model_name == "gpt-4" else 0.0015,
            step=0.0001,
            format="%.6f"
        )
    with col2:
        output_cost_per_1k = st.number_input(
            "输出成本", 
            min_value=0.0, 
            value=0.06 if model_name == "gpt-4" else 0.002,
            step=0.0001,
            format="%.6f"
        )

# 主内容区
st.header("📝 输入分析文本")

# 创建两个标签页
analysis_tab, examples_tab = st.tabs(["分析输入", "示例库"])

with analysis_tab:
    # 文本输入区域
    input_text = st.text_area(
        "输入要分析的文本(提示词或上下文)",
        value="",
        height=300,
        placeholder="在此输入您的提示词、上下文或任意文本..."
    )

    # 模拟响应文本(用于分析生成内容)
    if enable_cost_estimation:
        response_text = st.text_area(
            "模拟响应文本(可选,用于估算完整交互成本)",
            value="",
            height=200,
            placeholder="在此输入模型的模拟响应文本..."
        )

    # 提交按钮
    analyze_button = st.button(
        "🔍 执行Token分析",
        use_container_width=True,
        type="primary",
        disabled=not input_text.strip()
    )

    # 分析逻辑
    if analyze_button and input_text.strip():
        # 获取选定的tokenizer
        try:
            tokenizer = tiktoken.get_encoding(MODEL_TOKENIZERS[model_name])

            # 显示加载状态
            with st.spinner("正在分析Token..."):
                # 初始化分析结果字典
                analysis_results = {
   }

                # 1. Token分割分析
                if enable_tokenization:
                    # 编码文本为token IDs
                    token_ids = tokenizer.encode(input_text)
                    # 解码token IDs为tokens(这会返回Unicode字符串)
                    tokens = [tokenizer.decode_single_token_bytes(token_id).decode('utf-8', errors='replace') for token_id in token_ids]

                    analysis_results["tokenization"] = {
   
                        "token_ids": token_ids,
                        "tokens": tokens,
                        "total_tokens": len(token_ids)
                    }

                # 2. Token统计分析
                if enable_statistics:
                    if "tokens" not in analysis_results.get("tokenization", {
   }):
                        # 如果没有进行token分割分析,单独执行
                        token_ids = tokenizer.encode(input_text)
                        tokens = [tokenizer.decode_single_token_bytes(token_id).decode('utf-8', errors='replace') for token_id in token_ids]
                    else:
                        tokens = analysis_results["tokenization"]["tokens"]

                    # 统计信息
                    token_lengths = [len(token) for token in tokens]
                    special_tokens = [token for token in tokens if token.startswith("<") and token.endswith(">"))]
                    frequent_tokens = Counter(tokens).most_common(20)

                    # 字符到token比率
                    char_to_token_ratio = len(input_text) / len(tokens) if len(tokens) > 0 else 0

                    # 长token分析(> 5个字符)
                    long_tokens = [(token, len(token)) for token in tokens if len(token) > 5]
                    long_tokens.sort(key=lambda x: x[1], reverse=True)

                    analysis_results["statistics"] = {
   
                        "token_lengths": token_lengths,
                        "special_tokens": special_tokens,
                        "frequent_tokens": frequent_tokens,
                        "char_to_token_ratio": char_to_token_ratio,
                        "long_tokens": long_tokens[:10]  # 前10个最长的token
                    }

                # 3. 上下文窗口分析
                if enable_context_analysis:
                    if "tokens" not in analysis_results.get("tokenization", {
   }):
                        # 如果没有进行token分割分析,单独执行
                        token_ids = tokenizer.encode(input_text)
                        tokens = [tokenizer.decode_single_token_bytes(token_id).decode('utf-8', errors='replace') for token_id in token_ids]
                    else:
                        tokens = analysis_results["tokenization"]["tokens"]

                    total_tokens = len(tokens)
                    context_usage_percentage = (total_tokens / context_window_size) * 100
                    remaining_tokens = context_window_size - total_tokens

                    # 分割文本为段落并分析每个段落的token数
                    paragraphs = input_text.split('\n')
                    paragraph_token_counts = []
                    for paragraph in paragraphs:
                        if paragraph.strip():
                            para_token_ids = tokenizer.encode(paragraph)
                            paragraph_token_counts.append({
   
                                "text": paragraph[:50] + "..." if len(paragraph) > 50 else paragraph,
                                "token_count": len(para_token_ids)
                            })

                    # 按token数量排序段落
                    paragraph_token_counts.sort(key=lambda x: x["token_count"], reverse=True)

                    analysis_results["context_analysis"] = {
   
                        "total_tokens": total_tokens,
                        "context_window_size": context_window_size,
                        "context_usage_percentage": context_usage_percentage,
                        "remaining_tokens": remaining_tokens,
                        "paragraph_analysis": paragraph_token_counts[:10]  # 前10个token最多的段落
                    }

                # 4. 成本估算
                if enable_cost_estimation:
                    if "tokenization" in analysis_results:
                        input_token_count = analysis_results["tokenization"]["total_tokens"]
                    else:
                        input_token_count = len(tokenizer.encode(input_text))

                    # 估算输入成本
                    input_cost = (input_token_count / 1000) * input_cost_per_1k

                    # 如果提供了响应文本,也估算输出成本
                    output_token_count = 0
                    output_cost = 0.0
                    if response_text.strip():
                        output_token_count = len(tokenizer.encode(response_text))
                        output_cost = (output_token_count / 1000) * output_cost_per_1k

                    total_cost = input_cost + output_cost

                    analysis_results["cost_estimation"] = {
   
                        "input_token_count": input_token_count,
                        "input_cost": input_cost,
                        "output_token_count": output_token_count,
                        "output_cost": output_cost,
                        "total_cost": total_cost,
                        "cost_per_1k_input": input_cost_per_1k,
                        "cost_per_1k_output": output_cost_per_1k
                    }

                # 保存分析结果到会话状态
                st.session_state.token_analysis_results = analysis_results

                # 触发重新运行以显示结果
                st.rerun()

        except Exception as e:
            st.error(f"分析过程中出错: {str(e)}")

with examples_tab:
    st.subheader("示例提示词和文本")

    # 预定义的示例
    examples = [
        {
   
            "title": "基础提示工程示例",
            "content": "你是一位专业的数据科学家。请详细解释什么是随机森林算法,包括其工作原理、优缺点以及在现实世界中的应用场景。请提供至少两个具体的代码示例来说明如何在Python中实现和使用随机森林。"
        },
        {
   
            "title": "多轮对话上下文示例",
            "content": "用户: 你能帮我学习Python编程吗?\n助手: 当然可以!Python是一种功能强大且易于学习的编程语言。你想了解Python的哪些方面?是基础语法、数据结构、函数,还是特定领域的应用?\n用户: 我想先从基础语法开始,特别是变量和数据类型。\n助手: 很好的起点!在Python中,变量是用来存储数据的容器,而数据类型则决定了数据的性质和可执行的操作。让我为你介绍Python的基本数据类型...\n用户: 谢谢!现在我想了解如何使用条件语句和循环。\n助手: 条件语句和循环是编程中的基础控制结构,让我们一起来学习..."
        },
        {
   
            "title": "长文本分析示例",
            "content": "自然语言处理(Natural Language Processing,简称NLP)是人工智能领域的一个重要分支,致力于使计算机能够理解、解释和生成人类语言。NLP的发展可以追溯到20世纪50年代,当时研究人员开始探索如何让计算机处理和理解自然语言。\n\n在过去的几十年里,NLP经历了几个重要的发展阶段。最初,研究主要集中在基于规则的方法上,依赖于语言学家手动编写的语法规则和词典。然而,这种方法很快就显示出局限性,因为自然语言的复杂性和多样性使得不可能覆盖所有可能的情况。\n\n随着机器学习技术的进步,NLP研究逐渐转向基于统计的方法。在这一阶段,研究人员使用大量的文本数据来训练统计模型,这些模型能够从数据中学习语言模式和规律。虽然这些方法比基于规则的方法更灵活,但它们仍然依赖于手动设计的特征和启发式方法。\n\n近年来,随着深度学习技术的突破,特别是预训练语言模型的出现,NLP领域取得了前所未有的进展。像BERT、GPT和T5这样的模型通过在海量文本数据上预训练,能够捕捉到丰富的语言知识和上下文信息,从而在各种NLP任务上取得了突破性的性能。\n\n当前,NLP技术已经广泛应用于各个领域,包括机器翻译、文本摘要、情感分析、问答系统、语音识别等。随着技术的不断发展,我们可以期待NLP在未来将发挥更加重要的作用,为人类与计算机之间的自然交互提供更加流畅和智能的解决方案。"
        }
    ]

    # 显示示例
    for i, example in enumerate(examples):
        with st.expander(f"示例 {i+1}: {example['title']}"):
            st.code(example["content"], language="markdown")
            if st.button(f"使用此示例", key=f"use_example_{i}", use_container_width=True, type="secondary"):
                # 将示例内容复制到主输入框
                st.session_state.input_text = example["content"]
                st.rerun()

# 显示分析结果
if st.session_state.token_analysis_results:
    results = st.session_state.token_analysis_results

    st.header("📊 Token分析结果")

    # 创建结果显示标签页
    overview_tab, tokenization_tab, statistics_tab, context_tab, cost_tab = st.tabs([
        "总体概览", 
        "Token分割", 
        "统计分析", 
        "上下文分析", 
        "成本估算"
    ])

    with overview_tab:
        st.subheader("分析概览")

        # 核心指标卡片
        col1, col2, col3, col4 = st.columns(4)

        # 获取基本信息
        total_tokens = 0
        if "tokenization" in results:
            total_tokens = results["tokenization"]["total_tokens"]
        elif "context_analysis" in results:
            total_tokens = results["context_analysis"]["total_tokens"]

        with col1:
            st.metric("总Token数", total_tokens)

        if "statistics" in results:
            with col2:
                st.metric("字符/Token比率", round(results["statistics"]["char_to_token_ratio"], 2))
            with col3:
                st.metric("特殊Token数", len(results["statistics"]["special_tokens"]))

        if "cost_estimation" in results:
            with col4:
                st.metric("估算总成本 (USD)", f"${results['cost_estimation']['total_cost']:.6f}")

        # 快速摘要
        st.subheader("关键发现")
        findings = []

        if "context_analysis" in results:
            usage_pct = results["context_analysis"]["context_usage_percentage"]
            if usage_pct > 90:
                findings.append("⚠️ **上下文窗口使用率超过90%**:当前文本长度接近模型的上下文窗口限制,可能导致重要信息被截断。")
            elif usage_pct > 70:
                findings.append("⚠️ **上下文窗口使用率较高**:建议优化提示词以减少不必要的token消耗。")
            else:
                findings.append("✅ **上下文窗口使用率合理**:有足够空间添加更多内容或处理较长的响应。")

        if "statistics" in results:
            if results["statistics"]["char_to_token_ratio"] < 1.5:
                findings.append("⚠️ **字符/Token比率偏低**:文本可能包含大量特殊字符、数字或代码,导致token效率较低。")

            if len(results["statistics"]["frequent_tokens"]) > 0 and results["statistics"]["frequent_tokens"][0][1] > total_tokens * 0.1:
                findings.append(f"ℹ️ **存在高频Token**:Token '{results['statistics']['frequent_tokens'][0][0]}' 出现 {results['statistics']['frequent_tokens'][0][1]} 次,占总Token的 {results['statistics']['frequent_tokens'][0][1] / total_tokens * 100:.1f}%。")

        if "cost_estimation" in results:
            if results["cost_estimation"]["total_cost"] > 0.01:
                findings.append(f"💸 **交互成本较高**:单次完整交互预计成本超过$0.01,对于高频使用场景建议优化。")

        if not findings:
            findings.append("✅ 分析完成,未发现明显问题。文本结构合理,Token使用效率良好。")

        for finding in findings:
            st.markdown(finding)

    with tokenization_tab:
        if "tokenization" in results:
            st.subheader("Token分割详细视图")

            # 创建Token表格
            tokens = results["tokenization"]["tokens"]
            token_ids = results["tokenization"]["token_ids"]

            # 创建DataFrame
            token_df = pd.DataFrame({
   
                "索引": list(range(len(tokens))),
                "Token": tokens,
                "Token ID": token_ids,
                "Token长度": [len(token) for token in tokens]
            })

            # 显示可排序的表格
            st.dataframe(
                token_df,
                use_container_width=True,
                hide_index=True,
                column_config={
   
                    "Token": st.column_config.TextColumn("Token (原始表示)"),
                    "Token ID": st.column_config.NumberColumn("Token ID"),
                    "Token长度": st.column_config.NumberColumn("Token长度")
                }
            )

            # Token可视化(前200个token)
            st.subheader("Token序列可视化")
            max_display = min(200, len(tokens))

            # 创建一个显示token的表格,每行显示10个token
            display_data = []
            for i in range(0, max_display, 10):
                row = {
   }
                for j in range(10):
                    if i + j < max_display:
                        token_idx = i + j
                        token_text = tokens[token_idx]
                        # 对于不可打印或空白字符进行特殊处理
                        if not token_text.strip() or not any(c.isprintable() for c in token_text):
                            token_text = f"[特殊字符]"
                        row[f"Token {j+1}"] = f"`{token_text}`"
                    else:
                        row[f"Token {j+1}"] = ""
                display_data.append(row)

            # 显示Token序列
            token_display_df = pd.DataFrame(display_data)
            st.dataframe(token_display_df, use_container_width=True, hide_index=True)

            if max_display < len(tokens):
                st.info(f"仅显示前{max_display}个Token(共{len(tokens)}个)")
        else:
            st.info("未执行Token分割分析。请在侧边栏中启用此选项。")

    with statistics_tab:
        if "statistics" in results:
            stats = results["statistics"]

            st.subheader("Token长度分布")
            # 创建Token长度分布直方图
            fig, ax = plt.subplots(figsize=(10, 6))
            sns.histplot(stats["token_lengths"], ax=ax, bins=20)
            ax.set_xlabel("Token长度 (字符数)")
            ax.set_ylabel("频率")
            ax.set_title("Token长度分布")
            st.pyplot(fig)

            st.subheader("高频Token")
            # 创建高频Token条形图
            if stats["frequent_tokens"]:
                tokens, counts = zip(*stats["frequent_tokens"])
                # 处理可能的不可打印字符
                display_tokens = []
                for token in tokens:
                    if not token.strip() or not any(c.isprintable() for c in token):
                        display_tokens.append("[特殊/空白字符]")
                    else:
                        display_tokens.append(token)

                fig, ax = plt.subplots(figsize=(10, 6))
                sns.barplot(x=counts, y=display_tokens, ax=ax)
                ax.set_xlabel("出现次数")
                ax.set_ylabel("Token")
                ax.set_title("Top 20 高频Token")
                st.pyplot(fig)
            else:
                st.info("未找到重复的Token")

            st.subheader("最长Token")
            if stats["long_tokens"]:
                # 创建最长Token表格
                long_token_df = pd.DataFrame(stats["long_tokens"], columns=["Token", "长度"])
                st.dataframe(long_token_df, use_container_width=True, hide_index=True)
            else:
                st.info("没有找到长度超过5个字符的Token")

            # 特殊Token信息
            if stats["special_tokens"]:
                st.subheader("特殊Token")
                st.write(f"发现 {len(stats['special_tokens'])} 个特殊Token:")
                for token in stats["special_tokens"]:
                    st.code(repr(token))  # 使用repr确保所有字符都可见
        else:
            st.info("未执行Token统计分析。请在侧边栏中启用此选项。")

    with context_tab:
        if "context_analysis" in results:
            context = results["context_analysis"]

            st.subheader("上下文窗口使用情况")

            # 创建进度条显示上下文使用率
            st.progress(min(context["context_usage_percentage"] / 100, 1.0))
            st.markdown(f"当前文本使用了 **{context['total_tokens']}/{context['context_window_size']}** Token ({context['context_usage_percentage']:.2f}%)")

            # 上下文窗口可视化
            fig, ax = plt.subplots(figsize=(12, 2))

            # 绘制上下文窗口
            ax.barh(0, context["context_window_size"], color="lightgray", height=0.4, label="上下文窗口大小")
            # 绘制已使用部分
            ax.barh(0, context["total_tokens"], color="skyblue", height=0.4, label="已使用Token")

            # 设置坐标轴
            ax.set_xlim(0, context["context_window_size"])
            ax.set_yticks([])
            ax.set_xlabel("Token数量")

            # 添加文本标签
            ax.text(context["total_tokens"] / 2, 0, f"已使用\n{context['total_tokens']} Token", 
                    ha="center", va="center", color="black", fontweight="bold")
            remaining_pos = context["total_tokens"] + (context["remaining_tokens"] / 2)
            ax.text(remaining_pos, 0, f"剩余\n{context['remaining_tokens']} Token", 
                    ha="center", va="center", color="black", fontweight="bold")

            # 添加图例
            ax.legend(loc="upper center", bbox_to_anchor=(0.5, -0.15), ncol=2)

            plt.tight_layout()
            st.pyplot(fig)

            # 段落Token分析
            if context["paragraph_analysis"]:
                st.subheader("段落Token分析(Top 10)")

                # 创建段落Token分析表格
                para_df = pd.DataFrame(context["paragraph_analysis"])
                para_df["占比"] = (para_df["token_count"] / context["total_tokens"] * 100).round(2)

                # 重新排序列
                para_df = para_df[["text", "token_count", "占比"]]

                # 显示表格
                st.dataframe(
                    para_df,
                    use_container_width=True,
                    hide_index=True,
                    column_config={
   
                        "text": st.column_config.TextColumn("段落文本(前50字符)"),
                        "token_count": st.column_config.NumberColumn("Token数量"),
                        "占比": st.column_config.NumberColumn("占总Token比例 (%)")
                    }
                )

                # 可视化段落Token分布
                fig, ax = plt.subplots(figsize=(10, 6))

                # 提取数据
                para_texts = [f"段落 {i+1}" for i in range(len(para_df))]
                para_tokens = para_df["token_count"].tolist()

                # 创建条形图
                bars = ax.barh(para_texts, para_tokens)

                # 在条形图上添加数值标签
                for bar in bars:
                    width = bar.get_width()
                    ax.text(width + 10, bar.get_y() + bar.get_height()/2, 
                            f'{width}', va='center', fontweight='bold')

                # 设置标题和标签
                ax.set_xlabel("Token数量")
                ax.set_title("段落Token分布(按Token数量降序)")

                plt.tight_layout()
                st.pyplot(fig)
        else:
            st.info("未执行上下文窗口分析。请在侧边栏中启用此选项。")

    with cost_tab:
        if "cost_estimation" in results:
            cost = results["cost_estimation"]

            st.subheader("成本分析详情")

            # 显示成本明细
            col1, col2 = st.columns(2)
            with col1:
                st.metric("输入Token数", cost["input_token_count"])
                st.metric("输入成本 (USD)", f"${cost['input_cost']:.6f}")
                st.metric("输入成本率 (USD/1K Token)", f"${cost['cost_per_1k_input']:.6f}")
            with col2:
                st.metric("输出Token数", cost["output_token_count"])
                st.metric("输出成本 (USD)", f"${cost['output_cost']:.6f}")
                st.metric("输出成本率 (USD/1K Token)", f"${cost['cost_per_1k_output']:.6f}")

            # 总成本
            st.metric("总成本 (USD)", f"${cost['total_cost']:.6f}")

            # 成本分布饼图
            labels = []
            values = []
            if cost["input_cost"] > 0:
                labels.append("输入成本")
                values.append(cost["input_cost"])
            if cost["output_cost"] > 0:
                labels.append("输出成本")
                values.append(cost["output_cost"])

            if values:
                fig, ax = plt.subplots(figsize=(8, 6))
                ax.pie(values, labels=labels, autopct='%1.1f%%', startangle=90)
                ax.axis('equal')  # 确保饼图是圆的
                ax.set_title('成本分布')
                st.pyplot(fig)

            # 成本优化建议
            st.subheader("成本优化建议")
            recommendations = []

            if cost["input_token_count"] > 1000:
                recommendations.append("📝 **优化输入文本**:考虑精简提示词,移除不必要的细节,使用更简洁的语言表达相同的意思。")

            if cost["total_cost"] > 0.01:
                recommendations.append("💰 **考虑成本效益**:对于高频使用的场景,评估是否可以使用更经济的模型或采用参数高效微调技术。")

            if "context_analysis" in results and results["context_analysis"]["context_usage_percentage"] > 70:
                recommendations.append("🎯 **优化上下文使用**:上下文窗口接近限制时,考虑使用更高效的信息组织方式,如摘要或结构化数据。")

            if not recommendations:
                recommendations.append("✅ 当前成本处于合理范围,继续监控使用情况即可。")

            for rec in recommendations:
                st.markdown(rec)
        else:
            st.info("未执行成本估算。请在侧边栏中启用此选项。")

# 实用工具部分
st.header("🛠️ Token优化工具")
st.markdown("以下工具可帮助您优化文本,减少Token消耗并提高效率。")

# 创建优化工具标签页
token_optimizer_tab, comparison_tab = st.tabs(["Token优化器", "文本比较器"])

with token_optimizer_tab:
    st.subheader("文本压缩与Token优化")

    # 输入要优化的文本
    optimize_text = st.text_area(
        "输入要优化的文本",
        value="",
        height=200,
        placeholder="在此输入需要优化Token使用的文本..."
    )

    # 优化选项
    st.subheader("优化选项")
    col1, col2 = st.columns(2)
    with col1:
        remove_extra_spaces = st.checkbox("移除多余空格", value=True)
        remove_repeated_phrases = st.checkbox("移除重复短语", value=True)
    with col2:
        simplify_language = st.checkbox("简化语言表达", value=True)
        use_abbreviations = st.checkbox("使用常见缩写", value=False)

    # 执行优化按钮
    if st.button("执行文本优化", use_container_width=True, type="secondary"):
        if not optimize_text.strip():
            st.warning("请输入要优化的文本")
        else:
            try:
                # 获取tokenizer
                tokenizer = tiktoken.get_encoding(MODEL_TOKENIZERS[model_name])

                # 原始文本Token计数
                original_tokens = len(tokenizer.encode(optimize_text))

                # 执行优化
                optimized_text = optimize_text

                # 移除多余空格
                if remove_extra_spaces:
                    optimized_text = re.sub(r'\s+', ' ', optimized_text).strip()

                # 移除重复短语(简单实现)
                if remove_repeated_phrases:
                    # 简单的重复检测 - 查找连续重复的单词
                    optimized_text = re.sub(r'(\b\w+\b)(\s+\1)+', r'\1', optimized_text)

                # 简化语言表达(示例实现)
                if simplify_language:
                    # 一些常见的冗长表达替换为更简洁的表达
                    verbose_phrases = {
   
                        "in order to": "to",
                        "due to the fact that": "because",
                        "at this point in time": "now",
                        "in the event that": "if",
                        "with reference to": "about",
                        "on account of": "because",
                        "it is important to note that": "note that",
                        "in light of the fact that": "since",
                        "as a matter of fact": "actually",
                        "for the purpose of": "to",
                        "I would like to": "I want to"
                    }

                    for verbose, concise in verbose_phrases.items():
                        optimized_text = optimized_text.replace(verbose, concise)

                # 使用常见缩写(示例实现)
                if use_abbreviations:
                    abbreviations = {
   
                        "for example": "e.g.",
                        "that is": "i.e.",
                        "and so on": "etc.",
                        "approximately": "~"
                    }

                    for full_form, abbreviation in abbreviations.items():
                        optimized_text = optimized_text.replace(full_form, abbreviation)

                # 计算优化后的Token数
                optimized_tokens = len(tokenizer.encode(optimized_text))

                # 计算节省
                token_savings = original_tokens - optimized_tokens
                savings_percentage = (token_savings / original_tokens * 100) if original_tokens > 0 else 0

                # 显示结果
                st.subheader("优化结果")

                # 指标
                col1, col2, col3 = st.columns(3)
                with col1:
                    st.metric("原始Token数", original_tokens)
                with col2:
                    st.metric("优化后Token数", optimized_tokens)
                with col3:
                    st.metric("节省百分比", f"{savings_percentage:.2f}%")

                # 显示优化前后的文本对比
                st.text_area("优化后的文本", value=optimized_text, height=200, disabled=True)

                # 优化建议
                st.subheader("进一步优化建议")
                suggestions = []

                if original_tokens > 1000 and savings_percentage < 10:
                    suggestions.append("📋 **考虑分块处理**:对于长文本,可以考虑将内容分块处理,减少单次交互的Token消耗。")

                if any(phrase in optimize_text.lower() for phrase in ["please note", "just to let you know", "i think that"]):
                    suggestions.append("✂️ **移除冗余表达**:考虑移除礼貌用语和冗余表达,直接进入主题。")

                # 检测是否有可以结构化的内容
                if any(marker in optimize_text for marker in ["步骤", "方法", "要点", "注意事项", "建议"]):
                    suggestions.append("📊 **使用结构化格式**:考虑使用列表、表格等结构化格式,通常比纯文本更高效。")

                if not suggestions:
                    suggestions.append("✅ 文本已经优化得很好!如需进一步减少Token,可以考虑更激进的改写或重组内容结构。")

                for suggestion in suggestions:
                    st.markdown(suggestion)

            except Exception as e:
                st.error(f"优化过程中出错: {str(e)}")

with comparison_tab:
    st.subheader("多版本文本Token使用对比")
    st.markdown("比较不同版本的提示词或文本,找出Token效率最高的版本。")

    # 创建比较输入区域
    comparison_texts = []
    for i in range(3):
        text = st.text_area(
            f"版本 {i+1}",
            value="",
            height=150,
            key=f"compare_text_{i}"
        )
        comparison_texts.append(text)

    # 执行比较
    if st.button("比较Token使用", use_container_width=True, type="secondary"):
        # 检查是否有至少两个非空文本
        non_empty_texts = [text for text in comparison_texts if text.strip()]
        if len(non_empty_texts) < 2:
            st.warning("请至少输入两个版本的文本进行比较")
        else:
            try:
                # 获取tokenizer
                tokenizer = tiktoken.get_encoding(MODEL_TOKENIZERS[model_name])

                # 计算每个版本的Token数
                comparison_results = []
                for i, text in enumerate(comparison_texts):
                    if text.strip():
                        token_count = len(tokenizer.encode(text))
                        comparison_results.append({
   
                            "版本": f"版本 {i+1}",
                            "Token数": token_count,
                            "文本预览": text[:100] + "..." if len(text) > 100 else text
                        })

                # 创建结果表格
                result_df = pd.DataFrame(comparison_results)

                # 按Token数排序
                result_df = result_df.sort_values(by="Token数")

                # 显示结果
                st.subheader("比较结果")
                st.dataframe(result_df, use_container_width=True, hide_index=True)

                # 创建可视化对比图
                fig, ax = plt.subplots(figsize=(10, 6))

                versions = result_df["版本"].tolist()
                token_counts = result_df["Token数"].tolist()

                bars = ax.bar(versions, token_counts)

                # 为每个条形添加数值标签
                for bar in bars:
                    height = bar.get_height()
                    ax.text(bar.get_x() + bar.get_width()/2., height + 5,
                            f'{height}', ha='center', va='bottom')

                # 标记最有效的版本
                if len(versions) > 0:
                    ax.bar(versions[0], token_counts[0], color='green', alpha=0.7, label="最有效版本")
                    ax.legend()

                ax.set_xlabel("文本版本")
                ax.set_ylabel("Token数量")
                ax.set_title("不同版本文本的Token使用对比")

                plt.tight_layout()
                st.pyplot(fig)

                # 最佳实践建议
                st.subheader("最佳实践建议")
                st.markdown("基于对比结果,请考虑以下建议:")
                st.markdown("1. **采用最有效版本**:选择使用最少Token但保留关键信息的版本")
                st.markdown("2. **混合策略**:可以从不同版本中提取最有效的表达和结构")
                st.markdown("3. **持续优化**:创建更多变体并进行A/B测试,找到最优解")
                st.markdown("4. **上下文管理**:将不变的背景信息移至系统提示,减少每次交互的Token消耗")

            except Exception as e:
                st.error(f"比较过程中出错: {str(e)}")

# 底部信息
st.divider()
st.markdown("提示:Token计数使用选定模型的tokenizer进行计算,实际API调用可能会有细微差异。")

7.3 功能解析与使用指南

Token级调试和分析工具为开发者提供了深入了解LLM输入文本的Token组成和使用效率的能力,帮助优化提示词和上下文,减少资源消耗并提高模型性能。

7.3.1 核心功能解析

1. Token分割分析

  • 详细Token视图:展示文本如何被分割为Token序列,包括原始Token表示和对应的Token ID
  • Token序列可视化:以表格形式直观显示Token序列,帮助识别特殊字符和Token边界
  • 分割模式理解:通过观察Token分割方式,理解模型如何处理不同类型的文本

2. Token统计分析

  • 长度分布分析:显示Token长度分布情况,帮助识别异常长或短的Token
  • 高频Token识别:找出文本中重复出现的高频Token,有助于优化冗余表达
  • 特殊Token检测:识别可能影响模型理解的特殊字符和Token
  • 字符/Token比率:评估文本的Token效率,指导优化方向

3. 上下文窗口分析

  • 窗口使用率监控:实时监控文本在模型上下文窗口中的占用比例
  • 剩余空间估算:计算上下文中剩余可用的Token数量
  • 段落级Token分析:识别最消耗Token的段落,指导内容优化
  • 可视化展示:通过进度条和图表直观展示上下文使用情况

4. 成本估算

  • 精确成本计算:根据输入和输出Token数量计算API调用成本
  • 成本分布分析:区分输入和输出成本,找出成本优化重点
  • 优化建议:根据成本分析提供针对性的优化建议

5. 优化工具套件

  • 文本压缩器:自动执行多种优化策略,减少Token消耗
  • 多版本比较:对比不同版本提示词的Token使用效率
  • 优化建议生成:提供针对性的文本优化策略和建议

7.3.2 使用流程

基本使用流程:

  1. 选择模型和分析选项

    • 在侧边栏选择目标模型(决定使用的tokenizer)
    • 启用需要的分析功能(Token分割、统计分析等)
    • 配置上下文窗口大小和成本参数(如果需要)
  2. 输入分析文本

    • 在主界面输入要分析的提示词或上下文文本
    • 可选:输入模拟响应文本以估算完整交互成本
    • 或者从示例库中选择预设的分析示例
  3. 执行分析

    • 点击"执行Token分析"按钮
    • 等待分析完成,查看结果面板
  4. 分析和优化

    • 查看总体概览了解关键发现
    • 在各标签页详细分析Token分割、统计、上下文和成本
    • 使用优化工具改进文本效率

高级使用流程:

  1. 批量分析与比较

    • 创建同一提示的多个变体
    • 使用"文本比较器"比较不同版本的Token效率
    • 选择最佳版本进行进一步优化
  2. 系统性优化

    • 分析原始提示的Token使用情况
    • 使用"Token优化器"进行初步优化
    • 分析优化后的结果,进行迭代调整
    • 最终评估优化效果和成本节省

7.3.3 实用技巧

1. 识别Token效率低下的文本模式

  • 检测高频重复:通过统计分析识别重复出现的Token和短语
  • 注意特殊字符:特殊字符、emoji和非ASCII字符通常会消耗更多Token
  • 长段落拆分:将长段落拆分为结构化格式(列表、表格等)通常更高效

2. 上下文窗口管理策略

  • 优先保留关键信息:确保最重要的上下文占用最少的Token
  • 分层提示结构:将背景信息、指令和具体内容分层组织
  • 动态上下文管理:针对长对话,实现智能上下文截断和保留策略

3. 成本优化最佳实践

  • 提示模板化:创建高效的提示模板,减少每次交互的变化部分
  • 增量更新策略:只在必要时更新上下文,而不是重复发送不变的信息
  • 摘要技术:对长文本进行摘要处理,保留关键信息同时减少Token消耗
  • 批处理机制:将多个任务批处理以提高整体Token效率

7.4 应用场景

Token级调试和分析工具适用于以下场景:

  • 提示词优化:优化现有提示词,减少Token消耗同时保持效果
  • 上下文管理:高效管理长对话或多轮交互中的上下文信息
  • 成本控制:监控和优化API调用成本,特别适用于高频使用场景
  • 性能调优:减少Token数量以提高模型响应速度
  • 提示工程教学:学习和理解Token化过程,提升提示工程技能
  • 长文本处理:分析和优化超长文本的处理策略,避免上下文溢出

7.5 小结

本章介绍了Token级调试和分析工具的设计原理、实现代码和使用方法。这个工具使开发者能够深入了解模型如何处理和使用Token,从而优化提示词、控制成本并提高应用性能。

通过精确分析Token分割、统计特征、上下文使用和成本构成,开发者可以发现优化机会并实施针对性改进。工具还提供了自动化的优化功能和多版本比较能力,使提示工程过程更加系统化和数据驱动。

第8章 性能监控与日志工具

8.1 性能监控的必要性与核心指标

在LLM应用开发和部署过程中,性能监控和日志分析是确保系统稳定运行、及时发现问题并持续优化的关键环节。有效的性能监控可以帮助开发者识别性能瓶颈、优化资源使用、提高用户体验并降低运营成本。

8.1.1 性能监控的核心价值

  • 实时问题检测:及时发现和定位系统中的性能异常和错误
  • 资源利用优化:监控计算资源、内存、API调用等使用情况,实现资源高效利用
  • 用户体验保障:跟踪响应时间和交互流畅度,确保良好的用户体验
  • 成本控制:监控API调用频率、Token消耗和资源占用,实现成本精细化管理
  • 趋势分析与预测:通过历史数据趋势分析,预测潜在问题并进行预防性优化
  • 决策支持:为系统扩容、架构调整和技术选型提供数据支持

8.1.2 LLM应用的关键性能指标

  • 响应时间:从发送请求到接收响应的总时间,包括API调用时间和处理时间
  • 吞吐量:单位时间内系统处理的请求数量
  • API调用频率:单位时间内调用LLM API的次数
  • Token消耗率:单位时间内消耗的Token总数,包括输入和输出Token
  • 错误率:失败请求占总请求的百分比,包括API错误和应用错误
  • 资源利用率:CPU、内存、网络带宽等系统资源的使用情况
  • 并发连接数:系统同时处理的活跃连接数量
  • 缓存命中率:对于实现了缓存机制的应用,缓存命中的比例

8.2 实现代码

下面是一个完整的LLM应用性能监控与日志工具的实现代码:

import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
import time
import json
import os
import re
import random
from collections import deque, Counter
import threading
import uuid
from typing import List, Dict, Any, Optional, Union, Callable
import logging

# 设置页面配置
st.set_page_config(
    page_title="LLM应用性能监控与日志工具",
    page_icon="📊",
    layout="wide"
)

# 创建日志目录
log_dir = "logs"
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# 配置日志记录
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(log_dir, "app.log")),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("llm_monitor")

# 初始化会话状态
if "performance_data" not in st.session_state:
    st.session_state.performance_data = []
if "log_entries" not in st.session_state:
    st.session_state.log_entries = []
if "alert_thresholds" not in st.session_state:
    st.session_state.alert_thresholds = {
   
        "response_time": 2.0,  # 秒
        "error_rate": 5.0,    # 百分比
        "token_usage": 1000,  # 每请求平均Token数
        "api_calls": 60       # 每分钟API调用次数
    }
if "dashboard_layout" not in st.session_state:
    st.session_state.dashboard_layout = "grid"
if "data_retention_days" not in st.session_state:
    st.session_state.data_retention_days = 7

# 模拟数据生成器类
class MockDataGenerator:
    def __init__(self):
        self.model_types = ["gpt-3.5-turbo", "gpt-4", "text-davinci-003", "custom-model-1"]
        self.endpoints = ["/api/chat", "/api/completion", "/api/embedding", "/api/fine-tune"]
        self.user_ids = [f"user_{i}" for i in range(1, 21)]
        self.status_codes = [200, 200, 200, 200, 400, 429, 500]
        self.today = datetime.now().date()

    def generate_performance_record(self, timestamp=None):
        """生成单条性能记录"""
        if timestamp is None:
            timestamp = datetime.now()

        model = random.choice(self.model_types)
        endpoint = random.choice(self.endpoints)

        # 根据模型和端点模拟不同的性能特征
        base_response_time = 0.5 if model == "gpt-3.5-turbo" else 1.5 if model == "gpt-4" else 0.8
        response_time = max(0.1, base_response_time + random.gauss(0, 0.3))

        status_code = random.choice(self.status_codes)
        is_error = status_code >= 400

        input_tokens = random.randint(50, 2000)
        # 输出Token数通常与输入有关,但有随机性
        output_tokens = max(10, int(input_tokens * random.uniform(0.3, 1.5)))

        user_id = random.choice(self.user_ids)
        request_id = str(uuid.uuid4())[:8]

        return {
   
            "timestamp": timestamp,
            "request_id": request_id,
            "user_id": user_id,
            "model": model,
            "endpoint": endpoint,
            "status_code": status_code,
            "is_error": is_error,
            "response_time": round(response_time, 3),
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_tokens": input_tokens + output_tokens,
            "cost_usd": round((input_tokens * 0.0015 + output_tokens * 0.002) / 1000, 6) if model == "gpt-3.5-turbo" else \
                         round((input_tokens * 0.03 + output_tokens * 0.06) / 1000, 6)
        }

    def generate_log_entry(self, timestamp=None):
        """生成单条日志记录"""
        if timestamp is None:
            timestamp = datetime.now()

        log_levels = ["INFO", "INFO", "INFO", "INFO", "INFO", "WARNING", "ERROR"]
        log_level = random.choice(log_levels)

        user_id = random.choice(self.user_ids)
        request_id = str(uuid.uuid4())[:8]

        log_templates = {
   
            "INFO": [
                f"Request {request_id} completed successfully for user {user_id}",
                f"Model loaded: {random.choice(self.model_types)}",
                f"Cache hit for prompt pattern: {random.randint(100, 999)}",
                f"System health check passed at {timestamp}"
            ],
            "WARNING": [
                f"High latency detected: {round(random.uniform(2.0, 5.0), 2)}s for request {request_id}",
                f"Approaching rate limit for API key: {random.randint(1000, 5000)}/10000",
                f"Large input detected: {random.randint(2000, 5000)} tokens for user {user_id}",
                f"Cache miss rate high: {random.randint(70, 95)}% in last minute"
            ],
            "ERROR": [
                f"API request failed: Status {random.choice([400, 401, 429, 500])} for request {request_id}",
                f"Token limit exceeded for model {random.choice(self.model_types)}",
                f"Connection timeout to API endpoint {random.choice(self.endpoints)}",
                f"Invalid request parameters from user {user_id}"
            ]
        }

        message = random.choice(log_templates[log_level])

        return {
   
            "timestamp": timestamp,
            "log_level": log_level,
            "user_id": user_id,
            "request_id": request_id,
            "message": message
        }

    def generate_historical_data(self, days=7, entries_per_day=1000):
        """生成历史性能数据"""
        historical_data = []

        for day in range(days):
            current_date = self.today - timedelta(days=day)

            for _ in range(entries_per_day):
                # 在一天内随机分布时间
                hours = random.randint(0, 23)
                minutes = random.randint(0, 59)
                seconds = random.randint(0, 59)
                timestamp = datetime.combine(current_date, datetime.min.time())
                timestamp = timestamp.replace(hour=hours, minute=minutes, second=seconds)

                # 工作时间(9-18点)有更高的请求频率
                if 9 <= hours <= 18 and random.random() < 0.7:
                    historical_data.append(self.generate_performance_record(timestamp))
                elif random.random() < 0.3:
                    historical_data.append(self.generate_performance_record(timestamp))

        return historical_data

    def generate_historical_logs(self, days=7, entries_per_day=200):
        """生成历史日志数据"""
        historical_logs = []

        for day in range(days):
            current_date = self.today - timedelta(days=day)

            for _ in range(entries_per_day):
                # 在一天内随机分布时间
                hours = random.randint(0, 23)
                minutes = random.randint(0, 59)
                seconds = random.randint(0, 59)
                timestamp = datetime.combine(current_date, datetime.min.time())
                timestamp = timestamp.replace(hour=hours, minute=minutes, second=seconds)

                historical_logs.append(self.generate_log_entry(timestamp))

        return historical_logs

# 初始化数据生成器
data_generator = MockDataGenerator()

# 页面标题
st.title("📊 LLM应用性能监控与日志工具")

# 创建标签页
monitoring_tab, logs_tab, alerts_tab, settings_tab = st.tabs([
    "性能监控", 
    "日志分析", 
    "告警管理", 
    "系统设置"
])

# 侧边栏配置
with st.sidebar:
    st.header("监控配置")

    # 时间范围选择
    time_range_options = [
        "过去1小时", 
        "过去6小时", 
        "过去12小时", 
        "过去24小时", 
        "过去7天", 
        "过去30天",
        "自定义范围"
    ]

    selected_time_range = st.selectbox(
        "数据时间范围",
        options=time_range_options,
        index=3  # 默认过去24小时
    )

    # 自定义时间范围选择
    custom_start_date = None
    custom_end_date = None

    if selected_time_range == "自定义范围":
        col1, col2 = st.columns(2)
        with col1:
            custom_start_date = st.date_input("开始日期")
        with col2:
            custom_end_date = st.date_input("结束日期")

    # 数据刷新间隔
    refresh_interval = st.slider(
        "数据刷新间隔(秒)",
        min_value=5,
        max_value=60,
        value=15,
        step=5
    )

    # 布局选择
    layout_options = ["网格布局", "列表布局"]
    dashboard_layout = st.radio(
        "仪表板布局",
        options=layout_options,
        index=0,
        horizontal=True
    )

    st.session_state.dashboard_layout = "grid" if dashboard_layout == "网格布局" else "list"

    # 分隔线
    st.divider()

    # 测试数据生成器
    st.header("测试数据")

    if st.button("生成测试性能数据", use_container_width=True, type="secondary"):
        # 清除现有数据
        st.session_state.performance_data.clear()
        # 生成测试数据
        st.session_state.performance_data = data_generator.generate_historical_data(days=3, entries_per_day=500)
        logger.info(f"生成了 {len(st.session_state.performance_data)} 条测试性能数据记录")
        st.success("测试性能数据生成完成")

    if st.button("生成测试日志数据", use_container_width=True, type="secondary"):
        # 清除现有日志
        st.session_state.log_entries.clear()
        # 生成测试日志
        st.session_state.log_entries = data_generator.generate_historical_logs(days=3, entries_per_day=100)
        logger.info(f"生成了 {len(st.session_state.log_entries)} 条测试日志记录")
        st.success("测试日志数据生成完成")

    # 实时数据模拟开关
    st.divider()
    st.header("实时监控")

    enable_realtime = st.checkbox("启用实时数据模拟", value=False)

# 性能监控标签页
with monitoring_tab:
    st.header("性能指标概览")

    # 根据选择的时间范围过滤数据
    def filter_data_by_time_range(data, time_range, custom_start=None, custom_end=None):
        if not data:
            return []

        now = datetime.now()
        filtered_data = []

        if time_range == "过去1小时":
            cutoff_time = now - timedelta(hours=1)
        elif time_range == "过去6小时":
            cutoff_time = now - timedelta(hours=6)
        elif time_range == "过去12小时":
            cutoff_time = now - timedelta(hours=12)
        elif time_range == "过去24小时":
            cutoff_time = now - timedelta(days=1)
        elif time_range == "过去7天":
            cutoff_time = now - timedelta(days=7)
        elif time_range == "过去30天":
            cutoff_time = now - timedelta(days=30)
        elif time_range == "自定义范围" and custom_start and custom_end:
            # 对于自定义范围,需要转换为datetime对象
            start_datetime = datetime.combine(custom_start, datetime.min.time())
            end_datetime = datetime.combine(custom_end, datetime.max.time())
            return [item for item in data if start_datetime <= item["timestamp"] <= end_datetime]
        else:
            return data

        return [item for item in data if item["timestamp"] >= cutoff_time]

    # 获取过滤后的性能数据
    filtered_performance_data = filter_data_by_time_range(
        st.session_state.performance_data,
        selected_time_range,
        custom_start_date,
        custom_end_date
    )

    # 如果没有数据,提示用户生成测试数据
    if not filtered_performance_data:
        st.warning("没有性能数据可供分析。请在侧边栏生成测试数据或等待系统收集数据。")
    else:
        # 创建性能指标概览
        col1, col2, col3, col4 = st.columns(4)

        # 总请求数
        total_requests = len(filtered_performance_data)
        col1.metric("总请求数", f"{total_requests:,}")

        # 平均响应时间
        avg_response_time = np.mean([item["response_time"] for item in filtered_performance_data])
        col2.metric("平均响应时间", f"{avg_response_time:.2f}秒")

        # 错误率
        error_count = sum(1 for item in filtered_performance_data if item["is_error"])
        error_rate = (error_count / total_requests * 100) if total_requests > 0 else 0
        col3.metric("错误率", f"{error_rate:.2f}%")

        # 总成本
        total_cost = sum(item["cost_usd"] for item in filtered_performance_data)
        col4.metric("总成本 (USD)", f"${total_cost:.4f}")

        # 根据布局选择显示不同的图表排列
        if st.session_state.dashboard_layout == "grid":
            # 网格布局:两行两列的图表
            row1_col1, row1_col2 = st.columns(2)
            row2_col1, row2_col2 = st.columns(2)

            # 图表1:响应时间趋势
            with row1_col1:
                st.subheader("响应时间趋势")

                # 按小时分组计算平均响应时间
                df = pd.DataFrame(filtered_performance_data)
                if not df.empty:
                    df['hour'] = df['timestamp'].dt.floor('H')
                    hourly_avg = df.groupby('hour')['response_time'].mean().reset_index()

                    fig = px.line(hourly_avg, x='hour', y='response_time', 
                                labels={
   'hour': '时间', 'response_time': '平均响应时间(秒)'}, 
                                title='每小时平均响应时间')
                    fig.update_layout(height=300)
                    st.plotly_chart(fig, use_container_width=True)

            # 图表2:请求量趋势
            with row1_col2:
                st.subheader("请求量趋势")

                if not df.empty:
                    hourly_count = df.groupby('hour').size().reset_index(name='count')

                    fig = px.bar(hourly_count, x='hour', y='count',
                                labels={
   'hour': '时间', 'count': '请求数量'}, 
                                title='每小时请求量')
                    fig.update_layout(height=300)
                    st.plotly_chart(fig, use_container_width=True)

            # 图表3:Token消耗分析
            with row2_col1:
                st.subheader("Token消耗分析")

                if not df.empty:
                    # 计算输入和输出Token的总和
                    total_input_tokens = df['input_tokens'].sum()
                    total_output_tokens = df['output_tokens'].sum()

                    fig = px.pie(
                        values=[total_input_tokens, total_output_tokens],
                        names=['输入Token', '输出Token'],
                        title='Token消耗分布',
                        hole=0.3
                    )
                    fig.update_layout(height=300)
                    st.plotly_chart(fig, use_container_width=True)

            # 图表4:模型使用分布
            with row2_col2:
                st.subheader("模型使用分布")

                if not df.empty:
                    model_counts = df['model'].value_counts().reset_index()
                    model_counts.columns = ['model', 'count']

                    fig = px.pie(model_counts, values='count', names='model', 
                                title='模型使用分布')
                    fig.update_layout(height=300)
                    st.plotly_chart(fig, use_container_width=True)
        else:
            # 列表布局:垂直排列所有图表
            st.subheader("响应时间趋势")
            df = pd.DataFrame(filtered_performance_data)
            if not df.empty:
                df['hour'] = df['timestamp'].dt.floor('H')
                hourly_avg = df.groupby('hour')['response_time'].mean().reset_index()

                fig = px.line(hourly_avg, x='hour', y='response_time', 
                            labels={
   'hour': '时间', 'response_time': '平均响应时间(秒)'}, 
                            title='每小时平均响应时间')
                st.plotly_chart(fig, use_container_width=True)

            st.subheader("请求量趋势")
            if not df.empty:
                hourly_count = df.groupby('hour').size().reset_index(name='count')

                fig = px.bar(hourly_count, x='hour', y='count',
                            labels={
   'hour': '时间', 'count': '请求数量'}, 
                            title='每小时请求量')
                st.plotly_chart(fig, use_container_width=True)

            st.subheader("Token消耗分析")
            if not df.empty:
                total_input_tokens = df['input_tokens'].sum()
                total_output_tokens = df['output_tokens'].sum()

                fig = px.pie(
                    values=[total_input_tokens, total_output_tokens],
                    names=['输入Token', '输出Token'],
                    title='Token消耗分布',
                    hole=0.3
                )
                st.plotly_chart(fig, use_container_width=True)

            st.subheader("模型使用分布")
            if not df.empty:
                model_counts = df['model'].value_counts().reset_index()
                model_counts.columns = ['model', 'count']

                fig = px.pie(model_counts, values='count', names='model', 
                            title='模型使用分布')
                st.plotly_chart(fig, use_container_width=True)

        # 详细数据表格
        st.subheader("最近的性能数据")

        # 创建数据框并格式化
        recent_df = pd.DataFrame(filtered_performance_data)
        if not recent_df.empty:
            # 按时间戳降序排序
            recent_df = recent_df.sort_values('timestamp', ascending=False)

            # 格式化时间戳
            recent_df['timestamp'] = recent_df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

            # 只显示最近的100条记录
            display_df = recent_df.head(100)

            # 自定义列显示
            display_columns = ['timestamp', 'request_id', 'model', 'endpoint', 
                             'status_code', 'response_time', 'total_tokens', 'cost_usd']

            # 高亮显示错误行
            def highlight_errors(val):
                if isinstance(val, int) and val >= 400:
                    return 'background-color: #ffcccc'
                return ''

            styled_df = display_df[display_columns].style.applymap(highlight_errors, subset=['status_code'])
            st.dataframe(styled_df, use_container_width=True)

        # 导出数据选项
        st.subheader("数据导出")
        col1, col2 = st.columns(2)
        with col1:
            if st.button("导出CSV格式", use_container_width=True):
                if not recent_df.empty:
                    csv = recent_df.to_csv(index=False)
                    st.download_button(
                        label="下载CSV文件",
                        data=csv,
                        file_name=f'llm_performance_data_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv',
                        mime='text/csv',
                        use_container_width=True
                    )

        with col2:
            if st.button("导出JSON格式", use_container_width=True):
                if not recent_df.empty:
                    json_data = recent_df.to_json(orient='records', date_format='iso')
                    st.download_button(
                        label="下载JSON文件",
                        data=json_data,
                        file_name=f'llm_performance_data_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json',
                        mime='application/json',
                        use_container_width=True
                    )

# 日志分析标签页
with logs_tab:
    st.header("日志分析")

    # 获取过滤后的日志数据
    filtered_logs = filter_data_by_time_range(
        st.session_state.log_entries,
        selected_time_range,
        custom_start_date,
        custom_end_date
    )

    # 如果没有日志数据,提示用户生成测试数据
    if not filtered_logs:
        st.warning("没有日志数据可供分析。请在侧边栏生成测试日志数据或等待系统收集日志。")
    else:
        # 日志级别分布
        st.subheader("日志级别分布")

        log_level_counts = Counter([entry["log_level"] for entry in filtered_logs])

        # 创建日志级别分布饼图
        fig = px.pie(
            values=list(log_level_counts.values()),
            names=list(log_level_counts.keys()),
            title='日志级别分布',
            color_discrete_map={
   
                "INFO": "#3498db",
                "WARNING": "#f39c12",
                "ERROR": "#e74c3c"
            }
        )
        st.plotly_chart(fig, use_container_width=True)

        # 日志搜索和过滤
        st.subheader("日志搜索与过滤")

        col1, col2 = st.columns(2)
        with col1:
            search_term = st.text_input("搜索关键词")
        with col2:
            log_level_filter = st.selectbox(
                "日志级别过滤",
                options=["全部", "INFO", "WARNING", "ERROR"],
                index=0
            )

        # 应用过滤
        filtered_view = filtered_logs.copy()

        if search_term:
            filtered_view = [entry for entry in filtered_view if search_term.lower() in entry["message"].lower()]

        if log_level_filter != "全部":
            filtered_view = [entry for entry in filtered_view if entry["log_level"] == log_level_filter]

        # 显示过滤结果统计
        st.write(f"找到 {len(filtered_view)} 条匹配的日志记录")

        # 日志详情表格
        log_df = pd.DataFrame(filtered_view)
        if not log_df.empty:
            # 按时间戳降序排序
            log_df = log_df.sort_values('timestamp', ascending=False)

            # 格式化时间戳
            log_df['timestamp'] = log_df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

            # 只显示最近的200条记录
            display_log_df = log_df.head(200)

            # 高亮显示不同级别的日志
            def highlight_log_level(val):
                if val == "ERROR":
                    return 'background-color: #ffcccc'
                elif val == "WARNING":
                    return 'background-color: #fff3cd'
                return ''

            styled_log_df = display_log_df.style.applymap(
                highlight_log_level, 
                subset=['log_level']
            )

            st.dataframe(
                styled_log_df,
                use_container_width=True,
                column_config={
   
                    "timestamp": st.column_config.TextColumn("时间"),
                    "log_level": st.column_config.TextColumn("级别"),
                    "user_id": st.column_config.TextColumn("用户ID"),
                    "request_id": st.column_config.TextColumn("请求ID"),
                    "message": st.column_config.TextColumn("消息", width="large")
                }
            )

        # 日志分析见解
        st.subheader("日志分析见解")

        insights = []

        # 错误日志分析
        error_logs = [entry for entry in filtered_logs if entry["log_level"] == "ERROR"]
        if error_logs:
            # 分析错误消息模式
            error_messages = [entry["message"] for entry in error_logs]

            # 简单的模式检测
            timeout_count = sum(1 for msg in error_messages if "timeout" in msg.lower())
            rate_limit_count = sum(1 for msg in error_messages if "rate limit" in msg.lower())
            token_limit_count = sum(1 for msg in error_messages if "token limit" in msg.lower())

            if timeout_count > 0:
                insights.append(f"⚠️ 检测到 {timeout_count} 条超时错误,可能表明API连接不稳定或网络延迟问题")
            if rate_limit_count > 0:
                insights.append(f"⚠️ 检测到 {rate_limit_count} 条速率限制错误,建议调整请求频率或申请更高配额")
            if token_limit_count > 0:
                insights.append(f"⚠️ 检测到 {token_limit_count} 条Token限制错误,需要优化提示词或处理更长文本的策略")

        # 警告日志分析
        warning_logs = [entry for entry in filtered_logs if entry["log_level"] == "WARNING"]
        if warning_logs:
            high_latency_count = sum(1 for entry in warning_logs if "latency" in entry["message"].lower())
            if high_latency_count > 0:
                insights.append(f"⚡ 检测到 {high_latency_count} 条高延迟警告,建议优化提示词或检查系统资源")

        # 如果没有明显问题,显示正常消息
        if not insights:
            insights.append("✅ 日志分析未发现明显问题,系统运行正常")

        # 显示见解
        for insight in insights:
            st.markdown(insight)

        # 导出日志选项
        st.subheader("日志导出")
        col1, col2 = st.columns(2)
        with col1:
            if st.button("导出CSV格式", use_container_width=True):
                if not log_df.empty:
                    csv = log_df.to_csv(index=False)
                    st.download_button(
                        label="下载CSV文件",
                        data=csv,
                        file_name=f'llm_logs_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv',
                        mime='text/csv',
                        use_container_width=True
                    )

        with col2:
            if st.button("导出JSON格式", use_container_width=True):
                if not log_df.empty:
                    json_data = log_df.to_json(orient='records', date_format='iso')
                    st.download_button(
                        label="下载JSON文件",
                        data=json_data,
                        file_name=f'llm_logs_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json',
                        mime='application/json',
                        use_container_width=True
                    )

# 告警管理标签页
with alerts_tab:
    st.header("告警管理")

    # 告警规则设置
    st.subheader("告警阈值设置")

    col1, col2 = st.columns(2)
    with col1:
        response_time_threshold = st.number_input(
            "响应时间阈值(秒)",
            min_value=0.1,
            max_value=30.0,
            value=st.session_state.alert_thresholds["response_time"],
            step=0.1
        )

        error_rate_threshold = st.number_input(
            "错误率阈值(%)",
            min_value=0.1,
            max_value=100.0,
            value=st.session_state.alert_thresholds["error_rate"],
            step=0.1
        )

    with col2:
        token_usage_threshold = st.number_input(
            "每请求平均Token数阈值",
            min_value=100,
            max_value=10000,
            value=st.session_state.alert_thresholds["token_usage"],
            step=100
        )

        api_calls_threshold = st.number_input(
            "每分钟API调用次数阈值",
            min_value=10,
            max_value=1000,
            value=st.session_state.alert_thresholds["api_calls"],
            step=10
        )

    # 保存阈值设置
    if st.button("保存告警设置", use_container_width=True, type="primary"):
        st.session_state.alert_thresholds = {
   
            "response_time": response_time_threshold,
            "error_rate": error_rate_threshold,
            "token_usage": token_usage_threshold,
            "api_calls": api_calls_threshold
        }
        logger.info("告警阈值设置已更新")
        st.success("告警阈值设置已保存")

    # 告警通知设置
    st.subheader("告警通知设置")

    notification_methods = {
   
        "email": st.checkbox("启用邮件通知", value=False),
        "webhook": st.checkbox("启用Webhook通知", value=False),
        "slack": st.checkbox("启用Slack通知", value=False),
        "sms": st.checkbox("启用短信通知", value=False)
    }

    # 根据选择的通知方式显示相应的设置字段
    if notification_methods["email"]:
        email_recipients = st.text_input("邮件接收人(用逗号分隔)")

    if notification_methods["webhook"]:
        webhook_url = st.text_input("Webhook URL")

    # 模拟当前告警列表
    st.subheader("当前活动告警")

    # 检查是否有性能数据
    if not st.session_state.performance_data:
        st.info("没有性能数据,无法检查告警。请先生成或导入性能数据。")
    else:
        # 获取最近10分钟的数据用于告警检查
        recent_data = filter_data_by_time_range(st.session_state.performance_data, "过去1小时")

        # 检查告警条件
        alerts = []

        if recent_data:
            # 1. 检查响应时间
            avg_response_time = np.mean([item["response_time"] for item in recent_data])
            if avg_response_time > response_time_threshold:
                alerts.append({
   
                    "level": "警告",
                    "type": "响应时间过高",
                    "description": f"最近1小时平均响应时间为 {avg_response_time:.2f} 秒,超过阈值 {response_time_threshold} 秒",
                    "timestamp": datetime.now()
                })

            # 2. 检查错误率
            total_requests = len(recent_data)
            error_count = sum(1 for item in recent_data if item["is_error"])
            error_rate = (error_count / total_requests * 100) if total_requests > 0 else 0
            if error_rate > error_rate_threshold:
                alerts.append({
   
                    "level": "错误",
                    "type": "错误率过高",
                    "description": f"最近1小时错误率为 {error_rate:.2f}%,超过阈值 {error_rate_threshold}%",
                    "timestamp": datetime.now()
                })

            # 3. 检查平均Token使用量
            avg_tokens = np.mean([item["total_tokens"] for item in recent_data])
            if avg_tokens > token_usage_threshold:
                alerts.append({
   
                    "level": "警告",
                    "type": "Token使用量过高",
                    "description": f"最近1小时每请求平均使用 {avg_tokens:.1f} Token,超过阈值 {token_usage_threshold} Token",
                    "timestamp": datetime.now()
                })

            # 4. 检查API调用频率(每分钟)
            # 假设我们只有最近1小时的数据,简化计算
            if len(recent_data) > 0:
                # 计算总时长(小时)
                time_range_hours = 1.0  # 因为我们使用过去1小时的数据
                calls_per_minute = len(recent_data) / (time_range_hours * 60)

                if calls_per_minute > api_calls_threshold:
                    alerts.append({
   
                        "level": "警告",
                        "type": "API调用频率过高",
                        "description": f"当前API调用频率为每分钟 {calls_per_minute:.1f} 次,超过阈值每分钟 {api_calls_threshold} 次",
                        "timestamp": datetime.now()
                    })

        # 显示告警列表
        if alerts:
            # 按严重程度排序(错误 > 警告)
            alerts.sort(key=lambda x: 0 if x["level"] == "错误" else 1)

            # 创建告警表格
            alert_df = pd.DataFrame(alerts)
            alert_df['timestamp'] = alert_df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

            # 高亮显示不同级别的告警
            def highlight_alert_level(val):
                if val == "错误":
                    return 'background-color: #ffcccc'
                elif val == "警告":
                    return 'background-color: #fff3cd'
                return ''

            styled_alert_df = alert_df.style.applymap(
                highlight_alert_level, 
                subset=['level']
            )

            st.dataframe(
                styled_alert_df,
                use_container_width=True,
                hide_index=True,
                column_config={
   
                    "level": st.column_config.TextColumn("级别"),
                    "type": st.column_config.TextColumn("类型"),
                    "description": st.column_config.TextColumn("描述", width="large"),
                    "timestamp": st.column_config.TextColumn("时间")
                }
            )

            # 清除所有告警按钮
            if st.button("清除所有告警", use_container_width=True, type="secondary"):
                # 这里只是演示,实际上应该调用相应的告警清除API
                st.success("所有告警已标记为已处理")
        else:
            st.success("当前没有活动告警,系统运行正常")

    # 历史告警记录
    st.subheader("历史告警记录")

    # 模拟历史告警数据
    # 在实际应用中,这些数据应该从数据库或日志文件中加载
    historical_alerts = [
        {
   
            "level": "错误",
            "type": "API连接超时",
            "description": "API连接超时,无法访问模型服务",
            "timestamp": datetime.now() - timedelta(hours=2, minutes=30),
            "status": "已解决"
        },
        {
   
            "level": "警告",
            "type": "响应时间过高",
            "description": "平均响应时间达到3.5秒,超过阈值2.0秒",
            "timestamp": datetime.now() - timedelta(hours=5, minutes=15),
            "status": "已解决"
        },
        {
   
            "level": "错误",
            "type": "错误率过高",
            "description": "错误率达到10.5%,超过阈值5.0%",
            "timestamp": datetime.now() - timedelta(days=1, hours=3),
            "status": "已解决"
        },
        {
   
            "level": "警告",
            "type": "Token使用量过高",
            "description": "平均Token使用量达到1200,超过阈值1000",
            "timestamp": datetime.now() - timedelta(days=1, hours=12),
            "status": "已解决"
        }
    ]

    # 创建历史告警表格
    hist_alert_df = pd.DataFrame(historical_alerts)
    hist_alert_df['timestamp'] = hist_alert_df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

    st.dataframe(
        hist_alert_df,
        use_container_width=True,
        hide_index=True,
        column_config={
   
            "level": st.column_config.TextColumn("级别"),
            "type": st.column_config.TextColumn("类型"),
            "description": st.column_config.TextColumn("描述", width="large"),
            "timestamp": st.column_config.TextColumn("时间"),
            "status": st.column_config.TextColumn("状态")
        }
    )

# 系统设置标签页
with settings_tab:
    st.header("系统设置")

    # 数据保留设置
    st.subheader("数据保留策略")

    data_retention_days = st.slider(
        "性能数据保留天数",
        min_value=1,
        max_value=90,
        value=st.session_state.data_retention_days,
        step=1
    )

    if st.button("保存数据保留设置", use_container_width=True, type="secondary"):
        st.session_state.data_retention_days = data_retention_days
        logger.info(f"数据保留设置已更新为 {data_retention_days} 天")
        st.success(f"数据保留设置已保存:保留最近 {data_retention_days} 天的数据")

    # 监控配置导入/导出
    st.subheader("配置管理")

    col1, col2 = st.columns(2)
    with col1:
        if st.button("导出当前配置", use_container_width=True):
            # 创建配置数据
            config_data = {
   
                "alert_thresholds": st.session_state.alert_thresholds,
                "dashboard_layout": st.session_state.dashboard_layout,
                "data_retention_days": st.session_state.data_retention_days,
                "export_timestamp": datetime.now().isoformat()
            }

            # 转换为JSON字符串
            config_json = json.dumps(config_data, indent=2)

            # 提供下载
            st.download_button(
                label="下载配置文件",
                data=config_json,
                file_name=f'llm_monitor_config_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json',
                mime='application/json',
                use_container_width=True
            )

    with col2:
        uploaded_config = st.file_uploader("导入配置文件", type="json")
        if uploaded_config is not None:
            try:
                # 读取并解析配置文件
                config_data = json.load(uploaded_config)

                # 更新会话状态
                if "alert_thresholds" in config_data:
                    st.session_state.alert_thresholds = config_data["alert_thresholds"]
                if "dashboard_layout" in config_data:
                    st.session_state.dashboard_layout = config_data["dashboard_layout"]
                if "data_retention_days" in config_data:
                    st.session_state.data_retention_days = config_data["data_retention_days"]

                logger.info("配置文件已成功导入")
                st.success("配置文件导入成功")

                # 刷新页面以应用新配置
                st.rerun()

            except json.JSONDecodeError:
                st.error("配置文件格式错误,请检查文件是否为有效的JSON格式")
            except Exception as e:
                st.error(f"导入配置文件时出错: {str(e)}")

    # 系统信息
    st.subheader("系统信息")

    # 模拟系统信息
    system_info = {
   
        "监控系统版本": "1.0.0",
        "Python版本": "3.9.13",
        "Streamlit版本": "1.22.0",
        "数据收集状态": "活跃",
        "当前监控请求数": len(st.session_state.performance_data),
        "当前日志条目数": len(st.session_state.log_entries),
        "系统运行时间": "2天5小时32分钟"
    }

    # 显示系统信息
    for key, value in system_info.items():
        st.text(f"{key}: {value}")

# 实时数据模拟线程
if enable_realtime:
    # 创建一个占位符来显示实时状态
    realtime_status = st.empty()

    # 显示实时监控状态
    realtime_status.info("实时数据模拟已启动,数据将定期更新...")

    # 模拟实时数据更新(在实际应用中,这应该由后台线程完成)
    # 由于Streamlit的限制,我们使用按钮来触发更新
    st.markdown("---")
    st.subheader("实时监控控制")

    col1, col2 = st.columns(2)
    with col1:
        if st.button("手动更新数据", use_container_width=True):
            # 添加新的性能记录
            new_performance_record = data_generator.generate_performance_record()
            st.session_state.performance_data.append(new_performance_record)

            # 随机添加新的日志条目
            if random.random() < 0.3:  # 30%的概率添加日志
                new_log_entry = data_generator.generate_log_entry()
                st.session_state.log_entries.append(new_log_entry)

            logger.info(f"添加了新的性能数据记录: {new_performance_record['request_id']}")
            st.success("数据已更新")
            st.rerun()

    with col2:
        if st.button("停止实时模拟", use_container_width=True, type="secondary"):
            enable_realtime = False
            realtime_status.empty()
            st.success("实时数据模拟已停止")
            st.rerun()

    # 显示最后更新的时间
    st.caption(f"最后更新: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # 提示自动刷新
    st.info(f"页面将每 {refresh_interval} 秒自动刷新")

# 页脚信息
st.markdown("---")
st.caption("LLM应用性能监控与日志工具 | 版本 1.0.0 | 最后更新: 2023-07-15")

# 注意事项说明
st.caption("注意: 本工具中的数据为模拟数据,仅供演示使用。在实际部署时,请连接到真实的数据源和日志系统。")

8.3 功能解析与使用指南

性能监控与日志工具为LLM应用提供了全面的监控、分析和告警功能,帮助开发者实时掌握系统运行状态,及时发现和解决问题,优化系统性能并降低运营成本。

8.3.1 核心功能解析

1. 性能监控仪表板

  • 关键指标概览:实时显示总请求数、平均响应时间、错误率、总成本等核心指标
  • 多维度趋势分析:通过时间序列图表展示响应时间、请求量、Token消耗等指标的变化趋势
  • 资源使用分布:分析不同模型、端点的使用情况,帮助优化资源分配
  • 灵活的数据筛选:支持按时间范围、模型类型、端点等多维度筛选和分析数据
  • 详细数据表格:提供可排序、可搜索的详细性能数据表格,支持数据导出

2. 日志管理与分析

  • 日志级别分布:可视化不同级别(INFO、WARNING、ERROR)日志的分布情况
  • 高效搜索过滤:支持按关键词、日志级别进行日志搜索和过滤
  • 异常模式识别:自动识别日志中的异常模式和常见错误类型
  • 智能分析见解:基于日志内容提供系统运行状况的分析见解和建议
  • 日志导出功能:支持将日志数据导出为CSV或JSON格式,便于进一步分析

3. 告警管理系统

  • 可配置的告警阈值:支持自定义响应时间、错误率、Token使用量、API调用频率等告警阈值
  • 多渠道通知:支持邮件、Webhook、Slack、短信等多种告警通知方式
  • 实时告警检测:持续监控系统指标,超过阈值时立即触发告警
  • 告警状态管理:支持查看、处理和清除活动告警
  • 历史告警记录:保存和查看历史告警记录,便于问题追溯和分析

4. 系统配置与管理

  • 数据保留策略:配置性能数据和日志的保留时间,平衡数据完整性和存储成本
  • 配置导入导出:支持监控配置的导入和导出,便于配置迁移和备份
  • 系统信息监控:显示系统版本、运行状态、数据统计等信息
  • 布局自定义:支持网格布局和列表布局,适应不同的使用习惯

8.3.2 使用流程

基本使用流程:

  1. 初始配置

    • 设置数据时间范围(过去1小时、6小时、1天等)
    • 配置数据刷新间隔
    • 选择仪表板布局(网格或列表)
  2. 性能监控

    • 查看关键性能指标概览
    • 分析各维度的性能趋势图表
    • 筛选和查看详细的性能数据
    • 根据需要导出性能数据
  3. 日志分析

    • 查看日志级别分布情况
    • 使用搜索和过滤功能查找特定日志
    • 分析日志中的错误模式和异常
    • 查看系统自动生成的分析见解
  4. 告警管理

    • 根据应用需求设置合适的告警阈值
    • 配置告警通知方式
    • 处理和响应活动告警
    • 查看历史告警记录

高级使用流程:

  1. 性能问题诊断

    • 识别异常指标(响应时间突增、错误率上升等)
    • 结合性能数据和日志分析定位问题根源
    • 查看相关时间段的详细日志
    • 根据分析结果制定解决方案
  2. 系统优化

    • 分析模型和端点的使用分布
    • 识别资源消耗热点(高Token消耗、高延迟路径)
    • 基于成本数据优化资源分配
    • 实施优化措施并监控效果
  3. 容量规划

    • 分析请求量的时间分布趋势
    • 预测未来的资源需求
    • 基于历史数据设置合理的告警阈值
    • 制定扩容或缩容策略

8.3.3 实用技巧

1. 性能优化技巧

  • 设置合理的告警阈值:根据应用特性和SLA要求,设置既不过于敏感也不过于宽松的告警阈值
  • 关注错误模式:通过日志分析识别反复出现的错误模式,优先解决系统性问题
  • 优化资源分配:根据模型使用分布,合理分配API配额和计算资源
  • 实施缓存策略:对于重复出现的相似请求,考虑实施缓存机制减少API调用

2. 监控数据利用

  • 建立性能基准:记录正常运行状态下的性能指标,作为问题诊断的参考基准
  • 定期数据分析:定期分析性能趋势和日志模式,发现潜在问题和优化机会
  • 关联分析:将性能数据与日志、业务指标关联起来,全面了解系统运行状况
  • 成本归因分析:通过性能数据进行成本归因,识别高成本的用户、场景或功能

3. 告警管理最佳实践

  • 分级告警机制:根据问题严重程度实施分级告警,避免告警疲劳
  • 告警聚合:对相似的告警进行聚合处理,减少重复通知
  • 告警升级流程:建立告警升级流程,确保严重问题得到及时处理
  • 告警有效性评估:定期评估告警的有效性,调整不合理的告警规则

8.4 应用场景

性能监控与日志工具适用于以下场景:

  • 生产环境监控:实时监控生产环境中LLM应用的运行状态和性能指标
  • 问题诊断与排查:快速定位和解决系统运行中出现的性能问题和错误
  • 成本优化与控制:监控和分析API调用成本,优化资源使用和成本结构
  • 系统容量规划:基于历史数据进行容量规划,确保系统能够应对业务增长
  • 服务水平保障:监控关键性能指标,确保系统满足服务水平协议(SLA)要求
  • 安全事件检测:通过日志分析识别潜在的安全问题和异常访问模式

8.5 小结

本章介绍了LLM应用性能监控与日志工具的设计原理、实现代码和使用方法。这个工具为开发者提供了全面的系统监控、日志分析和告警管理功能,帮助确保LLM应用的稳定运行、优化性能和降低成本。

通过实时监控关键性能指标、智能分析日志内容、灵活配置告警规则,开发者可以及时发现和解决系统运行中的问题,提高应用的可靠性和用户体验。工具还提供了丰富的数据可视化和分析功能,帮助开发者深入了解系统行为模式,为优化决策提供数据支持。

第9章 多模态调试与可视化工具

9.1 多模态LLM应用的调试挑战

随着大语言模型技术的发展,多模态LLM应用(结合文本、图像、音频等多种输入模态的应用)越来越普遍。与纯文本应用相比,多模态应用的调试面临着独特的挑战:

9.1.1 多模态调试的复杂性

  • 输入多样性:需要同时处理和分析不同类型的输入(文本、图像、音频等)
  • 模态间交互:理解不同模态之间的交互方式和信息流动
  • 视觉反馈需求:需要直观展示图像内容和模型对图像的理解
  • 错误定位困难:当输出不符合预期时,难以确定是哪个模态的问题
  • 性能瓶颈多样:不同模态可能有不同的性能瓶颈(如图像加载、文本处理等)
  • 调试工具缺乏:专门针对多模态应用的调试工具相对较少

9.1.2 关键监控指标

对于多模态LLM应用,除了传统的性能指标外,还需要关注以下特定指标:

  • 多模态处理时间:各模态输入的处理时间和整体处理时间
  • 模态融合效果:评估不同模态信息融合的有效性
  • 跨模态理解准确性:模型对不同模态间关系的理解准确性
  • 视觉元素识别率:对于图像输入,模型正确识别视觉元素的比例
  • 多模态上下文利用率:模型利用不同模态上下文的有效程度
  • 资源消耗分布:不同模态处理的资源消耗分布情况

9.2 实现代码

下面是一个完整的多模态LLM应用调试与可视化工具的实现代码:

import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from PIL import Image
import io
import base64
import json
import time
import uuid
import os
import re
import random
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple
import logging

# 设置页面配置
st.set_page_config(
    page_title="多模态LLM应用调试与可视化工具",
    page_icon="🖼️",
    layout="wide"
)

# 创建日志目录
log_dir = "logs"
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# 配置日志记录
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(log_dir, "multimodal_debug.log")),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("multimodal_debugger")

# 初始化会话状态
if "multimodal_sessions" not in st.session_state:
    st.session_state.multimodal_sessions = []
if "analysis_results" not in st.session_state:
    st.session_state.analysis_results = []
if "image_cache" not in st.session_state:
    st.session_state.image_cache = {
   }
if "selected_session" not in st.session_state:
    st.session_state.selected_session = None

# 模拟多模态会话数据生成器
class MockMultimodalSessionGenerator:
    def __init__(self):
        self.session_id_counter = 0
        self.user_ids = [f"user_{i}" for i in range(1, 21)]
        self.model_types = ["gpt-4-vision-preview", "gemini-pro-vision", "llava-13b", "custom-multimodal-1"]
        self.query_types = ["text_only", "image_only", "text_image", "multiple_images"]
        self.task_types = ["image_captioning", "visual_question_answering", "object_detection", 
                          "image_description", "text_to_image", "cross_modal_retrieval"]
        self.status_codes = ["success", "success", "success", "success", "error", "partial"]

    def generate_session(self):
        """生成一个模拟的多模态会话"""
        self.session_id_counter += 1
        session_id = f"session_{self.session_id_counter}"
        user_id = random.choice(self.user_ids)
        model = random.choice(self.model_types)
        query_type = random.choice(self.query_types)
        task_type = random.choice(self.task_types)
        status = random.choice(self.status_codes)
        timestamp = datetime.now() - timedelta(minutes=random.randint(1, 60*24))  # 过去24小时内的随机时间

        # 根据查询类型生成相应的输入
        input_text = None
        image_paths = []
        image_count = 0

        if query_type in ["text_only", "text_image", "multiple_images"]:
            # 生成模拟文本查询
            text_templates = {
   
                "image_captioning": ["请描述这张图片的内容", "这张图片展示了什么?", "详细解释图像中的元素"],
                "visual_question_answering": ["图片中有多少个人?", "物体的颜色是什么?", "这个场景发生在哪里?"],
                "object_detection": ["识别图片中的所有物体", "找出图中的主要对象", "标记图中的关键元素"],
                "image_description": ["详细描述这张照片", "这张图片的主题是什么?", "解释图片的上下文和背景"],
                "text_to_image": ["生成一张美丽的风景照片", "创建一个未来科技城市的图像", "绘制一个卡通人物形象"],
                "cross_modal_retrieval": ["找到与这段描述匹配的图像", "搜索相关的视觉内容", "匹配文本和图像"],
            }
            input_text = random.choice(text_templates[task_type])

        if query_type in ["image_only", "text_image"]:
            image_count = 1
        elif query_type == "multiple_images":
            image_count = random.randint(2, 5)

        # 生成模拟图像路径(在实际应用中,这些会是真实的图像)
        for i in range(image_count):
            image_id = f"img_{uuid.uuid4()[:8]}"
            image_paths.append(image_id)
            # 缓存模拟图像信息
            st.session_state.image_cache[image_id] = {
   
                "width": random.randint(300, 1200),
                "height": random.randint(300, 800),
                "size_kb": random.randint(50, 2000),
                "format": random.choice(["jpg", "png", "webp"]),
                "description": f"模拟图像 {i+1} - {task_type}"
            }

        # 生成模拟处理时间
        base_time = 1.0  # 基础处理时间
        text_time = len(input_text.split()) * 0.01 if input_text else 0  # 文本处理时间
        image_time = image_count * 0.5  # 每图像处理时间
        total_processing_time = base_time + text_time + image_time + random.gauss(0, 0.2)

        # 生成模拟Token统计
        input_tokens = random.randint(50, 500) + (len(input_text.split()) if input_text else 0)
        output_tokens = random.randint(100, 1000)

        # 生成模拟输出
        output_text = None
        if status in ["success", "partial"]:
            output_templates = {
   
                "image_captioning": ["这张图片展示了一个城市风景,包含高楼大厦和街道。天空晴朗,有几朵白云。", 
                                    "图片中有一位年轻人在公园长椅上阅读书籍。背景有树木和花卉。"],
                "visual_question_answering": ["图片中有3个人。", "物体的主要颜色是蓝色和白色。", "这个场景看起来像是在一家咖啡馆里。"],
                "object_detection": ["我在图片中识别到了:一个人、一台笔记本电脑、一杯咖啡和一个手机。",
                                    "主要物体包括:汽车、交通信号灯、行人、建筑物。"],
                "image_description": ["这是一张高清彩色照片,展示了一个阳光明媚的海滩场景。前景是金色的沙滩和清澈的海水,中景有几把遮阳伞和几个人在游泳,背景是蓝天白云和远处的山丘。整体色调明亮,给人一种轻松愉快的感觉。"],
                "text_to_image": ["已根据您的描述生成图像。图像展现了您要求的场景,包含了关键元素。",
                                  "图像生成完成,符合您的文本描述。您可以在右侧预览区域查看。"],
                "cross_modal_retrieval": ["找到3个与您的文本描述高度匹配的图像。这些图像包含了您提到的所有关键元素。",
                                         "检索完成,找到了最相关的视觉内容。匹配度评分:87%。"],
            }
            output_text = random.choice(output_templates[task_type])

            # 部分成功的情况
            if status == "partial":
                output_text += " [注意:由于图像质量或分辨率限制,某些细节可能无法完全识别。]"
        else:  # error
            error_templates = ["处理图像时发生错误:无法加载或解析图像文件。",
                             "模型错误:超出上下文窗口限制,请减少输入内容。",
                             "API错误:服务暂时不可用,请稍后重试。",
                             "参数错误:不支持的图像格式或尺寸。"]
            output_text = random.choice(error_templates)

        # 生成模拟的多模态分析数据
        attention_weights = None
        detected_objects = None
        visual_features = None

        if status != "error":
            # 生成模拟的注意力权重
            if input_text and image_count > 0:
                words = input_text.split()
                attention_weights = {
   }
                for i, word in enumerate(words):
                    attention_weights[word] = [round(random.uniform(0.1, 0.9), 2) for _ in range(image_count)]

            # 生成模拟的物体检测结果
            if task_type in ["object_detection", "visual_question_answering"]:
                object_types = ["person", "car", "building", "tree", "dog", "cat", "bicycle", "sign", "computer"]
                detected_objects = []
                for _ in range(random.randint(1, 5)):
                    obj = random.choice(object_types)
                    # 确保不会重复添加相同的物体
                    if obj not in [o["class"] for o in detected_objects]:
                        detected_objects.append({
   
                            "class": obj,
                            "confidence": round(random.uniform(0.6, 0.99), 2),
                            "bbox": {
   
                                "x1": round(random.uniform(0.1, 0.5), 2),
                                "y1": round(random.uniform(0.1, 0.5), 2),
                                "x2": round(random.uniform(0.5, 0.9), 2),
                                "y2": round(random.uniform(0.5, 0.9), 2)
                            }
                        })

            # 生成模拟的视觉特征
            visual_features = {
   
                "dominant_colors": [[random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)] for _ in range(3)],
                "brightness": round(random.uniform(0.3, 0.9), 2),
                "contrast": round(random.uniform(0.4, 0.8), 2),
                "saturation": round(random.uniform(0.3, 0.9), 2),
                "sharpness": round(random.uniform(0.5, 0.95), 2)
            }

        # 构建会话记录
        session = {
   
            "session_id": session_id,
            "user_id": user_id,
            "timestamp": timestamp,
            "model": model,
            "query_type": query_type,
            "task_type": task_type,
            "status": status,
            "input_text": input_text,
            "image_paths": image_paths,
            "output_text": output_text,
            "processing_time": {
   
                "total": round(total_processing_time, 3),
                "text_processing": round(text_time, 3),
                "image_processing": round(image_time, 3),
                "model_inference": round(base_time + random.gauss(0, 0.1), 3)
            },
            "token_statistics": {
   
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "total_tokens": input_tokens + output_tokens
            },
            "multimodal_analysis": {
   
                "attention_weights": attention_weights,
                "detected_objects": detected_objects,
                "visual_features": visual_features,
                "image_quality_metrics": [round(random.uniform(0.7, 0.95), 2) for _ in range(image_count)] if image_count > 0 else None,
                "cross_modal_alignment": round(random.uniform(0.5, 0.95), 2) if input_text and image_count > 0 else None
            }
        }

        return session

    def generate_multiple_sessions(self, count=10):
        """生成多个会话"""
        return [self.generate_session() for _ in range(count)]

# 初始化数据生成器
session_generator = MockMultimodalSessionGenerator()

# 页面标题
st.title("🖼️ 多模态LLM应用调试与可视化工具")

# 创建标签页
sessions_tab, analysis_tab, visualization_tab, settings_tab = st.tabs([
    "会话管理", 
    "性能分析", 
    "可视化调试", 
    "系统设置"
])

# 侧边栏配置
with st.sidebar:
    st.header("调试配置")

    # 会话过滤选项
    st.subheader("会话过滤")

    # 模型选择过滤
    available_models = sorted(list(set([s["model"] for s in st.session_state.multimodal_sessions] + session_generator.model_types)))
    selected_models = st.multiselect(
        "选择模型",
        options=available_models,
        default=[],
        help="选择要显示的模型"
    )

    # 查询类型过滤
    query_type_options = sorted(session_generator.query_types)
    selected_query_types = st.multiselect(
        "查询类型",
        options=query_type_options,
        default=[],
        help="选择要显示的查询类型"
    )

    # 任务类型过滤
    task_type_options = sorted(session_generator.task_types)
    selected_task_types = st.multiselect(
        "任务类型",
        options=task_type_options,
        default=[],
        help="选择要显示的任务类型"
    )

    # 状态过滤
    status_options = sorted(session_generator.status_codes)
    selected_statuses = st.multiselect(
        "会话状态",
        options=status_options,
        default=[],
        help="选择要显示的会话状态"
    )

    # 时间范围过滤
    time_range_options = [
        "全部时间", 
        "过去1小时", 
        "过去6小时", 
        "过去12小时", 
        "过去24小时", 
        "过去7天"
    ]
    selected_time_range = st.selectbox(
        "时间范围",
        options=time_range_options,
        index=0
    )

    # 清除过滤器按钮
    if st.button("清除所有过滤器", use_container_width=True, type="secondary"):
        selected_models = []
        selected_query_types = []
        selected_task_types = []
        selected_statuses = []
        selected_time_range = "全部时间"
        st.rerun()

    # 分隔线
    st.divider()

    # 测试数据生成器
    st.header("测试数据")

    # 生成测试会话数
    num_sessions = st.slider(
        "生成测试会话数",
        min_value=1,
        max_value=50,
        value=10,
        step=1
    )

    if st.button("生成测试会话数据", use_container_width=True, type="secondary"):
        # 生成测试会话
        new_sessions = session_generator.generate_multiple_sessions(num_sessions)
        st.session_state.multimodal_sessions.extend(new_sessions)
        logger.info(f"生成了 {len(new_sessions)} 个测试会话")
        st.success(f"成功生成 {len(new_sessions)} 个测试会话")

    # 清除所有数据按钮
    if st.button("清除所有数据", use_container_width=True, type="secondary"):
        st.session_state.multimodal_sessions.clear()
        st.session_state.analysis_results.clear()
        st.session_state.image_cache.clear()
        st.session_state.selected_session = None
        logger.info("已清除所有数据")
        st.success("所有数据已清除")

    # 分隔线
    st.divider()

    # 系统信息
    st.header("系统信息")
    st.text(f"当前会话数: {len(st.session_state.multimodal_sessions)}")
    if st.session_state.selected_session:
        st.text(f"选中会话: {st.session_state.selected_session['session_id']}")

    # 数据导出选项
    st.divider()
    st.header("数据导出")

    if st.button("导出所有会话数据", use_container_width=True, type="secondary"):
        if st.session_state.multimodal_sessions:
            # 将数据转换为JSON格式
            export_data = []
            for session in st.session_state.multimodal_sessions:
                # 转换datetime对象为字符串
                export_session = session.copy()
                export_session["timestamp"] = session["timestamp"].isoformat()
                export_data.append(export_session)

            json_data = json.dumps(export_data, indent=2, ensure_ascii=False)

            # 创建下载按钮
            st.download_button(
                label="下载JSON文件",
                data=json_data,
                file_name=f"multimodal_sessions_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                mime="application/json",
                use_container_width=True
            )
        else:
            st.warning("没有会话数据可供导出")

# 会话管理标签页
with sessions_tab:
    st.header("会话列表")

    # 过滤会话数据
    def filter_sessions(sessions, models, query_types, task_types, statuses, time_range):
        filtered = sessions.copy()

        # 应用模型过滤
        if models:
            filtered = [s for s in filtered if s["model"] in models]

        # 应用查询类型过滤
        if query_types:
            filtered = [s for s in filtered if s["query_type"] in query_types]

        # 应用任务类型过滤
        if task_types:
            filtered = [s for s in filtered if s["task_type"] in task_types]

        # 应用状态过滤
        if statuses:
            filtered = [s for s in filtered if s["status"] in statuses]

        # 应用时间范围过滤
        now = datetime.now()
        if time_range == "过去1小时":
            cutoff_time = now - timedelta(hours=1)
            filtered = [s for s in filtered if s["timestamp"] >= cutoff_time]
        elif time_range == "过去6小时":
            cutoff_time = now - timedelta(hours=6)
            filtered = [s for s in filtered if s["timestamp"] >= cutoff_time]
        elif time_range == "过去12小时":
            cutoff_time = now - timedelta(hours=12)
            filtered = [s for s in filtered if s["timestamp"] >= cutoff_time]
        elif time_range == "过去24小时":
            cutoff_time = now - timedelta(days=1)
            filtered = [s for s in filtered if s["timestamp"] >= cutoff_time]
        elif time_range == "过去7天":
            cutoff_time = now - timedelta(days=7)
            filtered = [s for s in filtered if s["timestamp"] >= cutoff_time]

        # 按时间倒序排序
        filtered.sort(key=lambda x: x["timestamp"], reverse=True)

        return filtered

    # 获取过滤后的会话数据
    filtered_sessions = filter_sessions(
        st.session_state.multimodal_sessions,
        selected_models,
        selected_query_types,
        selected_task_types,
        selected_statuses,
        selected_time_range
    )

    # 如果没有会话数据,显示提示
    if not filtered_sessions:
        st.warning("没有找到匹配的会话数据。请在侧边栏生成测试数据或调整过滤条件。")
    else:
        # 会话列表表格
        st.subheader(f"找到 {len(filtered_sessions)} 个会话")

        # 准备表格数据
        table_data = []
        for session in filtered_sessions:
            # 格式化时间戳
            timestamp_str = session["timestamp"].strftime("%Y-%m-%d %H:%M:%S")

            # 构建表格行数据
            row = {
   
                "会话ID": session["session_id"],
                "用户ID": session["user_id"],
                "时间": timestamp_str,
                "模型": session["model"],
                "查询类型": session["query_type"],
                "任务类型": session["task_type"],
                "状态": session["status"],
                "处理时间(秒)": session["processing_time"]["total"],
                "图像数量": len(session["image_paths"]),
                "总Token数": session["token_statistics"]["total_tokens"]
            }
            table_data.append(row)

        # 创建DataFrame
        df = pd.DataFrame(table_data)

        # 创建会话选择器
        session_indices = st.dataframe(
            df,
            use_container_width=True,
            hide_index=True,
            on_select="rerun",
            selection_mode="single-row",
            column_config={
   
                "会话ID": st.column_config.TextColumn("会话ID"),
                "用户ID": st.column_config.TextColumn("用户ID"),
                "时间": st.column_config.TextColumn("时间"),
                "模型": st.column_config.TextColumn("模型"),
                "查询类型": st.column_config.TextColumn("查询类型"),
                "任务类型": st.column_config.TextColumn("任务类型"),
                "状态": st.column_config.TextColumn(
                    "状态",
                    width="small",
                    # 定义状态样式
                    format=lambda x: f"""<span style="color: {'green' if x == 'success' else 'red' if x == 'error' else 'orange'}">{x}</span>"""
                ),
                "处理时间(秒)": st.column_config.ProgressColumn(
                    "处理时间(秒)",
                    format="%.2f",
                    min_value=0,
                    max_value=df["处理时间(秒)"].max() if not df.empty else 10
                ),
                "图像数量": st.column_config.NumberColumn("图像数量", format="%d"),
                "总Token数": st.column_config.NumberColumn("总Token数", format="%d")
            }
        )

        # 显示选中会话的详细信息
        if len(session_indices.selection["rows"]) > 0:
            selected_idx = session_indices.selection["rows"][0]
            selected_session = filtered_sessions[selected_idx]
            st.session_state.selected_session = selected_session

            # 显示详细信息
            st.subheader("会话详细信息")

            # 创建会话详情布局
            col1, col2 = st.columns([2, 1])

            with col1:
                # 基本信息
                st.markdown("### 基本信息")
                info_data = {
   
                    "会话ID": selected_session["session_id"],
                    "用户ID": selected_session["user_id"],
                    "时间戳": selected_session["timestamp"].strftime("%Y-%m-%d %H:%M:%S"),
                    "模型": selected_session["model"],
                    "查询类型": selected_session["query_type"],
                    "任务类型": selected_session["task_type"],
                    "状态": selected_session["status"]
                }

                for key, value in info_data.items():
                    st.markdown(f"**{key}:** {value}")

                # 输入输出
                st.markdown("### 输入输出")

                # 显示输入文本
                if selected_session["input_text"]:
                    st.markdown("**输入文本:**")
                    st.text_area("", selected_session["input_text"], disabled=True, height=100)

                # 显示输出文本
                st.markdown("**输出文本:**")
                st.text_area("", selected_session["output_text"], disabled=True, height=150)

            with col2:
                # 性能指标
                st.markdown("### 性能指标")

                # 处理时间
                st.markdown("**处理时间 (秒):**")
                time_data = selected_session["processing_time"]

                time_df = pd.DataFrame([time_data]).T.reset_index()
                time_df.columns = ["阶段", "时间(秒)"]

                fig = px.pie(
                    time_df,
                    values="时间(秒)",
                    names="阶段",
                    title="处理时间分布",
                    hole=0.3
                )
                fig.update_layout(height=200)
                st.plotly_chart(fig, use_container_width=True)

                # Token统计
                st.markdown("**Token统计:**")
                token_data = selected_session["token_statistics"]

                col_a, col_b = st.columns(2)
                col_a.metric("输入Token", token_data["input_tokens"])
                col_b.metric("输出Token", token_data["output_tokens"])
                st.metric("总Token", token_data["total_tokens"])

                # 图像信息
                if selected_session["image_paths"]:
                    st.markdown("### 图像信息")
                    for i, img_id in enumerate(selected_session["image_paths"]):
                        if img_id in st.session_state.image_cache:
                            img_info = st.session_state.image_cache[img_id]
                            st.markdown(f"**图像 {i+1}:**")
                            st.markdown(f"- 尺寸: {img_info['width']}x{img_info['height']}")
                            st.markdown(f"- 大小: {img_info['size_kb']} KB")
                            st.markdown(f"- 格式: {img_info['format']}")

                            # 在实际应用中,这里会显示真实图像
                            # 由于我们使用的是模拟数据,这里只显示占位符
                            st.markdown(f"- 描述: {img_info['description']}")

                            # 显示一个模拟图像的占位符
                            st.markdown("

[图像预览占位符]
由于使用模拟数据,无法显示实际图像


# 性能分析标签页
with analysis_tab:
    st.header("性能分析")

    # 检查是否有会话数据
    if not st.session_state.multimodal_sessions:
        st.warning("没有会话数据可供分析。请先生成测试数据。")
    else:
        # 准备分析数据
        analysis_data = []
        for session in st.session_state.multimodal_sessions:
            data_point = {
                "timestamp": session["timestamp"],
                "model": session["model"],
                "query_type": session["query_type"],
                "task_type": session["task_type"],
                "status": session["status"],
                "total_processing_time": session["processing_time"]["total"],
                "text_processing_time": session["processing_time"]["text_processing"],
                "image_processing_time": session["processing_time"]["image_processing"],
                "model_inference_time": session["processing_time"]["model_inference"],
                "input_tokens": session["token_statistics"]["input_tokens"],
                "output_tokens": session["token_statistics"]["output_tokens"],
                "total_tokens": session["token_statistics"]["total_tokens"],
                "image_count": len(session["image_paths"])
            }

            # 添加多模态分析指标(如果有)
            if session["multimodal_analysis"]["cross_modal_alignment"] is not None:
                data_point["cross_modal_alignment"] = session["multimodal_analysis"]["cross_modal_alignment"]

            analysis_data.append(data_point)

        df = pd.DataFrame(analysis_data)

        # 创建性能概览卡片
        st.subheader("性能概览")
        col1, col2, col3, col4 = st.columns(4)

        # 平均处理时间
        avg_processing_time = df["total_processing_time"].mean()
        col1.metric("平均处理时间", f"{avg_processing_time:.2f}秒")

        # 平均Token消耗
        avg_total_tokens = df["total_tokens"].mean()
        col2.metric("平均Token消耗", f"{avg_total_tokens:.1f}")

        # 成功率
        success_rate = (df["status"] == "success").mean() * 100
        col3.metric("成功率", f"{success_rate:.1f}%")

        # 平均跨模态对齐度(如果有数据)
        if "cross_modal_alignment" in df.columns:
            avg_alignment = df["cross_modal_alignment"].mean()
            col4.metric("平均跨模态对齐度", f"{avg_alignment:.2f}")
        else:
            col4.metric("平均跨模态对齐度", "N/A")

        # 创建图表布局
        row1_col1, row1_col2 = st.columns(2)
        row2_col1, row2_col2 = st.columns(2)

        # 图表1:处理时间趋势
        with row1_col1:
            st.subheader("处理时间趋势")

            # 按小时分组计算平均处理时间
            df_time = df.copy()
            df_time['hour'] = df_time['timestamp'].dt.floor('H')
            hourly_avg = df_time.groupby('hour')['total_processing_time'].mean().reset_index()

            fig = px.line(
                hourly_avg,
                x='hour',
                y='total_processing_time',
                labels={'hour': '时间', 'total_processing_time': '平均处理时间(秒)'},
                title='每小时平均处理时间'
            )
            fig.update_layout(height=300)
            st.plotly_chart(fig, use_container_width=True)

        # 图表2:模型性能比较
        with row1_col2:
            st.subheader("模型性能比较")

            # 计算每个模型的平均处理时间
            model_performance = df.groupby('model')['total_processing_time'].agg(['mean', 'std']).reset_index()
            model_performance.columns = ['model', 'avg_time', 'std_time']

            fig = px.bar(
                model_performance,
                x='model',
                y='avg_time',
                error_y='std_time',
                labels={'model': '模型', 'avg_time': '平均处理时间(秒)'},
                title='各模型平均处理时间'
            )
            fig.update_layout(height=300)
            st.plotly_chart(fig, use_container_width=True)

        # 图表3:查询类型性能分析
        with row2_col1:
            st.subheader("查询类型性能分析")

            # 计算每种查询类型的平均处理时间
            query_type_perf = df.groupby('query_type')['total_processing_time'].mean().reset_index()

            fig = px.pie(
                query_type_perf,
                values='total_processing_time',
                names='query_type',
                title='各查询类型平均处理时间'
            )
            fig.update_layout(height=300)
            st.plotly_chart(fig, use_container_width=True)

        # 图表4:Token消耗与处理时间关系
        with row2_col2:
            st.subheader("Token消耗与处理时间关系")

            fig = px.scatter(
                df,
                x='total_tokens',
                y='total_processing_time',
                color='model',
                hover_data=['query_type', 'image_count'],
                labels={'total_tokens': '总Token数', 'total_processing_time': '处理时间(秒)'},
                title='Token消耗与处理时间相关性'
            )
            fig.update_layout(height=300)
            st.plotly_chart(fig, use_container_width=True)

        # 详细分析表格
        st.subheader("详细性能数据")

        # 准备显示数据
        display_df = df.copy()
        display_df['timestamp'] = display_df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

        # 选择要显示的列
        display_columns = ['timestamp', 'model', 'query_type', 'task_type', 'status', 
                          'total_processing_time', 'total_tokens', 'image_count']

        # 如果有跨模态对齐数据,也显示
        if 'cross_modal_alignment' in display_df.columns:
            display_columns.append('cross_modal_alignment')

        # 按时间戳降序排序
        display_df = display_df.sort_values('timestamp', ascending=False)

        # 高亮显示错误行
        def highlight_errors(val):
            if isinstance(val, str) and val == 'error':
                return 'background-color: #ffcccc'
            return ''

        styled_df = display_df[display_columns].style.applymap(highlight_errors, subset=['status'])
        st.dataframe(styled_df, use_container_width=True)

        # 性能分析见解
        st.subheader("性能分析见解")

        insights = []

        # 识别最慢的模型
        if not df.empty:
            slowest_model = df.groupby('model')['total_processing_time'].mean().idxmax()
            slowest_model_time = df.groupby('model')['total_processing_time'].mean().max()
            insights.append(f"⚠️ 模型 '{slowest_model}' 的平均处理时间最长 ({slowest_model_time:.2f}秒),可能需要优化或考虑替代方案")

        # 识别最慢的查询类型
        if not df.empty:
            slowest_query = df.groupby('query_type')['total_processing_time'].mean().idxmax()
            slowest_query_time = df.groupby('query_type')['total_processing_time'].mean().max()
            insights.append(f"⚡ 查询类型 '{slowest_query}' 的平均处理时间最长 ({slowest_query_time:.2f}秒),可能需要特别优化")

        # 识别错误率最高的模型
        error_rates = df.groupby('model')['status'].apply(lambda x: (x == 'error').mean() * 100)
        if not error_rates.empty:
            worst_error_model = error_rates.idxmax()
            worst_error_rate = error_rates.max()
            if worst_error_rate > 5:  # 只有当错误率超过5%时才提示
                insights.append(f"❌ 模型 '{worst_error_model}' 的错误率较高 ({worst_error_rate:.1f}%),建议检查配置或联系服务提供商")

        # 处理时间异常检测
        if not df.empty:
            mean_time = df['total_processing_time'].mean()
            std_time = df['total_processing_time'].std()
            threshold = mean_time + 2 * std_time
            slow_requests = df[df['total_processing_time'] > threshold]

            if len(slow_requests) > 0:
                insights.append(f"⏱️ 检测到 {len(slow_requests)} 个处理时间异常长的请求(超过 {threshold:.2f} 秒),建议进一步分析这些请求的特征")

        # Token消耗分析
        if not df.empty:
            high_token_requests = df[df['total_tokens'] > df['total_tokens'].quantile(0.9)]
            if len(high_token_requests) > 0:
                insights.append(f"🔤 检测到 {len(high_token_requests)} 个消耗Token较多的请求,可能需要优化提示词或考虑分块处理")

        # 如果没有明显问题,显示正常消息
        if not insights:
            insights.append("✅ 性能分析未发现明显问题,系统运行正常")

        # 显示见解
        for insight in insights:
            st.markdown(insight)

# 可视化调试标签页
with visualization_tab:
    st.header("多模态可视化调试")

    # 检查是否有选中的会话
    if not st.session_state.selected_session:
        st.warning("请先从会话管理标签页中选择一个会话进行详细调试")
    else:
        session = st.session_state.selected_session

        # 显示会话基本信息
        st.subheader(f"会话调试: {session['session_id']}")
        st.markdown(f"**模型:** {session['model']}")
        st.markdown(f"**任务类型:** {session['task_type']}")

        # 检查是否有可视化数据
        if session["status"] == "error":
            st.error(f"此会话发生错误,无法进行可视化调试\n错误信息: {session['output_text']}")
        else:
            # 创建调试面板布局
            debug_tabs = st.tabs([
                "输入输出分析", 
                "注意力可视化", 
                "物体检测", 
                "视觉特征分析",
                "性能瓶颈分析"
            ])

            # 输入输出分析标签
            with debug_tabs[0]:
                st.subheader("输入输出详细分析")

                # 显示输入分析
                st.markdown("### 输入分析")

                # 输入文本分析
                if session["input_text"]:
                    st.markdown("**输入文本分析:**")

                    # 文本统计
                    word_count = len(session["input_text"].split())
                    char_count = len(session["input_text"])

                    col1, col2 = st.columns(2)
                    col1.metric("单词数", word_count)
                    col2.metric("字符数", char_count)

                    # 关键词提取(模拟)
                    st.markdown("**提取的关键词:**")
                    # 简单的关键词提取逻辑(实际应用中可以使用更复杂的NLP方法)
                    words = session["input_text"].split()
                    keywords = random.sample([w for w in words if len(w) > 3], min(3, len(words)))
                    st.write(", ".join(keywords))

                    # 文本内容
                    st.markdown("**输入文本:**")
                    st.text_area("", session["input_text"], disabled=True, height=100)

                # 图像输入分析
                if session["image_paths"]:
                    st.markdown("**图像输入分析:**")

                    for i, img_id in enumerate(session["image_paths"]):
                        if img_id in st.session_state.image_cache:
                            img_info = st.session_state.image_cache[img_id]

                            st.markdown(f"**图像 {i+1}:**")

                            # 图像信息卡片
                            with st.expander(f"图像 {i+1} 详细信息"):
                                col1, col2 = st.columns(2)
                                with col1:
                                    st.markdown(f"- 尺寸: {img_info['width']}x{img_info['height']}")
                                    st.markdown(f"- 大小: {img_info['size_kb']} KB")
                                    st.markdown(f"- 格式: {img_info['format']}")
                                with col2:
                                    # 图像质量指标(如果有)
                                    if session["multimodal_analysis"]["image_quality_metrics"]:
                                        quality_score = session["multimodal_analysis"]["image_quality_metrics"][i]
                                        st.metric("质量分数", f"{quality_score:.2f}")

                                    # 显示一个模拟图像的占位符
                                    st.markdown("

[图像预览占位符]
由于使用模拟数据,无法显示实际图像


                    # 图像处理时间分析
                    if session["processing_time"]["image_processing"] > 0:
                        avg_image_time = session["processing_time"]["image_processing"] / len(session["image_paths"])
                        st.metric("平均每图像处理时间", f"{avg_image_time:.3f}秒")

                # 输出分析
                st.markdown("### 输出分析")

                # 输出文本分析
                if session["output_text"]:
                    # 文本统计
                    word_count = len(session["output_text"].split())
                    char_count = len(session["output_text"])

                    col1, col2 = st.columns(2)
                    col1.metric("单词数", word_count)
                    col2.metric("字符数", char_count)

                    # 输出内容
                    st.markdown("**输出文本:**")
                    st.text_area("", session["output_text"], disabled=True, height=150)

                    # 输出质量评估(模拟)
                    st.markdown("**输出质量评估:**")
                    quality_metrics = {
                        "相关性": round(random.uniform(0.7, 0.95), 2),
                        "完整性": round(random.uniform(0.6, 0.9), 2),
                        "准确性": round(random.uniform(0.65, 0.9), 2),
                        "连贯性": round(random.uniform(0.75, 0.95), 2)
                    }

                    for metric, score in quality_metrics.items():
                        st.progress(score, text=f"{metric}: {score:.2f}")

            # 注意力可视化标签
            with debug_tabs[1]:
                st.subheader("跨模态注意力可视化")

                # 检查是否有注意力数据
                if session["multimodal_analysis"]["attention_weights"]:
                    attention_data = session["multimodal_analysis"]["attention_weights"]

                    # 准备热力图数据
                    words = list(attention_data.keys())
                    image_indices = list(range(len(session["image_paths"])))

                    # 创建数据矩阵
                    attention_matrix = []
                    for word in words:
                        attention_row = attention_data[word]
                        attention_matrix.append(attention_row)

                    # 创建热力图
                    fig = px.imshow(
                        attention_matrix,
                        labels=dict(x="图像索引", y="输入词", color="注意力权重"),
                        x=[f"图像 {i+1}" for i in image_indices],
                        y=words,
                        color_continuous_scale="Viridis"
                    )
                    fig.update_layout(height=400, width=600)
                    st.plotly_chart(fig, use_container_width=True)

                    # 分析注意力模式
                    st.markdown("**注意力模式分析:**")

                    # 找出每个图像的最高注意力词
                    for img_idx in range(len(session["image_paths"])):
                        max_attention = 0
                        max_word = None

                        for word in words:
                            if attention_data[word][img_idx] > max_attention:
                                max_attention = attention_data[word][img_idx]
                                max_word = word

                        if max_word:
                            st.markdown(f"- 图像 {img_idx+1} 的最高注意力词: **{max_word}** (权重: {max_attention:.2f})")
                else:
                    st.info("此会话没有可用的注意力权重数据")

            # 物体检测标签
            with debug_tabs[2]:
                st.subheader("物体检测结果")

                # 检查是否有物体检测数据
                if session["multimodal_analysis"]["detected_objects"]:
                    detected_objects = session["multimodal_analysis"]["detected_objects"]

                    st.markdown(f"检测到 {len(detected_objects)} 个物体:")

                    # 显示检测到的物体列表
                    for i, obj in enumerate(detected_objects):
                        st.markdown(f"**物体 {i+1}: {obj['class']}**")
                        st.markdown(f"- 置信度: {obj['confidence']:.2f}")
                        st.markdown(f"- 边界框: x1={obj['bbox']['x1']:.2f}, y1={obj['bbox']['y1']:.2f}, ")
                        st.markdown(f"  x2={obj['bbox']['x2']:.2f}, y2={obj['bbox']['y2']:.2f}")

                        # 在实际应用中,这里会显示带有标记的图像
                        # 由于使用模拟数据,只显示一个简单的边界框示意图
                        st.code(f"""物体 {obj['class']} 的边界框示意:

    +--------------------------------+
    |                                |
    |                                |
    |     +----------------+         |
    |     |                |         |
    |     |    {obj['class']:^14}    |         |
    |     |                |         |
    |     +----------------+         |
    |                                |
    +--------------------------------+
""")
                else:
                    st.info("此会话没有可用的物体检测数据")

            # 视觉特征分析标签
            with debug_tabs[3]:
                st.subheader("视觉特征分析")

                # 检查是否有视觉特征数据
                if session["multimodal_analysis"]["visual_features"]:
                    visual_features = session["multimodal_analysis"]["visual_features"]

                    # 显示视觉质量指标
                    st.markdown("**图像质量指标:**")
                    quality_cols = st.columns(4)
                    quality_metrics = ["brightness", "contrast", "saturation", "sharpness"]

                    for i, metric in enumerate(quality_metrics):
                        value = visual_features[metric]
                        quality_cols[i].metric(metric.capitalize(), f"{value:.2f}")

                    # 显示主色调
                    st.markdown("**主色调分析:**")
                    colors = visual_features["dominant_colors"]

                    color_cols = st.columns(len(colors))
                    for i, color in enumerate(colors):
                        # 创建颜色预览
                        color_hex = f"#{color[0]:02x}{color[1]:02x}{color[2]:02x}"

                        with color_cols[i]:
                            st.markdown(f"<div style='background-color:{color_hex}; width:100%; height:50px; border-radius:5px;'></div>", unsafe_allow_html=True)
                            st.markdown(f"RGB: {color[0]}, {color[1]}, {color[2]}")
                            st.markdown(f"HEX: {color_hex}")
                else:
                    st.info("此会话没有可用的视觉特征数据")

            # 性能瓶颈分析标签
            with debug_tabs[4]:
                st.subheader("性能瓶颈分析")

                # 处理时间分布
                st.markdown("**处理时间分布:**")
                time_data = session["processing_time"]

                # 创建饼图
                time_labels = list(time_data.keys())
                time_values = list(time_data.values())

                fig = px.pie(
                    values=time_values,
                    names=time_labels,
                    title="各阶段处理时间占比",
                    hole=0.3
                )
                st.plotly_chart(fig, use_container_width=True)

                # 识别潜在瓶颈
                st.markdown("**潜在性能瓶颈:**")

                # 找出耗时最长的阶段
                max_time_phase = max(time_data, key=time_data.get)
                max_time_value = time_data[max_time_phase]

                # 计算百分比
                total_time = sum(time_values)
                max_time_percentage = (max_time_value / total_time) * 100

                # 判断是否为瓶颈
                if max_time_percentage > 50:
                    st.warning(f"⚠️ 潜在瓶颈: **{max_time_phase}** 阶段耗时最长,占总时间的 {max_time_percentage:.1f}%")

                    # 提供优化建议
                    optimization_suggestions = {
                        "text_processing": "考虑优化文本预处理逻辑,或使用更高效的文本解析库",
                        "image_processing": "可以尝试减小图像尺寸、降低图像质量,或使用更高效的图像处理算法",
                        "model_inference": "检查是否可以使用更轻量级的模型,或优化提示词以减少处理时间",
                        "total": "整体处理时间过长,建议检查系统资源分配或考虑异步处理"
                    }

                    if max_time_phase in optimization_suggestions:
                        st.info(f"优化建议: {optimization_suggestions[max_time_phase]}")
                else:
                    st.success("✅ 各阶段处理时间分布较为均衡,未发现明显瓶颈")

                # Token效率分析
                st.markdown("**Token效率分析:**")
                token_data = session["token_statistics"]

                # 计算每Token处理时间
                time_per_token = time_data["total"] / token_data["total_tokens"] if token_data["total_tokens"] > 0 else 0

                st.metric("每Token平均处理时间", f"{time_per_token:.4f}秒")

                # 分析Token使用效率
                if time_per_token > 0.01:  # 如果每Token处理时间超过10ms
                    st.info("每Token处理时间相对较高,建议检查提示词优化或模型选择")

                # 图像数量影响分析
                if session["image_paths"]:
                    time_per_image = time_data["image_processing"] / len(session["image_paths"])
                    st.metric("每图像处理时间", f"{time_per_image:.3f}秒")

# 系统设置标签页
with settings_tab:
    st.header("系统设置")

    # 调试配置
    st.subheader("调试配置")

    # 启用详细日志记录
    enable_verbose_logging = st.checkbox("启用详细日志记录", value=False)

    # 保存详细日志的路径
    log_file_path = st.text_input("日志文件路径", value="./logs/multimodal_debug.log")

    # 数据保留设置
    data_retention_days = st.slider(
        "会话数据保留天数",
        min_value=1,
        max_value=90,
        value=7,
        step=1
    )

    # 保存设置按钮
    if st.button("保存设置", use_container_width=True, type="primary"):
        st.success("设置已保存")

    # 导入/导出设置
    st.subheader("配置管理")

    col1, col2 = st.columns(2)
    with col1:
        if st.button("导出当前配置", use_container_width=True):
            # 创建配置数据
            config_data = {
                "enable_verbose_logging": enable_verbose_logging,
                "log_file_path": log_file_path,
                "data_retention_days": data_retention_days,
                "export_timestamp": datetime.now().isoformat()
            }

            # 转换为JSON字符串
            config_json = json.dumps(config_data, indent=2)

            # 提供下载
            st.download_button(
                label="下载配置文件",
                data=config_json,
                file_name=f'multimodal_debug_config_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json',
                mime='application/json',
                use_container_width=True
            )

    with col2:
        uploaded_config = st.file_uploader("导入配置文件", type="json")
        if uploaded_config is not None:
            try:
                # 读取并解析配置文件
                config_data = json.load(uploaded_config)

                # 这里应该更新相应的设置
                # 由于Streamlit的限制,我们只能显示成功消息
                st.success("配置文件导入成功")

            except json.JSONDecodeError:
                st.error("配置文件格式错误,请检查文件是否为有效的JSON格式")
            except Exception as e:
                st.error(f"导入配置文件时出错: {str(e)}")

    # 系统信息
    st.subheader("系统信息")

    # 模拟系统信息
    system_info = {
        "调试工具版本": "1.0.0",
        "Python版本": "3.9.13",
        "Streamlit版本": "1.22.0",
        "当前会话数": len(st.session_state.multimodal_sessions),
        "缓存图像数": len(st.session_state.image_cache),
        "系统运行时间": "2天5小时32分钟"
    }

    # 显示系统信息
    for key, value in system_info.items():
        st.text(f"{key}: {value}")

    # 关于工具
    st.subheader("关于工具")
    st.markdown("多模态LLM应用调试与可视化工具用于帮助开发者分析和优化多模态LLM应用的性能和输出质量。")
    st.markdown("该工具提供会话管理、性能分析、可视化调试等功能,支持多种多模态任务类型的分析。")

# 页脚信息
st.markdown("---")
st.caption("多模态LLM应用调试与可视化工具 | 版本 1.0.0 | 最后更新: 2023-07-15")

# 注意事项说明
st.caption("注意: 本工具中的数据为模拟数据,仅供演示使用。在实际部署时,请连接到真实的多模态应用数据源。")

9.3 功能解析与使用指南

多模态调试与可视化工具为开发和调试多模态LLM应用提供了全面的功能支持,帮助开发者理解模型如何处理和融合不同模态的信息,识别潜在问题,并优化应用性能。

9.3.1 核心功能解析

1. 会话管理

  • 会话列表与过滤:提供会话列表视图,支持按模型、查询类型、任务类型、状态和时间范围进行过滤
  • 详细会话信息:展示会话的基本信息、输入输出内容、性能指标和图像信息
  • 会话选择:支持选择特定会话进行深入分析和调试
  • 数据导出:支持将会话数据导出为JSON格式,便于进一步分析或存档

2. 性能分析

  • 性能概览:提供关键性能指标的汇总视图,如平均处理时间、平均Token消耗、成功率等
  • 趋势分析:通过时间序列图表展示处理时间的变化趋势
  • 模型比较:比较不同模型的性能表现,帮助选择最优模型
  • 查询类型分析:分析不同查询类型(纯文本、纯图像、图文混合等)的性能差异
  • 异常检测:自动识别处理时间异常长、Token消耗异常高等情况
  • 性能见解:基于分析结果提供性能优化建议和见解

3. 可视化调试

  • 输入输出分析:详细分析输入文本和图像的特征,以及输出的质量评估
  • 注意力可视化:通过热力图可视化文本和图像之间的注意力关系,揭示模型关注的重点
  • 物体检测结果:展示模型检测到的物体及其位置、置信度等信息
  • 视觉特征分析:分析图像的亮度、对比度、主色调等视觉特征
  • 性能瓶颈分析:识别处理流程中的性能瓶颈,并提供针对性的优化建议

4. 系统设置

  • 调试配置:配置日志记录级别、路径和数据保留策略
  • 配置管理:支持配置的导入和导出,便于配置迁移和备份
  • 系统信息:显示工具版本、当前状态等系统信息

9.3.2 使用流程

基本使用流程:

  1. 初始设置与数据准备

    • 配置日志记录和数据保留策略
    • 生成或导入多模态会话数据
  2. 会话浏览与筛选

    • 在会话列表中浏览所有会话
    • 使用过滤条件缩小关注范围
    • 查看会话的基本信息和性能指标
  3. 深入分析与调试

    • 选择特定会话进行详细分析
    • 使用可视化调试功能分析输入输出、注意力分布、物体检测等
    • 识别潜在问题和性能瓶颈
  4. 优化与改进

    • 根据分析结果调整模型参数或提示词
    • 优化图像处理流程和资源分配
    • 实施改进并监控效果

高级使用流程:

  1. 跨会话比较分析

    • 选择多个类似会话进行比较
    • 分析不同模型在相同任务上的表现差异
    • 识别最佳实践和优化模式
  2. 问题排查与诊断

    • 针对失败或性能较差的会话进行详细分析
    • 检查输入质量、处理时间分布和资源使用情况
    • 定位问题根源并制定解决方案
  3. 性能优化循环

    • 识别性能瓶颈和优化机会
    • 实施优化措施
    • 监控改进效果
    • 持续迭代优化过程

9.3.3 实用技巧

1. 多模态输入优化技巧

  • 文本提示优化:根据注意力可视化结果,调整提示词以引导模型关注图像中的关键元素
  • 图像预处理:优化图像尺寸、质量和格式,在保持信息的同时减少处理时间
  • 多图像策略:对于多图像输入,考虑图像的顺序和数量对处理效果的影响
  • 跨模态一致性:确保文本和图像信息的一致性,提高跨模态理解准确性

2. 性能调优技巧

  • 瓶颈识别:关注处理时间占比最高的阶段,这通常是最主要的性能瓶颈
  • 批处理优化:对于多个类似请求,考虑使用批处理来提高效率
  • 资源分配:根据处理时间分布,合理分配计算资源给不同的处理阶段
  • 缓存策略:对于重复出现的相似请求或图像,实施缓存机制减少重复处理

3. 调试与问题排查技巧

  • 对比分析:将失败和成功的会话进行对比,识别关键差异
  • 渐进式调试:从简单输入开始,逐步增加复杂度,定位具体问题点
  • 分段分析:分别分析文本处理、图像处理和模型推理等阶段的表现
  • 异常模式识别:关注反复出现的错误模式,优先解决系统性问题

9.4 应用场景

多模态调试与可视化工具适用于以下场景:

  • 多模态应用开发:在开发过程中分析和优化多模态LLM应用的性能和输出质量
  • 模型评估与选择:比较不同多模态模型在特定任务上的表现
  • 问题诊断与排查:定位和解决多模态应用中的错误和性能问题
  • 用户体验优化:通过分析模型行为,优化用户输入方式和界面设计
  • 成本控制:监控和优化Token使用和处理时间,控制运营成本
  • 研究与实验:分析多模态模型的内部工作机制,为研究提供数据支持

9.5 小结

本章介绍了多模态LLM应用调试与可视化工具的设计原理、实现代码和使用方法。这个工具为开发者提供了全面的多模态应用分析和调试功能,帮助理解模型如何处理和融合不同模态的信息,识别性能瓶颈,并优化应用的整体表现。

通过会话管理、性能分析、可视化调试等功能模块,开发者可以全面了解多模态应用的运行状况,深入分析模型行为,并针对性地进行优化。工具提供的丰富可视化功能,特别是注意力可视化和物体检测结果展示,帮助开发者直观理解模型的内部工作机制。

第10章 LLM交互式调试的最佳实践与未来展望

10.1 LLM交互式调试的最佳实践

在使用Streamlit构建LLM交互式调试工具时,遵循一些最佳实践可以显著提升工具的实用性、易用性和可靠性。以下是一些经过实践验证的关键最佳实践:

10.1.1 设计原则

1. 以用户为中心的设计

  • 直观易用的界面:确保工具界面简洁明了,即使非技术用户也能快速上手
  • 上下文感知:根据用户当前的调试阶段,提供相关的功能和信息
  • 渐进式复杂度:基础功能简单易懂,高级功能可以通过展开或切换标签页访问
  • 即时反馈:用户操作后立即提供视觉或状态反馈

2. 性能优化

  • 懒加载:大型数据集或复杂可视化采用懒加载策略
  • 数据缓存:频繁访问的数据进行缓存,减少重复计算
  • 异步处理:耗时操作采用异步方式,避免阻塞界面
  • 资源管理:及时释放不再需要的资源,避免内存泄漏

3. 可扩展性设计

  • 模块化架构:采用模块化设计,便于添加新功能和维护
  • 插件系统:支持通过插件扩展工具功能
  • 标准化接口:与其他系统集成的标准化接口
  • 配置驱动:通过配置文件调整功能和行为,无需修改代码

10.1.2 实现技巧

1. 数据处理最佳实践

  • 数据验证:所有输入数据都应经过验证,防止无效数据导致的错误
  • 数据转换:提供自动的数据格式转换功能,减少用户手动处理的需要
  • 数据可视化预处理:对原始数据进行适当预处理,以获得更好的可视化效果
  • 批量操作:支持对多个会话或请求进行批量处理和分析

2. 可视化优化

  • 选择合适的图表类型:根据数据类型和分析目的选择最合适的可视化方式
  • 交互式可视化:提供悬停、点击、缩放等交互功能,增强用户体验
  • 响应式设计:可视化组件能够自适应不同屏幕尺寸和布局
  • 颜色编码:使用一致的颜色方案,便于用户理解和区分不同类型的数据

3. 用户体验优化

  • 键盘快捷键:为常用操作提供键盘快捷键
  • 操作撤销/重做:支持关键操作的撤销和重做功能
  • 状态保存和恢复:保存用户的工作状态,支持会话恢复
  • 帮助和提示:提供上下文相关的帮助信息和操作提示

4. 代码质量保证

  • 单元测试:为关键功能编写单元测试
  • 代码文档:提供详细的代码注释和API文档
  • 错误处理:完善的错误处理和异常捕获机制
  • 日志记录:记录关键操作和错误信息,便于问题排查

10.1.3 部署与运维建议

1. 部署策略

  • 容器化部署:使用Docker等容器技术进行标准化部署
  • 环境隔离:开发、测试和生产环境严格分离
  • 版本控制:工具代码和配置文件都应纳入版本控制系统
  • 自动化部署:建立自动化部署流程,减少人为错误

2. 监控与维护

  • 健康检查:定期检查工具的运行状态和性能指标
  • 日志分析:定期分析日志,识别潜在问题和优化机会
  • 备份策略:建立数据备份和恢复机制
  • 更新计划:制定定期更新和维护计划

3. 安全措施

  • 数据加密:敏感数据传输和存储时进行加密
  • 访问控制:实施严格的用户认证和授权机制
  • 输入验证:防止SQL注入、XSS等安全漏洞
  • 定期安全审计:定期进行安全审计和漏洞扫描

10.2 常见问题与解决方案

在开发和使用LLM交互式调试工具时,可能会遇到各种常见问题。以下是一些典型问题及其解决方案:

10.2.1 性能问题

1. 数据加载缓慢

  • 问题症状:加载大量会话数据时界面响应缓慢或卡顿
  • 解决方案
    • 实现数据分页加载,每次只加载部分数据
    • 使用数据索引优化查询性能
    • 对大型数据集进行预处理和缓存
    • 考虑使用异步加载方式,避免阻塞主线程

2. 可视化渲染卡顿

  • 问题症状:复杂图表或大量数据点的可视化渲染缓慢
  • 解决方案
    • 数据降采样或聚合
    • 使用更高效的可视化库或方法
    • 实现渐进式渲染
    • 考虑使用WebGL等硬件加速技术

3. 内存占用过高

  • 问题症状:工具运行一段时间后内存占用持续增长
  • 解决方案
    • 及时释放不再使用的大型对象
    • 实现数据清理机制,定期移除过期数据
    • 优化数据结构,减少不必要的复制
    • 使用生成器或迭代器处理大型数据集

10.2.2 功能问题

1. 模型连接失败

  • 问题症状:无法成功连接到LLM服务或API
  • 解决方案
    • 检查网络连接和代理设置
    • 验证API密钥和认证信息
    • 确认服务端点URL是否正确
    • 检查请求参数是否符合API要求
    • 实现重试机制和超时处理

2. 数据解析错误

  • 问题症状:无法正确解析模型返回的数据或日志文件
  • 解决方案
    • 增强数据解析代码的健壮性,处理各种异常情况
    • 提供详细的错误信息和日志,帮助定位问题
    • 实现数据格式验证和转换功能
    • 考虑支持多种数据格式的导入和导出

3. 会话状态管理问题

  • 问题症状:用户操作后状态不一致或数据丢失
  • 解决方案
    • 实现完整的会话状态管理机制
    • 使用事务性操作确保状态一致性
    • 定期保存用户状态,支持自动恢复
    • 提供明确的操作确认和状态反馈

10.2.3 用户体验问题

1. 界面响应不及时

  • 问题症状:用户操作后界面没有及时响应
  • 解决方案
    • 实现异步处理,避免长时间阻塞主线程
    • 添加加载指示器,提供操作反馈
    • 优化关键操作的性能
    • 考虑将复杂计算放在后台线程

2. 功能发现困难

  • 问题症状:用户难以发现和使用某些功能
  • 解决方案
    • 优化界面布局和导航结构
    • 提供上下文相关的帮助提示
    • 实现功能引导和教程
    • 收集用户反馈,持续改进界面设计

3. 错误提示不明确

  • 问题症状:出现错误时提示信息不够明确或帮助性不强
  • 解决方案
    • 提供详细的错误描述和可能的原因
    • 添加具体的解决建议和操作指导
    • 实现错误分类和优先级标记
    • 收集错误报告,持续改进错误处理

10.3 未来发展趋势

随着LLM技术的快速发展和应用场景的不断扩展,LLM交互式调试工具也在不断演进。以下是一些值得关注的未来发展趋势:

10.3.1 技术发展方向

1. AI辅助调试

  • 智能错误检测:使用AI技术自动检测和分析LLM应用中的潜在问题
  • 自动优化建议:基于历史数据和最佳实践,提供个性化的优化建议
  • 异常检测与预警:实时监控应用性能和输出质量,及时发现异常并预警
  • 智能代码生成:辅助生成和优化LLM应用代码和提示词

2. 多模态调试增强

  • 语音和视频输入支持:扩展调试工具支持更多模态的输入
  • 跨模态关联分析:深入分析不同模态之间的关联和交互
  • 多模态输出可视化:增强对文本、图像、语音等多种输出的可视化能力
  • 多模态协作调试:支持团队成员通过不同模态进行协作调试

3. 实时性和集成性提升

  • 实时数据流处理:支持大规模实时数据流的处理和分析
  • 开发环境深度集成:与IDE和开发工具的深度集成
  • 云原生架构:采用云原生设计,支持弹性扩展和高可用性
  • 微服务架构:将调试工具拆分为微服务,提高灵活性和可维护性

10.3.2 应用场景扩展

1. 企业级应用

  • 多租户架构:支持企业级多租户部署和管理
  • 访问控制和审计:增强的权限管理和操作审计功能
  • 大规模部署支持:支持在大型企业环境中的部署和管理
  • 合规性支持:满足企业数据安全和合规要求

2. 专业化垂直领域

  • 特定行业优化:针对医疗、金融、法律等特定行业的优化
  • 专业领域知识库集成:集成领域特定的知识和规则
  • 行业特定的评估指标:提供符合行业标准的性能和质量评估指标
  • 定制化可视化:支持行业特定的数据可视化需求

3. 教育和研究应用

  • 教学演示模式:专为教育场景设计的演示和教学功能
  • 实验跟踪和分析:支持研究实验的设计、执行和分析
  • 模型比较研究:辅助不同LLM模型和方法的比较研究
  • 可解释性研究工具:支持LLM可解释性研究的专用工具

10.3.3 用户体验创新

1. 沉浸式调试体验

  • 虚拟现实(VR)调试环境:提供沉浸式的3D调试环境
  • 手势和语音控制:支持通过手势和语音进行交互
  • 多维度数据可视化:使用3D和交互式可视化展示复杂数据关系
  • 个性化界面:根据用户习惯和偏好自动调整界面布局和功能

2. 协作式调试

  • 实时协作编辑:支持多用户实时协作调试
  • 智能工作分配:基于团队成员专长自动分配调试任务
  • 协作历史和版本控制:跟踪和管理协作过程中的变更
  • 远程协助功能:专家可以远程协助解决复杂问题

3. 自适应学习系统

  • 用户行为分析:分析用户使用模式和偏好
  • 个性化功能推荐:基于用户历史提供功能和优化建议
  • 渐进式复杂度调整:根据用户技能水平调整工具复杂度
  • 智能引导和提示:提供上下文相关的引导和提示

10.4 总结

LLM交互式调试是现代AI应用开发中的关键环节,而使用Streamlit构建可视化调试工具为开发者提供了强大而灵活的解决方案。通过本文的学习,我们掌握了从基础概念到高级应用的全面知识,包括:

  1. 基础架构设计:学习了如何设计高效、可扩展的LLM交互式调试工具架构
  2. 核心功能实现:掌握了会话管理、请求调试、性能监控等核心功能的实现方法
  3. 高级特性开发:了解了多模态调试、提示词优化等高级特性的开发技巧
  4. 用户体验优化:学习了如何设计直观、易用的用户界面,提升调试效率
  5. 实际应用指导:获取了在不同场景下使用和优化调试工具的实践指导

随着LLM技术的不断发展,交互式调试工具也将继续演进,为开发者提供更强大、更智能的支持。未来的调试工具将更加注重AI辅助、多模态支持、实时性和用户体验,同时扩展到更多垂直领域和应用场景。

作为AI应用开发者,掌握LLM交互式调试技术不仅能够提高开发效率,还能帮助我们更好地理解和优化模型行为,开发出更高质量的AI应用。希望本文介绍的知识和技巧能够帮助读者在LLM应用开发的道路上走得更远、更稳。

总结与展望

通过本文的学习,我们全面探讨了如何使用Streamlit构建LLM交互式调试工具,从基础概念到高级应用,涵盖了丰富的知识内容和实践指导。在AI技术快速发展的今天,高效的调试工具对于开发高质量的LLM应用至关重要。

Streamlit作为一个轻量级但功能强大的Web应用框架,为LLM调试工具的开发提供了理想的平台。它简单易用的API、实时交互能力和丰富的可视化组件,使开发者能够快速构建直观、高效的调试界面。通过本文介绍的各种技术和方法,开发者可以构建满足不同需求的LLM调试工具,从简单的提示工程调试到复杂的多模态应用分析。

随着大语言模型技术的不断进步和应用场景的持续扩展,LLM调试工具也将面临新的挑战和机遇。未来的调试工具将更加智能化、自动化,能够更好地理解和解释模型行为,提供更有针对性的优化建议。同时,随着多模态模型的兴起,调试工具也需要支持更复杂的输入输出形式和交互方式。

对于LLM应用开发者而言,掌握调试技术和工具使用不仅是提高开发效率的关键,更是提升应用质量的必要条件。希望本文介绍的知识和实践经验能够帮助读者在LLM应用开发的道路上不断进步,开发出更多创新、高效、可靠的AI应用。

参考文献

[1] Streamlit官方文档. https://docs.streamlit.io/

[2] OpenAI API文档. https://platform.openai.com/docs/

[3] LangChain框架文档. https://python.langchain.com/docs/get_started/introduction.html

[4] vLLM官方GitHub仓库. https://github.com/vllm-project/vllm

[5] Hugging Face Transformers文档. https://huggingface.co/docs/transformers/index

[6] Pandas官方文档. https://pandas.pydata.org/docs/

[7] Plotly Express文档. https://plotly.com/python/plotly-express/

[8] Altair数据可视化库. https://altair-viz.github.io/

[9] Streamlit Components API参考. https://docs.streamlit.io/library/components

[10] Large Language Models: A Survey. Wang, L., et al. (2023).

[11] Streamlit for Data Science: Create and Deploy Interactive Data Apps. Tyler Richards (2022).

[12] Building LLM Applications with LangChain. Harry Wolfson (2023).

[13] Advanced Streamlit Techniques for Machine Learning Applications. Streamlit Blog. https://blog.streamlit.io/

[14] Evaluating Large Language Models: A Comprehensive Survey. Jiang, L., et al. (2023).

[15] Multi-modal Large Language Models: Vision-Language Foundation Models. Li, Y., et al. (2023).

[16] Qwen2官方技术报告. Alibaba Cloud (2024).

[17] OpenAI GPT-4技术论文. OpenAI (2023).

[18] Anthropic Claude模型文档. https://docs.anthropic.com/claude/

[19] Prompt Engineering Guide. https://www.promptingguide.ai/

[20] LLM Evaluation Metrics and Methodologies. AI research paper collection.

相关文章
|
5月前
|
SQL 人工智能 监控
SLS Copilot 实践:基于 SLS 灵活构建 LLM 应用的数据基础设施
本文将分享我们在构建 SLS SQL Copilot 过程中的工程实践,展示如何基于阿里云 SLS 打造一套完整的 LLM 应用数据基础设施。
1204 82
|
5月前
|
人工智能 监控 测试技术
告别只会写提示词:构建生产级LLM系统的完整架构图​
本文系统梳理了从提示词到生产级LLM产品的八大核心能力:提示词工程、上下文工程、微调、RAG、智能体开发、部署、优化与可观测性,助你构建可落地、可迭代的AI产品体系。
781 52
|
4月前
|
缓存 物联网 PyTorch
使用TensorRT LLM构建和运行Qwen模型
本文档介绍如何在单GPU和单节点多GPU上使用TensorRT LLM构建和运行Qwen模型,涵盖模型转换、引擎构建、量化推理及LoRA微调等操作,并提供详细的代码示例与支持矩阵。
1180 2
|
4月前
|
Web App开发 人工智能 自然语言处理
利用Playwright MCP与LLM构建复杂的工作流与AI智能体
本文介绍如何通过Playwright MCP与大语言模型(LLM)结合,构建智能AI代理与自动化工作流。Playwright MCP基于Model Context Protocol,打通LLM与浏览器自动化的能力,实现自然语言驱动的网页操作。涵盖环境配置、核心组件、智能任务规划、自适应执行及电商采集、自动化测试等实战应用,助力高效构建鲁棒性强、可扩展的AI自动化系统。
|
4月前
|
SQL 数据采集 自然语言处理
04_用LLM分析数据:从表格到可视化报告
在当今数据驱动的时代,数据分析和可视化已成为商业决策、科学研究和日常工作中不可或缺的部分。随着大型语言模型(LLM)技术的飞速发展,2025年的数据分析领域正经历一场革命。传统的数据处理流程通常需要数据科学家掌握复杂的编程技能和统计知识,而现在,借助先进的LLM技术,即使是非技术人员也能轻松地从原始数据中获取洞见并创建专业的可视化报告。
|
4月前
|
数据采集 存储 自然语言处理
113_数据收集:Common Crawl过滤与高质量LLM训练数据构建
在大型语言模型(LLM)的训练过程中,数据质量直接决定了模型的性能上限。即使拥有最先进的模型架构和训练算法,如果没有高质量的训练数据,也难以训练出优秀的语言模型。Common Crawl作为目前互联网上最大的公开网络爬虫数据集之一,为LLM训练提供了宝贵的资源。然而,从原始的Common Crawl数据中提取高质量的训练素材并非易事,需要经过严格的过滤和清洗。本文将全面探讨Common Crawl数据集的特性、过滤策略的设计原则、以及2025年最新的过滤技术,为构建高质量的LLM训练语料提供系统指导。
|
4月前
|
Prometheus 监控 Cloud Native
72_监控仪表盘:构建LLM开发环境的实时观测系统
在2025年的大模型(LLM)开发实践中,实时监控已成为确保模型训练效率和生产部署稳定性的关键环节。与传统软件开发不同,LLM项目面临着独特的监控挑战
|
7月前
|
人工智能 自然语言处理 数据可视化
AI-Compass LLM评估框架:CLiB中文大模型榜单、OpenCompass司南、RAGas、微软Presidio等构建多维度全覆盖评估生态系统
AI-Compass LLM评估框架:CLiB中文大模型榜单、OpenCompass司南、RAGas、微软Presidio等构建多维度全覆盖评估生态系统
 AI-Compass LLM评估框架:CLiB中文大模型榜单、OpenCompass司南、RAGas、微软Presidio等构建多维度全覆盖评估生态系统

热门文章

最新文章