五、架构落地的最佳实践
5.1 分阶段落地策略
# 架构落地三阶段
"""
阶段一:概念验证(1-2周)
目标:验证核心假设
产出:
- 最小可行架构(端到端跑通)
- 关键性能指标基线
- 技术风险评估
阶段二:基础设施搭建(2-4周)
目标:建立开发框架和CI/CD
产出:
- 代码模板和脚手架
- 自动化测试框架
- 持续集成流水线
- 监控告警系统
阶段三:迭代交付(持续)
目标:业务功能逐步迁移
产出:
- 每2周一个可交付版本
- 架构度量报告
- 团队知识沉淀
"""
5.2 架构治理
# governance.py - 架构治理规则
# Catalogue of architecture governance rules, keyed by rule category.
# Every category carries the same three fields:
#   description - short label of the category
#   rules       - the individual rules, as human-readable text
#   enforcement - how the category is meant to be checked
ARCHITECTURE_RULES = {
    # Strict layering between presentation / domain / infrastructure.
    "layering": dict(
        description="严格的分层架构",
        rules=[
            "表现层不能直接调用基础设施层",
            "领域层不能依赖基础设施层",
            "所有外部依赖必须通过接口定义",
        ],
        enforcement="使用静态代码分析检查",
    ),
    # Direction of dependencies between modules.
    "dependency": dict(
        description="依赖方向控制",
        rules=[
            "禁止循环依赖",
            "依赖方向只能向内(依赖抽象)",
            "核心模块不能依赖具体框架",
        ],
        enforcement="使用dependency-cruiser检查",
    ),
    # Versioning and deprecation policy for public APIs.
    "api_stability": dict(
        description="API稳定性",
        rules=[
            "公开API必须有版本号",
            "破坏性变更必须升级主版本",
            "废弃API必须有1个版本过渡期",
        ],
        enforcement="API兼容性自动化测试",
    ),
    # Database access conventions.
    "database": dict(
        description="数据库访问规范",
        rules=[
            "禁止在事务中调用外部API",
            "大事务必须拆分",
            "N+1查询检查",
            "禁止SELECT *",
        ],
        enforcement="代码审查 + 自动化检测",
    ),
}
def enforce_architecture_rules(pr_changes):
    """Check the changes of a pull request against the architecture rules.

    Each change is tested for three kinds of violation, in this order:
    a layering violation, a circular dependency and an incompatible API
    change. One message is collected per violation found.

    Args:
        pr_changes: Iterable of change objects. Each one is read through its
            ``file`` and ``imports`` attributes and is also handed to
            ``detect_circular_dependency`` and ``is_api_compatible``.

    Returns:
        ``{"approved": True}`` when nothing was found; otherwise a dict with
        ``"approved": False``, the list of ``"violations"`` and a
        ``"suggestion"`` text.
    """
    found = []
    for item in pr_changes:
        path = item.file

        # Layering: a controller file must not pull in the infrastructure layer.
        # NOTE(review): `in item.imports` is a substring test if `imports` is a
        # string, but an exact-element test if it is a list - confirm which
        # one callers provide.
        if "controller" in path and "infrastructure" in item.imports:
            found.append(f"分层违规:{path} 直接依赖基础设施")

        # Dependency direction: no cycles allowed.
        if detect_circular_dependency(item):
            found.append(f"循环依赖:{item.file}")

        # API stability: breaking changes are rejected.
        if not is_api_compatible(item):
            found.append(f"API不兼容:{item.file} 的破坏性变更")

    if not found:
        return {"approved": True}

    return {
        "approved": False,
        "violations": found,
        "suggestion": "请修复架构违规问题后重新提交",
    }
六、实战案例:微服务架构落地
6.1 背景
从单体架构迁移到微服务架构,需要解决服务拆分、服务发现、配置管理、链路追踪等问题。
6.2 服务发现组件
# service_registry.py - 服务注册与发现组件
import asyncio
import json
import logging
import random
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

import aioredis
@dataclass
class ServiceInstance:
    """Description of one running instance of a service."""

    service_name: str
    instance_id: str
    host: str
    port: int
    # Free-form extra data attached to the instance.
    metadata: Dict = field(default_factory=dict)
    healthy: bool = True
    # Defaults to the (naive, local) time at which the object is created.
    last_heartbeat: datetime = field(default_factory=datetime.now)
    tags: List[str] = field(default_factory=list)

    def address(self) -> str:
        """Return the network address as ``host:port``."""
        return "{}:{}".format(self.host, self.port)

    def is_alive(self, ttl_seconds: int = 30) -> bool:
        """Tell whether the instance should still be considered alive.

        An instance is alive when it is flagged healthy and its last
        heartbeat is less than ``ttl_seconds`` seconds old.
        """
        if self.healthy:
            elapsed = datetime.now() - self.last_heartbeat
            return elapsed.total_seconds() < ttl_seconds
        return False
logger = logging.getLogger(__name__)


class ServiceRegistry:
    """Service registry and discovery backed by Redis.

    Storage layout:
        ``service:<name>:<instance_id>``  JSON document describing one
                                          instance, stored with an expiry.
        ``service:<name>:instances``      set of the instance ids of a service.
        ``lb:<name>:round_robin``         counter driving round-robin selection.

    Call ``initialize()`` before ``register()`` or ``watch()``: the heartbeat
    and watch loops only run while the registry is marked as running.
    """

    def __init__(self, redis_url: str):
        self.redis = aioredis.from_url(redis_url, decode_responses=True)
        self._watchers: Dict[str, List] = {}
        self._running = False
        # The event loop only keeps weak references to tasks, so background
        # tasks are held here until they finish (see asyncio.create_task docs).
        self._tasks: set = set()
        # Heartbeat task per instance key, so re-registering replaces the old
        # heartbeat instead of running two of them.
        self._heartbeats: Dict[str, "asyncio.Task"] = {}

    async def initialize(self):
        """Mark the registry as running so background loops may execute."""
        self._running = True

    @staticmethod
    def _instance_key(service_name: str, instance_id: str) -> str:
        """Return the Redis key holding one instance document."""
        return f"service:{service_name}:{instance_id}"

    @staticmethod
    def _serialize(instance: "ServiceInstance", ttl: int) -> str:
        """Encode an instance as the JSON document stored in Redis.

        ``ttl`` is stored alongside so that ``discover`` can judge staleness
        with the interval the instance was registered with.
        """
        return json.dumps({
            "instance_id": instance.instance_id,
            "host": instance.host,
            "port": instance.port,
            "metadata": instance.metadata,
            "tags": instance.tags,
            "healthy": instance.healthy,
            "last_heartbeat": instance.last_heartbeat.isoformat(),
            "ttl": ttl,
        })

    def _spawn(self, coro) -> "asyncio.Task":
        """Start a background task and keep a reference until it completes."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def register(self, instance: "ServiceInstance", ttl: int = 30):
        """Register a service instance and start its heartbeat.

        Args:
            instance: The instance to publish.
            ttl: Heartbeat budget in seconds. The document expires after
                ``ttl * 2`` seconds and is refreshed every ``ttl / 2`` seconds.
        """
        key = self._instance_key(instance.service_name, instance.instance_id)
        # Write the document first so the id never appears in the index
        # without a document behind it.
        await self.redis.setex(key, ttl * 2, self._serialize(instance, ttl))
        await self.redis.sadd(f"service:{instance.service_name}:instances", instance.instance_id)

        previous = self._heartbeats.pop(key, None)
        if previous is not None:
            previous.cancel()
        self._heartbeats[key] = self._spawn(self._heartbeat(instance, ttl))
        logger.info(f"服务注册成功: {instance.service_name}/{instance.instance_id}")

    async def _refresh(self, instance: "ServiceInstance", ttl: int):
        """Send a single heartbeat for ``instance``.

        The document is rewritten with ``setex`` - the same string type that
        ``register`` writes and ``discover`` reads - which also renews the
        expiry.
        """
        instance.last_heartbeat = datetime.now()
        key = self._instance_key(instance.service_name, instance.instance_id)
        await self.redis.setex(key, ttl * 2, self._serialize(instance, ttl))
        # Re-add the id: discover() prunes ids whose document has expired,
        # which can happen after a run of failed heartbeats.
        await self.redis.sadd(f"service:{instance.service_name}:instances", instance.instance_id)

    async def _heartbeat(self, instance: "ServiceInstance", ttl: int):
        """Refresh the instance every ``ttl / 2`` seconds while running."""
        while self._running:
            await asyncio.sleep(ttl / 2)
            try:
                await self._refresh(instance, ttl)
            except Exception as e:
                # Keep looping: one failed write should not unregister the
                # instance for good, the next beat may succeed.
                logger.error(f"心跳发送失败: {e}")

    async def discover(self, service_name: str) -> List["ServiceInstance"]:
        """Return the live instances of a service, ordered by instance id.

        Ids whose document no longer exists are removed from the index set.
        Documents are fetched with one ``MGET`` instead of one ``GET`` each.
        """
        instances_key = f"service:{service_name}:instances"
        # Sorted so the result order is stable between calls; round-robin
        # selection indexes into this list.
        instance_ids = sorted(await self.redis.smembers(instances_key))
        if not instance_ids:
            return []

        keys = [self._instance_key(service_name, instance_id) for instance_id in instance_ids]
        payloads = await self.redis.mget(keys)

        instances = []
        stale_ids = []
        for instance_id, data in zip(instance_ids, payloads):
            if not data:
                stale_ids.append(instance_id)
                continue
            instance_dict = json.loads(data)
            instance = ServiceInstance(
                service_name=service_name,
                instance_id=instance_dict["instance_id"],
                host=instance_dict["host"],
                port=instance_dict["port"],
                metadata=instance_dict.get("metadata", {}),
                healthy=instance_dict.get("healthy", True),
                last_heartbeat=datetime.fromisoformat(instance_dict["last_heartbeat"]),
                tags=instance_dict.get("tags", []),
            )
            # Documents written before "ttl" was stored fall back to 30 s.
            if instance.is_alive(ttl_seconds=instance_dict.get("ttl", 30)):
                instances.append(instance)

        if stale_ids:
            await self.redis.srem(instances_key, *stale_ids)
        return instances

    async def get_instance(self, service_name: str, load_balance: str = "round_robin") -> Optional["ServiceInstance"]:
        """Pick one live instance of a service.

        Args:
            service_name: Service to look up.
            load_balance: ``"random"``, ``"round_robin"`` or
                ``"least_connection"``. Any other value selects the first
                discovered instance.

        Returns:
            The chosen instance, or ``None`` when no instance is alive.
        """
        instances = await self.discover(service_name)
        if not instances:
            return None
        if load_balance == "random":
            return random.choice(instances)
        if load_balance == "round_robin":
            # The counter lives in Redis, so it is shared by every client
            # using the same Redis.
            index = await self.redis.incr(f"lb:{service_name}:round_robin")
            return instances[index % len(instances)]
        if load_balance == "least_connection":
            # Reads the "connection_count" published in the instance metadata;
            # nothing in this class updates that value.
            return min(instances, key=lambda i: i.metadata.get("connection_count", 0))
        return instances[0]

    async def watch(self, service_name: str, callback):
        """Invoke ``callback`` whenever the instance set of a service changes.

        ``callback`` is awaited with the current list of live instances, so it
        must be a coroutine function.
        """
        self._watchers.setdefault(service_name, []).append(callback)
        self._spawn(self._watch_loop(service_name, callback))

    async def _watch_loop(self, service_name: str, callback):
        """Poll the service every 5 seconds and report changes."""
        last_instances = []
        while self._running:
            await asyncio.sleep(5)
            try:
                current_instances = await self.discover(service_name)
                if self._has_changed(last_instances, current_instances):
                    await callback(current_instances)
                # Only reached when the callback succeeded, so a failed
                # notification is attempted again on the next poll.
                last_instances = current_instances
            except Exception:
                logger.exception(f"监听服务变更失败: {service_name}")

    def _has_changed(self, old: List, new: List) -> bool:
        """Tell whether two instance lists differ in size or instance ids."""
        if len(old) != len(new):
            return True
        return {i.instance_id for i in old} != {i.instance_id for i in new}

    async def close(self):
        """Stop background tasks and close the Redis connection."""
        self._running = False
        # Skip the current task so close() can be called from a watch callback
        # without cancelling or awaiting itself.
        current = asyncio.current_task()
        pending = [task for task in self._tasks if task is not current]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        self._heartbeats.clear()
        await self.redis.close()
# 使用示例
class ServiceClient:
    """HTTP client for one named service.

    Every request asks the registry for an instance of the service and is
    sent to that instance's address. The aiohttp session is created on first
    use; call ``close()`` when the client is no longer needed.
    """

    def __init__(self, registry: "ServiceRegistry", service_name: str):
        self.registry = registry
        self.service_name = service_name
        self._http_session = None

    async def _get_session(self):
        """Return the shared HTTP session, creating it when absent or closed."""
        if self._http_session is None or self._http_session.closed:
            # Imported lazily so aiohttp is only needed once a request is made.
            import aiohttp
            self._http_session = aiohttp.ClientSession()
        return self._http_session

    async def request(self, path: str, method: str = "GET", **kwargs):
        """Send a request to an instance of the service and return its JSON body.

        Args:
            path: Request path, appended to ``http://<host>:<port>``.
            method: HTTP method name.
            **kwargs: Passed through to ``session.request``. A ``headers``
                entry is copied and extended with ``X-Service-Name`` and
                ``X-Instance-Id``; the caller's mapping is left untouched.

        Returns:
            The decoded JSON response body.

        Raises:
            ServiceUnavailableError: The registry returned no instance.
            ServiceRequestError: The request or the JSON decoding failed; the
                original exception is attached as the cause.
        """
        # NOTE(review): ServiceUnavailableError / ServiceRequestError are not
        # defined in the visible part of this file - confirm they are defined
        # or imported elsewhere.
        instance = await self.registry.get_instance(self.service_name)
        if not instance:
            raise ServiceUnavailableError(f"No available instance for {self.service_name}")

        url = f"http://{instance.address()}{path}"
        session = await self._get_session()

        # Take "headers" out of kwargs: leaving it there would pass the
        # keyword twice to session.request() and raise TypeError.
        headers = dict(kwargs.pop("headers", None) or {})
        headers["X-Service-Name"] = self.service_name
        headers["X-Instance-Id"] = instance.instance_id

        try:
            async with session.request(method, url, headers=headers, **kwargs) as response:
                return await response.json()
        except Exception as e:
            # Flags only the object returned by the registry for this call.
            instance.healthy = False
            raise ServiceRequestError(f"请求失败: {e}") from e

    async def close(self):
        """Close the HTTP session if one was opened."""
        session, self._http_session = self._http_session, None
        if session is not None and not session.closed:
            await session.close()
6.3 服务注册与使用
# 服务注册示例(在服务启动时执行)
async def start_service():
    """Register this process as an ``order-service`` instance and start the app.

    Intended to run at service start-up: it connects to the registry,
    publishes the instance with a 30 second TTL, then builds and starts the
    HTTP application.
    """
    # Function-scope import: os.getpid() is used below and no module-level
    # `import os` is visible for this snippet.
    import os

    # Connect to the registry.
    registry = ServiceRegistry("redis://localhost:6379")
    await registry.initialize()

    # Describe this process; the pid keeps the instance id unique per process
    # on one host.
    instance = ServiceInstance(
        service_name="order-service",
        instance_id=f"order-service-{os.getpid()}",
        host="192.168.1.100",
        port=8080,
        metadata={
            "version": "1.0.0",
            "environment": "production",
            "connection_count": 0,
        },
        tags=["api", "v1"],
    )

    # Publish the instance.
    await registry.register(instance, ttl=30)

    # Start the HTTP service.
    app = create_app(registry)
    await app.start()
# 服务调用示例
async def call_order_service():
    """Fetch order 123 from ``order-service`` through the registry and print it.

    The registry connection and the client's HTTP session are released when
    the call finishes, whether it succeeded or raised.
    """
    registry = ServiceRegistry("redis://localhost:6379")
    try:
        await registry.initialize()
        client = ServiceClient(registry, "order-service")
        try:
            # Call the order service.
            result = await client.request("/api/orders/123", method="GET")
            print(f"订单信息: {result}")
        finally:
            # ServiceClient opens its aiohttp session lazily on the first
            # request; close it if it was opened so it is not leaked.
            session = client._http_session
            if session is not None:
                await session.close()
    finally:
        await registry.close()