PandasTA 源码解析(十四)(2)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: PandasTA 源码解析(十四)

PandasTA 源码解析(十四)(1)https://developer.aliyun.com/article/1506233

.\pandas-ta\pandas_ta\utils\_core.py

# 设置文件编码为 utf-8
# 导入 re 模块并重命名为 re_
# 从 pathlib 模块中导入 Path 类
# 从 sys 模块中导入 float_info 并重命名为 sflt
# 从 numpy 模块中导入 argmax 和 argmin 函数
# 从 pandas 模块中导入 DataFrame 和 Series 类
# 从 pandas 模块中导入 is_datetime64_any_dtype 函数
# 从 pandas_ta 模块中导入 Imports
def _camelCase2Title(x: str):
    """将驼峰命名转换为标题格式"""
    return re_.sub("([a-z])([A-Z])","\g<1> \g<2>", x).title()
def category_files(category: str) -> list:
    """返回类别目录中所有文件名的帮助函数"""
    files = [
        x.stem
        for x in list(Path(f"pandas_ta/{category}/").glob("*.py"))
        if x.stem != "__init__"
    ]
    return files
def get_drift(x: int) -> int:
    """如果不为零,则返回一个整数,否则默认为一"""
    return int(x) if isinstance(x, int) and x != 0 else 1
def get_offset(x: int) -> int:
    """返回一个整数,否则默认为零"""
    return int(x) if isinstance(x, int) else 0
def is_datetime_ordered(df: DataFrame or Series) -> bool:
    """如果索引是日期时间且有序,则返回 True"""
    index_is_datetime = is_datetime64_any_dtype(df.index)
    try:
        ordered = df.index[0] < df.index[-1]
    except RuntimeWarning:
        pass
    finally:
        return True if index_is_datetime and ordered else False
def is_percent(x: int or float) -> bool:
    """检查是否为百分比"""
    if isinstance(x, (int, float)):
        return x is not None and x >= 0 and x <= 100
    return False
def non_zero_range(high: Series, low: Series) -> Series:
    """返回两个序列的差异,并对任何零值添加 epsilon。在加密数据中常见情况是 'high' = 'low'。"""
    diff = high - low
    if diff.eq(0).any().any():
        diff += sflt.epsilon
    return diff
def recent_maximum_index(x):
    """返回最近最大值的索引"""
    return int(argmax(x[::-1]))
def recent_minimum_index(x):
    """返回最近最小值的索引"""
    return int(argmin(x[::-1]))
def signed_series(series: Series, initial: int = None) -> Series:
    """返回带有或不带有初始值的有符号序列"""
    series = verify_series(series)
    sign = series.diff(1)
    sign[sign > 0] = 1
    sign[sign < 0] = -1
    sign.iloc[0] = initial
    return sign
def tal_ma(name: str) -> int:
    """返回 TA Lib 的 MA 类型的枚举值"""
    # 检查是否导入了 talib 模块,并且 name 是否是字符串类型,并且长度大于1
    if Imports["talib"] and isinstance(name, str) and len(name) > 1:
        # 如果满足条件,从 talib 模块导入 MA_Type
        from talib import MA_Type
        # 将 name 转换为小写
        name = name.lower()
        # 根据 name 的不同取值返回对应的 MA_Type 枚举值
        if   name == "sma":   return MA_Type.SMA   # 0
        elif name == "ema":   return MA_Type.EMA   # 1
        elif name == "wma":   return MA_Type.WMA   # 2
        elif name == "dema":  return MA_Type.DEMA  # 3
        elif name == "tema":  return MA_Type.TEMA  # 4
        elif name == "trima": return MA_Type.TRIMA # 5
        elif name == "kama":  return MA_Type.KAMA  # 6
        elif name == "mama":  return MA_Type.MAMA  # 7
        elif name == "t3":    return MA_Type.T3    # 8
    # 如果不满足条件,返回默认值 0,代表 SMA
    return 0 # Default: SMA -> 0
