使用Transformer 模型进行时间序列预测的Pytorch代码示例

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 时间序列预测是一个经久不衰的主题,受自然语言处理领域的成功启发,transformer模型也在时间序列预测有了很大的发展。本文可以作为学习使用Transformer 模型的时间序列预测的一个起点。

数据集

这里我们直接使用kaggle中的 Store Sales — Time Series Forecasting作为数据。这个比赛需要预测54家商店中各种产品系列未来16天的销售情况,总共创建1782个时间序列。数据从2013年1月1日至2017年8月15日,目标是预测接下来16天的销售情况。虽然为了简洁起见,我们做了简化处理,作为模型的输入包含20列中的3,029,400条数据,。每行的关键列为' store_nbr '、' family '和' date '。数据分为三类变量:

1、截止到最后一次训练数据日期(2017年8月15日)之前已知的与时间相关的变量。这些变量包括数字变量,如“销售额”,表示某一产品系列在某家商店的销售额;“transactions”,一家商店的交易总数;' store_sales ',该商店的总销售额;' family_sales '表示该产品系列的总销售额。

2、训练截止日期(2017年8月31日)之前已知,包括“onpromotion”(产品系列中促销产品的数量)和“dcoilwtico”等变量。这些数字列由' holiday '列补充,它表示假日或事件的存在,并被分类编码为整数。此外,' time_idx '、' week_day '、' month_day '、' month '和' year '列提供时间上下文,也编码为整数。虽然我们的模型是只有编码器的,但已经添加了16天移动值“onpromotion”和“dcoilwtico”,以便在没有解码器的情况下包含未来的信息。

3、静态协变量随着时间的推移保持不变,包括诸如“store_nbr”、“family”等标识符,以及“city”、“state”、“type”和“cluster”等分类变量(详细说明了商店的特征),所有这些变量都是整数编码的。

我们最后生成的df名为' data_all ',结构如下:

 categorical_covariates= ['time_idx','week_day','month_day','month','year','holiday']

 categorical_covariates_num_embeddings= []
 forcolincategorical_covariates:
     data_all[col] =data_all[col].astype('category').cat.codes
     categorical_covariates_num_embeddings.append(data_all[col].nunique())

 categorical_static= ['store_nbr','city','state','type','cluster','family_int']

 categorical_static_num_embeddings= []
 forcolincategorical_static:
     data_all[col] =data_all[col].astype('category').cat.codes
     categorical_static_num_embeddings.append(data_all[col].nunique())

 numeric_covariates= ['sales','dcoilwtico','dcoilwtico_future','onpromotion','onpromotion_future','store_sales','transactions','family_sales']

 target_idx=np.where(np.array(numeric_covariates)=='sales')[0][0]

在将数据转换为适合我的PyTorch模型的张量之前,需要将其分为训练集和验证集。窗口大小是一个重要的超参数,表示每个训练样本的序列长度。此外,' num_val '表示使用的验证折数,在此上下文中设置为2。将2013年1月1日至2017年6月28日的观测数据指定为训练数据集,以2017年6月29日至2017年7月14日和2017年7月15日至2017年7月30日作为验证区间。

同时还进行了数据的缩放,完整代码如下:

 defdataframe_to_tensor(series,numeric_covariates,categorical_covariates,categorical_static,target_idx):

     numeric_cov_arr=np.array(series[numeric_covariates].values.tolist())
     category_cov_arr=np.array(series[categorical_covariates].values.tolist())
     static_cov_arr=np.array(series[categorical_static].values.tolist())

     x_numeric=torch.tensor(numeric_cov_arr,dtype=torch.float32).transpose(2,1)
     x_numeric=torch.log(x_numeric+1e-5)
     x_category=torch.tensor(category_cov_arr,dtype=torch.long).transpose(2,1)
     x_static=torch.tensor(static_cov_arr,dtype=torch.long)
     y=torch.tensor(numeric_cov_arr[:,target_idx,:],dtype=torch.float32)

     returnx_numeric, x_category, x_static, y


 window_size=16
 forecast_length=16
 num_val=2

 val_max_date='2017-08-15'
 train_max_date=str((pd.to_datetime(val_max_date) -pd.Timedelta(days=window_size*num_val+forecast_length)).date())

 train_final=data_all[data_all['date']<=train_max_date]
 val_final=data_all[(data_all['date']>train_max_date)&(data_all['date']<=val_max_date)]

 train_series=train_final.groupby(categorical_static+['family']).agg(list).reset_index()
 val_series=val_final.groupby(categorical_static+['family']).agg(list).reset_index()

 x_numeric_train_tensor, x_category_train_tensor, x_static_train_tensor, y_train_tensor=dataframe_to_tensor(train_series,numeric_covariates,categorical_covariates,categorical_static,target_idx)

 x_numeric_val_tensor, x_category_val_tensor, x_static_val_tensor, y_val_tensor=dataframe_to_tensor(val_series,numeric_covariates,categorical_covariates,categorical_static,target_idx)

