一文囊括序列预测方法(源码)

简介: 时间序列是指将某种现象某一个统计指标在不同时间上的各个数值,按时间先后顺序排列而形成的序列。典型的时间序列问题,例如股价预测、制造业中的电力预测、传统消费品行业的销售预测、客户日活跃量预测等等,本文以客户日活跃量预测为例。

1 时间序列


时间序列是指将某种现象某一个统计指标在不同时间上的各个数值,按时间先后顺序排列而形成的序列。典型的时间序列问题,例如股价预测、制造业中的电力预测、传统消费品行业的销售预测、客户日活跃量预测等等,本文以客户日活跃量预测为例。



2 预测方法


时间序列的预测方法可以归纳为三类: 1、时间序列基本规则法-周期因子法; 2、传统序列预测方法,如均值回归、ARIMA等线性模型; 3、机器学习方法,将序列预测转为有监督建模预测,如XGBOOST集成学习方法,LSTM长短期记忆神经网络模型。


2.1 周期因子法


当序列存在周期性时,通过加工出数据的周期性特征预测。这种比较麻烦,简述下流程不做展开。 1、计算周期的因子 。 2、计算base 3、预测结果=周期因子*base


2.2 Ariama


ARIMA模型,差分整合移动平均自回归模型,又称整合移动平均自回归模型 (移动也可称作滑动),时间序列预测分析方法之一。ARIMA(p,d,q)中, AR是"自回归",p为自回归项数;MA为"滑动平均",q为滑动平均项数, d为使之成为平稳序列所做的差分次数(阶数)。 建模的主要步骤是: 1、数据需要先做平稳法处理:采用(对数变换或差分)平稳化后,并检验符合平稳非白噪声序列; 2、观察PACF和ACF截尾/信息准则定阶确定(p, q); 3、 建立ARIMA(p,d,q)模型做预测;


"""
ARIMA( 差分自回归移动平均模型)预测DAU指标
"""
import numpy as np
import pandas as pd
import seaborn as sns
import scipy
import matplotlib.pyplot as plt
import statsmodels.api as sm
import warnings
from math import sqrt
from pandas import Series
from sklearn.metrics import mean_squared_error
from statsmodels.graphics.tsaplots import acf, pacf,plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.arima_model import ARIMA, ARMA, ARIMAResults
from statsmodels.tsa.stattools import adfuller as ADF
from keras.models import load_model
warnings.filterwarnings("ignore")
df = pd.read_csv('DAU.csv')
dau = df["dau"]
# 折线图
df.plot()
plt.show()
# 箱线图
ax = sns.boxplot(y=dau)
plt.show()
# pearsonr时间相关性
# a = df['dau']
# b = df.index
# print(scipy.stats.pearsonr(a,b))
# 自相关性
plot_acf(dau)
plot_pacf(dau)
plt.show()
print('raw序列的ADF')
# p值大于0.05为非平衡时间序列
print(ADF(dau))
#对数变换平稳处理 
# dau_log = np.log(dau)
# dau_log = dau_log.ewm(com=0.5, span=12).mean()
# plot_acf(dau_log)
# plot_pacf(dau_log)
# plt.show()
# print('log序列的ADF')
# print(ADF(dau_log))
# print('log序列的白噪声检验结果')
# # 大于0.05为白噪声序列
# print(acorr_ljungbox(dau_log, lags=1))
#差分平稳处理
diff_1_df = dau.diff(1).dropna(how=any)
diff_1_df = diff_1_df
diff_1_df.plot()
plot_acf(diff_1_df)
plot_pacf(diff_1_df)
plt.show()
print('差分序列的ADF')
print(ADF(diff_1_df))
print('差分序列的白噪声检验结果')
# 大于0.05为白噪声序列
print(acorr_ljungbox(diff_1_df, lags=1))
# # 在满足检验条件后,给出最优p q值 ()
r, rac, Q = sm.tsa.acf(diff_1_df, qstat=True)
prac = pacf(diff_1_df, method='ywmle')
table_data = np.c_[range(1,len(r)), r[1:], rac, prac[1:len(rac)+1], Q]
table = pd.DataFrame(table_data, columns=['lag', "AC","Q", "PAC", "Prob(>Q)"])
order = sm.tsa.arma_order_select_ic(diff_1_df, max_ar=7, max_ma=7, ic=['aic', 'bic', 'hqic'])
p, q =order.bic_min_order
print("p,q")
print(p, q)
# 建立ARIMA(p, d, q)模型  d=1
order = (p, 1, q)
train_X = diff_1_df[:]
arima_model = ARIMA(train_X, order).fit()
# 模型报告
# print(arima_model.summary2())
# 保存模型
arima_model.save('./data/arima_model.h5')
# # load model
arima_model = ARIMAResults.load('./data/arima_model.h5')
# 预测未来两天数据
predict_data_02 = arima_model.predict(start=len(train_X), end=len(train_X) + 1, dynamic = False)
# 预测历史数据
predict_data = arima_model.predict(dynamic = False)
# 逆log化
# original_series = np.exp(train_X.values[1:] + np.log(dau.values[1:-1]))
# predict_series = np.exp(predict_data.values + np.log(dau.values[1:-1]))
# 逆差分
original_series = train_X.values[1:] + dau.values[1:-1]
predict_series = predict_data.values + dau.values[1:-1]
# comp = pd.DataFrame()
# comp['original'] = original_series
# comp['predict'] = predict_series
split_num = int(len(dau.values)/3) or 1
rmse = sqrt(mean_squared_error(original_series[-split_num:], predict_series[-split_num:]))
print('Test RMSE: %.3f' % rmse)
# (0,1,0)Test RMSE
plt.title('ARIMA RMSE: %.3f' % rmse)
plt.plot(original_series[-split_num:], label="original_series")
plt.plot(predict_series[-split_num:], label="predict_series")
plt.legend()
plt.show()



