程序员进阶工程师必备技能之性能、稳定性与安全优化(二)

简介: 教程来源 https://unbgv.cn/ 本文系统介绍稳定性保障核心实践:涵盖SLA指标定义、超时重试与断路器容错、令牌桶限流与服务降级、幂等性设计,以及基于混沌工程的故障演练方法,全面提升系统可靠性与韧性。

二、稳定性保障:让系统更可靠

2.1 稳定性核心指标
image.png
SLA对应允许停机时间:
image.png
2.2 容错设计模式
2.2.1 超时与重试

class ResilientClient:
    """具备容错能力的客户端"""

    def __init__(self, timeout=5, max_retries=3, backoff_factor=2):
        self.timeout = timeout
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    async def call_with_retry(self, func, *args, **kwargs):
        """带超时和重试的调用"""
        last_exception = None

        for attempt in range(self.max_retries):
            try:
                # 设置超时
                result = await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=self.timeout
                )
                return result

            except asyncio.TimeoutError:
                last_exception = TimeoutError(f"Timeout after {self.timeout}s")
                logging.warning(f"Attempt {attempt + 1} timed out")

            except Exception as e:
                last_exception = e
                logging.warning(f"Attempt {attempt + 1} failed: {e}")

            # 最后一次不等待
            if attempt < self.max_retries - 1:
                wait_time = self.backoff_factor ** attempt
                logging.info(f"Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)

        raise last_exception

# 使用断路器模式防止雪崩
class CircuitBreaker:
    """断路器:防止级联故障"""

    def __init__(self, failure_threshold=5, timeout=60, half_open_requests=3):
        self.failure_threshold = failure_threshold   # 失败阈值
        self.timeout = timeout                       # 恢复超时
        self.half_open_requests = half_open_requests  # 半开状态请求数

        self.failures = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
        self.half_open_successes = 0

    async def call(self, func, *args, **kwargs):
        """带断路器保护的调用"""
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
                self.half_open_successes = 0
                logging.info("Circuit breaker moved to HALF_OPEN")
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)

            if self.state == "HALF_OPEN":
                self.half_open_successes += 1
                if self.half_open_successes >= self.half_open_requests:
                    self.state = "CLOSED"
                    self.failures = 0
                    logging.info("Circuit breaker closed (recovered)")

            return result

        except Exception as e:
            self.failures += 1
            self.last_failure_time = time.time()

            if self.state == "CLOSED" and self.failures >= self.failure_threshold:
                self.state = "OPEN"
                logging.error(f"Circuit breaker opened after {self.failures} failures")

            raise e

2.2.2 限流与降级

class RateLimiter:
    """令牌桶限流器"""

    def __init__(self, rate=100, capacity=200):
        self.rate = rate          # 令牌生成速率(每秒)
        self.capacity = capacity  # 桶容量
        self.tokens = capacity    # 当前令牌数
        self.last_refill = time.time()
        self._lock = threading.Lock()

    def acquire(self, tokens=1):
        """获取令牌,成功返回True"""
        with self._lock:
            self._refill()

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now

# 限流装饰器
def rate_limit(rate=100, capacity=200):
    limiter = RateLimiter(rate, capacity)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not limiter.acquire():
                raise RateLimitExceededError("Rate limit exceeded")
            return func(*args, **kwargs)
        return wrapper
    return decorator

# 降级策略
class DegradationStrategy:
    """服务降级策略"""

    def __init__(self, fallback_func):
        self.fallback_func = fallback_func
        self.degraded = False
        self.degraded_until = None

    @property
    def is_degraded(self):
        if self.degraded_until and time.time() > self.degraded_until:
            self.degraded = False
            self.degraded_until = None
        return self.degraded

    def degrade(self, duration=60):
        """进入降级模式"""
        self.degraded = True
        self.degraded_until = time.time() + duration
        logging.warning(f"Service degraded for {duration}s")

    def execute(self, primary_func, *args, **kwargs):
        """执行主逻辑或降级逻辑"""
        if self.is_degraded:
            return self.fallback_func(*args, **kwargs)

        try:
            return primary_func(*args, **kwargs)
        except Exception as e:
            logging.error(f"Primary function failed: {e}")
            self.degrade()
            return self.fallback_func(*args, **kwargs)

