Python语言企业项目实战(三)

简介: 教程来源 http://qcycj.cn/category/jiuwenhua.html 本项目基于FastAPI构建高性能数据服务API,提供实时看板、数据分析与用户画像等模块;集成Celery实现异步任务调度,支持订单/用户数据定时同步、数据质量检查及RFM报表生成,具备完善日志、监控、缓存与错误重试机制。

第四部分:数据服务API

4.1 FastAPI应用入口

# src/main.py
"""
FastAPI应用入口

这是整个数据中台的API服务入口。
使用FastAPI构建高性能、异步的数据API。
"""

from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from loguru import logger

from config.settings import settings
from src.api.v1.router import api_router
from src.utils.metrics import setup_metrics
from src.utils.logger import setup_logging


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan manager.

    - startup: code before ``yield`` runs once before the app starts serving
      (connect databases, load config, ...)
    - shutdown: code after ``yield`` runs once when the app stops
      (release resources)
    """
    # Startup phase
    logger.info(f"启动 {settings.APP_NAME} v{settings.APP_VERSION}")
    logger.info(f"环境: {settings.APP_ENV}")

    # Initialize the database connection pool (not wired up yet)
    # init_db_pool()

    # Initialize the Redis connection (not wired up yet)
    # init_redis()

    yield

    # Shutdown phase
    logger.info(f"关闭 {settings.APP_NAME}")
    # Close the database connection pool (not wired up yet)
    # close_db_pool()


# Create the FastAPI application instance
app = FastAPI(
    title=settings.API_TITLE,
    description=settings.API_DESCRIPTION,
    version=settings.APP_VERSION,
    # Interactive API docs are only exposed in debug builds
    docs_url="/docs" if settings.DEBUG else None,
    redoc_url="/redoc" if settings.DEBUG else None,
    lifespan=lifespan,
)

# Configure CORS (cross-origin resource sharing)
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Restrict accepted Host headers; any host is allowed in debug mode
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["*"] if settings.DEBUG else settings.ALLOWED_HOSTS,
)

# Mount the versioned API routes under the configured prefix
app.include_router(api_router, prefix=settings.API_V1_PREFIX)

# Liveness endpoint
@app.get("/health")
async def health_check():
    """Liveness probe: report service identity, version and environment."""
    payload = {"status": "healthy", "app": settings.APP_NAME}
    payload["version"] = settings.APP_VERSION
    payload["env"] = settings.APP_ENV
    return payload

# Readiness endpoint
@app.get("/ready")
async def readiness_check():
    """Readiness probe consumed by Kubernetes."""
    # TODO: verify database/Redis connectivity before reporting ready
    status_payload = {"status": "ready"}
    return status_payload


if __name__ == "__main__":
    import uvicorn

    # Initialise logging and metrics collection before the server starts
    setup_logging()
    setup_metrics()

    uvicorn.run(
        "src.main:app",
        host="0.0.0.0",
        port=8000,
        # Auto-reload only in debug; multiple workers only in production
        reload=settings.DEBUG,
        workers=4 if not settings.DEBUG else 1,
    )

4.2 API路由与Schema

# src/api/v1/router.py
"""
API路由汇总

将各个模块的路由汇聚到一起。
"""

from fastapi import APIRouter

from src.api.v1 import dashboard, analysis, user_profile

api_router = APIRouter()

# Attach each feature module's router under its own prefix and tag.
_sub_routers = (
    (dashboard.router, "/dashboard", "实时看板"),
    (analysis.router, "/analysis", "数据分析"),
    (user_profile.router, "/user-profile", "用户画像"),
)
for _router, _prefix, _tag in _sub_routers:
    api_router.include_router(_router, prefix=_prefix, tags=[_tag])
# src/api/schemas/common.py
"""
公共Pydantic模型