2.3 LSTM


Long Short Term 网络是一种 RNN 特殊的类型,可以学习长期依赖序列信息。 LSTM区别于RNN的地方,主要就在于它在算法中加入了一个判断信息有用与否的“处理器”,这个处理器作用的结构被称为cell。一个cell当中被放置了三扇门,分别叫做输入门、遗忘门和输出门。一个信息进入LSTM的网络当中,只有符合信息才会留下,不符的信息则通过遗忘门被遗忘。通过这机制减少梯度爆炸/消失的风险。



建模主要的步骤: 1、数据处理:差分法数据平稳化;MAX-MIN法数据标准化;构建监督学习训练集;(对于LSTM,差分及标准化不是必要的) 2、模型训练并预测;


"""
LSTM预测DAU指标
"""
import numpy as np
from pandas import DataFrame, datetime, concat,read_csv, Series
from matplotlib import pyplot as plt
from sklearn.metrics import mean_squared_error
from math import sqrt
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Dense
from keras.layers import LSTM
from keras.callbacks import EarlyStopping
from keras import regularizers
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.stattools import adfuller as ADF
# convert date
def parser(x):
    return datetime.strptime(x,"%Y-%m-%d")
#supervised
def timeseries_to_supervised(data, lag=1):
  df = DataFrame(data)
  columns = [df.shift(1) for i in range(1, lag+1)]
  columns.append(df)
  df = concat(columns, axis=1)
  df.fillna(0, inplace=True)
  return df
# diff series
def difference(dataset, interval=1):
    diff = list()
    for i in range(interval, len(dataset)):
        value = dataset[i] - dataset[i - interval]
        diff.append(value)
    return Series(diff)
# invert diff value
def inverse_difference(history, yhat, interval=1):
    return yhat + history[-interval]
# max_min标准化  [-1, 1]
def scale(train, test):
  # fit scaler
  scaler = MinMaxScaler(feature_range=(-1, 1))
  scaler = scaler.fit(train)
  # transform train
  train = train.reshape(train.shape[0], train.shape[1])
  train_scaled = scaler.transform(train)
  # transform test
  test = test.reshape(test.shape[0], test.shape[1])
  test_scaled = scaler.transform(test)
  return scaler, train_scaled, test_scaled
# invert scale transform
def invert_scale(scaler, X, value):
  new_row = [x for x in X] + [value]
  array = np.array(new_row)
  array = array.reshape(1, len(array))
  inverted = scaler.inverse_transform(array)
  return inverted[0, -1]
# model train
def fit_lstm(train, batch_size, nb_epoch, neurons):
  X, y = train[:,0:-1], train[:,-1]
  # reshp
  X = X.reshape(X.shape[0], 1, X.shape[1])
  model = Sequential()
  # stateful
  # input(samples:batch row, time steps:1, features:one time observed)
  model.add(LSTM(neurons,
                 batch_input_shape=(batch_size, X.shape[1], X.shape[2]),
                 stateful=True, return_sequences=True, dropout=0.2))
  model.add(Dense(1))
  model.compile(loss="mean_squared_error", optimizer="adam")
# train
  train_loss = []
  val_loss = []
  for i in range(nb_epoch):
    # shuffle=false
    history = model.fit(X, y, batch_size=batch_size, epochs=1,verbose=0,shuffle=False,validation_split=0.3)
    train_loss.append(history.history['loss'][0])
    val_loss.append(history.history['val_loss'][0])
    # clear state
    model.reset_states()
    # 提前停止训练
    if i > 50 and sum(val_loss[-10:]) < 0.3:
      print(sum(val_loss[-5:]))
      print("better epoch", i)
      break
    # print(history.history['loss'])
  plt.plot(train_loss)
  plt.plot(val_loss)
  plt.title('model train vs validation loss')
  plt.ylabel('loss')
  plt.xlabel('epoch')
  plt.legend(['train', 'validation'], loc='upper right')
  plt.show()
  return model
# model predict
def forecast_lstm(model, batch_size, X):
  X = X.reshape(1, 1, len(X))
  yhat = model.predict(X, batch_size=batch_size)
  return yhat[0,0]
# 开始加载数据
series = read_csv('DAU.csv')["dau"]
print(series.head())
series.plot()
plt.show()
# 数据平稳化
raw_values = series.values
diff_values = difference(raw_values, 1)
print(diff_values.head())
plt.plot(raw_values, label="raw")
plt.plot(diff_values, label="diff")
plt.legend()
plt.show()
print('差分序列的ADF')
print(ADF(diff_values)[1])
print('差分序列的白噪声检验结果')
# (array([13.95689179]), array([0.00018705]))
print(acorr_ljungbox(diff_values, lags=1)[1][0])
# 序列转监督数据
supervised = timeseries_to_supervised(diff_values, 1)
print(supervised.head())
supervised_values = supervised.values
# split data
split_num = int(len(supervised_values)/3) or 1
train, test = supervised_values[0:-split_num], supervised_values[-split_num:]
# 标准化
scaler, train_scaled, test_scaled = scale(train, test)
#fit model
lstm_model = fit_lstm(train_scaled, 1, 200, 5)
train_reshaped = train_scaled[:, 0].reshape(len(train_scaled), 1, 1)
train_predict = lstm_model.predict(train_reshaped, batch_size=1)
train_raw = train_scaled[:, 0]
# # train RMSE plot
# train_raw = raw_values[0:-split_num]
# predictions = list()
# for i in range(len(train_scaled)):
#   # make one-step forecast
#   X, y = train_scaled[i, 0:-1], train_scaled[i, -1]
#   yhat = forecast_lstm(lstm_model, 1, X)
#   # invert scaling
#   yhat = invert_scale(scaler, X, yhat)
#   # invert differencing
#   yhat = inverse_difference(raw_values, yhat, len(train_scaled)+1-i)
#   # store forecast
#   predictions.append(yhat)
#   expected = train_raw[i]
#   mae = abs(yhat-expected)
#   print('data=%d, Predicted=%f, Expected=%f, mae=%.3f' % (i+1, yhat, expected,mae))
#   print(mae)
# plt.plot(train_raw, label="train_raw")
# plt.plot(predictions, label="predict")
# plt.legend()
# plt.show()
# 保存模型
lstm_model.save('./data/lstm_model_epoch50.h5')
# # load model
lstm_model = load_model('./data/lstm_model_epoch50.h5')
# validation
predictions = list()
for i in range(len(test_scaled)):
  # make one-step forecast
  X, y = test_scaled[i, 0:-1], test_scaled[i, -1]
  yhat = forecast_lstm(lstm_model, 1, X)
  # invert scaling
  yhat = invert_scale(scaler, X, yhat)
  # invert differencing
  yhat = inverse_difference(raw_values, yhat, len(test_scaled)+1-i)
  # store forecast
  predictions.append(yhat)
  expected = raw_values[len(train) + i + 1]
  mae = abs(yhat-expected)
  print('data=%d, Predicted=%f, Expected=%f, mae=%.3f' % (i+1, yhat, expected, mae))
mae = np.average(abs(predictions - raw_values[-split_num:]))
print("Test MAE: %.3f",mae)
#report performance
rmse = sqrt(mean_squared_error(raw_values[-split_num:], predictions))
print('Test RMSE: %.3f' % rmse)
# line plot of observed vs predicted
plt.plot(raw_values[-split_num:], label="raw")
plt.plot(predictions, label="predict")
plt.title('LSTM Test RMSE: %.3f' % rmse)
plt.legend()
plt.show()




