下载地址:http://lanzou.com.cn/ia3c0e53a

📁 output/jisuanmoxingaiban/
├── 📄 README.md (188 B)
├── 📄 pom.xml (1.4 KB)
├── 📄 package.json (688 B)
├── 📄 operation/Registry.py (4.3 KB)
├── 📄 config/Factory.json (688 B)
├── 📄 endpoint/Validator.java (5 KB)
├── 📄 drivers/Loader.php (4 KB)
├── 📄 validator/Helper.js (2.8 KB)
├── 📄 operation/Executor.js (2.7 KB)
├── 📄 evaluation/Util.java (7.1 KB)
├── 📄 src/main/java/Repository.java (6.2 KB)
├── 📄 config/Scheduler.xml (1.4 KB)
├── 📄 generator/Handler.py (5.6 KB)
├── 📄 crypto/Proxy.php (2.6 KB)
├── 📄 metrics/Manager.py (5.5 KB)
├── 📄 crypto/Engine.js (3.9 KB)
├── 📄 src/main/java/Listener.java (3.8 KB)
├── 📄 crypto/Cache.py (6.3 KB)
├── 📄 config/Provider.json (688 B)
├── 📄 src/main/java/Controller.java (5.5 KB)
├── 📄 crypto/Dispatcher.php (3.1 KB)
├── 📄 validator/Adapter.js (4.3 KB)
├── 📄 generator/Queue.js (4.6 KB)
├── 📄 drivers/Pool.java (5.8 KB)
└── 📄 src/main/java/Resolver.java (6.8 KB)
项目编译入口:
Project Structure
Project : 余额计算模型AI版
Folder : jisuanmoxingaiban
Files : 26
Size : 94.9 KB
Generated: 2026-03-22 19:36:40
jisuanmoxingaiban/
├── README.md [188 B]
├── config/
│   ├── Factory.json [688 B]
│   ├── Provider.json [688 B]
│   └── Scheduler.xml [1.4 KB]
├── crypto/
│   ├── Cache.py [6.3 KB]
│   ├── Dispatcher.php [3.1 KB]
│   ├── Engine.js [3.9 KB]
│   └── Proxy.php [2.6 KB]
├── drivers/
│   ├── Loader.php [4 KB]
│   └── Pool.java [5.8 KB]
├── endpoint/
│   └── Validator.java [5 KB]
├── evaluation/
│   └── Util.java [7.1 KB]
├── generator/
│   ├── Handler.py [5.6 KB]
│   └── Queue.js [4.6 KB]
├── metrics/
│   └── Manager.py [5.5 KB]
├── operation/
│   ├── Executor.js [2.7 KB]
│   └── Registry.py [4.3 KB]
├── package.json [688 B]
├── pom.xml [1.4 KB]
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   ├── Controller.java [5.5 KB]
│   │   │   ├── Listener.java [3.8 KB]
│   │   │   ├── Repository.java [6.2 KB]
│   │   │   └── Resolver.java [6.8 KB]
│   │   └── resources/
│   └── test/
│       └── java/
└── validator/
    ├── Adapter.js [4.3 KB]
    └── Helper.js [2.8 KB]
