数据准备
入库,设置随机种子
# -*- codeing = utf-8 -*- import torch import random import numpy as np SEED = 1234 random.seed(SEED) np.random.seed(SEED) torch.manual_seed(SEED) torch.backends.deterministic = True from transformers import BertTokenizer tokenizer = BertTokenizer.from_pretrained('bert-base-uncased') print('`tokenizer` 有一个 `vocab` 属性,它包含我们将使用的实际词汇', len(tokenizer.vocab)) # 使用`tokenizer.tokenize`方法对字符串进行分词,并统一大小写。 tokens = tokenizer.tokenize('Hello WORLD how ARE yoU?') print(tokens) # 我们可以使用我们的词汇表使用 `tokenizer.convert_tokens_to_ids` 来数字化标记。 # 下面的tokens是我们之前上面进行了分词和统一大小写之后的list。 indexes = tokenizer.convert_tokens_to_ids(tokens) print(indexes) # Transformer还接受了特殊tokens的训练,以标记句子的开头和结尾, # 就像我们标准化padding和未知的token一样,我们也可以从`tokenizer`中获取这些。 # **注意**:`tokenizer` 确实有序列开始和序列结束属性(`bos_token` 和 `eos_token`), # 但我们没有对此进行设置,并且不适用于我们本次训练的transformer。 init_token = tokenizer.cls_token eos_token = tokenizer.sep_token pad_token = tokenizer.pad_token unk_token = tokenizer.unk_token print(init_token, eos_token, pad_token, unk_token) # 我们可以通过反转词汇表来获得特殊tokens的索引 init_token_idx = tokenizer.convert_tokens_to_ids(init_token) eos_token_idx = tokenizer.convert_tokens_to_ids(eos_token) pad_token_idx = tokenizer.convert_tokens_to_ids(pad_token) unk_token_idx = tokenizer.convert_tokens_to_ids(unk_token) print(init_token_idx, eos_token_idx, pad_token_idx, unk_token_idx) # 或者通过tokenizer的方法直接获取它们 init_token_idx = tokenizer.cls_token_id eos_token_idx = tokenizer.sep_token_id pad_token_idx = tokenizer.pad_token_id unk_token_idx = tokenizer.unk_token_id print(init_token_idx, eos_token_idx, pad_token_idx, unk_token_idx) # 我们需要处理的另一件事是模型是在具有定义的最大长度的序列上训练的——它不知道如何处理比训练更长的序列。 # 我们可以通过检查我们想要使用的转换器版本的 `max_model_input_sizes` 来获得这些输入大小的最大长度。 max_input_length = tokenizer.max_model_input_sizes['bert-base-uncased'] print(max_input_length) # 之前我们使用了 `spaCy` 标记器来标记我们的示例。 # 然而,我们现在需要定义一个函数,我们将把它传递给我们的 `TEXT` 字段,它将为我们处理所有的标记化。 # 它还会将令牌的数量减少到最大长度。 请注意,我们的最大长度比实际最大长度小 2。 # 这是因为我们需要向每个序列附加两个标记,一个在开头,一个在结尾。 def tokenize_and_cut(sentence): tokens = tokenizer.tokenize(sentence) tokens = tokens[:max_input_length - 2] return tokens # 现在我们开始定义我们的字段,transformer期望将batch维度放在第一维上,所以我们设置了 `batch_first = True`。 # 现在我们已经有了文本的词汇数据,由transformer提供,我们设置 `use_vocab = False` 来告诉 torchtext 已经不需要切分数据了。 # 我们将 `tokenize_and_cut` 函数作为标记器传递。 `preprocessing` 参数是一个函数,这是我们将token转换为其索引的地方。 # 最后,我们定义特殊的token——注意我们将它们定义为它们的索引值而不是它们的字符串值,即“100”而不是“[UNK]”这是因为序列已经被转换为索引。 # 我们像以前一样定义标签字段。 from torchtext.legacy import data TEXT = data.Field(batch_first=True, use_vocab=False, tokenize=tokenize_and_cut, preprocessing=tokenizer.convert_tokens_to_ids, init_token=init_token_idx, eos_token=eos_token_idx, pad_token=pad_token_idx, unk_token=unk_token_idx) LABEL = data.LabelField(dtype=torch.float) # 加载数据,拆分成训练集和验证集 from torchtext.legacy import datasets train_data, test_data = datasets.IMDB.splits(TEXT, LABEL) train_data, valid_data = train_data.split(random_state=random.seed(SEED)) print(f"Number of training examples: {len(train_data)}") print(f"Number of validation examples: {len(valid_data)}") print(f"Number of testing examples: {len(test_data)}") # 随便看一个例子,看下具体效果如何,输出其中一个句子的one-hot向量。 print(vars(train_data.examples[6])) # 我们可以使用 `convert_ids_to_tokens` 将这些索引转换回可读的tokens。 tokens = tokenizer.convert_ids_to_tokens(vars(train_data.examples[6])['text']) print(tokens) # 尽管我们已经处理了文本的词汇表,当然也需要为标签构建词汇表。 LABEL.build_vocab(train_data) print(LABEL.vocab.stoi) # 像之前一样,我们创建迭代器。根据以往经验,使用最大的batch size可以使transformer获得最好的效果, # 当然,你也可以尝试一下使用其他的batch size,如果你的显卡比较好的话。 BATCH_SIZE = 16 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits( (train_data, valid_data, test_data), batch_size=BATCH_SIZE, device=device) # 构建模型 # 接下来,我们导入预训练模型。 from transformers import BertTokenizer, BertModel bert = BertModel.from_pretrained('bert-base-uncased') import torch.nn as nn class BERTGRUSentiment(nn.Module): def __init__(self, bert, hidden_dim, output_dim, n_layers, bidirectional, dropout): super().__init__() self.bert = bert embedding_dim = bert.config.to_dict()['hidden_size'] self.rnn = nn.GRU(embedding_dim, hidden_dim, num_layers=n_layers, bidirectional=bidirectional, batch_first=True, dropout=0 if n_layers < 2 else dropout) self.out = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim) self.dropout = nn.Dropout(dropout) def forward(self, text): # text = [batch size, sent len] with torch.no_grad(): embedded = self.bert(text)[0] # embedded = [batch size, sent len, emb dim] _, hidden = self.rnn(embedded) # hidden = [n layers * n directions, batch size, emb dim] if self.rnn.bidirectional: hidden = self.dropout(torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)) else: hidden = self.dropout(hidden[-1, :, :]) # hidden = [batch size, hid dim] output = self.out(hidden) # output = [batch size, out dim] return output HIDDEN_DIM = 256 OUTPUT_DIM = 1 N_LAYERS = 2 BIDIRECTIONAL = True DROPOUT = 0.25 model = BERTGRUSentiment(bert, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, BIDIRECTIONAL, DROPOUT) def count_parameters(model): return sum(p.numel() for p in model.parameters() if p.requires_grad) print(f'The model has {count_parameters(model):,} trainable parameters') for name, param in model.named_parameters(): if name.startswith('bert'): param.requires_grad = False def count_parameters(model): return sum(p.numel() for p in model.parameters() if p.requires_grad) print(f'The model has {count_parameters(model):,} trainable parameters') for name, param in model.named_parameters(): if param.requires_grad: print(name) # 训练模型 # 按照惯例,我们构建自己的模型损失函数,仍然是二分类 import torch.optim as optim optimizer = optim.Adam(model.parameters()) criterion = nn.BCEWithLogitsLoss() # 将模型和损失函数放在 GPU 上,如果你有GPU的话 model = model.to(device) criterion = criterion.to(device) # 接下来,我们将定义函数用于:计算准确度、定义train、evaluate函数以及计算训练/评估时期每一个epoch所需的时间。 def binary_accuracy(preds, y): """ Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8 """ # round predictions to the closest integer rounded_preds = torch.round(torch.sigmoid(preds)) correct = (rounded_preds == y).float() # convert into float for division acc = correct.sum() / len(correct) return acc def train(model, iterator, optimizer, criterion): epoch_loss = 0 epoch_acc = 0 model.train() for batch in iterator: optimizer.zero_grad() predictions = model(batch.text).squeeze(1) loss = criterion(predictions, batch.label) acc = binary_accuracy(predictions, batch.label) loss.backward() optimizer.step() epoch_loss += loss.item() epoch_acc += acc.item() return epoch_loss / len(iterator), epoch_acc / len(iterator) def evaluate(model, iterator, criterion): epoch_loss = 0 epoch_acc = 0 model.eval() with torch.no_grad(): for batch in iterator: predictions = model(batch.text).squeeze(1) loss = criterion(predictions, batch.label) acc = binary_accuracy(predictions, batch.label) epoch_loss += loss.item() epoch_acc += acc.item() return epoch_loss / len(iterator), epoch_acc / len(iterator) import time def epoch_time(start_time, end_time): elapsed_time = end_time - start_time elapsed_mins = int(elapsed_time / 60) elapsed_secs = int(elapsed_time - (elapsed_mins * 60)) return elapsed_mins, elapsed_secs # 最后,我们将训练我们的模型。 由于transformer的尺寸的原因,这比以前的任何型号都要长得多。 # 即使我们没有训练任何transformer的参数,我们仍然需要通过模型传递数据,这在标准 GPU 上需要花费大量时间。 N_EPOCHS = 1 best_valid_loss = float('inf') for epoch in range(N_EPOCHS): start_time = time.time() train_loss, train_acc = train(model, train_iterator, optimizer, criterion) valid_loss, valid_acc = evaluate(model, valid_iterator, criterion) end_time = time.time() epoch_mins, epoch_secs = epoch_time(start_time, end_time) if valid_loss < best_valid_loss: best_valid_loss = valid_loss torch.save(model.state_dict(), 'tut6-model.pt') print(f'Epoch: {epoch + 1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s') print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc * 100:.2f}%') print(f'\t Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc * 100:.2f}%') # 我们将加载为我们提供最佳验证集上损失值的参数,并在测试集上应用这些参数 - 并在测试集上达到了最优的结果。 model.load_state_dict(torch.load('tut6-model.pt')) test_loss, test_acc = evaluate(model, test_iterator, criterion) print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc * 100:.2f}%') # 模型的验证 # 然后我们将使用该模型来测试一些序列的情绪。 # 我们对输入序列进行标记,将其修剪到最大长度,将特殊token添加到任一侧,将其转换为张量, # 使用unsqueeze函数增加一维,然后将其传递给我们的模型。 def predict_sentiment(model, tokenizer, sentence): model.eval() tokens = tokenizer.tokenize(sentence) tokens = tokens[:max_input_length - 2] indexed = [init_token_idx] + tokenizer.convert_tokens_to_ids(tokens) + [eos_token_idx] tensor = torch.LongTensor(indexed).to(device) tensor = tensor.unsqueeze(0) prediction = torch.sigmoid(model(tensor)) return prediction.item() predict_sentiment(model, tokenizer, "This film is terrible") predict_sentiment(model, tokenizer, "This film is great")