下载地址:http://lanzou.com.cn/i6570ed6f

!/usr/bin/env python3
"""
余额生成器 - Forth模型深度计算引擎
================================================================================
模块名称: balance_forth_engine.py
描述: 一个基于Forth虚拟机思想的余额生成与深度计算系统。
它模拟了一个栈式操作环境,通过自定义的Forth风格指令来生成、变换、
分析和计算各类账户余额数据。系统支持复杂的数据流操作、深度递归计算、
以及金融模型的模拟运行。
核心特性:
- 完整的Forth风格解释器核心 (词法分析、字典、栈、堆、解释器循环)
- 余额生成原语: 支持随机生成、序列生成、基于模式生成
- 深度计算模型: 复利计算、风险价值(VaR)模拟、蒙特卡洛路径生成
- 高级数据结构操作: 支持数组、矩阵操作,用于批量余额处理
- 可扩展的词汇表系统,便于添加新的金融计算指令
作者: AI 智能编程助手
版本: 2.0.0
"""
import random
import math
import sys
import time
from typing import List, Dict, Any, Callable, Union, Optional, Tuple
from dataclasses import dataclass, field
from collections import deque
import json
import numpy as np # 用于高级数学计算,但尽量保持核心栈操作的原生性
==============================================================================
第一部分: Forth虚拟机核心组件
==============================================================================
class ForthStack:
"""Forth风格的数据栈 (LIFO)"""
def init(self):
self._stack = []
def push(self, item: Any):
self._stack.append(item)
def pop(self) -> Any:
if self.is_empty():
raise IndexError("栈下溢: 无法从空栈中弹出元素")
return self._stack.pop()
def peek(self) -> Any:
if self.is_empty():
raise IndexError("栈为空,无法查看顶部元素")
return self._stack[-1]
def is_empty(self) -> bool:
return len(self._stack) == 0
def depth(self) -> int:
return len(self._stack)
def clear(self):
self._stack.clear()
def __repr__(self):
return f"<ForthStack: {self._stack}>"
def get_all(self) -> list:
return self._stack.copy()
class ReturnStack:
"""返回栈,用于存储控制流返回地址"""
def init(self):
self._stack = []
def push(self, addr: int):
self._stack.append(addr)
def pop(self) -> int:
if not self._stack:
raise IndexError("返回栈下溢")
return self._stack.pop()
def is_empty(self) -> bool:
return len(self._stack) == 0
@dataclass
class ForthWord:
"""Forth单词结构"""
name: str
immediate: bool = False
code: Optional[Callable] = None
xt: int = -1 # 执行令牌,在字典中的索引
class ForthDictionary:
"""Forth字典,管理所有定义的单词"""
def init(self):
self._words: Dict[str, ForthWord] = {}
self._compiled_words: Dict[str, list] = {} # 存储编译后的定义体(单词列表)
def add_word(self, name: str, code: Callable = None, immediate: bool = False):
"""添加原生单词"""
self._words[name] = ForthWord(name=name, code=code, immediate=immediate)
def add_compiled_word(self, name: str, definition: list):
"""添加用Forth定义的单词"""
self._compiled_words[name] = definition
# 同时创建一个占位的原生单词,实际执行时会跳转到编译体
self._words[name] = ForthWord(name=name, code=None, immediate=False)
def find(self, name: str) -> Optional[ForthWord]:
return self._words.get(name)
def get_definition(self, name: str) -> Optional[list]:
return self._compiled_words.get(name)
class ForthInterpreter:
"""Forth解释器核心,包含执行循环、栈管理、字典"""
def init(self):
self.data_stack = ForthStack()
self.return_stack = ReturnStack()
self.dict = ForthDictionary()
self._input_buffer = []
self._input_index = 0
self._state = False # False: 解释模式, True: 编译模式
self._current_definition = [] # 当前正在编译的定义体
self._compiling_word_name = None
# 初始化内置词汇表
self._init_builtin_words()
def _init_builtin_words(self):
"""初始化所有内置的Forth单词,包括基础运算和余额生成专用词"""
# ---------- 基础栈操作 ----------
self.dict.add_word("DUP", code=self._word_dup)
self.dict.add_word("DROP", code=self._word_drop)
self.dict.add_word("SWAP", code=self._word_swap)
self.dict.add_word("OVER", code=self._word_over)
self.dict.add_word("ROT", code=self._word_rot)
self.dict.add_word("DEPTH", code=self._word_depth)
# ---------- 算术运算 ----------
self.dict.add_word("+", code=self._word_add)
self.dict.add_word("-", code=self._word_sub)
self.dict.add_word("*", code=self._word_mul)
self.dict.add_word("/", code=self._word_div)
self.dict.add_word("MOD", code=self._word_mod)
self.dict.add_word("**", code=self._word_pow)
self.dict.add_word("SQRT", code=self._word_sqrt)
# ---------- 比较与逻辑 ----------
self.dict.add_word("=", code=self._word_eq)
self.dict.add_word("<", code=self._word_lt)
self.dict.add_word(">", code=self._word_gt)
self.dict.add_word("0=", code=self._word_zero_eq)
self.dict.add_word("AND", code=self._word_and)
self.dict.add_word("OR", code=self._word_or)
# ---------- 控制流 ----------
self.dict.add_word("IF", code=self._word_if, immediate=True)
self.dict.add_word("ELSE", code=self._word_else, immediate=True)
self.dict.add_word("THEN", code=self._word_then, immediate=True)
self.dict.add_word("BEGIN", code=self._word_begin, immediate=True)
self.dict.add_word("UNTIL", code=self._word_until, immediate=True)
self.dict.add_word("DO", code=self._word_do, immediate=True)
self.dict.add_word("LOOP", code=self._word_loop, immediate=True)
# ---------- I/O 和调试 ----------
self.dict.add_word(".", code=self._word_dot)
self.dict.add_word(".S", code=self._word_dot_s)
self.dict.add_word("CR", code=self._word_cr)
self.dict.add_word("EMIT", code=self._word_emit)
# ---------- 编译相关 ----------
self.dict.add_word(":", code=self._word_colon, immediate=True)
self.dict.add_word(";", code=self._word_semicolon, immediate=True)
self.dict.add_word("IMMEDIATE", code=self._word_immediate)
# ---------- 余额生成与金融计算专用词 ----------
self.dict.add_word("GEN-RAND-BAL", code=self._word_gen_rand_bal)
self.dict.add_word("GEN-SEQ-BAL", code=self._word_gen_seq_bal)
self.dict.add_word("COMPOUND", code=self._word_compound)
self.dict.add_word("VAR-SIM", code=self._word_var_sim)
self.dict.add_word("MC-PATH", code=self._word_mc_path)
self.dict.add_word("BAL-ARRAY", code=self._word_bal_array)
self.dict.add_word("SUM-ARRAY", code=self._word_sum_array)
self.dict.add_word("MAX-ARRAY", code=self._word_max_array)
self.dict.add_word("MIN-ARRAY", code=self._word_min_array)
self.dict.add_word("AVG-ARRAY", code=self._word_avg_array)
self.dict.add_word("NPV", code=self._word_npv)
self.dict.add_word("IRR", code=self._word_irr)
self.dict.add_word("PMT", code=self._word_pmt)
self.dict.add_word("FV", code=self._word_fv)
self.dict.add_word("PV", code=self._word_pv)
# ========== 基础单词实现 ==========
def _word_dup(self): self.data_stack.push(self.data_stack.peek())
def _word_drop(self): self.data_stack.pop()
def _word_swap(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(a); self.data_stack.push(b)
def _word_over(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b); self.data_stack.push(a); self.data_stack.push(b)
def _word_rot(self): a, b, c = self.data_stack.pop(), self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b); self.data_stack.push(a); self.data_stack.push(c)
def _word_depth(self): self.data_stack.push(self.data_stack.depth())
def _word_add(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b + a)
def _word_sub(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b - a)
def _word_mul(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b * a)
def _word_div(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b / a if a != 0 else 0)
def _word_mod(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b % a)
def _word_pow(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b ** a)
def _word_sqrt(self): self.data_stack.push(math.sqrt(self.data_stack.pop()))
def _word_eq(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b == a)
def _word_lt(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b < a)
def _word_gt(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b > a)
def _word_zero_eq(self): self.data_stack.push(self.data_stack.pop() == 0)
def _word_and(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b and a)
def _word_or(self): a, b = self.data_stack.pop(), self.data_stack.pop(); self.data_stack.push(b or a)
def _word_dot(self): print(self.data_stack.pop(), end=' ')
def _word_dot_s(self): print(f"<栈深度: {self.data_stack.depth()}> {self.data_stack.get_all()}")
def _word_cr(self): print()
def _word_emit(self): print(chr(self.data_stack.pop()), end='')
# ---------- 控制流实现 (简化版,使用基于列表的跳转) ----------
# 注意: 为了简化,控制流单词在此实现中通过修改输入指针模拟,需要保存地址列表。
# 本实现为了演示核心概念,采用较为直接的方式,控制流单词会操作解释器的程序计数器。
def _word_if(self):
# 在编译模式下,将IF标记和占位地址加入定义
if self._state:
self._current_definition.append(('IF', len(self._current_definition)))
else:
# 解释模式执行IF: 弹出条件,若为假则跳过直到ELSE或THEN
cond = self.data_stack.pop()
if not cond:
# 跳过直到匹配的ELSE或THEN (简单实现,需要嵌套计数)
skip_count = 1
while self._input_index < len(self._input_buffer):
token = self._input_buffer[self._input_index]
if token == 'IF':
skip_count += 1
elif token == 'THEN':
skip_count -= 1
if skip_count == 0:
self._input_index += 1
break
elif token == 'ELSE' and skip_count == 1:
self._input_index += 1
break
self._input_index += 1
def _word_else(self):
if self._state:
self._current_definition.append(('ELSE', len(self._current_definition)))
else:
# 跳过直到THEN
skip_count = 1
while self._input_index < len(self._input_buffer):
token = self._input_buffer[self._input_index]
if token == 'IF':
skip_count += 1
elif token == 'THEN':
skip_count -= 1
if skip_count == 0:
self._input_index += 1
break
self._input_index += 1
def _word_then(self):
if self._state:
self._current_definition.append(('THEN', len(self._current_definition)))
# 解释模式无动作
def _word_begin(self):
if self._state:
self._current_definition.append(('BEGIN', len(self._current_definition)))
else:
self._input_buffer.insert(self._input_index, 'BEGIN-MARK')
def _word_until(self):
if self._state:
self._current_definition.append(('UNTIL', len(self._current_definition)))
else:
# 解释模式: 弹出条件,若假则跳回BEGIN
cond = self.data_stack.pop()
if not cond:
# 向后查找BEGIN
idx = self._input_index - 1
level = 1
while idx >= 0:
if self._input_buffer[idx] == 'UNTIL':
level += 1
elif self._input_buffer[idx] == 'BEGIN':
level -= 1
if level == 0:
self._input_index = idx + 1
break
idx -= 1
def _word_do(self):
# 简化: DO ... LOOP 期望栈上 (limit start)
if self._state:
self._current_definition.append(('DO', len(self._current_definition)))
else:
# 解释模式: 将循环控制变量压入返回栈
limit, start = self.data_stack.pop(), self.data_stack.pop()
self.return_stack.push(start)
self.return_stack.push(limit)
# 记录循环起始地址
self._input_buffer.insert(self._input_index, ('DO-MARK', self._input_index))
def _word_loop(self):
if self._state:
self._current_definition.append(('LOOP', len(self._current_definition)))
else:
limit = self.return_stack.pop()
start = self.return_stack.pop() + 1
if start < limit:
self.return_stack.push(start)
self.return_stack.push(limit)
# 跳回对应的DO
idx = self._input_index - 1
level = 1
while idx >= 0:
if self._input_buffer[idx] == 'LOOP':
level += 1
elif self._input_buffer[idx] == 'DO':
level -= 1
if level == 0:
self._input_index = idx + 1
break
idx -= 1
else:
# 循环结束,清理
pass
# ---------- 编译相关 ----------
def _word_colon(self):
# 开始编译一个新单词
self._state = True
# 下一个token是单词名
name = self._input_buffer[self._input_index]
self._input_index += 1
self._compiling_word_name = name
self._current_definition = []
def _word_semicolon(self):
# 结束编译,将定义存入字典
self._state = False
self.dict.add_compiled_word(self._compiling_word_name, self._current_definition)
self._compiling_word_name = None
self._current_definition = []
def _word_immediate(self):
# 将最近定义的单词设为立即执行
if self._compiling_word_name:
word = self.dict.find(self._compiling_word_name)
if word:
word.immediate = True
# ---------- 余额生成与金融计算核心 ----------
def _word_gen_rand_bal(self):
"""生成随机余额: 期望栈上 (count min max) -> 生成count个随机余额并推入栈"""
max_bal = self.data_stack.pop()
min_bal = self.data_stack.pop()
count = self.data_stack.pop()
for _ in range(count):
bal = random.uniform(min_bal, max_bal)
self.data_stack.push(round(bal, 2))
def _word_gen_seq_bal(self):
"""生成等差数列余额: 期望栈上 (count start step)"""
step = self.data_stack.pop()
start = self.data_stack.pop()
count = self.data_stack.pop()
for i in range(count):
self.data_stack.push(round(start + i * step, 2))
def _word_compound(self):
"""复利计算: 期望栈上 (principal rate years) -> 返回终值"""
years = self.data_stack.pop()
rate = self.data_stack.pop()
principal = self.data_stack.pop()
fv = principal * ((1 + rate) ** years)
self.data_stack.push(round(fv, 2))
def _word_var_sim(self):
"""风险价值(VaR)模拟: 期望栈上 (principal volatility days confidence) -> VaR值"""
confidence = self.data_stack.pop()
days = self.data_stack.pop()
volatility = self.data_stack.pop()
principal = self.data_stack.pop()
z_score = {0.95: 1.645, 0.99: 2.326}.get(confidence, 1.645)
var = principal * volatility * math.sqrt(days) * z_score
self.data_stack.push(round(var, 2))
def _word_mc_path(self):
"""蒙特卡洛路径生成: 期望栈上 (initial return volatility steps paths) -> 生成最终余额数组"""
paths = self.data_stack.pop()
steps = self.data_stack.pop()
vol = self.data_stack.pop()
ret = self.data_stack.pop()
initial = self.data_stack.pop()
results = []
for _ in range(paths):
price = initial
for _ in range(steps):
shock = vol * random.gauss(0, 1)
price *= math.exp(ret - 0.5 * vol * vol + shock)
results.append(round(price, 2))
# 将结果作为数组推入栈(实际推入列表引用,但Forth风格会压入每个元素?简化:压入列表)
self.data_stack.push(results)
def _word_bal_array(self):
"""创建余额数组: 从栈顶取出count个元素,组成数组"""
count = self.data_stack.pop()
arr = []
for _ in range(count):
arr.insert(0, self.data_stack.pop())
self.data_stack.push(arr)
def _word_sum_array(self):
"""求数组和"""
arr = self.data_stack.pop()
total = sum(arr) if isinstance(arr, list) else 0
self.data_stack.push(round(total, 2))
def _word_max_array(self):
arr = self.data_stack.pop()
self.data_stack.push(max(arr) if arr else 0)
def _word_min_array(self):
arr = self.data_stack.pop()
self.data_stack.push(min(arr) if arr else 0)
def _word_avg_array(self):
arr = self.data_stack.pop()
avg = sum(arr)/len(arr) if arr else 0
self.data_stack.push(round(avg, 2))
def _word_npv(self):
"""净现值: 期望栈上 (rate array) -> NPV"""
arr = self.data_stack.pop()
rate = self.data_stack.pop()
npv = sum(cf / ((1 + rate) ** i) for i, cf in enumerate(arr))
self.data_stack.push(round(npv, 2))
def _word_irr(self):
"""内部收益率: 期望栈上 (array) -> IRR (简单迭代)"""
arr = self.data_stack.pop()
def npv(rate):
return sum(cf / ((1 + rate) ** i) for i, cf in enumerate(arr))
# 牛顿法简化
rate = 0.1
for _ in range(100):
f = npv(rate)
if abs(f) < 1e-6:
break
fp = sum(-i * cf / ((1+rate)**(i+1)) for i, cf in enumerate(arr))
rate -= f / fp if fp != 0 else 0.01
self.data_stack.push(round(rate, 4))
def _word_pmt(self):
"""等额本息每期付款: 栈上 (rate nper pv) -> pmt"""
pv = self.data_stack.pop()
nper = self.data_stack.pop()
rate = self.data_stack.pop()
if rate == 0:
pmt = pv / nper
else:
pmt = pv * rate * (1+rate)**nper / ((1+rate)**nper - 1)
self.data_stack.push(round(pmt, 2))
def _word_fv(self):
"""终值: 栈上 (rate nper pmt pv) -> fv"""
pv = self.data_stack.pop()
pmt = self.data_stack.pop()
nper = self.data_stack.pop()
rate = self.data_stack.pop()
fv = pv*(1+rate)**nper + pmt*((1+rate)**nper - 1)/rate if rate != 0 else pv + pmt*nper
self.data_stack.push(round(fv, 2))
def _word_pv(self):
"""现值: 栈上 (rate nper pmt fv) -> pv"""
fv = self.data_stack.pop()
pmt = self.data_stack.pop()
nper = self.data_stack.pop()
rate = self.data_stack.pop()
pv = (fv + pmt*((1+rate)**nper - 1)/rate) / (1+rate)**nper if rate != 0 else fv - pmt*nper
self.data_stack.push(round(pv, 2))
# ---------- 解释器主循环 ----------
def interpret(self, source: str):
"""解释并执行一段Forth代码"""
self._input_buffer = source.split()
self._input_index = 0
while self._input_index < len(self._input_buffer):
token = self._input_buffer[self._input_index]
self._input_index += 1
# 检查是否为数字
try:
num = float(token)
self.data_stack.push(num)
continue
except ValueError:
pass
# 查找单词
word = self.dict.find(token)
if word:
if self._state and not word.immediate:
# 编译模式且非立即词,将词加入当前定义
self._current_definition.append(token)
else:
# 执行单词
if word.code:
word.code()
else:
# 编译词,执行其定义体
definition = self.dict.get_definition(token)
if definition:
# 保存当前解释器状态,递归执行定义体
saved_buffer = self._input_buffer
saved_index = self._input_index
self._input_buffer = [str(item) if not isinstance(item, tuple) else item[0] for item in definition]
self._input_index = 0
while self._input_index < len(self._input_buffer):
subtoken = self._input_buffer[self._input_index]
self._input_index += 1
try:
num = float(subtoken)
self.data_stack.push(num)
continue
except:
subword = self.dict.find(subtoken)
if subword and subword.code:
subword.code()
else:
# 递归处理
pass
self._input_buffer = saved_buffer
self._input_index = saved_index
else:
raise NameError(f"未定义的单词: {token}")
==============================================================================
第二部分: 扩展的余额生成与深度计算模型 (高层封装)
==============================================================================
class BalanceGenerator:
"""余额生成器高级接口,基于Forth引擎构建"""
def init(self):
self.engine = ForthInterpreter()
def generate_random(self, count: int, min_bal: float, max_bal: float) -> List[float]:
"""生成随机余额列表"""
self.engine.data_stack.push(count)
self.engine.data_stack.push(min_bal)
self.engine.data_stack.push(max_bal)
self.engine._word_gen_rand_bal()
# 从栈中提取结果 (生成了count个数值)
result = []
for _ in range(count):
result.insert(0, self.engine.data_stack.pop())
return result
def generate_sequence(self, count: int, start: float, step: float) -> List[float]:
"""生成等差数列余额"""
self.engine.data_stack.push(count)
self.engine.data_stack.push(start)
self.engine.data_stack.push(step)
self.engine._word_gen_seq_bal()
result = []
for _ in range(count):
result.insert(0, self.engine.data_stack.pop())
return result
def deep_compound_simulation(self, principal: float, rate: float, years: int,
volatility: float, simulations: int) -> Dict:
"""深度复利模拟: 结合随机波动,返回各种统计量"""
self.engine.data_stack.push(principal)
self.engine.data_stack.push(rate)
self.engine.data_stack.push(years)
self.engine._word_compound()
deterministic = self.engine.data_stack.pop()
# 蒙特卡洛路径
self.engine.data_stack.push(principal)
self.engine.data_stack.push(rate)
self.engine.data_stack.push(volatility)
self.engine.data_stack.push(years) # steps per year? 简化用years作为步数
self.engine.data_stack.push(simulations)
self.engine._word_mc_path()
paths = self.engine.data_stack.pop()
return {
"deterministic_fv": deterministic,
"monte_carlo_mean": round(np.mean(paths), 2),
"monte_carlo_std": round(np.std(paths), 2),
"percentile_5": round(np.percentile(paths, 5), 2),
"percentile_95": round(np.percentile(paths, 95), 2),
"all_paths": paths
}
def var_analysis(self, portfolio_value: float, volatility: float,
days: int, confidence: float = 0.95) -> float:
"""风险价值分析"""
self.engine.data_stack.push(portfolio_value)
self.engine.data_stack.push(volatility)
self.engine.data_stack.push(days)
self.engine.data_stack.push(confidence)
self.engine._word_var_sim()
return self.engine.data_stack.pop()
def complex_loan_calculation(self, principal: float, annual_rate: float,
years: int, extra_payment: float = 0) -> Dict:
"""复杂贷款计算 (等额本息 + 额外还款效果)"""
monthly_rate = annual_rate / 12
months = years * 12
self.engine.data_stack.push(monthly_rate)
self.engine.data_stack.push(months)
self.engine.data_stack.push(principal)
self.engine._word_pmt()
base_pmt = self.engine.data_stack.pop()
# 额外还款减少的利息
if extra_payment > 0:
effective_pmt = base_pmt + extra_payment
# 简单计算缩短的月数 (近似)
remaining = principal
month = 0
while remaining > 0 and month < 600:
interest = remaining * monthly_rate
remaining = remaining + interest - effective_pmt
month += 1
new_months = month
saved_months = months - new_months
else:
new_months = months
saved_months = 0
return {
"base_monthly_payment": round(base_pmt, 2),
"total_payments": round(base_pmt * months, 2),
"total_interest": round(base_pmt * months - principal, 2),
"extra_payment": extra_payment,
"new_monthly_payment": round(base_pmt + extra_payment, 2) if extra_payment else base_pmt,
"months_saved": saved_months,
"interest_saved": round((base_pmt * months) - (base_pmt + extra_payment) * new_months, 2) if extra_payment else 0
}
==============================================================================
第三部分: 演示与测试 (长代码,展示深度计算)
==============================================================================
def demonstration():
"""展示Forth模型深度计算的强大功能"""
print("=" 80)
print(" 余额生成器 - Forth模型深度计算引擎 演示程序")
print("=" 80)
# 初始化生成器
bg = BalanceGenerator()
print("\n[1] 生成随机余额 (Forth原生指令):")
random_bals = bg.generate_random(10, 1000.0, 5000.0)
print(f" 生成的余额: {random_bals}")
print("\n[2] 生成等差数列余额:")
seq_bals = bg.generate_sequence(8, 100.0, 150.0)
print(f" 序列: {seq_bals}")
print("\n[3] 深度复利模拟 (确定性 + 蒙特卡洛):")
sim_result = bg.deep_compound_simulation(principal=10000, rate=0.08, years=10,
volatility=0.15, simulations=1000)
print(f" 确定性终值: {sim_result['deterministic_fv']}")
print(f" 蒙特卡洛均值: {sim_result['monte_carlo_mean']}")
print(f" 标准差: {sim_result['monte_carlo_std']}")
print(f" 5%分位数: {sim_result['percentile_5']}")
print(f" 95%分位数: {sim_result['percentile_95']}")
print("\n[4] 风险价值(VaR)分析:")
var = bg.var_analysis(portfolio_value=500000, volatility=0.20, days=10, confidence=0.99)
print(f" 投资组合价值50万,年波动率20%,10天99%置信度VaR: {var}")
print("\n[5] 复杂贷款计算 (等额本息 + 额外还款):")
loan = bg.complex_loan_calculation(principal=300000, annual_rate=0.045, years=30, extra_payment=200)
print(f" 基础月供: {loan['base_monthly_payment']}")
print(f" 总还款额: {loan['total_payments']}")
print(f" 总利息: {loan['total_interest']}")
print(f" 额外月供{loan['extra_payment']}后,月供变为: {loan['new_monthly_payment']}")
print(f" 节省月份: {loan['months_saved']} 个月")
print(f" 节省利息: {loan['interest_saved']}")
print("\n[6] 直接运行Forth脚本 (高级计算):")
forth_script = """
: FIN-MODEL
10000 0.08 10 COMPOUND ( 计算复利 )
500000 0.20 10 0.99 VAR-SIM ( 计算VaR )
." 复利终值: " . CR
." VaR值: " . CR
;
FIN-MODEL
"""
print(" 执行Forth脚本:")
bg.engine.interpret(forth_script)
print("\n[7] 数组操作与NPV/IRR分析:")
bg.engine.interpret("1000 2000 1500 3 BAL-ARRAY DUP .S CR")
bg.engine.interpret("0.1 SWAP NPV .\" NPV = \" . CR")
bg.engine.interpret("0.1 SWAP IRR .\" IRR = \" . CR")
print("\n[8] 终值与现值互算:")
bg.engine.interpret("0.05 10 500 1000 FV .\" 终值(FV) = \" . CR")
bg.engine.interpret("0.05 10 500 1628.89 PV .\" 现值(PV) = \" . CR")
print("\n" + "=" * 80)
print("演示完成。Forth模型展示了栈式语言在金融计算中的深度与灵活性。")
print("=" * 80)
==============================================================================
第四部分: 单元测试与性能基准 (长代码部分)
==============================================================================
def unit_tests():
"""单元测试,验证关键单词的正确性"""
engine = ForthInterpreter()
tests_passed = 0
tests_total = 0
def run_test(name, code, expected_stack):
nonlocal tests_passed, tests_total
tests_total += 1
engine.data_stack.clear()
try:
engine.interpret(code)
result = engine.data_stack.get_all()
if result == expected_stack:
print(f" ✓ {name} 通过")
tests_passed += 1
else:
print(f" ✗ {name} 失败: 期望 {expected_stack}, 得到 {result}")
except Exception as e:
print(f" ✗ {name} 异常: {e}")
print("\n运行单元测试:")
run_test("基础加法", "3 5 +", [8.0])
run_test("复利计算", "1000 0.05 10 COMPOUND", [1628.89])
run_test("随机余额生成", "3 100 200 GEN-RAND-BAL DEPTH", [3])
run_test("数组求和", "10 20 30 3 BAL-ARRAY SUM-ARRAY", [60.0])
run_test("净现值", "100 200 300 3 BAL-ARRAY 0.1 NPV", [481.59])
print(f"\n测试结果: {tests_passed}/{tests_total} 通过\n")
def performance_benchmark():
"""性能基准测试,展示深度计算能力"""
print("\n性能基准测试 (大规模计算):")
bg = BalanceGenerator()
start = time.time()
# 生成10万个随机余额
large_balances = bg.generate_random(100000, 0, 10000)
elapsed = time.time() - start
print(f" 生成100,000个随机余额耗时: