Part 6: Deployment and Operations
6.1 Dockerfile
# docker/Dockerfile
FROM python:3.11-slim

LABEL maintainer="data-platform-team"
LABEL description="Data Middle Platform"

# Set the working directory
WORKDIR /app

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=/app \
    TZ=Asia/Shanghai

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    libpq-dev \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest first to leverage layer caching
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the log directory
RUN mkdir -p /app/logs

# Expose the service port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Startup command
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
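The HEALTHCHECK instruction polls a /health route on the API. The article does not show that route, so here is a minimal sketch of what it might look like in src/main.py (the route body is an assumption):

# src/main.py -- health-route sketch only; the real application
# registers its routers and middleware here as well
from fastapi import FastAPI

app = FastAPI(title="Data Middle Platform")


@app.get("/health")
async def health() -> dict:
    # Keep this endpoint dependency-free so the container health check
    # reflects process liveness rather than database availability.
    return {"status": "ok"}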
6.2 docker-compose.yml
# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:15-alpine
    container_name: data-postgres
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres123
      POSTGRES_DB: data_warehouse
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./scripts/init_db.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - data-network

  redis:
    image: redis:7-alpine
    container_name: data-redis
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    networks:
      - data-network

  minio:
    image: minio/minio:latest
    container_name: data-minio
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data
    command: server /data --console-address ":9001"
    networks:
      - data-network

  api:
    build:
      context: .
      dockerfile: docker/Dockerfile
    container_name: data-api
    environment:
      APP_ENV: production
      POSTGRES_HOST: postgres
      REDIS_HOST: redis
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    networks:
      - data-network

  celery-worker:
    build:
      context: .
      dockerfile: docker/Dockerfile.celery
    container_name: data-celery-worker
    environment:
      APP_ENV: production
      POSTGRES_HOST: postgres
      REDIS_HOST: redis
    command: celery -A src.tasks.celery_app worker --loglevel=info --concurrency=4
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    networks:
      - data-network

  celery-beat:
    build:
      context: .
      dockerfile: docker/Dockerfile.celery
    container_name: data-celery-beat
    environment:
      APP_ENV: production
      POSTGRES_HOST: postgres
      REDIS_HOST: redis
    command: celery -A src.tasks.celery_app beat --loglevel=info
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    networks:
      - data-network

  nginx:
    image: nginx:alpine
    container_name: data-nginx
    ports:
      - "80:80"
    volumes:
      - ./docker/nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - api
    networks:
      - data-network

  grafana:
    image: grafana/grafana:latest
    container_name: data-grafana
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - data-network

  prometheus:
    image: prom/prometheus:latest
    container_name: data-prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./deploy/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    networks:
      - data-network

volumes:
  postgres_data:
  redis_data:
  minio_data:
  grafana_data:
  prometheus_data:

networks:
  data-network:
    driver: bridge
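Every application container gets its configuration through environment variables (APP_ENV, POSTGRES_HOST, REDIS_HOST), and the service names double as hostnames because all containers share the data-network bridge. A minimal sketch of how the application side might read those variables, assuming pydantic-settings is among the dependencies (the module path src/config.py and the defaults are illustrative):

# src/config.py -- illustrative sketch; only APP_ENV, POSTGRES_HOST,
# and REDIS_HOST appear in docker-compose.yml, the defaults are assumptions
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Field names match environment variables case-insensitively, so
    # POSTGRES_HOST from the compose file populates postgres_host here.
    app_env: str = "development"
    postgres_host: str = "localhost"
    redis_host: str = "localhost"


settings = Settings()

Inside the compose network, the hostnames postgres and redis resolve through Docker's embedded DNS, so the same image runs unchanged in development and production.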
Part 7: Monitoring and Alerting
7.1 Monitoring Metrics
# src/utils/metrics.py
"""
Monitoring metrics.

Expose metrics via the Prometheus client library.
"""
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST

# Business metrics
request_total = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)

request_duration = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5]
)

# Data metrics
data_sync_records = Counter(
    "data_sync_records_total",
    "Total records synced",
    ["source", "status"]
)

data_quality_score = Gauge(
    "data_quality_score",
    "Data quality score",
    ["table"]
)

# System metrics
active_tasks = Gauge(
    "celery_active_tasks",
    "Number of active Celery tasks"
)

queue_length = Gauge(
    "celery_queue_length",
    "Celery queue length",
    ["queue"]
)

def setup_metrics():
    """Set up monitoring metrics."""
    # Additional initialization logic can go here
    pass
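Defining the collectors is only half the job: requests still have to be observed, and Prometheus needs an endpoint to scrape. A sketch of how the request metrics might be wired into the FastAPI app (the middleware and the /metrics route are illustrations, not code from the article):

# Sketch of wiring the metrics into FastAPI -- an illustration, not the
# article's actual src/main.py
import time

from fastapi import FastAPI, Request, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

from src.utils.metrics import request_duration, request_total

app = FastAPI()


@app.middleware("http")
async def record_metrics(request: Request, call_next):
    # Time each request and feed both collectors defined above.
    start = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - start
    request_duration.labels(request.method, request.url.path).observe(elapsed)
    request_total.labels(request.method, request.url.path,
                         str(response.status_code)).inc()
    return response


@app.get("/metrics")
async def metrics() -> Response:
    # Prometheus scrapes this endpoint (see the prometheus.yml targets).
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

Labeling by request.url.path is fine for a sketch, but parameterized routes should be normalized to their route template so label cardinality stays bounded.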
7.2 Data Quality Monitoring
# src/quality/checker.py
"""
Data quality checker.

Periodically checks data quality and sends an alert when anomalies are found.
"""
from typing import Dict, List

import pandas as pd
from loguru import logger

from src.repository.dwd_repo import DWDRepository
from src.quality.notifier import send_alert


class DataQualityChecker:
    """Data quality checker."""

    def __init__(self):
        self.repo = DWDRepository()
        self.rules = self._load_rules()

    def _load_rules(self) -> Dict:
        """Load quality rules."""
        return {
            "orders": {
                "row_count": {"min": 1000, "max": 1000000},
                "null_checks": ["user_id", "pay_amount", "order_no"],
                "uniqueness": ["order_no"],
                "range_checks": {
                    "pay_amount": {"min": 0, "max": 100000}
                },
            },
            "users": {
                "row_count": {"min": 100, "max": 100000},
                "null_checks": ["username"],
                "uniqueness": ["user_id", "phone", "email"],
            }
        }

    def check_table(self, table_name: str) -> Dict:
        """
        Check the data quality of a single table.

        Returns:
            Quality check results.
        """
        logger.info(f"Checking data quality: {table_name}")
        df = self.repo.get_table(table_name)
        rules = self.rules.get(table_name, {})
        results = {
            "table": table_name,
            "passed": True,
            "checks": [],
        }

        # 1. Row count check
        if "row_count" in rules:
            min_rows = rules["row_count"]["min"]
            max_rows = rules["row_count"]["max"]
            actual_rows = len(df)
            if actual_rows < min_rows:
                results["passed"] = False
                results["checks"].append({
                    "name": "row_count",
                    "status": "failed",
                    "message": f"Too few rows: {actual_rows} < {min_rows}"
                })
            elif actual_rows > max_rows:
                results["passed"] = False
                results["checks"].append({
                    "name": "row_count",
                    "status": "failed",
                    "message": f"Too many rows: {actual_rows} > {max_rows}"
                })
            else:
                results["checks"].append({
                    "name": "row_count",
                    "status": "passed",
                    "message": f"Row count OK: {actual_rows}"
                })

        # 2. Null checks
        for col in rules.get("null_checks", []):
            if col in df.columns:
                null_count = df[col].isna().sum()
                if null_count > 0:
                    results["passed"] = False
                    results["checks"].append({
                        "name": f"null_check_{col}",
                        "status": "failed",
                        "message": f"Column {col} has {null_count} null values"
                    })
                else:
                    results["checks"].append({
                        "name": f"null_check_{col}",
                        "status": "passed",
                        "message": f"Column {col} has no null values"
                    })

        # 3. Uniqueness checks
        for col in rules.get("uniqueness", []):
            if col in df.columns:
                is_unique = df[col].is_unique
                if not is_unique:
                    duplicate_count = df[col].duplicated().sum()
                    results["passed"] = False
                    results["checks"].append({
                        "name": f"uniqueness_{col}",
                        "status": "failed",
                        "message": f"Column {col} has {duplicate_count} duplicate values"
                    })
                else:
                    results["checks"].append({
                        "name": f"uniqueness_{col}",
                        "status": "passed",
                        "message": f"Column {col} has no duplicates"
                    })

        # 4. Range checks
        for col, ranges in rules.get("range_checks", {}).items():
            if col in df.columns and pd.api.types.is_numeric_dtype(df[col]):
                out_of_range = df[(df[col] < ranges["min"]) | (df[col] > ranges["max"])]
                if len(out_of_range) > 0:
                    results["passed"] = False
                    results["checks"].append({
                        "name": f"range_check_{col}",
                        "status": "failed",
                        "message": f"Column {col} has {len(out_of_range)} values out of range"
                    })

        return results

    def run_all_checks(self) -> Dict:
        """Run all data quality checks."""
        results = {
            "timestamp": pd.Timestamp.now().isoformat(),
            "tables": {}
        }
        all_passed = True
        for table_name in self.rules.keys():
            table_result = self.check_table(table_name)
            results["tables"][table_name] = table_result
            if not table_result["passed"]:
                all_passed = False
        results["overall_passed"] = all_passed

        # Send an alert on failure
        if not all_passed:
            send_alert("Data quality check failed", results)

        logger.info(f"Data quality checks finished, overall status: {'passed' if all_passed else 'failed'}")
        return results
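Since the compose file already runs a celery-beat service, the checker is a natural fit for a scheduled task. A hypothetical wiring follows, assuming the Celery module named in the compose commands exposes an app attribute and scoring the data_quality_score gauge from section 7.1 as the fraction of passed checks (both are assumptions, not code from the article):

# src/tasks/quality_tasks.py -- hypothetical wiring; the app attribute
# name and the scoring formula are assumptions
from celery.schedules import crontab

from src.quality.checker import DataQualityChecker
from src.tasks.celery_app import app
from src.utils.metrics import data_quality_score


@app.task(name="quality.run_all_checks")
def run_quality_checks() -> dict:
    # run_all_checks() already alerts on failure; here we also export a
    # per-table score (share of passed checks) to the Prometheus gauge.
    results = DataQualityChecker().run_all_checks()
    for table, table_result in results["tables"].items():
        checks = table_result["checks"]
        passed = sum(1 for c in checks if c["status"] == "passed")
        data_quality_score.labels(table=table).set(passed / max(len(checks), 1))
    return results


app.conf.beat_schedule = {
    "daily-data-quality": {
        "task": "quality.run_all_checks",
        "schedule": crontab(hour=6, minute=30),  # daily at 06:30
    },
}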
By building a complete e-commerce data middle platform, this article has walked through every stage of enterprise-grade Python development:

Requirements analysis and architecture design: understanding the business value of a data middle platform and designing a sound system architecture
Data ingestion: collecting data from MySQL, MongoDB, Kafka, and other sources
Data processing: cleaning, joining, and RFM-based user segmentation
Data services: building high-performance data APIs with FastAPI
Task scheduling: handling asynchronous and scheduled jobs with Celery
Deployment and operations: Docker containerization, docker-compose orchestration, monitoring and alerting

This hands-on project covers the core technology stack of enterprise Python development:

Web framework: FastAPI
Data processing: Pandas, NumPy
Task queue: Celery, Redis
Containerization: Docker, docker-compose
Monitoring: Prometheus, Grafana

The design ideas and implementation techniques behind a data middle platform transfer directly to other data-intensive projects. We hope this article helps readers build a complete picture of enterprise Python development and apply it confidently in their day-to-day work.