Python语言企业项目实战(四)

简介: 教程来源 http://qcycj.cn/category/jiuwenhua.html 本文详解电商数据中台实战:基于FastAPI构建高性能数据服务,集成Celery异步任务、PostgreSQL/Redis/MinIO多组件,通过Docker Compose一键部署,并内置Prometheus+Grafana监控与自动化数据质量校验体系。

第六部分:部署与运维

6.1 Dockerfile

# docker/Dockerfile
FROM python:3.11-slim

LABEL maintainer="data-platform-team"
LABEL description="Data Middle Platform"

# 设置工作目录
WORKDIR /app

# 设置环境变量
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=/app \
    TZ=Asia/Shanghai

# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    libpq-dev \
    curl \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建日志目录
RUN mkdir -p /app/logs

# 暴露端口
EXPOSE 8000

# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# 启动命令
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]

6.2 docker-compose.yml

# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:15-alpine
    container_name: data-postgres
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres123
      POSTGRES_DB: data_warehouse
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./scripts/init_db.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - data-network

  redis:
    image: redis:7-alpine
    container_name: data-redis
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    networks:
      - data-network

  minio:
    image: minio/minio:latest
    container_name: data-minio
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data
    command: server /data --console-address ":9001"
    networks:
      - data-network

  api:
    build:
      context: .
      dockerfile: docker/Dockerfile
    container_name: data-api
    environment:
      APP_ENV: production
      POSTGRES_HOST: postgres
      REDIS_HOST: redis
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    networks:
      - data-network

  celery-worker:
    build:
      context: .
      dockerfile: docker/Dockerfile.celery
    container_name: data-celery-worker
    environment:
      APP_ENV: production
      POSTGRES_HOST: postgres
      REDIS_HOST: redis
    command: celery -A src.tasks.celery_app worker --loglevel=info --concurrency=4
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    networks:
      - data-network

  celery-beat:
    build:
      context: .
      dockerfile: docker/Dockerfile.celery
    container_name: data-celery-beat
    environment:
      APP_ENV: production
      POSTGRES_HOST: postgres
      REDIS_HOST: redis
    command: celery -A src.tasks.celery_app beat --loglevel=info
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    networks:
      - data-network

  nginx:
    image: nginx:alpine
    container_name: data-nginx
    ports:
      - "80:80"
    volumes:
      - ./docker/nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - api
    networks:
      - data-network

  grafana:
    image: grafana/grafana:latest
    container_name: data-grafana
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - data-network

  prometheus:
    image: prom/prometheus:latest
    container_name: data-prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./deploy/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    networks:
      - data-network

volumes:
  postgres_data:
  redis_data:
  minio_data:
  grafana_data:
  prometheus_data:

networks:
  data-network:
    driver: bridge

第七部分:监控与告警

7.1 监控指标

# src/utils/metrics.py
"""
监控指标

使用Prometheus客户端库暴露指标。
"""

from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST

# 业务指标
request_total = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)

request_duration = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5]
)

# 数据指标
data_sync_records = Counter(
    "data_sync_records_total",
    "Total records synced",
    ["source", "status"]
)

data_quality_score = Gauge(
    "data_quality_score",
    "Data quality score",
    ["table"]
)

# 系统指标
active_tasks = Gauge(
    "celery_active_tasks",
    "Number of active Celery tasks"
)

queue_length = Gauge(
    "celery_queue_length",
    "Celery queue length",
    ["queue"]
)


def setup_metrics():
    """设置监控指标"""
    # 可以在这里添加额外的初始化逻辑
    pass

7.2 数据质量监控

# src/quality/checker.py
"""
数据质量检查器

定期检查数据质量,发现异常时发送告警。
"""

from typing import Dict, List

import pandas as pd
from loguru import logger

from src.repository.dwd_repo import DWDRepository
from src.quality.notifier import send_alert