数据加载器

在数据加载时,需要将每个时间序列从窗口范围内的随机索引开始划分为时间块,以确保模型暴露于不同的序列段。

为了减少偏差还引入了一个额外的超参数设置,它不是随机打乱数据,而是根据块的开始时间对数据集进行排序。然后数据被分成五部分——反映了我们五年的数据集——每一部分都是内部打乱的,这样最后一批数据将包括去年的观察结果,但还是随机的。模型的最终梯度更新受到最近一年的影响,理论上可以改善最近时期的预测。

 defdivide_shuffle(df,div_num):
     space=df.shape[0]//div_num
     division=np.arange(0,df.shape[0],space)
     returnpd.concat([df.iloc[division[i]:division[i]+space,:].sample(frac=1) foriinrange(len(division))])

 defcreate_time_blocks(time_length,window_size):
     start_idx=np.random.randint(0,window_size-1)
     end_idx=time_length-window_size-16-1
     time_indices=np.arange(start_idx,end_idx+1,window_size)[:-1]
     time_indices=np.append(time_indices,end_idx)
     returntime_indices

 defdata_loader(x_numeric_tensor, x_category_tensor, x_static_tensor, y_tensor, batch_size, time_shuffle):

     num_series=x_numeric_tensor.shape[0]
     time_length=x_numeric_tensor.shape[1]
     index_pd=pd.DataFrame({'serie_idx':range(num_series)})
     index_pd['time_idx'] = [create_time_blocks(time_length,window_size) forninrange(index_pd.shape[0])]
     iftime_shuffle:
         index_pd=index_pd.explode('time_idx')
         index_pd=index_pd.sample(frac=1)
     else:
         index_pd=index_pd.explode('time_idx').sort_values('time_idx')
         index_pd=divide_shuffle(index_pd,5)
     indices=np.array(index_pd).astype(int)

     forbatch_idxinnp.arange(0,indices.shape[0],batch_size):

         cur_indices=indices[batch_idx:batch_idx+batch_size,:]

         x_numeric=torch.stack([x_numeric_tensor[n[0],n[1]:n[1]+window_size,:] fornincur_indices])
         x_category=torch.stack([x_category_tensor[n[0],n[1]:n[1]+window_size,:] fornincur_indices])
         x_static=torch.stack([x_static_tensor[n[0],:] fornincur_indices])
         y=torch.stack([y_tensor[n[0],n[1]+window_size:n[1]+window_size+forecast_length] fornincur_indices])

         yieldx_numeric.to(device), x_category.to(device), x_static.to(device), y.to(device)

 defval_loader(x_numeric_tensor, x_category_tensor, x_static_tensor, y_tensor, batch_size, num_val):

     num_time_series=x_numeric_tensor.shape[0]

     foriinrange(num_val):

       forbatch_idxinnp.arange(0,num_time_series,batch_size):

           x_numeric=x_numeric_tensor[batch_idx:batch_idx+batch_size,window_size*i:window_size*(i+1),:]
           x_category=x_category_tensor[batch_idx:batch_idx+batch_size,window_size*i:window_size*(i+1),:]
           x_static=x_static_tensor[batch_idx:batch_idx+batch_size]
           y_val=y_tensor[batch_idx:batch_idx+batch_size,window_size*(i+1):window_size*(i+1)+forecast_length]

           yieldx_numeric.to(device), x_category.to(device), x_static.to(device), y_val.to(device)

模型

我们这里通过Pytorch来简单的实现《Attention is All You Need》(2017)²中描述的Transformer架构。因为是时间序列预测,所以注意力机制中不需要因果关系,也就是没有对注意块应用进行遮蔽。

