下载地址: http://lanzou.com.cn/ifa173cca

📁 output/gongshangwangluojisuanxitong/
├── 📄 README.md  212 B
├── 📄 pom.xml  1.7 KB
├── 📄 package.json  710 B
├── 📄 config/application.properties  648 B
├── 📄 credentials/Executor.cpp  1.7 KB
├── 📄 credentials/Builder.js  4.4 KB
├── 📄 stress/Validator.py  5.8 KB
├── 📄 annotation/Resolver.go  3.1 KB
├── 📄 credentials/Provider.ts  3.2 KB
├── 📄 jobs/Transformer.js  4.2 KB
├── 📄 stress/Converter.php  3.7 KB
├── 📄 src/main/java/Client.java  4.5 KB
├── 📄 stress/Handler.php  3.1 KB
├── 📄 jobs/Util.ts  2.3 KB
├── 📄 stress/Cache.js  3.9 KB
├── 📄 config/Engine.xml  1.5 KB
├── 📄 credentials/Parser.py  4.1 KB
├── 📄 annotation/Buffer.java  6.5 KB
├── 📄 stress/Server.py  6.1 KB
├── 📄 stress/Repository.cpp  1.6 KB
├── 📄 jobs/Pool.sql  3.1 KB
├── 📄 src/main/java/Controller.java  4.6 KB
├── 📄 src/main/java/Service.java  4.1 KB
├── 📄 config/Scheduler.properties  649 B
├── 📄 config/Adapter.json  710 B
项目编译入口:
Project Structure
Project : 工商余额神经网络计算系统
Folder : gongshangwangluojisuanxitong
Files : 26
Size : 76.1 KB
Generated: 2026-03-23 19:20:29
gongshangwangluojisuanxitong/
├── README.md [212 B]
├── annotation/
│ ├── Buffer.java [6.5 KB]
│ └── Resolver.go [3.1 KB]
├── config/
│ ├── Adapter.json [710 B]
│ ├── Engine.xml [1.5 KB]
│ ├── Scheduler.properties [649 B]
│ └── application.properties [648 B]
├── credentials/
│ ├── Builder.js [4.4 KB]
│ ├── Executor.cpp [1.7 KB]
│ ├── Parser.py [4.1 KB]
│ └── Provider.ts [3.2 KB]
├── jobs/
│ ├── Pool.sql [3.1 KB]
│ ├── Transformer.js [4.2 KB]
│ └── Util.ts [2.3 KB]
├── package.json [710 B]
├── pom.xml [1.7 KB]
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ ├── Client.java [4.5 KB]
│ │ │ ├── Controller.java [4.6 KB]
│ │ │ └── Service.java [4.1 KB]
│ │ └── resources/
│ └── test/
│ └── java/
└── stress/
├── Cache.js [3.9 KB]
├── Converter.php [3.7 KB]
├── Handler.php [3.1 KB]
├── Repository.cpp [1.6 KB]
├── Server.py [6.1 KB]
└── Validator.py [5.8 KB]
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
import pandas as pd
from typing import List, Tuple, Dict, Optional, Callable
import pickle
import os
import warnings
warnings.filterwarnings('ignore')
# Configure matplotlib fonts so CJK labels render, falling back to DejaVu Sans;
# these fonts lack the unicode minus glyph, so disable it.
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# ============================================================================
# Part 1: Balance Data Simulator
# ============================================================================
class BalanceSimulator:
    """Generate time-series balance data with realistic characteristics.

    The simulated series combines a long-term trend, monthly/weekly
    seasonality, random fluctuations and occasional large shock events
    (big expenses / incomes).
    """

    def __init__(self, initial_balance: float = 10000.0, seed: int = 42):
        # Seed the global NumPy RNG so every simulation run is reproducible.
        np.random.seed(seed)
        self.initial_balance = initial_balance
        self.balance_history = []      # filled by generate_time_series()
        self.transaction_history = []  # daily deltas, same length

    def generate_time_series(self, days: int = 365,
                             trend_rate: float = 0.0003,            # daily growth rate
                             seasonality_amplitude: float = 500.0,  # seasonal amplitude
                             volatility: float = 50.0,              # std of random noise
                             large_expense_prob: float = 0.02,      # prob. of big expense
                             large_income_prob: float = 0.01,       # prob. of big income
                             start_date: str = "2023-01-01") -> pd.DataFrame:
        """Generate the balance time series.

        Returns a DataFrame with columns: date, balance, change, event_type.
        """
        start = datetime.strptime(start_date, "%Y-%m-%d")
        dates = [start + timedelta(days=i) for i in range(days)]
        balances = np.zeros(days)
        changes = np.zeros(days)
        change_types = []
        balances[0] = self.initial_balance
        change_types.append("start")
        for i in range(1, days):
            # 1. Trend component (long-term growth or decay).
            trend = balances[i-1] * trend_rate
            # 2. Seasonal component (monthly + weekly cycles).
            day_of_month = dates[i].day
            weekday = dates[i].weekday()
            seasonal = seasonality_amplitude * (0.6 * np.sin(2 * np.pi * day_of_month / 30) +
                                                0.4 * np.sin(2 * np.pi * weekday / 7))
            # 3. Random-walk component.
            random_shock = np.random.normal(0, volatility)
            # 4. Large event shocks.
            event_shock = 0.0
            event_type = "normal"
            if np.random.rand() < large_expense_prob:
                # Large expense (negative shock).
                event_shock = -np.random.uniform(500, 5000)
                event_type = "large_expense"
            elif np.random.rand() < large_income_prob:
                # Large income (positive shock).
                event_shock = np.random.uniform(300, 3000)
                event_type = "large_income"
            # Total daily change.
            delta = trend + seasonal + random_shock + event_shock
            # Clamp the balance at zero (simple overdraft protection).
            new_balance = max(balances[i-1] + delta, 0)
            actual_delta = new_balance - balances[i-1]
            balances[i] = new_balance
            changes[i] = actual_delta
            change_types.append(event_type)
        df = pd.DataFrame({
            'date': dates,
            'balance': balances,
            'change': changes,
            'event_type': change_types
        })
        self.balance_history = balances
        self.transaction_history = changes
        return df

    def add_anomaly_period(self, df: pd.DataFrame, start_idx: int, end_idx: int,
                           anomaly_factor: float = 0.7) -> pd.DataFrame:
        """Scale balances in rows [start_idx, end_idx) by `anomaly_factor` to
        mimic an anomalous period (e.g. a financial crisis); returns a copy."""
        df_modified = df.copy()
        for i in range(start_idx, min(end_idx, len(df))):
            df_modified.loc[i, 'balance'] *= anomaly_factor
        # Recompute the change column from the modified balances.
        df_modified['change'] = df_modified['balance'].diff().fillna(0)
        return df_modified

    @staticmethod
    def create_features(df: pd.DataFrame, window_size: int = 7) -> pd.DataFrame:
        """Build model features: lags, rolling statistics, calendar encodings.

        NOTE(review): `window_size` is currently unused — the rolling windows
        below are hard-coded to 7; confirm intent before wiring it through.
        """
        df_feat = df.copy()
        # Lag features.
        for lag in [1, 3, 7, 14]:
            df_feat[f'balance_lag_{lag}'] = df_feat['balance'].shift(lag)
            df_feat[f'change_lag_{lag}'] = df_feat['change'].shift(lag)
        # Rolling statistics.
        df_feat['balance_roll_mean_7'] = df_feat['balance'].rolling(window=7, min_periods=1).mean()
        df_feat['balance_roll_std_7'] = df_feat['balance'].rolling(window=7, min_periods=1).std().fillna(0)
        df_feat['change_roll_sum_7'] = df_feat['change'].rolling(window=7, min_periods=1).sum()
        # Calendar features.
        df_feat['day_of_week'] = df_feat['date'].dt.dayofweek
        df_feat['day_of_month'] = df_feat['date'].dt.day
        df_feat['month'] = df_feat['date'].dt.month
        # Cyclical encodings of the calendar features.
        df_feat['sin_dow'] = np.sin(2 * np.pi * df_feat['day_of_week'] / 7)
        df_feat['cos_dow'] = np.cos(2 * np.pi * df_feat['day_of_week'] / 7)
        df_feat['sin_month'] = np.sin(2 * np.pi * df_feat['month'] / 12)
        df_feat['cos_month'] = np.cos(2 * np.pi * df_feat['month'] / 12)
        # Target variable: the next time step's balance.
        df_feat['target_balance'] = df_feat['balance'].shift(-1)
        # Drop rows made NaN by the lags and the target shift.
        df_feat = df_feat.dropna().reset_index(drop=True)
        return df_feat
