一、系统概述
随着企业数字化进程的加速,数据已成为决策的重要依据,但传统数据分析流程存在诸多痛点。业务人员往往需要依赖专业数据团队进行SQL查询和报表制作,这个过程耗时耗力且响应迟缓,沟通成本高、技术门槛也高。特别是在门票销售这样的实时性要求较高的行业,快速获取数据洞察对业务决策至关重要。
结合昨天我们刚讲过的Qwen-Agent和前期讲过的Text2SQL技术以及Gradio前端展示技术,构建了一个智能化的门票数据分析平台,让业务人员能够通过自然语言直接进行数据查询和分析,大幅降低了技术门槛,提高了决策效率。旨在通过自然语言交互实现数据库查询和可视化分析。
系统结合了大语言模型、数据库操作和数据可视化技术,为用户提供直观的门票销售数据分析体验。该系统不仅解决了传统数据分析流程的痛点,更为企业级智能数据分析应用提供了新的思路和方案。
系统采用分层架构设计,确保各模块之间的松耦合和高内聚。整体架构包含四个核心层次:
- 用户交互层:基于Gradio构建的Web界面,提供直观的聊天式交互体验。该层负责捕获用户输入、渲染查询结果,并管理用户会话状态。
- 智能代理层:系统的"大脑",基于Qwen-Agent框架实现。该层负责自然语言理解、对话管理、工具调用决策和结果整合。
- 工具执行层:模块化的工具系统,核心包括SQL执行工具和可视化生成工具。该层负责具体的业务逻辑执行和数据处理。
- 数据服务层:包括MySQL数据库和文件系统,为系统提供数据存储和持久化支持。
二、核心组件
1. Qwen-Agent智能代理
- 上下文感知:能够理解多轮对话的上下文关系
- 意图识别:准确识别用户的查询意图和业务需求
- 工具协调:智能决策何时以及如何调用工具函数
- 结果整合:将工具执行结果整合成用户友好的格式
2. Text2SQL转换引擎
- 自然语言理解:解析用户查询的语义结构,识别意图、实体和条件
- 模式映射:将自然语言中的概念映射到数据库中的表、列和关系
- SQL生成:构建符合目标数据库方言的正确语法结构
- 查询优化:确保生成的SQL在执行时具有良好性能
3. Gradio前端展示
- 简单易用:界面简单、设计友好,几行代码即可创建功能完整的Web界面
- 多样化组件:支持文本、图像、音频、视频等多种输入输出格式
- 即时分享:一键生成可公开访问的链接
- 框架无关:可与TensorFlow、PyTorch、Scikit-learn等任何机器学习框架配合使用
- 高度可定制:提供灵活的自定义选项满足不同需求和应用场景
三、系统流程
1. 流程图
2. 流程分解
2.1 基础流程
第一步:用户输入与消息传递 (步骤1-2)
- 用户输入:用户在Web界面输入自然语言问题,如"查询2023年门票销售情况"
- 消息捕获:WebUI组件捕获用户输入,格式化为标准消息格式
- 会话管理:根据对话历史生成唯一的session_id,确保多用户隔离
第二步:智能分析与SQL生成 (步骤3-4)
- 意图理解:Agent结合system_prompt中的业务知识,理解用户查询意图
- 上下文整合:如果有历史对话,会结合上下文进行更准确的理解
- SQL生成:基于数据表结构和业务逻辑,生成符合规范的SQL查询语句
- 工具调用决策:判断需要调用exc_sql工具执行数据查询
第三步:数据查询执行 (步骤5-7)
- 数据库连接:从连接池获取MySQL数据库连接,设置超时和字符集
- SQL执行:使用pandas的read_sql方法执行查询,返回DataFrame
- 结果验证:检查查询结果的有效性,处理空结果或异常情况
第四步:可视化与结果组装 (步骤8-10)
1. 智能可视化:
- 自动分析数据类型(分类变量vs数值变量)
- 根据数据特征选择图表类型(普通柱状图vs堆积柱状图)
- 处理中文显示和格式美化
- 文件保存:将生成的图表保存为PNG文件,使用时间戳确保文件名唯一
2. 结果组装:
- 将DataFrame转换为Markdown表格格式
- 生成图片的Markdown引用链接
- 组合表格和图片为完整响应
第五步:结果返回与显示 (步骤11-12)
- 结果传递:工具结果逐级返回给Agent和WebUI
- 界面渲染:WebUI将Markdown内容渲染为美观的显示格式
- 用户交互:用户可以看到结构化数据和可视化图表,可进行后续交互
2.2 自然语言到SQL的转换流程
- 意图解析:理解用户查询的业务背景和具体需求
- 条件提取:识别时间范围、筛选条件、分组维度等关键要素
- SQL构造:根据数据库schema构建符合语法的SQL语句
- 优化验证:检查SQL的合理性和执行效率
2.3 Agent创建过程
- 配置LLM参数:指定模型版本、超时设置、重试策略
- 实例化Assistant:传入名称、描述、系统提示词等
- 注册工具函数:将'exc_sql'工具绑定到Agent
- 返回初始化完成的bot对象
系统运行界面:
查询过程中生成的图示:
此处也可导入echarts的组件,达到动态炫酷的展示效果!
四、核心代码分解
1. Assistant智能代理
llm_cfg = { 'model': 'qwen-turbo', 'timeout': 30, 'retry_count': 3, } bot = Assistant( llm=llm_cfg, name='门票助手', description='门票查询与订单分析', system_message=system_prompt, function_list=['exc_sql'], )
- 使用qwen-turbo模型作为语言理解核心
- 通过system_prompt定义专业领域知识
- 函数调用机制实现工具扩展
2. 系统提示词设计
system_prompt = """我是门票助手,以下是关于门票订单表相关的字段,我可能会编写对应的SQL,对数据进行查询 -- 门票订单表 CREATE TABLE tkt_orders ( order_time DATETIME, -- 订单日期 ...... quantity INT -- 商品数量 ); 一日门票,对应多种SKU: Universal Studios Beijing One-Day Dated Ticket-Standard Universal Studios Beijing One-Day Dated Ticket-Child Universal Studios Beijing One-Day Dated Ticket-Senior
- 设计原则:明确的角色定义、数据结构说明、SQL查询模式示例、输出格式规范
- 领域专业化:系统提示词不仅定义了助手角色,更包含了详细的数据表结构说明和业务逻辑,使模型能够理解门票业务的特殊性。
- 查询模式预置:通过提供常见的SQL查询模板,如一日门票、二日门票的统计方式,引导模型生成符合业务需求的查询语句。
- 输出规范约束:明确要求原样输出工具返回内容,避免模型过度"聪明"地总结或简化,确保用户获得完整的数据信息。
3. SQLAlchemy引擎配置
engine = create_engine( f'mysql+mysqlconnector://root:Aa123456!@localhost:3306/{database}?charset=utf8mb4', connect_args={'connect_timeout': 10}, pool_size=10, max_overflow=20 )
连接参数详解:
- 连接字符串格式:数据库类型+驱动://用户名:密码@主机:端口/数据库名
- 连接池配置:pool_size控制连接数,max_overflow控制超额连接
- 字符集设置:utf8mb4支持完整Unicode字符
- 并发支持:pool_size=10确保系统能同时处理多个查询请求
- 弹性扩展:max_overflow=20在高峰期提供额外连接缓冲
- 故障隔离:超时设置防止单次查询阻塞整个系统
4. 数据查询执行
df = pd.read_sql(sql_input, engine)
- 使用pandas的read_sql方法直接执行SQL
- 自动将结果转换为DataFrame格式
- 支持复杂的SQL查询语句
5. 可视化函数架构
def generate_chart_png(df_sql, save_path): # 数据类型识别与处理 object_columns = df_sql.select_dtypes(include='O').columns.tolist() num_columns = df_sql.select_dtypes(exclude='O').columns.tolist() # 智能图表类型选择 if len(object_columns) > 0: # 堆积柱状图逻辑 pivot_df = df_sql.pivot_table(...) else: # 普通柱状图逻辑 bottom = np.zeros(len(df_sql))
可视化策略:
- 自动识别数据类型:系统能够区分分类变量和数值变量,这是选择合适图表类型的基础
- 智能图表选择:根据数据特征选择最佳可视化方式
- 当数据包含多个分类维度时,自动选择堆积柱状图展示复合关系
- 单一维度数据使用普通柱状图,保持图表简洁性
- 透视表自动生成:通过pivot_table实现数据重组,满足复杂多维度的可视化需求
6. 中文显示解决方案
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS'] plt.rcParams['axes.unicode_minus'] = False
- 设置中文字体优先级列表
- 解决负号显示异常问题
- 确保图表中文字符正常渲染
7. 函数的调用机制
functions_desc = [ { "name": "exc_sql", "description": "对于生成的SQL,进行SQL查询", # 参数定义确保SQL语句的正确传递 } ]
- 能力边界清晰:语言模型负责理解意图和生成SQL,专业工具负责执行和可视化
- 安全性保障:通过参数校验确保输入的SQL符合预期格式
- 可扩展性强:新的分析功能可以通过添加工具函数快速集成
8. @register_tool装饰器
('exc_sql') class ExcSQLTool(BaseTool): description = '对于生成的SQL,进行SQL查询,并自动可视化' parameters = [{'name': 'sql_input', 'type': 'string', 'required': True}]
- 使用装饰器注册工具名称
- 继承BaseTool基类
- 定义工具描述和参数规范
9. 工具调用接口
def call(self, params: str, **kwargs) -> str: args = json.loads(params) sql_input = args['sql_input']
- JSON格式参数解析
- 异常处理与错误返回
- 统一的返回格式
10. 使用建议与默认提示
chatbot_config = { 'prompt.suggestions': [ '2023年4、5、6月一日门票,二日门票的销量多少?帮我按照周进行统计', '2023年7月的不同省份的入园人数统计', '帮我查看2023年10月1-7日销售渠道订单金额排名', ] }
- 预置典型查询问题
- 降低用户学习成本
- 引导用户使用模式
11. 异常处理机制
try: df = pd.read_sql(sql_input, engine) # 正常处理逻辑 except Exception as e: return f"SQL执行或可视化出错: {str(e)}"
- 数据库连接异常捕获
- SQL执行错误处理
- 可视化过程异常管理
12. 性能优化
dashscope.timeout = 30 # API调用超时设置 connect_args={'connect_timeout': 10} # 数据库连接超时 pool_size=10, max_overflow=20 # 连接池配置
- 合理的超时设置
- 数据库连接池管理
- 资源释放与清理
五:优化与不足
- 针对SQL生成可能出现的错误,系统建立了多级校验机制。包括语法检查、执行验证和错误反馈循环,确保生成SQL的准确性和安全性。
- 通过异步处理、连接池管理和缓存策略优化系统性能。数据库查询使用连接池复用连接,图表生成结果进行缓存,避免重复计算。
- 采用会话隔离和资源池化技术支持多用户并发访问。每个会话有独立的数据空间,关键资源通过池化管理,确保系统稳定运行。
六、总结
本项目成功构建了一个基于Qwen-Agent和Text2SQL的智能门票数据分析系统。通过自然语言交互大幅降低了数据分析的技术门槛,提高了业务决策效率,自动化报表生成,减少人工数据处理工作量,让非技术人员也能深度参与数据分析过程。
系统展示了大语言模型在企业级应用中的巨大潜力。基于提示工程的Text2SQL方案为类似项目提供了新的技术思路,避免了专门模型训练的复杂性。下一步将扩展系统分析能力,支持预测分析和异常检测等高级功能。同时探索更多应用场景,将这一技术方案推广到其他行业领域。
附录:完整实例代码
import os import asyncio from typing import Optional import dashscope from qwen_agent.agents import Assistant from qwen_agent.gui import WebUI import pandas as pd from sqlalchemy import create_engine from qwen_agent.tools.base import BaseTool, register_tool import matplotlib.pyplot as plt import io import base64 import time import numpy as np # 解决中文显示问题 plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS'] # 优先使用的中文字体 plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 # 定义资源文件根目录 ROOT_RESOURCE = os.path.join(os.path.dirname(__file__), 'resource') # 配置 DashScope dashscope.api_key = os.getenv('DASHSCOPE_API_KEY', '') # 从环境变量获取 API Key dashscope.timeout = 30 # 设置超时时间为 30 秒 # ====== 门票助手 system prompt 和函数描述 ====== system_prompt = """我是门票助手,以下是关于门票订单表相关的字段,我可能会编写对应的SQL,对数据进行查询 -- 门票订单表 CREATE TABLE tkt_orders ( order_time DATETIME, -- 订单日期 account_id INT, -- 预定用户ID gov_id VARCHAR(18), -- 商品使用人ID(身份证号) gender VARCHAR(10), -- 使用人性别 age INT, -- 年龄 province VARCHAR(30), -- 使用人省份 SKU VARCHAR(100), -- 商品SKU名 product_serial_no VARCHAR(30), -- 商品ID eco_main_order_id VARCHAR(20), -- 订单ID sales_channel VARCHAR(20), -- 销售渠道 status VARCHAR(30), -- 商品状态 order_value DECIMAL(10,2), -- 订单金额 quantity INT -- 商品数量 ); 一日门票,对应多种SKU: Universal Studios Beijing One-Day Dated Ticket-Standard Universal Studios Beijing One-Day Dated Ticket-Child Universal Studios Beijing One-Day Dated Ticket-Senior 二日门票,对应多种SKU: USB 1.5-Day Dated Ticket Standard USB 1.5-Day Dated Ticket Discounted 一日门票、二日门票查询 SUM(CASE WHEN SKU LIKE 'Universal Studios Beijing One-Day%' THEN quantity ELSE 0 END) AS one_day_ticket_sales, SUM(CASE WHEN SKU LIKE 'USB%' THEN quantity ELSE 0 END) AS two_day_ticket_sales 我将回答用户关于门票相关的问题 每当 exc_sql 工具返回 markdown 表格和图片时,你必须原样输出工具返回的全部内容(包括图片 markdown),不要只总结表格,也不要省略图片。这样用户才能直接看到表格和图片。 """ functions_desc = [ { "name": "exc_sql", "description": "对于生成的SQL,进行SQL查询", "parameters": { "type": "object", "properties": { "sql_input": { "type": "string", "description": "生成的SQL语句", } }, "required": ["sql_input"], }, }, ] # ====== 会话隔离 DataFrame 存储 ====== # 用于存储每个会话的 DataFrame,避免多用户数据串扰 _last_df_dict = {} def get_session_id(kwargs): """根据 kwargs 获取当前会话的唯一 session_id,这里用 messages 的 id""" messages = kwargs.get('messages') if messages is not None: return id(messages) return None # ====== exc_sql 工具类实现 ====== ('exc_sql') class ExcSQLTool(BaseTool): """ SQL查询工具,执行传入的SQL语句并返回结果,并自动进行可视化。 """ description = '对于生成的SQL,进行SQL查询,并自动可视化' parameters = [{ 'name': 'sql_input', 'type': 'string', 'description': '生成的SQL语句', 'required': True }] def call(self, params: str, **kwargs) -> str: import json import matplotlib.pyplot as plt import io, os, time import numpy as np args = json.loads(params) sql_input = args['sql_input'] database = args.get('database', 'world') engine = create_engine( f'mysql+mysqlconnector://root:Aa123456!@localhost:3306/{database}?charset=utf8mb4', connect_args={'connect_timeout': 10}, pool_size=10, max_overflow=20 ) try: df = pd.read_sql(sql_input, engine) md = df.head(10).to_markdown(index=False) # 自动创建目录 save_dir = os.path.join(os.path.dirname(__file__), 'image_show') os.makedirs(save_dir, exist_ok=True) filename = f'bar_{int(time.time()*1000)}.png' save_path = os.path.join(save_dir, filename) # 生成图表 generate_chart_png(df, save_path) img_path = os.path.join('image_show', filename) img_md = f'' return f"{md}\n\n{img_md}" except Exception as e: return f"SQL执行或可视化出错: {str(e)}" # ========== 通用可视化函数 ========== def generate_chart_png(df_sql, save_path): columns = df_sql.columns x = np.arange(len(df_sql)) # 获取object类型 object_columns = df_sql.select_dtypes(include='O').columns.tolist() if columns[0] in object_columns: object_columns.remove(columns[0]) num_columns = df_sql.select_dtypes(exclude='O').columns.tolist() if len(object_columns) > 0: # 对数据进行透视,以便为每个日期和销售渠道创建堆积柱状图 pivot_df = df_sql.pivot_table(index=columns[0], columns=object_columns, values=num_columns, fill_value=0) # 绘制堆积柱状图 fig, ax = plt.subplots(figsize=(10, 6)) # 为每个销售渠道和票类型创建柱状图 bottoms = None for col in pivot_df.columns: ax.bar(pivot_df.index, pivot_df[col], bottom=bottoms, label=str(col)) if bottoms is None: bottoms = pivot_df[col].copy() else: bottoms += pivot_df[col] else: print('进入到else...') bottom = np.zeros(len(df_sql)) for column in columns[1:]: plt.bar(x, df_sql[column], bottom=bottom, label=column) bottom += df_sql[column] plt.xticks(x, df_sql[columns[0]]) plt.legend() plt.title("销售统计") plt.xlabel(columns[0]) plt.ylabel("门票数量") plt.xticks(rotation=45) plt.tight_layout() plt.savefig(save_path) plt.close() # ====== 初始化门票助手服务 ====== def init_agent_service(): """初始化门票助手服务""" llm_cfg = { 'model': 'qwen-turbo-2025-04-28', 'timeout': 30, 'retry_count': 3, } try: bot = Assistant( llm=llm_cfg, name='门票助手', description='门票查询与订单分析', system_message=system_prompt, function_list=['exc_sql'], # 移除绘图工具 ) print("助手初始化成功!") return bot except Exception as e: print(f"助手初始化失败: {str(e)}") raise def app_gui(): """图形界面模式,提供 Web 图形界面""" try: print("正在启动 Web 界面...") # 初始化助手 bot = init_agent_service() # 配置聊天界面,列举3个典型门票查询问题 chatbot_config = { 'prompt.suggestions': [ '2023年4、5、6月一日门票,二日门票的销量多少?帮我按照周进行统计', '2023年7月的不同省份的入园人数统计', '帮我查看2023年10月1-7日销售渠道订单金额排名', ] } print("Web 界面准备就绪,正在启动服务...") # 启动 Web 界面 WebUI( bot, chatbot_config=chatbot_config ).run() except Exception as e: print(f"启动 Web 界面失败: {str(e)}") print("请检查网络连接和 API Key 配置") if __name__ == '__main__': # 运行模式选择 app_gui() # 图形界面模式(默认)
数据库结构参考: