滑动均值和标准差
为了更好地利用向量化来加速,滑动窗口使用 `np.lib.stride_tricks.sliding_window_view(x, win)` 提取。它会返回一个二维数组,其中每一行都是以某个 `x[i]` 开头、长度为 `win` 的窗口;由于窗口不能越界,一共有 `len(x) - win + 1` 行。
def rolling(x, win):
    """Stack every length-``win`` window of ``x`` into a 2-D array.

    NaN rows are placed in front of the windows so that the result has
    ``len(x)`` rows.  For 1-D ``x`` this means row ``i`` holds the window
    that ends at ``x[i]``, and the first ``win - 1`` rows are all NaN.
    """
    windows = np.lib.stride_tricks.sliding_window_view(x, win)
    n_missing = len(x) - len(windows)
    nan_rows = np.nan * np.zeros([n_missing, win])
    return np.vstack([nan_rows, windows])


def rolling_mean(x, win):
    """Mean of each trailing window of ``x``; NaN until a window is full."""
    windows = rolling(x, win)
    return windows.mean(-1)


def rolling_std(x, win):
    """Population standard deviation of each trailing window of ``x``.

    Entries are NaN until a full window is available.
    """
    windows = rolling(x, win)
    return windows.std(-1)
布林带
def bollinger(close, win=10, nstd=2):
    """Compute Bollinger bands for a price series.

    Args:
        close: Price series.
        win: Window length passed to the rolling mean and rolling std.
        nstd: Band half-width, in multiples of the rolling std.

    Returns:
        Tuple ``(upper, middle, lower)`` where ``middle`` is the rolling mean
        and the outer bands lie ``nstd`` rolling stds above and below it.
    """
    middle = rolling_mean(close, win)
    half_width = nstd * rolling_std(close, win)
    return middle + half_width, middle, middle - half_width
指数滑动均值
这是原始实现:
def exp_smooth_naive(x, alpha):
    """Exponentially smooth ``x`` with a plain Python loop.

    Implements the recurrence
    ``y[0] = x[0]``, ``y[i] = alpha * x[i] + (1 - alpha) * y[i - 1]``.

    Args:
        x: 1-D sequence of real numbers; it is not modified.
        alpha: Weight given to the newest sample.

    Returns:
        A new float ndarray with the same length as ``x``.
    """
    # Work on a float copy: writing the recurrence into an integer array
    # would silently truncate every smoothed value.
    y = np.asarray(x, dtype=float).copy()
    for i in range(1, len(y)):
        # y[i] still holds x[i] here; y[i - 1] is already smoothed.
        y[i] = y[i] * alpha + y[i - 1] * (1 - alpha)
    return y
原始公式是递归的,需要改成通项才能向量化,这是推导过程:
y[0] = x[0] = init
y[t] = alpha * x[t] + (1-alpha) * y[t-1]
     = alpha * x[t] + (1-alpha) * alpha * x[t-1] + (1-alpha) ** 2 * y[t-2]
     = alpha * x[t] + (1-alpha) * alpha * x[t-1] + ... + (1-alpha) ** t * init
     = alpha * x[t] + (1-alpha) * alpha * x[t-1] + ... + alpha * (1-alpha) ** t * init + (1-alpha) ** (t+1) * init
     = Σ(alpha * (1-alpha) ** i * x[t-i]; i: 0 -> t) + (1-alpha) ** (t+1) * init

corr[i] = alpha * (1-alpha) ** i
supl[t] = (1-alpha) ** (t+1) * init
y[t] = Σ(corr[i] * x[t-i]; i: 0 -> t) + supl[t]
y = conv(corr, x) + supl
这就完成了向量化,因为 NumPy 或者 PyTorch 都针对卷积做了特殊优化。
def exp_smooth_vec(x, alpha):
    """Exponentially smooth ``x`` using a convolution instead of a loop.

    Evaluates the closed form of the recurrence
    ``y[t] = alpha * x[t] + (1 - alpha) * y[t - 1]`` with ``y[0] = x[0]``:
    a convolution of ``x`` with geometrically decaying weights plus a
    correction term for the initial value.

    Args:
        x: Non-empty 1-D sequence of numbers.
        alpha: Weight given to the newest sample.

    Returns:
        ndarray of smoothed values with the same length as ``x``.
    """
    init = x[0]
    n = len(x)
    steps = np.arange(0, n)
    # corr[i] is the weight of the sample that lies i steps in the past.
    corr = alpha * (1 - alpha) ** steps
    # supl[t] restores the share of the initial value that the truncated
    # weight sum leaves out.
    supl = (1 - alpha) ** (steps + 1) * init
    convolved = np.convolve(corr, x, 'full')
    return convolved[:n] + supl


# The vectorised implementation is the one exposed as ``exp_smooth``.
exp_smooth = exp_smooth_vec