从输入开始:分类特征通过嵌入层传递,以密集的形式表示它们,然后送到Transformer块。多层感知器(MLP)接受最终编码输入来产生预测。嵌入维数、每个Transformer块中的注意头数和dropout概率是模型的主要超参数。堆叠多个Transformer块由' num_blocks '超参数控制。

下面是单个Transformer块的实现和整体预测模型:

 classtransformer_block(nn.Module):

     def__init__(self,embed_size,num_heads):
         super(transformer_block, self).__init__()

         self.attention=nn.MultiheadAttention(embed_size, num_heads, batch_first=True)
         self.fc=nn.Sequential(nn.Linear(embed_size, 4*embed_size),
                                  nn.LeakyReLU(),
                                  nn.Linear(4*embed_size, embed_size))
         self.dropout=nn.Dropout(drop_prob)
         self.ln1=nn.LayerNorm(embed_size, eps=1e-6)
         self.ln2=nn.LayerNorm(embed_size, eps=1e-6)

     defforward(self, x):

         attn_out, _=self.attention(x, x, x, need_weights=False)
         x=x+self.dropout(attn_out)
         x=self.ln1(x)

         fc_out=self.fc(x)
         x=x+self.dropout(fc_out)
         x=self.ln2(x)

         returnx

 classtransformer_forecaster(nn.Module):

     def__init__(self,embed_size,num_heads,num_blocks):
         super(transformer_forecaster, self).__init__()

         num_len=len(numeric_covariates)
         self.embedding_cov=nn.ModuleList([nn.Embedding(n,embed_size-num_len) fornincategorical_covariates_num_embeddings])
         self.embedding_static=nn.ModuleList([nn.Embedding(n,embed_size-num_len) fornincategorical_static_num_embeddings])

         self.blocks=nn.ModuleList([transformer_block(embed_size,num_heads) forninrange(num_blocks)])

         self.forecast_head=nn.Sequential(nn.Linear(embed_size, embed_size*2),
                                            nn.LeakyReLU(),
                                            nn.Dropout(drop_prob),
                                            nn.Linear(embed_size*2, embed_size*4),
                                            nn.LeakyReLU(),
                                            nn.Linear(embed_size*4, forecast_length),
                                            nn.ReLU())

     defforward(self, x_numeric, x_category, x_static):

         tmp_list= []
         fori,embed_layerinenumerate(self.embedding_static):
             tmp_list.append(embed_layer(x_static[:,i]))
         categroical_static_embeddings=torch.stack(tmp_list).mean(dim=0).unsqueeze(1)

         tmp_list= []
         fori,embed_layerinenumerate(self.embedding_cov):
             tmp_list.append(embed_layer(x_category[:,:,i]))
         categroical_covariates_embeddings=torch.stack(tmp_list).mean(dim=0)
         T=categroical_covariates_embeddings.shape[1]

         embed_out= (categroical_covariates_embeddings+categroical_static_embeddings.repeat(1,T,1))/2
         x=torch.concat((x_numeric,embed_out),dim=-1)

         forblockinself.blocks:
             x=block(x)

         x=x.mean(dim=1)
         x=self.forecast_head(x)

         returnx

我们修改后的transformer架构如下图所示:

模型接受三个独立的输入张量:数值特征、分类特征和静态特征。对分类和静态特征嵌入进行平均,并与数字特征组合形成具有形状(batch_size, window_size, embedding_size)的张量,为Transformer块做好准备。这个复合张量还包含嵌入的时间变量,提供必要的位置信息。

Transformer块提取顺序信息,然后将结果张量沿着时间维度聚合,将其传递到MLP中以生成最终预测(batch_size, forecast_length)。这个比赛采用均方根对数误差(RMSLE)作为评价指标,公式为:

鉴于预测经过对数转换,预测低于-1的负销售额(这会导致未定义的错误)需要进行处理,所以为了避免负的销售预测和由此产生的NaN损失值,在MLP层以后增加了一层ReLU激活确保非负预测。

 class RMSLELoss(nn.Module):

     def __init__(self):
         super().__init__()
         self.mse = nn.MSELoss()

     def forward(self, pred, actual):
         return torch.sqrt(self.mse(torch.log(pred + 1), torch.log(actual + 1)))

训练和验证

