# Download link: http://lanzou.com.cn/i09343b5f

"""
农业余额审核训练系统 (Agricultural Balance Audit Training System)
一个用于模拟和训练AI审核农业账户余额变化的系统。
包含数据生成、审核模型、训练与评估模块。
"""
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.preprocessing import StandardScaler
import joblib
import os
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
# ==================== 1. Data simulation module ====================
class AgriculturalDataSimulator:
    """Simulate agricultural accounts and their month-by-month balance records.

    Construction seeds NumPy's global random generator, so repeating the same
    sequence of calls with the same seed reproduces the same data.
    """

    def __init__(self, random_seed=42):
        """Seed the global NumPy RNG and define the seasonal multipliers.

        Args:
            random_seed: Value passed to ``np.random.seed``.
        """
        self.random_seed = random_seed
        np.random.seed(random_seed)
        # Agriculture types and, for each type, the per-season multiplier
        # applied to the simulated income and expense.
        self.agriculture_types = ['种植业', '畜牧业', '渔业', '林业', '混合农业']
        self.seasonal_factors = {
            '种植业': {'春季': 1.2, '夏季': 1.5, '秋季': 2.0, '冬季': 0.8},
            '畜牧业': {'春季': 1.1, '夏季': 1.2, '秋季': 1.1, '冬季': 1.0},
            '渔业': {'春季': 1.1, '夏季': 1.4, '秋季': 1.3, '冬季': 0.9},
            '林业': {'春季': 1.0, '夏季': 1.2, '秋季': 1.3, '冬季': 0.9},
            '混合农业': {'春季': 1.1, '夏季': 1.3, '秋季': 1.4, '冬季': 0.9},
        }

    def _get_season(self, date):
        """Return the season name for ``date`` based on its ``month`` attribute.

        Months 3-5 are spring, 6-8 summer, 9-11 autumn, everything else winter.
        """
        month = date.month
        if month in (3, 4, 5):
            return '春季'
        if month in (6, 7, 8):
            return '夏季'
        if month in (9, 10, 11):
            return '秋季'
        return '冬季'

    def generate_accounts(self, n_accounts=1000):
        """Generate the static profile of ``n_accounts`` agricultural accounts.

        Args:
            n_accounts: Number of accounts to create.

        Returns:
            DataFrame with one row per account and the columns ``account_id``,
            ``agriculture_type``, ``region``, ``farm_size``, ``base_balance``,
            ``credit_score`` and ``established_years``.
        """
        accounts = []
        for i in range(n_accounts):
            acc_type = np.random.choice(self.agriculture_types)
            # The base balance range depends on the agriculture type.
            base_balance = np.random.uniform(
                50000 if acc_type in ['种植业', '混合农业'] else 30000,
                500000 if acc_type in ['种植业', '畜牧业'] else 200000
            )
            accounts.append({
                'account_id': f'AGRI_{i+1:05d}',
                'agriculture_type': acc_type,
                'region': np.random.choice(['华北', '华东', '华南', '西南', '西北', '东北']),
                'farm_size': np.random.choice(['小型', '中型', '大型'], p=[0.4, 0.4, 0.2]),
                'base_balance': base_balance,
                'credit_score': np.random.randint(300, 850),
                'established_years': np.random.randint(1, 30)
            })
        return pd.DataFrame(accounts)

    def generate_transactions(self, accounts_df, months=12):
        """Generate one balance record per account per calendar month.

        Records for each account are produced oldest month first and the
        balance is carried forward in that same order, so ``previous_balance``
        of a row equals ``new_balance`` of the preceding (earlier) row. The
        history ends on 2024-12-31 and uses consecutive month ends, so every
        calendar month in the window appears exactly once.

        Args:
            accounts_df: Account profiles as returned by ``generate_accounts``.
            months: Number of monthly records to generate per account.

        Returns:
            DataFrame of transaction records including the ``audit_label``
            column (1 for an injected anomaly, 0 otherwise).
        """
        all_records = []
        end_date = pd.Timestamp(year=2024, month=12, day=31)
        # The timeline is identical for every account: compute it once, in
        # chronological order, stepping by real calendar months.
        timeline = pd.date_range(end=end_date, periods=months, freq=pd.offsets.MonthEnd())

        for account in accounts_df.itertuples(index=False):
            acc_type = account.agriculture_type
            type_factors = self.seasonal_factors[acc_type]
            current_balance = account.base_balance

            for date in timeline:
                season = self._get_season(date)
                seasonal_factor = type_factors[season]

                # Ordinary operating income and expense, scaled by the season.
                income = np.random.gamma(2, 5000) * (0.5 + seasonal_factor * 0.5)
                expense = np.random.gamma(2, 4000) * (0.6 + seasonal_factor * 0.4)

                # Roughly 5% of the records receive an injected anomaly.
                is_anomaly = np.random.random() < 0.05
                anomaly_amount = 0
                anomaly_type = 'normal'
                if is_anomaly:
                    anomaly_type = np.random.choice(
                        ['sudden_withdraw', 'unusual_income', 'rapid_fluctuation'],
                        p=[0.5, 0.3, 0.2]
                    )
                    if anomaly_type == 'sudden_withdraw':
                        anomaly_amount = -np.random.uniform(50000, 200000)
                    elif anomaly_type == 'unusual_income':
                        anomaly_amount = np.random.uniform(50000, 150000)
                    else:  # rapid_fluctuation
                        anomaly_amount = np.random.uniform(-80000, 80000)

                net_change = income - expense + anomaly_amount
                new_balance = current_balance + net_change

                all_records.append({
                    'account_id': account.account_id,
                    'date': date.strftime('%Y-%m-%d'),
                    'month': date.month,
                    'season': season,
                    'agriculture_type': acc_type,
                    'region': account.region,
                    'farm_size': account.farm_size,
                    'credit_score': account.credit_score,
                    'established_years': account.established_years,
                    'previous_balance': current_balance,
                    'income': income,
                    'expense': expense,
                    'anomaly_amount': anomaly_amount,
                    'anomaly_type': anomaly_type,
                    'net_change': net_change,
                    'new_balance': new_balance,
                    # +1 keeps the ratio defined when the balance is zero.
                    'balance_change_ratio': net_change / (current_balance + 1),
                    'audit_label': int(is_anomaly)
                })
                current_balance = new_balance

        return pd.DataFrame(all_records)
