四、监控体系与可观测性
4.1 三支柱可观测性模型
现代可观测性体系由三大支柱构成:指标(Metrics)、日志(Logs)、追踪(Traces)。
┌─────────────────────────────────────────────────────────────────┐
│ 可观测性三支柱 │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 指标 │ │ 日志 │ │ 追踪 │ │
│ │ Metrics │ │ Logs │ │ Traces │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ "什么出错了"│ │ "具体错误" │ │ "错误路径" │ │
│ │ 聚合数据 │ │ 事件详情 │ │ 请求链路 │ │
│ │ 时序趋势 │ │ 错误堆栈 │ │ 服务依赖 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
4.2 Prometheus + Pushgateway 监控短任务
对于短生命周期任务(如定时脚本、批处理作业),Prometheus默认的拉取模式并不适用。此时需要使用Pushgateway作为中间件,允许脚本主动推送指标数据。
import time
import requests
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
class ScriptMonitor:
"""脚本监控器 - 推送指标到Pushgateway"""
def __init__(self, pushgateway_url, job_name):
self.pushgateway_url = pushgateway_url
self.job_name = job_name
self.registry = CollectorRegistry()
# 定义指标
self.duration = Gauge(
'script_execution_duration_seconds',
'脚本执行耗时',
['script_name', 'env'],
registry=self.registry
)
self.result = Gauge(
'script_result',
'执行结果 0=成功 1=失败 2=超时',
['script_name', 'status'],
registry=self.registry
)
self.rows_processed = Gauge(
'script_rows_processed',
'处理的数据行数',
['script_name'],
registry=self.registry
)
def __call__(self, func):
"""装饰器:自动记录脚本执行指标"""
from functools import wraps
@wraps(func)
def wrapper(*args, **kwargs):
script_name = func.__name__
start_time = time.time()
try:
result_value = func(*args, **kwargs)
status = "success"
self.result.labels(script_name=script_name, status=status).set(0)
return result_value
except Exception as e:
status = "failure"
self.result.labels(script_name=script_name, status=status).set(1)
raise
finally:
duration = time.time() - start_time
self.duration.labels(script_name=script_name, env=os.getenv('ENV', 'dev')).set(duration)
# 推送到Pushgateway
push_to_gateway(
self.pushgateway_url,
job=f"{self.job_name}_{script_name}",
registry=self.registry
)
return wrapper
# 使用示例
monitor = ScriptMonitor('http://pushgateway:9091', 'data_pipeline')
@monitor
def process_daily_orders():
"""每日订单处理脚本"""
orders = fetch_orders()
for order in orders:
process_order(order)
return len(orders)
4.3 链路追踪集成
在微服务架构中,需要将中间件的调用纳入链路追踪体系:
# 使用OpenTelemetry集成RabbitMQ
from opentelemetry import trace
from opentelemetry.instrumentation.pika import PikaInstrumentor
# 自动instrument RabbitMQ客户端
PikaInstrumentor().instrument()
# 手动创建Span
tracer = trace.get_tracer(__name__)
def publish_with_trace(channel, exchange, routing_key, message):
"""带链路追踪的消息发布"""
with tracer.start_as_current_span("publish_message") as span:
span.set_attribute("messaging.system", "rabbitmq")
span.set_attribute("messaging.destination", exchange)
span.set_attribute("messaging.routing_key", routing_key)
# 注入trace上下文到消息头
headers = {}
trace.get_current_span().set_attribute("message_id", str(uuid.uuid4()))
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=json.dumps(message),
properties=pika.BasicProperties(
headers=headers,
delivery_mode=2
)
)
五、故障排查实战
5.1 消息积压的紧急处理
# 场景:消费者处理速度跟不上生产速度
# 1. 临时增加消费者数量
# 使用多线程消费
from concurrent.futures import ThreadPoolExecutor
class ScalableConsumer:
def __init__(self, channel, queue, num_workers=10):
self.channel = channel
self.queue = queue
self.executor = ThreadPoolExecutor(max_workers=num_workers)
# 设置预取计数为worker数
channel.basic_qos(prefetch_count=num_workers)
def start(self):
for _ in range(self.num_workers):
self.executor.submit(self._consume)
def _consume(self):
for method, properties, body in self.channel.consume(self.queue):
try:
# 处理消息
self.process_message(body)
self.channel.basic_ack(method.delivery_tag)
except Exception:
# 失败的消息可以重新入队或进入死信队列
self.channel.basic_nack(method.delivery_tag, requeue=False)
# 2. 临时扩大队列容量
# rabbitmqctl set_policy increase-limit "^overflow_queue$" \
# '{"max-length":1000000,"overflow":"drop-head"}'
# 3. 启用惰性队列(牺牲部分性能换取稳定性)
# rabbitmqctl set_policy lazy-queue "^lazy_" '{"queue-mode":"lazy"}'
5.2 数据库中间件故障排查
# 1. 定位慢查询
# 查看Mycat慢查询统计
mysql -h127.0.0.1 -P9066 -u root -p -e "SHOW @@SQL.SLOW WHERE execute_time > 1000"
# 2. 分析数据节点健康状态
mysql -h127.0.0.1 -P9066 -u root -p -e "SHOW @@DATANODE WHERE active=0"
# 3. 连接池检查
mysql -h127.0.0.1 -P9066 -u root -p -e "SHOW @@DATASOURCE" | \
awk '{if($5>0.8*$4) print "连接池使用率过高:",$0}'
# 4. 分片均衡性检查
mysql -h127.0.0.1 -P9066 -u root -p -e "SHOW @@DATA_NODE" | \
awk '{count[$2]++; sum[$2]+=$5} END {for(i in count) print i,sum[i]/count[i]}'
5.3 Redis 常见故障与处理
# 场景1:缓存穿透 - 查询不存在的数据
# 解决方案:布隆过滤器 + 缓存空值
class CachePenetrationGuard:
def __init__(self, redis_client, bloom_filter):
self.redis = redis_client
self.bloom = bloom_filter
def get(self, key):
# 1. 布隆过滤器快速判断
if not self.bloom.exists(key):
return None
# 2. 从缓存获取
value = self.redis.get(key)
if value is not None:
return value
# 3. 缓存空值,防止穿透
if value is None:
# 设置短时间空值缓存
self.redis.setex(key, 60, "null")
return None
# 4. 从数据库加载
return self.load_from_db(key)
# 场景2:缓存雪崩 - 大量缓存同时过期
# 解决方案:过期时间加随机偏移
def set_with_jitter(key, value, base_ttl):
"""设置缓存,添加随机过期时间"""
jitter = random.randint(0, base_ttl // 10) # 10%随机偏移
actual_ttl = base_ttl + jitter
redis.setex(key, actual_ttl, value)
# 场景3:缓存击穿 - 热点key过期瞬间大量请求
# 解决方案:互斥锁
def get_with_mutex(key, load_func, mutex_ttl=10):
"""使用互斥锁防止缓存击穿"""
value = redis.get(key)
if value is not None:
return value
# 尝试获取锁
lock_key = f"{key}:lock"
if redis.setnx(lock_key, "1"):
redis.expire(lock_key, mutex_ttl)
try:
# 从数据库加载
value = load_func()
redis.setex(key, 3600, value)
return value
finally:
redis.delete(lock_key)
else:
# 等待其他线程加载
time.sleep(0.1)
return get_with_mutex(key, load_func, mutex_ttl)