第四部分:数据服务API
4.1 FastAPI应用入口
# src/main.py
"""
FastAPI应用入口
这是整个数据中台的API服务入口。
使用FastAPI构建高性能、异步的数据API。
"""
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from loguru import logger
from config.settings import settings
from src.api.v1.router import api_router
from src.utils.metrics import setup_metrics
from src.utils.logger import setup_logging
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan manager.

    Code before ``yield`` runs at startup; code after ``yield`` runs at
    shutdown. DB-pool and Redis initialization are still commented-out
    placeholders — only logging happens today.
    """
    # Startup: announce app identity and environment.
    logger.info(f"启动 {settings.APP_NAME} v{settings.APP_VERSION}")
    logger.info(f"环境: {settings.APP_ENV}")
    # Initialize the database connection pool (not yet implemented).
    # init_db_pool()
    # Initialize the Redis connection (not yet implemented).
    # init_redis()
    yield
    # Shutdown.
    logger.info(f"关闭 {settings.APP_NAME}")
    # Close the database connection pool (not yet implemented).
    # close_db_pool()
# Create the FastAPI application instance.
app = FastAPI(
    title=settings.API_TITLE,
    description=settings.API_DESCRIPTION,
    version=settings.APP_VERSION,
    # Interactive docs are exposed only in debug mode; hidden in production.
    docs_url="/docs" if settings.DEBUG else None,
    redoc_url="/redoc" if settings.DEBUG else None,
    lifespan=lifespan,
)

# Configure CORS (cross-origin resource sharing).
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Restrict accepted Host headers; any host is allowed in debug mode.
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["*"] if settings.DEBUG else settings.ALLOWED_HOSTS,
)

# Mount the versioned API router.
app.include_router(api_router, prefix=settings.API_V1_PREFIX)
# 健康检查端点
@app.get("/health")
async def health_check():
    """Liveness probe: report service identity, version and environment."""
    payload = {
        "status": "healthy",
        "app": settings.APP_NAME,
        "version": settings.APP_VERSION,
        "env": settings.APP_ENV,
    }
    return payload
# Readiness probe endpoint.
@app.get("/ready")
async def readiness_check():
    """Readiness probe (consumed by Kubernetes)."""
    # TODO: verify database/Redis connectivity before reporting ready.
    return dict(status="ready")
if __name__ == "__main__":
    import uvicorn

    # Configure logging and metrics before starting the server.
    setup_logging()
    setup_metrics()
    uvicorn.run(
        "src.main:app",
        host="0.0.0.0",
        port=8000,
        # Auto-reload in debug; multiple workers otherwise.
        # NOTE(review): uvicorn ignores `workers` when reload=True, so the
        # debug combination is harmless but redundant — confirm intent.
        reload=settings.DEBUG,
        workers=4 if not settings.DEBUG else 1,
    )
4.2 API路由与Schema
# src/api/v1/router.py
"""
API路由汇总
将各个模块的路由汇聚到一起。
"""
from fastapi import APIRouter
from src.api.v1 import dashboard, analysis, user_profile
api_router = APIRouter()

# Register each module's sub-router under its URL prefix.
api_router.include_router(
    dashboard.router,
    prefix="/dashboard",
    tags=["实时看板"]
)
api_router.include_router(
    analysis.router,
    prefix="/analysis",
    tags=["数据分析"]
)
api_router.include_router(
    user_profile.router,
    prefix="/user-profile",
    tags=["用户画像"]
)
# src/api/schemas/common.py
"""
公共Pydantic模型
定义API请求和响应的数据结构。
使用Pydantic实现数据验证和序列化。
"""
from datetime import datetime
from typing import Any, Dict, Generic, List, Optional, TypeVar
from pydantic import BaseModel, Field
class ResponseBase(BaseModel):
    """Base envelope shared by all API responses."""

    # Business status code; 200 means success.
    code: int = Field(200, description="状态码")
    # Human-readable result message.
    message: str = Field("success", description="消息")
    # Server-side timestamp, filled when the response object is built.
    timestamp: datetime = Field(default_factory=datetime.now, description="时间戳")
T = TypeVar("T")


class Response(ResponseBase, Generic[T]):
    """Generic response wrapping a typed payload."""

    # Payload; None when the endpoint has no data to return.
    data: Optional[T] = Field(None, description="响应数据")
class PageParams(BaseModel):
    """Pagination query parameters."""

    # 1-based page number.
    page: int = Field(1, ge=1, description="页码")
    # Items per page, capped at 100.
    page_size: int = Field(20, ge=1, le=100, description="每页大小")
class PageResponse(BaseModel, Generic[T]):
    """Paginated response envelope."""

    # Items on the current page.
    items: List[T] = Field(..., description="数据列表")
    # Total number of matching records.
    total: int = Field(..., description="总记录数")
    # Current 1-based page number.
    page: int = Field(..., description="当前页")
    # Page size used for this query.
    page_size: int = Field(..., description="每页大小")
    # Total number of pages.
    total_pages: int = Field(..., description="总页数")
# src/api/schemas/dashboard.py
"""
实时看板相关Schema
"""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
class RealTimeMetrics(BaseModel):
    """Real-time dashboard metrics for the current day."""

    # Gross merchandise value accumulated so far today.
    gmv: float = Field(..., description="实时GMV")
    # Order count so far today.
    order_count: int = Field(..., description="实时订单数")
    # Distinct purchasing users so far today.
    user_count: int = Field(..., description="实时用户数")
    # Average order value (GMV / order count).
    avg_order_value: float = Field(..., description="平均客单价")
class SalesTrendItem(BaseModel):
    """One day's point on the sales trend chart."""

    # ISO date string (YYYY-MM-DD).
    date: str = Field(..., description="日期")
    # GMV for that day.
    gmv: float = Field(..., description="GMV")
    # Order count for that day.
    order_count: int = Field(..., description="订单数")