训练模型时需要设置几个超参数:窗口大小、是否打乱时间、嵌入大小、头部数量、块数量、dropout、批大小和学习率。以下配置是有效的,但不保证是最好的:

 num_epoch = 1000
 min_val_loss = 999

 num_blocks = 1
 embed_size = 500
 num_heads = 50
 batch_size = 128
 learning_rate = 3e-4
 time_shuffle = False
 drop_prob = 0.1

 model = transformer_forecaster(embed_size,num_heads,num_blocks).to(device)
 criterion = RMSLELoss()
 optimizer = torch.optim.Adam(model.parameters(),lr=learning_rate)
 scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

这里使用adam优化器和学习率调度,以便在训练期间逐步调整学习率。

 for epoch in range(num_epoch):

     batch_loader = data_loader(x_numeric_train_tensor, x_category_train_tensor, x_static_train_tensor, y_train_tensor, batch_size, time_shuffle)
     train_loss = 0
     counter = 0

     model.train()
     for x_numeric, x_category, x_static, y in batch_loader:

         optimizer.zero_grad()
         preds = model(x_numeric, x_category, x_static)
         loss = criterion(preds, y)
         train_loss += loss.item()
         counter += 1
         loss.backward()
         optimizer.step()

     train_loss = train_loss/counter
     print(f'Epoch {epoch} training loss: {train_loss}')

     model.eval()
     val_batches = val_loader(x_numeric_val_tensor, x_category_val_tensor, x_static_val_tensor, y_val_tensor, batch_size, num_val)
     val_loss = 0
     counter = 0
     for x_numeric_val, x_category_val, x_static_val, y_val in val_batches:
         with torch.no_grad():
             preds = model(x_numeric_val,x_category_val,x_static_val)
             loss = criterion(preds,y_val).item()
         val_loss += loss
         counter += 1
     val_loss = val_loss/counter
     print(f'Epoch {epoch} validation loss: {val_loss}')

     if val_loss<min_val_loss:
       print('saved...')
       torch.save(model,data_folder+'best.model')
       min_val_loss = val_loss

     scheduler.step()

结果

训练后,表现最好的模型的训练损失为0.387,验证损失为0.457。当应用于测试集时,该模型的RMSLE为0.416,比赛排名为第89位(前10%)。

更大的嵌入和更多的注意力头似乎可以提高性能,但最好的结果是用一个单独的Transformer 实现的,这表明在有限的数据下,简单是优点。当放弃整体打乱而选择局部打乱时,效果有所改善;引入轻微的时间偏差提高了预测的准确性。

以下是引用

[1]: Alexis Cook, DanB, inversion, Ryan Holbrook. (2021). Store Sales — Time Series Forecasting. Kaggle. https://avoid.overfit.cn/post/960767b198ac4d9f988fc1795aa89e59

[2]: Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez, A. N., … & Polosukhin, I. (2017). Attention is all you need. Advances in neural information processing systems, 30.

[3]: Lim, B., Arık, S. Ö., Loeff, N., & Pfister, T. (2021). Temporal fusion transformers for interpretable multi-horizon time series forecasting. International Journal of Forecasting, 37(4), 1748–1764.