# 定义一个函数,计算给定 Series 的无符号差值
def unsigned_differences(series: Series, amount: int = None, **kwargs) -> Series:
    """Unsigned Differences
    返回两个 Series,一个是原始 Series 的无符号正差值,另一个是无符号负差值。
    正差值 Series 仅包含增加值,负差值 Series 仅包含减少值。
    默认示例:
    series   = Series([3, 2, 2, 1, 1, 5, 6, 6, 7, 5, 3]) 返回
    positive  = Series([0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0])
    negative = Series([0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1])
    """
    # 如果未提供 amount 参数,则默认为 1
    amount = int(amount) if amount is not None else 1
    # 计算 Series 的负差值
    negative = series.diff(amount)
    # 将 NaN 值填充为 0
    negative.fillna(0, inplace=True)
    # 复制负差值 Series 以备后用
    positive = negative.copy()
    # 将正差值 Series 中小于等于 0 的值设置为 0
    positive[positive <= 0] = 0
    # 将正差值 Series 中大于 0 的值设置为 1
    positive[positive > 0] = 1
    # 将负差值 Series 中大于等于 0 的值设置为 0
    negative[negative >= 0] = 0
    # 将负差值 Series 中小于 0 的值设置为 1
    negative[negative < 0] = 1
    # 如果 kwargs 中包含 asint 参数且值为 True,则将 Series 转换为整数类型
    if kwargs.pop("asint", False):
        positive = positive.astype(int)
        negative = negative.astype(int)
    # 返回正差值 Series 和负差值 Series
    return positive, negative
# 定义一个函数,验证给定的 Series 是否满足指示器的最小长度要求
def verify_series(series: Series, min_length: int = None) -> Series:
    """If a Pandas Series and it meets the min_length of the indicator return it."""
    # 判断是否指定了最小长度,并且最小长度是整数类型
    has_length = min_length is not None and isinstance(min_length, int)
    # 如果给定的 series 不为空且是 Pandas Series 类型
    if series is not None and isinstance(series, Series):
        # 如果指定了最小长度,并且 series 的大小小于最小长度,则返回 None,否则返回 series
        return None if has_length and series.size < min_length else series

.\pandas-ta\pandas_ta\utils\_math.py

# 设置文件编码为 UTF-8
# 导入 functools 模块中的 reduce 函数
# 从 math 模块中导入 floor 函数并将其命名为 mfloor
# 从 operator 模块中导入 mul 函数
# 从 sys 模块中导入 float_info 对象并将其命名为 sflt
# 从 typing 模块中导入 List、Optional 和 Tuple 类型
from functools import reduce
from math import floor as mfloor
from operator import mul
from sys import float_info as sflt
from typing import List, Optional, Tuple
# 从 numpy 模块中导入 ones、triu、all、append、array、corrcoef、dot、fabs、exp、log、nan、ndarray、seterr、sqrt 和 sum 函数
from numpy import ones, triu
from numpy import all as npAll
from numpy import append as npAppend
from numpy import array as npArray
from numpy import corrcoef as npCorrcoef
from numpy import dot as npDot
from numpy import fabs as npFabs
from numpy import exp as npExp
from numpy import log as npLog
from numpy import nan as npNaN
from numpy import ndarray as npNdArray
from numpy import seterr
from numpy import sqrt as npSqrt
from numpy import sum as npSum
# 从 pandas 模块中导入 DataFrame 和 Series 类
from pandas import DataFrame, Series
# 从 pandas_ta 包中导入 Imports 模块
from pandas_ta import Imports
# 从 ._core 模块中导入 verify_series 函数
from ._core import verify_series
# 定义 combination 函数,接收任意关键字参数并返回整型值
def combination(**kwargs: dict) -> int:
    """https://stackoverflow.com/questions/4941753/is-there-a-math-ncr-function-in-python"""
    # 从 kwargs 中取出键为 "n" 的值,若不存在则默认为 1,并转换为整型
    n = int(npFabs(kwargs.pop("n", 1)))
    # 从 kwargs 中取出键为 "r" 的值,若不存在则默认为 0,并转换为整型
    r = int(npFabs(kwargs.pop("r", 0)))
    # 如果参数中存在 "repetition" 或 "multichoose" 键,则执行以下操作
    if kwargs.pop("repetition", False) or kwargs.pop("multichoose", False):
        # 计算修正后的 n 值
        n = n + r - 1
    # 若 r 小于 0,则返回 None
    # 如果 r 大于 n,则令 r 等于 n
    r = min(n, n - r)
    # 若 r 为 0,则返回 1
    if r == 0:
        return 1
    # 计算组合数的分子部分
    numerator = reduce(mul, range(n, n - r, -1), 1)
    # 计算组合数的分母部分
    denominator = reduce(mul, range(1, r + 1), 1)
    # 返回组合数结果
    return numerator // denominator
# 定义错误函数 erf(x),接收一个参数 x,返回 erf(x) 的值
def erf(x):
    """Error Function erf(x)
    The algorithm comes from Handbook of Mathematical Functions, formula 7.1.26.
    Source: https://stackoverflow.com/questions/457408/is-there-an-easily-available-implementation-of-erf-for-python
    """
    # 保存 x 的符号
    sign = 1 if x >= 0 else -1
    x = abs(x)
    # 定义常数
    a1 =  0.254829592
    a2 = -0.284496736
    a3 =  1.421413741
    a4 = -1.453152027
    a5 =  1.061405429
    p  =  0.3275911
    # 使用 A&S 公式 7.1.26 计算 erf(x) 的值
    t = 1.0 / (1.0 + p * x)
    y = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * npExp(-x * x)
    # 返回 erf(x) 的值,若 x 为负数,则取相反数
    return sign * y # erf(-x) = -erf(x)
