农业银行余额模拟器,OCaml大模型审核系统

简介:面向农业账户余额异常的 AI 审核系统设计,集成数据模拟、特征工程、随机森林审核模型及评估模块。

下载地址:http://lanzou.com.cn/i09343b5f

image.png

"""
农业余额审核训练系统 (Agricultural Balance Audit Training System)
一个用于模拟和训练AI审核农业账户余额变化的系统。
包含数据生成、审核模型、训练与评估模块。
"""

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.preprocessing import StandardScaler
import joblib
import os
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# ==================== 1. Data simulation module ====================

class AgriculturalDataSimulator:
    """Generate synthetic agricultural accounts and their monthly balance history.

    All randomness comes from NumPy's global generator, which the constructor
    seeds, so the output is reproducible for a given seed and call order.
    """

    def __init__(self, random_seed=42):
        """Seed the global NumPy RNG and define the seasonal multipliers.

        Args:
            random_seed: Value passed to ``np.random.seed``.
        """
        self.random_seed = random_seed
        np.random.seed(random_seed)

        # Supported agriculture types.
        self.agriculture_types = ['种植业', '畜牧业', '渔业', '林业', '混合农业']
        # Income/expense multiplier, keyed by agriculture type and then season.
        self.seasonal_factors = {
            '种植业': {'春季': 1.2, '夏季': 1.5, '秋季': 2.0, '冬季': 0.8},
            '畜牧业': {'春季': 1.1, '夏季': 1.2, '秋季': 1.1, '冬季': 1.0},
            '渔业': {'春季': 1.1, '夏季': 1.4, '秋季': 1.3, '冬季': 0.9},
            '林业': {'春季': 1.0, '夏季': 1.2, '秋季': 1.3, '冬季': 0.9},
            '混合农业': {'春季': 1.1, '夏季': 1.3, '秋季': 1.4, '冬季': 0.9},
        }

    def _get_season(self, date):
        """Return the season label for ``date`` based on its calendar month.

        March-May is spring, June-August summer, September-November autumn,
        and every other month winter.
        """
        month = date.month
        if month in (3, 4, 5):
            return '春季'
        if month in (6, 7, 8):
            return '夏季'
        if month in (9, 10, 11):
            return '秋季'
        return '冬季'

    @staticmethod
    def _month_end_dates(end_date, months):
        """Return the last day of each of the ``months`` calendar months that
        end with ``end_date``'s month, ordered oldest first."""
        dates = []
        year, month = end_date.year, end_date.month
        for _ in range(months):
            # Last day of (year, month) = first day of the next month - 1 day.
            first_of_next = datetime(year + month // 12, month % 12 + 1, 1)
            dates.append(first_of_next - timedelta(days=1))
            year, month = (year - 1, 12) if month == 1 else (year, month - 1)
        dates.reverse()
        return dates

    def generate_accounts(self, n_accounts=1000):
        """Generate the static profile of ``n_accounts`` agricultural accounts.

        Args:
            n_accounts: Number of accounts to create.

        Returns:
            A DataFrame with one row per account and the columns
            ``account_id``, ``agriculture_type``, ``region``, ``farm_size``,
            ``base_balance``, ``credit_score`` and ``established_years``.
        """
        accounts = []
        for i in range(n_accounts):
            acc_type = np.random.choice(self.agriculture_types)
            # The opening balance range depends on the agriculture type.
            base_balance = np.random.uniform(
                50000 if acc_type in ['种植业', '混合农业'] else 30000,
                500000 if acc_type in ['种植业', '畜牧业'] else 200000
            )
            accounts.append({
                'account_id': f'AGRI_{i+1:05d}',
                'agriculture_type': acc_type,
                'region': np.random.choice(['华北', '华东', '华南', '西南', '西北', '东北']),
                'farm_size': np.random.choice(['小型', '中型', '大型'], p=[0.4, 0.4, 0.2]),
                'base_balance': base_balance,
                'credit_score': np.random.randint(300, 850),
                'established_years': np.random.randint(1, 30)
            })
        return pd.DataFrame(accounts)

    def generate_transactions(self, accounts_df, months=12):
        """Generate one balance record per account per calendar month.

        Records for each account are emitted in chronological order, ending
        with December 2024, and each record's ``previous_balance`` is the
        ``new_balance`` of the record before it (the first one starts from the
        account's ``base_balance``). Roughly 5% of records receive an extra
        anomalous amount and are labelled ``audit_label == 1``.

        Args:
            accounts_df: Accounts as produced by ``generate_accounts``.
            months: Number of consecutive calendar months to simulate.

        Returns:
            A DataFrame with ``len(accounts_df) * months`` rows.
        """
        all_records = []
        end_date = datetime(2024, 12, 31)
        # The calendar is the same for every account, so build it once.
        schedule = [
            (date, self._get_season(date))
            for date in self._month_end_dates(end_date, months)
        ]

        for _, account in accounts_df.iterrows():
            acc_id = account['account_id']
            acc_type = account['agriculture_type']
            type_factors = self.seasonal_factors[acc_type]
            current_balance = account['base_balance']

            for date, season in schedule:
                seasonal_factor = type_factors[season]

                # Ordinary operating income and expense, scaled by the season.
                income = np.random.gamma(2, 5000) * (0.5 + seasonal_factor * 0.5)
                expense = np.random.gamma(2, 4000) * (0.6 + seasonal_factor * 0.4)

                # About 5% of records carry an injected anomaly.
                is_anomaly = np.random.random() < 0.05
                anomaly_amount = 0
                anomaly_type = 'normal'

                if is_anomaly:
                    anomaly_type = np.random.choice(
                        ['sudden_withdraw', 'unusual_income', 'rapid_fluctuation'],
                        p=[0.5, 0.3, 0.2]
                    )
                    if anomaly_type == 'sudden_withdraw':
                        anomaly_amount = -np.random.uniform(50000, 200000)
                    elif anomaly_type == 'unusual_income':
                        anomaly_amount = np.random.uniform(50000, 150000)
                    else:  # rapid_fluctuation
                        anomaly_amount = np.random.uniform(-80000, 80000)

                net_change = income - expense + anomaly_amount
                new_balance = current_balance + net_change

                all_records.append({
                    'account_id': acc_id,
                    'date': date.strftime('%Y-%m-%d'),
                    'month': date.month,
                    'season': season,
                    'agriculture_type': acc_type,
                    'region': account['region'],
                    'farm_size': account['farm_size'],
                    'credit_score': account['credit_score'],
                    'established_years': account['established_years'],
                    'previous_balance': current_balance,
                    'income': income,
                    'expense': expense,
                    'anomaly_amount': anomaly_amount,
                    'anomaly_type': anomaly_type,
                    'net_change': net_change,
                    'new_balance': new_balance,
                    # +1 keeps the ratio finite when the balance is zero.
                    'balance_change_ratio': net_change / (current_balance + 1),
                    'audit_label': 1 if is_anomaly else 0
                })
                current_balance = new_balance

        return pd.DataFrame(all_records)

# ==================== 2. Feature engineering module ====================

class FeatureEngineer:
    """Turn raw monthly balance records into numeric features for the audit model."""

    # Fixed category -> code tables. They must not be derived from the data
    # being encoded, otherwise the same category would receive different codes
    # in the training set and in data scored later.
    SEASON_CODES = {'春季': 0, '夏季': 1, '秋季': 2, '冬季': 3}
    AGRICULTURE_TYPE_CODES = {'种植业': 0, '畜牧业': 1, '渔业': 2, '林业': 3, '混合农业': 4}
    REGION_CODES = {'华北': 0, '华东': 1, '华南': 2, '西南': 3, '西北': 4, '东北': 5}
    FARM_SIZE_CODES = {'小型': 0, '中型': 1, '大型': 2}
    # Code assigned to any category missing from the tables above.
    UNKNOWN_CODE = -1

    FEATURE_COLUMNS = [
        'credit_score', 'established_years', 'previous_balance', 'new_balance',
        'income', 'expense', 'balance_change_ratio', 'month_sin', 'month_cos',
        'season_code', 'agri_type_code', 'region_code', 'farm_size_code',
        'income_expense_ratio', 'balance_stress', 'volatility',
        'lag_1_change', 'lag_2_change', 'rolling_anomaly_score'
    ]

    @staticmethod
    def create_features(df):
        """Build the model feature matrix from transaction records.

        Rolling and lag features are computed per ``account_id`` in the row
        order of ``df``, so rows of an account should be in chronological
        order.

        Args:
            df: Transaction records. Must contain ``account_id``, ``month``,
                ``season``, ``agriculture_type``, ``region``, ``farm_size``,
                ``credit_score``, ``established_years``, ``previous_balance``,
                ``new_balance``, ``income``, ``expense`` and
                ``balance_change_ratio``. ``audit_label`` is optional.

        Returns:
            A ``(features, labels)`` tuple. ``features`` is a DataFrame with
            the columns listed in ``FEATURE_COLUMNS``; ``labels`` is the
            ``audit_label`` column, or ``None`` when ``df`` has no such column
            (unlabelled data to be audited).
        """
        # Work on a copy so the caller's frame is not modified.
        features_df = df.copy()

        def encode(column, table):
            # Unknown categories map to UNKNOWN_CODE instead of NaN.
            codes = features_df[column].map(table)
            return codes.fillna(FeatureEngineer.UNKNOWN_CODE).astype(int)

        # Cyclical encoding of the calendar month.
        month_angle = 2 * np.pi * features_df['month'] / 12
        features_df['month_sin'] = np.sin(month_angle)
        features_df['month_cos'] = np.cos(month_angle)

        # Categorical encodings.
        features_df['season_code'] = encode('season', FeatureEngineer.SEASON_CODES)
        features_df['agri_type_code'] = encode(
            'agriculture_type', FeatureEngineer.AGRICULTURE_TYPE_CODES)
        features_df['region_code'] = encode('region', FeatureEngineer.REGION_CODES)
        features_df['farm_size_code'] = encode('farm_size', FeatureEngineer.FARM_SIZE_CODES)

        # Derived ratios; +1 in the denominators avoids division by zero.
        features_df['income_expense_ratio'] = features_df['income'] / (features_df['expense'] + 1)
        features_df['balance_stress'] = features_df['new_balance'] / (features_df['credit_score'] + 1)

        ratio_by_account = features_df.groupby('account_id')['balance_change_ratio']

        # Standard deviation of the change ratio over the last 3 rows.
        features_df['volatility'] = ratio_by_account.transform(
            lambda x: x.rolling(3, min_periods=1).std().fillna(0)
        )

        # Change ratio of the previous one and two rows of the same account.
        features_df['lag_1_change'] = ratio_by_account.shift(1).fillna(0)
        features_df['lag_2_change'] = ratio_by_account.shift(2).fillna(0)

        # Number of the last 3 rows whose |ratio| exceeds twice the account's
        # overall standard deviation.
        features_df['rolling_anomaly_score'] = ratio_by_account.transform(
            lambda x: (x.abs() > x.std() * 2).astype(float)
            .rolling(3, min_periods=1).sum().fillna(0)
        )

        labels = features_df['audit_label'] if 'audit_label' in features_df.columns else None
        return features_df[FeatureEngineer.FEATURE_COLUMNS], labels

# ==================== 3. Audit model module ====================

class AgriculturalAuditModel:
    """Random-forest classifier that flags anomalous balance changes."""

    def __init__(self):
        """Create an untrained classifier and its feature scaler."""
        self.model = RandomForestClassifier(
            n_estimators=100,
            max_depth=15,
            min_samples_split=10,
            min_samples_leaf=5,
            class_weight='balanced',
            random_state=42,
            n_jobs=-1
        )
        self.scaler = StandardScaler()
        self.is_trained = False
        self.feature_names = None

    def _scale(self, X):
        """Scale ``X`` with the fitted scaler, raising if the model is untrained."""
        if not self.is_trained:
            raise ValueError("模型尚未训练,请先调用train()方法")
        # Put DataFrame columns into the training order so a differently
        # ordered frame is not silently scored against the wrong features.
        if self.feature_names is not None and hasattr(X, 'columns'):
            X = X[self.feature_names]
        return self.scaler.transform(X)

    def train(self, X, y):
        """Fit the scaler and the classifier, then print validation metrics.

        20% of the data is held out (stratified on ``y``) for validation. The
        scaler is fitted on the training portion only, so the validation
        metrics are not influenced by validation-set statistics.

        Args:
            X: Feature DataFrame.
            y: Binary labels (0 = normal, 1 = anomalous).

        Returns:
            ``self``, to allow call chaining.
        """
        self.feature_names = X.columns.tolist()

        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        X_train_scaled = self.scaler.fit_transform(X_train)
        X_val_scaled = self.scaler.transform(X_val)

        self.model.fit(X_train_scaled, y_train)
        self.is_trained = True

        y_pred = self.model.predict(X_val_scaled)

        print("\n========== 模型训练评估 ==========")
        print(f"准确率: {accuracy_score(y_val, y_pred):.4f}")
        print("\n分类报告:")
        # Explicit labels keep the report and matrix well-formed even when one
        # class is absent from the validation fold.
        print(classification_report(
            y_val, y_pred, labels=[0, 1], target_names=['正常', '异常'], zero_division=0
        ))
        print("混淆矩阵:")
        print(confusion_matrix(y_val, y_pred, labels=[0, 1]))

        # Ten most important features, most important first.
        importances = self.model.feature_importances_
        indices = np.argsort(importances)[::-1][:10]
        print("\nTop 10 重要特征:")
        for i, idx in enumerate(indices):
            print(f"  {i+1}. {self.feature_names[idx]}: {importances[idx]:.4f}")

        return self

    def predict(self, X):
        """Return the predicted label (0 or 1) for each row of ``X``.

        Raises:
            ValueError: If the model has not been trained or loaded.
        """
        return self.model.predict(self._scale(X))

    def predict_proba(self, X):
        """Return the per-class probabilities for each row of ``X``.

        Raises:
            ValueError: If the model has not been trained or loaded.
        """
        return self.model.predict_proba(self._scale(X))

    def save_model(self, path='agricultural_audit_model.pkl'):
        """Persist the classifier, scaler and feature names to ``path``."""
        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'feature_names': self.feature_names,
            'is_trained': self.is_trained
        }
        joblib.dump(model_data, path)
        print(f"模型已保存至: {path}")

    def load_model(self, path='agricultural_audit_model.pkl'):
        """Restore the state written by ``save_model`` from ``path``.

        SECURITY: joblib files are pickle-based and can execute arbitrary code
        when loaded. Only load files that come from a trusted source.
        """
        model_data = joblib.load(path)
        self.model = model_data['model']
        self.scaler = model_data['scaler']
        self.feature_names = model_data['feature_names']
        self.is_trained = model_data['is_trained']
        print(f"模型已从 {path} 加载")

