四、代码审查与质量保障
4.1 有效的代码审查流程
# 代码审查检查清单
"""
【功能性】
□ 代码是否正确实现了需求?
□ 是否覆盖了边界情况?
□ 错误处理是否完善?
□ 是否有潜在的性能问题?
【可读性】
□ 命名是否清晰表达了意图?
□ 函数是否足够短小?
□ 注释是否解释了"为什么"?
□ 代码格式是否符合规范?
【可维护性】
□ 是否有重复代码?
□ 是否遵循了单一职责?
□ 依赖是否合理?
□ 是否易于测试?
【安全性】
□ 是否有SQL注入风险?
□ 是否有XSS漏洞?
□ 敏感信息是否被硬编码?
□ 权限检查是否正确?
【测试】
□ 单元测试是否充分?
□ 测试是否覆盖关键路径?
□ 测试是否独立可重复?
□ 边界条件是否有测试?
【文档】
□ 是否需要更新API文档?
□ 复杂逻辑是否有注释?
□ 是否需要更新架构文档?
"""
# 代码审查评论示例
"""
❌ 不好的评论:
"这里写错了"
"性能不好"
✅ 好的评论:
"suggestion: 这里使用字典而不是多个if-else会更清晰
```python
status_map = {'pending': 1, 'paid': 2, 'shipped': 3}
status_code = status_map.get(status, 0)
```
"
"question: 如果items为空,calculate_total会返回0,这是期望的行为吗?"
"nit: 变量名tmp不够清晰,建议改为order_total"
"praise: 这个使用策略模式的设计很好,易于扩展新的支付方式"
"blocker: 这里存在SQL注入风险,必须使用参数化查询"
"""
### 4.2 自动化质量检查
```python
# pytest.ini - test configuration
[pytest]
minversion = 6.0
# Continuation lines of a multi-line ini value must be indented; otherwise
# each option is parsed as a separate (invalid) entry instead of as part of
# addopts.
addopts =
    -ra
    -q
    --strict-markers
    --cov=src
    --cov-report=html
    --cov-report=term
    --cov-fail-under=80
testpaths = tests
# 单元测试示例
import pytest
from decimal import Decimal
from myapp.order import OrderCalculator, Order, Item
class TestOrderCalculator:
    """Tests for ``OrderCalculator.calculate_total`` and item price calculation.

    NOTE(review): ``PhysicalItem``, ``DigitalItem``, ``GiftCardItem``,
    ``PercentageCoupon``, ``FixedAmountCoupon`` and ``create_test_order`` are
    used below but are not among the names imported from ``myapp.order`` in
    this listing -- confirm where they are expected to come from.
    """

    @pytest.fixture
    def calculator(self):
        """Provide a fresh ``OrderCalculator`` for each test."""
        return OrderCalculator()

    @pytest.fixture
    def sample_order(self):
        """Provide an order with two physical items at 10 and one digital item at 5."""
        order = Order()
        order.add_item(PhysicalItem(price=Decimal('10'), quantity=2))
        order.add_item(DigitalItem(price=Decimal('5'), quantity=1))
        return order

    def test_calculate_total_without_discount(self, calculator, sample_order):
        """Total without any coupon applied."""
        total = calculator.calculate_total(sample_order)
        # Physical items: (10 + 10 * 0.1) * 2 = 22
        # Digital item:   5 * 1 = 5
        # Total:          27
        assert total == Decimal('27')

    def test_calculate_total_with_percentage_discount(self, calculator, sample_order):
        """Total with a percentage coupon."""
        sample_order.apply_coupon(PercentageCoupon(10))  # 10% off
        total = calculator.calculate_total(sample_order)
        assert total == Decimal('24.30')  # 27 * 0.9

    def test_calculate_total_with_fixed_discount(self, calculator, sample_order):
        """Total with a fixed-amount coupon."""
        sample_order.apply_coupon(FixedAmountCoupon(Decimal('5')))
        total = calculator.calculate_total(sample_order)
        assert total == Decimal('22')  # 27 - 5

    def test_calculate_total_never_negative(self, calculator):
        """A coupon larger than the order value must not produce a negative total."""
        order = Order()
        order.add_item(GiftCardItem(price=Decimal('10'), quantity=1))
        order.apply_coupon(FixedAmountCoupon(Decimal('20')))
        total = calculator.calculate_total(order)
        assert total >= Decimal('0')
        assert total == Decimal('0')

    def test_empty_order(self, calculator):
        """An order without items totals zero."""
        order = Order()
        total = calculator.calculate_total(order)
        assert total == Decimal('0')

    @pytest.mark.parametrize("price,quantity,expected", [
        (Decimal('10'), 1, Decimal('11')),  # physical item, tax included
        (Decimal('10'), 2, Decimal('22')),
        (Decimal('0'), 1, Decimal('0')),
        (Decimal('0.01'), 1, Decimal('0.011')),
    ])
    def test_physical_item_calculation(self, price, quantity, expected):
        """Parametrized check of the physical item price calculation."""
        item = PhysicalItem(price=price, quantity=quantity)
        assert item.calculate_total() == expected

    @pytest.mark.asyncio
    async def test_concurrent_calculations(self, calculator):
        """Run 100 asynchronous calculations concurrently and check the result types."""
        import asyncio

        orders = [create_test_order() for _ in range(100)]
        tasks = [calculator.calculate_total_async(order) for order in orders]
        results = await asyncio.gather(*tasks)
        assert len(results) == 100
        assert all(isinstance(r, Decimal) for r in results)
```
4.3 持续集成中的质量门禁
# .github/workflows/quality.yml
name: Code Quality Check

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          pip install -r requirements-dev.txt
          # isort and radon are invoked by later steps, so install them
          # explicitly together with the other tools.
          pip install ruff black isort mypy radon pytest pytest-cov

      - name: Format check (Black)
        run: black --check --diff src/

      - name: Import sorting (isort)
        run: isort --check-only --diff src/

      - name: Linting (Ruff)
        run: ruff check src/

      - name: Type checking (mypy)
        run: mypy src/ --ignore-missing-imports

      - name: Run tests with coverage
        run: |
          pytest --cov=src --cov-report=xml --cov-fail-under=80

      - name: Complexity check
        run: |
          radon cc src/ --min C --show-complexity
          radon mi src/ --min B

      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml
          fail_ci_if_error: true