def rolling_ema(x, win):
    """Exponential moving average of ``x`` with ``alpha = 2 / (win + 1)``."""
    return exp_smooth(np.asarray(x), 2 / (win + 1.0))
MACD
def macd(close, fast_win=12, slow_win=26, sig_win=9):
    """Compute the MACD indicator for a price series.

    Args:
        close: Price series.
        fast_win: Window of the fast EMA.
        slow_win: Window of the slow EMA.
        sig_win: Window of the EMA applied to DIF to obtain DEA.

    Returns:
        Tuple ``(macd, dif, dea)`` where ``dif`` is the fast EMA minus the
        slow EMA, ``dea`` is the EMA of ``dif``, and ``macd`` is the bar
        value ``2 * (dif - dea)``.
    """
    fast = rolling_ema(close, fast_win)
    slow = rolling_ema(close, slow_win)
    dif = fast - slow
    dea = rolling_ema(dif, sig_win)
    # The parentheses matter: the bar is twice the DIF-DEA gap,
    # not DIF minus twice DEA.
    macd_ = (dif - dea) * 2
    return macd_, dif, dea
RSI
def rsi(close, win=3):
    """Compute the Relative Strength Index from summed gains and losses.

    Args:
        close: Price series.
        win: Number of consecutive price changes in each window.

    Returns:
        ndarray with the same length as ``close``.  Entries are NaN until
        ``win`` price changes are available; afterwards values lie in
        ``[0, 100]``.  A window with gains and no losses gives a value
        approaching 100; a window with no gains at all (including a
        completely flat one) gives 0.
    """
    change = np.diff(close)
    up = np.where(change > 0, change, 0)
    # Losses enter the ratio as positive magnitudes; keeping them negative
    # would make RS negative and push the index outside [0, 100].
    down = np.where(change < 0, -change, 0)
    sum_up = rolling(up, win).sum(-1)
    sum_down = rolling(down, win).sum(-1)
    eps = 1e-12  # avoids division by zero when a window has no losses
    rs = sum_up / (sum_down + eps)
    rsi_ = 100 - 100 / (1 + rs)
    # np.diff dropped one element; the leading NaN restores alignment.
    return np.hstack([[np.nan], rsi_])
KDJ
def kdj(close, low, high, n=9):
    """Compute the KDJ indicator.

    RSV is the position of the close inside the high/low range of the last
    ``n`` bars, scaled to 0..100.  K and D are successive exponential
    smoothings seeded at 50, and ``J = 3 * K - 2 * D``:

        K[t] = 2/3 * K[t-1] + 1/3 * RSV[t]
        D[t] = 2/3 * D[t-1] + 1/3 * K[t]

    Args:
        close: Closing prices.
        low: Low prices, same length as ``close``.
        high: High prices, same length as ``close``.
        n: Look-back window for the highest high and lowest low.

    Returns:
        Tuple ``(k, d, j)`` of ndarrays with the same length as ``close``.
        Leading entries without enough history are NaN.  For ``n >= 2`` the
        seed value 50 sits on the bar just before the first full window.
    """
    close = np.asarray(close, dtype=float)
    size = len(close)
    hn = rolling(high, n).max(-1)
    ln = rolling(low, n).min(-1)

    # Drop the warm-up rows by position.  Filtering NaNs by value would also
    # remove interior samples and shift everything after them in time.
    first = n - 1
    span = (hn - ln)[first:]
    offset = (close - ln)[first:]

    # A window with high == low has no range; use the neutral value 50
    # (the same value used as the seed) instead of dividing by zero.
    flat = span == 0
    rsv = np.where(flat, 50.0, offset / np.where(flat, 1.0, span) * 100)
    rsv = np.hstack([[50], rsv])

    # exp_smooth(x, alpha) puts weight alpha on the newest sample, so the
    # 2/3-previous / 1/3-new rule above corresponds to alpha = 1/3.
    k = exp_smooth(rsv, 1 / 3)
    d = exp_smooth(k, 1 / 3)
    j = 3 * k - 2 * d

    def _align(series):
        # With n == 1 the seed has no bar of its own; keep the last `size`.
        series = series[-size:]
        pad = [np.nan] * (size - len(series))
        return np.hstack([pad, series])

    return _align(k), _align(d), _align(j)
OBV
def obv(close, vol):
    """Compute On-Balance Volume.

    Each bar's volume is added when the close rose from the previous bar,
    subtracted when it fell, and ignored when it was unchanged.  The first
    bar's volume is counted as positive.

    Args:
        close: Closing prices.
        vol: Volumes, same length as ``close``.

    Returns:
        ndarray holding the running total of signed volume.
    """
    direction = np.sign(np.diff(close))
    # The first bar has no predecessor; it is given sign +1.
    signed_vol = vol * np.hstack([[1], direction])
    return np.cumsum(signed_vol)