三、常见问题类型与定位技巧
3.1 性能问题定位
3.1.1 CPU飙升问题
class CPUSpikeAnalyzer:
    """Helpers for diagnosing CPU spikes in a running process."""

    @staticmethod
    def identify_culprit():
        """Return shell commands that help locate the source of a CPU spike.

        Returns:
            dict: command template (``<pid>`` is a placeholder the operator
            fills in) mapped to a short description of what it shows.
        """
        return {
            "top -H -p <pid>": "查看进程中线程的CPU使用情况",
            "jstack <pid>": "获取Java线程堆栈",
            "perf top -p <pid>": "实时分析CPU热点(Linux)",
            "py-spy top --pid <pid>": "Python进程CPU分析",
        }

    @staticmethod
    def analyze_python_cpu(pid, duration=30, output="profile.svg"):
        """Sample a Python process with ``py-spy record`` and write a flame graph.

        Args:
            pid: Process id to sample; converted with ``str()``.
            duration: Sampling time in seconds passed to ``--duration``.
            output: Path of the SVG flame graph passed to ``-o``.

        Returns:
            str: Guidance on what to inspect next, or a failure description
            (exit code plus stderr) when py-spy exits with a non-zero status.

        Raises:
            FileNotFoundError: Propagated from ``subprocess.run`` when the
                ``py-spy`` executable cannot be found.
        """
        import subprocess

        # Argument list (no shell) so pid/output are never shell-interpreted.
        result = subprocess.run(
            [
                "py-spy", "record",
                "-o", str(output),
                "--pid", str(pid),
                "--duration", str(duration),
            ],
            capture_output=True,
        )
        # Report a failed sampling run instead of pointing at a file that
        # was never written.
        if result.returncode != 0:
            detail = result.stderr.decode(errors="replace").strip()
            return f"py-spy采样失败(退出码 {result.returncode}): {detail}"
        return f"查看生成的{output}文件,找出CPU占用最高的函数调用栈"

    @staticmethod
    def common_cpu_causes():
        """Return common causes of CPU spikes and what to check for each.

        Returns:
            dict: cause name mapped to a short investigation hint.
        """
        return {
            "死循环": "检查while/for循环条件是否永远为真",
            "频繁GC": "检查GC日志,分析内存分配情况",
            "正则表达式灾难性回溯": "检查复杂的正则表达式,特别是嵌套量词",
            "序列化/反序列化": "检查JSON/Protobuf序列化热点",
            "加密/解密操作": "检查是否有大量的加密计算",
            "上下文切换频繁": "检查锁竞争和线程切换",
        }
# Python中检测死循环的示例
import signal
import traceback
def detect_deadlock(timeout=5):
    """Arm a SIGALRM watchdog that interrupts code running longer than *timeout*.

    Despite its name, this targets runaway (never-ending) loops: when the alarm
    fires, the handler prints the interrupted frame's stack and raises
    ``TimeoutError`` in the main thread.

    The alarm stays armed after this function returns. Call
    ``signal.alarm(0)`` once the guarded code finishes, and reinstall the
    returned handler to undo the change.

    Args:
        timeout: Seconds until the alarm fires, passed to ``signal.alarm``.

    Returns:
        The SIGALRM handler that was installed before this call, as returned
        by ``signal.signal``.

    Note:
        Relies on ``signal.SIGALRM`` / ``signal.alarm``, which are only
        available on Unix, and ``signal.signal`` must be called from the
        main thread.
    """
    def timeout_handler(signum, frame):
        print("Possible dead loop detected!")
        # Show where execution was stuck when the alarm fired.
        traceback.print_stack(frame)
        raise TimeoutError("Function execution timeout")

    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)
    return previous_handler