# 定义斐波那契数列函数 fibonacci,接收一个整型参数 n 和任意关键字参数,返回一个 numpy 数组
def fibonacci(n: int = 2, **kwargs: dict) -> npNdArray:
    """Fibonacci Sequence as a numpy array"""
    # 将 n 转换为非负整数
    n = int(npFabs(n)) if n >= 0 else 2
    # 从 kwargs 中取出键为 "zero" 的值,若存在且为 True,则斐波那契数列以 0 开头
    zero = kwargs.pop("zero", False)
    if zero:
        a, b = 0, 1
    else:
        # 若不以 0 开头,则斐波那契数列从 1 开始,n 减 1
        n -= 1
        a, b = 1, 1
    # 初始化结果数组,包含斐波那契数列的第一个元素
    result = npArray([a])
    # 循环生成斐波那契数列
    for _ in range(0, n):
        a, b = b, a + b
        result = npAppend(result, a)
    # 从 kwargs 中取出键为 "weighted" 的值,若存在且为 True,则返回加权后的斐波那契数列
    weighted = kwargs.pop("weighted", False)
    if weighted:
        # 计算斐波那契数列的总和
        fib_sum = npSum(result)
        # 若总和大于 0,则返回斐波那契数列的每个元素除以总和的结果
        if fib_sum > 0:
            return result / fib_sum
        else:
            # 若总和小于等于 0,则直接返回斐波那契数列
            return result
    else:
        # 若不加权,则直接返回斐波那契数
    """Classic Linear Regression in Numpy or Scikit-Learn"""
    # 确保 x 和 y 是 Series 类型的数据
    x, y = verify_series(x), verify_series(y)
    # 获取 x 和 y 的大小
    m, n = x.size, y.size
    # 如果 x 和 y 的大小不相等,则打印错误信息并返回空字典
    if m != n:
        print(f"[X] Linear Regression X and y have unequal total observations: {m} != {n}")
        return {}
    # 如果导入了 sklearn 模块,则使用 sklearn 进行线性回归
    if Imports["sklearn"]:
        return _linear_regression_sklearn(x, y)
    # 否则使用 numpy 进行线性回归
    else:
        return _linear_regression_np(x, y)
# 返回给定序列的对数几何平均值
def log_geometric_mean(series: Series) -> float:
    n = series.size  # 获取序列的大小
    if n < 2: return 0  # 如果序列大小小于2,则返回0
    else:
        series = series.fillna(0) + 1  # 将序列中的空值填充为0,并加1
        if npAll(series > 0):  # 检查序列中的所有值是否大于0
            return npExp(npLog(series).sum() / n) - 1  # 计算序列的对数和的均值的指数,然后减去1
        return 0  # 如果序列中存在小于等于0的值,则返回0
# 返回帕斯卡三角形的第n行
def pascals_triangle(n: int = None, **kwargs: dict) -> npNdArray:
    n = int(npFabs(n)) if n is not None else 0  # 将n转换为整数并取绝对值,如果n为None则设为0
    # 计算
    triangle = npArray([combination(n=n, r=i) for i in range(0, n + 1)])  # 创建帕斯卡三角形的第n行
    triangle_sum = npSum(triangle)  # 计算三角形的总和
    triangle_weights = triangle / triangle_sum  # 计算每个元素的权重
    inverse_weights = 1 - triangle_weights  # 计算逆权重
    weighted = kwargs.pop("weighted", False)  # 获取weighted参数,默认为False
    inverse = kwargs.pop("inverse", False)  # 获取inverse参数,默认为False
    if weighted and inverse:  # 如果weighted和inverse都为True
        return inverse_weights  # 返回逆权重
    if weighted:  # 如果weighted为True
        return triangle_weights  # 返回权重
    if inverse:  # 如果inverse为True
        return None  # 返回None
    return triangle  # 返回帕斯卡三角形的第n行
# 返回对称三角形的第n行
def symmetric_triangle(n: int = None, **kwargs: dict) -> Optional[List[int]]:
    n = int(npFabs(n)) if n is not None else 2  # 将n转换为整数并取绝对值,如果n为None则设为2
    triangle = None
    if n == 2:  # 如果n为2
        triangle = [1, 1]  # 返回固定的列表
    if n > 2:  # 如果n大于2
        if n % 2 == 0:  # 如果n为偶数
            front = [i + 1 for i in range(0, mfloor(n / 2))]  # 创建前半部分列表
            triangle = front + front[::-1]  # 创建对称三角形
        else:
            front = [i + 1 for i in range(0, mfloor(0.5 * (n + 1)))]  # 创建前半部分列表
            triangle = front.copy()  # 复制前半部分列表
            front.pop()  # 移除最后一个元素
            triangle += front[::-1]  # 创建对称三角形
    if kwargs.pop("weighted", False) and isinstance(triangle, list):  # 如果weighted为True且triangle是列表类型
        triangle_sum = npSum(triangle)  # 计算三角形的总和
        triangle_weights = triangle / triangle_sum  # 计算每个元素的权重
        return triangle_weights  # 返回权重
    return triangle  # 返回对称三角形的第n行
# 返回权重与值x的点积
def weights(w: npNdArray):
    def _dot(x):
        return npDot(w, x)
    return _dot
# 如果值接近于零,则返回零,否则返回自身
def zero(x: Tuple[int, float]) -> Tuple[int, float]:
    return 0 if abs(x) < sflt.epsilon else x
# DataFrame相关性分析辅助函数
def df_error_analysis(dfA: DataFrame, dfB: DataFrame, **kwargs: dict) -> DataFrame:
    corr_method = kwargs.pop("corr_method", "pearson")  # 获取相关性计算方法,默认为pearson
    # 计算它们的差异和相关性
    diff = dfA - dfB  # 计算DataFrame的差异
    corr = dfA.corr(dfB, method=corr_method)  # 计算DataFrame的相关性
    # 用于绘图
    if kwargs.pop("plot", False):  # 如果plot为True
        diff.hist()  # 绘制差异的直方图
        if diff[diff > 0].any():  # 如果差异中存在大于0的值
            diff.plot(kind="kde")  # 绘制密度曲线图
    if kwargs.pop("triangular", False):  # 如果triangular为True
        return corr.where(triu(ones(corr.shape)).astype(bool))  # 返回上三角部分的相关性矩阵
    return corr  # 返回相关性矩阵
# 私有函数
# 使用 Numpy 实现简单的线性回归,适用于没有安装 sklearn 包的环境,接受两个一维数组作为输入
def _linear_regression_np(x: Series, y: Series) -> dict:
    # 初始化结果字典,所有值设为 NaN
    result = {"a": npNaN, "b": npNaN, "r": npNaN, "t": npNaN, "line": npNaN}
    # 计算 x 和 y 的总和
    x_sum = x.sum()
    y_sum = y.sum()
    # 如果 x 的总和不为 0
    if int(x_sum) != 0:
        # 计算 x 和 y 之间的相关系数
        r = npCorrcoef(x, y)[0, 1]
        m = x.size
        # 计算回归系数 b
        r_mix = m * (x * y).sum() - x_sum * y_sum
        b = r_mix // (m * (x * x).sum() - x_sum * x_sum)
        # 计算截距 a 和回归线
        a = y.mean() - b * x.mean()
        line = a + b * x
        # 临时保存 Numpy 的错误设置
        _np_err = seterr()
        # 忽略除零和无效值的错误
        seterr(divide="ignore", invalid="ignore")
        # 更新结果字典
        result = {
            "a": a, "b": b, "r": r,
            "t": r / npSqrt((1 - r * r) / (m - 2)),
            "line": line,
        }
        # 恢复 Numpy 的错误设置
        seterr(divide=_np_err["divide"], invalid=_np_err["invalid"])
    return result
# 使用 Scikit Learn 实现简单的线性回归,适用于安装了 sklearn 包的环境,接受两个一维数组作为输入
def _linear_regression_sklearn(x: Series, y: Series) -> dict:
    # 导入 LinearRegression 类
    from sklearn.linear_model import LinearRegression
    # 将 x 转换为 DataFrame,创建 LinearRegression 模型并拟合数据
    X = DataFrame(x)
    lr = LinearRegression().fit(X, y=y)
    # 计算决定系数
    r = lr.score(X, y=y)
    # 获取截距和斜率
    a, b = lr.intercept_, lr.coef_[0]
    # 更新结果字典
    result = {
        "a": a, "b": b, "r": r,
        "t": r / npSqrt((1 - r * r) / (x.size - 2)),
        "line": a + b * x
    }
    return result

PandasTA 源码解析(十四)(3)https://developer.aliyun.com/article/1506235

相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
88 2
|
13天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
13天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
13天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
57 12
|
1月前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
14天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
2月前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
2月前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
63 3
|
3月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
68 5

热门文章

最新文章