# ============================================================================
# Part 2: Dylan Neural Network Core
# ============================================================================
class Activation:
    """Collection of activation functions and their derivatives."""

    @staticmethod
    def relu(x):
        return np.maximum(0, x)

    @staticmethod
    def relu_derivative(x):
        # Subgradient at 0 is taken as 0.
        return (x > 0).astype(float)

    @staticmethod
    def tanh(x):
        return np.tanh(x)

    @staticmethod
    def tanh_derivative(x):
        return 1 - np.tanh(x) ** 2

    @staticmethod
    def sigmoid(x):
        # Clip the input to avoid overflow in exp() for large |x|.
        return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

    @staticmethod
    def sigmoid_derivative(x):
        sig = Activation.sigmoid(x)
        return sig * (1 - sig)

    @staticmethod
    def linear(x):
        return x

    @staticmethod
    def linear_derivative(x):
        return np.ones_like(x)
class Layer:
    """Fully-connected layer with a selectable activation.

    Fixes the mangled constructor name (`init` -> `__init__`) so the layer
    actually initializes its weights when constructed.
    """

    def __init__(self, input_dim: int, output_dim: int, activation: str = 'relu'):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.activation_name = activation
        # He initialization (well suited to ReLU).
        self.weights = np.random.randn(input_dim, output_dim) * np.sqrt(2.0 / input_dim)
        self.bias = np.zeros((1, output_dim))
        self.cache = {}  # forward-pass intermediates consumed by backward()
        self.grad_weights = None
        self.grad_bias = None

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        """Forward pass. `training` is accepted for interface symmetry but is
        currently unused (no dropout / batch-norm in this layer)."""
        self.cache['input'] = x
        z = np.dot(x, self.weights) + self.bias
        self.cache['z'] = z
        if self.activation_name == 'relu':
            out = Activation.relu(z)
        elif self.activation_name == 'tanh':
            out = Activation.tanh(z)
        elif self.activation_name == 'sigmoid':
            out = Activation.sigmoid(z)
        else:  # linear
            out = Activation.linear(z)
        self.cache['output'] = out
        return out

    def backward(self, dout: np.ndarray) -> np.ndarray:
        """Backward pass: stores parameter gradients and returns the gradient
        w.r.t. the layer input, to be passed to the previous layer."""
        z = self.cache['z']
        if self.activation_name == 'relu':
            dact = dout * Activation.relu_derivative(z)
        elif self.activation_name == 'tanh':
            dact = dout * Activation.tanh_derivative(z)
        elif self.activation_name == 'sigmoid':
            dact = dout * Activation.sigmoid_derivative(z)
        else:
            dact = dout * Activation.linear_derivative(z)
        x = self.cache['input']
        self.grad_weights = np.dot(x.T, dact)
        self.grad_bias = np.sum(dact, axis=0, keepdims=True)
        # Gradient propagated to the previous layer.
        dprev = np.dot(dact, self.weights.T)
        return dprev

    def update(self, learning_rate: float, optimizer):
        """Delegate the parameter update to the optimizer."""
        optimizer.update(self, learning_rate)