作者:Kaan Aslan

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
1月前
|
存储 物联网 PyTorch
基于PyTorch的大语言模型微调指南:Torchtune完整教程与代码示例
**Torchtune**是由PyTorch团队开发的一个专门用于LLM微调的库。它旨在简化LLM的微调流程,提供了一系列高级API和预置的最佳实践
174 59
基于PyTorch的大语言模型微调指南:Torchtune完整教程与代码示例
|
2月前
|
算法 PyTorch 算法框架/工具
Pytorch学习笔记(九):Pytorch模型的FLOPs、模型参数量等信息输出(torchstat、thop、ptflops、torchsummary)
本文介绍了如何使用torchstat、thop、ptflops和torchsummary等工具来计算Pytorch模型的FLOPs、模型参数量等信息。
388 2
|
23天前
|
机器学习/深度学习 人工智能 PyTorch
Transformer模型变长序列优化:解析PyTorch上的FlashAttention2与xFormers
本文探讨了Transformer模型中变长输入序列的优化策略,旨在解决深度学习中常见的计算效率问题。文章首先介绍了批处理变长输入的技术挑战,特别是填充方法导致的资源浪费。随后,提出了多种优化技术,包括动态填充、PyTorch NestedTensors、FlashAttention2和XFormers的memory_efficient_attention。这些技术通过减少冗余计算、优化内存管理和改进计算模式,显著提升了模型的性能。实验结果显示,使用FlashAttention2和无填充策略的组合可以将步骤时间减少至323毫秒,相比未优化版本提升了约2.5倍。
42 3
Transformer模型变长序列优化:解析PyTorch上的FlashAttention2与xFormers
|
2月前
|
机器学习/深度学习 自然语言处理 监控
利用 PyTorch Lightning 搭建一个文本分类模型
利用 PyTorch Lightning 搭建一个文本分类模型
76 8
利用 PyTorch Lightning 搭建一个文本分类模型
|
2月前
|
机器学习/深度学习 自然语言处理 数据建模
三种Transformer模型中的注意力机制介绍及Pytorch实现:从自注意力到因果自注意力
本文深入探讨了Transformer模型中的三种关键注意力机制:自注意力、交叉注意力和因果自注意力,这些机制是GPT-4、Llama等大型语言模型的核心。文章不仅讲解了理论概念,还通过Python和PyTorch从零开始实现这些机制,帮助读者深入理解其内部工作原理。自注意力机制通过整合上下文信息增强了输入嵌入,多头注意力则通过多个并行的注意力头捕捉不同类型的依赖关系。交叉注意力则允许模型在两个不同输入序列间传递信息,适用于机器翻译和图像描述等任务。因果自注意力确保模型在生成文本时仅考虑先前的上下文,适用于解码器风格的模型。通过本文的详细解析和代码实现,读者可以全面掌握这些机制的应用潜力。
131 3
三种Transformer模型中的注意力机制介绍及Pytorch实现:从自注意力到因果自注意力
|
2月前
|
机器学习/深度学习 PyTorch 算法框架/工具
聊一聊计算机视觉中常用的注意力机制以及Pytorch代码实现
本文介绍了几种常用的计算机视觉注意力机制及其PyTorch实现,包括SENet、CBAM、BAM、ECA-Net、SA-Net、Polarized Self-Attention、Spatial Group-wise Enhance和Coordinate Attention等,每种方法都附有详细的网络结构说明和实验结果分析。通过这些注意力机制的应用,可以有效提升模型在目标检测任务上的性能。此外,作者还提供了实验数据集的基本情况及baseline模型的选择与实验结果,方便读者理解和复现。
77 0
聊一聊计算机视觉中常用的注意力机制以及Pytorch代码实现
|
3月前
|
机器学习/深度学习 PyTorch 调度
在Pytorch中为不同层设置不同学习率来提升性能,优化深度学习模型
在深度学习中,学习率作为关键超参数对模型收敛速度和性能至关重要。传统方法采用统一学习率,但研究表明为不同层设置差异化学习率能显著提升性能。本文探讨了这一策略的理论基础及PyTorch实现方法,包括模型定义、参数分组、优化器配置及训练流程。通过示例展示了如何为ResNet18设置不同层的学习率,并介绍了渐进式解冻和层适应学习率等高级技巧,帮助研究者更好地优化模型训练。
216 4
在Pytorch中为不同层设置不同学习率来提升性能,优化深度学习模型
|
3月前
|
机器学习/深度学习 监控 PyTorch
PyTorch 模型调试与故障排除指南
在深度学习领域,PyTorch 成为开发和训练神经网络的主要框架之一。本文为 PyTorch 开发者提供全面的调试指南,涵盖从基础概念到高级技术的内容。目标读者包括初学者、中级开发者和高级工程师。本文探讨常见问题及解决方案,帮助读者理解 PyTorch 的核心概念、掌握调试策略、识别性能瓶颈,并通过实际案例获得实践经验。无论是在构建简单神经网络还是复杂模型,本文都将提供宝贵的洞察和实用技巧,帮助开发者更高效地开发和优化 PyTorch 模型。
56 3
PyTorch 模型调试与故障排除指南
|
2月前
|
存储 并行计算 PyTorch
探索PyTorch:模型的定义和保存方法
探索PyTorch:模型的定义和保存方法
|
4月前
|
机器学习/深度学习 PyTorch 编译器
PyTorch 与 TorchScript:模型的序列化与加速
【8月更文第27天】PyTorch 是一个非常流行的深度学习框架,它以其灵活性和易用性而著称。然而,当涉及到模型的部署和性能优化时,PyTorch 的动态计算图可能会带来一些挑战。为了解决这些问题,PyTorch 引入了 TorchScript,这是一个用于序列化和优化 PyTorch 模型的工具。本文将详细介绍如何使用 TorchScript 来序列化 PyTorch 模型以及如何加速模型的执行。
177 4