# 使用示例
def get_recommendations_fallback(user_id):
    """降级方案:返回默认推荐"""
    return DEFAULT_RECOMMENDATIONS

@rate_limit(rate=50, capacity=100)
def get_recommendations(user_id):
    recommendations = recommendation_service.get(user_id)
    return recommendations

# 包装降级
strategy = DegradationStrategy(get_recommendations_fallback)
result = strategy.execute(get_recommendations, user_id)

2.2.3 幂等性设计

class IdempotentHandler:
    """幂等性处理器"""

    def __init__(self, redis_client, ttl=86400):
        self.redis = redis_client
        self.ttl = ttl

    def idempotent(self, key_prefix):
        """幂等性装饰器"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # 生成唯一请求ID
                request_id = kwargs.get('request_id') or str(uuid.uuid4())
                idempotent_key = f"{key_prefix}:{request_id}"

                # 检查是否已处理
                result = self.redis.get(idempotent_key)
                if result is not None:
                    logging.info(f"Duplicate request: {request_id}")
                    return json.loads(result)

                # 执行处理
                result = func(*args, **kwargs)

                # 缓存结果
                self.redis.setex(
                    idempotent_key,
                    self.ttl,
                    json.dumps(result)
                )

                return result
            return wrapper
        return decorator

# 使用示例
handler = IdempotentHandler(redis_client)

@handler.idempotent("payment")
def process_payment(order_id, amount, request_id=None):
    """处理支付 - 同一request_id只会执行一次"""
    # 扣款逻辑
    return {"status": "success", "transaction_id": str(uuid.uuid4())}

2.3 故障演练与混沌工程

# 混沌实验配置
class ChaosExperiment:
    """混沌实验执行器"""

    def __init__(self):
        self.experiments = []

    def add_latency_injection(self, service, latency_ms=100, probability=0.1):
        """注入延迟"""
        self.experiments.append({
            "type": "latency",
            "service": service,
            "latency_ms": latency_ms,
            "probability": probability
        })

    def add_error_injection(self, service, error_code=500, probability=0.05):
        """注入错误"""
        self.experiments.append({
            "type": "error",
            "service": service,
            "error_code": error_code,
            "probability": probability
        })

    def execute(self):
        """执行混沌实验"""
        for exp in self.experiments:
            asyncio.create_task(self._run_experiment(exp))

    async def _run_experiment(self, exp):
        while True:
            if random.random() < exp["probability"]:
                await self._inject_fault(exp)
            await asyncio.sleep(1)

    async def _inject_fault(self, exp):
        if exp["type"] == "latency":
            # 注入网络延迟
            await asyncio.sleep(exp["latency_ms"] / 1000)
        elif exp["type"] == "error":
            # 注入错误响应
            raise HTTPException(status_code=exp["error_code"])

# 在测试环境运行混沌实验
if os.getenv("CHAOS_ENABLED") == "true":
    chaos = ChaosExperiment()
    chaos.add_latency_injection("database", latency_ms=200, probability=0.05)
    chaos.add_error_injection("payment_service", error_code=503, probability=0.02)
    chaos.execute()

来源:
https://oieaw.cn/

相关文章
|
10天前
|
缓存 运维 安全
程序员进阶工程师必备技能之性能、稳定性与安全优化(一)
教程来源 https://qcycj.cn/ 本文聚焦系统三大生命线——性能、稳定性和安全性,揭示其对用户体验与业务存续的决定性影响。通过代码、架构、运维三层优化方法论,系统性提升响应速度、故障抵御力与数据防护力,构建高可用数字底座。
|
10天前
|
运维 监控 中间件
程序员进阶工程师必备技能之中间件深度使用与运维(二)
教程来源 https://bncne.cn/ 本节深入解析数据库中间件架构原理与MyCat实战:涵盖MySQL协议兼容、智能SQL路由、三级连接池;详解schema.xml/rule.xml配置、哈希/时间分片策略;实现读写分离、负载均衡及全链路运维监控与性能优化。
|
8天前
|
SQL 安全 算法
软件开发进阶技能之性能与安全调优(四)
本节聚焦安全调优:严守最小权限原则,缩减攻击面。涵盖SQL注入(禁用拼接,用PreparedStatement/MyBatis #{})、XSS(输出编码+框架自动转义)、命令注入(白名单校验);JWT安全(强密钥、合理过期、防算法混淆);会话固定防护与bcrypt加盐哈希密码存储。
|
10天前
|
缓存 安全 程序员
程序员进阶工程师必备技能之性能、稳定性与安全优化(四)
教程来源 https://vbzcj.cn/ 本文详解性能压测与容量规划实战:基于Locust实现登录、浏览、加购、下单全链路压测;提供QPS计算、服务器/数据库/缓存容量公式,并结合双十一场景示例。辅以优化优先级矩阵,强调性能、稳定、安全三位一体的系统工程思维。
|
10天前
|
SQL 安全 程序员
程序员进阶工程师必备技能之性能、稳定性与安全优化(三)
教程来源 https://xcfsr.cn/ 本节系统梳理Web、认证、数据及基础设施四层安全威胁,详解SQL注入、XSS、JWT攻击等防御方案,涵盖参数化查询、HTML转义、短时效令牌、字段加密、动态脱敏、API签名与速率限制等实战措施,全面提升系统安全性。
|
10天前
|
Arthas 运维 Java
程序员必备的十大技能(进阶版)之性能调优与故障排查(二)
教程来源 qfcrz.cn 本节系统梳理Java内存问题排查全流程:涵盖JVM内存结构(堆、元空间、直接内存等)、四大典型泄漏场景(静态集合、ThreadLocal、监听器、动态代理)、jstat/jmap/jcmd/Arthas等实战工具用法、MAT深度分析技巧(Dominator Tree、OQL查询),以及GC调优策略与I/O问题定位方法。
|
10天前
|
运维 Java 程序员
程序员必备的十大技能(进阶版)之性能调优与故障排查(一)
教程来源 https://qeext.cn/ 本文系统讲解性能调优与故障排查核心技能,涵盖故障方法论、CPU/内存/I/O/网络/数据库问题定位、Java诊断工具(Arthas/JVM)、全链路压测及混沌工程,辅以实战案例与黄金排查原则,助开发者从“重启党”进阶为问题终结者。
|
10天前
|
运维 监控 程序员
程序员必备的十大技能(进阶版)之性能调优与故障排查(三)
教程来源 https://bgnno.cn/ 本节系统讲解网络与数据库性能问题排查:涵盖TCP连接状态分析、TIME_WAIT优化、延迟诊断、连接池配置;以及慢查询定位、索引失效识别、事务锁监控和连接池调优,助力高效定位并解决生产环境常见瓶颈。
|
10天前
|
运维 Java 测试技术
程序员必备的十大技能(进阶版)之性能调优与故障排查(四)
教程来源 https://aescc.cn/ 本节系统介绍Java应用性能分析与稳定性保障核心实践:涵盖Arthas诊断、火焰图性能剖析、全链路压测方法论及JMeter实战、混沌工程故障注入(ChaosBlade),并复盘Full GC与数据库死锁两大典型故障案例,助力高效定位与解决线上性能问题。
|
2月前
|
存储 安全 算法
【分布式】分布式一致性协议:2PC/3PC、Paxos、Raft、ZAB 核心原理、区别(2026必考Raft)
本文系统梳理分布式一致性核心理论与四大协议(2PC/3PC、Paxos、Raft、ZAB),聚焦原理、差异及工程实践。重点强化2026年必考的Raft协议——涵盖Leader选举、日志复制、安全性机制、快照与成员变更等核心考点,构建完整、可落地的知识体系。