# ==================== 2. Feature engineering module ====================
class FeatureEngineer:
    """Derive the numeric audit features from raw transaction records."""

    # Fixed vocabularies: a category must map to the same integer code in
    # every frame, otherwise codes seen at training time and at inference
    # time would not mean the same thing.
    AGRICULTURE_TYPES = ('种植业', '畜牧业', '渔业', '林业', '混合农业')
    REGIONS = ('华北', '华东', '华南', '西南', '西北', '东北')
    SEASONS = ('春季', '夏季', '秋季', '冬季')
    FARM_SIZES = ('小型', '中型', '大型')
    # Code assigned to a category value that is not in its vocabulary.
    UNKNOWN_CODE = -1

    # Columns returned as the model input, in this order.
    FEATURE_COLUMNS = (
        'credit_score', 'established_years', 'previous_balance', 'new_balance',
        'income', 'expense', 'balance_change_ratio', 'month_sin', 'month_cos',
        'season_code', 'agri_type_code', 'region_code', 'farm_size_code',
        'income_expense_ratio', 'balance_stress', 'volatility',
        'lag_1_change', 'lag_2_change', 'rolling_anomaly_score'
    )

    @staticmethod
    def _encode(series, vocabulary):
        """Map each value to its position in ``vocabulary`` (unknown -> -1)."""
        mapping = {name: code for code, name in enumerate(vocabulary)}
        return series.map(mapping).fillna(FeatureEngineer.UNKNOWN_CODE).astype(int)

    @staticmethod
    def create_features(df):
        """Build the model feature matrix from transaction records.

        The lag and rolling features follow the row order within each
        ``account_id`` group, so rows are expected to be in chronological
        order per account.

        Args:
            df: Transaction records. The input frame is not modified.

        Returns:
            Tuple ``(features, labels)``. ``features`` holds the columns in
            ``FEATURE_COLUMNS``; ``labels`` is the ``audit_label`` column, or
            ``None`` when the input carries no labels (unlabeled data that is
            only being scored).
        """
        features_df = df.copy()

        # Cyclical month encoding so that December and January are adjacent.
        month_angle = 2 * np.pi * features_df['month'] / 12
        features_df['month_sin'] = np.sin(month_angle)
        features_df['month_cos'] = np.cos(month_angle)

        # Categorical encodings against the fixed vocabularies.
        encode = FeatureEngineer._encode
        features_df['season_code'] = encode(features_df['season'], FeatureEngineer.SEASONS)
        features_df['agri_type_code'] = encode(
            features_df['agriculture_type'], FeatureEngineer.AGRICULTURE_TYPES
        )
        features_df['region_code'] = encode(features_df['region'], FeatureEngineer.REGIONS)
        features_df['farm_size_code'] = encode(features_df['farm_size'], FeatureEngineer.FARM_SIZES)

        # Ratio features; +1 in the denominator avoids division by zero.
        features_df['income_expense_ratio'] = features_df['income'] / (features_df['expense'] + 1)
        features_df['balance_stress'] = features_df['new_balance'] / (features_df['credit_score'] + 1)

        change_by_account = features_df.groupby('account_id')['balance_change_ratio']

        # Standard deviation of the change ratio over a 3-row window; the
        # first row of each account has no deviation and gets 0.
        features_df['volatility'] = change_by_account.transform(
            lambda x: x.rolling(3, min_periods=1).std().fillna(0)
        )

        # Change ratio of the previous one and two rows of the same account.
        features_df['lag_1_change'] = change_by_account.shift(1).fillna(0)
        features_df['lag_2_change'] = change_by_account.shift(2).fillna(0)

        # Number of rows in a 3-row window whose absolute change exceeds
        # twice the account's standard deviation.
        features_df['rolling_anomaly_score'] = change_by_account.transform(
            lambda x: (x.abs() > x.std() * 2).astype(float).rolling(3, min_periods=1).sum().fillna(0)
        )

        labels = features_df['audit_label'] if 'audit_label' in features_df.columns else None
        return features_df[list(FeatureEngineer.FEATURE_COLUMNS)], labels
