七、监控与可观测性
7.1 结构化日志
# src/infrastructure/logging/structured_logger.py
import json
import logging
import sys
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Any, Dict, Optional
# Request context: identifiers that StructuredJSONFormatter copies into every
# JSON log entry. Each variable defaults to an empty string when it has not
# been set in the current context.
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
user_id_var: ContextVar[str] = ContextVar('user_id', default='')
trace_id_var: ContextVar[str] = ContextVar('trace_id', default='')
class StructuredJSONFormatter(logging.Formatter):
    """Format log records as single-line JSON objects.

    Each entry contains record metadata (level, logger, module, function,
    line), the request/user/trace identifiers held in ``request_id_var``,
    ``user_id_var`` and ``trace_id_var``, exception details when an exception
    is attached, and any key/value pairs passed as
    ``extra={"extra_fields": {...}}``. Extra fields are merged last, so they
    override same-named built-in keys.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Serialize *record* to a JSON string.

        Args:
            record: The log record being emitted.

        Returns:
            A JSON document; non-ASCII characters are kept as-is, and values
            that ``json`` cannot encode natively are rendered with ``str()``.
        """
        log_entry: Dict[str, Any] = {
            # Use the record's creation time (not the moment of formatting) so
            # queued/buffered handlers still report when the event happened.
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc)
            .isoformat()
            .replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
            "request_id": request_id_var.get(),
            "user_id": user_id_var.get(),
            "trace_id": trace_id_var.get(),
        }

        # Exception details. exc_info may be (None, None, None), e.g. when
        # logger.exception() is called outside an except block, so check the
        # type rather than the tuple's truthiness.
        exc_type, exc_value, _ = record.exc_info or (None, None, None)
        if exc_type is not None:
            log_entry["exception"] = {
                "type": exc_type.__name__,
                "message": str(exc_value),
                "traceback": self.formatException(record.exc_info),
            }

        # Extra fields supplied via extra={"extra_fields": {...}}.
        extra_fields = getattr(record, "extra_fields", None)
        if extra_fields:
            log_entry.update(extra_fields)

        # default=str keeps a log call with e.g. a Decimal or datetime in its
        # extra fields from failing and losing the record entirely.
        return json.dumps(log_entry, ensure_ascii=False, default=str)
def setup_logging(level: str = "INFO", json_format: bool = True, log_file: Optional[str] = None):
    """Configure the root logger with a stdout handler and an optional file handler.

    Existing root handlers are removed and closed first.

    Args:
        level: Logging level name, case-insensitive (e.g. "debug", "INFO").
        json_format: If True, console output uses StructuredJSONFormatter;
            otherwise a plain "time - name - level - message" text format.
        log_file: Optional path of an additional JSON log file (e.g. for
            production), rotated at 100 MB with 10 backups kept. The parent
            directory must already exist.

    Raises:
        ValueError: If *level* is not a registered logging level name. The
            root logger is left unchanged in that case.
    """
    root_logger = logging.getLogger()
    # Logger.setLevel validates names against the registered levels, whereas
    # getattr(logging, ...) would accept arbitrary module attributes.
    root_logger.setLevel(level.upper())

    # Remove AND close existing handlers so their file handles are not leaked.
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)
        handler.close()

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    if json_format:
        console_handler.setFormatter(StructuredJSONFormatter())
    else:
        console_handler.setFormatter(logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        ))
    root_logger.addHandler(console_handler)

    # Optional rotating file handler (typically enabled in production).
    if log_file:
        # `import logging` alone does not load the logging.handlers submodule.
        from logging.handlers import RotatingFileHandler

        file_handler = RotatingFileHandler(
            log_file,
            maxBytes=100 * 1024 * 1024,  # 100 MB
            backupCount=10,
            # JSON output keeps non-ASCII text, so do not rely on the locale encoding.
            encoding="utf-8",
        )
        file_handler.setFormatter(StructuredJSONFormatter())
        root_logger.addHandler(file_handler)
def get_logger(name: str):
    """Return the logger registered under *name*, creating it on first request."""
    named_logger = logging.getLogger(name)
    return named_logger