3.1.2 内存问题定位
class MemoryAnalyzer:
    """Helpers for diagnosing memory problems."""

    @staticmethod
    def analyze_memory_leak():
        """Return tools commonly used to investigate memory leaks.

        Returns:
            dict: tool name mapped to a short description of its use.
        """
        return {
            "tracemalloc": "Python内置,追踪内存分配",
            "memory_profiler": "逐行分析内存使用",
            "objgraph": "分析对象引用关系",
            "guppy3": "Heapy内存分析",
            "Valgrind": "C/C++内存泄漏检测",
        }

    @staticmethod
    def python_memory_debug(operation=None, limit=10):
        """Compare tracemalloc snapshots taken before and after *operation*.

        Args:
            operation: Optional zero-argument callable suspected of leaking.
                When ``None`` nothing runs between the two snapshots.
            limit: Maximum number of per-line differences to print and return.

        Returns:
            list: The first *limit* entries of ``Snapshot.compare_to`` grouped
            by ``'lineno'``; each entry is also printed.
        """
        import tracemalloc

        # Only stop tracing afterwards if this call turned it on, so an
        # already-running trace owned by the caller is left untouched.
        started_here = not tracemalloc.is_tracing()
        if started_here:
            tracemalloc.start()
        try:
            before = tracemalloc.take_snapshot()
            if operation is not None:
                operation()
            after = tracemalloc.take_snapshot()
        finally:
            if started_here:
                tracemalloc.stop()

        top_stats = after.compare_to(before, 'lineno')[:limit]
        for stat in top_stats:
            print(stat)
        return top_stats

    @staticmethod
    def common_memory_issues():
        """Return common memory problems and what to check for each.

        Returns:
            dict: problem name mapped to a short investigation hint.
        """
        return {
            "缓存无限增长": "检查缓存是否有过期策略",
            "大对象未释放": "检查循环引用,使用weakref",
            "全局/静态集合": "检查类变量和模块级变量",
            "线程局部存储": "检查ThreadLocal是否及时清理",
            "String拼接": "大量字符串拼接导致内存碎片",
            "连接未关闭": "检查数据库/HTTP连接是否正确关闭",
        }
# 循环引用检测
import gc
import objgraph
def find_cycles(target=None, type_name='MyClass'):
    """Print object statistics and render reference graphs with objgraph.

    Runs a garbage collection first so that only objects still alive are
    reported, then prints the 20 most common types.

    Args:
        target: Optional object whose back-references are rendered (depth 5).
            Skipped when ``None``.
        type_name: Type name passed to ``objgraph.by_type``. When at least
            one instance exists, the back-reference chain from the first
            instance to an object satisfying ``objgraph.is_proper_module``
            is rendered.
    """
    gc.collect()
    objgraph.show_most_common_types(limit=20)

    # Back-references of one specific object, when the caller supplies it.
    if target is not None:
        objgraph.show_backrefs(target, max_depth=5)

    # Guard against an empty result: indexing [0] blindly raises IndexError
    # when no instance of the type is alive.
    instances = objgraph.by_type(type_name)
    if instances:
        objgraph.show_chain(
            objgraph.find_backref_chain(
                instances[0],
                objgraph.is_proper_module,
            )
        )
3.1.3 慢查询定位
class SlowQueryAnalyzer:
    """Helpers for analysing slow MySQL queries."""

    # Statements that switch on the MySQL slow query log. Kept as a constant
    # so the setup is available to callers rather than lost in a local.
    SLOW_LOG_SETUP_SQL = """
        SET GLOBAL slow_query_log = 'ON';
        SET GLOBAL long_query_time = 1; -- 超过1秒记录
        SET GLOBAL log_queries_not_using_indexes = 'ON';
    """

    @staticmethod
    def analyze_mysql_slow_query():
        """Return SQL listing the ten statement digests with the highest average latency.

        The query reads ``performance_schema.events_statements_summary_by_digest``.
        Performance Schema timer columns are expressed in picoseconds, so they
        are divided by 10^12 to obtain seconds.

        Returns:
            str: The SQL statement text.
        """
        return """
            SELECT
                digest,
                count_star,
                avg_timer_wait / 1000000000000 as avg_seconds,
                max_timer_wait / 1000000000000 as max_seconds,
                sum_rows_examined,
                sum_rows_sent
            FROM performance_schema.events_statements_summary_by_digest
            ORDER BY avg_timer_wait DESC
            LIMIT 10;
        """

    @staticmethod
    def analyze_explain(explain_result):
        """Return a lookup table for interpreting MySQL EXPLAIN output.

        Args:
            explain_result: Accepted for interface compatibility; it is not
                inspected, and the same table is returned for any value.

        Returns:
            dict: ``"type"`` maps access types to an assessment and
            ``"extra"`` maps Extra-column notes to an assessment.
        """
        access_types = {
            "ALL": "全表扫描,需要添加索引",
            "index": "全索引扫描,可能需要优化",
            "range": "范围扫描,较好",
            "ref": "使用非唯一索引,较好",
            "eq_ref": "使用唯一索引,很好",
            "const": "使用主键/唯一索引,最好",
        }
        extra_notes = {
            "Using filesort": "需要额外排序,考虑添加索引",
            "Using temporary": "使用临时表,需要优化",
            "Using index": "覆盖索引,好",
            "Using where": "需要过滤,检查索引",
        }
        return {"type": access_types, "extra": extra_notes}