# ==================== 3. Audit model module ====================
class AgriculturalAuditModel:
    """Random-forest classifier that flags anomalous balance changes."""

    def __init__(self):
        """Create the untrained classifier and its feature scaler."""
        self.model = RandomForestClassifier(
            n_estimators=100,
            max_depth=15,
            min_samples_split=10,
            min_samples_leaf=5,
            class_weight='balanced',
            random_state=42,
            n_jobs=-1
        )
        self.scaler = StandardScaler()
        self.is_trained = False
        self.feature_names = None

    def _require_trained(self):
        """Raise ``ValueError`` when the model has not been trained or loaded."""
        if not self.is_trained:
            raise ValueError("模型尚未训练,请先调用train()方法")

    def _scale(self, X):
        """Align ``X`` to the training column order and apply the scaler."""
        if self.feature_names is not None and hasattr(X, 'columns'):
            # Select by name so a frame with reordered or extra columns is
            # scored on exactly the features the model was trained on.
            X = X[self.feature_names]
        return self.scaler.transform(X)

    def train(self, X, y):
        """Fit the scaler and classifier, then print a validation report.

        20% of the data is held out (stratified by label) for validation.

        Args:
            X: Feature DataFrame; its column names are remembered.
            y: Binary labels (0 = normal, 1 = anomalous).

        Returns:
            ``self``, to allow call chaining.
        """
        self.feature_names = X.columns.tolist()

        # Split first, then fit the scaler on the training part only, so the
        # validation rows do not leak their statistics into preprocessing.
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_val_scaled = self.scaler.transform(X_val)

        self.model.fit(X_train_scaled, y_train)
        self.is_trained = True

        y_pred = self.model.predict(X_val_scaled)
        print("\n========== 模型训练评估 ==========")
        print(f"准确率: {accuracy_score(y_val, y_pred):.4f}")
        print("\n分类报告:")
        # Explicit labels keep the report well-formed even when one of the
        # two classes is absent from the validation split or predictions.
        print(classification_report(
            y_val, y_pred, labels=[0, 1], target_names=['正常', '异常'], zero_division=0
        ))
        print("混淆矩阵:")
        print(confusion_matrix(y_val, y_pred, labels=[0, 1]))

        importances = self.model.feature_importances_
        top_indices = np.argsort(importances)[::-1][:10]
        print("\nTop 10 重要特征:")
        for rank, idx in enumerate(top_indices, start=1):
            print(f" {rank}. {self.feature_names[idx]}: {importances[idx]:.4f}")
        return self

    def predict(self, X):
        """Return the predicted label for every row of ``X``.

        Raises:
            ValueError: If the model has not been trained.
        """
        self._require_trained()
        return self.model.predict(self._scale(X))

    def predict_proba(self, X):
        """Return the class probabilities for every row of ``X``.

        Raises:
            ValueError: If the model has not been trained.
        """
        self._require_trained()
        return self.model.predict_proba(self._scale(X))

    def save_model(self, path='agricultural_audit_model.pkl'):
        """Persist the classifier, scaler and feature names to ``path``."""
        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'feature_names': self.feature_names,
            'is_trained': self.is_trained
        }
        joblib.dump(model_data, path)
        print(f"模型已保存至: {path}")

    def load_model(self, path='agricultural_audit_model.pkl'):
        """Restore the state written by ``save_model`` from ``path``.

        SECURITY: ``joblib.load`` unpickles the file and can therefore run
        arbitrary code. Only load model files from a trusted source.
        """
        model_data = joblib.load(path)
        self.model = model_data['model']
        self.scaler = model_data['scaler']
        self.feature_names = model_data['feature_names']
        self.is_trained = model_data['is_trained']
        print(f"模型已从 {path} 加载")
