下载地址:http://lanzou.com.cn/i5741b5ab

源码结构:
Project Structure
Folder : gongjijinjisuan
Files : 51
Size : 158.9 KB
Generated: 2026-03-20 17:21:09
gongjijinjisuan/
├── README.md [178 B]
├── config/
│ ├── Converter.xml [1.7 KB]
│ ├── Engine.properties [597 B]
│ ├── Loader.json [684 B]
│ ├── Scheduler.properties [596 B]
│ ├── Server.json [684 B]
│ ├── Validator.xml [1.6 KB]
│ └── application.properties [595 B]
├── endpoints/
│ ├── Adapter.php [3.2 KB]
│ ├── Helper.php [3 KB]
│ ├── Manager.py [6.1 KB]
│ ├── Queue.py [5.6 KB]
│ └── Resolver.go [2.4 KB]
├── entities/
│ ├── Client.sql [2.3 KB]
│ ├── Observer.js [3.4 KB]
│ ├── Observer.py [4.5 KB]
│ ├── Scheduler.sql [3 KB]
│ └── Service.java [3.8 KB]
├── generators/
│ ├── Adapter.java [5.1 KB]
│ └── Util.go [2.5 KB]
├── lib/
│ ├── Cache.jar [638 B]
│ └── Listener.jar [638 B]
├── package.json [684 B]
├── pom.xml [1.5 KB]
├── projections/
│ ├── Converter.php [3.7 KB]
│ ├── Factory.java [6.7 KB]
│ └── Processor.py [6.2 KB]
├── proto/
│ ├── Builder.cpp [1.5 KB]
│ ├── Parser.cpp [1.5 KB]
│ ├── Proxy.go [2.4 KB]
│ └── Transformer.js [3.7 KB]
├── services/
│ ├── Controller.java [4.6 KB]
│ ├── Dispatcher.js [4.4 KB]
│ ├── Listener.ts [2.3 KB]
│ ├── Pool.js [3 KB]
│ ├── Registry.php [4.2 KB]
│ └── Util.py [5 KB]
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ ├── Buffer.java [6 KB]
│ │ │ ├── Executor.java [6.3 KB]
│ │ │ ├── Handler.java [7.4 KB]
│ │ │ └── Provider.java [7.1 KB]
│ │ └── resources/
│ └── test/
│ └── java/
└── transformers/
├── Executor.cpp [1.4 KB]
├── Provider.go [2.9 KB]
├── Repository.ts [3.7 KB]
├── Server.js [4 KB]
├── Service.ts [3.4 KB]
├── Transformer.cpp [1.7 KB]
├── Worker.ts [2.6 KB]
└── Wrapper.js [4.2 KB]
配置驱动:政策变化只需更新配置,无需发版
多租户隔离:支持集团多子公司独立计算
高精度计算:使用Decimal避免浮点误差
可观测性:全链路日志与计算过程追溯
python
架构分层示意
┌─────────────────────────────────────────┐
│ API Gateway │
├─────────────────────────────────────────┤
│ Calculation Service │
│ ┌─────────────────────────────────┐ │
│ │ Policy Engine (配置驱动) │ │
│ ├─────────────────────────────────┤ │
│ │ Calculator Core (计算核心) │ │
│ ├─────────────────────────────────┤ │
│ │ Tax Integration (税务联动) │ │
│ └─────────────────────────────────┘ │
├─────────────────────────────────────────┤
│ Data Layer │
│ (Redis缓存 / 配置中心 / 审计DB) │
└─────────────────────────────────────────┘
二、核心数据结构设计
2.1 基础数据模型
python
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Optional, List
from enum import Enum
import json
class FundType(Enum):
"""公积金类型"""
MANDATORY = "mandatory" # 强制公积金
SUPPLEMENTARY = "supplementary" # 补充公积金
@dataclass
class FundPolicy:
"""公积金政策配置"""
city_code: str # 城市代码
year: int # 政策年份
base_lower_limit: Decimal # 基数下限
base_upper_limit: Decimal # 基数上限
employee_rate: Decimal # 个人缴存比例
employer_rate: Decimal # 单位缴存比例
supplementary_rate: Optional[Decimal] = None # 补充公积金比例
supplementary_cap: Optional[Decimal] = None # 补充公积金上限
def to_cache_key(self) -> str:
"""生成缓存Key"""
return f"fund:policy:{self.city_code}:{self.year}"
@dataclass
class EmployeeInfo:
"""员工信息"""
employee_id: str
name: str
city_code: str
base_salary: Decimal # 月基本工资
previous_month_total: Decimal # 上月总收入
has_supplementary: bool = False # 是否缴纳补充公积金
special_adjustment: Optional[Decimal] = None # 特殊调整金额
2.2 计算结果模型
python
@dataclass
class CalculationDetail:
"""计算明细,支持审计追溯"""
step: str
input_value: Decimal
output_value: Decimal
rule_applied: str
def to_dict(self):
return {
'step': self.step,
'input': str(self.input_value),
'output': str(self.output_value),
'rule': self.rule_applied
}
@dataclass
class FundCalculationResult:
"""公积金计算结果"""
employee_id: str
period: str # 计算期间 YYYYMM
calculation_base: Decimal # 计算基数
employee_amount: Decimal # 个人缴存额
employer_amount: Decimal # 单位缴存额
supplementary_employee: Decimal = Decimal('0') # 补充公积金个人部分
supplementary_employer: Decimal = Decimal('0') # 补充公积金单位部分
total_amount: Decimal = Decimal('0') # 合计缴存额
details: List[CalculationDetail] = field(default_factory=list)
def __post_init__(self):
self.total_amount = (
self.employee_amount +
self.employer_amount +
self.supplementary_employee +
self.supplementary_employer
)
def to_audit_log(self) -> str:
"""生成审计日志"""
return json.dumps({
'employee_id': self.employee_id,
'period': self.period,
'calculation_base': str(self.calculation_base),
'employee_amount': str(self.employee_amount),
'employer_amount': str(self.employer_amount),
'details': [d.to_dict() for d in self.details]
}, ensure_ascii=False)
三、计算核心引擎实现
3.1 基数计算模块
公积金计算的第一步是确定缴存基数,涉及多种业务场景:
python
from abc import ABC, abstractmethod
class BaseCalculator(ABC):
"""基数计算策略抽象类"""
@abstractmethod
def calculate_base(self, employee: EmployeeInfo, policy: FundPolicy) -> Decimal:
pass
class StandardBaseCalculator(BaseCalculator):
"""标准基数计算:取上月总收入与基数上下限比较"""
def calculate_base(self, employee: EmployeeInfo, policy: FundPolicy) -> Decimal:
raw_base = employee.previous_month_total
# 应用上下限
base = min(max(raw_base, policy.base_lower_limit), policy.base_upper_limit)
return base.quantize(Decimal('0.01'))
class NewHireBaseCalculator(BaseCalculator):
"""新入职员工:使用当月基本工资"""
def calculate_base(self, employee: EmployeeInfo, policy: FundPolicy) -> Decimal:
raw_base = employee.base_salary
base = min(max(raw_base, policy.base_lower_limit), policy.base_upper_limit)
return base.quantize(Decimal('0.01'))
class HighIncomeBaseCalculator(BaseCalculator):
"""高收入员工:特殊封顶策略"""
def __init__(self, multiplier: Decimal = Decimal('3')):
self.multiplier = multiplier # 社平工资倍数
def calculate_base(self, employee: EmployeeInfo, policy: FundPolicy) -> Decimal:
# 高收入员工按3倍社平工资封顶,而非普通上限
social_avg = self._get_social_avg_salary(policy.city_code, policy.year)
custom_upper = social_avg * self.multiplier
raw_base = employee.previous_month_total
base = min(raw_base, custom_upper)
return base.quantize(Decimal('0.01'))
def _get_social_avg_salary(self, city_code: str, year: int) -> Decimal:
# 实际项目中从配置中心获取
return Decimal('10000')
3.2 缴存额计算引擎
python
class FundCalculator:
"""公积金计算核心引擎"""
def __init__(self, base_calculator: BaseCalculator = None):
self.base_calculator = base_calculator or StandardBaseCalculator()
self._calculation_context = {}
def calculate(self, employee: EmployeeInfo, policy: FundPolicy) -> FundCalculationResult:
"""执行完整公积金计算"""
result = FundCalculationResult(
employee_id=employee.employee_id,
period=self._get_current_period(),
calculation_base=Decimal('0'),
employee_amount=Decimal('0'),
employer_amount=Decimal('0')
)
# Step 1: 计算缴存基数
calculation_base = self._calculate_base_with_trace(
employee, policy, result
)
result.calculation_base = calculation_base
# Step 2: 计算强制公积金
employee_mandatory, employer_mandatory = self._calculate_mandatory_fund(
calculation_base, policy, result
)
result.employee_amount = employee_mandatory
result.employer_amount = employer_mandatory
# Step 3: 计算补充公积金
if employee.has_supplementary and policy.supplementary_rate:
supplementary_emp, supplementary_emp_er = self._calculate_supplementary_fund(
calculation_base, policy, result
)
result.supplementary_employee = supplementary_emp
result.supplementary_employer = supplementary_emp_er
return result
def _calculate_base_with_trace(self, employee: EmployeeInfo,
policy: FundPolicy,
result: FundCalculationResult) -> Decimal:
"""带追踪的基数计算"""
raw_base = self.base_calculator.calculate_base(employee, policy)
result.details.append(CalculationDetail(
step="基数计算",
input_value=employee.previous_month_total,
output_value=raw_base,
rule_applied=f"上下限约束 [{policy.base_lower_limit}, {policy.base_upper_limit}]"
))
return raw_base
def _calculate_mandatory_fund(self, base: Decimal,
policy: FundPolicy,
result: FundCalculationResult) -> tuple:
"""计算强制公积金"""
employee_amount = (base * policy.employee_rate).quantize(Decimal('0.01'))
employer_amount = (base * policy.employer_rate).quantize(Decimal('0.01'))
result.details.append(CalculationDetail(
step="强制公积金",
input_value=base,
output_value=employee_amount,
rule_applied=f"个人比例: {policy.employee_rate}"
))
return employee_amount, employer_amount
def _calculate_supplementary_fund(self, base: Decimal,
policy: FundPolicy,
result: FundCalculationResult) -> tuple:
"""计算补充公积金"""
# 补充公积金可能有独立上限
effective_base = base
if policy.supplementary_cap:
effective_base = min(base, policy.supplementary_cap)
employee_amount = (effective_base * policy.supplementary_rate).quantize(Decimal('0.01'))
employer_amount = employee_amount # 补充公积金通常个人单位比例相同
result.details.append(CalculationDetail(
step="补充公积金",
input_value=base,
output_value=employee_amount,
rule_applied=f"比例: {policy.supplementary_rate}, 上限: {policy.supplementary_cap}"
))
return employee_amount, employer_amount
def _get_current_period(self) -> str:
"""获取当前计算期间 YYYYMM"""
from datetime import datetime
return datetime.now().strftime('%Y%m')
四、配置中心与多城市适配
4.1 政策配置管理器
python
import redis
from typing import Dict
import json
from decimal import Decimal
class PolicyConfigManager:
"""政策配置管理器,支持热更新"""
def __init__(self, redis_client: redis.Redis, cache_ttl: int = 3600):
self.redis = redis_client
self.cache_ttl = cache_ttl
self._local_cache: Dict[str, FundPolicy] = {}
def get_policy(self, city_code: str, year: int) -> FundPolicy:
"""获取公积金政策,三级缓存"""
cache_key = f"fund:policy:{city_code}:{year}"
# L1: 本地缓存
if cache_key in self._local_cache:
return self._local_cache[cache_key]
# L2: Redis缓存
cached = self.redis.get(cache_key)
if cached:
policy = self._deserialize_policy(cached)
self._local_cache[cache_key] = policy
return policy
# L3: 从配置中心加载(实际项目中从数据库/配置中心读取)
policy = self._load_from_config_center(city_code, year)
# 写入缓存
self.redis.setex(
cache_key,
self.cache_ttl,
self._serialize_policy(policy)
)
self._local_cache[cache_key] = policy
return policy
def refresh_policy(self, city_code: str, year: int):
"""手动刷新政策配置"""
cache_key = f"fund:policy:{city_code}:{year}"
self.redis.delete(cache_key)
if cache_key in self._local_cache:
del self._local_cache[cache_key]
def _load_from_config_center(self, city_code: str, year: int) -> FundPolicy:
"""
实际项目中从配置中心或数据库加载
此处模拟2024年主要城市政策
"""
policies = {
('Beijing', 2024): FundPolicy(
city_code='Beijing',
year=2024,
base_lower_limit=Decimal('2420'),
base_upper_limit=Decimal('33891'),
employee_rate=Decimal('0.12'),
employer_rate=Decimal('0.12'),
supplementary_rate=Decimal('0.05'),
supplementary_cap=Decimal('33891')
),
('Shanghai', 2024): FundPolicy(
city_code='Shanghai',
year=2024,
base_lower_limit=Decimal('2690'),
base_upper_limit=Decimal('34188'),
employee_rate=Decimal('0.07'),
employer_rate=Decimal('0.07'),
supplementary_rate=Decimal('0.05'),
supplementary_cap=Decimal('31014')
),
('Shenzhen', 2024): FundPolicy(
city_code='Shenzhen',
year=2024,
base_lower_limit=Decimal('2360'),
base_upper_limit=Decimal('41190'),
employee_rate=Decimal('0.05'),
employer_rate=Decimal('0.05')
),
}
key = (city_code, year)
if key not in policies:
raise ValueError(f"未找到城市 {city_code} 在 {year} 年的公积金政策")
return policies[key]
def _serialize_policy(self, policy: FundPolicy) -> str:
"""序列化政策配置"""
return json.dumps({
'city_code': policy.city_code,
'year': policy.year,
'base_lower_limit': str(policy.base_lower_limit),
'base_upper_limit': str(policy.base_upper_limit),
'employee_rate': str(policy.employee_rate),
'employer_rate': str(policy.employer_rate),
'supplementary_rate': str(policy.supplementary_rate) if policy.supplementary_rate else None,
'supplementary_cap': str(policy.supplementary_cap) if policy.supplementary_cap else None,
})
def _deserialize_policy(self, data: str) -> FundPolicy:
"""反序列化政策配置"""
d = json.loads(data)
return FundPolicy(
city_code=d['city_code'],
year=d['year'],
base_lower_limit=Decimal(d['base_lower_limit']),
base_upper_limit=Decimal(d['base_upper_limit']),
employee_rate=Decimal(d['employee_rate']),
employer_rate=Decimal(d['employer_rate']),
supplementary_rate=Decimal(d['supplementary_rate']) if d.get('supplementary_rate') else None,
supplementary_cap=Decimal(d['supplementary_cap']) if d.get('supplementary_cap') else None,
)
4.2 批量计算与并发优化
python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
import logging
logger = logging.getLogger(name)
class BatchFundCalculator:
"""批量公积金计算器,支持并发处理"""
def __init__(self, policy_manager: PolicyConfigManager, max_workers: int = 10):
self.policy_manager = policy_manager
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers)
def calculate_batch(self, employees: List[EmployeeInfo],
year: int) -> List[FundCalculationResult]:
"""批量计算公积金"""
results = []
futures = {}
for employee in employees:
future = self.executor.submit(
self._calculate_single,
employee,
year
)
futures[future] = employee.employee_id
for future in as_completed(futures):
emp_id = futures[future]
try:
result = future.result()
results.append(result)
logger.info(f"员工 {emp_id} 计算完成")
except Exception as e:
logger.error(f"员工 {emp_id} 计算失败: {e}")
# 记录失败,继续处理其他员工
results.append(None)
return results
def _calculate_single(self, employee: EmployeeInfo, year: int) -> FundCalculationResult:
"""单个员工计算"""
policy = self.policy_manager.get_policy(employee.city_code, year)
# 根据员工类型选择基数计算策略
# 这里简化处理,实际可根据入职日期等动态选择
calculator = FundCalculator(StandardBaseCalculator())
return calculator.calculate(employee, policy)
def calculate_batch_with_progress(self, employees: List[EmployeeInfo],
year: int,
callback=None) -> List[FundCalculationResult]:
"""带进度回调的批量计算"""
total = len(employees)
results = []
for i, employee in enumerate(employees):
try:
result = self._calculate_single(employee, year)
results.append(result)
except Exception as e:
logger.error(f"员工 {employee.employee_id} 计算失败: {e}")
results.append(None)
if callback:
callback(i + 1, total)
return results
五、与个税系统联动
在实际业务中,公积金与个税紧密相关。以下是联动计算示例:
python
@dataclass
class TaxCalculationContext:
"""个税计算上下文"""
employee_id: str
period: str
taxable_income: Decimal # 应纳税所得额
fund_deduction: Decimal # 公积金税前扣除
special_deductions: Decimal # 专项附加扣除
class IntegratedCalculator:
"""公积金与个税联动计算器"""
def calculate_with_tax(self, employee: EmployeeInfo,
policy: FundPolicy,
special_deductions: Decimal = Decimal('0')) -> dict:
"""
联动计算公积金和个税
返回公积金结果和税后影响
"""
# Step 1: 计算公积金
fund_calc = FundCalculator()
fund_result = fund_calc.calculate(employee, policy)
# Step 2: 计算税前扣除总额
total_deduction = (
fund_result.employee_amount +
fund_result.supplementary_employee
)
# Step 3: 计算应纳税所得额
taxable_base = employee.base_salary - total_deduction - special_deductions
# Step 4: 计算个税
tax_amount = self._calculate_individual_tax(taxable_base)
# Step 5: 计算税后实发
net_salary = employee.base_salary - total_deduction - tax_amount
return {
'fund_result': fund_result,
'taxable_income': taxable_base,
'tax_amount': tax_amount,
'net_salary': net_salary,
'total_deduction': total_deduction
}
def _calculate_individual_tax(self, taxable_income: Decimal) -> Decimal:
"""
计算个人所得税(累计预扣法简化版)
实际项目中需要完整的累计预扣逻辑
"""
if taxable_income <= Decimal('5000'):
return Decimal('0')
taxable_part = taxable_income - Decimal('5000')
# 简化税率表
if taxable_part <= Decimal('3000'):
return (taxable_part * Decimal('0.03')).quantize(Decimal('0.01'))
elif taxable_part <= Decimal('12000'):
return (taxable_part * Decimal('0.10') - Decimal('210')).quantize(Decimal('0.01'))
elif taxable_part <= Decimal('25000'):
return (taxable_part * Decimal('0.20') - Decimal('1410')).quantize(Decimal('0.01'))
else:
return (taxable_part * Decimal('0.25') - Decimal('2660')).quantize(Decimal('0.01'))
六、可观测性设计
6.1 计算链路追踪
python
import traceback
from contextvars import ContextVar
import uuid
请求追踪上下文
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
class CalculationTracer:
"""计算链路追踪器"""
@staticmethod
def trace_calculation(func):
"""装饰器:自动记录计算过程"""
def wrapper(*args, **kwargs):
trace_id = str(uuid.uuid4())[:8]
request_id_var.set(trace_id)
logger.info(f"[TraceID: {trace_id}] 开始计算: {func.__name__}")
try:
result = func(*args, **kwargs)
logger.info(f"[TraceID: {trace_id}] 计算完成: {func.__name__}")
return result
except Exception as e:
logger.error(
f"[TraceID: {trace_id}] 计算失败: {func.__name__}, "
f"错误: {e}, 堆栈: {traceback.format_exc()}"
)
raise
return wrapper
6.2 监控指标埋点
python
from dataclasses import dataclass
from datetime import datetime
import threading
import time
@dataclass
class CalculationMetrics:
"""计算指标"""
total_requests: int = 0
success_count: int = 0
failure_count: int = 0
total_duration_ms: float = 0
avg_duration_ms: float = 0
def record_success(self, duration_ms: float):
self.total_requests += 1
self.success_count += 1
self.total_duration_ms += duration_ms
self.avg_duration_ms = self.total_duration_ms / self.success_count
def record_failure(self, duration_ms: float):
self.total_requests += 1
self.failure_count += 1
class MetricsCollector:
"""指标收集器"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.metrics = CalculationMetrics()
return cls._instance
def record_calculation(self, func):
"""装饰器:记录计算耗时和成功率"""
def wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
duration = (time.time() - start) * 1000
self.metrics.record_success(duration)
return result
except Exception as e:
duration = (time.time() - start) * 1000
self.metrics.record_failure(duration)
raise
return wrapper
def get_metrics(self) -> dict:
"""获取当前指标"""
return {
'total_requests': self.metrics.total_requests,
'success_count': self.metrics.success_count,
'failure_count': self.metrics.failure_count,
'success_rate': self.metrics.success_count / max(1, self.metrics.total_requests),
'avg_duration_ms': round(self.metrics.avg_duration_ms, 2)
}
七、使用示例
python
初始化组件
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
policy_manager = PolicyConfigManager(redis_client)
batch_calculator = BatchFundCalculator(policy_manager, max_workers=5)
准备员工数据
employees = [
EmployeeInfo(
employee_id='EMP001',
name='张三',
city_code='Beijing',
base_salary=Decimal('25000'),
previous_month_total=Decimal('24500'),
has_supplementary=True
),
EmployeeInfo(
employee_id='EMP002',
name='李四',
city_code='Shanghai',
base_salary=Decimal('18000'),
previous_month_total=Decimal('17500'),
has_supplementary=False
),
]
批量计算
results = batch_calculator.calculate_batch(employees, year=2024)
输出结果
for result in results:
if result:
print(f"员工 {result.employee_id}:")
print(f" 计算基数: {result.calculation_base}")
print(f" 个人缴存: {result.employee_amount}")
print(f" 单位缴存: {result.employer_amount}")
print(f" 合计: {result.total_amount}")
print(f" 审计日志: {result.to_audit_log()}\n")
查看监控指标
metrics = MetricsCollector().get_metrics()
print(f"计算指标: {metrics}")
八、生产环境最佳实践
8.1 高可用部署建议
yaml
Kubernetes部署配置示例
apiVersion: apps/v1
kind: Deployment
metadata:
name: fund-calculator
spec:
replicas: 3
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
template:
spec:
containers:
- name: calculator
image: fund-calculator:latest
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
env:
- name: REDIS_SENTINEL_HOSTS
value: "redis-sentinel:26379"
- name: CONFIG_CENTER_URL
value: "http://config-center:8080"