定义API请求和响应的数据结构。
使用Pydantic实现数据验证和序列化。
"""

from datetime import datetime
from typing import Any, Dict, Generic, List, Optional, TypeVar

from pydantic import BaseModel, Field


class ResponseBase(BaseModel):
    """Base envelope shared by all API responses."""
    # Business status code; defaults to 200 (success)
    code: int = Field(200, description="状态码")
    message: str = Field("success", description="消息")
    # Naive local timestamp — NOTE(review): consider a tz-aware UTC timestamp
    timestamp: datetime = Field(default_factory=datetime.now, description="时间戳")


# Type parameter for the payload carried by the generic response models
T = TypeVar("T")


class Response(ResponseBase, Generic[T]):
    """Generic success envelope whose ``data`` field holds the payload."""
    data: Optional[T] = Field(None, description="响应数据")


class PageParams(BaseModel):
    """Pagination query parameters (1-based page index, max 100 per page)."""
    page: int = Field(1, ge=1, description="页码")
    page_size: int = Field(20, ge=1, le=100, description="每页大小")


class PageResponse(BaseModel, Generic[T]):
    """Paginated response envelope.

    NOTE(review): subclassing ``BaseModel`` together with ``Generic[T]``
    requires Pydantic v2; Pydantic v1 needs ``GenericModel`` instead —
    confirm the project's Pydantic version.
    """
    items: List[T] = Field(..., description="数据列表")
    total: int = Field(..., description="总记录数")
    page: int = Field(..., description="当前页")
    page_size: int = Field(..., description="每页大小")
    total_pages: int = Field(..., description="总页数")
# src/api/schemas/dashboard.py
"""
实时看板相关Schema
"""

from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, Field


class RealTimeMetrics(BaseModel):
    """Real-time dashboard metrics for the current day."""
    gmv: float = Field(..., description="实时GMV")
    order_count: int = Field(..., description="实时订单数")
    user_count: int = Field(..., description="实时用户数")
    avg_order_value: float = Field(..., description="平均客单价")


class SalesTrendItem(BaseModel):
    """One day of the sales trend (ISO date string plus aggregates)."""
    date: str = Field(..., description="日期")
    gmv: float = Field(..., description="GMV")
    order_count: int = Field(..., description="订单数")


class TopProductItem(BaseModel):
    """One entry in the best-selling products ranking."""
    product_id: int = Field(..., description="商品ID")
    product_name: str = Field(..., description="商品名称")
    sales_count: int = Field(..., description="销量")
    sales_amount: float = Field(..., description="销售额")

4.3 实时看板API实现

# src/api/v1/dashboard.py
"""
实时看板API

提供实时数据监控接口。
"""

from datetime import datetime, timedelta
from typing import List, Optional

from fastapi import APIRouter, Depends, Query

from src.api.schemas.common import Response
from src.api.schemas.dashboard import RealTimeMetrics, SalesTrendItem, TopProductItem
from src.service.dashboard_service import DashboardService
from src.utils.decorators import cache_response

router = APIRouter()


def get_dashboard_service() -> DashboardService:
    """FastAPI dependency that supplies a fresh DashboardService."""
    service = DashboardService()
    return service


@router.get("/realtime", response_model=Response[RealTimeMetrics])
@cache_response(ttl=10)  # cache for 10 seconds
async def get_realtime_metrics(
    service: DashboardService = Depends(get_dashboard_service)
) -> Response[RealTimeMetrics]:
    """Return today's real-time GMV, order count and user count.

    Responses are served from a 10-second cache, so clients observe data
    refreshed at most once every 10 seconds.
    """
    return Response(data=await service.get_realtime_metrics())


@router.get("/sales-trend", response_model=Response[List[SalesTrendItem]])
async def get_sales_trend(
    days: int = Query(7, ge=1, le=30, description="天数"),
    service: DashboardService = Depends(get_dashboard_service)
) -> Response[List[SalesTrendItem]]:
    """Return the daily GMV / order-count trend for the last ``days`` days."""
    return Response(data=await service.get_sales_trend(days))


@router.get("/top-products", response_model=Response[List[TopProductItem]])
async def get_top_products(
    limit: int = Query(10, ge=1, le=50, description="数量"),
    days: int = Query(7, ge=1, le=30, description="统计天数"),
    service: DashboardService = Depends(get_dashboard_service)
) -> Response[List[TopProductItem]]:
    """Return the ``limit`` best-selling products of the last ``days`` days."""
    return Response(data=await service.get_top_products(limit, days))
# src/service/dashboard_service.py
"""
实时看板服务

