# Download URL: http://lanzou.com.cn/i6926983a

import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Any, Union
import warnings
import json
import os
from enum import Enum
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict
import logging
# Configure logging: INFO level with timestamp, logger name, level and message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("CypherBalanceSimulator")
# ================================================================================
# Part 1: Base enums and exception classes
# ================================================================================
class TransactionType(Enum):
    """Kinds of transaction the simulator can execute."""

    INCOME = "INCOME"          # money received
    EXPENSE = "EXPENSE"        # money spent
    TRANSFER = "TRANSFER"      # movement between two accounts
    INVESTMENT = "INVESTMENT"  # investment
    INTEREST = "INTEREST"      # interest
    FEE = "FEE"                # service fee
    TAX = "TAX"                # tax
    DIVIDEND = "DIVIDEND"      # dividend
class AccountType(Enum):
    """Kinds of account a graph node can represent."""

    PERSONAL = "PERSONAL"      # personal account
    BUSINESS = "BUSINESS"      # business account
    SAVINGS = "SAVINGS"        # savings account
    INVESTMENT = "INVESTMENT"  # investment account
    LOAN = "LOAN"              # loan account
    CREDIT = "CREDIT"          # credit account
class RiskLevel(Enum):
    """Risk level, ordered from 1 (lowest) to 4 (highest)."""

    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
class SimulationError(Exception):
    """Base exception for the simulator."""
class GraphError(SimulationError):
    """Raised when a graph operation fails."""
class TransactionError(SimulationError):
    """Raised when a transaction cannot be executed."""
# ================================================================================
# Part 2: Core graph-model components (Cypher style)
# ================================================================================
class Node:
    """
    Graph node representing an account or entity.

    Carries a free-form property dictionary, similar to the property map
    of a Cypher node.
    """

    def __init__(self, node_id: str, node_type: AccountType,
                 properties: Optional[Dict[str, Any]] = None):
        """
        Args:
            node_id: Identifier of the node.
            node_type: Account type of the node.
            properties: Optional property dictionary; an empty one is
                created when omitted or empty.
        """
        self.id = node_id
        self.type = node_type
        self.properties = properties or {}
        # Use a single clock reading so both stamps start out identical.
        now = datetime.now()
        self.created_at = now
        self.updated_at = now

    def update_property(self, key: str, value: Any):
        """Set a property and refresh the ``updated_at`` timestamp."""
        self.properties[key] = value
        self.updated_at = datetime.now()

    def get_property(self, key: str, default: Any = None) -> Any:
        """Return the property stored under ``key``, or ``default`` if absent."""
        return self.properties.get(key, default)

    def to_dict(self) -> Dict:
        """Return a dictionary representation with ISO-formatted timestamps."""
        return {
            "id": self.id,
            "type": self.type.value,
            "properties": self.properties,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }

    def __repr__(self):
        return f"Node(id={self.id}, type={self.type.value}, props={len(self.properties)})"
class Relationship:
    """
    Directed association between two nodes (similar to a Cypher relationship).
    """

    def __init__(self, rel_id: str, source_id: str, target_id: str,
                 rel_type: str, properties: Optional[Dict[str, Any]] = None):
        """
        Args:
            rel_id: Identifier of the relationship.
            source_id: ID of the node the relationship starts from.
            target_id: ID of the node the relationship points to.
            rel_type: Relationship type label.
            properties: Optional property dictionary; an empty one is
                created when omitted or empty.
        """
        self.id = rel_id
        self.source = source_id
        self.target = target_id
        self.type = rel_type
        self.properties = properties or {}
        self.created_at = datetime.now()

    def update_property(self, key: str, value: Any):
        """Set a property on the relationship."""
        self.properties[key] = value

    def to_dict(self) -> Dict:
        """Return a dictionary representation with an ISO-formatted timestamp."""
        return {
            "id": self.id,
            "source": self.source,
            "target": self.target,
            "type": self.type,
            "properties": self.properties,
            "created_at": self.created_at.isoformat()
        }

    def __repr__(self):
        return f"Relationship(id={self.id}, {self.source} -[{self.type}]-> {self.target})"