class DataQualityChecker:
    """数据质量检查器"""

    def __init__(self):
        self.repo = DWDRepository()
        self.rules = self._load_rules()

    def _load_rules(self) -> Dict:
        """加载质量规则"""
        return {
            "orders": {
                "row_count": {"min": 1000, "max": 1000000},
                "null_checks": ["user_id", "pay_amount", "order_no"],
                "uniqueness": ["order_no"],
                "range_checks": {
                    "pay_amount": {"min": 0, "max": 100000}
                },
            },
            "users": {
                "row_count": {"min": 100, "max": 100000},
                "null_checks": ["username"],
                "uniqueness": ["user_id", "phone", "email"],
            }
        }

    def check_table(self, table_name: str) -> Dict:
        """
        检查单个表的数据质量

        Returns:
            质量检查结果
        """
        logger.info(f"检查数据质量: {table_name}")

        df = self.repo.get_table(table_name)
        rules = self.rules.get(table_name, {})

        results = {
            "table": table_name,
            "passed": True,
            "checks": [],
        }

        # 1. 行数检查
        if "row_count" in rules:
            min_rows = rules["row_count"]["min"]
            max_rows = rules["row_count"]["max"]
            actual_rows = len(df)

            if actual_rows < min_rows:
                results["passed"] = False
                results["checks"].append({
                    "name": "row_count",
                    "status": "failed",
                    "message": f"行数过少: {actual_rows} < {min_rows}"
                })
            elif actual_rows > max_rows:
                results["passed"] = False
                results["checks"].append({
                    "name": "row_count",
                    "status": "failed",
                    "message": f"行数过多: {actual_rows} > {max_rows}"
                })
            else:
                results["checks"].append({
                    "name": "row_count",
                    "status": "passed",
                    "message": f"行数正常: {actual_rows}"
                })

        # 2. 空值检查
        for col in rules.get("null_checks", []):
            if col in df.columns:
                null_count = df[col].isna().sum()
                if null_count > 0:
                    results["passed"] = False
                    results["checks"].append({
                        "name": f"null_check_{col}",
                        "status": "failed",
                        "message": f"{col}列有{null_count}个空值"
                    })
                else:
                    results["checks"].append({
                        "name": f"null_check_{col}",
                        "status": "passed",
                        "message": f"{col}列无空值"
                    })

        # 3. 唯一性检查
        for col in rules.get("uniqueness", []):
            if col in df.columns:
                is_unique = df[col].is_unique
                if not is_unique:
                    duplicate_count = df[col].duplicated().sum()
                    results["passed"] = False
                    results["checks"].append({
                        "name": f"uniqueness_{col}",
                        "status": "failed",
                        "message": f"{col}列有{duplicate_count}个重复值"
                    })
                else:
                    results["checks"].append({
                        "name": f"uniqueness_{col}",
                        "status": "passed",
                        "message": f"{col}列无重复"
                    })

        # 4. 范围检查
        for col, ranges in rules.get("range_checks", {}).items():
            if col in df.columns and pd.api.types.is_numeric_dtype(df[col]):
                out_of_range = df[(df[col] < ranges["min"]) | (df[col] > ranges["max"])]
                if len(out_of_range) > 0:
                    results["passed"] = False
                    results["checks"].append({
                        "name": f"range_check_{col}",
                        "status": "failed",
                        "message": f"{col}列有{len(out_of_range)}个值超出范围"
                    })

        return results

    def run_all_checks(self) -> Dict:
        """运行所有数据质量检查"""
        results = {
            "timestamp": pd.Timestamp.now().isoformat(),
            "tables": {}
        }

        all_passed = True

        for table_name in self.rules.keys():
            table_result = self.check_table(table_name)
            results["tables"][table_name] = table_result

            if not table_result["passed"]:
                all_passed = False

        results["overall_passed"] = all_passed

        # 发送告警
        if not all_passed:
            send_alert("数据质量检查失败", results)

        logger.info(f"数据质量检查完成,整体状态: {'通过' if all_passed else '失败'}")

        return results

本文通过构建一个完整的电商数据中台,全面展示了Python企业级项目开发的各个环节:

需求分析与架构设计:理解数据中台的业务价值,设计合理的系统架构

数据采集模块:从MySQL、MongoDB、Kafka等多个数据源采集数据

数据处理模块:数据清洗、关联、RFM用户分层

数据服务模块:使用FastAPI构建高性能数据API

任务调度模块:使用Celery处理异步和定时任务

部署运维:Docker容器化、docker-compose编排、监控告警

这个实战项目涵盖了Python企业级开发的核心技术栈:

Web框架:FastAPI

数据处理:Pandas、NumPy

任务队列:Celery、Redis

容器化:Docker、docker-compose

监控:Prometheus、Grafana

数据中台的设计思想和实现技巧可以复用到其他数据密集型项目。希望本文能帮助读者建立起Python企业级开发的完整知识体系,在实际工作中游刃有余。
来源:
http://qcycj.cn/category/jiuwenhua.html

相关文章
|
1月前
|
数据采集 存储 关系型数据库
Python语言企业项目实战(一)
教程来源 http://qcycj.cn/category/jiuqi.html 本文带你用Python从零构建生产级电商数据中台,涵盖多源数据采集(MySQL/MongoDB/API/日志)、Pandas清洗聚合、FastAPI高性能服务、Celery+Redis任务调度、数据质量监控及Docker/K8s容器化部署,助你掌握企业级数据平台全栈能力。
|
1月前
|
缓存 监控 API
Python语言企业项目实战(三)
教程来源 http://qcycj.cn/category/jiuwenhua.html 本项目基于FastAPI构建高性能数据服务API,提供实时看板、数据分析与用户画像等模块;集成Celery实现异步任务调度,支持订单/用户数据定时同步、数据质量检查及RFM报表生成,具备完善日志、监控、缓存与错误重试机制。
|
19天前
|
移动开发 自然语言处理 小程序
前端组件库——Wot Design Uni知识点大全(三)
教程来源 https://rvtst.cn Wot Design Uni 是基于 Vue3 的跨平台 UI 组件库,支持微信/支付宝小程序、H5、App 等多端兼容;提供样式隔离修复、virtualHost 渲染优化、虚拟列表、按需引入、国际化(15+语言)等完整解决方案。
|
1月前
|
弹性计算 人工智能 机器人
阿里云ECS/轻量服务器+本地全平台部署OpenClaw|集成QQ机器人+千问Qwen3.6-Plus+Coding Plan大模型配置保姆级教程
2026年,开源AI自动化框架OpenClaw(曾用名Clawdbot)已成为个人与团队效率提升的核心工具,凭借“行动式AI”能力,可将自然语言指令转化为文件管理、系统控制、数据处理、社交交互等实际任务执行。本文完整覆盖2026年阿里云轻量服务器部署及本地MacOS/Linux/Windows11部署OpenClaw(Clawdbot)步骤流程及阿里云千问Qwen3.6-Plus配置或市场上免费大模型Coding Plan API配置及常见问题解答,同步新增阿里云ECS云服务器专业部署、QQ机器人全流程集成方案,所有操作附可直接复制的代码命令、可视化指引与高频问题排查方案。
341 14
|
1月前
|
Web App开发 Rust 前端开发
基于Rust开发的m3u8下载器:支持断点续传、边下边播
M3U8 Quicker是一款轻量(仅2MB)跨平台M3U8下载播放器,基于Tauri+Rust+React开发。支持断点续传、AES解密、边下边播、自动转MP4及Chrome一键抓取地址,让课程保存与媒体管理更高效稳定。
236 4
|
1月前
|
弹性计算 人工智能 测试技术
2026年阿里云服务器特价和轻量应用服务器抢购活动入口、活动时间及规则介绍
2026年阿里云推出特惠活动,降低上云门槛。每日限量抢购的轻量应用服务器,2核2G配置38元/年,2核4G配置9.9元/月起,适合追求极致性价比的用户。同时,长期特价云服务器ECS,2核2G配置99元/年、2核4G配置199元/年,提供稳定性能和固定带宽。活动包括抢购规则、配置价格、购买资格及续费政策等详细信息。用户可根据自身需求和业务场景选择适合的套餐。
|
1月前
|
JSON 前端开发 JavaScript
基于LangChain的简易智能旅游助手Agent
本文分享基于LangChain开发的智能旅游助手Agent,支持“查天气+荐景点”双功能,对比ReAct与FunctionCall两种实现模式,并详解工具封装、记忆管理、执行框架等LangChain核心优势。代码开源,含FastAPI后端与原生HTML/JS前端。
192 3
|
1月前
|
缓存 人工智能 安全
Claude Code 偷偷烧钱?逆向工程揭露 7 个叠加 Bug,Max 20x 一天耗尽 43% 周配额
一位 Claude Max 20x 订阅用户仅一天就烧掉了一周 43% 的 token 配额。他逆向分析 Claude Code 源码,找到了 7 个可以叠加触发的缓存 Bug,最致命的是 Extra Usage 模式会静默将缓存时长从 1 小时降级为 5 分钟,形成"死亡螺旋"。
363 3
|
5月前
|
安全 Java API
Java日期处理完全指南(新手也能轻松掌握的Java时间格式化与日期API教程)
教程来源https://www.vpshk.cn/本文介绍Java 8引入的java.time包,详解LocalDateTime、LocalDate等类的使用,涵盖获取当前时间、格式化、解析字符串及日期运算,助你轻松掌握现代Java日期处理方法,适合初学者快速上手。
|
4月前
|
安全 Linux 数据库
什么是CMS(网站管理系统),哪款 CMS适合你
本文阐述 CMS 定义及选择的六大核心考量,结合 PageAdmin、阿里云SaaS 建站实例给出实践建议,强调 CMS 选择无最优,唯有贴合实际需求才最合适。
244 1