# 附 数据集dau.csv
log_date,dau
2018-06-01,257488
2018-06-02,286612
2018-06-03,287405
2018-06-04,246955
2018-06-05,249926
2018-06-06,252951
2018-06-07,255467
2018-06-08,262498
2018-06-09,288368
2018-06-10,288440
2018-06-11,255447
2018-06-12,251316
2018-06-13,251654
2018-06-14,250515
2018-06-15,262155
2018-06-16,288844
2018-06-17,296143
2018-06-18,298142
2018-06-19,264124
2018-06-20,262992
2018-06-21,263549
2018-06-22,271631
2018-06-23,296452
2018-06-24,296986
2018-06-25,271197
2018-06-26,270546
2018-06-27,271208
2018-06-28,275496
2018-06-29,284218
2018-06-30,307498
2018-07-01,316097
2018-07-02,295106
2018-07-03,290675
2018-07-04,292231
2018-07-05,297510
2018-07-06,298839
2018-07-07,302083
2018-07-08,301238
2018-07-09,296398
2018-07-10,300986
2018-07-11,301459
2018-07-12,299865
2018-07-13,289830
2018-07-14,297501
2018-07-15,297443
2018-07-16,293097
2018-07-17,293866
2018-07-18,292902
2018-07-19,292368
2018-07-20,290766
2018-07-21,294669
2018-07-22,295811
2018-07-23,297514
2018-07-24,297392
2018-07-25,298957
2018-07-26,298101
2018-07-27,298740
2018-07-28,304086
2018-07-29,305269
2018-07-30,304827
2018-07-31,299689
2018-08-01,300526
2018-08-02,59321
2018-08-03,31731
2018-08-04,36838
2018-08-05,42043
2018-08-06,42366
2018-08-07,37209
相关文章
|
SQL 存储 BI
数仓学习---数仓开发之ADS层
数仓学习---数仓开发之ADS层
1083 1
|
缓存 监控 网络安全
因服务器时间不同步引起的异常
因服务器时间不同步引起的异常
1078 1
|
索引 Python
如何在Python中使用Pandas库进行季节性调整?
Pandas库在Python中支持季节性调整,通过`seasonal_decompose`函数实现。步骤包括:导入Pandas和statsmodels模块,准备时间序列DataFrame,调用函数分解数据为趋势、季节性和残差,可选地分析或绘制这些部分,以及根据需求去除季节性影响(原始数据减去季节性成分)。这是基础的季节性调整流程,可按实际需求调整。
595 0
|
Prometheus 监控 异构计算
阿里云容器服务GPU监控2.0基础篇3:监控NVIDIA XID错误
本系列相关文章:阿里云容器服务GPU监控2.0基础篇1:基本功能使用阿里云容器服务GPU监控2.0基础篇2:监控NVLINK带宽阿里云容器服务GPU监控2.0基础篇3:监控NVIDIA XID错误阿里云容器服务GPU监控2.0进阶篇1:剖析(Profiling)GPU使用情况必备知识阿里云容器服务GPU监控2.0进阶篇2:学会剖析(Profiling)GPU使用情况本篇文章将向您介绍如何使用GPU
8345 0
阿里云容器服务GPU监控2.0基础篇3:监控NVIDIA XID错误
|
6月前
|
机器学习/深度学习 存储 运维
机器学习异常检测实战:用Isolation Forest快速构建无标签异常检测系统
本研究通过实验演示了异常标记如何逐步完善异常检测方案和主要分类模型在欺诈检测中的应用。实验结果表明,Isolation Forest作为一个强大的异常检测模型,无需显式建模正常模式即可有效工作,在处理未见风险事件方面具有显著优势。
431 46
|
9月前
|
存储 运维 监控
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
981 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
|
索引 Python
python pandas 把数据保存成csv文件,以及读取csv文件获取指定行、指定列数据
该文档详细介绍了如何使用Python的Pandas库处理图像数据集,并将其保存为CSV文件。示例数据集位于`test_data`目录中,包含5张PNG图片,每张图片名中的数字代表其标签。文档提供了将这些数据转换为CSV格式的具体步骤,包括不同格式的数据输入方法(如NumPy数组、嵌套列表、嵌套元组和字典),以及如何使用`pd.DataFrame`和`to_csv`方法保存数据。此外,还展示了如何读取CSV文件并访问其中的每一行和每一列数据,包括获取列名、指定列数据及行数据的操作方法。
777 1
|
数据库 数据安全/隐私保护
Shiro【自定义Realm 、多Realm认证 、多Realm认证策略、异常处理】(四)-全面详解(学习总结---从入门到深化)
Shiro【自定义Realm 、多Realm认证 、多Realm认证策略、异常处理】(四)-全面详解(学习总结---从入门到深化)
363 1
|
SQL 缓存 数据库
Django ORM的性能优化:高效处理大量数据
【4月更文挑战第15天】本文介绍了优化Django ORM性能的六大技巧:1) 使用批量操作如bulk_create和bulk_update;2) 利用prefetch_related和select_related减少查询次数;3) 为常用字段添加索引;4) 优化数据库查询,避免循环查询;5) 使用缓存提升频繁查询性能;6) 在必要时使用原生SQL。这些策略能帮助处理大量数据时提升Django ORM的效率。