# Python中分析数据库查询
import time
from contextlib import contextmanager
@contextmanager
def query_timer(query_name, threshold=1.0):
    """Time the enclosed block and report it when it is slower than *threshold*.

    The measurement happens in a ``finally`` clause, so a query that raises
    is still timed and reported; the exception then propagates unchanged.

    Args:
        query_name: Label used in the warning and passed to the recorder.
        threshold: Duration in seconds above which the block counts as slow.
    """
    # Monotonic clock: durations are unaffected by system clock adjustments.
    start = time.perf_counter()
    try:
        yield
    finally:
        duration = time.perf_counter() - start
        if duration > threshold:
            print(f"慢查询警告: {query_name} 耗时 {duration:.2f}s")
            # Forward to the slow-query collection system.
            # NOTE(review): record_slow_query is not defined in the visible
            # source; it is assumed to be provided elsewhere in the module.
            record_slow_query(query_name, duration)
# Usage example: time a single query under a descriptive label.
# NOTE(review): `db` and `user_id` are not defined in the visible source;
# they are assumed to be supplied by the surrounding application.
with query_timer("get_user_orders"):
    orders = db.execute("SELECT * FROM orders WHERE user_id = %s", user_id)
3.2 数据不一致问题定位
class DataInconsistencyAnalyzer:
    """Helpers for diagnosing and repairing data inconsistency."""

    @staticmethod
    def analyze_scenarios():
        """Return typical data-inconsistency scenarios.

        Returns:
            dict: scenario name mapped to a dict with the keys ``"症状"``
            (symptom), ``"定位"`` (how to locate) and ``"解决"`` (remedy).
        """
        return {
            "主从延迟": {
                "症状": "写入后立即读取读不到最新数据",
                "定位": "检查主从延迟监控,SHOW SLAVE STATUS",
                "解决": "读写分离时强制读主库,或使用半同步复制",
            },
            "分布式事务": {
                "症状": "跨系统操作部分成功部分失败",
                "定位": "检查事务日志,对比各系统数据",
                "解决": "引入事务消息,最终一致性方案",
            },
            "缓存一致": {
                "症状": "更新数据库后缓存还是旧数据",
                "定位": "检查缓存更新/删除逻辑",
                "解决": "采用Cache Aside模式,先更新DB后删除缓存",
            },
            "并发冲突": {
                "症状": "数据被覆盖,丢失更新",
                "定位": "检查是否有并发写同一数据",
                "解决": "使用乐观锁(版本号)或悲观锁",
            },
        }

    @staticmethod
    def design_consistency_check(primary_db, replica_db, table, key_column):
        """Build SQL that lists rows differing between a table and its replica copy.

        The statement joins ``{table}`` to ``{table}_replica`` on *key_column*,
        restricted to rows whose ``last_updated`` is within the last hour, and
        keeps only rows that are missing from the replica or whose
        ``data_hash`` differs.

        Args:
            primary_db: Currently unused.
            replica_db: Currently unused.
            table: Table name interpolated into the SQL text.
            key_column: Join column name interpolated into the SQL text.

        Returns:
            str: The SQL statement text.
        """
        # NOTE(review): identifiers are interpolated verbatim and cannot be
        # bound as parameters; only pass trusted table/column names.
        return f"""
            SELECT
                p.{key_column},
                p.data_hash as primary_hash,
                r.data_hash as replica_hash,
                CASE
                    WHEN r.{key_column} IS NULL THEN 'missing_in_replica'
                    WHEN p.data_hash != r.data_hash THEN 'data_mismatch'
                    ELSE 'consistent'
                END as status
            FROM {table} p
            LEFT JOIN {table}_replica r ON p.{key_column} = r.{key_column}
            WHERE p.last_updated > DATE_SUB(NOW(), INTERVAL 1 HOUR)
            HAVING status != 'consistent'
        """

    @staticmethod
    def implement_compensating_transaction():
        """Return a class that runs steps in order and rolls back on failure.

        Returns:
            type: The ``CompensatingTransaction`` class.
        """

        class CompensatingTransaction:
            """Ordered list of actions, each paired with a compensation."""

            def __init__(self):
                self.actions = []
                self.compensations = []
                # (step index, exception) for compensations that failed
                # during the most recent execute() call.
                self.compensation_errors = []

            def add_step(self, action, compensation):
                """Append a step.

                Args:
                    action: Zero-argument callable; a falsy return value
                        means the step failed.
                    compensation: Zero-argument callable that undoes *action*.
                """
                self.actions.append(action)
                self.compensations.append(compensation)

            def execute(self):
                """Run all actions; on failure compensate completed steps.

                Compensations run in reverse order and only for steps whose
                action returned a truthy value. A compensation that raises is
                recorded in ``compensation_errors`` and the remaining
                compensations still run.

                Returns:
                    bool: ``True`` when every action succeeded.

                Raises:
                    Exception: The error raised by the failing action, or
                        ``Exception("Step <i> failed")`` when an action
                        returned a falsy value.
                """
                self.compensation_errors = []
                completed = []
                try:
                    for index, action in enumerate(self.actions):
                        if not action():
                            raise Exception(f"Step {index} failed")
                        # Recorded only after success, so a failed step is
                        # never compensated.
                        completed.append(index)
                    return True
                except Exception:
                    for index in reversed(completed):
                        try:
                            self.compensations[index]()
                        except Exception as comp_err:
                            # Keep rolling back the earlier steps.
                            self.compensation_errors.append((index, comp_err))
                    # Bare raise keeps the original traceback.
                    raise

        return CompensatingTransaction