class DylanNet:
    """Dylan neural network: stacked fully-connected layers with multiple
    activation choices, a configurable loss and a pluggable optimizer.

    Fixes the mangled constructor name (`init` -> `__init__`).
    """

    def __init__(self, layer_dims: List[Tuple[int, int, str]], loss: str = 'mse'):
        """
        layer_dims: one (input_dim, output_dim, activation) tuple per layer,
        e.g. [(10, 64, 'relu'), (64, 32, 'relu'), (32, 1, 'linear')].
        """
        self.layers = []
        for inp, out, act in layer_dims:
            self.layers.append(Layer(inp, out, act))
        self.loss_name = loss
        self.training_history = {'train_loss': [], 'val_loss': []}

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        """Sequential forward pass through all layers."""
        out = x
        for layer in self.layers:
            out = layer.forward(out, training)
        return out

    def backward(self, loss_grad: np.ndarray):
        """Sequential backward pass, last layer first."""
        grad = loss_grad
        for layer in reversed(self.layers):
            grad = layer.backward(grad)

    def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Compute the scalar loss ('mse' or 'mae')."""
        if self.loss_name == 'mse':
            return np.mean((y_pred - y_true) ** 2)
        elif self.loss_name == 'mae':
            return np.mean(np.abs(y_pred - y_true))
        else:
            raise ValueError("Unsupported loss")

    def compute_loss_grad(self, y_pred: np.ndarray, y_true: np.ndarray) -> np.ndarray:
        """Gradient of the loss w.r.t. the predictions."""
        if self.loss_name == 'mse':
            return 2 * (y_pred - y_true) / y_true.size
        elif self.loss_name == 'mae':
            return np.sign(y_pred - y_true) / y_true.size
        else:
            raise ValueError("Unsupported loss")

    def train_step(self, x_batch: np.ndarray, y_batch: np.ndarray,
                   learning_rate: float, optimizer) -> float:
        """One mini-batch step: forward, loss, backward, parameter update."""
        # Forward.
        y_pred = self.forward(x_batch)
        loss = self.compute_loss(y_pred, y_batch)
        # Backward.
        grad = self.compute_loss_grad(y_pred, y_batch)
        self.backward(grad)
        # Parameter update.
        for layer in self.layers:
            layer.update(learning_rate, optimizer)
        return loss

    def predict(self, x: np.ndarray) -> np.ndarray:
        """Inference-mode forward pass."""
        return self.forward(x, training=False)

    def save_model(self, filepath: str):
        """Serialize per-layer weights/bias/activation to a pickle file."""
        params = []
        for layer in self.layers:
            params.append({'weights': layer.weights, 'bias': layer.bias,
                           'activation': layer.activation_name})
        with open(filepath, 'wb') as f:
            pickle.dump(params, f)

    def load_model(self, filepath: str):
        """Load parameters saved by save_model() into the existing layers.

        NOTE(review): pickle is unsafe on untrusted files — only load models
        produced by this program.
        """
        with open(filepath, 'rb') as f:
            params = pickle.load(f)
        for layer, param in zip(self.layers, params):
            layer.weights = param['weights']
            layer.bias = param['bias']
            # Activations must match; warn (do not fail) on mismatch.
            if layer.activation_name != param['activation']:
                print(f"Warning: activation mismatch, loaded {param['activation']} but layer has {layer.activation_name}")
# ============================================================================
# Part 3: Optimizers
# ============================================================================
class SGD:
    """Stochastic gradient descent with optional momentum.

    Fixes the mangled constructor name (`init` -> `__init__`); without it,
    `self.momentum`/`self.v_w` were never created and update() crashed.
    """

    def __init__(self, momentum: float = 0.0):
        self.momentum = momentum
        self.v_w = []  # per-layer weight velocity
        self.v_b = []  # per-layer bias velocity

    def initialize(self, layers: List["Layer"]):
        """Allocate zero velocity buffers matching each layer's parameters."""
        for layer in layers:
            self.v_w.append(np.zeros_like(layer.weights))
            self.v_b.append(np.zeros_like(layer.bias))

    def update(self, layer: "Layer", lr: float):
        """Apply one momentum-SGD step to the layer's weights and bias."""
        # `self.layers` is attached externally by the trainer; fall back to
        # index 0 when it was never set (single-layer usage).
        idx = self.layers.index(layer) if hasattr(self, 'layers') else 0
        if not hasattr(self, 'v_w') or len(self.v_w) == 0:
            return  # initialize() was never called; skip the update
        self.v_w[idx] = self.momentum * self.v_w[idx] - lr * layer.grad_weights
        layer.weights += self.v_w[idx]
        self.v_b[idx] = self.momentum * self.v_b[idx] - lr * layer.grad_bias
        layer.bias += self.v_b[idx]