实现看板数据的业务逻辑。
"""

from datetime import datetime, timedelta
from typing import List

import pandas as pd
from loguru import logger

from src.api.schemas.dashboard import RealTimeMetrics, SalesTrendItem, TopProductItem
from src.repository.dwd_repo import DWDRepository


class DashboardService:
    """Business logic for the real-time dashboard.

    Reads order data from the DWD layer (as pandas DataFrames via
    ``DWDRepository``) and aggregates it into API schema objects.
    """

    def __init__(self):
        self.repo = DWDRepository()

    async def get_realtime_metrics(self) -> RealTimeMetrics:
        """Compute today's metrics (from local midnight until now).

        Returns:
            GMV, order count, distinct-user count and average order value;
            all zeros when no orders exist yet today.
        """
        today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

        orders = await self.repo.get_orders_since(today_start)

        if orders.empty:
            return RealTimeMetrics(gmv=0.0, order_count=0, user_count=0, avg_order_value=0.0)

        gmv = orders["pay_amount"].sum()
        order_count = len(orders)
        user_count = orders["user_id"].nunique()
        # order_count >= 1 here (empty frame handled above); guard kept as a safety net
        avg_order_value = gmv / order_count if order_count > 0 else 0.0

        return RealTimeMetrics(
            gmv=float(gmv),
            order_count=order_count,
            user_count=user_count,
            avg_order_value=float(avg_order_value),
        )

    async def get_sales_trend(self, days: int) -> List[SalesTrendItem]:
        """Aggregate daily GMV and order counts over the last ``days`` days.

        Args:
            days: Number of days to look back.

        Returns:
            One item per calendar day that has orders, in date order.
        """
        # NOTE(review): this passes a date while the sibling methods pass a
        # datetime — confirm DWDRepository.get_orders_since accepts both.
        start_date = datetime.now().date() - timedelta(days=days)

        orders = await self.repo.get_orders_since(start_date)

        if orders.empty:
            return []

        # Group orders by calendar day; sum GMV and count orders per day
        orders["date"] = pd.to_datetime(orders["create_time"]).dt.date
        daily_stats = orders.groupby("date").agg({
            "pay_amount": "sum",
            "order_id": "count",
        }).reset_index()

        # Comprehension instead of a manual append loop (same rows, same order)
        return [
            SalesTrendItem(
                date=row["date"].isoformat(),
                gmv=float(row["pay_amount"]),
                order_count=int(row["order_id"]),
            )
            for _, row in daily_stats.iterrows()
        ]

    async def get_top_products(self, limit: int, days: int) -> List[TopProductItem]:
        """Rank products by units sold over the last ``days`` days.

        Args:
            limit: Maximum number of products to return.
            days: Number of days to look back.

        Returns:
            Up to ``limit`` products, sorted by quantity sold (descending).
        """
        start_date = datetime.now() - timedelta(days=days)

        # Order line items include product_id/product_name/quantity/amount
        order_items = await self.repo.get_order_items_since(start_date)

        if order_items.empty:
            return []

        # Sum quantity and amount per product
        product_stats = order_items.groupby(["product_id", "product_name"]).agg({
            "quantity": "sum",
            "amount": "sum",
        }).reset_index()

        # Keep only the top sellers by quantity
        product_stats = product_stats.sort_values("quantity", ascending=False).head(limit)

        return [
            TopProductItem(
                product_id=int(row["product_id"]),
                product_name=row["product_name"],
                sales_count=int(row["quantity"]),
                sales_amount=float(row["amount"]),
            )
            for _, row in product_stats.iterrows()
        ]

第五部分:异步任务与定时调度

5.1 Celery配置

# src/tasks/celery_app.py
"""
Celery应用配置

Celery是Python的分布式任务队列,用于处理异步任务和定时任务。
"""

from celery import Celery
from celery.schedules import crontab

from config.settings import settings

# Create the Celery application
celery_app = Celery(
    "data_mid_platform",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    include=[
        "src.tasks.data_sync",
        "src.tasks.data_quality",
        "src.tasks.report",
    ]
)

# Celery runtime configuration
celery_app.conf.update(
    # Serialize tasks and results as JSON only
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],

    # Schedules are interpreted in Asia/Shanghai; internal timestamps use UTC
    timezone="Asia/Shanghai",
    enable_utc=True,

    # Task results expire after one hour (seconds)
    result_expires=3600,

    # Global task timeouts (seconds); individual tasks may override them.
    # NOTE(review): sync_users does not override these — confirm a 60s soft
    # limit is enough for a full user-table sync.
    task_soft_time_limit=60,
    task_time_limit=90,

    # Default maximum retry count
    task_max_retries=3,

    # Retry policy for communicating with the result backend
    result_backend_transport_options={
        "retry_policy": {
            "timeout": 5.0,
            "max_retries": 3,
        }
    },

    # Periodic (beat) schedule
    beat_schedule={
        # Sync order data daily at 02:00
        "sync-orders-daily": {
            "task": "src.tasks.data_sync.sync_orders",
            "schedule": crontab(hour=2, minute=0),
            "args": (),
        },
        # Sync user data daily at 03:00
        "sync-users-daily": {
            "task": "src.tasks.data_sync.sync_users",
            "schedule": crontab(hour=3, minute=0),
            "args": (),
        },
        # Run the data-quality check at the top of every hour
        "check-data-quality-hourly": {
            "task": "src.tasks.data_quality.check_quality",
            "schedule": crontab(minute=0),
            "args": (),
        },
        # Generate the RFM report daily at 05:00
        "generate-rfm-report-daily": {
            "task": "src.tasks.report.generate_rfm_report",
            "schedule": crontab(hour=5, minute=0),
            "args": (),
        },
    }
)


# Minimal task used to verify workers are receiving and executing tasks
@celery_app.task(bind=True)
def debug_task(self):
    """Print the incoming task request for debugging."""
    print(f"Request: {self.request!r}")

5.2 数据同步任务

# src/tasks/data_sync.py
"""
数据同步任务