五、实战案例:重构一个真实场景
5.1 初始代码(遗留系统)
# legacy_order_system.py - 一个需要重构的订单处理系统
import json
import hashlib
import smtplib
import sqlite3
from datetime import datetime
class OrderSystem:
    """Legacy all-in-one order handler, kept as the "before" refactoring example.

    The design problems of this class (one method doing everything, hard-coded
    SMTP credentials, swallowed exceptions, dict-based data) are deliberately
    left in place because this listing exists to be refactored. Only defects
    that stopped the example from running at all are fixed: the ``orders.id``
    column type and the missing ``inventory`` table.
    """

    def __init__(self, db_path):
        """Open the SQLite database at ``db_path`` and create missing tables."""
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
        # id is TEXT: process_order stores an md5 hex digest, which an
        # INTEGER PRIMARY KEY column rejects with "datatype mismatch".
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS orders (
                id TEXT PRIMARY KEY,
                user_id TEXT,
                data TEXT,
                total REAL,
                status TEXT,
                created_at TEXT
            )
        ''')
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id TEXT PRIMARY KEY,
                email TEXT,
                name TEXT
            )
        ''')
        # process_order reads and updates inventory(product_id, stock), so the
        # table has to exist before the first order is processed.
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS inventory (
                product_id TEXT PRIMARY KEY,
                stock INTEGER
            )
        ''')
        self.conn.commit()

    def process_order(self, order_data):
        """Validate, price, persist and announce an order in one pass.

        Args:
            order_data: dict with ``user_id``, an optional ``items`` list of
                dicts (``product_id``, ``price``, ``quantity``) and an
                optional ``coupon`` dict (``type``, ``value``).

        Returns:
            ``{"error": ...}`` when the user is unknown or stock is
            insufficient; otherwise a dict with ``order_id``, ``total`` and
            ``status``.

        Side effects: updates inventory, inserts the order, commits, tries to
        send a confirmation e-mail and appends a line to ``orders.log``.
        """
        # Parse the input
        user_id = order_data.get('user_id')
        items = order_data.get('items', [])
        coupon = order_data.get('coupon')

        # Check that the user exists
        self.cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        user = self.cursor.fetchone()
        if not user:
            return {"error": "User not found"}

        # Compute the total
        total = 0
        for item in items:
            price = item.get('price', 0)
            qty = item.get('quantity', 1)
            total += price * qty

        # Apply the coupon
        if coupon:
            if coupon['type'] == 'percentage':
                total = total * (1 - coupon['value'] / 100)
            elif coupon['type'] == 'fixed':
                total = total - coupon['value']

        # Check stock for every item before touching the inventory
        for item in items:
            product_id = item.get('product_id')
            qty = item.get('quantity', 1)
            self.cursor.execute("SELECT stock FROM inventory WHERE product_id = ?", (product_id,))
            row = self.cursor.fetchone()
            if not row or row[0] < qty:
                return {"error": f"Insufficient stock for product {product_id}"}

        # Deduct stock
        for item in items:
            product_id = item.get('product_id')
            qty = item.get('quantity', 1)
            self.cursor.execute("UPDATE inventory SET stock = stock - ? WHERE product_id = ?", (qty, product_id))

        # Persist the order
        order_id = hashlib.md5(f"{user_id}{datetime.now()}".encode()).hexdigest()
        order_data_str = json.dumps(items)
        self.cursor.execute(
            "INSERT INTO orders (id, user_id, data, total, status, created_at) VALUES (?, ?, ?, ?, ?, ?)",
            (order_id, user_id, order_data_str, total, 'pending', datetime.now().isoformat())
        )
        self.conn.commit()

        # Send the confirmation e-mail; user[1] is the email column.
        # The bare except below is one of the flaws this example illustrates.
        try:
            smtp = smtplib.SMTP('smtp.gmail.com', 587)
            smtp.starttls()
            smtp.login('system@myapp.com', 'password')
            msg = f"Subject: Order Confirmation\n\nYour order {order_id} has been created. Total: ${total}"
            smtp.sendmail('system@myapp.com', user[1], msg)
            smtp.quit()
        except:
            pass

        # Write the log entry
        with open('orders.log', 'a') as f:
            f.write(f"{datetime.now()}: Order {order_id} created for user {user_id}\n")

        return {
            "order_id": order_id,
            "total": total,
            "status": "pending"
        }
# Example usage: process one order with two items and a 10% coupon.
# Against a freshly created database this prints the "User not found" error,
# because no user row has been inserted.
system = OrderSystem('orders.db')
result = system.process_order(
    {
        'user_id': 'user123',
        'items': [
            {'product_id': 'p1', 'price': 10.99, 'quantity': 2},
            {'product_id': 'p2', 'price': 25.50, 'quantity': 1},
        ],
        'coupon': {'type': 'percentage', 'value': 10},
    }
)
print(result)
5.2 识别问题
这段代码存在以下问题:
1. 违反单一职责:process_order 一个方法同时负责校验、计算、数据库操作、邮件发送和日志记录
2. 紧耦合:直接依赖SQLite、SMTP、文件系统
3. 难以测试:依赖无法注入或mock,运行测试会真实发送邮件、写入数据库
4. 错误处理不完善:裸except吞掉了所有异常,邮件发送失败时不会留下任何痕迹
5. 安全问题:SMTP账号密码硬编码在源码中;SQL虽然使用了参数化查询,但price、quantity等输入未经任何校验就直接参与计算和入库
6. 魔法数字和字符串:'pending'、'percentage'等
7. 代码重复:多次遍历items(计算总价、检查库存、扣减库存各遍历一次)
8. 类型不安全:使用字典和字符串传递数据
5.3 重构步骤
# step1_domain_models.py - 创建领域模型
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import List, Optional
from uuid import uuid4
class OrderStatus(Enum):
    """Lifecycle states of an order; each value is the state's string form."""

    PENDING = "pending"
    PAID = "paid"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"
class ProductType(Enum):
    """Kinds of product; each value is the kind's string form."""

    PHYSICAL = "physical"
    DIGITAL = "digital"
    GIFT_CARD = "gift_card"
@dataclass(frozen=True)
class Product:
    """Immutable product record: identity, name, price, type and stock level."""

    id: str
    name: str
    price: Decimal
    product_type: ProductType
    stock: int

    def has_enough_stock(self, quantity: int) -> bool:
        """Return True when at least ``quantity`` units are in stock."""
        return self.stock >= quantity

    def reduce_stock(self, quantity: int) -> 'Product':
        """Return a copy of this product with ``quantity`` units removed.

        The instance is frozen, so the stock change yields a new ``Product``.

        Raises:
            InsufficientStockError: if fewer than ``quantity`` units are in stock.
        """
        if not self.has_enough_stock(quantity):
            raise InsufficientStockError(self.id, self.stock, quantity)
        # Function-scope import: the listing's module-level import only brings
        # in ``dataclass`` and ``field``. ``replace`` copies every field, so a
        # field added later cannot be forgotten here.
        from dataclasses import replace

        return replace(self, stock=self.stock - quantity)
@dataclass
class OrderItem:
    """One order line: a product together with the ordered quantity."""

    product: Product
    quantity: int

    @property
    def subtotal(self) -> Decimal:
        """Unit price multiplied by the quantity."""
        return self.product.price * self.quantity

    @property
    def requires_shipping(self) -> bool:
        """True only for products of type ``ProductType.PHYSICAL``."""
        return self.product.product_type == ProductType.PHYSICAL
@dataclass
class Coupon:
    """Discount coupon that is either a percentage or a fixed amount."""

    code: str
    type: str  # 'percentage' or 'fixed'
    value: Decimal

    def apply(self, amount: Decimal) -> Decimal:
        """Return ``amount`` after the discount, never below zero.

        ``'percentage'`` removes ``value`` percent of ``amount`` and
        ``'fixed'`` subtracts ``value``. Any other ``type`` leaves ``amount``
        unchanged.
        """
        if self.type == 'percentage':
            discounted = amount * (Decimal('1') - self.value / Decimal('100'))
            # Clamp like the fixed branch does: a percentage above 100 would
            # otherwise yield a negative total.
            return max(discounted, Decimal('0'))
        if self.type == 'fixed':
            return max(amount - self.value, Decimal('0'))
        return amount
@dataclass
class Order:
    """Order aggregate root: line items, optional coupon and lifecycle state."""

    id: str = field(default_factory=lambda: str(uuid4()))
    user_id: str = ""
    items: List[OrderItem] = field(default_factory=list)
    coupon: Optional[Coupon] = None
    status: OrderStatus = OrderStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)
    paid_at: Optional[datetime] = None

    @property
    def subtotal(self) -> Decimal:
        """Sum of all line subtotals, before any coupon."""
        # Explicit Decimal start value: a bare sum() returns the int 0 for an
        # order without items, which contradicts the declared return type.
        return sum((item.subtotal for item in self.items), Decimal('0'))

    @property
    def total(self) -> Decimal:
        """Subtotal with the coupon applied, if one is set."""
        total = self.subtotal
        if self.coupon:
            total = self.coupon.apply(total)
        return total

    def add_item(self, product: Product, quantity: int):
        """Append a line for ``product``.

        Raises:
            OrderAlreadyProcessedError: if the order is no longer pending.
            InvalidQuantityError: if ``quantity`` is not positive.
            InsufficientStockError: if the product lacks ``quantity`` units.
        """
        if self.status != OrderStatus.PENDING:
            raise OrderAlreadyProcessedError("Cannot modify order after processing")
        if quantity <= 0:
            raise InvalidQuantityError("Quantity must be positive")
        if not product.has_enough_stock(quantity):
            raise InsufficientStockError(product.id, product.stock, quantity)
        self.items.append(OrderItem(product, quantity))

    def apply_coupon(self, coupon: Coupon):
        """Attach ``coupon``, replacing any previous one; pending orders only.

        Raises:
            OrderAlreadyProcessedError: if the order is no longer pending.
        """
        if self.status != OrderStatus.PENDING:
            raise OrderAlreadyProcessedError("Cannot apply coupon after processing")
        self.coupon = coupon

    def mark_as_paid(self):
        """Move a pending order to PAID and record the payment time.

        Raises:
            InvalidOrderStateError: if the order is not pending.
        """
        if self.status != OrderStatus.PENDING:
            raise InvalidOrderStateError(f"Cannot pay order with status {self.status}")
        self.status = OrderStatus.PAID
        self.paid_at = datetime.now()

    def mark_as_shipped(self):
        """Move a paid order to SHIPPED.

        Raises:
            InvalidOrderStateError: if the order is not paid.
        """
        if self.status != OrderStatus.PAID:
            raise InvalidOrderStateError(f"Cannot ship order with status {self.status}")
        self.status = OrderStatus.SHIPPED

    def cancel(self):
        """Cancel the order unless it is already shipped or delivered.

        Raises:
            OrderCannotBeCancelledError: if the order is shipped or delivered.
        """
        if self.status in (OrderStatus.SHIPPED, OrderStatus.DELIVERED):
            raise OrderCannotBeCancelledError(f"Cannot cancel order with status {self.status}")
        self.status = OrderStatus.CANCELLED
# step2_services.py - 创建服务层
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
import logging
logger = logging.getLogger(__name__)
# 端口定义
class OrderRepository(ABC):
    """Port for storing and loading ``Order`` aggregates."""

    @abstractmethod
    async def save(self, order: Order) -> None:
        """Persist ``order``."""

    @abstractmethod
    async def find_by_id(self, order_id: str) -> Optional[Order]:
        """Return the order stored under ``order_id``, or ``None``."""
class ProductRepository(ABC):
    """Port for loading and updating ``Product`` records."""

    @abstractmethod
    async def find_by_id(self, product_id: str) -> Optional[Product]:
        """Return the product stored under ``product_id``, or ``None``."""

    @abstractmethod
    async def update(self, product: Product) -> None:
        """Persist the state of ``product``."""
class UserRepository(ABC):
    """Port for looking up users."""

    @abstractmethod
    async def find_by_id(self, user_id: str) -> Optional[User]:
        """Return the user stored under ``user_id``, or ``None``."""
class NotificationService(ABC):
    """Port for sending order-related notifications."""

    @abstractmethod
    async def send_order_confirmation(self, order: Order) -> None:
        """Send a confirmation for ``order``."""
class EventPublisher(ABC):
    """Port for publishing events given as plain dicts."""

    @abstractmethod
    async def publish(self, event: Dict[str, Any]) -> None:
        """Publish ``event``."""
# 应用服务
class OrderProcessor:
    """Application service that creates orders through the injected ports."""

    def __init__(
        self,
        order_repo: OrderRepository,
        product_repo: ProductRepository,
        user_repo: UserRepository,
        notifier: NotificationService,
        event_publisher: EventPublisher
    ):
        """Store the collaborators; nothing is contacted at construction time."""
        self.order_repo = order_repo
        self.product_repo = product_repo
        self.user_repo = user_repo
        self.notifier = notifier
        self.event_publisher = event_publisher

    async def create_order(
        self,
        user_id: str,
        items_data: List[Dict],
        coupon_data: Optional[Dict] = None
    ) -> Order:
        """Create, persist and announce an order.

        Args:
            user_id: id of the ordering user.
            items_data: dicts with ``product_id`` and ``quantity``.
            coupon_data: optional dict with ``code``, ``type`` and ``value``.

        Returns:
            The saved ``Order``.

        Raises:
            UserNotFoundError: if ``user_id`` is unknown.
            ProductNotFoundError: if an item references an unknown product.
            Errors raised by ``Order.add_item`` and ``Product.reduce_stock``
            also propagate unchanged.
        """
        # Validate the user
        user = await self.user_repo.find_by_id(user_id)
        if not user:
            raise UserNotFoundError(user_id)

        order = Order(user_id=user_id)

        # Validate every line first and collect the stock changes in memory.
        # Nothing is written to the product repository until all lines are
        # accepted, so a failure on a later line cannot leave earlier stock
        # deductions behind. Keyed by product id so repeated lines for the
        # same product accumulate against one snapshot.
        reserved: Dict[str, Product] = {}
        for item_data in items_data:
            product_id = item_data['product_id']
            quantity = item_data['quantity']
            product = reserved.get(product_id)
            if product is None:
                product = await self.product_repo.find_by_id(product_id)
                if not product:
                    raise ProductNotFoundError(product_id)
            order.add_item(product, quantity)
            reserved[product_id] = product.reduce_stock(quantity)

        # Apply the coupon
        if coupon_data:
            coupon = Coupon(
                code=coupon_data['code'],
                type=coupon_data['type'],
                # str() first so a float value is not converted bit-for-bit
                value=Decimal(str(coupon_data['value']))
            )
            order.apply_coupon(coupon)

        # Persist the stock deductions, then the order itself
        for updated_product in reserved.values():
            await self.product_repo.update(updated_product)
        await self.order_repo.save(order)

        # Send the confirmation
        await self.notifier.send_order_confirmation(order)

        # Publish the domain event
        await self.event_publisher.publish({
            'type': 'order.created',
            'order_id': order.id,
            'user_id': user_id,
            'total': str(order.total),
            'timestamp': datetime.now().isoformat()
        })

        logger.info("Order created: %s for user %s, total: %s", order.id, user_id, order.total)
        return order
# step3_adapters.py - 实现适配器
import aiomysql
import json
import aiosmtplib
from email.mime.text import MIMEText
class MySQLOrderRepository(OrderRepository):
    """``OrderRepository`` adapter that uses an aiomysql-style connection pool."""

    def __init__(self, pool):
        """Keep the pool; connections are acquired per operation."""
        self.pool = pool

    async def save(self, order: Order) -> None:
        """Insert ``order``; on a duplicate id only status and paid_at are updated."""
        items_json = json.dumps([self._item_to_dict(i) for i in order.items])
        # Coupon.value is a Decimal, which json.dumps cannot serialize, so the
        # coupon goes through an explicit converter instead of ``__dict__``.
        coupon_json = json.dumps(self._coupon_to_dict(order.coupon)) if order.coupon else None
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("""
                    INSERT INTO orders (id, user_id, items, coupon, status, total, created_at, paid_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE
                    status = %s, paid_at = %s
                """, (
                    order.id, order.user_id,
                    items_json,
                    coupon_json,
                    order.status.value, str(order.total),
                    order.created_at, order.paid_at,
                    order.status.value, order.paid_at
                ))
            # Commit explicitly so the write does not depend on the pool
            # having been created with autocommit enabled.
            await conn.commit()

    def _item_to_dict(self, item: OrderItem) -> Dict:
        """Convert an order line to a JSON-serializable dict (Decimals as strings)."""
        return {
            'product_id': item.product.id,
            'product_name': item.product.name,
            'price': str(item.product.price),
            'quantity': item.quantity,
            'subtotal': str(item.subtotal)
        }

    def _coupon_to_dict(self, coupon: Coupon) -> Dict:
        """Convert a coupon to a JSON-serializable dict (value as string)."""
        return {
            'code': coupon.code,
            'type': coupon.type,
            'value': str(coupon.value)
        }

    async def find_by_id(self, order_id: str) -> Optional[Order]:
        """Load the order with ``order_id``; return ``None`` when no row matches.

        The coupon is restored; line items are not (see the TODO below).
        """
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                # Explicit column list in the same order as the INSERT, so the
                # unpacking below does not depend on the table's column order.
                await cursor.execute(
                    "SELECT id, user_id, items, coupon, status, total, created_at, paid_at "
                    "FROM orders WHERE id = %s",
                    (order_id,)
                )
                row = await cursor.fetchone()
                if not row:
                    return None

                (stored_id, user_id, _items_json, coupon_json,
                 status, _total, created_at, paid_at) = row

                coupon = None
                if coupon_json:
                    coupon_data = json.loads(coupon_json)
                    coupon = Coupon(
                        code=coupon_data['code'],
                        type=coupon_data['type'],
                        value=Decimal(str(coupon_data['value']))
                    )

                # Rebuild the order object
                order = Order(
                    id=stored_id,
                    user_id=user_id,
                    coupon=coupon,
                    status=OrderStatus(status),
                    created_at=created_at,
                    paid_at=paid_at
                )
                # TODO: restore the line items. The stored JSON has no product
                # type or stock, so a Product cannot be rebuilt from it alone.
                return order
class EmailNotificationService(NotificationService):
    """``NotificationService`` adapter that sends plain-text e-mail via aiosmtplib."""

    def __init__(self, smtp_host: str, smtp_port: int, from_email: str, password: str):
        """Store the SMTP settings; no connection is opened here."""
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.from_email = from_email
        self.password = password

    async def send_order_confirmation(self, order: Order) -> None:
        """Send the order confirmation e-mail.

        Delivery is best effort: any failure is logged and not re-raised.
        """
        subject = f"Order Confirmation - {order.id}"
        body = (
            "\n"
            "Thank you for your order!\n"
            f"Order ID: {order.id}\n"
            f"Total: ${order.total}\n"
            "Items:\n"
            f"{self._format_items(order.items)}\n"
            f"Status: {order.status.value}\n"
        )
        msg = MIMEText(body, 'plain', 'utf-8')
        msg['Subject'] = subject
        msg['From'] = self.from_email
        # TODO: placeholder recipient -- the real address should be looked up
        # from the user service.
        msg['To'] = "user@example.com"

        try:
            async with aiosmtplib.SMTP(hostname=self.smtp_host, port=self.smtp_port) as smtp:
                await smtp.login(self.from_email, self.password)
                await smtp.send_message(msg)
        except Exception as e:
            # Deliberately not re-raised: only the failure is recorded.
            logger.error("Failed to send email for order %s: %s", order.id, e)

    def _format_items(self, items: List[OrderItem]) -> str:
        """Render one indented ``name: qty x price = subtotal`` line per item."""
        return "\n".join(
            f"  - {item.product.name}: {item.quantity} x ${item.product.price} = ${item.subtotal}"
            for item in items
        )
# step4_main.py - 组装应用
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
# 依赖注入容器
class Container:
    """Hand-rolled dependency container that wires repositories and services."""

    def __init__(self):
        """Declare every dependency as ``None``; ``init()`` fills them in."""
        self.db_pool = None
        self.order_repo = None
        self.product_repo = None
        self.user_repo = None
        self.notifier = None
        self.event_publisher = None
        self.order_processor = None

    async def init(self):
        """Create the connection pool, the adapters and the order processor."""
        # Function-scope import: this listing's imports do not include os.
        import os

        # Database connection pool
        self.db_pool = await create_db_pool()

        # Repositories
        self.order_repo = MySQLOrderRepository(self.db_pool)
        self.product_repo = MySQLProductRepository(self.db_pool)
        self.user_repo = MySQLUserRepository(self.db_pool)

        # Services
        self.notifier = EmailNotificationService(
            smtp_host=os.getenv("SMTP_HOST"),
            # Fall back to 587 so an unset SMTP_PORT does not make int() fail
            # with a TypeError on None.
            smtp_port=int(os.getenv("SMTP_PORT", "587")),
            from_email=os.getenv("FROM_EMAIL"),
            password=os.getenv("EMAIL_PASSWORD")
        )
        self.event_publisher = RedisEventPublisher()

        # Application service
        self.order_processor = OrderProcessor(
            order_repo=self.order_repo,
            product_repo=self.product_repo,
            user_repo=self.user_repo,
            notifier=self.notifier,
            event_publisher=self.event_publisher
        )

    async def close(self):
        """Close the pool and wait for it to finish; no-op if never initialized."""
        if self.db_pool:
            self.db_pool.close()
            await self.db_pool.wait_closed()
# FastAPI application and its single shared container
app = FastAPI()
container = Container()


@app.on_event("startup")
async def startup():
    """Initialize the container when the application starts."""
    await container.init()


@app.on_event("shutdown")
async def shutdown():
    """Close the container's resources when the application stops."""
    await container.close()
@app.post("/api/orders")
async def create_order(
    request: CreateOrderRequest,
    processor: OrderProcessor = Depends(lambda: container.order_processor)
):
    """Create an order and map domain errors to HTTP status codes.

    Unknown user or product -> 404, insufficient stock -> 409, anything
    else -> 500 (logged with traceback).
    """
    try:
        order = await processor.create_order(
            user_id=request.user_id,
            items_data=[item.dict() for item in request.items],
            coupon_data=request.coupon.dict() if request.coupon else None
        )
        return OrderResponse.from_entity(order)
    except UserNotFoundError as e:
        raise HTTPException(status_code=404, detail="User not found") from e
    except ProductNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except InsufficientStockError as e:
        raise HTTPException(status_code=409, detail=str(e)) from e
    except Exception as e:
        # Top-level boundary: record the traceback, hide details from the client.
        logger.exception("Unexpected error creating order")
        raise HTTPException(status_code=500, detail="Internal server error") from e
5.4 重构成果对比
来源:
https://xbivx.cn/