五、疑难问题攻坚实战
5.1 案例一:内存泄漏排查
"""
案例:Python服务内存持续增长,最终OOM
"""
# 问题代码示例(有内存泄漏)
class OrderProcessor:
def __init__(self):
self.order_cache = {} # 无限增长的缓存
self.callbacks = [] # 回调列表无限增长
def process_order(self, order_id):
# 每次处理都缓存订单
order = self.load_order(order_id)
self.order_cache[order_id] = order # 永远不清理
# 注册回调
self.callbacks.append(lambda: self.on_order_complete(order_id))
return order
def on_order_complete(self, order_id):
pass
# 排查工具
def diagnose_memory_leak():
"""
内存泄漏排查步骤
"""
steps = [
"1. 使用tracemalloc获取内存快照对比",
"2. 使用objgraph查看对象引用关系",
"3. 使用memory_profiler分析内存增长趋势",
"4. 使用gc.collect()手动触发GC验证是否有循环引用",
"5. 在生产环境添加内存监控和dump功能"
]
return steps
# 修复后的代码
class OrderProcessorFixed:
def __init__(self, max_cache_size=1000, cache_ttl=3600):
from collections import OrderedDict
from datetime import datetime
# 使用LRU缓存
self.order_cache = OrderedDict()
self.max_cache_size = max_cache_size
self.cache_ttl = cache_ttl
# 使用弱引用避免回调持有对象
import weakref
self.callbacks = weakref.WeakSet()
def process_order(self, order_id):
# 检查缓存是否过期
if order_id in self.order_cache:
cached, timestamp = self.order_cache[order_id]
if time.time() - timestamp < self.cache_ttl:
return cached
else:
del self.order_cache[order_id]
order = self.load_order(order_id)
# 缓存订单
self.order_cache[order_id] = (order, time.time())
# LRU淘汰
while len(self.order_cache) > self.max_cache_size:
self.order_cache.popitem(last=False)
# 使用弱引用回调
callback = lambda: self.on_order_complete(order_id)
self.callbacks.add(callback)
return order
def load_order(self, order_id):
# 加载订单逻辑
pass
def on_order_complete(self, order_id):
# 订单完成后的处理
pass
5.2 案例二:线上死锁排查
"""
案例:MySQL死锁导致订单服务间歇性失败
"""
# 死锁场景复现
"""
-- 事务1
BEGIN;
UPDATE orders SET status = 'paid' WHERE id = 1;
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 100;
COMMIT;
-- 事务2
BEGIN;
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 100;
UPDATE orders SET status = 'paid' WHERE id = 1;
COMMIT;
-- 两个事务互相等待对方持有的锁,形成死锁
"""
# 死锁排查工具
class DeadlockAnalyzer:
@staticmethod
def analyze_mysql_deadlock():
"""分析MySQL死锁日志"""
# 查看最近的死锁信息
sql_show_engine_status = "SHOW ENGINE INNODB STATUS;"
# 从输出中提取死锁部分
# LATEST DETECTED DEADLOCK 部分包含详细信息
# 启用死锁日志
sql_setup = """
SET GLOBAL innodb_print_all_deadlocks = ON;
"""
return sql_show_engine_status
@staticmethod
def extract_deadlock_info(innodb_status):
"""从InnoDB状态中提取死锁信息"""
import re
# 正则匹配死锁部分
pattern = r'LATEST DETECTED DEADLOCK\n(.*?)\n---TRANSACTION'
match = re.search(pattern, innodb_status, re.DOTALL)
if match:
deadlock_info = match.group(1)
# 提取涉及的SQL
sql_pattern = r'\(1\) (?:TRANSACTION|WAITING FOR THIS LOCK TO BE GRANTED).*?\n(.*?)\n'
sqls = re.findall(sql_pattern, deadlock_info, re.DOTALL)
return {
"deadlock_info": deadlock_info,
"involved_sqls": sqls
}
return None
# 解决方案:调整事务顺序
class DeadlockSolution:
@staticmethod
def fix_order_processing():
"""
解决方案:所有事务按照相同的顺序访问资源
"""
# 定义资源访问顺序
RESOURCE_ORDER = ["orders", "inventory", "payment"]
def process_order_safe(order_id, product_id):
"""
按固定顺序访问资源
"""
for resource in RESOURCE_ORDER:
if resource == "orders":
# 操作订单表
pass
elif resource == "inventory":
# 操作库存表
pass
elif resource == "payment":
# 操作支付表
pass
# 使用乐观锁代替悲观锁
def update_with_optimistic_lock(table, id, updates, version):
sql = f"""
UPDATE {table}
SET {updates}, version = version + 1
WHERE id = %s AND version = %s
"""
affected = db.execute(sql, (id, version))
if affected == 0:
# 版本冲突,重试
raise OptimisticLockException("Data changed, retry")
return affected
return update_with_optimistic_lock
5.3 案例三:生产环境CPU飙升排查
"""
案例:某个时刻CPU突然飙升到100%
"""
# 排查流程脚本
class CPUSpikeInvestigation:
"""CPU飙升排查脚本"""
@staticmethod
def investigation_plan():
"""
系统化的排查计划
"""
plan = [
{
"phase": "1. 快速定位进程",
"commands": [
"top -c", # 查看CPU最高的进程
"ps aux --sort=-%cpu | head -10" # CPU Top 10进程
]
},
{
"phase": "2. 定位线程",
"commands": [
"top -H -p <pid>", # 查看进程内的线程CPU
"ps -T -p <pid>" # 列出所有线程
]
},
{
"phase": "3. 获取堆栈",
"commands": [
"jstack <pid> > stack.log" # Java
"py-spy dump --pid <pid>" # Python
"gdb attach <pid>", # C/C++
]
},
{
"phase": "4. 分析热点",
"commands": [
"perf top -p <pid>", # Linux perf
"py-spy record -o profile.svg --pid <pid>", # 生成火焰图
]
},
{
"phase": "5. 检查GC",
"commands": [
"jstat -gcutil <pid> 1000", # Java GC统计
]
}
]
return plan
@staticmethod
def python_hotspot_analysis():
"""Python热点分析代码"""
import cProfile
import pstats
import io
def profile_function(func):
def wrapper(*args, **kwargs):
pr = cProfile.Profile()
pr.enable()
result = func(*args, **kwargs)
pr.disable()
s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(20) # 打印前20个最耗时的函数
print(s.getvalue())
return result
return wrapper
return profile_function
@staticmethod
def analyze_flamegraph(svg_path):
"""分析火焰图"""
# 火焰图解读要点
tips = """
1. X轴宽度:函数占用CPU的时间比例
2. Y轴高度:调用栈深度
3. 颜色:通常随机,没有特殊含义
4. 平顶山(宽而平):函数本身耗时,需要优化该函数
5. 尖塔(窄而高):调用链深但每层耗时少
"""
return tips
# 实时CPU监控与告警
class CPUAlert:
"""CPU告警和自动诊断"""
def __init__(self, threshold=80):
self.threshold = threshold
self.history = []
def check_and_diagnose(self):
"""检查CPU并在超标时自动诊断"""
import psutil
cpu_percent = psutil.cpu_percent(interval=1)
self.history.append(cpu_percent)
if cpu_percent > self.threshold:
print(f"⚠️ CPU使用率超标: {cpu_percent}%")
# 自动诊断
self.auto_diagnose()
return True
return False
def auto_diagnose(self):
"""自动诊断CPU飙升原因"""
import psutil
# 1. 找出CPU最高的进程
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
try:
proc_info = proc.info
if proc_info['cpu_percent'] > 0:
processes.append(proc_info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
processes.sort(key=lambda x: x['cpu_percent'], reverse=True)
print("CPU Top 5 进程:")
for proc in processes[:5]:
print(f" PID: {proc['pid']}, Name: {proc['name']}, CPU: {proc['cpu_percent']}%")
# 2. 如果是Python进程,建议进一步分析
python_procs = [p for p in processes if 'python' in p['name'].lower()]
for proc in python_procs:
print(f"\n建议对 Python 进程 {proc['pid']} 执行:")
print(f" py-spy dump --pid {proc['pid']}")
print(f" py-spy record -o profile.svg --pid {proc['pid']}")
5.4 案例四:数据库连接池耗尽
"""
案例:高峰期数据库连接池耗尽,新请求无法获取连接
"""
class ConnectionPoolAnalyzer:
"""连接池问题分析器"""
@staticmethod
def analyze_pool_exhaustion():
"""
连接池耗尽原因分析
"""
reasons = {
"连接泄漏": {
"症状": "连接数持续增长不下降",
"定位": "监控连接数曲线,代码中搜索连接获取和释放",
"解决": "使用with语句或try-finally确保释放"
},
"慢查询堆积": {
"症状": "连接数突然飙升后居高不下",
"定位": "检查慢查询日志,分析是否有大量慢查询",
"解决": "优化慢查询,增加索引,增加连接池大小"
},
"事务过长": {
"症状": "连接长期被占用",
"定位": "检查事务边界,监控长事务",
"解决": "拆分长事务,使用编程式事务"
},
"连接池配置不当": {
"症状": "明明有负载但连接池很小",
"定位": "检查连接池配置(max_connections, min_idle)",
"解决": "根据压测结果调整连接池参数"
}
}
return reasons
@staticmethod
def implement_connection_monitor():
"""实现连接池监控"""
from sqlalchemy import event
from sqlalchemy.pool import Pool
import time
class ConnectionMonitor:
def __init__(self):
self.connections = {}
self.slow_threshold = 5 # 5秒
@event.listens_for(Pool, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
"""连接从池中取出时记录"""
conn_id = id(dbapi_conn)
self.connections[conn_id] = {
"checkout_time": time.time(),
"stack": traceback.format_stack()
}
@event.listens_for(Pool, "checkin")
def on_checkin(dbapi_conn, connection_record):
"""连接返回池时检查"""
conn_id = id(dbapi_conn)
if conn_id in self.connections:
checkout_info = self.connections[conn_id]
duration = time.time() - checkout_info["checkout_time"]
if duration > self.slow_threshold:
# 连接占用时间过长,记录警告
logging.warning(f"Connection held for {duration:.2f}s")
logging.warning("Stack trace when acquired:")
logging.warning("".join(checkout_info["stack"]))
del self.connections[conn_id]
return ConnectionMonitor()
@staticmethod
def implement_retry_with_backoff():
"""实现带退避的重试机制"""
import random
from contextlib import contextmanager
@contextmanager
def get_connection_with_retry(pool, max_retries=3, base_delay=0.1):
"""带重试的连接获取"""
for attempt in range(max_retries):
try:
conn = pool.get_connection(timeout=5)
yield conn
return
except Exception as e:
if attempt == max_retries - 1:
raise
# 指数退避 + 随机抖动
delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
time.sleep(delay)
logging.warning(f"Failed to get connection (attempt {attempt+1}), retrying in {delay:.2f}s")
# 连接池配置优化
class ConnectionPoolOptimizer:
"""连接池优化配置"""
@staticmethod
def calculate_optimal_pool_size(peak_qps, avg_query_time_ms, safety_factor=1.5):
"""
计算最优连接池大小
peak_qps: 峰值QPS
avg_query_time_ms: 平均查询耗时(毫秒)
safety_factor: 安全系数
"""
# 每个请求占用连接的时间(秒)
hold_time = avg_query_time_ms / 1000
# 所需连接数 = QPS * 平均持有时间
required = peak_qps * hold_time
# 加上安全系数和最小连接数
optimal = max(required * safety_factor, 10)
return int(optimal)
@staticmethod
def get_optimized_config(peak_qps=1000, avg_query_time_ms=50):
"""获取优化后的配置"""
pool_size = ConnectionPoolOptimizer.calculate_optimal_pool_size(
peak_qps, avg_query_time_ms
)
return {
"pool_size": pool_size,
"max_overflow": pool_size // 2, # 最大溢出连接数
"pool_timeout": 30, # 获取连接超时(秒)
"pool_recycle": 3600, # 连接回收时间(秒)
"pool_pre_ping": True, # 使用前检查连接
}
# 使用示例
config = ConnectionPoolOptimizer.get_optimized_config(peak_qps=2000, avg_query_time_ms=100)
print(f"推荐连接池配置: pool_size={config['pool_size']}")