Preface
Lately I've been working on multivariate time series anomaly detection, and along the way I organized the five most commonly used multivariate time series anomaly detection datasets. Each one ships with a labeled test set, and judging from the comparison experiments in many top-conference papers, these five are probably the most common benchmarks in this field.
This post walks through the details of the five datasets and the corresponding standardization, and gives the preprocessing code and the final standardized format.
Tips: as a workplace newbie who has nonetheless been blogging for four or five years, I plan to keep writing on ATA seriously from today on. Keep accumulating. Let's go!
Dataset downloads
- SMD dataset: https://github.com/NetManAIOps/OmniAnomaly/tree/master/ServerMachineDataset
- MSL/SMAP datasets:
wget https://s3-us-west-2.amazonaws.com/telemanom/data.zip && unzip data.zip && rm data.zip
cd data && wget https://raw.githubusercontent.com/khundman/telemanom/master/labeled_anomalies.csv
- SWaT and WADI datasets: fill in the request form at https://itrust.sutd.edu.sg/itrust-labs_datasets/
Dataset details
SMD (Server Machine Dataset)
- Project link: https://github.com/NetManAIOps/OmniAnomaly
- From the paper: Robust Anomaly Detection for Multivariate Time Series through Stochastic Recurrent Neural Network
- Collected by NetMan
Information
- 5 weeks of data (1-minute granularity; the time index is omitted in the files)
- 28 machines, i.e., 28 entities; each entity has 38 dimensions (each dimension is one machine metric)
- Total data points = 5 × 7 × 24 × 60 × 28 = 1,411,200
- Train:test = 1:1; the training set has no labels, the test set is labeled
Original files
- train: the first half of each series.
  - machine-x-y.txt, where x is the group and y is the index within the group; each machine-x-y file is one concrete machine, i.e., one entity.
- test: the second half of each series.
- test_label: labels for the test set, marking whether each point is an anomaly.
- interpretation_label: for each anomaly, the list of dimensions that contribute to it (i.e., this dataset also tells you which dimensions caused each anomaly; a parsing sketch follows this list).
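Each line of an interpretation_label file pairs an anomalous index range with its contributing dimensions. A minimal parsing sketch, assuming the start-end:dim1,dim2,... line format used in the OmniAnomaly repo (the machine-1-1 path is only an example):

import os

def parse_interpretation_label(path):
    # returns {(start, end): [contributing dimensions]} per anomaly segment
    segments = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            span, dims = line.split(':')
            start, end = (int(v) for v in span.split('-'))
            segments[(start, end)] = [int(d) for d in dims.split(',')]  # dims are 1-based
    return segments

# usage (example path):
# parse_interpretation_label(os.path.join('ServerMachineDataset', 'interpretation_label', 'machine-1-1.txt'))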
SMAP (Soil Moisture Active Passive satellite) / MSL (Mars Science Laboratory rover)
- Project link: https://github.com/khundman/telemanom
- From the paper: Detecting Spacecraft Anomalies Using LSTMs and Nonparametric Dynamic Thresholding (the repo describes itself as "a framework for using LSTMs to detect anomalies in multivariate time series data")
- Collected by NASA
Information
- Time information is anonymized (the granularity is also 1 minute), and all values are already scaled to [0, 1]
- SMAP and MSL are two different spacecraft producing telemetry streams; the two datasets are kept separate
- SMAP has 55 channels (entities), each with 25 dimensions
- MSL has 27 channels (entities), each with 55 dimensions
- Only the telemetry value is a continuous variable; all other dimensions indicate whether a command was sent, so they are 0 or 1 (0 = not sent, 1 = sent)
- The training set has no labels; the test set is labeled
Original files
- labeled_anomalies: preprocessing and the separation of the two spacecraft both rely on this file (a loading sketch follows this list)
  - chan_id: the name of the corresponding numpy file in train and test (i.e., the entity)
  - spacecraft: whether the chan_id belongs to SMAP or MSL
  - anomaly_sequences: the index ranges of the chan_id test file that are anomalous
  - class: two anomaly types, point and contextual; the former are point anomalies, the latter are anomalies in the overall trend
  - num_values: the number of timestamps in the chan_id test file
- train
- test
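Before running the full script below, it can help to sanity-check one channel. A minimal sketch, assuming data.zip was extracted into data/ as in the download step (A-1 is one SMAP channel id, used purely as an example):

import numpy as np

# load one channel's train/test arrays; channels are stored as <chan_id>.npy
x_train = np.load('data/train/A-1.npy')
x_test = np.load('data/test/A-1.npy')
print(x_train.shape, x_test.shape)  # expect 25 columns for a SMAP channel

# per the notes above, only the telemetry dimension is continuous;
# every other column should be a 0/1 command flag
binary_cols = [j for j in range(x_train.shape[1])
               if set(np.unique(x_train[:, j])) <= {0.0, 1.0}]
print(len(binary_cols), 'binary command columns')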
SWaT (Secure Water Treatment)
- https://itrust.sutd.edu.sg/itrust-labs_datasets/dataset_info
- Collected by the Singapore University of Technology and Design
Information
- 51 features, from 51 sensors and actuators
- Time information is given: train (2015.12.22 16:30:00 -> 2015.12.28 10:00:00), test (2015.12.28 10:00:00 -> 2016.1.2 15:00:00)
- 11 consecutive days of data in total: 7 days under normal operation and 4 days under staged attacks, 41 attacks in all
- The granularity is 1 second
- The normal data has two versions, v0 and v1; v1 drops the unusable first 30 minutes, so normal v1 is used together with attack v0
- The raw labels are the strings Normal and Attack; they are uniformly replaced with 0 and 1
WADI (Water Distribution)
- Project link: https://itrust.sutd.edu.sg/itrust-labs_datasets/dataset_info/
- Collected by the Singapore University of Technology and Design
Information
- 127 features, from 123 sensors and actuators
- Time information is given: train (2017.9.25 18:00:00 -> 2017.10.9 18:00:00), test (2017.10.9 18:00:00 -> 2017.10.11 18:00:00). (This describes the first version; the data used here is the second version, in which unstable stretches were removed from train, so the training time range is no longer fully continuous, while the test range is unchanged.)
- 16 consecutive days of data in total: 14 days under normal operation and 2 days under staged attacks, with 15 attacks within those two days
- The granularity is 1 second
- There are two versions. Because the plant was unstable during certain periods of operation, the affected readings were removed, so the second version's training set has several time segments cut out, while its test timeline is fully intact; only the second version carries labels, so the second version is used.
- The timestamps in the new version are malformed; since both versions keep the Row column, the times can be recovered by joining on Row and taking the timestamps from the first version
- In the raw data, 1 means no attack and -1 means attack; for consistency these are remapped so that attack = 1 and no attack = 0
Dataset standardization
The final layout is:
datasets
  processed_datasets (the processed data)
    dataset1 (one folder per dataset)
      entity1 (entity name; one folder per entity)
        train
          - timestamp/datetime
          - fea1 (multivariate features)
          - fea2
        test
          - timestamp/datetime
          - fea1
          - fea2
          - label (0 = normal, 1 = anomaly)
      entity2
        - train
        - test
    dataset2
Format details
Timestamps are unified into the datetime format; for the datasets whose time information is anonymized, a 0 to len(dataset) index is used instead. The label column is uniformly named label, with 0 for normal and 1 for anomalous. A minimal loader sketch for this layout follows.
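To illustrate consuming the standardized layout, here is a minimal loader sketch; the folder structure follows the tree above, and the *_train.csv / *_test.csv file names are whatever the preprocessing scripts below produce (e.g. machine-1-1_train.csv, SMAP_test.csv), so the sketch simply globs for them:

import glob
import os
import pandas as pd

def load_entity(entity_folder):
    # locate this entity's standardized train/test CSVs
    train_path = glob.glob(os.path.join(entity_folder, '*_train.csv'))[0]
    test_path = glob.glob(os.path.join(entity_folder, '*_test.csv'))[0]
    train = pd.read_csv(train_path)
    test = pd.read_csv(test_path)
    # everything except the time index and the label is a feature column
    feats = [c for c in train.columns if c not in ('timestamp', 'datetime', 'label')]
    return train[feats].values, test[feats].values, test['label'].values

# usage (hypothetical path):
# x_train, x_test, y_test = load_entity('processed_csv/machine-1-1')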
Preprocessing code
MSL / SMAP / SMD preprocessing
import ast
import csv
import os

import numpy as np
import pandas as pd

output_folder = 'processed_csv'
os.makedirs(output_folder, exist_ok=True)

def load_and_save(category, filename, dataset, dataset_folder):
    # one output folder per SMD machine (entity)
    os.makedirs(os.path.join(output_folder, filename.split('.')[0]), exist_ok=True)
    temp = np.genfromtxt(os.path.join(dataset_folder, category, filename),
                         dtype=np.float32, delimiter=',')
    # name the anonymous dimensions col_0 ... col_{d-1}
    fea_len = len(temp[0, :])
    header_list = ["col_%d" % i for i in range(fea_len)]
    data = pd.DataFrame(temp, columns=header_list).reset_index()
    data.rename(columns={'index': 'timestamp'}, inplace=True)
    if category == "test":
        # attach the per-point labels to the test set
        temp1 = np.genfromtxt(os.path.join(dataset_folder, "test_label", filename),
                              dtype=np.float32, delimiter=',')
        data1 = pd.DataFrame(temp1, columns=["label"]).reset_index()
        data1.rename(columns={'index': 'timestamp'}, inplace=True)
        data = pd.merge(data, data1, how="left", on='timestamp')
    print(dataset, category, filename, temp.shape)
    data.to_csv(os.path.join(output_folder, filename.split('.')[0],
                             dataset + "_" + category + ".csv"), index=False)

def load_data(dataset):
    if dataset == 'SMD':
        dataset_folder = 'ServerMachineDataset'
        file_list = os.listdir(os.path.join(dataset_folder, "train"))
        for filename in file_list:
            if filename.endswith('.txt'):
                # the file stem (e.g. machine-1-1) is the entity name;
                # str.strip('.txt') would be fragile here, so use the stem directly
                load_and_save('train', filename, filename.split('.')[0], dataset_folder)
                load_and_save('test', filename, filename.split('.')[0], dataset_folder)
    elif dataset == 'SMAP' or dataset == 'MSL':
        dataset_folder = 'data'
        with open(os.path.join(dataset_folder, 'labeled_anomalies.csv'), 'r') as file:
            csv_reader = csv.reader(file, delimiter=',')
            res = [row for row in csv_reader][1:]
        res = sorted(res, key=lambda k: k[0])
        label_folder = os.path.join(dataset_folder, 'test_label')
        os.makedirs(label_folder, exist_ok=True)
        # keep only this spacecraft's channels; P-2 is excluded, as in the original repo
        data_info = [row for row in res if row[1] == dataset and row[0] != 'P-2']
        labels = []
        for row in data_info:
            anomalies = ast.literal_eval(row[2])  # anomaly_sequences column
            length = int(row[-1])                 # num_values column
            label = np.zeros([length], dtype=int)  # np.int is deprecated; use int
            for anomaly in anomalies:
                label[anomaly[0]:anomaly[1] + 1] = 1
            labels.extend(label)
        labels = np.asarray(labels)
        print(dataset, 'test_label', labels.shape)
        labels = pd.DataFrame(labels, columns=["label"]).reset_index()
        labels.rename(columns={'index': 'timestamp'}, inplace=True)

        def concatenate_and_save(category):
            # concatenate all channels of this spacecraft into one long series
            data = []
            for row in data_info:
                filename = row[0]
                temp = np.load(os.path.join(dataset_folder, category, filename + '.npy'))
                data.extend(temp)
            data = np.asarray(data)
            print(dataset, category, data.shape)
            fea_len = len(data[0, :])
            header_list = ["col_%d" % i for i in range(fea_len)]
            data = pd.DataFrame(data, columns=header_list).reset_index()
            data.rename(columns={'index': 'timestamp'}, inplace=True)
            if category == "test":
                data = pd.merge(data, labels, how="left", on='timestamp')
            data.to_csv(os.path.join(output_folder, dataset + "_" + category + ".csv"),
                        index=False)

        for c in ['train', 'test']:
            concatenate_and_save(c)

if __name__ == '__main__':
    for ds in ['SMD', 'SMAP', 'MSL']:
        load_data(ds)
Adapted from: https://github.com/NetManAIOps/OmniAnomaly/blob/master/data_preprocess.py
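After the script runs, processed_csv/ contains one folder per SMD machine (e.g. machine-1-1/machine-1-1_train.csv and machine-1-1/machine-1-1_test.csv), plus the concatenated SMAP_train.csv / SMAP_test.csv and MSL_train.csv / MSL_test.csv at the top level, matching the standardized format above.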
WADI preprocessing
import os

import pandas as pd

os.makedirs('./processing', exist_ok=True)

# second-version files (cleaned train, labeled test) ...
train_new = pd.read_csv('./WADI.A2_19 Nov 2019/WADI_14days_new.csv')
test_new = pd.read_csv('./WADI.A2_19 Nov 2019/WADI_attackdataLABLE.csv', skiprows=1)
# ... and first-version files, used only to recover the timestamps
test = pd.read_csv('./WADI.A1_9 Oct 2017/WADI_attackdata.csv')
train = pd.read_csv('./WADI.A1_9 Oct 2017/WADI_14days.csv', skiprows=4)

def recover_date(date_str, time_str):
    return date_str + " " + time_str

# rebuild proper datetimes from the first version's Date/Time columns,
# then join them onto the second version via the shared Row column
train["datetime"] = train.apply(lambda x: recover_date(x['Date'], x['Time']), axis=1)
train["datetime"] = pd.to_datetime(train['datetime'])
train_time = train[['Row', 'datetime']]
train_new_time = pd.merge(train_new, train_time, how='left', on='Row')
del train_new_time['Row']
del train_new_time['Date']
del train_new_time['Time']
train_new_time.to_csv('./processing/WADI_train.csv', index=False)

test["datetime"] = test.apply(lambda x: recover_date(x['Date'], x['Time']), axis=1)
test["datetime"] = pd.to_datetime(test['datetime'])
# the labeled file ends with two malformed rows; drop them
# (the original `test.loc[-2:, :]` was a no-op, this is the likely intent)
test_new = test_new.iloc[:-2]
# the labeled file writes some headers with trailing spaces ('Row ', 'Date ')
test_new = test_new.rename(columns={'Row ': 'Row'})
test_time = test[['Row', 'datetime']]
test_new_time = pd.merge(test_new, test_time, how='left', on='Row')
del test_new_time['Row']
del test_new_time['Date ']
del test_new_time['Time']
# raw labels: 1 = no attack, -1 = attack; remap to 0/1 for consistency
test_new_time = test_new_time.rename(
    columns={'Attack LABLE (1:No Attack, -1:Attack)': 'label'})
test_new_time.loc[test_new_time['label'] == 1, 'label'] = 0
test_new_time.loc[test_new_time['label'] == -1, 'label'] = 1
test_new_time.to_csv('./processing/WADI_test.csv', index=False)
SWaT preprocessing
import pandas as pd

# normal v1 (first 30 minutes already removed) as train, attack v0 as test
normal = pd.read_csv("input/SWaT_Dataset_Normal_v1.csv")
attack = pd.read_csv("input/SWaT_Dataset_Attack_v0.csv", sep=";")

normal['Timestamp'] = pd.to_datetime(normal['Timestamp'])
del normal['Normal/Attack']  # the normal file is all-normal, no label needed
normal = normal.rename(columns={'Timestamp': 'datetime'})
# the exported CSVs use a comma as the decimal separator; normalize to a dot
datetime_col = normal['datetime']
del normal['datetime']
for col in list(normal):
    normal[col] = normal[col].apply(lambda x: str(x).replace(",", "."))
normal = normal.astype(float)
normal['datetime'] = datetime_col
normal.to_csv('SWaT_train.csv', index=False)

attack['Timestamp'] = pd.to_datetime(attack['Timestamp'])
attack = attack.rename(columns={'Timestamp': 'datetime'})
datetime_col = attack['datetime']
del attack['datetime']
# anything not labeled 'Normal' counts as an attack (this also catches
# misspelled label variants in the raw file)
labels = [float(label != 'Normal') for label in attack["Normal/Attack"].values]
del attack['Normal/Attack']
for col in list(attack):
    attack[col] = attack[col].apply(lambda x: str(x).replace(",", "."))
attack = attack.astype(float)
attack['datetime'] = datetime_col
attack['label'] = labels
attack.to_csv('SWaT_test.csv', index=False)
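As a final check on any of the processed test files, it is worth confirming that the label column came out as expected. A quick sanity-check sketch (SWaT_test.csv matches the script above; the printed ratio is only an eyeball check):

import pandas as pd

test = pd.read_csv('SWaT_test.csv')
assert set(test['label'].unique()) <= {0.0, 1.0}  # labels must be binary
print('anomaly ratio: %.4f' % test['label'].mean())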