# Usage example.
# NOTE(review): deduct_inventory, create_order, charge_payment and the values
# they receive are not defined in the visible source; they are assumed to be
# supplied by the surrounding application.
CompensatingTransaction = DataInconsistencyAnalyzer.implement_compensating_transaction()
tx = CompensatingTransaction()
tx.add_step(
    action=lambda: deduct_inventory(product_id, quantity),
    compensation=lambda: restore_inventory(product_id, quantity),
)
tx.add_step(
    action=lambda: create_order(order_data),
    compensation=lambda: cancel_order(order_id),
)
tx.add_step(
    action=lambda: charge_payment(amount),
    compensation=lambda: refund_payment(amount),
)
try:
    tx.execute()
except Exception as e:
    print(f"事务失败,已执行补偿: {e}")
3.3 服务可用性问题定位
class AvailabilityAnalyzer:
    """Helpers for diagnosing and guarding service availability."""

    @staticmethod
    def analyze_incident(incident_time, service_name):
        """Build a checklist for investigating a service incident.

        Args:
            incident_time: Moment of the incident, either a number of seconds
                or a ``datetime``; the timeline covers five minutes before
                and after it.
            service_name: Currently unused.

        Returns:
            dict: ``"timeline"`` (list of str), ``"changes_to_check"`` and
            ``"resources_to_check"`` (dicts of item to hint).
        """
        from datetime import datetime, timedelta

        # Five minutes, in the same unit family as incident_time.
        window = timedelta(seconds=300) if isinstance(incident_time, datetime) else 300

        timeline = [
            f"{incident_time - window}: 故障前5分钟检查",
            f"{incident_time}: 故障发生时刻",
            f"{incident_time + window}: 故障后5分钟检查",
        ]
        changes = {
            "代码部署": "检查部署记录",
            "配置变更": "检查配置中心变更历史",
            "依赖变更": "检查外部服务/中间件变更",
            "流量变化": "检查监控系统的流量曲线",
        }
        resources = {
            "CPU": "检查CPU使用率曲线",
            "内存": "检查内存使用和GC情况",
            "磁盘": "检查磁盘空间和IO",
            "网络": "检查网络延迟和丢包",
        }
        return {
            "timeline": timeline,
            "changes_to_check": changes,
            "resources_to_check": resources,
        }

    @staticmethod
    def implement_health_check():
        """Create a FastAPI app exposing liveness and readiness probes.

        Returns:
            FastAPI: App with ``/health/liveness`` and ``/health/readiness``.
        """
        # Imported here so the HTTP dependency check can actually run; without
        # it the NameError was swallowed and reported as an unhealthy service.
        import aiohttp
        from fastapi import FastAPI
        from fastapi.responses import JSONResponse

        app = FastAPI()

        @app.get("/health/liveness")
        async def liveness():
            """Liveness probe: answers whenever the process is running."""
            return {"status": "alive"}

        @app.get("/health/readiness")
        async def readiness():
            """Readiness probe: 200 when every dependency responds, else 503."""
            # Each entry is (name, healthy) or (name, False, error text).
            # NOTE(review): db, redis and mq are not defined in the visible
            # source; they are assumed to be async clients provided elsewhere.
            checks = []

            # Database connection
            try:
                await db.execute("SELECT 1")
                checks.append(("database", True))
            except Exception as e:
                checks.append(("database", False, str(e)))

            # Redis connection
            try:
                await redis.ping()
                checks.append(("redis", True))
            except Exception as e:
                checks.append(("redis", False, str(e)))

            # Message queue
            try:
                await mq.check_connection()
                checks.append(("message_queue", True))
            except Exception as e:
                checks.append(("message_queue", False, str(e)))

            # Downstream dependency service
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get("https://api.dependency.com/health") as resp:
                        checks.append(("dependency_service", resp.status == 200))
            except Exception as e:
                checks.append(("dependency_service", False, str(e)))

            all_healthy = all(check[1] for check in checks)
            body = {
                "status": "ready" if all_healthy else "not_ready",
                "checks": [
                    {"name": c[0], "healthy": c[1], "error": c[2] if len(c) > 2 else None}
                    for c in checks
                ],
            }
            # An explicit response object is needed to set the HTTP status;
            # a returned (body, code) tuple would be serialized as JSON.
            return JSONResponse(content=body, status_code=200 if all_healthy else 503)

        return app

    @staticmethod
    def implement_circuit_breaker():
        """Return a circuit breaker class for async calls.

        Returns:
            type: The ``CircuitBreaker`` class.
        """
        from datetime import datetime

        class CircuitBreaker:
            """Rejects calls after repeated failures until a timeout elapses.

            States: ``CLOSED`` (calls pass), ``OPEN`` (calls rejected) and
            ``HALF_OPEN`` (one trial call after the recovery timeout).
            """

            def __init__(self, name, failure_threshold=5, recovery_timeout=60):
                """
                Args:
                    name: Label used in log messages.
                    failure_threshold: Failure count at which a closed
                        breaker opens.
                    recovery_timeout: Seconds to stay open before allowing a
                        trial call.
                """
                self.name = name
                self.failure_threshold = failure_threshold
                self.recovery_timeout = recovery_timeout
                self.failure_count = 0
                self.last_failure_time = None
                self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

            async def call(self, func, *args, **kwargs):
                """Await ``func(*args, **kwargs)`` through the breaker.

                Returns:
                    Whatever *func* returns.

                Raises:
                    Exception: When the breaker is open and the recovery
                        timeout has not elapsed; otherwise any exception
                        raised by *func* is re-raised.
                """
                if self.state == "OPEN":
                    # Compare elapsed seconds, not a timedelta, with the
                    # numeric timeout.
                    elapsed = (datetime.now() - self.last_failure_time).total_seconds()
                    if elapsed > self.recovery_timeout:
                        self.state = "HALF_OPEN"
                        print(f"断路器 {self.name} 进入半开状态")
                    else:
                        raise Exception(f"断路器 {self.name} 打开,服务不可用")

                try:
                    result = await func(*args, **kwargs)
                except Exception:
                    self.failure_count += 1
                    self.last_failure_time = datetime.now()
                    if self.state == "CLOSED" and self.failure_count >= self.failure_threshold:
                        self.state = "OPEN"
                        print(f"断路器 {self.name} 打开(失败次数:{self.failure_count})")
                    elif self.state == "HALF_OPEN":
                        self.state = "OPEN"
                        print(f"断路器 {self.name} 半开状态下失败,重新打开")
                    raise

                if self.state == "HALF_OPEN":
                    # The trial call succeeded: close the breaker again.
                    self.state = "CLOSED"
                    self.failure_count = 0
                    print(f"断路器 {self.name} 已关闭")
                return result

        return CircuitBreaker