python
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from typing import Optional, Tuple, List, Dict
import math
from dataclasses import dataclass
import warnings
warnings.filterwarnings('ignore')
# ==================== Configuration ====================
@dataclass
class MozartConfig:
    """Hyper-parameters for the Mozart balance-forecasting model."""

    # Model dimensions
    d_model: int = 256
    n_heads: int = 8
    d_ff: int = 1024
    n_layers: int = 4
    dropout: float = 0.1
    # Temporal parameters
    seq_len: int = 60  # length of the historical window
    pred_len: int = 30  # length of the forecast horizon
    # Feature dimensions
    n_transaction_features: int = 16  # per-step transaction features
    n_static_features: int = 32  # static features (user profile etc.)
    n_time_features: int = 8  # per-step calendar/time features
    # Variational auto-encoder parameters
    latent_dim: int = 64
    beta: float = 1.0  # weight of the KL-divergence term
    # Training parameters
    lr: float = 1e-3
    weight_decay: float = 1e-5
    batch_size: int = 64
    epochs: int = 100
    # Device; the default is evaluated once, when the class is defined.
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
# ==================== Positional encoding ====================
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding for batch-first sequences.

    A fixed table of shape ``(1, max_len, d_model)`` is precomputed and stored
    as the non-trainable buffer ``pe``. Even columns hold sines and odd
    columns hold cosines.
    """

    def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1):
        """Build the encoding table.

        Args:
            d_model: Embedding width. Odd values are supported.
            max_len: Number of positions to precompute.
            dropout: Dropout probability applied after adding the encoding.
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer cosine column than sine
        # column, so trim the angles to avoid a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))  # (1, max_len, d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Add positional encodings to ``x`` and apply dropout.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``; ``seq_len`` is
                expected not to exceed the ``max_len`` given at construction.

        Returns:
            Tensor with the same shape as ``x``.
        """
        x = x + self.pe[:, : x.size(1), :]
        return self.dropout(x)
# ==================== Time-series encoder ====================
class TimeSeriesEncoder(nn.Module):
    """Transformer encoder over the historical balance and transaction features."""

    def __init__(self, config: MozartConfig):
        """Build the projection, positional-encoding and Transformer layers.

        Args:
            config: Model hyper-parameters. The positional table is built with
                ``max_len=config.seq_len``, so inputs are expected to have at
                most ``config.seq_len`` time steps.
        """
        super().__init__()
        self.config = config
        # Input projection: balance (1 channel) + transaction features.
        self.input_proj = nn.Linear(1 + config.n_transaction_features, config.d_model)
        # Positional encoding.
        self.pos_encoding = PositionalEncoding(config.d_model, config.seq_len, config.dropout)
        # Transformer encoder stack.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=config.d_model,
            nhead=config.n_heads,
            dim_feedforward=config.d_ff,
            dropout=config.dropout,
            activation='gelu',
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, config.n_layers)
        # Output projection.
        self.output_proj = nn.Sequential(
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model, config.d_model),
        )

    def forward(self, balance_seq: torch.Tensor, transaction_features: torch.Tensor) -> torch.Tensor:
        """Encode the historical window.

        Args:
            balance_seq: ``(batch, seq_len, 1)`` historical balances.
            transaction_features: ``(batch, seq_len, n_transaction_features)``.

        Returns:
            ``(batch, seq_len, d_model)`` encoded sequence.
        """
        # Concatenate along the feature axis and project to d_model.
        combined = torch.cat([balance_seq, transaction_features], dim=-1)
        x = self.input_proj(combined)
        # Add positional information, then run the Transformer stack.
        x = self.pos_encoding(x)
        x = self.transformer_encoder(x)
        return self.output_proj(x)
# ==================== Feature fusion layer ====================
class FeatureFusionLayer(nn.Module):
    """Fuse the encoded sequence with static and time features via a learned gate."""

    def __init__(self, config: MozartConfig):
        """Build the attention, projection and gating sub-modules.

        Args:
            config: Model hyper-parameters (``d_model``, ``n_heads``,
                ``dropout``, ``n_static_features``, ``n_time_features``).
        """
        super().__init__()
        self.config = config
        # Cross-attention between the encoded sequence and static embedding.
        self.cross_attention = nn.MultiheadAttention(
            embed_dim=config.d_model,
            num_heads=config.n_heads,
            dropout=config.dropout,
            batch_first=True,
        )
        # Static-feature projection.
        self.static_proj = nn.Sequential(
            nn.Linear(config.n_static_features, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
        )
        # Time-feature projection.
        self.time_proj = nn.Sequential(
            nn.Linear(config.n_time_features, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
        )
        # Gated fusion over the three concatenated streams.
        self.gate = nn.Sequential(
            nn.Linear(config.d_model * 3, config.d_model),
            nn.Sigmoid(),
        )
        self.fusion_proj = nn.Linear(config.d_model * 3, config.d_model)
        self.layer_norm = nn.LayerNorm(config.d_model)

    def forward(self,
                encoded_seq: torch.Tensor,
                static_features: torch.Tensor,
                time_features: torch.Tensor) -> torch.Tensor:
        """Fuse the three feature streams.

        Args:
            encoded_seq: ``(batch, seq_len, d_model)`` encoded sequence.
            static_features: ``(batch, n_static_features)`` static features.
            time_features: ``(batch, seq_len, n_time_features)`` time features.

        Returns:
            ``(batch, seq_len, d_model)`` layer-normalised fused sequence.
        """
        # Project static features and repeat them along the sequence axis.
        static_emb = self.static_proj(static_features).unsqueeze(1)
        static_emb = static_emb.expand(-1, encoded_seq.size(1), -1)
        # Project time features.
        time_emb = self.time_proj(time_features)
        # Cross-attention: sequence as query, static embedding as key/value.
        # NOTE(review): every key/value position holds the same static vector,
        # so the attention output looks independent of the query apart from
        # attention dropout - confirm this is intended.
        attended, _ = self.cross_attention(
            query=encoded_seq,
            key=static_emb,
            value=static_emb,
        )
        concat_features = torch.cat([attended, static_emb, time_emb], dim=-1)
        # The gate interpolates between the fused projection and the
        # untouched encoded sequence (a gated residual).
        gate_weights = self.gate(concat_features)
        fused = gate_weights * self.fusion_proj(concat_features) + (1 - gate_weights) * encoded_seq
        return self.layer_norm(fused)
# ==================== CVAE decoder ====================
class CVAEDecoder(nn.Module):
    """Conditional VAE head: posterior, learned prior and decoder networks."""

    def __init__(self, config: MozartConfig):
        """Build the posterior, prior and decoder networks.

        Args:
            config: Model hyper-parameters (``d_model``, ``pred_len``,
                ``latent_dim``, ``dropout``).
        """
        super().__init__()
        self.config = config
        # Posterior network q(z | condition, target).
        self.encoder_net = nn.Sequential(
            nn.Linear(config.d_model + config.pred_len, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
        )
        self.mu_head = nn.Linear(config.d_model, config.latent_dim)
        self.logvar_head = nn.Linear(config.d_model, config.latent_dim)
        # Prior network p(z | condition).
        self.prior_net = nn.Sequential(
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
        )
        self.prior_mu_head = nn.Linear(config.d_model, config.latent_dim)
        self.prior_logvar_head = nn.Linear(config.d_model, config.latent_dim)
        # Decoder network p(y | condition, z).
        self.decoder_net = nn.Sequential(
            nn.Linear(config.d_model + config.latent_dim, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
            nn.Linear(config.d_model, config.pred_len),
        )

    def reparameterize(self, mu: torch.Tensor, logvar: torch.Tensor) -> torch.Tensor:
        """Draw ``z = mu + eps * std`` with ``eps ~ N(0, I)``."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def encode(self, condition: torch.Tensor, target: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(mu, logvar)`` of the posterior.

        Args:
            condition: ``(batch, d_model)`` conditioning vector.
            target: ``(batch, pred_len)`` target sequence.
        """
        combined = torch.cat([condition, target], dim=-1)  # (batch, d_model + pred_len)
        h = self.encoder_net(combined)
        return self.mu_head(h), self.logvar_head(h)

    def prior(self, condition: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(mu, logvar)`` of the conditional prior."""
        h = self.prior_net(condition)
        return self.prior_mu_head(h), self.prior_logvar_head(h)

    def decode(self, condition: torch.Tensor, z: torch.Tensor) -> torch.Tensor:
        """Decode ``(condition, z)`` into a ``(batch, pred_len)`` prediction."""
        decoder_input = torch.cat([condition, z], dim=-1)
        return self.decoder_net(decoder_input)

    def forward(self,
                condition: torch.Tensor,
                target: Optional[torch.Tensor] = None,
                deterministic: bool = True) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Generate a prediction from the condition.

        Args:
            condition: ``(batch, d_model)`` conditioning vector.
            target: ``(batch, pred_len)`` target sequence; only used when
                ``deterministic`` is False.
            deterministic: When True (or when ``target`` is None) decode from
                the prior mean without sampling.

        Returns:
            pred: ``(batch, pred_len)`` prediction.
            stats: Always holds ``prior_mu`` and ``prior_logvar``; the
                posterior branch additionally holds ``mu`` and ``logvar``.
        """
        if deterministic or target is None:
            # Deterministic mode: use the prior mean as the latent code.
            prior_mu, prior_logvar = self.prior(condition)
            pred = self.decode(condition, prior_mu)
            stats = {'prior_mu': prior_mu, 'prior_logvar': prior_logvar}
        else:
            # Training mode: sample the latent code from the posterior.
            mu, logvar = self.encode(condition, target)
            z = self.reparameterize(mu, logvar)
            pred = self.decode(condition, z)
            prior_mu, prior_logvar = self.prior(condition)
            stats = {
                'mu': mu,
                'logvar': logvar,
                'prior_mu': prior_mu,
                'prior_logvar': prior_logvar,
            }
        return pred, stats
# ==================== Complete Mozart model ====================
class MozartModel(nn.Module):
    """Balance-generation model "Mozart": encoder + fusion layer + CVAE decoder."""

    def __init__(self, config: MozartConfig):
        """Build all sub-modules and initialise their weights.

        Args:
            config: Model hyper-parameters.
        """
        super().__init__()
        self.config = config
        # Sub-modules.
        self.temporal_encoder = TimeSeriesEncoder(config)
        self.fusion_layer = FeatureFusionLayer(config)
        self.decoder = CVAEDecoder(config)
        # Projection of the conditioning vector.
        self.condition_proj = nn.Sequential(
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
        )
        # Learnable scale applied to the predicted balance increments.
        self.residual_scale = nn.Parameter(torch.ones(1))
        self._init_weights()

    def _init_weights(self):
        """Apply Xavier-uniform init to every parameter with more than one dim."""
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def _encode_condition(self,
                          historical_balance: torch.Tensor,
                          transaction_features: torch.Tensor,
                          static_features: torch.Tensor,
                          time_features: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return the conditioning vector and the forecast-horizon time embedding."""
        seq_len = self.config.seq_len
        # 1. Temporal encoding.
        encoded_seq = self.temporal_encoder(historical_balance, transaction_features)
        # 2. Fusion, using only the historical part of the time features.
        fused_seq = self.fusion_layer(encoded_seq, static_features, time_features[:, :seq_len, :])
        # 3. The fused state at the last historical step is the condition.
        condition = self.condition_proj(fused_seq[:, -1, :])  # (batch, d_model)
        # 4. Embed the time features of the forecast horizon.
        pred_time_emb = self.fusion_layer.time_proj(time_features[:, seq_len:, :])
        return condition, pred_time_emb

    def forward(self,
                historical_balance: torch.Tensor,
                transaction_features: torch.Tensor,
                static_features: torch.Tensor,
                time_features: torch.Tensor,
                target_balance: Optional[torch.Tensor] = None,
                deterministic: bool = True) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Predict the balance over the forecast horizon.

        Args:
            historical_balance: ``(batch, seq_len, 1)`` historical balances.
            transaction_features: ``(batch, seq_len, n_transaction_features)``.
            static_features: ``(batch, n_static_features)``.
            time_features: ``(batch, seq_len + pred_len, n_time_features)``;
                covers both the history and the forecast horizon.
            target_balance: ``(batch, pred_len)`` targets (training only).
            deterministic: Passed through to the decoder.

        Returns:
            predicted_balance: ``(batch, pred_len)`` predicted balances.
            stats: Distribution statistics returned by the decoder.
        """
        condition, pred_time_emb = self._encode_condition(
            historical_balance, transaction_features, static_features, time_features
        )
        # 5. Decode balance increments.
        pred_delta, stats = self.decoder(condition, target_balance, deterministic)
        # 6. Turn increments into balances, anchored on the last observation.
        last_balance = historical_balance[:, -1, 0]  # (batch,)
        predicted_balance = last_balance.unsqueeze(1) + pred_delta * self.residual_scale
        # 7. Add the time-feature bias.
        predicted_balance = predicted_balance + self._compute_time_bias(pred_time_emb)
        return predicted_balance, stats

    def _compute_time_bias(self, time_emb: torch.Tensor) -> torch.Tensor:
        """Return 0.1 times the mean of ``time_emb`` over its last dimension."""
        bias = torch.mean(time_emb, dim=-1)  # (batch, pred_len)
        return bias * 0.1

    def sample(self,
               historical_balance: torch.Tensor,
               transaction_features: torch.Tensor,
               static_features: torch.Tensor,
               time_features: torch.Tensor,
               n_samples: int = 10) -> torch.Tensor:
        """Draw Monte Carlo forecasts by sampling the latent code from the prior.

        Sampling runs in eval mode under ``torch.no_grad()``; the module's
        previous train/eval mode is restored afterwards.

        Args:
            historical_balance: ``(batch, seq_len, 1)`` historical balances.
            transaction_features: ``(batch, seq_len, n_transaction_features)``.
            static_features: ``(batch, n_static_features)``.
            time_features: ``(batch, seq_len + pred_len, n_time_features)``.
            n_samples: Number of forecasts to draw; must be at least 1.

        Returns:
            ``(n_samples, batch, pred_len)`` sampled balances.

        Raises:
            ValueError: If ``n_samples`` is smaller than 1.
        """
        if n_samples < 1:
            raise ValueError(f"n_samples must be >= 1, got {n_samples}")
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                condition, pred_time_emb = self._encode_condition(
                    historical_balance, transaction_features, static_features, time_features
                )
                # In eval mode the prior and the time bias do not change
                # between draws, so compute them once outside the loop.
                prior_mu, prior_logvar = self.decoder.prior(condition)
                time_bias = self._compute_time_bias(pred_time_emb)
                last_balance = historical_balance[:, -1, 0].unsqueeze(1)
                draws = []
                for _ in range(n_samples):
                    z = self.decoder.reparameterize(prior_mu, prior_logvar)
                    pred_delta = self.decoder.decode(condition, z)
                    draws.append(last_balance + pred_delta * self.residual_scale + time_bias)
                return torch.stack(draws, dim=0)  # (n_samples, batch, pred_len)
        finally:
            # Do not leave a model that was training stuck in eval mode.
            self.train(was_training)
# ==================== Loss function ====================
class MozartLoss(nn.Module):
    """Prediction loss (MSE + 0.5 * MAE) plus a beta-weighted KL term."""

    def __init__(self, beta: float = 1.0):
        """Store the KL weight.

        Args:
            beta: Multiplier applied to the KL-divergence term.
        """
        super().__init__()
        self.beta = beta

    def forward(self,
                pred: torch.Tensor,
                target: torch.Tensor,
                stats: Dict[str, torch.Tensor]) -> Tuple[torch.Tensor, Dict[str, float]]:
        """Compute the total loss.

        Args:
            pred: ``(batch, pred_len)`` predictions.
            target: ``(batch, pred_len)`` ground truth.
            stats: Distribution statistics. When it contains ``mu`` it must
                also contain ``logvar``, ``prior_mu`` and ``prior_logvar``;
                otherwise the KL term is zero.

        Returns:
            total_loss: Scalar tensor ``pred_loss + beta * kl_loss``.
            parts: ``{'pred_loss': float, 'kl_loss': float}`` for logging.
        """
        # Prediction loss: blend of MSE and MAE.
        mse_loss = F.mse_loss(pred, target)
        mae_loss = F.l1_loss(pred, target)
        pred_loss = mse_loss + 0.5 * mae_loss
        if 'mu' in stats:
            mu = stats['mu']
            logvar = stats['logvar']
            prior_mu = stats['prior_mu']
            prior_logvar = stats['prior_logvar']
            # KL(q(z|x) || p(z|c)) between two diagonal Gaussians, summed
            # over latent dimensions and averaged over the batch.
            kl_loss = -0.5 * torch.sum(
                1 + logvar - prior_logvar
                - (torch.exp(logvar) + (mu - prior_mu).pow(2)) / torch.exp(prior_logvar),
                dim=-1,
            ).mean()
        else:
            # No posterior statistics (deterministic pass): no KL term.
            kl_loss = torch.tensor(0.0, device=pred.device)
        total_loss = pred_loss + self.beta * kl_loss
        return total_loss, {'pred_loss': pred_loss.item(), 'kl_loss': kl_loss.item()}
# ==================== Dataset ====================
class BalanceDataset(Dataset):
    """Sliding-window dataset with one item per (series, window start) pair.

    Each item is a single training example, so a ``DataLoader`` stacks items
    into ``(batch, seq_len, ...)`` tensors.
    """

    def __init__(self,
                 balance_series: np.ndarray,
                 transaction_features: np.ndarray,
                 static_features: np.ndarray,
                 time_features: np.ndarray,
                 seq_len: int,
                 pred_len: int):
        """Index every window of every series.

        Args:
            balance_series: ``(n_samples, total_len)`` balance series.
            transaction_features: ``(n_samples, total_len, n_trans_features)``.
            static_features: ``(n_samples, n_static_features)``.
            time_features: ``(n_samples, total_len, n_time_features)``.
            seq_len: Length of the historical window.
            pred_len: Length of the forecast horizon.
        """
        self.seq_len = seq_len
        self.pred_len = pred_len
        self.total_len = balance_series.shape[1]
        self.balance_series = balance_series
        self.transaction_features = transaction_features
        self.static_features = static_features
        self.time_features = time_features
        # Store only (series index, window start) pairs and slice lazily in
        # __getitem__. A series shorter than seq_len + pred_len yields no
        # windows and therefore an empty dataset.
        n_series = balance_series.shape[0]
        n_windows = max(self.total_len - seq_len - pred_len + 1, 0)
        self.samples = [
            (series_idx, start)
            for series_idx in range(n_series)
            for start in range(n_windows)
        ]

    def __len__(self):
        """Return the number of (series, window) examples."""
        return len(self.samples)

    @staticmethod
    def _as_float(array) -> torch.Tensor:
        """Copy an array-like into a new float32 tensor."""
        return torch.tensor(np.asarray(array), dtype=torch.float32)

    def __getitem__(self, idx):
        """Return one example as a dict of float32 tensors.

        Keys and shapes:
            hist_balance: ``(seq_len, 1)``
            target_balance: ``(pred_len,)``
            hist_trans: ``(seq_len, n_trans_features)``
            target_time: ``(seq_len + pred_len, n_time_features)``
            static: ``(n_static_features,)``
        """
        series_idx, start = self.samples[idx]
        split = start + self.seq_len
        stop = split + self.pred_len
        return {
            'hist_balance': self._as_float(self.balance_series[series_idx, start:split]).unsqueeze(-1),
            'target_balance': self._as_float(self.balance_series[series_idx, split:stop]),
            'hist_trans': self._as_float(self.transaction_features[series_idx, start:split, :]),
            'target_time': self._as_float(self.time_features[series_idx, start:stop, :]),
            'static': self._as_float(self.static_features[series_idx]),
        }
# ==================== Trainer ====================
class MozartTrainer:
    """Training and validation loop for :class:`MozartModel`."""

    def __init__(self, model: MozartModel, config: MozartConfig):
        """Move the model to the configured device and build the optimiser.

        Args:
            model: Model to train.
            config: Supplies ``device``, ``lr``, ``weight_decay``, ``epochs``
                (used as the cosine schedule's ``T_max``) and ``beta``.
        """
        self.model = model.to(config.device)
        self.config = config
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=config.lr,
            weight_decay=config.weight_decay,
        )
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            self.optimizer,
            T_max=config.epochs,
        )
        self.criterion = MozartLoss(beta=config.beta)
        self.train_losses = []
        self.val_losses = []

    def _unpack_batch(self, batch):
        """Move one batch dict to the device and return its tensors as a tuple."""
        device = self.config.device
        return (
            batch['hist_balance'].to(device),
            batch['target_balance'].to(device),
            batch['hist_trans'].to(device),
            batch['target_time'].to(device),
            batch['static'].to(device),
        )

    def train_epoch(self, dataloader: DataLoader) -> float:
        """Run one training epoch and return the mean batch loss.

        Raises:
            ValueError: If ``dataloader`` yields no batches.
        """
        self.model.train()
        total_loss = 0.0
        n_batches = 0
        for batch in dataloader:
            hist_balance, target_balance, hist_trans, target_time, static = self._unpack_batch(batch)
            # Forward pass in training mode (posterior sampling).
            pred, stats = self.model(
                hist_balance, hist_trans, static, target_time,
                target_balance, deterministic=False
            )
            loss, _ = self.criterion(pred, target_balance, stats)
            # Backward pass with gradient clipping.
            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            total_loss += loss.item()
            n_batches += 1
        if n_batches == 0:
            raise ValueError("dataloader yielded no batches")
        return total_loss / n_batches

    def validate(self, dataloader: DataLoader) -> Tuple[float, float]:
        """Evaluate with deterministic predictions.

        Returns:
            ``(mean loss, mean MAPE in percent)`` averaged over batches.

        Raises:
            ValueError: If ``dataloader`` yields no batches.
        """
        self.model.eval()
        total_loss = 0.0
        total_mape = 0.0
        n_batches = 0
        with torch.no_grad():
            for batch in dataloader:
                hist_balance, target_balance, hist_trans, target_time, static = self._unpack_batch(batch)
                # Deterministic prediction (prior mean).
                pred, stats = self.model(
                    hist_balance, hist_trans, static, target_time,
                    target_balance, deterministic=True
                )
                loss, _ = self.criterion(pred, target_balance, stats)
                total_loss += loss.item()
                # MAPE; clamp |target| so zero or slightly negative targets
                # cannot produce a zero denominator.
                denom = target_balance.abs().clamp_min(1e-8)
                mape = torch.mean(torch.abs(target_balance - pred) / denom) * 100
                total_mape += mape.item()
                n_batches += 1
        if n_batches == 0:
            raise ValueError("dataloader yielded no batches")
        return total_loss / n_batches, total_mape / n_batches

    def train(self,
              train_dataloader: DataLoader,
              val_dataloader: DataLoader,
              epochs: Optional[int] = None):
        """Run the full training loop.

        Args:
            train_dataloader: Loader for training batches.
            val_dataloader: Loader for validation batches.
            epochs: Number of epochs; ``None`` falls back to
                ``config.epochs``. The scheduler's ``T_max`` is always
                ``config.epochs``, regardless of this argument.

        Returns:
            ``(train_losses, val_losses)`` accumulated on this trainer.
        """
        # Use an explicit None check so that epochs=0 is honoured.
        epochs = self.config.epochs if epochs is None else epochs
        for epoch in range(epochs):
            train_loss = self.train_epoch(train_dataloader)
            val_loss, val_mape = self.validate(val_dataloader)
            self.scheduler.step()
            self.train_losses.append(train_loss)
            self.val_losses.append(val_loss)
            # Report progress every 10 epochs.
            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | MAPE: {val_mape:.2f}%")
        return self.train_losses, self.val_losses
# ==================== Example data and training ====================
def generate_synthetic_data(n_samples: int = 1000,
                            seq_len: int = 60,
                            pred_len: int = 30,
                            n_trans_features: int = 16,
                            n_static_features: int = 32,
                            n_time_features: int = 8) -> Dict[str, np.ndarray]:
    """Generate synthetic demo data using NumPy's global random state.

    Args:
        n_samples: Number of independent series.
        seq_len: Length of the historical window.
        pred_len: Length of the forecast horizon.
        n_trans_features: Number of transaction features per step.
        n_static_features: Number of static features per series.
        n_time_features: Number of time features per step. Up to eight
            columns are filled; any further columns stay zero.

    Returns:
        Dict with ``balance_series`` ``(n_samples, total_len)``,
        ``transaction_features`` ``(n_samples, total_len, n_trans_features)``,
        ``static_features`` ``(n_samples, n_static_features)`` and
        ``time_features`` ``(n_samples, total_len, n_time_features)``, where
        ``total_len = seq_len + pred_len``.
    """
    total_len = seq_len + pred_len
    steps = np.arange(total_len)
    # Balance series: linear trend + 30-step seasonality + Gaussian noise.
    # A single (n_samples, total_len) draw consumes the random stream in the
    # same order as drawing one row at a time.
    trend = 1000 + 5 * steps
    seasonality = 50 * np.sin(2 * np.pi * steps / 30)
    noise = np.random.randn(n_samples, total_len) * 20
    balance_series = trend + seasonality + noise
    # Transaction and static features are pure noise.
    transaction_features = np.random.randn(n_samples, total_len, n_trans_features)
    static_features = np.random.randn(n_samples, n_static_features)
    # Time features are identical for every series.
    columns = [
        np.sin(2 * np.pi * steps / 7),    # weekly cycle (sin)
        np.cos(2 * np.pi * steps / 7),    # weekly cycle (cos)
        np.sin(2 * np.pi * steps / 30),   # monthly cycle (sin)
        np.cos(2 * np.pi * steps / 30),   # monthly cycle (cos)
        np.sin(2 * np.pi * steps / 365),  # yearly cycle (sin)
        np.cos(2 * np.pi * steps / 365),  # yearly cycle (cos)
        steps % 24 / 24,                  # step modulo 24, scaled to [0, 1)
        (steps // 24) % 7 / 7,            # 24-step block modulo 7, scaled
    ]
    time_features = np.zeros((n_samples, total_len, n_time_features))
    # Fill only as many columns as were requested, so n_time_features < 8
    # no longer indexes out of bounds.
    n_filled = min(n_time_features, len(columns))
    if n_filled > 0:
        time_features[:, :, :n_filled] = np.stack(columns[:n_filled], axis=-1)
    return {
        'balance_series': balance_series,
        'transaction_features': transaction_features,
        'static_features': static_features,
        'time_features': time_features,
    }
def main():
    """Demo entry point: train on synthetic data, then draw Monte Carlo forecasts."""
    config = MozartConfig()
    # Generate the data.
    print("生成合成数据...")
    data = generate_synthetic_data(
        n_samples=2000,
        seq_len=config.seq_len,
        pred_len=config.pred_len,
        n_trans_features=config.n_transaction_features,
        n_static_features=config.n_static_features,
        n_time_features=config.n_time_features,
    )
    # 80/20 train/validation split along the series axis.
    n_train = int(0.8 * data['balance_series'].shape[0])
    train_data = {k: v[:n_train] for k, v in data.items()}
    val_data = {k: v[n_train:] for k, v in data.items()}
    # Build datasets and loaders.
    train_dataset = BalanceDataset(
        train_data['balance_series'],
        train_data['transaction_features'],
        train_data['static_features'],
        train_data['time_features'],
        config.seq_len,
        config.pred_len,
    )
    val_dataset = BalanceDataset(
        val_data['balance_series'],
        val_data['transaction_features'],
        val_data['static_features'],
        val_data['time_features'],
        config.seq_len,
        config.pred_len,
    )
    train_dataloader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=config.batch_size, shuffle=False)
    # Build the model and trainer.
    print("创建Mozart模型...")
    model = MozartModel(config)
    trainer = MozartTrainer(model, config)
    # Train; the loss histories stay available on the trainer.
    print("开始训练...")
    trainer.train(train_dataloader, val_dataloader, epochs=20)
    # Monte Carlo demo on the first validation example.
    print("\n蒙特卡洛模拟示例...")
    model.eval()
    sample_batch = next(iter(val_dataloader))
    hist_balance = sample_batch['hist_balance'][:1].to(config.device)
    hist_trans = sample_batch['hist_trans'][:1].to(config.device)
    target_time = sample_batch['target_time'][:1].to(config.device)
    static = sample_batch['static'][:1].to(config.device)
    samples = model.sample(hist_balance, hist_trans, static, target_time, n_samples=100)
    # Summary statistics across the sampled forecasts.
    mean_pred = samples.mean(dim=0).cpu().numpy().squeeze()
    std_pred = samples.std(dim=0).cpu().numpy().squeeze()
    print(f"预测均值 (前5个时间步): {mean_pred[:5]}")
    print(f"预测标准差 (前5个时间步): {std_pred[:5]}")
    print("\n训练完成!")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()