银行卡余额模拟器安卓,Mozart模型深度计算

简介: 基于Transformer与CVAE的余额预测AI模型(Mozart),支持多源特征融合(交易、静态、时间)、不确定性建模与蒙特卡洛模拟

下载地址:http://lanzou.com.cn/ia3c0e53a

image.png

📁 output/jisuanmoxingaiban/
├── 📄 README.md 188 B
├── 📄 pom.xml 1.4 KB
├── 📄 package.json 688 B
├── 📄 operation/Registry.py 4.3 KB
├── 📄 config/Factory.json 688 B
├── 📄 endpoint/Validator.java 5 KB
├── 📄 drivers/Loader.php 4 KB
├── 📄 validator/Helper.js 2.8 KB
├── 📄 operation/Executor.js 2.7 KB
├── 📄 evaluation/Util.java 7.1 KB
├── 📄 src/main/java/Repository.java 6.2 KB
├── 📄 config/Scheduler.xml 1.4 KB
├── 📄 generator/Handler.py 5.6 KB
├── 📄 crypto/Proxy.php 2.6 KB
├── 📄 metrics/Manager.py 5.5 KB
├── 📄 crypto/Engine.js 3.9 KB
├── 📄 src/main/java/Listener.java 3.8 KB
├── 📄 crypto/Cache.py 6.3 KB
├── 📄 config/Provider.json 688 B
├── 📄 src/main/java/Controller.java 5.5 KB
├── 📄 crypto/Dispatcher.php 3.1 KB
├── 📄 validator/Adapter.js 4.3 KB
├── 📄 generator/Queue.js 4.6 KB
├── 📄 drivers/Pool.java 5.8 KB
├── 📄 src/main/java/Resolver.java 6.8 KB

项目编译入口:

Project Structure

Project : 余额计算模型AI版

Folder : jisuanmoxingaiban

Files : 26

Size : 94.9 KB

Generated: 2026-03-22 19:36:40

jisuanmoxingaiban/
├── README.md [188 B]
├── config/
│   ├── Factory.json [688 B]
│   ├── Provider.json [688 B]
│   └── Scheduler.xml [1.4 KB]
├── crypto/
│   ├── Cache.py [6.3 KB]
│   ├── Dispatcher.php [3.1 KB]
│   ├── Engine.js [3.9 KB]
│   └── Proxy.php [2.6 KB]
├── drivers/
│   ├── Loader.php [4 KB]
│   └── Pool.java [5.8 KB]
├── endpoint/
│   └── Validator.java [5 KB]
├── evaluation/
│   └── Util.java [7.1 KB]
├── generator/
│   ├── Handler.py [5.6 KB]
│   └── Queue.js [4.6 KB]
├── metrics/
│   └── Manager.py [5.5 KB]
├── operation/
│   ├── Executor.js [2.7 KB]
│   └── Registry.py [4.3 KB]
├── package.json [688 B]
├── pom.xml [1.4 KB]
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   ├── Controller.java [5.5 KB]
│   │   │   ├── Listener.java [3.8 KB]
│   │   │   ├── Repository.java [6.2 KB]
│   │   │   └── Resolver.java [6.8 KB]
│   │   └── resources/
│   └── test/
│       └── java/
└── validator/
    ├── Adapter.js [4.3 KB]
    └── Helper.js [2.8 KB]

python
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from typing import Optional, Tuple, List, Dict
import math
from dataclasses import dataclass
import warnings
warnings.filterwarnings('ignore')

# ==================== 配置类 ====================

@dataclass
class MozartConfig:
    """Hyper-parameters for the Mozart balance-forecasting model.

    Groups the network sizes, sequence lengths, input feature widths,
    CVAE settings, optimiser settings and the compute device in one place.
    """

    # Model dimensions
    d_model: int = 256      # embedding width shared by every sub-module
    n_heads: int = 8        # attention heads
    d_ff: int = 1024        # feed-forward width inside each Transformer layer
    n_layers: int = 4       # number of Transformer encoder layers
    dropout: float = 0.1

    # Sequence lengths
    seq_len: int = 60       # number of historical steps fed to the encoder
    pred_len: int = 30      # number of future steps predicted

    # Feature widths
    n_transaction_features: int = 16  # per-step transaction features
    n_static_features: int = 32       # per-series static features (e.g. user profile)
    n_time_features: int = 8          # per-step time/calendar features

    # Variational auto-encoder
    latent_dim: int = 64
    beta: float = 1.0       # weight of the KL term in the loss

    # Training
    lr: float = 1e-3
    weight_decay: float = 1e-5
    batch_size: int = 64
    epochs: int = 100

    # Device. The default is evaluated once, when the class body is executed.
    device: str = "cuda" if torch.cuda.is_available() else "cpu"

