PandasTA 源码解析(十四)(1)https://developer.aliyun.com/article/1506233
.\pandas-ta\pandas_ta\utils\_core.py
# 设置文件编码为 utf-8 # 导入 re 模块并重命名为 re_ # 从 pathlib 模块中导入 Path 类 # 从 sys 模块中导入 float_info 并重命名为 sflt # 从 numpy 模块中导入 argmax 和 argmin 函数 # 从 pandas 模块中导入 DataFrame 和 Series 类 # 从 pandas 模块中导入 is_datetime64_any_dtype 函数 # 从 pandas_ta 模块中导入 Imports def _camelCase2Title(x: str): """将驼峰命名转换为标题格式""" return re_.sub("([a-z])([A-Z])","\g<1> \g<2>", x).title() def category_files(category: str) -> list: """返回类别目录中所有文件名的帮助函数""" files = [ x.stem for x in list(Path(f"pandas_ta/{category}/").glob("*.py")) if x.stem != "__init__" ] return files def get_drift(x: int) -> int: """如果不为零,则返回一个整数,否则默认为一""" return int(x) if isinstance(x, int) and x != 0 else 1 def get_offset(x: int) -> int: """返回一个整数,否则默认为零""" return int(x) if isinstance(x, int) else 0 def is_datetime_ordered(df: DataFrame or Series) -> bool: """如果索引是日期时间且有序,则返回 True""" index_is_datetime = is_datetime64_any_dtype(df.index) try: ordered = df.index[0] < df.index[-1] except RuntimeWarning: pass finally: return True if index_is_datetime and ordered else False def is_percent(x: int or float) -> bool: """检查是否为百分比""" if isinstance(x, (int, float)): return x is not None and x >= 0 and x <= 100 return False def non_zero_range(high: Series, low: Series) -> Series: """返回两个序列的差异,并对任何零值添加 epsilon。在加密数据中常见情况是 'high' = 'low'。""" diff = high - low if diff.eq(0).any().any(): diff += sflt.epsilon return diff def recent_maximum_index(x): """返回最近最大值的索引""" return int(argmax(x[::-1])) def recent_minimum_index(x): """返回最近最小值的索引""" return int(argmin(x[::-1])) def signed_series(series: Series, initial: int = None) -> Series: """返回带有或不带有初始值的有符号序列""" series = verify_series(series) sign = series.diff(1) sign[sign > 0] = 1 sign[sign < 0] = -1 sign.iloc[0] = initial return sign def tal_ma(name: str) -> int: """返回 TA Lib 的 MA 类型的枚举值""" # 检查是否导入了 talib 模块,并且 name 是否是字符串类型,并且长度大于1 if Imports["talib"] and isinstance(name, str) and len(name) > 1: # 如果满足条件,从 talib 模块导入 MA_Type from talib import MA_Type # 将 name 转换为小写 name = name.lower() # 根据 name 的不同取值返回对应的 MA_Type 枚举值 if name == "sma": return MA_Type.SMA # 0 elif name == "ema": return MA_Type.EMA # 1 elif name == "wma": return MA_Type.WMA # 2 elif name == "dema": return MA_Type.DEMA # 3 elif name == "tema": return MA_Type.TEMA # 4 elif name == "trima": return MA_Type.TRIMA # 5 elif name == "kama": return MA_Type.KAMA # 6 elif name == "mama": return MA_Type.MAMA # 7 elif name == "t3": return MA_Type.T3 # 8 # 如果不满足条件,返回默认值 0,代表 SMA return 0 # Default: SMA -> 0 # 定义一个函数,计算给定 Series 的无符号差值 def unsigned_differences(series: Series, amount: int = None, **kwargs) -> Series: """Unsigned Differences 返回两个 Series,一个是原始 Series 的无符号正差值,另一个是无符号负差值。 正差值 Series 仅包含增加值,负差值 Series 仅包含减少值。 默认示例: series = Series([3, 2, 2, 1, 1, 5, 6, 6, 7, 5, 3]) 返回 positive = Series([0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0]) negative = Series([0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1]) """ # 如果未提供 amount 参数,则默认为 1 amount = int(amount) if amount is not None else 1 # 计算 Series 的负差值 negative = series.diff(amount) # 将 NaN 值填充为 0 negative.fillna(0, inplace=True) # 复制负差值 Series 以备后用 positive = negative.copy() # 将正差值 Series 中小于等于 0 的值设置为 0 positive[positive <= 0] = 0 # 将正差值 Series 中大于 0 的值设置为 1 positive[positive > 0] = 1 # 将负差值 Series 中大于等于 0 的值设置为 0 negative[negative >= 0] = 0 # 将负差值 Series 中小于 0 的值设置为 1 negative[negative < 0] = 1 # 如果 kwargs 中包含 asint 参数且值为 True,则将 Series 转换为整数类型 if kwargs.pop("asint", False): positive = positive.astype(int) negative = negative.astype(int) # 返回正差值 Series 和负差值 Series return positive, negative # 定义一个函数,验证给定的 Series 是否满足指示器的最小长度要求 def verify_series(series: Series, min_length: int = None) -> Series: """If a Pandas Series and it meets the min_length of the indicator return it.""" # 判断是否指定了最小长度,并且最小长度是整数类型 has_length = min_length is not None and isinstance(min_length, int) # 如果给定的 series 不为空且是 Pandas Series 类型 if series is not None and isinstance(series, Series): # 如果指定了最小长度,并且 series 的大小小于最小长度,则返回 None,否则返回 series return None if has_length and series.size < min_length else series
.\pandas-ta\pandas_ta\utils\_math.py
# 设置文件编码为 UTF-8 # 导入 functools 模块中的 reduce 函数 # 从 math 模块中导入 floor 函数并将其命名为 mfloor # 从 operator 模块中导入 mul 函数 # 从 sys 模块中导入 float_info 对象并将其命名为 sflt # 从 typing 模块中导入 List、Optional 和 Tuple 类型 from functools import reduce from math import floor as mfloor from operator import mul from sys import float_info as sflt from typing import List, Optional, Tuple # 从 numpy 模块中导入 ones、triu、all、append、array、corrcoef、dot、fabs、exp、log、nan、ndarray、seterr、sqrt 和 sum 函数 from numpy import ones, triu from numpy import all as npAll from numpy import append as npAppend from numpy import array as npArray from numpy import corrcoef as npCorrcoef from numpy import dot as npDot from numpy import fabs as npFabs from numpy import exp as npExp from numpy import log as npLog from numpy import nan as npNaN from numpy import ndarray as npNdArray from numpy import seterr from numpy import sqrt as npSqrt from numpy import sum as npSum # 从 pandas 模块中导入 DataFrame 和 Series 类 from pandas import DataFrame, Series # 从 pandas_ta 包中导入 Imports 模块 from pandas_ta import Imports # 从 ._core 模块中导入 verify_series 函数 from ._core import verify_series # 定义 combination 函数,接收任意关键字参数并返回整型值 def combination(**kwargs: dict) -> int: """https://stackoverflow.com/questions/4941753/is-there-a-math-ncr-function-in-python""" # 从 kwargs 中取出键为 "n" 的值,若不存在则默认为 1,并转换为整型 n = int(npFabs(kwargs.pop("n", 1))) # 从 kwargs 中取出键为 "r" 的值,若不存在则默认为 0,并转换为整型 r = int(npFabs(kwargs.pop("r", 0))) # 如果参数中存在 "repetition" 或 "multichoose" 键,则执行以下操作 if kwargs.pop("repetition", False) or kwargs.pop("multichoose", False): # 计算修正后的 n 值 n = n + r - 1 # 若 r 小于 0,则返回 None # 如果 r 大于 n,则令 r 等于 n r = min(n, n - r) # 若 r 为 0,则返回 1 if r == 0: return 1 # 计算组合数的分子部分 numerator = reduce(mul, range(n, n - r, -1), 1) # 计算组合数的分母部分 denominator = reduce(mul, range(1, r + 1), 1) # 返回组合数结果 return numerator // denominator # 定义错误函数 erf(x),接收一个参数 x,返回 erf(x) 的值 def erf(x): """Error Function erf(x) The algorithm comes from Handbook of Mathematical Functions, formula 7.1.26. Source: https://stackoverflow.com/questions/457408/is-there-an-easily-available-implementation-of-erf-for-python """ # 保存 x 的符号 sign = 1 if x >= 0 else -1 x = abs(x) # 定义常数 a1 = 0.254829592 a2 = -0.284496736 a3 = 1.421413741 a4 = -1.453152027 a5 = 1.061405429 p = 0.3275911 # 使用 A&S 公式 7.1.26 计算 erf(x) 的值 t = 1.0 / (1.0 + p * x) y = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * npExp(-x * x) # 返回 erf(x) 的值,若 x 为负数,则取相反数 return sign * y # erf(-x) = -erf(x) # 定义斐波那契数列函数 fibonacci,接收一个整型参数 n 和任意关键字参数,返回一个 numpy 数组 def fibonacci(n: int = 2, **kwargs: dict) -> npNdArray: """Fibonacci Sequence as a numpy array""" # 将 n 转换为非负整数 n = int(npFabs(n)) if n >= 0 else 2 # 从 kwargs 中取出键为 "zero" 的值,若存在且为 True,则斐波那契数列以 0 开头 zero = kwargs.pop("zero", False) if zero: a, b = 0, 1 else: # 若不以 0 开头,则斐波那契数列从 1 开始,n 减 1 n -= 1 a, b = 1, 1 # 初始化结果数组,包含斐波那契数列的第一个元素 result = npArray([a]) # 循环生成斐波那契数列 for _ in range(0, n): a, b = b, a + b result = npAppend(result, a) # 从 kwargs 中取出键为 "weighted" 的值,若存在且为 True,则返回加权后的斐波那契数列 weighted = kwargs.pop("weighted", False) if weighted: # 计算斐波那契数列的总和 fib_sum = npSum(result) # 若总和大于 0,则返回斐波那契数列的每个元素除以总和的结果 if fib_sum > 0: return result / fib_sum else: # 若总和小于等于 0,则直接返回斐波那契数列 return result else: # 若不加权,则直接返回斐波那契数 """Classic Linear Regression in Numpy or Scikit-Learn""" # 确保 x 和 y 是 Series 类型的数据 x, y = verify_series(x), verify_series(y) # 获取 x 和 y 的大小 m, n = x.size, y.size # 如果 x 和 y 的大小不相等,则打印错误信息并返回空字典 if m != n: print(f"[X] Linear Regression X and y have unequal total observations: {m} != {n}") return {} # 如果导入了 sklearn 模块,则使用 sklearn 进行线性回归 if Imports["sklearn"]: return _linear_regression_sklearn(x, y) # 否则使用 numpy 进行线性回归 else: return _linear_regression_np(x, y) # 返回给定序列的对数几何平均值 def log_geometric_mean(series: Series) -> float: n = series.size # 获取序列的大小 if n < 2: return 0 # 如果序列大小小于2,则返回0 else: series = series.fillna(0) + 1 # 将序列中的空值填充为0,并加1 if npAll(series > 0): # 检查序列中的所有值是否大于0 return npExp(npLog(series).sum() / n) - 1 # 计算序列的对数和的均值的指数,然后减去1 return 0 # 如果序列中存在小于等于0的值,则返回0 # 返回帕斯卡三角形的第n行 def pascals_triangle(n: int = None, **kwargs: dict) -> npNdArray: n = int(npFabs(n)) if n is not None else 0 # 将n转换为整数并取绝对值,如果n为None则设为0 # 计算 triangle = npArray([combination(n=n, r=i) for i in range(0, n + 1)]) # 创建帕斯卡三角形的第n行 triangle_sum = npSum(triangle) # 计算三角形的总和 triangle_weights = triangle / triangle_sum # 计算每个元素的权重 inverse_weights = 1 - triangle_weights # 计算逆权重 weighted = kwargs.pop("weighted", False) # 获取weighted参数,默认为False inverse = kwargs.pop("inverse", False) # 获取inverse参数,默认为False if weighted and inverse: # 如果weighted和inverse都为True return inverse_weights # 返回逆权重 if weighted: # 如果weighted为True return triangle_weights # 返回权重 if inverse: # 如果inverse为True return None # 返回None return triangle # 返回帕斯卡三角形的第n行 # 返回对称三角形的第n行 def symmetric_triangle(n: int = None, **kwargs: dict) -> Optional[List[int]]: n = int(npFabs(n)) if n is not None else 2 # 将n转换为整数并取绝对值,如果n为None则设为2 triangle = None if n == 2: # 如果n为2 triangle = [1, 1] # 返回固定的列表 if n > 2: # 如果n大于2 if n % 2 == 0: # 如果n为偶数 front = [i + 1 for i in range(0, mfloor(n / 2))] # 创建前半部分列表 triangle = front + front[::-1] # 创建对称三角形 else: front = [i + 1 for i in range(0, mfloor(0.5 * (n + 1)))] # 创建前半部分列表 triangle = front.copy() # 复制前半部分列表 front.pop() # 移除最后一个元素 triangle += front[::-1] # 创建对称三角形 if kwargs.pop("weighted", False) and isinstance(triangle, list): # 如果weighted为True且triangle是列表类型 triangle_sum = npSum(triangle) # 计算三角形的总和 triangle_weights = triangle / triangle_sum # 计算每个元素的权重 return triangle_weights # 返回权重 return triangle # 返回对称三角形的第n行 # 返回权重与值x的点积 def weights(w: npNdArray): def _dot(x): return npDot(w, x) return _dot # 如果值接近于零,则返回零,否则返回自身 def zero(x: Tuple[int, float]) -> Tuple[int, float]: return 0 if abs(x) < sflt.epsilon else x # DataFrame相关性分析辅助函数 def df_error_analysis(dfA: DataFrame, dfB: DataFrame, **kwargs: dict) -> DataFrame: corr_method = kwargs.pop("corr_method", "pearson") # 获取相关性计算方法,默认为pearson # 计算它们的差异和相关性 diff = dfA - dfB # 计算DataFrame的差异 corr = dfA.corr(dfB, method=corr_method) # 计算DataFrame的相关性 # 用于绘图 if kwargs.pop("plot", False): # 如果plot为True diff.hist() # 绘制差异的直方图 if diff[diff > 0].any(): # 如果差异中存在大于0的值 diff.plot(kind="kde") # 绘制密度曲线图 if kwargs.pop("triangular", False): # 如果triangular为True return corr.where(triu(ones(corr.shape)).astype(bool)) # 返回上三角部分的相关性矩阵 return corr # 返回相关性矩阵 # 私有函数 # 使用 Numpy 实现简单的线性回归,适用于没有安装 sklearn 包的环境,接受两个一维数组作为输入 def _linear_regression_np(x: Series, y: Series) -> dict: # 初始化结果字典,所有值设为 NaN result = {"a": npNaN, "b": npNaN, "r": npNaN, "t": npNaN, "line": npNaN} # 计算 x 和 y 的总和 x_sum = x.sum() y_sum = y.sum() # 如果 x 的总和不为 0 if int(x_sum) != 0: # 计算 x 和 y 之间的相关系数 r = npCorrcoef(x, y)[0, 1] m = x.size # 计算回归系数 b r_mix = m * (x * y).sum() - x_sum * y_sum b = r_mix // (m * (x * x).sum() - x_sum * x_sum) # 计算截距 a 和回归线 a = y.mean() - b * x.mean() line = a + b * x # 临时保存 Numpy 的错误设置 _np_err = seterr() # 忽略除零和无效值的错误 seterr(divide="ignore", invalid="ignore") # 更新结果字典 result = { "a": a, "b": b, "r": r, "t": r / npSqrt((1 - r * r) / (m - 2)), "line": line, } # 恢复 Numpy 的错误设置 seterr(divide=_np_err["divide"], invalid=_np_err["invalid"]) return result # 使用 Scikit Learn 实现简单的线性回归,适用于安装了 sklearn 包的环境,接受两个一维数组作为输入 def _linear_regression_sklearn(x: Series, y: Series) -> dict: # 导入 LinearRegression 类 from sklearn.linear_model import LinearRegression # 将 x 转换为 DataFrame,创建 LinearRegression 模型并拟合数据 X = DataFrame(x) lr = LinearRegression().fit(X, y=y) # 计算决定系数 r = lr.score(X, y=y) # 获取截距和斜率 a, b = lr.intercept_, lr.coef_[0] # 更新结果字典 result = { "a": a, "b": b, "r": r, "t": r / npSqrt((1 - r * r) / (x.size - 2)), "line": a + b * x } return result
PandasTA 源码解析(十四)(3)https://developer.aliyun.com/article/1506235