# ==================== 4. Audit training system controller ====================

class AgriculturalAuditTrainingSystem:
    """Top-level controller wiring the simulator, feature engineering and model together."""

    def __init__(self):
        """Create the components; no data is generated and nothing is trained yet."""
        self.simulator = AgriculturalDataSimulator()
        self.feature_engineer = FeatureEngineer()
        self.model = AgriculturalAuditModel()
        self.accounts = None
        self.transactions = None
        self.X = None
        self.y = None

    def generate_data(self, n_accounts=1000, months=12):
        """Simulate accounts and their transaction history for training.

        Args:
            n_accounts: Number of accounts to simulate.
            months: Number of months of history per account.

        Returns:
            The generated transactions DataFrame (also kept on ``self.transactions``).
        """
        print(f"正在生成 {n_accounts} 个农业账户,{months} 个月的历史数据...")
        self.accounts = self.simulator.generate_accounts(n_accounts)
        self.transactions = self.simulator.generate_transactions(self.accounts, months)
        print(f"数据生成完成,共 {len(self.transactions)} 条交易记录")

        # Share of records labelled as anomalous.
        anomaly_rate = self.transactions['audit_label'].mean()
        print(f"异常交易比例: {anomaly_rate:.2%}")

        return self.transactions

    def prepare_features(self):
        """Build the feature matrix and labels from the generated transactions.

        Returns:
            The ``(X, y)`` pair, also stored on ``self.X`` / ``self.y``.

        Raises:
            ValueError: If ``generate_data`` has not been called yet.
        """
        if self.transactions is None:
            raise ValueError("请先调用 generate_data() 生成数据")

        print("\n正在进行特征工程...")
        self.X, self.y = self.feature_engineer.create_features(self.transactions)
        print(f"特征维度: {self.X.shape}")
        print(f"特征列表: {list(self.X.columns)}")

        return self.X, self.y

    def train(self):
        """Train the audit model on the prepared features.

        Raises:
            ValueError: If ``prepare_features`` has not been called yet.
        """
        if self.X is None or self.y is None:
            raise ValueError("请先调用 prepare_features() 准备特征")

        print("\n开始训练审核模型...")
        self.model.train(self.X, self.y)

    def evaluate_on_new_data(self, n_new_accounts=200, months=6):
        """Evaluate the trained model on freshly simulated data.

        Args:
            n_new_accounts: Number of new accounts to simulate.
            months: Number of months of history per new account.

        Returns:
            The new transactions with two added columns: ``predicted_label``
            and ``anomaly_score`` (predicted probability of the anomalous class).

        Raises:
            ValueError: If the model has not been trained.
        """
        # Fail before doing any simulation work if there is no model to use.
        if not self.model.is_trained:
            raise ValueError("模型未训练,请先调用 train()")

        print("\n========== 在新数据上评估模型 ==========")

        new_accounts = self.simulator.generate_accounts(n_new_accounts)
        new_transactions = self.simulator.generate_transactions(new_accounts, months)

        X_new, y_new = self.feature_engineer.create_features(new_transactions)

        y_pred = self.model.predict(X_new)
        y_proba = self.model.predict_proba(X_new)[:, 1]

        print(f"准确率: {accuracy_score(y_new, y_pred):.4f}")
        print("\n分类报告:")
        print(classification_report(y_new, y_pred, target_names=['正常', '异常']))

        new_transactions['predicted_label'] = y_pred
        new_transactions['anomaly_score'] = y_proba

        # Show the five flagged records with the highest anomaly score.
        high_risk = new_transactions[new_transactions['predicted_label'] == 1].nlargest(5, 'anomaly_score')
        if len(high_risk) > 0:
            print("\n高风险异常交易示例:")
            display_cols = ['account_id', 'date', 'agriculture_type', 'previous_balance',
                            'net_change', 'new_balance', 'anomaly_score', 'anomaly_type']
            print(high_risk[display_cols].to_string(index=False))

        return new_transactions

    def batch_audit(self, transactions_df):
        """Audit a batch of transactions with the trained model.

        Args:
            transactions_df: Transactions to audit. An ``audit_label`` column
                is not required.

        Returns:
            A copy of ``transactions_df`` with ``audit_result`` ('异常' or
            '正常') and ``confidence`` (predicted probability of the anomalous
            class) columns added.

        Raises:
            ValueError: If the model has not been trained.
        """
        if not self.model.is_trained:
            raise ValueError("模型未训练,请先调用 train()")

        print("\n执行批量审核...")
        features_input = transactions_df
        if 'audit_label' not in transactions_df.columns:
            # Data to be audited has no ground truth. Supply a placeholder so
            # feature extraction does not fail on the missing column; the
            # returned labels are discarded below.
            features_input = transactions_df.assign(audit_label=0)
        X_audit, _ = self.feature_engineer.create_features(features_input)
        predictions = self.model.predict(X_audit)
        probabilities = self.model.predict_proba(X_audit)[:, 1]

        results = transactions_df.copy()
        results['audit_result'] = ['异常' if p == 1 else '正常' for p in predictions]
        results['confidence'] = probabilities

        result_counts = results['audit_result'].value_counts()
        print(f"审核完成,共审核 {len(results)} 条记录")
        print(f"异常记录数: {result_counts.get('异常', 0)}")
        print(f"正常记录数: {result_counts.get('正常', 0)}")

        return results

    def save_system(self, path_prefix='agricultural_audit'):
        """Save the model to ``<path_prefix>_model.pkl`` and, when features
        have been prepared, their names to ``<path_prefix>_features.pkl``."""
        self.model.save_model(f"{path_prefix}_model.pkl")

        if self.X is not None:
            feature_info = {
                'feature_names': self.X.columns.tolist(),
                'feature_count': len(self.X.columns)
            }
            joblib.dump(feature_info, f"{path_prefix}_features.pkl")

        print("系统状态已保存")

    def load_system(self, path_prefix='agricultural_audit'):
        """Load the model from ``<path_prefix>_model.pkl``.

        The optional ``<path_prefix>_features.pkl`` file is only read to report
        the feature count. Both files are pickle-based; load them only from a
        trusted source.
        """
        self.model.load_model(f"{path_prefix}_model.pkl")

        features_path = f"{path_prefix}_features.pkl"
        if os.path.exists(features_path):
            feature_info = joblib.load(features_path)
            print(f"已加载特征信息: {feature_info['feature_count']} 个特征")

        print("系统状态已加载")

# ==================== 5. Program entry point ====================

def main():
    """Run the full demo: generate data, train, save, evaluate and batch-audit."""
    banner = "=" * 60
    print(banner)
    print(" 农业余额审核训练系统 (Agricultural Balance Audit Training System)")
    print(banner)

    system = AgriculturalAuditTrainingSystem()

    # 1. Generate the training data.
    print("\n【步骤1】生成训练数据")
    transactions = system.generate_data(n_accounts=1500, months=12)
    print("数据样本预览:")
    print(transactions.head(10).to_string())

    # 2. Feature engineering.
    print("\n【步骤2】特征工程")
    system.prepare_features()

    # 3. Train the model.
    print("\n【步骤3】训练审核模型")
    system.train()

    # 4. Persist the trained system.
    print("\n【步骤4】保存系统")
    system.save_system()

    # 5. Evaluate on freshly simulated data.
    print("\n【步骤5】在新数据上评估模型")
    new_results = system.evaluate_on_new_data(n_new_accounts=300, months=3)

    # 6. Batch-audit demo on a sample with the label and prediction columns
    #    removed, to imitate unlabelled incoming data.
    print("\n【步骤6】演示批量审核功能")
    sample_audit_data = new_results.sample(20, random_state=42).drop(
        columns=['audit_label', 'predicted_label', 'anomaly_score'], errors='ignore'
    )
    audit_results = system.batch_audit(sample_audit_data)

    print("\n批量审核结果示例:")
    display_cols = ['account_id', 'date', 'agriculture_type', 'net_change',
                    'new_balance', 'audit_result', 'confidence']
    print(audit_results[display_cols].head(10).to_string(index=False))

    print("\n" + "=" * 60)
    print("系统运行完成!")
    print("=" * 60)

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

相关文章
|
1天前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
10096 24
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
13天前
|
人工智能 安全 Linux
【OpenClaw保姆级图文教程】阿里云/本地部署集成模型Ollama/Qwen3.5/百炼 API 步骤流程及避坑指南
2026年,AI代理工具的部署逻辑已从“单一云端依赖”转向“云端+本地双轨模式”。OpenClaw(曾用名Clawdbot)作为开源AI代理框架,既支持对接阿里云百炼等云端免费API,也能通过Ollama部署本地大模型,完美解决两类核心需求:一是担心云端API泄露核心数据的隐私安全诉求;二是频繁调用导致token消耗过高的成本控制需求。
5828 14
|
21天前
|
人工智能 JavaScript Ubuntu
5分钟上手龙虾AI!OpenClaw部署(阿里云+本地)+ 免费多模型配置保姆级教程(MiniMax、Claude、阿里云百炼)
OpenClaw(昵称“龙虾AI”)作为2026年热门的开源个人AI助手,由PSPDFKit创始人Peter Steinberger开发,核心优势在于“真正执行任务”——不仅能聊天互动,还能自动处理邮件、管理日程、订机票、写代码等,且所有数据本地处理,隐私完全可控。它支持接入MiniMax、Claude、GPT等多类大模型,兼容微信、Telegram、飞书等主流聊天工具,搭配100+可扩展技能,成为兼顾实用性与隐私性的AI工具首选。
22753 119

热门文章

最新文章