# Usage example.
# NOTE(review): this snippet executes at import time, and `risky_operation` and
# `params` are not defined in this module, so running it as-is raises NameError.
logger = get_logger(__name__)
# Log with extra context. When StructuredJSONFormatter is the active formatter,
# keys under "extra_fields" are merged into the top-level JSON entry, so the
# "user_id" given here replaces the value taken from user_id_var.
logger.info("User logged in", extra={
    "extra_fields": {
        "user_id": "user_123",
        "ip_address": "192.168.1.1",
        "user_agent": "Mozilla/5.0"
    }
})
# Error log: logger.exception logs at ERROR level and attaches the active
# exception's info to the record.
try:
    risky_operation()
except Exception as e:
    logger.exception("Operation failed", extra={
        "extra_fields": {"operation": "risky_operation", "params": params}
    })
7.2 Metrics采集
# src/infrastructure/metrics/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Summary, generate_latest
from fastapi import Response
import time
from functools import wraps
from typing import Callable
# Metric definitions. No collector below passes an explicit registry, so each
# one is registered with prometheus_client's default REGISTRY when this module
# is imported. Label names listed here must be supplied via .labels(...).

# HTTP request volume, by method, endpoint and response status.
http_requests_total = Counter(
    name='http_requests_total',
    documentation='Total HTTP requests',
    labelnames=['method', 'endpoint', 'status'],
)

# HTTP request latency; bucket upper bounds are in seconds.
_HTTP_LATENCY_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10)
http_request_duration_seconds = Histogram(
    name='http_request_duration_seconds',
    documentation='HTTP request duration in seconds',
    labelnames=['method', 'endpoint'],
    buckets=_HTTP_LATENCY_BUCKETS,
)

# Database query latency by query type; bucket upper bounds are in seconds.
_DB_LATENCY_BUCKETS = (0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1)
db_query_duration = Histogram(
    name='db_query_duration_seconds',
    documentation='Database query duration in seconds',
    labelnames=['query_type'],
    buckets=_DB_LATENCY_BUCKETS,
)

# Current number of open connections, by connection type.
active_connections = Gauge(
    name='active_connections',
    documentation='Number of active connections',
    labelnames=['type'],
)

# Order-processing duration summary, by order type.
order_processing_time = Summary(
    name='order_processing_time_seconds',
    documentation='Time to process an order',
    labelnames=['order_type'],
)