class TopProductItem(BaseModel):
    """One entry in the best-selling products ranking."""

    # Product identifier.
    product_id: int = Field(..., description="商品ID")
    # Display name of the product.
    product_name: str = Field(..., description="商品名称")
    # Units sold in the window.
    sales_count: int = Field(..., description="销量")
    # Sales revenue in the window.
    sales_amount: float = Field(..., description="销售额")
4.3 实时看板API实现
# src/api/v1/dashboard.py
"""
实时看板API
提供实时数据监控接口。
"""
from datetime import datetime, timedelta
from typing import List, Optional
from fastapi import APIRouter, Depends, Query
from src.api.schemas.common import Response
from src.api.schemas.dashboard import RealTimeMetrics, SalesTrendItem, TopProductItem
from src.service.dashboard_service import DashboardService
from src.utils.decorators import cache_response
router = APIRouter()


def get_dashboard_service() -> DashboardService:
    """Dependency provider: build a fresh DashboardService per request."""
    return DashboardService()
@router.get("/realtime", response_model=Response[RealTimeMetrics])
@cache_response(ttl=10)  # cached for 10 seconds
async def get_realtime_metrics(
    service: DashboardService = Depends(get_dashboard_service)
) -> Response[RealTimeMetrics]:
    """
    Real-time metrics.

    Returns today's real-time GMV, order count, user count, etc.
    Data effectively refreshes every 10 seconds via the response cache.
    """
    return Response(data=await service.get_realtime_metrics())
@router.get("/sales-trend", response_model=Response[List[SalesTrendItem]])
async def get_sales_trend(
    days: int = Query(7, ge=1, le=30, description="天数"),
    service: DashboardService = Depends(get_dashboard_service)
) -> Response[List[SalesTrendItem]]:
    """
    Sales trend.

    Returns daily GMV and order counts for the last N days.
    """
    return Response(data=await service.get_sales_trend(days))
@router.get("/top-products", response_model=Response[List[TopProductItem]])
async def get_top_products(
    limit: int = Query(10, ge=1, le=50, description="数量"),
    days: int = Query(7, ge=1, le=30, description="统计天数"),
    service: DashboardService = Depends(get_dashboard_service)
) -> Response[List[TopProductItem]]:
    """
    Best-selling products ranking.

    Returns the products with the highest unit sales in the last N days.
    """
    return Response(data=await service.get_top_products(limit, days))