class CypherGraph:
    """
    In-memory, Cypher-style graph store.

    Manages nodes and directed relationships and supports simple
    pattern-matching queries over both.
    """

    def __init__(self, name: str = "BalanceGraph"):
        self.name = name
        self.nodes: Dict[str, Node] = {}
        self.relationships: Dict[str, Relationship] = {}
        # Maps a source node ID to the IDs of its outgoing relationships.
        self._adjacency: Dict[str, List[str]] = defaultdict(list)

    def create_node(self, node_id: str, node_type: AccountType, properties: Dict = None) -> Node:
        """
        Create and register a node.

        Raises:
            GraphError: If a node with ``node_id`` already exists.
        """
        if node_id in self.nodes:
            raise GraphError(f"Node with id {node_id} already exists")
        node = Node(node_id, node_type, properties)
        self.nodes[node_id] = node
        logger.info(f"Created node: {node_id}")
        return node

    def create_relationship(self, rel_id: str, source_id: str, target_id: str,
                            rel_type: str, properties: Dict = None) -> Relationship:
        """
        Create and register a directed relationship between two existing nodes.

        Raises:
            GraphError: If either endpoint is unknown or ``rel_id`` is taken.
        """
        if source_id not in self.nodes:
            raise GraphError(f"Source node {source_id} not found")
        if target_id not in self.nodes:
            raise GraphError(f"Target node {target_id} not found")
        if rel_id in self.relationships:
            raise GraphError(f"Relationship {rel_id} already exists")
        rel = Relationship(rel_id, source_id, target_id, rel_type, properties)
        self.relationships[rel_id] = rel
        self._adjacency[source_id].append(rel_id)
        logger.info(f"Created relationship: {rel}")
        return rel

    @staticmethod
    def _node_matches(node: Node, filters: Dict[str, Any]) -> bool:
        """Return True when ``node`` satisfies every entry of ``filters``."""
        for key, condition in filters.items():
            if key == "type":
                if node.type != condition:
                    return False
            elif key.startswith("prop_"):
                # "prop_<name>" filters on the property dictionary; the
                # condition is either a predicate or a value to compare with.
                prop_value = node.get_property(key[5:])
                if callable(condition):
                    if not condition(prop_value):
                        return False
                elif prop_value != condition:
                    return False
            elif getattr(node, key, None) != condition:
                # Any other key is compared against a direct node attribute.
                return False
        return True

    def match_nodes(self, filters: Dict[str, Any] = None) -> List[Node]:
        """
        Match nodes (similar to a MATCH clause).

        Args:
            filters: Optional filter conditions. ``"type"`` compares the node
                type; keys prefixed with ``"prop_"`` test a node property and
                accept either a value or a predicate; any other key is
                compared for equality against the node attribute of that name.
                Example: ``{"type": AccountType.PERSONAL,
                "prop_balance": lambda x: x is not None and x > 1000}``.
                Predicates receive ``None`` when the property is missing.

        Returns:
            All nodes satisfying every filter; every node when ``filters``
            is empty or omitted.
        """
        if not filters:
            return list(self.nodes.values())
        return [node for node in self.nodes.values() if self._node_matches(node, filters)]

    def match_relationships(self, source_id: str = None, target_id: str = None,
                            rel_type: str = None) -> List[Relationship]:
        """
        Match relationships by source, target and/or type.

        A falsy argument (``None`` or an empty string) means "do not filter
        on this field".
        """
        results = []
        for rel in self.relationships.values():
            if source_id and rel.source != source_id:
                continue
            if target_id and rel.target != target_id:
                continue
            if rel_type and rel.type != rel_type:
                continue
            results.append(rel)
        return results

    def get_neighbors(self, node_id: str, rel_type: str = None) -> List[Node]:
        """
        Return the target nodes of the outgoing relationships of ``node_id``.

        Only outgoing relationships are followed; ``rel_type`` optionally
        restricts them to one type. Unknown node IDs yield an empty list.
        """
        neighbors = []
        for rel_id in self._adjacency.get(node_id, []):
            rel = self.relationships[rel_id]
            if rel_type and rel.type != rel_type:
                continue
            neighbor = self.nodes.get(rel.target)
            if neighbor is not None:
                neighbors.append(neighbor)
        return neighbors

    def to_dict(self) -> Dict:
        """Export the whole graph as a dictionary."""
        return {
            "name": self.name,
            "nodes": [node.to_dict() for node in self.nodes.values()],
            "relationships": [rel.to_dict() for rel in self.relationships.values()]
        }

    def save_to_file(self, filepath: str):
        """Save the graph to a JSON file."""
        with open(filepath, 'w', encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2)
        logger.info(f"Graph saved to {filepath}")

    def load_from_file(self, filepath: str):
        """
        Replace the graph content with the content of a JSON file.

        The file is parsed completely before the current content is touched,
        so a malformed file leaves the existing graph intact.
        """
        with open(filepath, 'r', encoding="utf-8") as f:
            data = json.load(f)

        # Rebuild nodes into a scratch dict first.
        nodes: Dict[str, Node] = {}
        for node_data in data["nodes"]:
            node = Node(node_data["id"], AccountType(node_data["type"]), node_data["properties"])
            node.created_at = datetime.fromisoformat(node_data["created_at"])
            node.updated_at = datetime.fromisoformat(node_data["updated_at"])
            nodes[node.id] = node

        # Rebuild relationships and the adjacency index.
        relationships: Dict[str, Relationship] = {}
        adjacency: Dict[str, List[str]] = defaultdict(list)
        for rel_data in data["relationships"]:
            rel = Relationship(rel_data["id"], rel_data["source"], rel_data["target"],
                               rel_data["type"], rel_data["properties"])
            rel.created_at = datetime.fromisoformat(rel_data["created_at"])
            relationships[rel.id] = rel
            adjacency[rel.source].append(rel.id)

        # Swap in place so existing references to the containers stay valid.
        self.nodes.clear()
        self.nodes.update(nodes)
        self.relationships.clear()
        self.relationships.update(relationships)
        self._adjacency.clear()
        self._adjacency.update(adjacency)
        # Restore the name written by to_dict(); keep the current one if absent.
        self.name = data.get("name", self.name)
        logger.info(f"Graph loaded from {filepath}")
# ================================================================================
# Part 3: Transaction and balance calculation engine
# ================================================================================
class Transaction:
    """A single transaction record."""

    def __init__(self, tx_id: str, from_node_id: str, to_node_id: str,
                 amount: float, tx_type: TransactionType, timestamp: datetime = None,
                 description: str = "", metadata: Dict = None):
        """
        Args:
            tx_id: Identifier of the transaction.
            from_node_id: ID of the node the money leaves.
            to_node_id: ID of the node the money enters.
            amount: Transaction amount; its absolute value is stored.
            tx_type: Kind of transaction.
            timestamp: Time of the transaction; defaults to the current time.
            description: Free-text description.
            metadata: Optional extra data; an empty dict when omitted.
        """
        self.id = tx_id
        self.from_node = from_node_id
        self.to_node = to_node_id
        # The sign is dropped: direction is expressed by from/to and the type.
        self.amount = abs(amount)
        self.type = tx_type
        self.timestamp = timestamp or datetime.now()
        self.description = description
        self.metadata = metadata or {}

    def to_dict(self) -> Dict:
        """Return a dictionary representation with an ISO-formatted timestamp."""
        return {
            "id": self.id,
            "from": self.from_node,
            "to": self.to_node,
            "amount": self.amount,
            "type": self.type.value,
            "timestamp": self.timestamp.isoformat(),
            "description": self.description,
            "metadata": self.metadata
        }

    def __repr__(self):
        return f"Tx({self.id}, {self.from_node}->{self.to_node}, {self.amount} {self.type.value})"