class Adam:
    """Adam optimizer (bias-corrected first/second moment estimates).

    Fixes the mangled constructor name (`init` -> `__init__`); without it,
    `self.beta1` etc. were never created and update() crashed.
    """

    def __init__(self, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.m_w = []  # first moments (weights)
        self.v_w = []  # second moments (weights)
        self.m_b = []  # first moments (bias)
        self.v_b = []  # second moments (bias)
        self.t = 0     # global step counter for bias correction

    def initialize(self, layers: List["Layer"]):
        """Allocate zero moment buffers matching each layer's parameters."""
        for layer in layers:
            self.m_w.append(np.zeros_like(layer.weights))
            self.v_w.append(np.zeros_like(layer.weights))
            self.m_b.append(np.zeros_like(layer.bias))
            self.v_b.append(np.zeros_like(layer.bias))

    def update(self, layer: "Layer", lr: float):
        """Apply one Adam step to the layer's weights and bias.

        NOTE(review): `t` increments once per *layer* update rather than once
        per optimizer step, so bias correction decays faster with deeper
        nets — preserved from the original; confirm before changing.
        """
        idx = self.layers.index(layer) if hasattr(self, 'layers') else 0
        self.t += 1
        # Update first and second moments for the weights.
        self.m_w[idx] = self.beta1 * self.m_w[idx] + (1 - self.beta1) * layer.grad_weights
        self.v_w[idx] = self.beta2 * self.v_w[idx] + (1 - self.beta2) * (layer.grad_weights ** 2)
        # Bias correction.
        m_hat = self.m_w[idx] / (1 - self.beta1 ** self.t)
        v_hat = self.v_w[idx] / (1 - self.beta2 ** self.t)
        layer.weights -= lr * m_hat / (np.sqrt(v_hat) + self.epsilon)
        # Same update for the bias parameters.
        self.m_b[idx] = self.beta1 * self.m_b[idx] + (1 - self.beta1) * layer.grad_bias
        self.v_b[idx] = self.beta2 * self.v_b[idx] + (1 - self.beta2) * (layer.grad_bias ** 2)
        m_hat_b = self.m_b[idx] / (1 - self.beta1 ** self.t)
        v_hat_b = self.v_b[idx] / (1 - self.beta2 ** self.t)
        layer.bias -= lr * m_hat_b / (np.sqrt(v_hat_b) + self.epsilon)
# ============================================================================
# Part 4: Training Engine
# ============================================================================
class BalanceTrainer:
    """Wraps the training loop: mini-batching, LR scheduling, validation,
    and early stopping with best-weight restoration.

    Fixes the mangled constructor name (`init` -> `__init__`).
    """

    def __init__(self, model: "DylanNet", optimizer,
                 batch_size: int = 32, epochs: int = 100,
                 early_stopping_patience: int = 10,
                 lr_schedule: Optional[Callable] = None):
        self.model = model
        self.optimizer = optimizer
        self.batch_size = batch_size
        self.epochs = epochs
        self.early_stopping_patience = early_stopping_patience
        # Identity schedule by default: keep the current learning rate.
        self.lr_schedule = lr_schedule if lr_schedule else lambda epoch, lr: lr
        self.history = {'train_loss': [], 'val_loss': []}

    def fit(self, X_train: np.ndarray, y_train: np.ndarray,
            X_val: np.ndarray = None, y_val: np.ndarray = None,
            initial_lr: float = 0.001) -> Dict:
        """Main training loop; returns the loss history dict."""
        n_samples = X_train.shape[0]
        best_val_loss = float('inf')
        patience_counter = 0
        current_lr = initial_lr
        # Give the optimizer access to the layer list (used by its index
        # lookup) and allocate its internal state.
        self.optimizer.layers = self.model.layers
        self.optimizer.initialize(self.model.layers)
        for epoch in range(1, self.epochs + 1):
            # Learning-rate schedule.
            current_lr = self.lr_schedule(epoch, current_lr)
            # Shuffle the training data each epoch.
            indices = np.random.permutation(n_samples)
            X_shuffled = X_train[indices]
            y_shuffled = y_train[indices]
            epoch_loss = 0.0
            num_batches = 0
            for i in range(0, n_samples, self.batch_size):
                X_batch = X_shuffled[i:i+self.batch_size]
                y_batch = y_shuffled[i:i+self.batch_size]
                loss = self.model.train_step(X_batch, y_batch, current_lr, self.optimizer)
                epoch_loss += loss
                num_batches += 1
            avg_train_loss = epoch_loss / max(num_batches, 1)
            self.history['train_loss'].append(avg_train_loss)
            # Validation evaluation (only when a validation set is given).
            if X_val is not None and y_val is not None:
                y_pred_val = self.model.predict(X_val)
                val_loss = self.model.compute_loss(y_pred_val, y_val)
                self.history['val_loss'].append(val_loss)
                print(f"Epoch {epoch:3d}/{self.epochs} | LR: {current_lr:.6f} | "
                      f"Train Loss: {avg_train_loss:.6f} | Val Loss: {val_loss:.6f}")
                # Early-stopping logic.
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    patience_counter = 0
                    # Snapshot the best parameters.
                    self.best_model_params = [(l.weights.copy(), l.bias.copy()) for l in self.model.layers]
                else:
                    patience_counter += 1
                    if patience_counter >= self.early_stopping_patience:
                        print(f"Early stopping at epoch {epoch}")
                        # Restore the best parameters before stopping.
                        for layer, (w, b) in zip(self.model.layers, self.best_model_params):
                            layer.weights = w
                            layer.bias = b
                        break
            else:
                print(f"Epoch {epoch:3d}/{self.epochs} | LR: {current_lr:.6f} | Train Loss: {avg_train_loss:.6f}")
        return self.history
# ============================================================================
# Part 5: Data Preprocessing
# ============================================================================
class DataScaler:
    """Z-score standardization with inverse transform.

    Fixes two defects: the mangled constructor name (`init` -> `__init__`)
    and an attribute mismatch — the constructor set `mean`/`std` while every
    method used `mean_`/`std_` (scikit-learn style); unified on the latter.
    """

    def __init__(self):
        # Learned statistics; populated by fit().
        self.mean_ = None
        self.std_ = None

    def fit(self, X: np.ndarray):
        """Learn per-column mean and standard deviation from X."""
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        # Avoid division by zero for constant columns.
        self.std_[self.std_ == 0] = 1.0

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Standardize X using the fitted statistics."""
        return (X - self.mean_) / self.std_

    def inverse_transform(self, X_scaled: np.ndarray) -> np.ndarray:
        """Map standardized values back to the original scale."""
        return X_scaled * self.std_ + self.mean_
# ============================================================================
# Part 6: Main Execution
# ============================================================================
def main():
    """End-to-end experiment: simulate balances, engineer features, train the
    Dylan network, evaluate on a held-out test split, plot, and demo a
    rolling forecast. Fixes the missing `*` in the `"=" * 80` banners."""
    print("=" * 80)
    print("余额模拟器 + Dylan神经网络计算系统")
    print("=" * 80)

    # 1. Generate simulated balance data.
    sim = BalanceSimulator(initial_balance=15000, seed=123)
    df_raw = sim.generate_time_series(days=500, trend_rate=0.0002,
                                      seasonality_amplitude=400,
                                      volatility=40,
                                      large_expense_prob=0.02,
                                      large_income_prob=0.015)
    # Inject an anomalous period (days 200-250, balance scaled down 40%).
    df_raw = sim.add_anomaly_period(df_raw, 200, 250, anomaly_factor=0.6)
    print("数据生成完成,形状:", df_raw.shape)

    # 2. Feature engineering.
    df_feat = BalanceSimulator.create_features(df_raw, window_size=7)
    print("特征构造完成,特征维度:", df_feat.shape[1])

    # Select feature columns (exclude date, raw balance and the target,
    # which leaks future information).
    exclude_cols = ['date', 'balance', 'event_type', 'target_balance']
    feature_cols = [c for c in df_feat.columns if c not in exclude_cols]
    X = df_feat[feature_cols].values
    y = df_feat['target_balance'].values.reshape(-1, 1)

    # 3. Chronological train/validation/test split (no shuffling).
    train_ratio, val_ratio = 0.7, 0.15
    n = len(X)
    train_end = int(n * train_ratio)
    val_end = int(n * (train_ratio + val_ratio))
    X_train, y_train = X[:train_end], y[:train_end]
    X_val, y_val = X[train_end:val_end], y[train_end:val_end]
    X_test, y_test = X[val_end:], y[val_end:]

    # 4. Standardization (fit on training data only to avoid leakage).
    scaler_X = DataScaler()
    scaler_y = DataScaler()
    scaler_X.fit(X_train)
    scaler_y.fit(y_train)
    X_train_scaled = scaler_X.transform(X_train)
    X_val_scaled = scaler_X.transform(X_val)
    X_test_scaled = scaler_X.transform(X_test)
    y_train_scaled = scaler_y.transform(y_train)
    y_val_scaled = scaler_y.transform(y_val)
    y_test_scaled = scaler_y.transform(y_test)

    # 5. Build the Dylan network.
    input_dim = X_train_scaled.shape[1]
    layer_dims = [
        (input_dim, 128, 'relu'),
        (128, 64, 'relu'),
        (64, 32, 'relu'),
        (32, 16, 'relu'),
        (16, 1, 'linear')
    ]
    model = DylanNet(layer_dims, loss='mse')
    print("神经网络结构:")
    for i, layer in enumerate(model.layers):
        print(f" Layer {i}: {layer.input_dim} -> {layer.output_dim}, {layer.activation_name}")

    # 6. Optimizer and training.
    optimizer = Adam(beta1=0.9, beta2=0.999)
    trainer = BalanceTrainer(model, optimizer, batch_size=64, epochs=200,
                             early_stopping_patience=15,
                             lr_schedule=lambda epoch, lr: lr * 0.98 if epoch > 50 else lr)
    history = trainer.fit(X_train_scaled, y_train_scaled,
                          X_val_scaled, y_val_scaled,
                          initial_lr=0.001)

    # 7. Test-set evaluation (errors reported in original units).
    y_pred_scaled = model.predict(X_test_scaled)
    y_pred = scaler_y.inverse_transform(y_pred_scaled)
    y_true = scaler_y.inverse_transform(y_test_scaled)
    test_mse = np.mean((y_pred - y_true) ** 2)
    test_mae = np.mean(np.abs(y_pred - y_true))
    print(f"\n测试集结果: MSE = {test_mse:.2f}, MAE = {test_mae:.2f}")

    # 8. Visualization.
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    # Subplot 1: raw balance curve.
    ax1 = axes[0, 0]
    ax1.plot(df_raw['date'], df_raw['balance'], color='blue', alpha=0.7, label='实际余额')
    ax1.set_title('模拟账户余额走势')
    ax1.set_xlabel('日期')
    ax1.set_ylabel('余额 (元)')
    ax1.legend()
    ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
    plt.setp(ax1.xaxis.get_majorticklabels(), rotation=45)
    # Subplot 2: training loss curves.
    ax2 = axes[0, 1]
    ax2.plot(history['train_loss'], label='训练损失', color='red')
    if history['val_loss']:
        ax2.plot(history['val_loss'], label='验证损失', color='orange')
    ax2.set_title('损失函数收敛曲线')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('MSE Loss')
    ax2.legend()
    ax2.grid(True, linestyle='--', alpha=0.6)
    # Subplot 3: predictions vs. ground truth on the test set.
    ax3 = axes[1, 0]
    test_dates = df_feat['date'].iloc[val_end:].values
    ax3.plot(test_dates, y_true, label='真实余额', color='green', marker='o', markersize=2, linewidth=1)
    ax3.plot(test_dates, y_pred, label='预测余额', color='purple', marker='x', markersize=2, linewidth=1, linestyle='--')
    ax3.set_title('测试集预测效果')
    ax3.set_xlabel('日期')
    ax3.set_ylabel('余额 (元)')
    ax3.legend()
    ax3.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
    plt.setp(ax3.xaxis.get_majorticklabels(), rotation=45)
    # Subplot 4: residual distribution.
    ax4 = axes[1, 1]
    residuals = y_true.flatten() - y_pred.flatten()
    ax4.hist(residuals, bins=30, edgecolor='black', alpha=0.7, color='steelblue')
    ax4.axvline(0, color='red', linestyle='--', linewidth=2)
    ax4.set_title('预测残差分布')
    ax4.set_xlabel('残差 (元)')
    ax4.set_ylabel('频数')
    ax4.grid(True, linestyle='--', alpha=0.6)
    plt.tight_layout()
    plt.savefig('balance_simulator_dylan_results.png', dpi=150)
    plt.show()

    # 9. Model persistence example.
    model.save_model('dylan_balance_model.pkl')
    print("模型已保存至 dylan_balance_model.pkl")

    # 10. Rolling 7-day forecast demo.
    print("\n未来7天余额预测 (滚动预测):")
    last_sequence = X_test_scaled[-1:].copy()  # last observed test point
    future_balance_scaled = []
    current_input = last_sequence
    for step in range(7):
        pred_scaled = model.predict(current_input)
        future_balance_scaled.append(pred_scaled[0, 0])
        # NOTE(review): a real rolling forecast must refresh every lag and
        # rolling feature from the new prediction; this demo simply reuses
        # the same input row, so all 7 predictions are identical.
        new_row = current_input[0].copy()
        current_input = new_row.reshape(1, -1)
    future_balance = scaler_y.inverse_transform(np.array(future_balance_scaled).reshape(-1, 1))
    for i, val in enumerate(future_balance.flatten()):
        print(f" Day {i+1}: {val:.2f} 元")
    print("\n" + "=" * 80)
    print("实验完成!系统成功演示了余额模拟与Dylan神经网络预测能力。")
    print("=" * 80)
# Script entry point: run the experiment only when executed directly
# (mangled `name`/`"main"` restored to the `__name__` dunder idiom).
if __name__ == "__main__":
    main()