# ==================== 位置编码 ====================

class PositionalEncoding(nn.Module):
    """Fixed sine/cosine positional encoding followed by dropout.

    A table of shape (1, max_len, d_model) is precomputed and stored as the
    non-trainable buffer ``pe``; even channels hold sines and odd channels
    hold cosines.
    """

    def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1):
        """
        Args:
            d_model: channel width of the inputs to be encoded.
            max_len: largest sequence length the table supports.
            dropout: dropout probability applied after adding the encoding.
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd channel than even channels,
        # so trim the angle table instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Buffer: moves with .to(device) and is saved in state_dict, not trained.
        self.register_buffer('pe', pe.unsqueeze(0))  # (1, max_len, d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Add the positional table to ``x`` and apply dropout.

        Args:
            x: tensor of shape (batch, seq_len, d_model).

        Returns:
            Tensor of the same shape as ``x``.

        Raises:
            ValueError: if ``seq_len`` exceeds the ``max_len`` of the table.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds positional table size {self.pe.size(1)}"
            )
        return self.dropout(x + self.pe[:, :seq_len, :])

# ==================== 时序编码器 ====================

class TimeSeriesEncoder(nn.Module):
    """Transformer encoder over the historical balance and transaction features."""

    def __init__(self, config: "MozartConfig"):
        """Build the input projection, positional encoding, Transformer stack
        and output MLP from ``config``."""
        super().__init__()
        self.config = config

        # Balance (1 channel) and transaction features are concatenated, then
        # projected to the model width.
        self.input_proj = nn.Linear(1 + config.n_transaction_features, config.d_model)

        # The positional table is sized to seq_len, so longer inputs are not supported.
        self.pos_encoding = PositionalEncoding(config.d_model, config.seq_len, config.dropout)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=config.d_model,
            nhead=config.n_heads,
            dim_feedforward=config.d_ff,
            dropout=config.dropout,
            activation='gelu',
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, config.n_layers)

        self.output_proj = nn.Sequential(
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model, config.d_model),
        )

    def forward(self, balance_seq: torch.Tensor, transaction_features: torch.Tensor) -> torch.Tensor:
        """Encode the history.

        Args:
            balance_seq: (batch, seq_len, 1) historical balances.
            transaction_features: (batch, seq_len, n_transaction_features).

        Returns:
            (batch, seq_len, d_model) encoded sequence.
        """
        combined = torch.cat([balance_seq, transaction_features], dim=-1)
        hidden = self.input_proj(combined)          # (batch, seq_len, d_model)
        hidden = self.pos_encoding(hidden)
        hidden = self.transformer_encoder(hidden)
        return self.output_proj(hidden)

# ==================== 特征融合层 ====================

class FeatureFusionLayer(nn.Module):
    """Fuse the encoded sequence with static and time features via a learned gate."""

    def __init__(self, config: "MozartConfig"):
        """Build the attention block, the two feature projections and the gate."""
        super().__init__()
        self.config = config

        self.cross_attention = nn.MultiheadAttention(
            embed_dim=config.d_model,
            num_heads=config.n_heads,
            dropout=config.dropout,
            batch_first=True,
        )

        self.static_proj = nn.Sequential(
            nn.Linear(config.n_static_features, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
        )

        self.time_proj = nn.Sequential(
            nn.Linear(config.n_time_features, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
        )

        # The gate and the fused candidate both read the same 3*d_model concat.
        self.gate = nn.Sequential(
            nn.Linear(config.d_model * 3, config.d_model),
            nn.Sigmoid(),
        )
        self.fusion_proj = nn.Linear(config.d_model * 3, config.d_model)
        self.layer_norm = nn.LayerNorm(config.d_model)

    def forward(self,
                encoded_seq: torch.Tensor,
                static_features: torch.Tensor,
                time_features: torch.Tensor) -> torch.Tensor:
        """Fuse the three feature sources.

        Args:
            encoded_seq: (batch, seq_len, d_model) output of the sequence encoder.
            static_features: (batch, n_static_features).
            time_features: (batch, seq_len, n_time_features).

        Returns:
            (batch, seq_len, d_model) layer-normalised fused sequence.
        """
        seq_len = encoded_seq.size(1)

        # Project the static vector once and repeat it along the time axis.
        static_emb = self.static_proj(static_features).unsqueeze(1)
        static_emb = static_emb.expand(-1, seq_len, -1)   # (batch, seq_len, d_model)

        time_emb = self.time_proj(time_features)            # (batch, seq_len, d_model)

        # NOTE(review): every key/value position holds the same static vector,
        # so the attention weights cannot depend on the query; `attended` is
        # effectively a linear function of static_emb. Kept as in the original
        # design — confirm whether attending over time_emb was intended.
        attended, _ = self.cross_attention(
            query=encoded_seq,
            key=static_emb,
            value=static_emb,
        )

        concat_features = torch.cat([attended, static_emb, time_emb], dim=-1)

        # Gated residual: gate -> 1 uses the fused candidate, gate -> 0 keeps
        # the encoder output unchanged.
        gate_weights = self.gate(concat_features)
        candidate = self.fusion_proj(concat_features)
        fused = gate_weights * candidate + (1 - gate_weights) * encoded_seq

        return self.layer_norm(fused)

# ==================== CVAE解码器 ====================

class CVAEDecoder(nn.Module):
    """Conditional VAE head mapping a condition vector to a pred_len-step output.

    Holds three networks: a posterior encoder q(z | condition, target), a
    prior p(z | condition), and a decoder from (condition, z) to the output.
    """

    def __init__(self, config: "MozartConfig"):
        """Build the posterior, prior and decoder networks from ``config``."""
        super().__init__()
        self.config = config

        # Posterior network: sees the condition concatenated with the target.
        self.encoder_net = nn.Sequential(
            nn.Linear(config.d_model + config.pred_len, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
        )
        self.mu_head = nn.Linear(config.d_model, config.latent_dim)
        self.logvar_head = nn.Linear(config.d_model, config.latent_dim)

        # Prior network: sees the condition only.
        self.prior_net = nn.Sequential(
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
        )
        self.prior_mu_head = nn.Linear(config.d_model, config.latent_dim)
        self.prior_logvar_head = nn.Linear(config.d_model, config.latent_dim)

        # Decoder network: condition concatenated with the latent sample.
        self.decoder_net = nn.Sequential(
            nn.Linear(config.d_model + config.latent_dim, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
            nn.Linear(config.d_model, config.pred_len),
        )

    def reparameterize(self, mu: torch.Tensor, logvar: torch.Tensor) -> torch.Tensor:
        """Draw ``mu + eps * exp(0.5 * logvar)`` with ``eps ~ N(0, I)``."""
        std = torch.exp(0.5 * logvar)
        return mu + torch.randn_like(std) * std

    def encode(self, condition: torch.Tensor, target: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return posterior ``(mu, logvar)``.

        Args:
            condition: (batch, d_model).
            target: (batch, pred_len).
        """
        hidden = self.encoder_net(torch.cat([condition, target], dim=-1))
        return self.mu_head(hidden), self.logvar_head(hidden)

    def prior(self, condition: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return prior ``(mu, logvar)`` computed from the condition alone."""
        hidden = self.prior_net(condition)
        return self.prior_mu_head(hidden), self.prior_logvar_head(hidden)

    def decode(self, condition: torch.Tensor, z: torch.Tensor) -> torch.Tensor:
        """Decode ``(condition, z)`` into a (batch, pred_len) output."""
        return self.decoder_net(torch.cat([condition, z], dim=-1))

    def forward(self,
                condition: torch.Tensor,
                target: Optional[torch.Tensor] = None,
                deterministic: bool = True) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Generate an output and the distribution statistics used by the loss.

        Args:
            condition: (batch, d_model) condition vector.
            target: (batch, pred_len) target sequence; needed for the
                posterior branch.
            deterministic: when True (or when ``target`` is None) the prior
                mean is decoded without sampling.

        Returns:
            ``(pred, stats)``. ``pred`` is (batch, pred_len). ``stats`` always
            has ``prior_mu`` and ``prior_logvar``; the posterior branch also
            adds ``mu`` and ``logvar``.
        """
        prior_mu, prior_logvar = self.prior(condition)
        stats = {'prior_mu': prior_mu, 'prior_logvar': prior_logvar}

        if deterministic or target is None:
            # Deterministic path: decode the prior mean, no sampling.
            return self.decode(condition, prior_mu), stats

        # Posterior path: sample z from q(z | condition, target).
        mu, logvar = self.encode(condition, target)
        z = self.reparameterize(mu, logvar)
        stats['mu'] = mu
        stats['logvar'] = logvar
        return self.decode(condition, z), stats

# ==================== 完整的Mozart模型 ====================

class MozartModel(nn.Module):
    """Mozart balance-forecasting model.

    Pipeline: sequence encoder -> feature fusion -> condition vector ->
    CVAE decoder producing per-step deltas, which are added to the last
    observed balance together with a small time-feature bias.
    """

    def __init__(self, config: "MozartConfig"):
        """Build all sub-modules and apply Xavier initialisation."""
        super().__init__()
        self.config = config

        self.temporal_encoder = TimeSeriesEncoder(config)
        self.fusion_layer = FeatureFusionLayer(config)
        self.decoder = CVAEDecoder(config)

        self.condition_proj = nn.Sequential(
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
        )

        # Learnable scale on the decoded deltas before adding the last balance.
        self.residual_scale = nn.Parameter(torch.ones(1))

        self._init_weights()

    def _init_weights(self):
        """Xavier-uniform initialise every parameter with more than one dimension."""
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def _encode_condition(self,
                          historical_balance: torch.Tensor,
                          transaction_features: torch.Tensor,
                          static_features: torch.Tensor,
                          time_features: torch.Tensor) -> torch.Tensor:
        """Return the (batch, d_model) condition vector from the last fused step."""
        encoded_seq = self.temporal_encoder(historical_balance, transaction_features)
        # Only the historical part of the time features is fused here.
        historical_time_features = time_features[:, :self.config.seq_len, :]
        fused_seq = self.fusion_layer(encoded_seq, static_features, historical_time_features)
        return self.condition_proj(fused_seq[:, -1, :])

    def _horizon_time_bias(self, time_features: torch.Tensor) -> torch.Tensor:
        """Return the (batch, pred_len) bias from the forecast-period time features."""
        start = self.config.seq_len
        # Bound the slice so extra trailing steps do not break broadcasting.
        pred_time_features = time_features[:, start:start + self.config.pred_len, :]
        pred_time_emb = self.fusion_layer.time_proj(pred_time_features)
        return self._compute_time_bias(pred_time_emb)

    def forward(self,
                historical_balance: torch.Tensor,
                transaction_features: torch.Tensor,
                static_features: torch.Tensor,
                time_features: torch.Tensor,
                target_balance: Optional[torch.Tensor] = None,
                deterministic: bool = True) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Predict future balances.

        Args:
            historical_balance: (batch, seq_len, 1).
            transaction_features: (batch, seq_len, n_transaction_features).
            static_features: (batch, n_static_features).
            time_features: (batch, seq_len + pred_len, n_time_features); the
                first seq_len steps are history, the rest the forecast period.
            target_balance: (batch, pred_len) targets, supplied when training.
            deterministic: passed to the decoder; False enables the sampled
                posterior path when a target is given.

        Returns:
            ``(predicted_balance, stats)`` where ``predicted_balance`` is
            (batch, pred_len) and ``stats`` is the decoder statistics dict.
        """
        condition = self._encode_condition(
            historical_balance, transaction_features, static_features, time_features
        )
        time_bias = self._horizon_time_bias(time_features)

        # NOTE(review): the posterior encoder receives absolute target balances
        # while the decoder output is treated as a delta from the last balance.
        # Kept as in the original — confirm this is intended.
        pred_delta, stats = self.decoder(condition, target_balance, deterministic)

        last_balance = historical_balance[:, -1, 0].unsqueeze(1)  # (batch, 1)
        predicted_balance = last_balance + pred_delta * self.residual_scale + time_bias

        return predicted_balance, stats

    def _compute_time_bias(self, time_emb: torch.Tensor) -> torch.Tensor:
        """Reduce a time embedding to a per-step scalar bias.

        Takes the mean over the embedding dimension (no learned projection)
        and scales it by 0.1.
        """
        return torch.mean(time_emb, dim=-1) * 0.1

    def sample(self,
               historical_balance: torch.Tensor,
               transaction_features: torch.Tensor,
               static_features: torch.Tensor,
               time_features: torch.Tensor,
               n_samples: int = 10) -> torch.Tensor:
        """Draw Monte-Carlo forecasts by sampling latents from the prior.

        Runs in eval mode under ``torch.no_grad()`` and restores the previous
        train/eval mode afterwards.

        Args:
            n_samples: number of trajectories to draw; must be at least 1.

        Returns:
            Tensor of shape (n_samples, batch, pred_len).

        Raises:
            ValueError: if ``n_samples`` is smaller than 1.
        """
        if n_samples < 1:
            raise ValueError(f"n_samples must be >= 1, got {n_samples}")

        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                condition = self._encode_condition(
                    historical_balance, transaction_features, static_features, time_features
                )
                time_bias = self._horizon_time_bias(time_features)
                last_balance = historical_balance[:, -1, 0].unsqueeze(1)

                # In eval mode the prior and the bias are deterministic, so they
                # are computed once; only the latent draw changes per sample.
                prior_mu, prior_logvar = self.decoder.prior(condition)

                draws = []
                for _ in range(n_samples):
                    z = self.decoder.reparameterize(prior_mu, prior_logvar)
                    pred_delta = self.decoder.decode(condition, z)
                    draws.append(last_balance + pred_delta * self.residual_scale + time_bias)

                return torch.stack(draws, dim=0)
        finally:
            self.train(was_training)

# ==================== 损失函数 ====================

class MozartLoss(nn.Module):
    """Prediction loss (MSE + 0.5 * MAE) plus a beta-weighted KL term."""

    def __init__(self, beta: float = 1.0):
        """
        Args:
            beta: multiplier applied to the KL divergence.
        """
        super().__init__()
        self.beta = beta

    def forward(self,
                pred: torch.Tensor,
                target: torch.Tensor,
                stats: Dict[str, torch.Tensor]) -> Tuple[torch.Tensor, Dict[str, float]]:
        """Compute the total loss.

        Args:
            pred: (batch, pred_len) predictions.
            target: (batch, pred_len) ground truth.
            stats: decoder statistics. When it contains ``mu`` it must also
                contain ``logvar``, ``prior_mu`` and ``prior_logvar``; the KL
                term is then KL(posterior || prior). Otherwise the KL term is 0.

        Returns:
            ``(total_loss, parts)`` where ``total_loss`` is a scalar tensor and
            ``parts`` holds the Python floats ``pred_loss`` and ``kl_loss``.
        """
        pred_loss = F.mse_loss(pred, target) + 0.5 * F.l1_loss(pred, target)

        if 'mu' in stats:
            mu, logvar = stats['mu'], stats['logvar']
            prior_mu, prior_logvar = stats['prior_mu'], stats['prior_logvar']

            # Closed-form KL between two diagonal Gaussians, summed over the
            # latent dimension and averaged over the batch.
            per_dim = (
                1 + logvar - prior_logvar
                - (torch.exp(logvar) + (mu - prior_mu).pow(2)) / torch.exp(prior_logvar)
            )
            kl_loss = (-0.5 * torch.sum(per_dim, dim=-1)).mean()
        else:
            kl_loss = torch.tensor(0.0, device=pred.device)

        total_loss = pred_loss + self.beta * kl_loss

        return total_loss, {'pred_loss': pred_loss.item(), 'kl_loss': kl_loss.item()}

# ==================== 数据集类 ====================

class BalanceDataset(Dataset):
    """Sliding-window dataset over a set of balance series.

    Each item is one (series, window) pair, so a default DataLoader yields
    batches shaped (batch, seq_len, ...). Items are ordered series-major:
    all windows of series 0, then all windows of series 1, and so on.
    """

    def __init__(self,
                 balance_series: np.ndarray,
                 transaction_features: np.ndarray,
                 static_features: np.ndarray,
                 time_features: np.ndarray,
                 seq_len: int,
                 pred_len: int):
        """
        Args:
            balance_series: (n_samples, total_len) balances.
            transaction_features: (n_samples, total_len, n_trans_features).
            static_features: (n_samples, n_static_features).
            time_features: (n_samples, total_len, n_time_features).
            seq_len: number of history steps per window.
            pred_len: number of target steps per window.

        Raises:
            ValueError: if the arrays disagree on the number of series.
        """
        self.seq_len = seq_len
        self.pred_len = pred_len
        self.total_len = balance_series.shape[1]

        n_series = balance_series.shape[0]
        for label, arr in (('transaction_features', transaction_features),
                           ('static_features', static_features),
                           ('time_features', time_features)):
            if arr.shape[0] != n_series:
                raise ValueError(
                    f"{label} has {arr.shape[0]} series, expected {n_series}"
                )

        # Convert once to float32 so __getitem__ only slices.
        self._balance = np.asarray(balance_series, dtype=np.float32)
        self._trans = np.asarray(transaction_features, dtype=np.float32)
        self._static = np.asarray(static_features, dtype=np.float32)
        self._time = np.asarray(time_features, dtype=np.float32)

        self._n_series = n_series
        # A series too short for even one window contributes no items.
        self._n_windows = max(0, self.total_len - seq_len - pred_len + 1)

    def __len__(self):
        """Number of (series, window) pairs."""
        return self._n_series * self._n_windows

    def __getitem__(self, idx):
        """Return one window as a dict of float tensors.

        Keys and shapes: ``hist_balance`` (seq_len, 1), ``target_balance``
        (pred_len,), ``hist_trans`` (seq_len, n_trans_features),
        ``target_time`` (seq_len + pred_len, n_time_features), ``static``
        (n_static_features,).

        Raises:
            IndexError: if ``idx`` is out of range.
        """
        size = len(self)
        idx = int(idx)
        if idx < 0:
            idx += size
        if not 0 <= idx < size:
            raise IndexError(f"index {idx} out of range for dataset of size {size}")

        series, start = divmod(idx, self._n_windows)
        split = start + self.seq_len
        stop = split + self.pred_len

        # torch.tensor copies, so returned tensors never alias the stored arrays.
        return {
            'hist_balance': torch.tensor(self._balance[series, start:split]).unsqueeze(-1),
            'target_balance': torch.tensor(self._balance[series, split:stop]),
            'hist_trans': torch.tensor(self._trans[series, start:split, :]),
            'target_time': torch.tensor(self._time[series, start:stop, :]),
            'static': torch.tensor(self._static[series]),
        }

# ==================== 训练器 ====================

class MozartTrainer:
    """Training and validation loop for a Mozart model."""

    def __init__(self, model: "MozartModel", config: "MozartConfig"):
        """Move the model to ``config.device`` and set up AdamW, a cosine
        learning-rate schedule and the loss."""
        self.model = model.to(config.device)
        self.config = config
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=config.lr,
            weight_decay=config.weight_decay,
        )
        # NOTE(review): the schedule period is config.epochs even when train()
        # is called with a different number of epochs.
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            self.optimizer,
            T_max=config.epochs,
        )
        self.criterion = MozartLoss(beta=config.beta)

        self.train_losses: List[float] = []
        self.val_losses: List[float] = []

    def _to_device(self, batch: Dict[str, torch.Tensor]) -> Tuple[torch.Tensor, ...]:
        """Move one batch dict to the device and return its tensors in model order."""
        device = self.config.device
        keys = ('hist_balance', 'hist_trans', 'static', 'target_time', 'target_balance')
        return tuple(batch[key].to(device) for key in keys)

    def train_epoch(self, dataloader: DataLoader) -> float:
        """Run one optimisation pass over ``dataloader``.

        Returns:
            Mean total loss per batch (0.0 if the loader yields no batches).
        """
        self.model.train()
        total_loss = 0.0
        n_batches = 0

        for batch in dataloader:
            hist_balance, hist_trans, static, target_time, target_balance = self._to_device(batch)

            # deterministic=False together with a target selects the sampled
            # posterior path in the decoder.
            pred, stats = self.model(
                hist_balance, hist_trans, static, target_time,
                target_balance, deterministic=False
            )
            loss, _ = self.criterion(pred, target_balance, stats)

            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()

            total_loss += loss.item()
            n_batches += 1

        return total_loss / max(n_batches, 1)

    def validate(self, dataloader: DataLoader) -> Tuple[float, float]:
        """Evaluate with deterministic predictions.

        Returns:
            ``(mean_loss, mean_mape_percent)`` averaged over batches (both 0.0
            if the loader yields no batches).
        """
        self.model.eval()
        total_loss = 0.0
        total_mape = 0.0
        n_batches = 0

        with torch.no_grad():
            for batch in dataloader:
                hist_balance, hist_trans, static, target_time, target_balance = self._to_device(batch)

                pred, stats = self.model(
                    hist_balance, hist_trans, static, target_time,
                    target_balance, deterministic=True
                )

                loss, _ = self.criterion(pred, target_balance, stats)
                total_loss += loss.item()

                # Use |target| clamped away from zero so small or negative
                # targets cannot zero out or flip the denominator.
                denom = target_balance.abs().clamp_min(1e-8)
                mape = torch.mean(torch.abs(target_balance - pred) / denom) * 100
                total_mape += mape.item()
                n_batches += 1

        n_batches = max(n_batches, 1)
        return total_loss / n_batches, total_mape / n_batches

    def train(self,
              train_dataloader: DataLoader,
              val_dataloader: DataLoader,
              epochs: Optional[int] = None):
        """Run the full training loop.

        Args:
            train_dataloader: loader for training batches.
            val_dataloader: loader for validation batches.
            epochs: number of epochs; ``None`` means ``config.epochs``.

        Returns:
            ``(train_losses, val_losses)``, the per-epoch histories kept on
            the trainer.
        """
        # Compare against None so an explicit 0 is honoured.
        epochs = self.config.epochs if epochs is None else epochs

        for epoch in range(epochs):
            train_loss = self.train_epoch(train_dataloader)
            val_loss, val_mape = self.validate(val_dataloader)
            self.scheduler.step()

            self.train_losses.append(train_loss)
            self.val_losses.append(val_loss)

            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | MAPE: {val_mape:.2f}%")

        return self.train_losses, self.val_losses

# ==================== 示例数据和训练 ====================

def generate_synthetic_data(n_samples: int = 1000,
                            seq_len: int = 60,
                            pred_len: int = 30,
                            n_trans_features: int = 16,
                            n_static_features: int = 32,
                            n_time_features: int = 8) -> Dict:
    """Generate random demo data shaped like the model inputs.

    Every series has length ``seq_len + pred_len``. Balances are a linear
    trend plus a period-30 sine plus Gaussian noise. Transaction and static
    features are standard-normal noise. Time features are deterministic
    functions of the step index and identical for every series.

    Args:
        n_samples: number of series.
        seq_len: history length.
        pred_len: forecast length.
        n_trans_features: width of the transaction features.
        n_static_features: width of the static features.
        n_time_features: width of the time features. Eight channels are
            defined; fewer keeps the first ones, more leaves the extras zero.

    Returns:
        Dict with ``balance_series`` (n_samples, total_len),
        ``transaction_features`` (n_samples, total_len, n_trans_features),
        ``static_features`` (n_samples, n_static_features) and
        ``time_features`` (n_samples, total_len, n_time_features).
    """
    total_len = seq_len + pred_len
    time = np.arange(total_len)

    # Balance = trend + seasonality + noise, with the noise drawn per series.
    trend = 1000 + 5 * time
    seasonality = 50 * np.sin(2 * np.pi * time / 30)
    noise = np.random.randn(n_samples, total_len) * 20
    balance_series = trend + seasonality + noise

    transaction_features = np.random.randn(n_samples, total_len, n_trans_features)
    static_features = np.random.randn(n_samples, n_static_features)

    # Calendar-style channels indexed by step number.
    calendar = np.stack([
        np.sin(2 * np.pi * time / 7),     # period-7 sine
        np.cos(2 * np.pi * time / 7),     # period-7 cosine
        np.sin(2 * np.pi * time / 30),    # period-30 sine
        np.cos(2 * np.pi * time / 30),    # period-30 cosine
        np.sin(2 * np.pi * time / 365),   # period-365 sine
        np.cos(2 * np.pi * time / 365),   # period-365 cosine
        (time % 24) / 24,                 # position within a 24-step cycle
        ((time // 24) % 7) / 7,           # index of the 24-step cycle, modulo 7
    ], axis=1)                            # (total_len, 8)

    time_features = np.zeros((n_samples, total_len, n_time_features))
    # Fill only the columns that exist, so n_time_features < 8 does not fail.
    n_filled = min(n_time_features, calendar.shape[1])
    time_features[:, :, :n_filled] = calendar[:, :n_filled]

    return {
        'balance_series': balance_series,
        'transaction_features': transaction_features,
        'static_features': static_features,
        'time_features': time_features,
    }

def main():
    """Demo entry point: build synthetic data, train briefly, then sample.

    Steps: generate synthetic data, split it 80/20 along the series axis,
    train a Mozart model for 20 epochs, and print the mean and standard
    deviation of 100 sampled forecasts for one validation item.
    """
    config = MozartConfig()

    print("生成合成数据...")
    data = generate_synthetic_data(
        n_samples=2000,
        seq_len=config.seq_len,
        pred_len=config.pred_len,
        n_trans_features=config.n_transaction_features,
        n_static_features=config.n_static_features,
        n_time_features=config.n_time_features
    )

    # Split along axis 0 (the series axis) of every array.
    n_train = int(0.8 * data['balance_series'].shape[0])
    train_data = {k: v[:n_train] for k, v in data.items()}
    val_data = {k: v[n_train:] for k, v in data.items()}

    train_dataset = BalanceDataset(
        train_data['balance_series'],
        train_data['transaction_features'],
        train_data['static_features'],
        train_data['time_features'],
        config.seq_len,
        config.pred_len
    )
    val_dataset = BalanceDataset(
        val_data['balance_series'],
        val_data['transaction_features'],
        val_data['static_features'],
        val_data['time_features'],
        config.seq_len,
        config.pred_len
    )

    train_dataloader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=config.batch_size, shuffle=False)

    print("创建Mozart模型...")
    model = MozartModel(config)
    trainer = MozartTrainer(model, config)

    print("开始训练...")
    trainer.train(train_dataloader, val_dataloader, epochs=20)

    print("\n蒙特卡洛模拟示例...")
    model.eval()
    sample_batch = next(iter(val_dataloader))
    # Keep only the first item of the batch for the demo.
    hist_balance = sample_batch['hist_balance'][:1].to(config.device)
    hist_trans = sample_batch['hist_trans'][:1].to(config.device)
    target_time = sample_batch['target_time'][:1].to(config.device)
    static = sample_batch['static'][:1].to(config.device)

    samples = model.sample(hist_balance, hist_trans, static, target_time, n_samples=100)

    # Reduce over the sample axis to get per-step statistics.
    mean_pred = samples.mean(dim=0).cpu().numpy().squeeze()
    std_pred = samples.std(dim=0).cpu().numpy().squeeze()

    print(f"预测均值 (前5个时间步): {mean_pred[:5]}")
    print(f"预测标准差 (前5个时间步): {std_pred[:5]}")
    print("\n训练完成!")

# Run the demo only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

相关文章
|
1天前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
10100 24
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
13天前
|
人工智能 安全 Linux
【OpenClaw保姆级图文教程】阿里云/本地部署集成模型Ollama/Qwen3.5/百炼 API 步骤流程及避坑指南
2026年,AI代理工具的部署逻辑已从“单一云端依赖”转向“云端+本地双轨模式”。OpenClaw(曾用名Clawdbot)作为开源AI代理框架,既支持对接阿里云百炼等云端免费API,也能通过Ollama部署本地大模型,完美解决两类核心需求:一是担心云端API泄露核心数据的隐私安全诉求;二是频繁调用导致token消耗过高的成本控制需求。
5831 14
|
21天前
|
人工智能 JavaScript Ubuntu
5分钟上手龙虾AI!OpenClaw部署(阿里云+本地)+ 免费多模型配置保姆级教程(MiniMax、Claude、阿里云百炼)
OpenClaw(昵称“龙虾AI”)作为2026年热门的开源个人AI助手,由PSPDFKit创始人Peter Steinberger开发,核心优势在于“真正执行任务”——不仅能聊天互动,还能自动处理邮件、管理日程、订机票、写代码等,且所有数据本地处理,隐私完全可控。它支持接入MiniMax、Claude、GPT等多类大模型,兼容微信、Telegram、飞书等主流聊天工具,搭配100+可扩展技能,成为兼顾实用性与隐私性的AI工具首选。
22771 119

热门文章

最新文章