class BalanceSheet:
    """Running balance, transaction history and dated snapshots of one account."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.balance: float = 0.0
        self.transaction_history: List[Transaction] = []
        # Maps a date string to the balance recorded for that date.
        self.daily_snapshots: Dict[str, float] = {}

    def apply_transaction(self, tx: Transaction, is_inflow: bool) -> float:
        """
        Apply a transaction and return the updated balance.

        Args:
            tx: Transaction whose amount is applied.
            is_inflow: True to credit the amount, False to debit it.

        An outflow larger than the balance is logged as a warning but still
        applied, so the balance may become negative (overdraft is allowed).
        """
        if is_inflow:
            self.balance += tx.amount
        else:
            if self.balance < tx.amount:
                logger.warning(f"Insufficient balance in {self.node_id}: {self.balance} < {tx.amount}")
            self.balance -= tx.amount
        self.transaction_history.append(tx)
        return self.balance

    def take_snapshot(self, date: str = None):
        """
        Record the current balance under ``date``.

        ``date`` defaults to today's date formatted as ``YYYY-MM-DD``; an
        existing snapshot for the same date is overwritten.
        """
        if date is None:
            date = datetime.now().strftime("%Y-%m-%d")
        self.daily_snapshots[date] = self.balance

    def get_balance_at_date(self, date: str) -> float:
        """Return the snapshot for ``date``, or the current balance if none exists."""
        return self.daily_snapshots.get(date, self.balance)

    def to_series(self) -> pd.Series:
        """Return the snapshots as a series indexed by date, sorted by date string."""
        dates = sorted(self.daily_snapshots)
        balances = [self.daily_snapshots[d] for d in dates]
        return pd.Series(balances, index=pd.to_datetime(dates))
class TransactionValidator:
    """Validates transactions against a graph."""

    @staticmethod
    def validate_transaction(tx: Transaction, graph: CypherGraph) -> Tuple[bool, str]:
        """
        Check whether a transaction may be executed.

        Returns:
            ``(True, "Valid")`` when the transaction passes, otherwise
            ``(False, reason)``.
        """
        # Both endpoints must exist, whatever the transaction type.
        if tx.from_node not in graph.nodes:
            return False, f"Source node {tx.from_node} not found"
        if tx.to_node not in graph.nodes:
            return False, f"Target node {tx.to_node} not found"
        # The amount must be positive. Transaction stores the absolute value
        # of its amount, so for such objects this rejects only zero.
        if tx.amount <= 0:
            return False, f"Amount must be positive, got {tx.amount}"
        # Type-specific validation.
        if tx.type == TransactionType.TRANSFER and tx.from_node == tx.to_node:
            return False, "Cannot transfer to self"
        # A missing relationship is only reported, never treated as an error.
        relationships = graph.match_relationships(source_id=tx.from_node, target_id=tx.to_node)
        if not relationships and tx.type not in [TransactionType.INCOME, TransactionType.EXPENSE]:
            logger.info(f"No explicit relationship between {tx.from_node} and {tx.to_node}, but proceeding")
        return True, "Valid"
class DeepBalanceCalculator:
    """
    Balance calculation engine.

    Keeps one BalanceSheet per graph node, executes transactions against
    them and derives aggregate and risk figures.
    """

    # Types that only credit the receiving node.
    _INFLOW_ONLY = frozenset({TransactionType.INCOME, TransactionType.INTEREST})
    # Types that only debit the paying node.
    _OUTFLOW_ONLY = frozenset({TransactionType.EXPENSE, TransactionType.FEE})

    def __init__(self, graph: CypherGraph):
        self.graph = graph
        self.balance_sheets: Dict[str, BalanceSheet] = {}
        self._initialize_balance_sheets()

    def _initialize_balance_sheets(self):
        """Create an empty balance sheet for every node currently in the graph."""
        for node_id in self.graph.nodes:
            self.balance_sheets[node_id] = BalanceSheet(node_id)

    def _sheet_for(self, node_id: str) -> BalanceSheet:
        """Return the sheet of ``node_id``, creating it for nodes added after construction."""
        sheet = self.balance_sheets.get(node_id)
        if sheet is None:
            sheet = self.balance_sheets[node_id] = BalanceSheet(node_id)
        return sheet

    def execute_transaction(self, tx: Transaction) -> Dict[str, float]:
        """
        Execute a transaction and return the new balance of each affected node.

        INCOME and INTEREST credit ``to_node`` only; EXPENSE and FEE debit
        ``from_node`` only; every other type debits ``from_node`` and then
        credits ``to_node``.

        Raises:
            TransactionError: If the transaction fails validation.
        """
        valid, msg = TransactionValidator.validate_transaction(tx, self.graph)
        if not valid:
            raise TransactionError(f"Invalid transaction: {msg}")

        affected_balances: Dict[str, float] = {}
        if tx.type not in self._INFLOW_ONLY:
            affected_balances[tx.from_node] = self._sheet_for(tx.from_node).apply_transaction(
                tx, is_inflow=False)
        if tx.type not in self._OUTFLOW_ONLY:
            affected_balances[tx.to_node] = self._sheet_for(tx.to_node).apply_transaction(
                tx, is_inflow=True)
        logger.info(f"Executed {tx.type.value}: {tx.amount} from {tx.from_node} to {tx.to_node}")
        return affected_balances

    def calculate_net_worth(self, node_ids: List[str] = None) -> float:
        """
        Sum the balances of the given nodes (all nodes when omitted).

        IDs without a balance sheet are ignored.
        """
        if node_ids is None:
            node_ids = list(self.balance_sheets.keys())
        return sum(self.balance_sheets[nid].balance for nid in node_ids if nid in self.balance_sheets)

    def run_simulation(self, transactions: List[Transaction],
                       snapshot_interval: int = 10) -> pd.DataFrame:
        """
        Execute a batch of transactions.

        Args:
            transactions: Transactions to execute, in order.
            snapshot_interval: A result row is recorded every N-th
                transaction and after the last one.

        Returns:
            One row per recorded step with the balance of every node and the
            total net worth.

        Raises:
            Exception: Any failure is logged and re-raised, aborting the run.
        """
        results = []
        last_index = len(transactions) - 1
        for idx, tx in enumerate(transactions):
            try:
                self.execute_transaction(tx)
                if idx % snapshot_interval == 0 or idx == last_index:
                    snapshot = {
                        "step": idx,
                        "timestamp": tx.timestamp,
                        "transactions": idx + 1,
                    }
                    for node_id, sheet in self.balance_sheets.items():
                        snapshot[f"balance_{node_id}"] = sheet.balance
                    snapshot["net_worth"] = self.calculate_net_worth()
                    results.append(snapshot)
            except Exception as e:
                logger.error(f"Transaction {tx.id} failed: {e}")
                raise
        return pd.DataFrame(results)

    def get_balance_series(self, node_id: str) -> pd.Series:
        """
        Return the snapshot time series of an account.

        Raises:
            SimulationError: If the node has no balance sheet.
        """
        if node_id not in self.balance_sheets:
            raise SimulationError(f"Node {node_id} not found")
        return self.balance_sheets[node_id].to_series()

    def compute_risk_metrics(self, node_id: str, lookback_days: int = 30) -> Dict[str, float]:
        """
        Compute risk metrics from the snapshot series of an account.

        Returns a dict with annualized volatility (snapshots are treated as
        daily data), maximum drawdown, Sharpe ratio (risk-free rate of 0),
        95% value at risk, current balance and average balance. An empty
        dict is returned when fewer than two snapshots exist.

        Period returns that are undefined because the previous balance was
        zero are excluded; metrics that cannot be computed from the remaining
        data are reported as 0.0 instead of NaN.
        """
        # NOTE(review): lookback_days is accepted but not applied; the whole
        # snapshot series is used. Confirm the intended windowing.
        series = self.get_balance_series(node_id)
        if len(series) < 2:
            return {}

        # A change from a zero balance gives inf/NaN; drop those periods.
        returns = series.pct_change().replace([np.inf, -np.inf], np.nan).dropna()
        std = returns.std() if len(returns) > 1 else 0.0
        has_spread = bool(std > 0)
        volatility = float(std * np.sqrt(252)) if has_spread else 0.0
        sharpe = float(returns.mean() / std * np.sqrt(252)) if has_spread else 0.0
        var_95 = float(np.percentile(returns, 5)) if len(returns) else 0.0

        # Drawdown relative to the running peak; undefined while the peak is
        # not positive, so those points are masked out.
        running_max = series.expanding().max()
        drawdown = (series - running_max) / running_max.where(running_max > 0)
        max_drawdown = drawdown.min()
        max_drawdown = 0.0 if pd.isna(max_drawdown) else float(max_drawdown)

        return {
            "volatility": volatility,
            "max_drawdown": max_drawdown,
            "sharpe_ratio": sharpe,
            "var_95": var_95,
            "current_balance": series.iloc[-1],
            "avg_balance": series.mean()
        }
# ================================================================================
# Part 4: High-level simulator and strategy pattern
# ================================================================================
class TransactionStrategy:
    """Base class for transaction-generating strategies."""

    def generate_transactions(self, step: int, context: Dict) -> List[Transaction]:
        """
        Return the transactions for one simulation step.

        Subclasses must override this method.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError
class RandomWalkStrategy(TransactionStrategy):
    """Random-walk strategy: emits one random transaction per step."""

    def __init__(self, graph: CypherGraph, avg_amount: float = 100,
                 volatility: float = 0.3, tx_types: List[TransactionType] = None):
        """
        Args:
            graph: Graph whose nodes are used as transaction endpoints. The
                node IDs are captured once, at construction time.
            avg_amount: Median of the log-normal amount distribution.
            volatility: Sigma of the log-normal amount distribution.
            tx_types: Transaction types to draw from; defaults to INCOME,
                EXPENSE and TRANSFER.
        """
        self.graph = graph
        self.avg_amount = avg_amount
        self.volatility = volatility
        self.tx_types = tx_types or [TransactionType.INCOME, TransactionType.EXPENSE, TransactionType.TRANSFER]
        self.node_ids = list(graph.nodes.keys())

    def generate_transactions(self, step: int, context: Dict) -> List[Transaction]:
        """
        Generate a single random transaction.

        Returns an empty list when the graph had no nodes. The transaction
        is stamped with ``context["current_date"]`` when present, otherwise
        with the current time.
        """
        if not self.node_ids:
            return []

        # Draw by index so the original enum members / ID objects are kept
        # instead of NumPy scalar copies.
        tx_type = self.tx_types[np.random.randint(len(self.tx_types))]
        node_count = len(self.node_ids)
        if tx_type == TransactionType.TRANSFER and node_count > 1:
            # A transfer to the same node is always rejected by validation,
            # so draw two distinct endpoints.
            from_idx, to_idx = np.random.choice(node_count, size=2, replace=False)
        else:
            from_idx = np.random.randint(node_count)
            to_idx = np.random.randint(node_count)

        # Log-normally distributed amount.
        amount = float(np.random.lognormal(np.log(self.avg_amount), self.volatility))
        tx = Transaction(
            tx_id=f"sim_{step}_{datetime.now().timestamp()}",
            from_node_id=self.node_ids[from_idx],
            to_node_id=self.node_ids[to_idx],
            amount=amount,
            tx_type=tx_type,
            timestamp=(context or {}).get("current_date"),
            description=f"Random walk step {step}"
        )
        return [tx]