# Business-level metrics, looked up by name in the helpers below.
business_metrics = {
    'orders_created_total': Counter(
        name='orders_created_total',
        documentation='Total orders created',
        labelnames=['status'],
    ),
    'payment_success_total': Counter(
        name='payment_success_total',
        documentation='Total successful payments',
    ),
    'payment_failure_total': Counter(
        name='payment_failure_total',
        documentation='Total failed payments',
        labelnames=['reason'],
    ),
    'cart_abandonment_rate': Gauge(
        name='cart_abandonment_rate',
        documentation='Cart abandonment rate',
    ),
}
def track_request_metrics(func: Callable):
    """Decorate an async endpoint to record request count and latency.

    The HTTP method is taken from a ``request`` keyword argument or, failing
    that, from the first positional argument. If neither carries a ``method``
    attribute, the label "UNKNOWN" is used instead of raising. The endpoint
    label is the wrapped function's ``__name__``.

    Outcomes:
        * success: the status label is the response's ``status_code``, or 200
          when the return value has none;
        * exception: the status label is the exception's ``status_code``
          (e.g. FastAPI's HTTPException), or 500 otherwise. The exception is
          re-raised.

    The duration is always observed, on success or failure.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        request = kwargs.get('request', args[0] if args else None)
        method = getattr(request, 'method', 'UNKNOWN')
        endpoint = func.__name__
        # perf_counter is monotonic; time.time() can jump with clock changes.
        start_time = time.perf_counter()
        try:
            response = await func(*args, **kwargs)
        except Exception as exc:
            # HTTP-aware exceptions carry their own status; everything else is a 500.
            status = getattr(exc, 'status_code', 500)
            http_requests_total.labels(method=method, endpoint=endpoint, status=status).inc()
            raise
        else:
            status = getattr(response, 'status_code', 200)
            http_requests_total.labels(method=method, endpoint=endpoint, status=status).inc()
            return response
        finally:
            duration = time.perf_counter() - start_time
            http_request_duration_seconds.labels(method=method, endpoint=endpoint).observe(duration)
    return wrapper
def track_db_query(query_type: str):
    """Build a decorator that times an async database call.

    The elapsed time of every call, successful or not, is observed on the
    ``db_query_duration`` histogram under the given ``query_type`` label.

    Args:
        query_type: Label value identifying the kind of query (e.g. "select").

    Returns:
        A decorator for ``async def`` callables.
    """
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so wall-clock adjustments cannot
            # produce negative or skewed durations (time.time() can).
            start_time = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start_time
                db_query_duration.labels(query_type=query_type).observe(duration)
        return wrapper
    return decorator
# Expose a Prometheus scrape endpoint in FastAPI.
# NOTE(review): `app` is not created in this module; it is assumed to be the
# application's FastAPI instance provided elsewhere - confirm.
from prometheus_client import CONTENT_TYPE_LATEST


@app.get("/metrics")
async def metrics():
    """Return the default registry in the Prometheus text exposition format.

    CONTENT_TYPE_LATEST carries the exposition-format version and charset that
    Prometheus scrapers expect, rather than a bare "text/plain".
    """
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
# Example helpers that update the business metrics.
def record_order_created(status: str):
    """Increment ``orders_created_total`` for an order in the given *status*."""
    orders_counter = business_metrics['orders_created_total']
    labelled_counter = orders_counter.labels(status=status)
    labelled_counter.inc()
def record_payment_result(success: bool, reason: str = None):
    """Record the outcome of a payment attempt.

    Args:
        success: True increments ``payment_success_total``; False increments
            ``payment_failure_total``.
        reason: Failure reason, used as the ``reason`` label and ignored on
            success. A missing or empty reason is recorded as "unknown"
            rather than the literal label value "None".
    """
    if success:
        business_metrics['payment_success_total'].inc()
        return
    business_metrics['payment_failure_total'].labels(reason=reason or "unknown").inc()
7.3 分布式追踪
# src/infrastructure/tracing/tracing.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
from opentelemetry.trace import Status, StatusCode
from fastapi import FastAPI, Request
from typing import Optional
def setup_tracing(service_name: str, jaeger_host: str = "localhost", jaeger_port: int = 6831):
    """Install a global TracerProvider that exports spans to a Jaeger agent.

    The resource carries ``service.name`` plus ``environment`` and ``version``,
    read from the ENVIRONMENT (default "development") and APP_VERSION
    (default "unknown") environment variables. Spans are exported in batches.

    NOTE(review): OpenTelemetry Python has deprecated its Jaeger exporter in
    favour of OTLP (Jaeger ingests OTLP natively) - confirm against the
    installed version before relying on it.

    Args:
        service_name: Value of the ``service.name`` resource attribute.
        jaeger_host: Host name of the Jaeger agent.
        jaeger_port: Port of the Jaeger agent.

    Returns:
        A tracer obtained from the newly installed global provider.
    """
    import os  # `os` is not among this module's imports

    # Create the resource
    resource = Resource(attributes={
        SERVICE_NAME: service_name,
        "environment": os.getenv("ENVIRONMENT", "development"),
        "version": os.getenv("APP_VERSION", "unknown"),
    })

    # Create the TracerProvider
    provider = TracerProvider(resource=resource, id_generator=RandomIdGenerator())

    # Configure the Jaeger exporter and batch spans to it
    jaeger_exporter = JaegerExporter(
        agent_host_name=jaeger_host,
        agent_port=jaeger_port,
    )
    provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))

    # Install as the global TracerProvider
    trace.set_tracer_provider(provider)
    return trace.get_tracer(__name__)
def instrument_app(app: FastAPI, engine):
    """Enable OpenTelemetry auto-instrumentation and expose the trace id.

    Instruments the FastAPI *app*, the SQLAlchemy *engine* (with SQL
    commenting enabled) and HTTPX clients. It then registers an HTTP middleware
    that writes an ``X-Trace-Id`` response header.

    Args:
        app: The FastAPI application to instrument.
        engine: SQLAlchemy engine whose queries should be traced.
    """
    # Instrument FastAPI
    FastAPIInstrumentor.instrument_app(app)

    # Instrument SQLAlchemy. ``enable_commenter`` must be passed as its own
    # keyword argument; the instrumentor does not read it from
    # ``commenter_options``, and a custom ``commenter`` callable is not a
    # supported option. The built-in sqlcommenter adds trace context to
    # each statement itself.
    SQLAlchemyInstrumentor().instrument(
        engine=engine,
        enable_commenter=True,
        commenter_options={},
    )

    # Instrument HTTPX clients
    HTTPXClientInstrumentor().instrument()

    # Middleware that puts a trace id on every response.
    @app.middleware("http")
    async def trace_middleware(request: Request, call_next):
        client_trace_id = request.headers.get("X-Trace-Id")
        if client_trace_id:
            # Client supplied an id: wrap the request in a child span of the
            # current context and echo the client's value back unchanged.
            # NOTE(review): the echoed value is not validated and need not match
            # the OpenTelemetry trace id; W3C `traceparent` is the standard
            # header for propagating an incoming trace.
            tracer = trace.get_tracer(__name__)
            with tracer.start_as_current_span("http_request"):
                response = await call_next(request)
                response.headers["X-Trace-Id"] = client_trace_id
                return response

        response = await call_next(request)
        # get_current_span() returns an invalid span (never None) when no span
        # is active, so validity is the only check needed.
        span_context = trace.get_current_span().get_span_context()
        if span_context.is_valid:
            response.headers["X-Trace-Id"] = format(span_context.trace_id, '032x')
        return response
# Manually created spans
def create_span(name: str, attributes: dict = None):
    """Build a decorator that runs an async function inside a new span.

    Args:
        name: Span name.
        attributes: Optional attributes set on the span when it starts.

    Returns:
        A decorator for ``async def`` functions. If the call succeeds, the span
        status is OK. If it raises an ``Exception``, the span status is ERROR,
        the exception is recorded once on the span, and it is re-raised.
    """
    from functools import wraps  # `wraps` is not among this module's imports

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            tracer = trace.get_tracer(__name__)
            # The exception is recorded manually below, so the context manager's
            # own recording is turned off to avoid a duplicate exception event.
            with tracer.start_as_current_span(
                name,
                attributes=attributes,
                record_exception=False,
            ) as span:
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    span.set_status(Status(StatusCode.ERROR, str(e)))
                    span.record_exception(e)
                    raise
                span.set_status(Status(StatusCode.OK))
                return result
        return wrapper
    return decorator
# Usage example.
# Decimal must be in scope: the `amount` annotation is evaluated when the
# function is defined.
from decimal import Decimal


@create_span("process_payment", attributes={"payment_method": "credit_card"})
async def process_payment(order_id: str, amount: Decimal):
    """Example: charge *amount* for *order_id*, tracing each step as a child span.

    NOTE(review): `payment_gateway` is not defined in this module; it stands in
    for the real gateway client and must be provided elsewhere.
    """
    tracer = trace.get_tracer(__name__)

    # Child span: validation
    with tracer.start_as_current_span("validate_payment") as span:
        span.set_attribute("order_id", order_id)
        # Validation logic goes here.

    # Child span: call the payment gateway
    with tracer.start_as_current_span("call_payment_gateway") as span:
        result = await payment_gateway.charge(amount)
        span.set_attribute("transaction_id", result.transaction_id)
    return result
八、效能度量与持续改进
8.1 DORA指标
# scripts/measure_dora_metrics.py
import subprocess
import json
from datetime import datetime, timedelta
from typing import Dict, List
import requests
class DORAMetrics:
    """Collector for the four DORA delivery metrics of one GitHub repository.

    Deployment frequency, lead time and change failure rate are derived from
    the GitHub REST API. Time to restore service currently uses hard-coded
    sample incidents until an incident/APM source is integrated.
    """

    # Seconds to wait for each GitHub API call (requests has no default timeout).
    request_timeout = 30

    def __init__(self, github_token: str, repo: str):
        """Store the API credentials and the target repository.

        Args:
            github_token: Token sent as ``Authorization: Bearer <token>``.
            repo: Repository in ``owner/name`` form.
        """
        self.github_token = github_token
        self.repo = repo
        self.github_api = "https://api.github.com/repos"

    def _get(self, url: str, params=None):
        """GET *url* from the GitHub API and return the decoded JSON body.

        Raises:
            requests.HTTPError: For non-2xx responses, so API errors (bad
                token, rate limiting) surface directly instead of as a
                KeyError on the error payload later.
        """
        headers = {"Authorization": f"Bearer {self.github_token}"}
        response = requests.get(url, headers=headers, params=params, timeout=self.request_timeout)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _parse_github_ts(value: str) -> datetime:
        """Parse a GitHub timestamp such as ``2024-01-02T03:04:05Z`` into an aware datetime.

        ``datetime.fromisoformat`` accepts a trailing ``Z`` only from Python
        3.11 on, so it is rewritten as an explicit UTC offset first.
        """
        return datetime.fromisoformat(value.replace("Z", "+00:00"))

    def _deployment_failed(self, deployment: Dict) -> bool:
        """Return True if any status of *deployment* is ``failure`` or ``error``.

        Deployment objects carry no status of their own; their statuses are
        listed at the deployment's ``statuses_url``.
        """
        statuses = self._get(deployment["statuses_url"])
        return any(status.get("state") in ("failure", "error") for status in statuses)

    def deployment_frequency(self, days: int = 30) -> Dict:
        """Deployment frequency: successful deploy workflow runs per day.

        A workflow run counts as a deployment when its conclusion is
        ``success``, its name contains "deploy" (case-insensitive), and it was
        created within the last *days* days. Only the first page of runs
        returned by the API (up to 100) is inspected.

        Raises:
            ValueError: If *days* is not positive.
        """
        if days <= 0:
            raise ValueError("days must be positive")

        # Aware "now", so it compares correctly with the aware GitHub timestamps.
        cutoff = datetime.now().astimezone() - timedelta(days=days)
        runs = self._get(
            f"{self.github_api}/{self.repo}/actions/runs",
            params={"per_page": 100},
        )["workflow_runs"]

        deployments = [
            run for run in runs
            if run["conclusion"] == "success"
            and "deploy" in (run.get("name") or "").lower()
            and self._parse_github_ts(run["created_at"]) > cutoff
        ]
        deployments_per_day = len(deployments) / days

        # Rating
        if deployments_per_day >= 10:
            rating = "Elite"
        elif deployments_per_day >= 5:
            rating = "High"
        elif deployments_per_day >= 1:
            rating = "Medium"
        else:
            rating = "Low"

        return {
            "metric": "Deployment Frequency",
            "value": f"{deployments_per_day:.2f} deployments/day",
            "rating": rating,
            "total_deployments": len(deployments),
            "period_days": days
        }

    def lead_time_for_changes(self) -> Dict:
        """Lead time for changes, approximated by pull-request open-to-merge time.

        Looks at the 50 most recently updated closed pull requests and reports
        the median, in hours, over those that were merged. This is a proxy: it
        does not include the time from merge to deployment.
        """
        from statistics import median

        pulls = self._get(
            f"{self.github_api}/{self.repo}/pulls",
            params={"state": "closed", "sort": "updated", "direction": "desc", "per_page": 50},
        )
        lead_times = [
            (self._parse_github_ts(pr["merged_at"])
             - self._parse_github_ts(pr["created_at"])).total_seconds() / 3600  # hours
            for pr in pulls
            if pr["merged_at"]  # skip PRs closed without merging
        ]

        if not lead_times:
            return {"metric": "Lead Time for Changes", "value": "N/A", "rating": "N/A"}

        # statistics.median averages the two middle values for an even count.
        median_lead_time = median(lead_times)

        # Rating (hours)
        if median_lead_time < 1:
            rating = "Elite"
        elif median_lead_time < 24:
            rating = "High"
        elif median_lead_time < 168:  # 1 week
            rating = "Medium"
        else:
            rating = "Low"

        return {
            "metric": "Lead Time for Changes",
            "value": f"{median_lead_time:.1f} hours",
            "rating": rating,
            "samples": len(lead_times)
        }

    def time_to_restore_service(self) -> Dict:
        """Time to restore service: mean incident duration in minutes.

        NOTE: uses hard-coded sample incidents. Real data must come from an
        incident/monitoring system (e.g. Sentry, DataDog).
        """
        incidents = [
            {"duration_minutes": 15, "severity": "high"},
            {"duration_minutes": 45, "severity": "medium"},
            {"duration_minutes": 120, "severity": "low"},
        ]

        if not incidents:
            return {"metric": "Time to Restore Service", "value": "N/A", "rating": "N/A"}

        avg_restore_time = sum(i["duration_minutes"] for i in incidents) / len(incidents)

        # Rating (minutes)
        if avg_restore_time < 60:
            rating = "Elite"
        elif avg_restore_time < 240:
            rating = "High"
        elif avg_restore_time < 1440:  # 24 hours
            rating = "Medium"
        else:
            rating = "Low"

        return {
            "metric": "Time to Restore Service",
            "value": f"{avg_restore_time:.1f} minutes",
            "rating": rating,
            "incidents": len(incidents)
        }

    def change_failure_rate(self) -> Dict:
        """Change failure rate: percentage of deployments that failed.

        A deployment counts as failed when any of its statuses is ``failure``
        or ``error``. Only the first page of deployments returned by the API
        is considered, and one extra request is made per deployment to read
        its statuses.
        """
        deployments = self._get(f"{self.github_api}/{self.repo}/deployments")
        total_deployments = len(deployments)
        if total_deployments == 0:
            return {"metric": "Change Failure Rate", "value": "N/A", "rating": "N/A"}

        failed_deployments = sum(1 for d in deployments if self._deployment_failed(d))
        failure_rate = (failed_deployments / total_deployments) * 100

        # Rating
        if failure_rate < 15:
            rating = "Elite"
        elif failure_rate < 25:
            rating = "High"
        elif failure_rate < 45:
            rating = "Medium"
        else:
            rating = "Low"

        return {
            "metric": "Change Failure Rate",
            "value": f"{failure_rate:.1f}%",
            "rating": rating,
            "total_deployments": total_deployments,
            "failed_deployments": failed_deployments
        }

    def generate_report(self) -> Dict:
        """Collect all four DORA metrics into one timestamped report."""
        return {
            "timestamp": datetime.now().isoformat(),
            "metrics": [
                self.deployment_frequency(),
                self.lead_time_for_changes(),
                self.time_to_restore_service(),
                self.change_failure_rate()
            ]
        }
8.2 开发者体验度量
# scripts/developer_experience.py
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime, timedelta
import subprocess
@dataclass
class DevExperienceMetrics:
    """Developer-experience measurements; a field left as None has not been measured."""

    # Time to set up a development environment, in minutes
    environment_setup_time: Optional[float] = None
    # Local build time, in seconds
    local_build_time: Optional[float] = None
    # Test execution time, in seconds
    test_execution_time: Optional[float] = None
    # CI feedback time, in minutes
    ci_feedback_time: Optional[float] = None
    # Code-review wait time, in hours
    pr_review_wait_time: Optional[float] = None
    # Time for a new hire to become productive, in days (from surveys)
    onboarding_time_days: Optional[float] = None

    @staticmethod
    def _minutes_between(start: str, end: str) -> float:
        """Minutes elapsed between two GitHub ISO-8601 timestamps (``Z`` suffix allowed).

        The ``Z`` suffix is rewritten because ``datetime.fromisoformat`` only
        accepts it from Python 3.11 on.
        """
        started = datetime.fromisoformat(start.replace("Z", "+00:00"))
        finished = datetime.fromisoformat(end.replace("Z", "+00:00"))
        return (finished - started).total_seconds() / 60

    def measure_ci_feedback_time(self, repo: str, token: str):
        """Average duration, in minutes, of the 10 most recent completed workflow runs.

        Each run's duration is approximated as ``updated_at - created_at``.
        In-progress runs are excluded because their duration is not final.
        The result is stored in ``ci_feedback_time`` and returned; it is None
        when no completed runs are found.

        Raises:
            requests.HTTPError: If the GitHub API returns a non-2xx response.
        """
        import requests
        headers = {"Authorization": f"Bearer {token}"}
        url = f"https://api.github.com/repos/{repo}/actions/runs"
        response = requests.get(
            url,
            headers=headers,
            params={"status": "completed", "per_page": 10},
            timeout=30,
        )
        response.raise_for_status()
        runs = response.json()["workflow_runs"]

        feedback_times = [
            self._minutes_between(run["created_at"], run["updated_at"])
            for run in runs[:10]  # 10 most recent runs
        ]
        # Avoid ZeroDivisionError when the repository has no completed runs.
        self.ci_feedback_time = (
            sum(feedback_times) / len(feedback_times) if feedback_times else None
        )
        return self.ci_feedback_time

    def measure_build_time(self):
        """Time ``poetry build`` in seconds, store it in ``local_build_time`` and return it.

        Raises:
            subprocess.CalledProcessError: If the build fails. A failed build is
                not recorded, since its (often short) duration would be misleading.
        """
        import time
        start = time.perf_counter()  # monotonic clock for durations
        subprocess.run(["poetry", "build"], capture_output=True, check=True)
        self.local_build_time = time.perf_counter() - start
        return self.local_build_time

    def generate_suggestions(self) -> List[str]:
        """Return improvement suggestions for every measured metric above its threshold."""
        suggestions = []

        if self.ci_feedback_time and self.ci_feedback_time > 10:
            suggestions.append("CI时间超过10分钟,建议优化:")
            suggestions.append("- 并行化测试执行")
            suggestions.append("- 使用缓存加速依赖安装")
            suggestions.append("- 将测试分层,快速反馈单元测试")

        if self.local_build_time and self.local_build_time > 30:
            suggestions.append("本地构建时间过长,建议:")
            suggestions.append("- 使用增量构建")
            suggestions.append("- 缓存依赖")

        if self.pr_review_wait_time and self.pr_review_wait_time > 24:
            suggestions.append("PR审查等待时间过长,建议:")
            suggestions.append("- 设定SLA(4小时内响应)")
            suggestions.append("- 使用PR审查轮值制度")
            suggestions.append("- 小批量提交PR")

        return suggestions
8.3 效能仪表板
# scripts/dashboard.py - 简单的效能仪表板
from flask import Flask, render_template, jsonify
from datetime import datetime
import psutil
import platform
# Flask application that serves the engineering-efficiency dashboard routes below.
app = Flask(__name__)
class EngineeringDashboard:
    """Data provider for the engineering-efficiency dashboard.

    NOTE(review): DORAMetrics and DevExperienceMetrics are defined in other
    scripts and must be imported into this module - confirm.
    """

    def __init__(self, github_token=None, repo: str = "myorg/myapp"):
        """Create the DORA collector and an empty dev-experience record.

        Args:
            github_token: GitHub API token; defaults to the GITHUB_TOKEN
                environment variable when omitted.
            repo: Repository in ``owner/name`` form.
        """
        import os  # `os` is not among this module's imports

        if github_token is None:
            github_token = os.getenv("GITHUB_TOKEN")
        self.dora = DORAMetrics(github_token=github_token, repo=repo)
        self.dev_metrics = DevExperienceMetrics()

    def get_system_health(self):
        """Return host CPU/memory/disk usage (percent), platform info and uptime.

        NOTE(review): psutil.cpu_percent() without an interval compares against
        the previous call, so the very first reading in a process is not
        meaningful.
        """
        return {
            "cpu_usage": psutil.cpu_percent(),
            "memory_usage": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage('/').percent,
            "system": platform.system(),
            "python_version": platform.python_version(),
            "uptime": datetime.now() - datetime.fromtimestamp(psutil.boot_time())
        }

    def get_team_velocity(self):
        """Return team delivery metrics (hard-coded placeholders; intended source: a tool such as Jira)."""
        return {
            "sprint_velocity": 85,  # story points
            "on_time_delivery": 0.92,  # on-time delivery rate
            "bug_rate": 0.05,  # bug rate
            "technical_debt_ratio": 0.15  # technical-debt ratio
        }

    def get_quality_metrics(self):
        """Return code-quality metrics (hard-coded placeholder values)."""
        return {
            "test_coverage": 85.5,  # %
            "cyclomatic_complexity": 4.2,  # average
            "code_duplication": 3.1,  # %
            "sonar_quality_gate": "Passed",
            "security_vulnerabilities": 0
        }
@app.route("/")
def dashboard():
    """Render the dashboard page with DORA, system, team and quality metrics."""
    board = EngineeringDashboard()
    template_context = dict(
        dora_metrics=board.dora.generate_report(),
        system_health=board.get_system_health(),
        team_velocity=board.get_team_velocity(),
        quality_metrics=board.get_quality_metrics(),
        refresh_time=datetime.now(),
    )
    return render_template("dashboard.html", **template_context)
@app.route("/api/metrics")
def api_metrics():
    """Return DORA and code-quality metrics as JSON, stamped with the current time."""
    board = EngineeringDashboard()
    payload = {
        "dora": board.dora.generate_report(),
        "quality": board.get_quality_metrics(),
        "timestamp": datetime.now().isoformat(),
    }
    return jsonify(payload)