# src/service/dashboard_service.py
"""
实时看板服务
实现看板数据的业务逻辑。
"""
from datetime import datetime, timedelta
from typing import List
import pandas as pd
from loguru import logger
from src.api.schemas.dashboard import RealTimeMetrics, SalesTrendItem, TopProductItem
from src.repository.dwd_repo import DWDRepository
class DashboardService:
    """Business logic backing the real-time dashboard endpoints."""

    def __init__(self):
        self.repo = DWDRepository()

    async def get_realtime_metrics(self) -> RealTimeMetrics:
        """
        Compute today's metrics from orders created since midnight.

        Returns zeroed metrics when no orders exist yet.
        """
        midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        orders = await self.repo.get_orders_since(midnight)
        if orders.empty:
            return RealTimeMetrics(gmv=0.0, order_count=0, user_count=0, avg_order_value=0.0)
        total_gmv = orders["pay_amount"].sum()
        n_orders = len(orders)
        n_users = orders["user_id"].nunique()
        aov = total_gmv / n_orders if n_orders > 0 else 0.0
        return RealTimeMetrics(
            gmv=float(total_gmv),
            order_count=n_orders,
            user_count=n_users,
            avg_order_value=float(aov),
        )

    async def get_sales_trend(self, days: int) -> List[SalesTrendItem]:
        """
        Daily GMV and order counts for the last ``days`` days.

        Args:
            days: size of the lookback window in days.

        Returns:
            One SalesTrendItem per day with orders; empty list if none.
        """
        since = datetime.now().date() - timedelta(days=days)
        orders = await self.repo.get_orders_since(since)
        if orders.empty:
            return []
        # Bucket orders by calendar day, then aggregate revenue and count.
        orders["date"] = pd.to_datetime(orders["create_time"]).dt.date
        daily = orders.groupby("date").agg({
            "pay_amount": "sum",
            "order_id": "count",
        }).reset_index()
        return [
            SalesTrendItem(
                date=row.date.isoformat(),
                gmv=float(row.pay_amount),
                order_count=int(row.order_id),
            )
            for row in daily.itertuples(index=False)
        ]

    async def get_top_products(self, limit: int, days: int) -> List[TopProductItem]:
        """
        Top ``limit`` products by unit sales over the last ``days`` days.

        Args:
            limit: number of products to return.
            days: size of the lookback window in days.

        Returns:
            Products ordered by descending unit sales; empty list if none.
        """
        since = datetime.now() - timedelta(days=days)
        line_items = await self.repo.get_order_items_since(since)
        if line_items.empty:
            return []
        # Aggregate units and revenue per product.
        stats = line_items.groupby(["product_id", "product_name"]).agg({
            "quantity": "sum",
            "amount": "sum",
        }).reset_index()
        ranked = stats.sort_values("quantity", ascending=False).head(limit)
        return [
            TopProductItem(
                product_id=int(row.product_id),
                product_name=row.product_name,
                sales_count=int(row.quantity),
                sales_amount=float(row.amount),
            )
            for row in ranked.itertuples(index=False)
        ]
第五部分:异步任务与定时调度
5.1 Celery配置
# src/tasks/celery_app.py
"""
Celery应用配置
Celery是Python的分布式任务队列,用于处理异步任务和定时任务。
"""
from celery import Celery
from celery.schedules import crontab
from config.settings import settings
# Create the Celery application.
celery_app = Celery(
    "data_mid_platform",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    # Modules whose tasks are auto-registered at worker startup.
    include=[
        "src.tasks.data_sync",
        "src.tasks.data_quality",
        "src.tasks.report",
    ]
)
# Configure Celery.
celery_app.conf.update(
    # Task serialization format.
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    # Timezone: beat schedules below are interpreted in Asia/Shanghai;
    # internal timestamps are stored as UTC.
    timezone="Asia/Shanghai",
    enable_utc=True,
    # Task-result expiry (seconds).
    result_expires=3600,
    # Global soft/hard task time limits (seconds).
    # NOTE(review): 60 s looks tight for the sync tasks; sync_orders
    # overrides it (soft_time_limit=3600) but sync_users does not — confirm.
    task_soft_time_limit=60,
    task_time_limit=90,
    # Maximum retry count.
    task_max_retries=3,
    # Result-backend transport options.
    result_backend_transport_options={
        "retry_policy": {
            "timeout": 5.0,
            "max_retries": 3,
        }
    },
    # Periodic task schedule (Celery beat).
    beat_schedule={
        # Sync order data daily at 02:00.
        "sync-orders-daily": {
            "task": "src.tasks.data_sync.sync_orders",
            "schedule": crontab(hour=2, minute=0),
            "args": (),
        },
        # Sync user data daily at 03:00.
        "sync-users-daily": {
            "task": "src.tasks.data_sync.sync_users",
            "schedule": crontab(hour=3, minute=0),
            "args": (),
        },
        # Data-quality check at the top of every hour.
        "check-data-quality-hourly": {
            "task": "src.tasks.data_quality.check_quality",
            "schedule": crontab(minute=0),
            "args": (),
        },
        # Generate the RFM report daily at 05:00.
        "generate-rfm-report-daily": {
            "task": "src.tasks.report.generate_rfm_report",
            "schedule": crontab(hour=5, minute=0),
            "args": (),
        },
    }
)
# Task-monitoring helper.
@celery_app.task(bind=True)
def debug_task(self):
    """Debug task: print the current Celery request context."""
    print(f"Request: {self.request!r}")
5.2 数据同步任务
# src/tasks/data_sync.py
"""
数据同步任务
定时从业务数据库同步数据到数据仓库。
"""
from datetime import datetime
from celery import shared_task
from loguru import logger
from src.collector.mysql_collector import MySQLCollector
from src.processor.cleaner import DataCleaner
from src.repository.dwd_repo import DWDRepository
@shared_task(bind=True, max_retries=3, soft_time_limit=3600)
def sync_orders(self) -> dict:
    """
    Sync order data.

    Collects orders from the order system's MySQL database; cleaning and
    warehouse-write steps are still commented-out placeholders.

    Returns:
        Summary dict with task name, timing, status and collected count.

    Raises:
        celery.exceptions.Retry: the task is re-scheduled via ``self.retry``
            when collection fails (5-minute countdown, up to 3 retries).
    """
    start_time = datetime.now()
    logger.info("开始同步订单数据")
    result = {
        "task": "sync_orders",
        "start_time": start_time.isoformat(),
        "status": "running",
    }
    # BUGFIX: bind before the try so the finally block can never hit a
    # NameError when the collector constructor itself raises.
    collector = None
    try:
        # 1. Collect data.
        collector = MySQLCollector(table_name="orders", db_name="order_db")
        collector.connect()
        collected_count = collector.collect(incremental=True)
        # 2. Clean data (simplified; the collected frame should be passed in).
        # cleaner = DataCleaner()
        # cleaned_df = cleaner.clean(df)
        # 3. Write to the data warehouse.
        # repo = DWDRepository()
        # repo.write_orders(cleaned_df)
        result.update({
            "status": "success",
            "collected_count": collected_count,
        })
        logger.info(f"订单数据同步完成,共同步 {collected_count} 条")
    except Exception as e:
        logger.error(f"订单数据同步失败: {e}")
        result["status"] = "failed"
        result["error"] = str(e)
        # Retry in 5 minutes. BUGFIX: previously a `return` inside the
        # finally block swallowed this Retry exception, so the task never
        # actually retried.
        raise self.retry(exc=e, countdown=60 * 5)
    finally:
        # Release the DB connection only if it was actually opened.
        if collector is not None:
            collector.close()
        result["end_time"] = datetime.now().isoformat()
        result["duration_seconds"] = (datetime.now() - start_time).total_seconds()
    # Reached only on success; the except path re-raises.
    return result
@shared_task
def sync_users() -> dict:
    """
    Sync user data.

    Collects users from the user system's MySQL database.

    Returns:
        Summary dict with task name, timing, status and collected count.

    Raises:
        Exception: the collection error is re-raised so Celery records
            the task as failed.
    """
    start_time = datetime.now()
    logger.info("开始同步用户数据")
    result = {
        "task": "sync_users",
        "start_time": start_time.isoformat(),
        "status": "running",
    }
    # BUGFIX: bind before the try so the finally block can never hit a
    # NameError when the collector constructor itself raises.
    collector = None
    try:
        collector = MySQLCollector(table_name="users", db_name="user_db")
        collector.connect()
        collected_count = collector.collect(incremental=True)
        result.update({
            "status": "success",
            "collected_count": collected_count,
        })
        logger.info(f"用户数据同步完成,共同步 {collected_count} 条")
    except Exception as e:
        logger.error(f"用户数据同步失败: {e}")
        result["status"] = "failed"
        result["error"] = str(e)
        # BUGFIX: previously a `return` inside the finally block swallowed
        # this re-raise, so failures were reported as normal completions.
        raise
    finally:
        # Release the DB connection only if it was actually opened.
        if collector is not None:
            collector.close()
        result["end_time"] = datetime.now().isoformat()
    # Reached only on success; the except path re-raises.
    return result