class CyclicalCashFlowStrategy(TransactionStrategy):
    """Periodic cash-flow strategy: models recurring items such as salary and rent."""

    def __init__(self, graph: CypherGraph, salary_node: str, expense_node: str,
                 salary_amount: float, rent_amount: float, salary_day: int = 25,
                 rent_day: int = 1):
        """
        Args:
            graph: Graph the strategy operates on.
            salary_node: Node that receives the salary and pays the rent.
            expense_node: Node named as the rent counterparty.
            salary_amount: Amount of the monthly salary.
            rent_amount: Amount of the monthly rent.
            salary_day: Day of month on which the salary is paid.
            rent_day: Day of month on which the rent is due.
        """
        self.graph = graph
        self.salary_node = salary_node
        self.expense_node = expense_node
        self.salary_amount = salary_amount
        self.rent_amount = rent_amount
        self.salary_day = salary_day
        self.rent_day = rent_day

    def generate_transactions(self, step: int, context: Dict) -> List[Transaction]:
        """
        Return the salary and/or rent transaction due on the current date.

        The date is read from ``context["current_date"]`` (default: now) and
        is also used as the transaction timestamp. A day-of-month that does
        not occur in a given month produces no transaction for that month.
        """
        current_date = context.get("current_date", datetime.now())
        day = current_date.day
        date_tag = current_date.strftime('%Y%m%d')
        transactions = []
        # Salary day: income booked from the "system" node.
        if day == self.salary_day:
            transactions.append(Transaction(
                tx_id=f"salary_{step}_{date_tag}",
                from_node_id="system",
                to_node_id=self.salary_node,
                amount=self.salary_amount,
                tx_type=TransactionType.INCOME,
                timestamp=current_date,
                description="Monthly salary"
            ))
        # Rent day: skipped when payer and payee are the same node.
        if day == self.rent_day and self.expense_node != self.salary_node:
            transactions.append(Transaction(
                tx_id=f"rent_{step}_{date_tag}",
                from_node_id=self.salary_node,
                to_node_id=self.expense_node,
                amount=self.rent_amount,
                tx_type=TransactionType.EXPENSE,
                timestamp=current_date,
                description="Monthly rent"
            ))
        return transactions
class DeepBalanceSimulator:
    """
    Main simulator class.

    Ties together the graph model, the transaction engine and the
    transaction-generating strategies.
    """

    def __init__(self, graph: CypherGraph):
        self.graph = graph
        self.calculator = DeepBalanceCalculator(graph)
        self.strategies: List[TransactionStrategy] = []
        self.simulation_history: List[Dict] = []
        self.current_step = 0

    def add_strategy(self, strategy: TransactionStrategy):
        """Register a transaction-generating strategy."""
        self.strategies.append(strategy)

    def simulate_days(self, days: int, start_date: datetime = None) -> pd.DataFrame:
        """
        Run the simulation day by day.

        Args:
            days: Number of days to simulate.
            start_date: First simulated day; defaults to today at midnight.

        Returns:
            One row per day with the date, the number of transactions that
            were executed successfully, the balance of every node and the
            net worth. The same rows replace ``simulation_history``.

        Failed transactions are logged and skipped.
        """
        if start_date is None:
            start_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

        sheets = self.calculator.balance_sheets
        daily_results = []
        for day_offset in range(days):
            current_date = start_date + timedelta(days=day_offset)
            date_key = current_date.strftime("%Y-%m-%d")
            context = {"current_date": current_date, "step": self.current_step}

            # Collect the transactions of every strategy for this day.
            day_transactions = []
            for strategy in self.strategies:
                day_transactions.extend(strategy.generate_transactions(self.current_step, context))

            # Execute them; a failure must not abort the simulation.
            executed = 0
            for tx in day_transactions:
                try:
                    self.calculator.execute_transaction(tx)
                except Exception as e:
                    logger.error(f"Transaction failed on {current_date}: {e}")
                    continue
                executed += 1

            snapshot = {
                "date": date_key,
                "day": day_offset,
                "transactions_count": executed
            }
            for node_id in self.graph.nodes:
                sheet = sheets.get(node_id)
                if sheet is None:
                    # Node was added to the graph after the calculator was built.
                    sheet = sheets[node_id] = BalanceSheet(node_id)
                snapshot[f"balance_{node_id}"] = sheet.balance
                # Per-account daily snapshot feeding the balance time series.
                sheet.take_snapshot(date_key)
            snapshot["net_worth"] = self.calculator.calculate_net_worth()
            daily_results.append(snapshot)
            self.current_step += 1

        self.simulation_history = daily_results
        return pd.DataFrame(daily_results)

    def get_balance_dataframe(self) -> pd.DataFrame:
        """Return the snapshot series of all accounts as one DataFrame (one column per node)."""
        data = {node_id: sheet.to_series()
                for node_id, sheet in self.calculator.balance_sheets.items()}
        return pd.DataFrame(data)

    def generate_report(self) -> Dict:
        """
        Build a summary report.

        Contains the number of simulated steps, node and relationship
        counts, per-account final balance, transaction count and risk
        metrics, and the total net worth.
        """
        report = {
            "simulation_steps": self.current_step,
            "graph_summary": {
                "nodes": len(self.graph.nodes),
                "relationships": len(self.graph.relationships)
            },
            "accounts": {}
        }
        for node_id, sheet in self.calculator.balance_sheets.items():
            report["accounts"][node_id] = {
                "final_balance": sheet.balance,
                "total_transactions": len(sheet.transaction_history),
                "risk_metrics": self.calculator.compute_risk_metrics(node_id)
            }
        report["net_worth"] = self.calculator.calculate_net_worth()
        return report

    def plot_balance_timeline(self, figsize=(12, 6)):
        """Plot the balance of every account over time; logs a warning when there is no data."""
        df = self.get_balance_dataframe()
        if df.empty:
            logger.warning("No data to plot")
            return
        plt.figure(figsize=figsize)
        for col in df.columns:
            plt.plot(df.index, df[col], label=col, marker='o', markersize=3)
        plt.xlabel("Date")
        plt.ylabel("Balance")
        plt.title("Account Balance Timeline")
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def plot_net_worth(self, figsize=(10, 5)):
        """Plot the net worth recorded by the last ``simulate_days`` run."""
        if not self.simulation_history:
            logger.warning("No simulation history")
            return
        df = pd.DataFrame(self.simulation_history)
        plt.figure(figsize=figsize)
        plt.plot(range(len(df)), df["net_worth"], marker='o', linestyle='-', linewidth=2)
        plt.xlabel("Day")
        plt.ylabel("Net Worth")
        plt.title("Net Worth Evolution")
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def export_transactions(self, filepath: str):
        """
        Export every executed transaction to a JSON file.

        A transaction that touched two balance sheets is recorded in both
        histories; it is written only once.
        """
        seen = set()
        all_txs = []
        for sheet in self.calculator.balance_sheets.values():
            for tx in sheet.transaction_history:
                # Deduplicate by object identity, not by ID string.
                if id(tx) in seen:
                    continue
                seen.add(id(tx))
                all_txs.append(tx.to_dict())
        with open(filepath, 'w', encoding="utf-8") as f:
            json.dump(all_txs, f, indent=2, default=str)
        logger.info(f"Exported {len(all_txs)} transactions to {filepath}")
