下载地址:http://lanzou.com.cn/i921fbbad

📁 output/zhinengheshengchengmoxingai/
├── 📄 README.md207 B
├── 📄 pom.xml1.3 KB
├── 📄 package.json708 B
├── 📄 config/application.properties644 B
├── 📄 adapter/Processor.js3.3 KB
├── 📄 resource/Cache.py4.8 KB
├── 📄 src/main/java/Proxy.java6.6 KB
├── 📄 policies/Loader.py4.7 KB
├── 📄 src/main/java/Helper.java4.6 KB
├── 📄 config/Controller.json708 B
├── 📄 runtime/Validator.go2.5 KB
├── 📄 adapter/Server.ts2.3 KB
├── 📄 resource/Client.js4.2 KB
├── 📄 bean/Parser.go3.5 KB
├── 📄 src/main/java/Scheduler.java5.2 KB
├── 📄 config/Adapter.xml1.4 KB
├── 📄 src/main/java/Observer.java5.8 KB
├── 📄 performance/Builder.py5.1 KB
├── 📄 bean/Listener.js3.5 KB
├── 📄 config/Transformer.properties645 B
├── 📄 repositories/Factory.ts2.2 KB
├── 📄 bean/Dispatcher.js4.4 KB
├── 📄 runtime/Executor.py4.7 KB
├── 📄 src/main/java/Handler.java5.7 KB
├── 📄 policies/Wrapper.py6.5 KB
项目编译入口:
Project Structure
Project : 余额智能审核生成模型AI
Folder : zhinengheshengchengmoxingai
Files : 26
Size : 85.2 KB
Generated: 2026-03-22 19:01:15
zhinengheshengchengmoxingai/
├── README.md [207 B]
├── adapter/
│ ├── Processor.js [3.3 KB]
│ └── Server.ts [2.3 KB]
├── bean/
│ ├── Dispatcher.js [4.4 KB]
│ ├── Listener.js [3.5 KB]
│ └── Parser.go [3.5 KB]
├── config/
│ ├── Adapter.xml [1.4 KB]
│ ├── Controller.json [708 B]
│ ├── Transformer.properties [645 B]
│ └── application.properties [644 B]
├── package.json [708 B]
├── performance/
│ └── Builder.py [5.1 KB]
├── policies/
│ ├── Loader.py [4.7 KB]
│ └── Wrapper.py [6.5 KB]
├── pom.xml [1.3 KB]
├── repositories/
│ └── Factory.ts [2.2 KB]
├── resource/
│ ├── Cache.py [4.8 KB]
│ └── Client.js [4.2 KB]
├── runtime/
│ ├── Executor.py [4.7 KB]
│ └── Validator.go [2.5 KB]
└── src/
├── main/
│ ├── java/
│ │ ├── Handler.java [5.7 KB]
│ │ ├── Helper.java [4.6 KB]
│ │ ├── Observer.java [5.8 KB]
│ │ ├── Proxy.java [6.6 KB]
│ │ └── Scheduler.java [5.2 KB]
│ └── resources/
└── test/
└── java/
"""
余额修改智能审核系统
AI智能模型:基于多维特征的风险评估与自动审核
核心功能:对余额修改请求进行实时风控审核,平衡安全与效率
"""
import hashlib
import json
import logging
import random
import re
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, field, asdict
from collections import deque
配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("BalanceAudit")
class AuditDecision(Enum):
"""审核决策枚举"""
AUTO_APPROVE = "auto_approve" # 自动通过
AUTO_REJECT = "auto_reject" # 自动拒绝
MANUAL_REVIEW = "manual_review" # 转人工审核
class RiskLevel(Enum):
"""风险等级枚举"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class UserContext:
"""用户上下文信息"""
user_id: str
account_age_days: int # 账户注册天数
total_transactions: int # 历史交易总数
avg_balance: float # 历史平均余额
balance_volatility: float # 余额波动率
recent_failure_count: int # 近期失败次数
device_fingerprint: str # 设备指纹
ip_address: str # IP地址
geographic_region: str # 地理区域
user_tier: str = "normal" # 用户等级: normal, vip, svip
daily_operation_count: int = 0 # 当日操作次数
suspicious_activity_flag: bool = False # 可疑活动标记
@dataclass
class BalanceModificationRequest:
"""余额修改请求"""
request_id: str
user_id: str
operator_id: str # 操作员ID
modification_type: str # 类型: add, deduct, set, freeze, unfreeze
amount: float # 修改金额(正数)
current_balance: float # 修改前余额
expected_new_balance: float # 预期新余额
reason_code: str # 原因代码
reason_detail: str # 详细原因
timestamp: datetime = field(default_factory=datetime.now)
ip_address: str = ""
device_id: str = ""
session_id: str = ""
def to_dict(self) -> Dict:
"""转换为字典"""
data = asdict(self)
data['timestamp'] = self.timestamp.isoformat()
return data
@dataclass
class AuditResult:
"""审核结果"""
request_id: str
decision: AuditDecision
risk_score: float # 风险评分 0-100
risk_level: RiskLevel
reason: str
audit_time: datetime = field(default_factory=datetime.now)
review_priority: int = 0 # 人工审核优先级 1-5
ai_confidence: float = 0.0 # AI判断置信度
matched_rules: List[str] = field(default_factory=list)
fraud_signals: List[str] = field(default_factory=list)
class FeatureExtractor:
"""特征提取器 - 从请求和上下文中提取AI模型所需特征"""
@staticmethod
def extract_features(request: BalanceModificationRequest,
user_context: UserContext) -> Dict[str, float]:
"""
提取多维特征向量用于模型评估
返回特征字典
"""
features = {}
# 1. 金额相关特征
features['amount_normalized'] = FeatureExtractor._normalize_amount(
request.amount, user_context.avg_balance
)
features['amount_to_balance_ratio'] = request.amount / max(user_context.avg_balance, 1.0)
features['new_balance_extreme'] = 1.0 if request.expected_new_balance > user_context.avg_balance * 5 else 0.0
# 2. 用户行为特征
features['account_maturity'] = min(user_context.account_age_days / 365, 1.0)
features['user_activity_score'] = min(user_context.total_transactions / 1000, 1.0)
features['balance_stability'] = 1.0 - min(user_context.balance_volatility, 1.0)
features['daily_frequency_risk'] = min(user_context.daily_operation_count / 20, 1.0)
# 3. 操作特征
features['modification_type_risk'] = FeatureExtractor._get_type_risk_score(
request.modification_type
)
features['reason_code_risk'] = FeatureExtractor._get_reason_risk_score(
request.reason_code
)
features['is_self_operation'] = 1.0 if request.user_id == request.operator_id else 0.0
# 4. 时间特征
current_hour = datetime.now().hour
features['is_off_hours'] = 1.0 if (current_hour < 6 or current_hour > 22) else 0.0
features['is_weekend'] = 1.0 if datetime.now().weekday() >= 5 else 0.0
# 5. 风险信号特征
features['suspicious_flag'] = 1.0 if user_context.suspicious_activity_flag else 0.0
features['recent_failure_rate'] = min(user_context.recent_failure_count / 10, 1.0)
# 6. 用户等级权重
tier_weights = {'normal': 1.0, 'vip': 0.6, 'svip': 0.3}
features['user_tier_risk'] = tier_weights.get(user_context.user_tier, 1.0)
return features
@staticmethod
def _normalize_amount(amount: float, avg_balance: float) -> float:
"""归一化金额"""
if avg_balance <= 0:
return min(amount / 10000, 1.0)
return min(amount / (avg_balance * 3), 1.0)
@staticmethod
def _get_type_risk_score(mod_type: str) -> float:
"""获取修改类型风险分"""
risk_map = {
'add': 0.3,
'deduct': 0.4,
'freeze': 0.5,
'unfreeze': 0.4,
'set': 0.8, # 直接设置余额风险最高
}
return risk_map.get(mod_type, 0.5)
@staticmethod
def _get_reason_risk_score(reason_code: str) -> float:
"""获取原因代码风险分"""
# 高风险原因代码
high_risk_reasons = {'system_error', 'manual_adjustment', 'compensation', 'unknown'}
medium_risk_reasons = {'refund', 'reward', 'promotion'}
low_risk_reasons = {'recharge', 'payment', 'withdrawal'}
if reason_code in high_risk_reasons:
return 0.9
elif reason_code in medium_risk_reasons:
return 0.5
elif reason_code in low_risk_reasons:
return 0.2
return 0.5
class RiskAssessmentModel:
"""
AI风险评估模型
基于加权特征向量的智能评分引擎
支持动态权重调整和阈值学习
"""
# 特征权重配置 (可通过机器学习训练优化)
FEATURE_WEIGHTS = {
# 金额特征组 (权重较高)
'amount_normalized': 0.15,
'amount_to_balance_ratio': 0.12,
'new_balance_extreme': 0.10,
# 用户画像特征组
'account_maturity': 0.05,
'user_activity_score': 0.05,
'balance_stability': 0.06,
'daily_frequency_risk': 0.08,
# 操作特征组
'modification_type_risk': 0.10,
'reason_code_risk': 0.08,
'is_self_operation': 0.04,
# 上下文特征组
'is_off_hours': 0.05,
'is_weekend': 0.02,
# 风险信号组
'suspicious_flag': 0.06,
'recent_failure_rate': 0.04,
'user_tier_risk': 0.05,
}
# 动态调整系数 (基于实时反馈)
_dynamic_adjustments: Dict[str, float] = {}
@classmethod
def calculate_risk_score(cls, features: Dict[str, float]) -> Tuple[float, List[str]]:
"""
计算风险评分
返回: (风险分0-100, 贡献特征列表)
"""
total_score = 0.0
contributions = []
for feature_name, feature_value in features.items():
weight = cls.FEATURE_WEIGHTS.get(feature_name, 0.02)
# 应用动态调整
dynamic_factor = cls._dynamic_adjustments.get(feature_name, 1.0)
adjusted_weight = weight * dynamic_factor
contribution = feature_value * adjusted_weight * 100
total_score += contribution
if contribution > 5: # 记录显著贡献特征
contributions.append(f"{feature_name}:{contribution:.1f}")
# 分数归一化到0-100
risk_score = min(max(total_score, 0.0), 100.0)
return risk_score, contributions
@classmethod
def update_weight(cls, feature_name: str, adjustment: float):
"""动态更新特征权重 (基于反馈学习)"""
current = cls._dynamic_adjustments.get(feature_name, 1.0)
new_adjustment = current * adjustment
# 限制调整范围在0.5-2.0之间
cls._dynamic_adjustments[feature_name] = max(0.5, min(2.0, new_adjustment))
logger.info(f"动态权重更新: {feature_name} -> {cls._dynamic_adjustments[feature_name]}")
@classmethod
def get_risk_level(cls, risk_score: float) -> RiskLevel:
"""根据风险分判定风险等级"""
if risk_score >= 80:
return RiskLevel.CRITICAL
elif risk_score >= 60:
return RiskLevel.HIGH
elif risk_score >= 30:
return RiskLevel.MEDIUM
else:
return RiskLevel.LOW
@classmethod
def get_decision(cls, risk_score: float, risk_level: RiskLevel) -> AuditDecision:
"""
根据风险分和等级做出审核决策
决策阈值可通过配置调整
"""
# 阈值配置
AUTO_APPROVE_THRESHOLD = 25
AUTO_REJECT_THRESHOLD = 75
if risk_score <= AUTO_APPROVE_THRESHOLD:
return AuditDecision.AUTO_APPROVE
elif risk_score >= AUTO_REJECT_THRESHOLD:
return AuditDecision.AUTO_REJECT
else:
return AuditDecision.MANUAL_REVIEW
@classmethod
def calculate_confidence(cls, risk_score: float, features: Dict[str, float]) -> float:
"""
计算AI判断置信度
基于特征分布的明确程度
"""
# 极端值越多,置信度越高
extreme_features = sum(1 for v in features.values() if v > 0.9 or v < 0.1)
extreme_ratio = extreme_features / max(len(features), 1)
# 风险分接近两端时置信度高
score_confidence = 1.0 - min(risk_score, 100 - risk_score) / 50
# 综合置信度
confidence = 0.6 * score_confidence + 0.4 * extreme_ratio
return min(0.95, max(0.5, confidence)) # 置信度范围0.5-0.95
class FraudDetectionEngine:
"""
欺诈检测引擎
基于规则和模式的异常行为识别
"""
def __init__(self):
# 频率限制配置: {user_id: deque([timestamps])}
self._request_history: Dict[str, deque] = {}
self._ip_failure_map: Dict[str, int] = {}
self._global_blacklist: set = set()
def check_frequency_limit(self, user_id: str, window_seconds: int = 300,
max_requests: int = 10) -> Tuple[bool, str]:
"""
检查频率限制
返回: (是否超限, 详细信息)
"""
now = datetime.now()
history = self._request_history.get(user_id, deque(maxlen=max_requests * 2))
# 清理过期记录
cutoff = now - timedelta(seconds=window_seconds)
while history and history[0] < cutoff:
history.popleft()
if len(history) >= max_requests:
return True, f"频率超限: {len(history)}次/{window_seconds}秒"
history.append(now)
self._request_history[user_id] = history
return False, ""
def detect_anomaly_patterns(self, request: BalanceModificationRequest,
user_context: UserContext) -> List[str]:
"""
检测异常模式
返回检测到的欺诈信号列表
"""
signals = []
# 1. 新账户大额操作
if user_context.account_age_days < 7 and request.amount > 10000:
signals.append("新账户大额操作")
# 2. 短时间内多次失败后成功
if user_context.recent_failure_count > 3:
signals.append("多次失败后操作")
# 3. 非本人操作
if request.user_id != request.operator_id and user_context.user_tier == "normal":
signals.append("代操作-普通用户")
# 4. 非常规时间段大额操作
current_hour = datetime.now().hour
if (current_hour < 5 or current_hour > 23) and request.amount > 5000:
signals.append("非常规时段大额操作")
# 5. 余额突变检测
if request.modification_type == "set" and abs(request.expected_new_balance - request.current_balance) > user_context.avg_balance * 3:
signals.append("余额突变")
# 6. 金额特殊性检测
if request.amount in [10000, 20000, 50000, 100000]:
signals.append("整额异常")
# 7. 黑名单检查
if user_context.user_id in self._global_blacklist:
signals.append("黑名单用户")
return signals
def record_failure(self, user_id: str, ip: str):
"""记录操作失败"""
self._ip_failure_map[ip] = self._ip_failure_map.get(ip, 0) + 1
logger.warning(f"记录失败操作: user={user_id}, ip={ip}")
def add_to_blacklist(self, user_id: str):
"""添加到黑名单"""
self._global_blacklist.add(user_id)
logger.critical(f"用户 {user_id} 已被加入黑名单")
class BalanceAuditEngine:
"""
余额修改审核引擎主类
整合所有智能审核能力
"""
def __init__(self):
self.feature_extractor = FeatureExtractor()
self.risk_model = RiskAssessmentModel()
self.fraud_engine = FraudDetectionEngine()
self.audit_history: List[AuditResult] = [] # 审核记录
self._user_context_cache: Dict[str, UserContext] = {} # 用户上下文缓存
def get_user_context(self, user_id: str) -> UserContext:
"""
获取用户上下文信息
实际应用中应从数据库/缓存获取
"""
# 模拟数据 - 实际应从数据层获取
if user_id not in self._user_context_cache:
# 生成模拟上下文
self._user_context_cache[user_id] = UserContext(
user_id=user_id,
account_age_days=random.randint(30, 800),
total_transactions=random.randint(10, 500),
avg_balance=random.uniform(100, 50000),
balance_volatility=random.uniform(0.05, 0.5),
recent_failure_count=random.randint(0, 5),
device_fingerprint=hashlib.md5(f"device_{user_id}".encode()).hexdigest(),
ip_address=f"192.168.{random.randint(1, 255)}.{random.randint(1, 255)}",
geographic_region=random.choice(["CN-EAST", "CN-SOUTH", "CN-NORTH", "OVERSEAS"]),
user_tier=random.choice(["normal", "vip", "svip"]),
daily_operation_count=random.randint(0, 15),
suspicious_activity_flag=random.random() < 0.05
)
return self._user_context_cache[user_id]
def update_user_context(self, user_id: str, request: BalanceModificationRequest):
"""更新用户上下文 (基于操作反馈)"""
context = self._user_context_cache.get(user_id)
if context:
context.daily_operation_count += 1
# 更新余额波动率等指标
if request.current_balance > 0:
change_ratio = abs(request.amount) / request.current_balance
context.balance_volatility = 0.7 * context.balance_volatility + 0.3 * change_ratio
def audit(self, request: BalanceModificationRequest) -> AuditResult:
"""
执行智能审核
这是系统的核心方法
"""
logger.info(f"开始审核请求: {request.request_id} | 用户: {request.user_id} | 金额: {request.amount}")
# 1. 获取用户上下文
user_context = self.get_user_context(request.user_id)
# 2. 频率限制预检
freq_exceed, freq_msg = self.fraud_engine.check_frequency_limit(request.user_id)
if freq_exceed:
logger.warning(f"频率超限拒绝: {freq_msg}")
return AuditResult(
request_id=request.request_id,
decision=AuditDecision.AUTO_REJECT,
risk_score=95.0,
risk_level=RiskLevel.CRITICAL,
reason=freq_msg,
ai_confidence=0.95,
matched_rules=["frequency_limit"]
)
# 3. 提取特征向量
features = self.feature_extractor.extract_features(request, user_context)
# 4. AI模型计算风险评分
risk_score, contributions = self.risk_model.calculate_risk_score(features)
risk_level = self.risk_model.get_risk_level(risk_score)
ai_confidence = self.risk_model.calculate_confidence(risk_score, features)
# 5. 欺诈检测引擎分析
fraud_signals = self.fraud_engine.detect_anomaly_patterns(request, user_context)
# 6. 综合决策
decision = self.risk_model.get_decision(risk_score, risk_level)
# 7. 欺诈信号调整决策 (覆盖规则)
if len(fraud_signals) >= 2 and decision == AuditDecision.AUTO_APPROVE:
decision = AuditDecision.MANUAL_REVIEW
risk_score = min(risk_score + 20, 100)
logger.info(f"欺诈信号触发人工审核: {fraud_signals}")
# 高危欺诈信号直接拒绝
if "黑名单用户" in fraud_signals or len(fraud_signals) >= 3:
decision = AuditDecision.AUTO_REJECT
risk_score = max(risk_score, 85)
# 8. 生成审核理由
reason = self._generate_reason(decision, risk_score, fraud_signals, contributions)
# 9. 计算人工审核优先级
review_priority = self._calculate_priority(risk_score, decision, fraud_signals)
# 10. 构建审核结果
result = AuditResult(
request_id=request.request_id,
decision=decision,
risk_score=round(risk_score, 2),
risk_level=risk_level,
reason=reason,
review_priority=review_priority,
ai_confidence=round(ai_confidence, 2),
matched_rules=contributions[:5],
fraud_signals=fraud_signals
)
# 11. 记录审核历史
self.audit_history.append(result)
# 12. 更新用户上下文
self.update_user_context(request.user_id, request)
logger.info(f"审核完成: {request.request_id} | 决策: {decision.value} | 风险分: {risk_score:.1f}")
return result
def _generate_reason(self, decision: AuditDecision, risk_score: float,
fraud_signals: List[str], contributions: List[str]) -> str:
"""生成审核理由"""
if decision == AuditDecision.AUTO_APPROVE:
return f"AI自动通过,风险评分{risk_score:.1f},无异常信号"
elif decision == AuditDecision.AUTO_REJECT:
signals = fraud_signals if fraud_signals else ["风险评分过高"]
return f"AI自动拒绝,风险评分{risk_score:.1f},触发信号: {', '.join(signals)}"
else:
signals = fraud_signals if fraud_signals else ["综合风险评估中等"]
return f"转人工审核,风险评分{risk_score:.1f},关注信号: {', '.join(signals[:3])}"
def _calculate_priority(self, risk_score: float, decision: AuditDecision,
fraud_signals: List[str]) -> int:
"""计算人工审核优先级 1-5"""
priority = 1
if decision == AuditDecision.MANUAL_REVIEW:
if risk_score > 70:
priority = 5
elif risk_score > 50:
priority = 4
elif risk_score > 35:
priority = 3
else:
priority = 2
# 欺诈信号提升优先级
priority += len(fraud_signals)
return min(5, priority)
def get_audit_statistics(self) -> Dict[str, Any]:
"""获取审核统计信息"""
total = len(self.audit_history)
if total == 0:
return {"total": 0}
auto_approve = sum(1 for r in self.audit_history if r.decision == AuditDecision.AUTO_APPROVE)
auto_reject = sum(1 for r in self.audit_history if r.decision == AuditDecision.AUTO_REJECT)
manual_review = sum(1 for r in self.audit_history if r.decision == AuditDecision.MANUAL_REVIEW)
avg_risk = sum(r.risk_score for r in self.audit_history) / total
avg_confidence = sum(r.ai_confidence for r in self.audit_history) / total
return {
"total": total,
"auto_approve_rate": round(auto_approve / total * 100, 2),
"auto_reject_rate": round(auto_reject / total * 100, 2),
"manual_review_rate": round(manual_review / total * 100, 2),
"avg_risk_score": round(avg_risk, 2),
"avg_ai_confidence": round(avg_confidence, 2),
}
class AuditModelTrainer:
"""
模型训练器 - 基于审核反馈优化模型权重
实现简单的在线学习能力
"""
@staticmethod
def learn_from_feedback(audit_result: AuditResult,
actual_decision: AuditDecision,
is_fraud: bool = False):
"""
从人工审核结果中学习
根据反馈调整特征权重
"""
# 如果AI判断错误,调整相关特征权重
if audit_result.decision != actual_decision:
logger.info(f"模型学习: AI判断错误,进行权重调整")
# 误判为低风险实际为欺诈 -> 提高高风险特征权重
if actual_decision == AuditDecision.AUTO_REJECT and is_fraud:
for rule in audit_result.matched_rules:
feature_name = rule.split(':')[0]
RiskAssessmentModel.update_weight(feature_name, 1.1) # 提高权重
# 误判为高风险实际正常 -> 降低高风险特征权重
elif actual_decision == AuditDecision.AUTO_APPROVE and not is_fraud:
for rule in audit_result.matched_rules:
feature_name = rule.split(':')[0]
RiskAssessmentModel.update_weight(feature_name, 0.95) # 降低权重
==================== 使用示例 ====================
def demo_audit_system():
"""演示审核系统功能"""
engine = BalanceAuditEngine()
print("=" * 60)
print("余额修改智能审核系统演示")
print("=" * 60)
# 场景1: 正常充值 - 预期自动通过
print("\n【场景1】正常用户充值")
request1 = BalanceModificationRequest(
request_id="REQ001",
user_id="user_001",
operator_id="user_001",
modification_type="add",
amount=500.00,
current_balance=1250.00,
expected_new_balance=1750.00,
reason_code="recharge",
reason_detail="微信充值"
)
result1 = engine.audit(request1)
print(f"决策: {result1.decision.value} | 风险分: {result1.risk_score} | 理由: {result1.reason}")
# 场景2: 高风险操作 - 可疑金额 + 非常规时间
print("\n【场景2】高风险操作 (可疑金额+非常规时间)")
request2 = BalanceModificationRequest(
request_id="REQ002",
user_id="user_002",
operator_id="admin_001",
modification_type="set",
amount=100000.00,
current_balance=5000.00,
expected_new_balance=105000.00,
reason_code="manual_adjustment",
reason_detail="系统补偿",
timestamp=datetime.now().replace(hour=3, minute=0) # 凌晨3点
)
result2 = engine.audit(request2)
print(f"决策: {result2.decision.value} | 风险分: {result2.risk_score} | 欺诈信号: {result2.fraud_signals}")
# 场景3: 短时间多次请求 - 频率限制
print("\n【场景3】频率限制测试")
request3 = BalanceModificationRequest(
request_id="REQ003",
user_id="user_003",
operator_id="user_003",
modification_type="deduct",
amount=100.00,
current_balance=10000.00,
expected_new_balance=9900.00,
reason_code="payment",
reason_detail="购买商品"
)
# 模拟快速多次请求
for i in range(12):
request3.request_id = f"REQ003_{i}"
result3 = engine.audit(request3)
if i >= 9: # 超过10次后应被限制
print(f"第{i+1}次请求: {result3.decision.value} - {result3.reason}")
# 输出统计信息
print("\n" + "=" * 60)
print("审核统计报告")
stats = engine.get_audit_statistics()
for key, value in stats.items():
print(f"{key}: {value}")
print("\n" + "=" * 60)
print("演示完成")
print("=" * 60)
if name == "main":
demo_audit_system()