# ==================== 4. Audit training system main class ====================
class AgriculturalAuditTrainingSystem:
    """Controller that wires data simulation, feature engineering and the model."""

    def __init__(self):
        """Create the simulator, feature engineer and untrained model."""
        self.simulator = AgriculturalDataSimulator()
        self.feature_engineer = FeatureEngineer()
        self.model = AgriculturalAuditModel()
        self.accounts = None
        self.transactions = None
        self.X = None
        self.y = None

    def generate_data(self, n_accounts=1000, months=12):
        """Simulate accounts and their transactions and keep them on ``self``.

        Args:
            n_accounts: Number of accounts to simulate.
            months: Number of monthly records per account.

        Returns:
            The generated transaction DataFrame.
        """
        print(f"正在生成 {n_accounts} 个农业账户,{months} 个月的历史数据...")
        self.accounts = self.simulator.generate_accounts(n_accounts)
        self.transactions = self.simulator.generate_transactions(self.accounts, months)
        print(f"数据生成完成,共 {len(self.transactions)} 条交易记录")
        anomaly_rate = self.transactions['audit_label'].mean()
        print(f"异常交易比例: {anomaly_rate:.2%}")
        return self.transactions

    def prepare_features(self):
        """Build features and labels from the generated transactions.

        Returns:
            Tuple ``(X, y)``, also stored on ``self``.

        Raises:
            ValueError: If ``generate_data`` has not been called yet.
        """
        if self.transactions is None:
            raise ValueError("请先调用 generate_data() 生成数据")
        print("\n正在进行特征工程...")
        self.X, self.y = self.feature_engineer.create_features(self.transactions)
        print(f"特征维度: {self.X.shape}")
        print(f"特征列表: {list(self.X.columns)}")
        return self.X, self.y

    def train(self):
        """Train the audit model on the prepared features.

        Raises:
            ValueError: If ``prepare_features`` has not been called yet.
        """
        if self.X is None or self.y is None:
            raise ValueError("请先调用 prepare_features() 准备特征")
        print("\n开始训练审核模型...")
        self.model.train(self.X, self.y)

    def evaluate_on_new_data(self, n_new_accounts=200, months=6):
        """Score freshly simulated data and print the evaluation metrics.

        Args:
            n_new_accounts: Number of new accounts to simulate.
            months: Number of monthly records per new account.

        Returns:
            The new transactions with ``predicted_label`` and
            ``anomaly_score`` columns added.

        Raises:
            ValueError: If the model has not been trained.
        """
        # Fail before simulating anything when there is no model to evaluate.
        if not self.model.is_trained:
            raise ValueError("模型未训练,请先调用 train()")

        print("\n========== 在新数据上评估模型 ==========")
        new_accounts = self.simulator.generate_accounts(n_new_accounts)
        new_transactions = self.simulator.generate_transactions(new_accounts, months)

        X_new, y_new = self.feature_engineer.create_features(new_transactions)
        y_pred = self.model.predict(X_new)
        y_proba = self.model.predict_proba(X_new)[:, 1]

        print(f"准确率: {accuracy_score(y_new, y_pred):.4f}")
        print("\n分类报告:")
        # Explicit labels keep the report well-formed even when a small
        # sample happens to contain (or predict) only one class.
        print(classification_report(
            y_new, y_pred, labels=[0, 1], target_names=['正常', '异常'], zero_division=0
        ))

        new_transactions['predicted_label'] = y_pred
        new_transactions['anomaly_score'] = y_proba

        # Show the five flagged records with the highest anomaly probability.
        flagged = new_transactions[new_transactions['predicted_label'] == 1]
        high_risk = flagged.nlargest(5, 'anomaly_score')
        if len(high_risk) > 0:
            print("\n高风险异常交易示例:")
            display_cols = ['account_id', 'date', 'agriculture_type', 'previous_balance',
                            'net_change', 'new_balance', 'anomaly_score', 'anomaly_type']
            print(high_risk[display_cols].to_string(index=False))
        return new_transactions

    def batch_audit(self, transactions_df):
        """Audit a batch of transactions, which may be unlabeled.

        Args:
            transactions_df: Transaction records; an ``audit_label`` column
                is not required. The input frame is not modified.

        Returns:
            Copy of the input with ``audit_result`` and ``confidence``
            (probability of the anomalous class) columns added.

        Raises:
            ValueError: If the model has not been trained.
        """
        if not self.model.is_trained:
            raise ValueError("模型未训练,请先调用 train()")
        print("\n执行批量审核...")

        # Data awaiting audit normally has no label. Supply a placeholder so
        # feature extraction works either way; the label is discarded below.
        feature_input = transactions_df
        if 'audit_label' not in feature_input.columns:
            feature_input = feature_input.assign(audit_label=0)
        X_audit, _ = self.feature_engineer.create_features(feature_input)

        predictions = self.model.predict(X_audit)
        probabilities = self.model.predict_proba(X_audit)[:, 1]

        results = transactions_df.copy()
        results['audit_result'] = ['异常' if p == 1 else '正常' for p in predictions]
        results['confidence'] = probabilities

        outcome_counts = results['audit_result'].value_counts()
        print(f"审核完成,共审核 {len(results)} 条记录")
        print(f"异常记录数: {outcome_counts.get('异常', 0)}")
        print(f"正常记录数: {outcome_counts.get('正常', 0)}")
        return results

    def save_system(self, path_prefix='agricultural_audit'):
        """Save the model and, when available, the feature metadata.

        Files are written as ``<path_prefix>_model.pkl`` and
        ``<path_prefix>_features.pkl``.
        """
        self.model.save_model(f"{path_prefix}_model.pkl")
        if self.X is not None:
            feature_info = {
                'feature_names': self.X.columns.tolist(),
                'feature_count': len(self.X.columns)
            }
            joblib.dump(feature_info, f"{path_prefix}_features.pkl")
        print("系统状态已保存")

    def load_system(self, path_prefix='agricultural_audit'):
        """Load the model and report the feature metadata if the file exists.

        SECURITY: loading goes through ``joblib.load``, which unpickles the
        files; only load files from a trusted source.
        """
        self.model.load_model(f"{path_prefix}_model.pkl")
        features_path = f"{path_prefix}_features.pkl"
        # The feature metadata file is optional.
        if os.path.exists(features_path):
            feature_info = joblib.load(features_path)
            print(f"已加载特征信息: {feature_info['feature_count']} 个特征")
        print("系统状态已加载")
# ==================== 5. Main program entry point ====================
def main():
    """Run the complete demo: generate, featurize, train, save, evaluate, audit."""
    banner = "=" * 60
    print(banner)
    print(" 农业余额审核训练系统 (Agricultural Balance Audit Training System)")
    print(banner)

    system = AgriculturalAuditTrainingSystem()

    # 1. Generate the training data.
    print("\n【步骤1】生成训练数据")
    transactions = system.generate_data(n_accounts=1500, months=12)
    print("数据样本预览:")
    print(transactions.head(10).to_string())

    # 2. Feature engineering (results are kept on the system object).
    print("\n【步骤2】特征工程")
    system.prepare_features()

    # 3. Train the model.
    print("\n【步骤3】训练审核模型")
    system.train()

    # 4. Persist the trained system.
    print("\n【步骤4】保存系统")
    system.save_system()

    # 5. Evaluate on freshly simulated data.
    print("\n【步骤5】在新数据上评估模型")
    new_results = system.evaluate_on_new_data(n_new_accounts=300, months=3)

    # 6. Demonstrate batch auditing on a sample stripped of its labels and
    #    earlier predictions, as a stand-in for records awaiting audit.
    print("\n【步骤6】演示批量审核功能")
    sample_audit_data = new_results.sample(20, random_state=42).drop(
        columns=['audit_label', 'predicted_label', 'anomaly_score'], errors='ignore'
    )
    audit_results = system.batch_audit(sample_audit_data)
    print("\n批量审核结果示例:")
    display_cols = ['account_id', 'date', 'agriculture_type', 'net_change',
                    'new_balance', 'audit_result', 'confidence']
    print(audit_results[display_cols].head(10).to_string(index=False))

    print("\n" + banner)
    print("系统运行完成!")
    print(banner)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()