# ================================================================================
# Part 5: Examples and demo
# ================================================================================
def create_demo_graph() -> CypherGraph:
    """Build the demo graph: six account nodes and five relationships."""
    graph = CypherGraph(name="PersonalFinanceGraph")

    # Nodes. "system" is the counterparty used for income bookings.
    graph.create_node("alice", AccountType.PERSONAL, {"name": "Alice", "initial_balance": 5000})
    graph.create_node("bob", AccountType.PERSONAL, {"name": "Bob", "initial_balance": 3000})
    graph.create_node("business", AccountType.BUSINESS, {"name": "Alice's Business", "initial_balance": 10000})
    graph.create_node("savings", AccountType.SAVINGS, {"name": "High Yield Savings", "interest_rate": 0.045})
    graph.create_node("investment", AccountType.INVESTMENT, {"name": "Stock Portfolio", "risk": "moderate"})
    graph.create_node("system", AccountType.PERSONAL, {"name": "System Account", "is_system": True})

    # Relationships.
    graph.create_relationship("rel_1", "alice", "business", "OWNS", {"share": 1.0})
    graph.create_relationship("rel_2", "alice", "savings", "HAS_ACCOUNT", {"opened": "2023-01-01"})
    graph.create_relationship("rel_3", "alice", "investment", "INVESTS_IN", {"allocation": 0.3})
    graph.create_relationship("rel_4", "bob", "alice", "OWES", {"amount": 500})
    graph.create_relationship("rel_5", "business", "alice", "PAYS", {"type": "salary"})
    return graph
def initialize_balances(graph: CypherGraph, calculator: DeepBalanceCalculator,
                        snapshot_date: Optional[str] = None):
    """
    Seed account balances from each node's ``initial_balance`` property.

    Every node with a positive numeric ``initial_balance`` receives an INCOME
    transaction of that amount from the "system" node, which therefore has
    to exist in the graph. A baseline snapshot is then recorded for every
    node.

    Args:
        graph: Graph whose nodes are initialized.
        calculator: Calculator that executes the seeding transactions.
        snapshot_date: Date key (``YYYY-MM-DD``) for the baseline snapshot;
            defaults to today's date.
    """
    for node_id, node in graph.nodes.items():
        initial = node.get_property("initial_balance", 0.0)
        # Ignore missing or non-numeric values instead of failing on them.
        if isinstance(initial, (int, float)) and initial > 0:
            init_tx = Transaction(
                tx_id=f"init_{node_id}",
                from_node_id="system",
                to_node_id=node_id,
                amount=initial,
                tx_type=TransactionType.INCOME,
                description="Initial balance"
            )
            calculator.execute_transaction(init_tx)
        calculator.balance_sheets[node_id].take_snapshot(snapshot_date)
