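When it comes to time series processing, the tslearn library deserves a mention ("ts" is of course short for "time series"). It wraps common time series operations such as classification, clustering and centroid (barycenter) computation in an easy-to-use interface. This post takes one dataset and classifies it in two different ways.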
Data
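The dataset contains two kinds of time series, which are to be classified by the shape of their peaks. A line chart drawn with matplotlib is shown below.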
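The red and black waveforms are the two kinds of time series, that is, the two classes to be separated. With deep learning, a multilayer perceptron would do: the input layer size is set to the length of the series and the output layer size to the number of classes, which is 2 here. What about traditional machine learning? Two approaches are presented here. The first is a classifier, KNN + DTW. The second uses tslearn to compute a centroid for each class, measures the DTW distance between the series to be classified and each centroid, and assigns the series to the class whose centroid is closer.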
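Both approaches rely on DTW (dynamic time warping) as the distance between two series. As a rough illustration of what that distance computes, here is a minimal pure-Python sketch. `dtw_distance` and the two toy series are assumptions made for this example; in practice tslearn's own `tslearn.metrics.dtw` would be used instead.

```python
import math


def dtw_distance(a, b):
    """DTW distance between two 1-D sequences.

    The local cost is the squared difference; the square root is taken
    of the accumulated cost along the best warping path.
    """
    n, m = len(a), len(b)
    # cost[i][j] = minimal accumulated cost of aligning a[:i] with b[:j]
    cost = [[math.inf] * (m + 1) for _ in range(n + 1)]
    cost[0][0] = 0.0
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            d = (a[i - 1] - b[j - 1]) ** 2
            # extend the cheapest of the three allowed predecessor alignments
            cost[i][j] = d + min(cost[i - 1][j], cost[i][j - 1], cost[i - 1][j - 1])
    return math.sqrt(cost[n][m])


# Toy data: the same peak, shifted in time
peak_early = [0, 0, 1, 3, 1, 0, 0, 0]
peak_late = [0, 0, 0, 0, 1, 3, 1, 0]

euclid = math.sqrt(sum((x - y) ** 2 for x, y in zip(peak_early, peak_late)))
print("DTW:", dtw_distance(peak_early, peak_late), "Euclidean:", euclid)
```

The two toy series contain the same peak at different positions: the DTW distance is 0 because warping aligns the peaks, while the pointwise Euclidean distance is about 4.47. This tolerance to shifts is why DTW suits classification by peak shape.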
KNN + DTW
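The idea behind this approach is simple, so it is not described in detail. Tests on two classification tasks show that it involves almost no training; the computation time is concentrated in inference. See the metrics below, where Experiment 1 corresponds to the two kinds of time series shown in the figure above.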
Experiment | Classes | Total samples | Training samples | Test samples | Accuracy | Training time | Inference time | Model size |
Experiment 1 | 2 | 42 | 33 | 9 | 100% | <0.1 s | 0.667 s | 42 KB |
Experiment 2 | 3 | 64 | 51 | 13 | 84% | <0.1 s | 0.006 s | 72 KB |
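Because of the long computation time and the large model size, this approach cannot compete with approaches that actually extract features. The source code is as follows.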
import os
import time

import numpy as np
from tslearn.neighbors import KNeighborsTimeSeriesClassifier

import dataload as dl  # the author's own data-loading module


def filterTrainData1Div(trainData):
    """Keep the samples labelled 0 or 2 and remap the labels to 0 and 1."""
    train_x = trainData['data'][:, :-1]
    train_y = np.array(trainData['label'])
    # np.where returns a tuple; [0] extracts the array of row indices
    rows = np.where((train_y == 0) | (train_y == 2))[0]
    train_x = train_x.take(rows, 0)
    train_y = train_y.take(rows, 0)
    train_y = np.where(train_y == 2, 1, train_y)
    return train_x, train_y


def filterTrainData2Div(trainData):
    """Keep the samples labelled 1, 3 or 4 and remap the labels to 0, 1 and 2."""
    train_x = trainData['data'][:, :-1]
    train_y = np.array(trainData['label'])
    rows = np.where((train_y == 1) | (train_y == 3) | (train_y == 4))[0]
    train_x = train_x.take(rows, 0)
    train_y = train_y.take(rows, 0)
    train_y = np.where(train_y == 1, 0, train_y - 2)
    return train_x, train_y


def train(X, y):
    start = time.time()
    # the classifier's metric defaults to DTW
    knn = KNeighborsTimeSeriesClassifier(n_neighbors=2)
    knn.fit(X, y)
    train_time = time.time() - start
    print('train time cost : %.5f sec' % train_time)
    return knn


def trainAndEvaluate(filterFn, modelPath, name):
    """Shared train/save/time/score routine for both experiments."""
    trainData = dl.getDataFromDir('D:\\your\\dataset\\path')
    train_x, train_y = filterFn(trainData)
    train_ratio = 0.8
    split = int(train_x.shape[0] * train_ratio)
    model = train(train_x[:split, :], train_y[:split])
    os.makedirs(os.path.dirname(modelPath), exist_ok=True)
    model.to_json(modelPath)

    # time a single prediction; slicing with [:1] keeps the 2-D (1, length) shape
    start = time.time()
    model.predict(train_x[:1, :])
    infer_time = time.time() - start
    print('infer time cost : %.5f sec' % infer_time)

    result = model.score(train_x[split:, :], train_y[split:])
    print('train_x:', train_x.shape, 'split:', split)
    print("%s model's accuracy:" % name, result)


def train1DivModel():
    trainAndEvaluate(filterTrainData1Div, 'models/div1.json', '1Div')


def train2DivModel():
    trainAndEvaluate(filterTrainData2Div, 'models/div2.json', '2Div')


if __name__ == '__main__':
    train1DivModel()
    train2DivModel()
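To see why this approach has almost no training cost while the work lands in inference, here is a minimal pure-Python sketch of a brute-force KNN + DTW classifier. It is an illustration only, not tslearn's implementation; `BruteForceKnnDtw`, `dtw_distance` and the toy series are hypothetical names and data introduced for this example.

```python
import math
from collections import Counter


def dtw_distance(a, b):
    """DTW distance computed with rolling rows (memory proportional to len(b))."""
    prev = [0.0] + [math.inf] * len(b)
    for x in a:
        curr = [math.inf] * (len(b) + 1)
        for j, y in enumerate(b, start=1):
            curr[j] = (x - y) ** 2 + min(prev[j], curr[j - 1], prev[j - 1])
        prev = curr
    return math.sqrt(prev[-1])


class BruteForceKnnDtw:
    """Hypothetical lazy k-NN classifier: fit() only stores the data."""

    def __init__(self, n_neighbors=1):
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        # "Training" just memorises the samples, so it is nearly free
        self.X_, self.y_ = list(X), list(y)
        return self

    def predict_one(self, series):
        # Inference compares the query with every stored training series
        ranked = sorted(
            ((dtw_distance(series, x), label) for x, label in zip(self.X_, self.y_)),
            key=lambda pair: pair[0],
        )
        votes = Counter(label for _, label in ranked[: self.n_neighbors])
        return votes.most_common(1)[0][0]


# Toy data: class 0 has a peak, class 1 has a trough
train_X = [
    [0, 1, 3, 1, 0, 0],
    [0, 0, 1, 3, 1, 0],
    [0, -1, -3, -1, 0, 0],
    [0, 0, -1, -3, -1, 0],
]
train_y = [0, 0, 1, 1]

model = BruteForceKnnDtw(n_neighbors=1).fit(train_X, train_y)
print(model.predict_one([0, 0, 0, 1, 3, 1]))
```

Since `fit` keeps every training series, a model saved from such a classifier would be expected to grow with the training set, which is consistent with the model sizes in the table.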
Centroid computation
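During training, compute the centroid of each of the two kinds of time series and save the centroids. During inference, compute the DTW distance between the series to be classified and each class centroid; the series is assigned to the class with the smaller distance. The core code for computing the centroids is as follows.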
import numpy
import matplotlib.pyplot as plt
from tslearn.barycenters import (
    euclidean_barycenter,
    dtw_barycenter_averaging,
    dtw_barycenter_averaging_subgradient,
    softdtw_barycenter,
)
from tslearn.datasets import CachedDatasets


def getXseries():
    """Load class 2 of the "Trace" dataset cached in tslearn."""
    numpy.random.seed(0)
    X_train, y_train, _, _ = CachedDatasets().load_dataset("Trace")
    print("x_train:", X_train.shape)
    return X_train[y_train == 2]


def getMyXseries(label=0):
    """Load the series of one class from the author's own dataset."""
    import dataload as dl
    import classifyplan as cp
    trainData = dl.getDataFromDir('D:\\your\\dataset\\path')
    train_x, train_y = cp.filterTrainData1Div(trainData)
    X = train_x[train_y == label]
    print("X:", X.shape)
    return X


def plot_helper(X, barycenter):
    # draw every series faintly in black and the barycenter in bold red
    for series in X:
        plt.plot(series.ravel(), "k-", alpha=.2)
    plt.plot(barycenter.ravel(), "r-", linewidth=2)


def plotAll(X):
    ax1 = plt.subplot(4, 1, 1)
    plt.title("Euclidean barycenter")
    plot_helper(X, euclidean_barycenter(X))

    plt.subplot(4, 1, 2, sharex=ax1)
    plt.title("DBA (vectorized version of Petitjean's EM)")
    plot_helper(X, dtw_barycenter_averaging(X, max_iter=50, tol=1e-3))

    plt.subplot(4, 1, 3, sharex=ax1)
    plt.title("DBA (subgradient descent approach)")
    plot_helper(X, dtw_barycenter_averaging_subgradient(X, max_iter=50, tol=1e-3))

    plt.subplot(4, 1, 4, sharex=ax1)
    plt.title(r"Soft-DTW barycenter ($\gamma$=1.0)")
    plot_helper(X, softdtw_barycenter(X, gamma=1., max_iter=50, tol=1e-3))

    # clip the shared x-axis to the length of the series
    length_of_sequence = X.shape[1]
    ax1.set_xlim([0, length_of_sequence])

    plt.tight_layout()
    plt.show()


if __name__ == "__main__":
    X = getMyXseries()
    # X = getXseries()
    plotAll(X)
Of course, you can also find the original source of this code here.
That's all.