定时从业务数据库同步数据到数据仓库。
"""

from datetime import datetime

from celery import shared_task
from loguru import logger

from src.collector.mysql_collector import MySQLCollector
from src.processor.cleaner import DataCleaner
from src.repository.dwd_repo import DWDRepository


@shared_task(bind=True, max_retries=3, soft_time_limit=3600)
def sync_orders(self) -> dict:
    """
    Sync order data.

    Collects order rows from the order system's MySQL database; cleaning
    and loading into the warehouse are sketched but not yet wired up.

    Returns:
        A summary dict: task name, start/end time, status, row count, duration.

    Raises:
        celery.exceptions.Retry: the task is re-queued with a 5-minute
            countdown when collection fails (up to ``max_retries``).
    """
    start_time = datetime.now()
    logger.info("开始同步订单数据")

    result = {
        "task": "sync_orders",
        "start_time": start_time.isoformat(),
        "status": "running",
    }

    # Bug fix: bind before the try block — otherwise, if the collector
    # constructor raises, the finally block hits an unbound name and the
    # resulting UnboundLocalError masks the original failure.
    collector = None
    try:
        # 1. Collect data (incremental pull)
        collector = MySQLCollector(table_name="orders", db_name="order_db")
        collector.connect()

        collected_count = collector.collect(incremental=True)

        # 2. Clean data
        # Simplified here; the collected frame should be passed through:
        # cleaner = DataCleaner()
        # cleaned_df = cleaner.clean(df)

        # 3. Load into the warehouse
        # repo = DWDRepository()
        # repo.write_orders(cleaned_df)

        result.update({
            "status": "success",
            "collected_count": collected_count,
        })

        logger.info(f"订单数据同步完成,共同步 {collected_count} 条")

    except Exception as e:
        logger.error(f"订单数据同步失败: {e}")
        result["status"] = "failed"
        result["error"] = str(e)

        # Retry in 5 minutes
        raise self.retry(exc=e, countdown=60 * 5)

    finally:
        # Only close a collector that was actually constructed
        if collector is not None:
            collector.close()

    result["end_time"] = datetime.now().isoformat()
    result["duration_seconds"] = (datetime.now() - start_time).total_seconds()

    return result


@shared_task
def sync_users() -> dict:
    """
    Sync user data.

    Collects user rows from the user system's MySQL database.

    Returns:
        A summary dict: task name, start/end time, status, row count, duration.

    Raises:
        Exception: re-raised after logging when collection fails, so Celery
            marks the task as failed.
    """
    start_time = datetime.now()
    logger.info("开始同步用户数据")

    result = {
        "task": "sync_users",
        "start_time": start_time.isoformat(),
        "status": "running",
    }

    # Bug fix: bind before the try block so the finally clause cannot hit an
    # unbound name when the collector constructor itself raises.
    collector = None
    try:
        collector = MySQLCollector(table_name="users", db_name="user_db")
        collector.connect()

        collected_count = collector.collect(incremental=True)

        result.update({
            "status": "success",
            "collected_count": collected_count,
        })

        logger.info(f"用户数据同步完成,共同步 {collected_count} 条")

    except Exception as e:
        logger.error(f"用户数据同步失败: {e}")
        result["status"] = "failed"
        result["error"] = str(e)
        raise

    finally:
        if collector is not None:
            collector.close()

    result["end_time"] = datetime.now().isoformat()
    # Added for consistency with sync_orders' result shape
    result["duration_seconds"] = (datetime.now() - start_time).total_seconds()
    return result

来源:
http://qcycj.cn/category/jiuwenhua.html

相关文章
|
6天前
|
人工智能 数据可视化 安全
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
本文详解如何用阿里云Lighthouse一键部署OpenClaw,结合飞书CLI等工具,让AI真正“动手”——自动群发、生成科研日报、整理知识库。核心理念:未来软件应为AI而生,CLI即AI的“手脚”,实现高效、安全、可控的智能自动化。
18005 12
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
|
17天前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
29545 141
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
7天前
|
人工智能 JSON 监控
Claude Code 源码泄露:一份价值亿元的 AI 工程公开课
我以为顶级 AI 产品的护城河是模型。读完这 51.2 万行泄露的源码,我发现自己错了。
4606 20
|
6天前
|
人工智能 API 开发者
阿里云百炼 Coding Plan 售罄、Lite 停售、Pro 抢不到?最新解决方案
阿里云百炼Coding Plan Lite已停售,Pro版每日9:30限量抢购难度大。本文解析原因,并提供两大方案:①掌握技巧抢购Pro版;②直接使用百炼平台按量付费——新用户赠100万Tokens,支持Qwen3.5-Max等满血模型,灵活低成本。
1448 3
阿里云百炼 Coding Plan 售罄、Lite 停售、Pro 抢不到?最新解决方案