def run_demo_simulation():
    """
    Run the complete demo.

    Builds the demo graph, seeds balances, simulates 60 days with a random
    and a periodic strategy, prints the results, shows the plots and exports
    the transactions and the graph to JSON files.

    Returns:
        The DeepBalanceSimulator used for the run.
    """
    start_date = datetime(2024, 1, 1)

    print("=" * 80)
    print("Cypher深度余额模拟器 - 演示运行")
    print("=" * 80)

    # 1. Build the graph.
    graph = create_demo_graph()
    print(f"\n[1] 图已创建: {graph.name}")
    print(f" 节点数: {len(graph.nodes)}")
    print(f" 关系数: {len(graph.relationships)}")

    # 2. Create the simulator.
    simulator = DeepBalanceSimulator(graph)

    # 3. Seed the balances.
    initialize_balances(graph, simulator.calculator)
    # Date the baseline snapshot one day before the simulated period; a
    # snapshot keyed with today's date would sort after the simulated days
    # and corrupt the balance time series.
    baseline_date = (start_date - timedelta(days=1)).strftime("%Y-%m-%d")
    for sheet in simulator.calculator.balance_sheets.values():
        sheet.daily_snapshots.clear()
        sheet.take_snapshot(baseline_date)
    print("\n[2] 初始余额已设置:")
    for node_id, sheet in simulator.calculator.balance_sheets.items():
        print(f" {node_id}: ${sheet.balance:,.2f}")

    # 4. Register the strategies: random spending/transfers ...
    random_strategy = RandomWalkStrategy(
        graph, avg_amount=200, volatility=0.4,
        tx_types=[TransactionType.EXPENSE, TransactionType.TRANSFER]
    )
    simulator.add_strategy(random_strategy)
    # ... and the periodic salary/rent cash flow.
    cashflow_strategy = CyclicalCashFlowStrategy(
        graph, salary_node="alice", expense_node="bob",
        salary_amount=3000, rent_amount=1200,
        salary_day=25, rent_day=1
    )
    simulator.add_strategy(cashflow_strategy)
    print("\n[3] 交易策略已添加:")
    print(" - RandomWalkStrategy (随机支出/转账)")
    print(" - CyclicalCashFlowStrategy (月薪/房租)")

    # 5. Run the simulation.
    print("\n[4] 开始模拟60天...")
    result_df = simulator.simulate_days(days=60, start_date=start_date)

    # 6. Print the final balances.
    print("\n[5] 模拟完成!最终余额:")
    final_balances = result_df.iloc[-1]
    for col in final_balances.index:
        if col.startswith("balance_"):
            node = col.replace("balance_", "")
            print(f" {node}: ${final_balances[col]:,.2f}")
    print(f"\n 最终净财富: ${result_df.iloc[-1]['net_worth']:,.2f}")

    # 7. Print the risk report.
    print("\n[6] 风险指标报告:")
    report = simulator.generate_report()
    for account, data in report["accounts"].items():
        print(f"\n 账户: {account}")
        print(f" 最终余额: ${data['final_balance']:,.2f}")
        print(f" 交易次数: {data['total_transactions']}")
        if data['risk_metrics']:
            metrics = data['risk_metrics']
            print(f" 波动率(年化): {metrics.get('volatility', 0):.2%}")
            print(f" 最大回撤: {metrics.get('max_drawdown', 0):.2%}")
            print(f" 夏普比率: {metrics.get('sharpe_ratio', 0):.2f}")

    # 8. Show the charts.
    print("\n[7] 生成可视化图表...")
    simulator.plot_balance_timeline()
    simulator.plot_net_worth()

    # 9. Export the data.
    simulator.export_transactions("demo_transactions.json")
    graph.save_to_file("demo_graph.json")
    print("\n[8] 数据已导出: demo_transactions.json, demo_graph.json")
    return simulator
# ================================================================================
# Part 6: Advanced analytics module
# ================================================================================
class CypherAnalytics:
"""
基于Cypher图模型的深度分析工具
"""
def __init__(self, simulator: DeepBalanceSimulator):
    """Store the simulator and keep a direct reference to its graph."""
    self.simulator = simulator
    self.graph = simulator.graph
def analyze_flow_patterns(self) -> pd.DataFrame:
    """
    Collect every executed transaction into a flow table.

    Returns:
        DataFrame with the columns ``from``, ``to``, ``amount``, ``type``
        and ``date``, one row per transaction. A transaction recorded in
        more than one balance sheet is listed once. The columns are present
        even when there are no transactions.
    """
    columns = ["from", "to", "amount", "type", "date"]
    seen = set()
    flow_data = []
    for sheet in self.simulator.calculator.balance_sheets.values():
        for tx in sheet.transaction_history:
            # Two-sided transactions sit in both histories; count them once.
            if id(tx) in seen:
                continue
            seen.add(id(tx))
            flow_data.append({
                "from": tx.from_node,
                "to": tx.to_node,
                "amount": tx.amount,
                "type": tx.type.value,
                "date": tx.timestamp
            })
    return pd.DataFrame(flow_data, columns=columns)
def detect_anomalies(self, threshold: float = 3.0) -> List[Transaction]: