Python Enterprise Project in Practice (Part 4)

Summary: Tutorial source: http://qcycj.cn/category/jiuwenhua.html. This article walks through an e-commerce data middle platform in practice: a high-performance data service built on FastAPI, with Celery for asynchronous tasks, PostgreSQL/Redis/MinIO as backing components, one-command deployment via Docker Compose, and built-in Prometheus + Grafana monitoring plus an automated data quality validation system.

Part 6: Deployment and Operations

6.1 Dockerfile

# docker/Dockerfile
FROM python:3.11-slim

LABEL maintainer="data-platform-team"
LABEL description="Data Middle Platform"

# Set the working directory
WORKDIR /app

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=/app \
    TZ=Asia/Shanghai

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    libpq-dev \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the log directory
RUN mkdir -p /app/logs

# Expose the service port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Startup command
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
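
The HEALTHCHECK above probes a /health route on the API, presumably defined in an earlier part of the series. For reference, the minimal shape the probe expects looks like this (a sketch — the handler body is an assumption, not the original code):

# Sketch: the /health route the HEALTHCHECK probes (assumed shape)
from fastapi import FastAPI

app = FastAPI()


@app.get("/health")
def health() -> dict:
    """Liveness probe: returning 200 makes `curl -f` succeed."""
    return {"status": "ok"}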

6.2 docker-compose.yml

# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:15-alpine
    container_name: data-postgres
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres123
      POSTGRES_DB: data_warehouse
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./scripts/init_db.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - data-network

  redis:
    image: redis:7-alpine
    container_name: data-redis
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    networks:
      - data-network

  minio:
    image: minio/minio:latest
    container_name: data-minio
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data
    command: server /data --console-address ":9001"
    networks:
      - data-network

  api:
    build:
      context: .
      dockerfile: docker/Dockerfile
    container_name: data-api
    environment:
      APP_ENV: production
      POSTGRES_HOST: postgres
      REDIS_HOST: redis
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    networks:
      - data-network

  celery-worker:
    build:
      context: .
      dockerfile: docker/Dockerfile.celery
    container_name: data-celery-worker
    environment:
      APP_ENV: production
      POSTGRES_HOST: postgres
      REDIS_HOST: redis
    command: celery -A src.tasks.celery_app worker --loglevel=info --concurrency=4
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    networks:
      - data-network

  celery-beat:
    build:
      context: .
      dockerfile: docker/Dockerfile.celery
    container_name: data-celery-beat
    environment:
      APP_ENV: production
      POSTGRES_HOST: postgres
      REDIS_HOST: redis
    command: celery -A src.tasks.celery_app beat --loglevel=info
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    networks:
      - data-network

  nginx:
    image: nginx:alpine
    container_name: data-nginx
    ports:
      - "80:80"
    volumes:
      - ./docker/nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - api
    networks:
      - data-network

  grafana:
    image: grafana/grafana:latest
    container_name: data-grafana
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - data-network

  prometheus:
    image: prom/prometheus:latest
    container_name: data-prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./deploy/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    networks:
      - data-network

volumes:
  postgres_data:
  redis_data:
  minio_data:
  grafana_data:
  prometheus_data:

networks:
  data-network:
    driver: bridge
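
One caveat: the short form of depends_on used here only controls start order. Compose starts the api and Celery containers once the postgres and redis containers are running, not once the services inside them accept connections, so the application can race the databases on a cold start. A small wait loop run before the real command closes that gap. A minimal sketch (scripts/wait_for.py is a hypothetical helper, not part of the original project):

# scripts/wait_for.py (hypothetical helper, not part of the original project)
"""Block until a TCP service accepts connections, then exit.

Usage: python scripts/wait_for.py <host> <port> [timeout_seconds]
"""

import socket
import sys
import time


def wait_for(host: str, port: int, timeout: float = 60.0) -> bool:
    """Retry a TCP connection until it succeeds or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=2):
                return True
        except OSError:
            time.sleep(1)
    return False


if __name__ == "__main__":
    host, port = sys.argv[1], int(sys.argv[2])
    timeout = float(sys.argv[3]) if len(sys.argv) > 3 else 60.0
    if not wait_for(host, port, timeout):
        print(f"Timed out waiting for {host}:{port}", file=sys.stderr)
        sys.exit(1)

A container command can then chain it before the real entrypoint, for example python scripts/wait_for.py postgres 5432 && uvicorn src.main:app --host 0.0.0.0 --port 8000. Alternatively, adding healthcheck blocks to the postgres and redis services and using the long form of depends_on with condition: service_healthy achieves the same without extra code.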

Part 7: Monitoring and Alerting

7.1 Monitoring Metrics

# src/utils/metrics.py
"""
监控指标

使用Prometheus客户端库暴露指标。
"""

from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST

# Business metrics
request_total = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)

request_duration = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5]
)

# Data metrics
data_sync_records = Counter(
    "data_sync_records_total",
    "Total records synced",
    ["source", "status"]
)

data_quality_score = Gauge(
    "data_quality_score",
    "Data quality score",
    ["table"]
)

# System metrics
active_tasks = Gauge(
    "celery_active_tasks",
    "Number of active Celery tasks"
)

queue_length = Gauge(
    "celery_queue_length",
    "Celery queue length",
    ["queue"]
)


def setup_metrics():
    """设置监控指标"""
    # 可以在这里添加额外的初始化逻辑
    pass
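
The module defines the metric objects but leaves the wiring to the application, which is why generate_latest and CONTENT_TYPE_LATEST are imported above without being used here. One plausible integration (a sketch; the middleware and route below are assumptions, e.g. living in src/main.py, not code from the original article) is a FastAPI middleware that records every request, plus a /metrics endpoint for Prometheus to scrape:

# Sketch: wiring the metrics into FastAPI (assumed integration)
import time

from fastapi import FastAPI, Request, Response

from src.utils.metrics import (
    CONTENT_TYPE_LATEST,
    generate_latest,
    request_duration,
    request_total,
)

app = FastAPI()


@app.middleware("http")
async def record_metrics(request: Request, call_next):
    """Time every request and record it in the Prometheus metrics."""
    start = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - start
    labels = {"method": request.method, "endpoint": request.url.path}
    request_total.labels(**labels, status=str(response.status_code)).inc()
    request_duration.labels(**labels).observe(elapsed)
    return response


@app.get("/metrics")
def metrics() -> Response:
    """Expose all registered metrics in Prometheus text format."""
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

In production the endpoint label should be normalized (for example to the matched route template rather than the raw path) to keep label cardinality bounded.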

7.2 Data Quality Monitoring

# src/quality/checker.py
"""
数据质量检查器

定期检查数据质量,发现异常时发送告警。
"""

from typing import Dict, List

import pandas as pd
from loguru import logger

from src.repository.dwd_repo import DWDRepository
from src.quality.notifier import send_alert


class DataQualityChecker:
    """数据质量检查器"""

    def __init__(self):
        self.repo = DWDRepository()
        self.rules = self._load_rules()

    def _load_rules(self) -> Dict:
        """加载质量规则"""
        return {
            "orders": {
                "row_count": {"min": 1000, "max": 1000000},
                "null_checks": ["user_id", "pay_amount", "order_no"],
                "uniqueness": ["order_no"],
                "range_checks": {
                    "pay_amount": {"min": 0, "max": 100000}
                },
            },
            "users": {
                "row_count": {"min": 100, "max": 100000},
                "null_checks": ["username"],
                "uniqueness": ["user_id", "phone", "email"],
            }
        }

    def check_table(self, table_name: str) -> Dict:
        """
        检查单个表的数据质量

        Returns:
            质量检查结果
        """
        logger.info(f"检查数据质量: {table_name}")

        df = self.repo.get_table(table_name)
        rules = self.rules.get(table_name, {})

        results = {
            "table": table_name,
            "passed": True,
            "checks": [],
        }

        # 1. Row count check
        if "row_count" in rules:
            min_rows = rules["row_count"]["min"]
            max_rows = rules["row_count"]["max"]
            actual_rows = len(df)

            if actual_rows < min_rows:
                results["passed"] = False
                results["checks"].append({
                    "name": "row_count",
                    "status": "failed",
                    "message": f"Too few rows: {actual_rows} < {min_rows}"
                })
            elif actual_rows > max_rows:
                results["passed"] = False
                results["checks"].append({
                    "name": "row_count",
                    "status": "failed",
                    "message": f"Too many rows: {actual_rows} > {max_rows}"
                })
            else:
                results["checks"].append({
                    "name": "row_count",
                    "status": "passed",
                    "message": f"Row count OK: {actual_rows}"
                })

        # 2. Null checks
        for col in rules.get("null_checks", []):
            if col in df.columns:
                null_count = df[col].isna().sum()
                if null_count > 0:
                    results["passed"] = False
                    results["checks"].append({
                        "name": f"null_check_{col}",
                        "status": "failed",
                        "message": f"Column {col} has {null_count} null values"
                    })
                else:
                    results["checks"].append({
                        "name": f"null_check_{col}",
                        "status": "passed",
                        "message": f"Column {col} has no null values"
                    })

        # 3. Uniqueness checks
        for col in rules.get("uniqueness", []):
            if col in df.columns:
                is_unique = df[col].is_unique
                if not is_unique:
                    duplicate_count = df[col].duplicated().sum()
                    results["passed"] = False
                    results["checks"].append({
                        "name": f"uniqueness_{col}",
                        "status": "failed",
                        "message": f"Column {col} has {duplicate_count} duplicate values"
                    })
                else:
                    results["checks"].append({
                        "name": f"uniqueness_{col}",
                        "status": "passed",
                        "message": f"Column {col} has no duplicates"
                    })

        # 4. Range checks (records a passed entry too, for consistency with the
        # checks above)
        for col, ranges in rules.get("range_checks", {}).items():
            if col in df.columns and pd.api.types.is_numeric_dtype(df[col]):
                out_of_range = df[(df[col] < ranges["min"]) | (df[col] > ranges["max"])]
                if len(out_of_range) > 0:
                    results["passed"] = False
                    results["checks"].append({
                        "name": f"range_check_{col}",
                        "status": "failed",
                        "message": f"Column {col} has {len(out_of_range)} out-of-range values"
                    })
                else:
                    results["checks"].append({
                        "name": f"range_check_{col}",
                        "status": "passed",
                        "message": f"Column {col} is within range"
                    })

        return results

    def run_all_checks(self) -> Dict:
        """运行所有数据质量检查"""
        results = {
            "timestamp": pd.Timestamp.now().isoformat(),
            "tables": {}
        }

        all_passed = True

        for table_name in self.rules.keys():
            table_result = self.check_table(table_name)
            results["tables"][table_name] = table_result

            if not table_result["passed"]:
                all_passed = False

        results["overall_passed"] = all_passed

        # Send an alert on failure
        if not all_passed:
            send_alert("Data quality check failed", results)

        logger.info(f"Data quality checks complete, overall status: {'passed' if all_passed else 'failed'}")

        return results
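
The send_alert function is imported from src.quality.notifier, a module not shown in this part. A minimal webhook-based sketch of it (an assumption — the series may use a different channel such as email or DingTalk; ALERT_WEBHOOK_URL is a hypothetical setting):

# src/quality/notifier.py — minimal sketch (assumed implementation)
"""Send data quality alerts to a webhook endpoint."""

import json
import os

import requests
from loguru import logger

# Hypothetical configuration: the webhook URL is read from the environment.
ALERT_WEBHOOK_URL = os.getenv("ALERT_WEBHOOK_URL", "")


def send_alert(title: str, payload: dict) -> None:
    """POST an alert to the configured webhook; log failures instead of raising."""
    if not ALERT_WEBHOOK_URL:
        logger.warning(f"No webhook configured, dropping alert: {title}")
        return
    try:
        resp = requests.post(
            ALERT_WEBHOOK_URL,
            json={"title": title, "detail": json.dumps(payload, ensure_ascii=False)},
            timeout=5,
        )
        resp.raise_for_status()
    except requests.RequestException as exc:
        logger.error(f"Failed to send alert '{title}': {exc}")

To run the checks periodically, as the module docstring promises, one plausible wiring is a Celery beat entry in the app that the docker-compose commands reference. A sketch (the task name, the app variable, the module path, and the hourly schedule are all assumptions; it also pushes per-table scores into the data_quality_score gauge from section 7.1):

# Sketch: scheduling quality checks with Celery beat (assumed wiring,
# e.g. in src/tasks/quality_tasks.py)
from celery.schedules import crontab

from src.quality.checker import DataQualityChecker
from src.tasks.celery_app import app  # assumed variable name for the Celery app
from src.utils.metrics import data_quality_score


@app.task(name="quality.run_all_checks")
def run_quality_checks() -> dict:
    """Run every configured quality check and export per-table scores."""
    results = DataQualityChecker().run_all_checks()
    for table, table_result in results["tables"].items():
        checks = table_result["checks"]
        passed = sum(1 for c in checks if c["status"] == "passed")
        # Score = fraction of checks that passed (1.0 if no checks ran).
        data_quality_score.labels(table=table).set(passed / len(checks) if checks else 1.0)
    return results


# Run the checks hourly, on the hour.
app.conf.beat_schedule = {
    "data-quality-hourly": {
        "task": "quality.run_all_checks",
        "schedule": crontab(minute=0),
    },
}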

By building a complete e-commerce data middle platform, this series has walked through every stage of enterprise-grade Python project development:

Requirements analysis and architecture design: understanding the business value of a data middle platform and designing a sound system architecture

Data ingestion: collecting data from multiple sources such as MySQL, MongoDB, and Kafka

Data processing: data cleaning, joining, and RFM user segmentation

Data services: building high-performance data APIs with FastAPI

Task scheduling: handling asynchronous and scheduled tasks with Celery

Deployment and operations: Docker containerization, docker-compose orchestration, monitoring, and alerting

This hands-on project covers the core technology stack of enterprise Python development:

Web framework: FastAPI

Data processing: Pandas, NumPy

Task queue: Celery, Redis

Containerization: Docker, docker-compose

Monitoring: Prometheus, Grafana

The design ideas and implementation techniques behind a data middle platform carry over to other data-intensive projects. I hope this article helps readers build a complete knowledge framework for enterprise Python development and handle real-world work with ease.
Source:
http://qcycj.cn/category/jiuwenhua.html
