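"""
Bank card balance simulator: a Cypher-style deep balance calculation model.

Overview: a deep balance simulator built on a Cypher-style graph model. It
supports account modelling, multiple transaction types, risk-metric
calculation and visual analysis.

Download: http://lanzou.com.cn/i6926983a
"""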

import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Any, Union
import warnings
import json
import os
from enum import Enum
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict
import logging

# Configure logging for the whole module.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("CypherBalanceSimulator")

# ================================================================================
# Part 1: Basic enums and exception classes
# ================================================================================

class TransactionType(Enum):
    """Kinds of transaction the simulator distinguishes."""

    INCOME = "INCOME"          # income
    EXPENSE = "EXPENSE"        # expense
    TRANSFER = "TRANSFER"      # transfer between accounts
    INVESTMENT = "INVESTMENT"  # investment
    INTEREST = "INTEREST"      # interest
    FEE = "FEE"                # service fee
    TAX = "TAX"                # tax
    DIVIDEND = "DIVIDEND"      # dividend

class AccountType(Enum):
    """Kinds of account a graph node can represent."""

    PERSONAL = "PERSONAL"      # personal account
    BUSINESS = "BUSINESS"      # business account
    SAVINGS = "SAVINGS"        # savings account
    INVESTMENT = "INVESTMENT"  # investment account
    LOAN = "LOAN"              # loan account
    CREDIT = "CREDIT"          # credit account

class RiskLevel(Enum):
    """Risk level, numbered from 1 (LOW) to 4 (CRITICAL)."""

    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

class SimulationError(Exception):
    """Base class for every error raised by the simulator."""

class GraphError(SimulationError):
    """Raised when a graph operation fails."""

class TransactionError(SimulationError):
    """Raised when a transaction cannot be executed."""

# ================================================================================
# Part 2: Core graph-model components (Cypher style)
# ================================================================================

class Node:
    """Graph node representing a single account or entity.

    Modelled on a Cypher node: an id, a type label and a free-form
    property dictionary.
    """

    def __init__(self, node_id: str, node_type: "AccountType",
                 properties: Optional[Dict[str, Any]] = None):
        """Create a node.

        Args:
            node_id: Identifier stored as ``id``.
            node_type: Account type stored as ``type``.
            properties: Initial property dict; a fresh empty dict is used
                when it is omitted or falsy.
        """
        self.id = node_id
        self.type = node_type
        self.properties = properties or {}
        self.created_at = datetime.now()
        self.updated_at = datetime.now()

    def update_property(self, key: str, value: Any):
        """Set one property and refresh ``updated_at``."""
        self.properties[key] = value
        self.updated_at = datetime.now()

    def get_property(self, key: str, default: Any = None) -> Any:
        """Return the property ``key``, or ``default`` when it is absent."""
        return self.properties.get(key, default)

    def to_dict(self) -> Dict:
        """Return a dict form of the node with ISO-formatted timestamps."""
        return {
            "id": self.id,
            "type": self.type.value,
            "properties": self.properties,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }

    def __repr__(self):
        return f"Node(id={self.id}, type={self.type.value}, props={len(self.properties)})"

class Relationship:
    """Directed link between two nodes, modelled on a Cypher relationship."""

    def __init__(self, rel_id: str, source_id: str, target_id: str,
                 rel_type: str, properties: Optional[Dict[str, Any]] = None):
        """Create a relationship.

        Args:
            rel_id: Identifier stored as ``id``.
            source_id: Id of the node the relationship starts at.
            target_id: Id of the node the relationship points to.
            rel_type: Relationship label, e.g. ``"OWNS"``.
            properties: Initial property dict; a fresh empty dict is used
                when it is omitted or falsy.
        """
        self.id = rel_id
        self.source = source_id
        self.target = target_id
        self.type = rel_type
        self.properties = properties or {}
        self.created_at = datetime.now()

    def update_property(self, key: str, value: Any):
        """Set one property on the relationship."""
        self.properties[key] = value

    def to_dict(self) -> Dict:
        """Return a dict form of the relationship with an ISO timestamp."""
        return {
            "id": self.id,
            "source": self.source,
            "target": self.target,
            "type": self.type,
            "properties": self.properties,
            "created_at": self.created_at.isoformat()
        }

    def __repr__(self):
        return f"Relationship(id={self.id}, {self.source} -[{self.type}]-> {self.target})"

class CypherGraph:
    """In-memory, Cypher-flavoured property graph.

    Stores nodes and directed relationships by id and offers simple
    MATCH-like lookups over them.
    """

    def __init__(self, name: str = "BalanceGraph"):
        self.name = name
        self.nodes: Dict[str, "Node"] = {}
        self.relationships: Dict[str, "Relationship"] = {}
        # Source node id -> ids of the relationships starting at that node.
        self._adjacency: Dict[str, List[str]] = defaultdict(list)

    def create_node(self, node_id: str, node_type: "AccountType",
                    properties: Optional[Dict] = None) -> "Node":
        """Create and register a node.

        Raises:
            GraphError: If ``node_id`` is already present.
        """
        if node_id in self.nodes:
            raise GraphError(f"Node with id {node_id} already exists")
        node = Node(node_id, node_type, properties)
        self.nodes[node_id] = node
        logger.info(f"Created node: {node_id}")
        return node

    def create_relationship(self, rel_id: str, source_id: str, target_id: str,
                            rel_type: str, properties: Optional[Dict] = None) -> "Relationship":
        """Create and register a directed relationship.

        Raises:
            GraphError: If either endpoint is missing or ``rel_id`` is taken.
        """
        if source_id not in self.nodes:
            raise GraphError(f"Source node {source_id} not found")
        if target_id not in self.nodes:
            raise GraphError(f"Target node {target_id} not found")
        if rel_id in self.relationships:
            raise GraphError(f"Relationship {rel_id} already exists")

        rel = Relationship(rel_id, source_id, target_id, rel_type, properties)
        self.relationships[rel_id] = rel
        self._adjacency[source_id].append(rel_id)
        logger.info(f"Created relationship: {rel}")
        return rel

    @staticmethod
    def _node_matches(node: "Node", filters: Dict[str, Any]) -> bool:
        """Return True when ``node`` satisfies every entry of ``filters``."""
        for key, condition in filters.items():
            if key == "type":
                actual = node.type
            elif key.startswith("prop_"):
                actual = node.get_property(key[5:])
                # Only property filters may be predicates.
                if callable(condition):
                    if not condition(actual):
                        return False
                    continue
            else:
                # Any other key is compared against a plain node attribute.
                actual = getattr(node, key, None)
            if actual != condition:
                return False
        return True

    def match_nodes(self, filters: Optional[Dict[str, Any]] = None) -> List["Node"]:
        """Return the nodes matching ``filters`` (similar to a MATCH clause).

        Filter keys:
            ``"type"``: compared with the node's type.
            ``"prop_<name>"``: compared with property ``<name>``; the value
                may be a literal or a one-argument predicate, e.g.
                ``{"prop_balance": lambda x: x > 1000}``.
            anything else: compared with the node attribute of that name.

        All nodes are returned when ``filters`` is empty or omitted.
        """
        if not filters:
            return list(self.nodes.values())
        return [node for node in self.nodes.values() if self._node_matches(node, filters)]

    def match_relationships(self, source_id: Optional[str] = None,
                            target_id: Optional[str] = None,
                            rel_type: Optional[str] = None) -> List["Relationship"]:
        """Return relationships matching every criterion that is not None."""
        # `is not None` so that an empty-string id is still a real filter.
        return [
            rel for rel in self.relationships.values()
            if (source_id is None or rel.source == source_id)
            and (target_id is None or rel.target == target_id)
            and (rel_type is None or rel.type == rel_type)
        ]

    def get_neighbors(self, node_id: str, rel_type: Optional[str] = None) -> List["Node"]:
        """Return the targets of outgoing relationships of ``node_id``.

        ``rel_type`` restricts the relationships followed when given.
        Targets that are not present in the graph are skipped.
        """
        neighbors = []
        for rel_id in self._adjacency.get(node_id, []):
            rel = self.relationships[rel_id]
            if rel_type and rel.type != rel_type:
                continue
            if rel.target in self.nodes:
                neighbors.append(self.nodes[rel.target])
        return neighbors

    def to_dict(self) -> Dict:
        """Return the whole graph as a dict."""
        return {
            "name": self.name,
            "nodes": [node.to_dict() for node in self.nodes.values()],
            "relationships": [rel.to_dict() for rel in self.relationships.values()]
        }

    def save_to_file(self, filepath: str):
        """Write the graph to ``filepath`` as UTF-8 JSON."""
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)
        logger.info(f"Graph saved to {filepath}")

    def load_from_file(self, filepath: str):
        """Replace the graph's contents with those stored in ``filepath``.

        The file is parsed completely before the existing data is touched,
        so a malformed file leaves the current graph unchanged.
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)

        loaded_nodes: Dict[str, "Node"] = {}
        for node_data in data["nodes"]:
            node = Node(node_data["id"], AccountType(node_data["type"]), node_data["properties"])
            node.created_at = datetime.fromisoformat(node_data["created_at"])
            node.updated_at = datetime.fromisoformat(node_data["updated_at"])
            loaded_nodes[node.id] = node

        loaded_rels: Dict[str, "Relationship"] = {}
        loaded_adjacency: Dict[str, List[str]] = defaultdict(list)
        for rel_data in data["relationships"]:
            rel = Relationship(rel_data["id"], rel_data["source"], rel_data["target"],
                               rel_data["type"], rel_data["properties"])
            rel.created_at = datetime.fromisoformat(rel_data["created_at"])
            loaded_rels[rel.id] = rel
            loaded_adjacency[rel.source].append(rel.id)

        # Swap in place so existing references to the containers stay valid.
        self.name = data.get("name", self.name)
        self.nodes.clear()
        self.nodes.update(loaded_nodes)
        self.relationships.clear()
        self.relationships.update(loaded_rels)
        self._adjacency.clear()
        self._adjacency.update(loaded_adjacency)
        logger.info(f"Graph loaded from {filepath}")

# ================================================================================
# Part 3: Transaction and balance calculation engine
# ================================================================================

class Transaction:
    """A single transaction record."""

    def __init__(self, tx_id: str, from_node_id: str, to_node_id: str,
                 amount: float, tx_type: "TransactionType",
                 timestamp: Optional[datetime] = None,
                 description: str = "", metadata: Optional[Dict] = None):
        """Create a transaction.

        Args:
            tx_id: Identifier stored as ``id``.
            from_node_id: Id of the paying node.
            to_node_id: Id of the receiving node.
            amount: Transaction size; the absolute value is stored, so the
                sign of the argument is discarded.
            tx_type: Transaction type.
            timestamp: When the transaction happened; defaults to now.
            description: Free-form text.
            metadata: Extra data; a fresh empty dict is used when omitted.
        """
        self.id = tx_id
        self.from_node = from_node_id
        self.to_node = to_node_id
        self.amount = abs(amount)
        self.type = tx_type
        self.timestamp = timestamp or datetime.now()
        self.description = description
        self.metadata = metadata or {}

    def to_dict(self) -> Dict:
        """Return a dict form of the transaction with an ISO timestamp."""
        return {
            "id": self.id,
            "from": self.from_node,
            "to": self.to_node,
            "amount": self.amount,
            "type": self.type.value,
            "timestamp": self.timestamp.isoformat(),
            "description": self.description,
            "metadata": self.metadata
        }

    def __repr__(self):
        return f"Tx({self.id}, {self.from_node}->{self.to_node}, {self.amount} {self.type.value})"

class BalanceSheet:
    """Running balance, transaction history and dated snapshots of one node."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.balance: float = 0.0
        self.transaction_history: List["Transaction"] = []
        # Date string ("YYYY-MM-DD") -> balance recorded for that date.
        self.daily_snapshots: Dict[str, float] = {}

    def apply_transaction(self, tx: "Transaction", is_inflow: bool) -> float:
        """Apply ``tx`` to the balance and return the new balance.

        Args:
            tx: Transaction whose ``amount`` is added or subtracted.
            is_inflow: True to credit the amount, False to debit it.

        Overdrafts are allowed: an insufficient balance only logs a warning.
        """
        if is_inflow:
            self.balance += tx.amount
        else:
            if self.balance < tx.amount:
                logger.warning(f"Insufficient balance in {self.node_id}: {self.balance} < {tx.amount}")
            self.balance -= tx.amount
        self.transaction_history.append(tx)
        return self.balance

    def take_snapshot(self, date: Optional[str] = None):
        """Record the current balance under ``date`` (today when omitted)."""
        if date is None:
            date = datetime.now().strftime("%Y-%m-%d")
        self.daily_snapshots[date] = self.balance

    def get_balance_at_date(self, date: str) -> float:
        """Return the snapshot for ``date``, or the current balance if none."""
        return self.daily_snapshots.get(date, self.balance)

    def to_series(self) -> pd.Series:
        """Return the snapshots as a date-indexed series sorted by date."""
        dates = sorted(self.daily_snapshots.keys())
        balances = [self.daily_snapshots[d] for d in dates]
        # Explicit dtype keeps an empty series numeric.
        return pd.Series(balances, index=pd.to_datetime(dates), dtype=float)

class TransactionValidator:
    """Checks a transaction against a graph before it is executed."""

    @staticmethod
    def validate_transaction(tx: "Transaction", graph: "CypherGraph") -> Tuple[bool, str]:
        """Return ``(is_valid, message)`` for ``tx``.

        Checks, in order: both endpoints exist in ``graph``; the amount is
        strictly positive; a TRANSFER does not target its own source. A
        missing relationship between the endpoints is only logged.
        """
        # Both endpoints must exist.
        if tx.from_node not in graph.nodes:
            return False, f"Source node {tx.from_node} not found"
        if tx.to_node not in graph.nodes:
            return False, f"Target node {tx.to_node} not found"

        # `not (x > 0)` also rejects NaN, which `x <= 0` would let through.
        if not tx.amount > 0:
            return False, f"Amount must be positive, got {tx.amount}"

        # Extra rule per transaction type.
        if tx.type == TransactionType.TRANSFER and tx.from_node == tx.to_node:
            return False, "Cannot transfer to self"

        # A relationship is not required; its absence is informational only.
        relationships = graph.match_relationships(source_id=tx.from_node, target_id=tx.to_node)
        if not relationships and tx.type not in (TransactionType.INCOME, TransactionType.EXPENSE):
            logger.info(f"No explicit relationship between {tx.from_node} and {tx.to_node}, but proceeding")

        return True, "Valid"

class DeepBalanceCalculator:
    """Executes transactions against per-node balance sheets and derives metrics."""

    def __init__(self, graph: "CypherGraph"):
        self.graph = graph
        self.balance_sheets: Dict[str, "BalanceSheet"] = {}
        self._initialize_balance_sheets()

    def _initialize_balance_sheets(self):
        """Create a fresh balance sheet for every node currently in the graph."""
        for node_id in self.graph.nodes:
            self.balance_sheets[node_id] = BalanceSheet(node_id)

    def _sheet_for(self, node_id: str) -> "BalanceSheet":
        """Return the sheet of ``node_id``, creating it for late-added nodes.

        Raises:
            SimulationError: If the node has no sheet and is not in the graph.
        """
        sheet = self.balance_sheets.get(node_id)
        if sheet is None:
            if node_id not in self.graph.nodes:
                raise SimulationError(f"Node {node_id} not found")
            sheet = self.balance_sheets[node_id] = BalanceSheet(node_id)
        return sheet

    def execute_transaction(self, tx: "Transaction") -> Dict[str, float]:
        """Validate and apply ``tx``; return new balances of affected nodes.

        INCOME and INTEREST only credit ``to_node``; EXPENSE and FEE only
        debit ``from_node``; every other type debits ``from_node`` and then
        credits ``to_node``.

        Raises:
            TransactionError: If validation fails.
        """
        valid, msg = TransactionValidator.validate_transaction(tx, self.graph)
        if not valid:
            raise TransactionError(f"Invalid transaction: {msg}")

        credit_only = tx.type in (TransactionType.INCOME, TransactionType.INTEREST)
        debit_only = tx.type in (TransactionType.EXPENSE, TransactionType.FEE)

        affected_balances: Dict[str, float] = {}
        if not credit_only:
            affected_balances[tx.from_node] = self._sheet_for(tx.from_node).apply_transaction(
                tx, is_inflow=False)
        if not debit_only:
            affected_balances[tx.to_node] = self._sheet_for(tx.to_node).apply_transaction(
                tx, is_inflow=True)

        logger.info(f"Executed {tx.type.value}: {tx.amount} from {tx.from_node} to {tx.to_node}")
        return affected_balances

    def calculate_net_worth(self, node_ids: Optional[List[str]] = None) -> float:
        """Sum the balances of ``node_ids`` (all sheets when omitted).

        Ids without a balance sheet are ignored.
        """
        if node_ids is None:
            node_ids = list(self.balance_sheets.keys())
        return sum(self.balance_sheets[nid].balance for nid in node_ids if nid in self.balance_sheets)

    def run_simulation(self, transactions: List["Transaction"],
                       snapshot_interval: int = 10) -> pd.DataFrame:
        """Execute ``transactions`` in order and return periodic snapshots.

        Args:
            transactions: Transactions to execute.
            snapshot_interval: A snapshot row is recorded every this many
                transactions, and always after the last one.

        Raises:
            ValueError: If ``snapshot_interval`` is smaller than 1.
            Exception: Whatever a failing transaction raised, after logging.
        """
        if snapshot_interval < 1:
            raise ValueError(f"snapshot_interval must be >= 1, got {snapshot_interval}")

        results = []
        last_index = len(transactions) - 1
        for idx, tx in enumerate(transactions):
            try:
                self.execute_transaction(tx)
            except Exception as e:
                logger.error(f"Transaction {tx.id} failed: {e}")
                raise

            if idx % snapshot_interval == 0 or idx == last_index:
                snapshot = {
                    "step": idx,
                    "timestamp": tx.timestamp,
                    "transactions": idx + 1,
                }
                for node_id, sheet in self.balance_sheets.items():
                    snapshot[f"balance_{node_id}"] = sheet.balance
                snapshot["net_worth"] = self.calculate_net_worth()
                results.append(snapshot)

        return pd.DataFrame(results)

    def get_balance_series(self, node_id: str) -> pd.Series:
        """Return the snapshot time series of ``node_id``.

        Raises:
            SimulationError: If the node is unknown.
        """
        return self._sheet_for(node_id).to_series()

    def compute_risk_metrics(self, node_id: str, lookback_days: int = 30) -> Dict[str, float]:
        """Compute risk metrics from the node's snapshot series.

        Args:
            node_id: Node whose snapshots are analysed.
            lookback_days: Only snapshots within this many days of the latest
                one are used; ``None`` or a non-positive value uses them all.

        Returns:
            An empty dict when fewer than two snapshots are available,
            otherwise ``volatility`` (annualised with 252 periods),
            ``max_drawdown``, ``sharpe_ratio`` (risk-free rate 0),
            ``var_95`` (5th percentile of returns), ``current_balance`` and
            ``avg_balance``.
        """
        series = self.get_balance_series(node_id)
        if lookback_days is not None and lookback_days > 0 and len(series) > 0:
            cutoff = series.index.max() - pd.Timedelta(days=lookback_days)
            series = series[series.index > cutoff]
        if len(series) < 2:
            return {}

        # A zero balance makes pct_change produce inf/NaN; drop those points.
        returns = series.pct_change().replace([np.inf, -np.inf], np.nan).dropna()
        # The sample standard deviation needs at least two returns.
        return_std = float(returns.std()) if len(returns) >= 2 else 0.0
        annualise = float(np.sqrt(252))

        volatility = return_std * annualise
        sharpe = float(returns.mean()) / return_std * annualise if return_std > 0 else 0.0
        var_95 = float(np.percentile(returns, 5)) if len(returns) else 0.0

        # Drawdown is only defined relative to a positive running peak.
        running_max = series.expanding().max()
        drawdown = (series - running_max) / running_max.where(running_max > 0)
        max_drawdown = drawdown.min()
        max_drawdown = 0.0 if pd.isna(max_drawdown) else float(max_drawdown)

        return {
            "volatility": volatility,
            "max_drawdown": max_drawdown,
            "sharpe_ratio": sharpe,
            "var_95": var_95,
            "current_balance": float(series.iloc[-1]),
            "avg_balance": float(series.mean())
        }

# ================================================================================
# Part 4: Advanced simulator and strategy pattern
# ================================================================================

class TransactionStrategy:
    """Base class for transaction-generating strategies."""

    def generate_transactions(self, step: int, context: Dict) -> List["Transaction"]:
        """Return the transactions for ``step``; subclasses must override.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

class RandomWalkStrategy(TransactionStrategy):
    """Random-walk strategy: emits one random transaction per step."""

    def __init__(self, graph: "CypherGraph", avg_amount: float = 100,
                 volatility: float = 0.3,
                 tx_types: Optional[List["TransactionType"]] = None):
        """Create the strategy.

        Args:
            graph: Graph whose node ids (captured now) are used as endpoints.
            avg_amount: Scale of the log-normal amount distribution; its
                logarithm is the distribution's mean parameter.
            volatility: Sigma of the log-normal amount distribution.
            tx_types: Types to draw from; defaults to INCOME, EXPENSE and
                TRANSFER.
        """
        self.graph = graph
        self.avg_amount = avg_amount
        self.volatility = volatility
        self.tx_types = tx_types or [TransactionType.INCOME, TransactionType.EXPENSE, TransactionType.TRANSFER]
        self.node_ids = list(graph.nodes.keys())

    def generate_transactions(self, step: int, context: Dict) -> List["Transaction"]:
        """Return a single-element list with one random transaction.

        An empty list is returned when there are no nodes, or when a
        TRANSFER is drawn but fewer than two nodes exist. The transaction
        is stamped with ``context["current_date"]`` when that key is set.
        """
        node_count = len(self.node_ids)
        if node_count == 0:
            return []

        # Index-based picks keep the ids and types as plain Python objects.
        tx_type = self.tx_types[np.random.randint(len(self.tx_types))]

        if tx_type == TransactionType.TRANSFER:
            # A self-transfer is always rejected, so draw two distinct nodes.
            if node_count < 2:
                return []
            src, dst = np.random.choice(node_count, size=2, replace=False)
        else:
            src = np.random.randint(node_count)
            dst = np.random.randint(node_count)
        from_node = self.node_ids[src]
        to_node = self.node_ids[dst]

        # Log-normally distributed amount.
        amount = float(np.random.lognormal(np.log(self.avg_amount), self.volatility))

        tx = Transaction(
            tx_id=f"sim_{step}_{datetime.now().timestamp()}",
            from_node_id=from_node,
            to_node_id=to_node,
            amount=amount,
            tx_type=tx_type,
            # Use the simulated date; Transaction falls back to now on None.
            timestamp=context.get("current_date"),
            description=f"Random walk step {step}"
        )
        return [tx]

class CyclicalCashFlowStrategy(TransactionStrategy):
    """Recurring cash-flow strategy: a monthly salary and a monthly rent."""

    def __init__(self, graph: "CypherGraph", salary_node: str, expense_node: str,
                 salary_amount: float, rent_amount: float, salary_day: int = 25,
                 rent_day: int = 1, system_node: str = "system"):
        """Create the strategy.

        Args:
            graph: Graph the strategy operates on.
            salary_node: Node that receives the salary and pays the rent.
            expense_node: Node named as the rent recipient.
            salary_amount: Salary paid on ``salary_day``.
            rent_amount: Rent charged on ``rent_day``.
            salary_day: Day of month on which the salary is paid.
            rent_day: Day of month on which the rent is charged.
            system_node: Node named as the source of the salary.
        """
        self.graph = graph
        self.salary_node = salary_node
        self.expense_node = expense_node
        self.salary_amount = salary_amount
        self.rent_amount = rent_amount
        self.salary_day = salary_day
        self.rent_day = rent_day
        self.system_node = system_node

    def generate_transactions(self, step: int, context: Dict) -> List["Transaction"]:
        """Return the salary and/or rent transactions due on the current date.

        The date is read from ``context["current_date"]`` (now when absent)
        and is also used as the transactions' timestamp.
        """
        current_date = context.get("current_date", datetime.now())
        date_tag = current_date.strftime('%Y%m%d')
        transactions = []

        # Salary day.
        if current_date.day == self.salary_day:
            transactions.append(Transaction(
                tx_id=f"salary_{step}_{date_tag}",
                from_node_id=self.system_node,
                to_node_id=self.salary_node,
                amount=self.salary_amount,
                tx_type=TransactionType.INCOME,
                timestamp=current_date,
                description="Monthly salary"
            ))

        # Rent day; skipped when payer and recipient are the same node.
        if current_date.day == self.rent_day and self.expense_node != self.salary_node:
            transactions.append(Transaction(
                tx_id=f"rent_{step}_{date_tag}",
                from_node_id=self.salary_node,
                to_node_id=self.expense_node,
                amount=self.rent_amount,
                tx_type=TransactionType.EXPENSE,
                timestamp=current_date,
                description="Monthly rent"
            ))

        return transactions

class DeepBalanceSimulator:
    """Main simulator: ties together the graph, the engine and the strategies."""

    def __init__(self, graph: "CypherGraph"):
        self.graph = graph
        self.calculator = DeepBalanceCalculator(graph)
        self.strategies: List["TransactionStrategy"] = []
        self.simulation_history: List[Dict] = []
        self.current_step = 0

    def add_strategy(self, strategy: "TransactionStrategy"):
        """Register a transaction-generating strategy."""
        self.strategies.append(strategy)

    def simulate_days(self, days: int, start_date: Optional[datetime] = None) -> pd.DataFrame:
        """Simulate ``days`` consecutive days and return one row per day.

        Args:
            days: Number of days to simulate.
            start_date: First simulated day; defaults to today at midnight.

        Each day every strategy is asked for transactions, which are then
        executed; a failing transaction is logged and skipped. A balance
        snapshot is recorded for every balance sheet at the end of each day.
        """
        if start_date is None:
            start_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

        all_transactions = []
        daily_results = []

        for day_offset in range(days):
            current_date = start_date + timedelta(days=day_offset)
            date_key = current_date.strftime("%Y-%m-%d")
            context = {"current_date": current_date, "step": self.current_step}

            # Collect the transactions of every strategy for this day.
            day_transactions = []
            for strategy in self.strategies:
                day_transactions.extend(strategy.generate_transactions(self.current_step, context))

            # Best effort: one bad transaction must not abort the whole run.
            for tx in day_transactions:
                try:
                    self.calculator.execute_transaction(tx)
                    all_transactions.append(tx)
                except Exception as e:
                    logger.error(f"Transaction failed on {current_date}: {e}")
                    continue

            # Iterate the sheets (not graph.nodes) so a node without a sheet
            # cannot raise KeyError here.
            snapshot = {
                "date": date_key,
                "day": day_offset,
                "transactions_count": len(day_transactions)
            }
            for node_id, sheet in self.calculator.balance_sheets.items():
                snapshot[f"balance_{node_id}"] = sheet.balance
            snapshot["net_worth"] = self.calculator.calculate_net_worth()
            daily_results.append(snapshot)

            # Per-account daily snapshot.
            for sheet in self.calculator.balance_sheets.values():
                sheet.take_snapshot(date_key)

            self.current_step += 1

        self.simulation_history = daily_results
        return pd.DataFrame(daily_results)

    def get_balance_dataframe(self) -> pd.DataFrame:
        """Return the snapshot series of all accounts, one column per node."""
        data = {node_id: sheet.to_series()
                for node_id, sheet in self.calculator.balance_sheets.items()}
        return pd.DataFrame(data)

    def generate_report(self) -> Dict:
        """Return a summary report: graph size, per-account figures, net worth."""
        report = {
            "simulation_steps": self.current_step,
            "graph_summary": {
                "nodes": len(self.graph.nodes),
                "relationships": len(self.graph.relationships)
            },
            "accounts": {}
        }

        for node_id, sheet in self.calculator.balance_sheets.items():
            report["accounts"][node_id] = {
                "final_balance": sheet.balance,
                "total_transactions": len(sheet.transaction_history),
                "risk_metrics": self.calculator.compute_risk_metrics(node_id)
            }

        report["net_worth"] = self.calculator.calculate_net_worth()
        return report

    def plot_balance_timeline(self, figsize=(12, 6)):
        """Plot every account's snapshot series on one chart."""
        df = self.get_balance_dataframe()
        if df.empty:
            logger.warning("No data to plot")
            return

        plt.figure(figsize=figsize)
        for col in df.columns:
            plt.plot(df.index, df[col], label=col, marker='o', markersize=3)
        plt.xlabel("Date")
        plt.ylabel("Balance")
        plt.title("Account Balance Timeline")
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def plot_net_worth(self, figsize=(10, 5)):
        """Plot net worth per day of the most recent ``simulate_days`` run."""
        if not self.simulation_history:
            logger.warning("No simulation history")
            return

        df = pd.DataFrame(self.simulation_history)
        plt.figure(figsize=figsize)
        plt.plot(range(len(df)), df["net_worth"], marker='o', linestyle='-', linewidth=2)
        plt.xlabel("Day")
        plt.ylabel("Net Worth")
        plt.title("Net Worth Evolution")
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def _unique_transactions(self) -> list:
        """Return every recorded transaction object once, in first-seen order."""
        seen = set()
        unique = []
        for sheet in self.calculator.balance_sheets.values():
            for tx in sheet.transaction_history:
                # Two-sided transactions sit in both sheets; dedupe by object
                # identity so distinct transactions are never merged.
                if id(tx) not in seen:
                    seen.add(id(tx))
                    unique.append(tx)
        return unique

    def export_transactions(self, filepath: str):
        """Write every recorded transaction once to ``filepath`` as UTF-8 JSON."""
        all_txs = [tx.to_dict() for tx in self._unique_transactions()]

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(all_txs, f, indent=2, default=str, ensure_ascii=False)
        logger.info(f"Exported {len(all_txs)} transactions to {filepath}")

# ================================================================================
# Part 5: Examples and demo
# ================================================================================

def create_demo_graph() -> "CypherGraph":
    """Build and return the small personal-finance graph used by the demo."""
    graph = CypherGraph(name="PersonalFinanceGraph")

    # Nodes. "system" is the source account of income transactions.
    graph.create_node("alice", AccountType.PERSONAL, {"name": "Alice", "initial_balance": 5000})
    graph.create_node("bob", AccountType.PERSONAL, {"name": "Bob", "initial_balance": 3000})
    graph.create_node("business", AccountType.BUSINESS, {"name": "Alice's Business", "initial_balance": 10000})
    graph.create_node("savings", AccountType.SAVINGS, {"name": "High Yield Savings", "interest_rate": 0.045})
    graph.create_node("investment", AccountType.INVESTMENT, {"name": "Stock Portfolio", "risk": "moderate"})
    graph.create_node("system", AccountType.PERSONAL, {"name": "System Account", "is_system": True})

    # Relationships.
    graph.create_relationship("rel_1", "alice", "business", "OWNS", {"share": 1.0})
    graph.create_relationship("rel_2", "alice", "savings", "HAS_ACCOUNT", {"opened": "2023-01-01"})
    graph.create_relationship("rel_3", "alice", "investment", "INVESTS_IN", {"allocation": 0.3})
    graph.create_relationship("rel_4", "bob", "alice", "OWES", {"amount": 500})
    graph.create_relationship("rel_5", "business", "alice", "PAYS", {"type": "salary"})

    return graph

def initialize_balances(graph: "CypherGraph", calculator: "DeepBalanceCalculator",
                        snapshot_date: Optional[str] = None):
    """Seed balances from each node's ``initial_balance`` property.

    Every node with a positive initial balance receives an INCOME
    transaction from the ``"system"`` node, followed by a balance snapshot.

    Args:
        graph: Graph whose nodes are read.
        calculator: Calculator that executes the seeding transactions.
        snapshot_date: ``"YYYY-MM-DD"`` key for the snapshot. Today's date is
            used when omitted; pass a date before the simulated period so
            the snapshot sorts ahead of the simulated days.
    """
    for node_id, node in graph.nodes.items():
        # `or 0.0` tolerates an explicit None stored under the property.
        initial = node.get_property("initial_balance", 0.0) or 0.0
        if initial > 0:
            # Seed the balance through a regular income transaction.
            init_tx = Transaction(
                tx_id=f"init_{node_id}",
                from_node_id="system",
                to_node_id=node_id,
                amount=initial,
                tx_type=TransactionType.INCOME,
                description="Initial balance"
            )
            calculator.execute_transaction(init_tx)
            calculator.balance_sheets[node_id].take_snapshot(snapshot_date)

def run_demo_simulation():
    """Run the end-to-end demo and return the simulator.

    Builds the demo graph, seeds balances, simulates 60 days from
    2024-01-01 with a random and a cyclical strategy, prints a report,
    shows the charts and exports the transactions and the graph as JSON.
    """
    print("=" * 80)
    print("Cypher深度余额模拟器 - 演示运行")
    print("=" * 80)

    start_date = datetime(2024, 1, 1)

    # 1. Build the graph.
    graph = create_demo_graph()
    print(f"\n[1] 图已创建: {graph.name}")
    print(f"    节点数: {len(graph.nodes)}")
    print(f"    关系数: {len(graph.relationships)}")

    # 2. Create the simulator.
    simulator = DeepBalanceSimulator(graph)

    # 3. Seed the initial balances.
    initialize_balances(graph, simulator.calculator)
    # Re-date the opening snapshots to the day before the simulated period;
    # a snapshot dated today would sort after the 2024 days and be mistaken
    # for the latest balance.
    opening_date = (start_date - timedelta(days=1)).strftime("%Y-%m-%d")
    for sheet in simulator.calculator.balance_sheets.values():
        sheet.daily_snapshots.clear()
        sheet.take_snapshot(opening_date)
    print("\n[2] 初始余额已设置:")
    for node_id, sheet in simulator.calculator.balance_sheets.items():
        print(f"    {node_id}: ${sheet.balance:,.2f}")

    # 4. Register the strategies: random expenses/transfers...
    random_strategy = RandomWalkStrategy(
        graph, avg_amount=200, volatility=0.4,
        tx_types=[TransactionType.EXPENSE, TransactionType.TRANSFER]
    )
    simulator.add_strategy(random_strategy)

    # ...and the recurring salary/rent cash flow.
    cashflow_strategy = CyclicalCashFlowStrategy(
        graph, salary_node="alice", expense_node="bob",
        salary_amount=3000, rent_amount=1200,
        salary_day=25, rent_day=1
    )
    simulator.add_strategy(cashflow_strategy)

    print("\n[3] 交易策略已添加:")
    print("    - RandomWalkStrategy (随机支出/转账)")
    print("    - CyclicalCashFlowStrategy (月薪/房租)")

    # 5. Run the simulation.
    print("\n[4] 开始模拟60天...")
    result_df = simulator.simulate_days(days=60, start_date=start_date)

    # 6. Print the final balances.
    print("\n[5] 模拟完成!最终余额:")
    final_balances = result_df.iloc[-1]
    for col in final_balances.index:
        if col.startswith("balance_"):
            node = col.replace("balance_", "")
            print(f"    {node}: ${final_balances[col]:,.2f}")
    print(f"\n    最终净财富: ${result_df.iloc[-1]['net_worth']:,.2f}")

    # 7. Print the risk report.
    print("\n[6] 风险指标报告:")
    report = simulator.generate_report()
    for account, data in report["accounts"].items():
        print(f"\n    账户: {account}")
        print(f"      最终余额: ${data['final_balance']:,.2f}")
        print(f"      交易次数: {data['total_transactions']}")
        if data['risk_metrics']:
            metrics = data['risk_metrics']
            print(f"      波动率(年化): {metrics.get('volatility', 0):.2%}")
            print(f"      最大回撤: {metrics.get('max_drawdown', 0):.2%}")
            print(f"      夏普比率: {metrics.get('sharpe_ratio', 0):.2f}")

    # 8. Charts.
    print("\n[7] 生成可视化图表...")
    simulator.plot_balance_timeline()
    simulator.plot_net_worth()

    # 9. Export the data.
    simulator.export_transactions("demo_transactions.json")
    graph.save_to_file("demo_graph.json")
    print("\n[8] 数据已导出: demo_transactions.json, demo_graph.json")

    return simulator

# ================================================================================
# Part 6: Advanced analytics module
# ================================================================================

class CypherAnalytics:
    """Analysis helpers that work on a simulator's recorded transactions."""

    def __init__(self, simulator: "DeepBalanceSimulator"):
        self.simulator = simulator
        self.graph = simulator.graph

    def analyze_flow_patterns(self) -> pd.DataFrame:
        """Return one row per recorded transaction.

        Columns: ``from``, ``to``, ``amount``, ``type``, ``date``. The
        columns are present even when no transaction has been recorded.
        """
        seen = set()
        flow_data = []
        for sheet in self.simulator.calculator.balance_sheets.values():
            for tx in sheet.transaction_history:
                # Two-sided transactions sit in both sheets; count each
                # transaction object once.
                if id(tx) in seen:
                    continue
                seen.add(id(tx))
                flow_data.append({
                    "from": tx.from_node,
                    "to": tx.to_node,
                    "amount": tx.amount,
                    "type": tx.type.value,
                    "date": tx.timestamp
                })
        return pd.DataFrame(flow_data, columns=["from", "to", "amount", "type", "date"])

def detect_anomalies(self, threshold: float = 3.0) -> List[Transaction]:
相关文章
|
2天前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
10264 35
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
14天前
|
人工智能 安全 Linux
【OpenClaw保姆级图文教程】阿里云/本地部署集成模型Ollama/Qwen3.5/百炼 API 步骤流程及避坑指南
2026年,AI代理工具的部署逻辑已从“单一云端依赖”转向“云端+本地双轨模式”。OpenClaw(曾用名Clawdbot)作为开源AI代理框架,既支持对接阿里云百炼等云端免费API,也能通过Ollama部署本地大模型,完美解决两类核心需求:一是担心云端API泄露核心数据的隐私安全诉求;二是频繁调用导致token消耗过高的成本控制需求。
5948 14
|
22天前
|
人工智能 JavaScript Ubuntu
5分钟上手龙虾AI!OpenClaw部署(阿里云+本地)+ 免费多模型配置保姆级教程(MiniMax、Claude、阿里云百炼)
OpenClaw(昵称“龙虾AI”)作为2026年热门的开源个人AI助手,由PSPDFKit创始人Peter Steinberger开发,核心优势在于“真正执行任务”——不仅能聊天互动,还能自动处理邮件、管理日程、订机票、写代码等,且所有数据本地处理,隐私完全可控。它支持接入MiniMax、Claude、GPT等多类大模型,兼容微信、Telegram、飞书等主流聊天工具,搭配100+可扩展技能,成为兼顾实用性与隐私性的AI工具首选。
23235 120
|
8天前
|
人工智能 JavaScript API
解放双手!OpenClaw Agent Browser全攻略(阿里云+本地部署+免费API+网页自动化场景落地)
“让AI聊聊天、写代码不难,难的是让它自己打开网页、填表单、查数据”——2026年,无数OpenClaw用户被这个痛点困扰。参考文章直击核心:当AI只能“纸上谈兵”,无法实际操控浏览器,就永远成不了真正的“数字员工”。而Agent Browser技能的出现,彻底打破了这一壁垒——它给OpenClaw装上“上网的手和眼睛”,让AI能像真人一样打开网页、点击按钮、填写表单、提取数据,24小时不间断完成网页自动化任务。
1965 4