Building a Text Classification Model with BERT + PET
Model Construction
- The steps for building, training, and applying the BERT + PET model in this project are as follows (note: since this project uses a pretrained BERT model, we can load it directly and do not need to rebuild the model architecture):
- 1. Implement the model utility functions
- 2. Implement the model training and evaluation functions
- 3. Implement the model prediction function
1. Implementing the Model Utility Functions
- Purpose: provide the helper functions the model needs during training, evaluation, and prediction.
- Code path: /Users/**/PycharmProjects/llm/prompt_tasks/PET/utils
- The utils folder contains three Python scripts: verbalizer.py, metirc_utils.py, and common_utils.py.
1.1 verbalizer.py
- Purpose: define a Verbalizer class that maps each main label to its set of sub-labels.
- Import the required packages:
```python
import os
from typing import Union, List

from pet_config import *

pc = ProjectConfig()
```
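pet_config.py itself is not shown in this section. Judging only from the attributes used below (pc.pre_model and pc.verbalizer), a minimal, hypothetical sketch of ProjectConfig might look like the following; the field values are placeholders, not the project's real paths:

```python
# Hypothetical sketch of pet_config.py -- the real project file likely defines
# more fields; only the attributes referenced in this section are shown.
class ProjectConfig(object):
    def __init__(self):
        # name or local path of the pretrained BERT checkpoint (placeholder)
        self.pre_model = 'bert-base-chinese'
        # path of the verbalizer file read by Verbalizer.load_label_dict() (placeholder)
        self.verbalizer = 'data/verbalizer.txt'
```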
- Implementation code:
```python
class Verbalizer(object):
    """
    Verbalizer class: maps a main label to its sub-labels.
    """
    def __init__(self, verbalizer_file: str, tokenizer, max_label_len: int):
        """
        Args:
            verbalizer_file (str): path of the verbalizer file.
            tokenizer: tokenizer used to convert between text and token ids.
            max_label_len (int): label length; longer labels are truncated,
                shorter ones are padded.
        """
        self.tokenizer = tokenizer
        self.label_dict = self.load_label_dict(verbalizer_file)
        self.max_label_len = max_label_len

    def load_label_dict(self, verbalizer_file: str):
        """
        Read the local file and build the verbalizer dict.

        Args:
            verbalizer_file (str): path of the verbalizer file.

        Returns:
            dict -> {
                '体育': ['篮球', '足球', '网球', '排球', ...],
                '酒店': ['宾馆', '旅馆', '旅店', '酒店', ...],
                ...
            }
        """
        label_dict = {}
        with open(verbalizer_file, 'r', encoding='utf8') as f:
            for line in f.readlines():
                label, sub_labels = line.strip().split('\t')
                label_dict[label] = list(set(sub_labels.split(',')))
        return label_dict

    def find_sub_labels(self, label: Union[list, str]):
        """
        Find all sub-labels of a given main label.

        Args:
            label (Union[list, str]): the label, either text or an id list,
                e.g. -> '体育' or [860, 5509]

        Returns:
            dict -> {
                'sub_labels': ['足球', '网球'],
                'token_ids': [[6639, 4413], [5381, 4413]]
            }
        """
        if type(label) == list:  # if an id list was passed in, convert it back to text
            while self.tokenizer.pad_token_id in label:
                label.remove(self.tokenizer.pad_token_id)
            label = ''.join(self.tokenizer.convert_ids_to_tokens(label))

        if label not in self.label_dict:
            raise ValueError(f'Label Error: "{label}" not in label_dict')

        sub_labels = self.label_dict[label]
        ret = {'sub_labels': sub_labels}
        token_ids = [_id[1:-1] for _id in self.tokenizer(sub_labels)['input_ids']]
        for i in range(len(token_ids)):
            token_ids[i] = token_ids[i][:self.max_label_len]  # truncate and pad each label to max_label_len
            if len(token_ids[i]) < self.max_label_len:
                token_ids[i] = token_ids[i] + [self.tokenizer.pad_token_id] * (self.max_label_len - len(token_ids[i]))
        ret['token_ids'] = token_ids
        return ret

    def batch_find_sub_labels(self, label: List[Union[list, str]]):
        """
        Find the sub-labels of a batch of labels.

        Args:
            label (List[Union[list, str]]): list of labels,
                [[4510, 5554], [860, 5509]] or ['体育', '电脑']

        Returns:
            list -> [
                {
                    'sub_labels': ['足球', '网球'],
                    'token_ids': [[6639, 4413], [5381, 4413]]
                },
                ...
            ]
        """
        return [self.find_sub_labels(l) for l in label]

    def get_common_sub_str(self, str1: str, str2: str):
        """
        Find the longest common substring, e.g.
        str1: abcd
        str2: abadbcdba
        """
        lstr1, lstr2 = len(str1), len(str2)
        record = [[0 for i in range(lstr2 + 1)] for j in range(lstr1 + 1)]
        p = 0       # end position (in str1) of the longest match
        maxNum = 0  # length of the longest match
        for i in range(lstr1):
            for j in range(lstr2):
                if str1[i] == str2[j]:
                    record[i + 1][j + 1] = record[i][j] + 1
                    if record[i + 1][j + 1] > maxNum:
                        maxNum = record[i + 1][j + 1]
                        p = i + 1
        return str1[p - maxNum:p], maxNum

    def hard_mapping(self, sub_label: str):
        """
        Hard matching: when the sub-label generated by the model is unknown,
        use the longest common substring to find the main label with the
        highest overlap.

        Args:
            sub_label (str): the sub-label.

        Returns:
            str: the main label.
        """
        label, max_overlap_str = '', 0
        for main_label, sub_labels in self.label_dict.items():
            overlap_num = 0
            for s_label in sub_labels:
                # sum the longest-common-substring lengths between the predicted
                # label and every sub-label under this main label
                overlap_num += self.get_common_sub_str(sub_label, s_label)[1]
            if overlap_num >= max_overlap_str:
                max_overlap_str = overlap_num
                label = main_label
        return label

    def find_main_label(self, sub_label: Union[list, str], hard_mapping=True):
        """
        Find the main label of a sub-label.

        Args:
            sub_label (Union[list, str]): the sub-label, either text or an id list,
                e.g. -> '苹果' or [5741, 3362]
            hard_mapping (bool): when the generated word is unknown, whether to
                force a match to the most similar main label.

        Returns:
            dict -> {
                'label': '水果',
                'token_ids': [3717, 3362]
            }
        """
        if type(sub_label) == list:  # if an id list was passed in, convert it back to text
            pad_token_id = self.tokenizer.pad_token_id
            while pad_token_id in sub_label:  # remove [PAD] tokens
                sub_label.remove(pad_token_id)
            sub_label = ''.join(self.tokenizer.convert_ids_to_tokens(sub_label))

        main_label = '无'
        for label, s_labels in self.label_dict.items():
            if sub_label in s_labels:
                main_label = label
                break

        if main_label == '无' and hard_mapping:
            main_label = self.hard_mapping(sub_label)

        ret = {
            'label': main_label,
            'token_ids': self.tokenizer(main_label)['input_ids'][1:-1]
        }
        return ret

    def batch_find_main_label(self, sub_label: List[Union[list, str]], hard_mapping=True):
        """
        Find the main labels of a batch of sub-labels.

        Args:
            sub_label (List[Union[list, str]]): list of sub-labels,
                ['苹果', ...] or [[5741, 3362], ...]

        Returns:
            list: [
                {
                    'label': '水果',
                    'token_ids': [3717, 3362]
                },
                ...
            ]
        """
        return [self.find_main_label(l, hard_mapping) for l in sub_label]


if __name__ == '__main__':
    from rich import print
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)
    verbalizer = Verbalizer(
        verbalizer_file=pc.verbalizer,
        tokenizer=tokenizer,
        max_label_len=2
    )
    print(verbalizer.label_dict)
    label = [[4510, 5554], [6132, 3302]]
    ret = verbalizer.batch_find_sub_labels(label)
    print(ret)
```
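For reference, load_label_dict() expects one main label per line, separated from its comma-delimited sub-labels by a tab. Below is a small usage sketch; the file name, labels, and checkpoint are illustrative only, not taken from the project:

```python
from transformers import AutoTokenizer

# write a tiny demo verbalizer file: label<TAB>sub1,sub2,... (hypothetical data)
sample = '体育\t篮球,足球,网球\n水果\t苹果,香蕉,橘子\n'
with open('verbalizer_demo.txt', 'w', encoding='utf8') as f:
    f.write(sample)

tokenizer = AutoTokenizer.from_pretrained('bert-base-chinese')  # assumed checkpoint
verbalizer = Verbalizer('verbalizer_demo.txt', tokenizer, max_label_len=2)
print(verbalizer.find_sub_labels('水果'))    # sub-labels plus padded token ids
print(verbalizer.find_main_label('足球'))    # exact reverse lookup -> '体育'
print(verbalizer.find_main_label('苹果树'))  # unseen word, hard_mapping -> '水果'
```

The last call exercises hard_mapping(): '苹果树' is not a known sub-label, but it shares the substring '苹果' with one, so it still resolves to a main label.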
1.2 common_utils.py
- Purpose: define the loss function, and convert the token logits at the mask positions into token ids.
- The script contains two functions: mlm_loss() and convert_logits_to_ids().
- Import the required packages:
```python
import torch
from rich import print
```
- Define the loss function mlm_loss():
```python
def mlm_loss(logits, mask_positions, sub_mask_labels, cross_entropy_criterion, device):
    """
    Compute the cross-entropy loss between the model outputs at the mask-token
    positions and their labels.

    Args:
        logits (torch.tensor): raw model output -> (batch, seq_len, vocab_size)
        mask_positions (torch.tensor): positions of the mask tokens -> (batch, mask_label_num)
        sub_mask_labels (list): sub-labels of the mask tokens; since each label
            has a different number of sub-labels, this is a variable-length list, e.g. ->
            [
                [[2398, 3352]],
                [[2398, 3352], [3819, 3861]]
            ]
        cross_entropy_criterion (CrossEntropyLoss): the CE loss criterion
        device (str): cpu or cuda device
    Returns:
        torch.tensor: the CE loss
    """
    batch_size, seq_len, vocab_size = logits.size()
    loss = None
    for single_value in zip(logits, sub_mask_labels, mask_positions):
        single_logits = single_value[0]
        single_sub_mask_labels = single_value[1]
        single_mask_positions = single_value[2]

        single_mask_logits = single_logits[single_mask_positions]  # (mask_label_num, vocab_size)
        # repeat the mask logits once per candidate sub-label
        single_mask_logits = single_mask_logits.repeat(len(single_sub_mask_labels), 1, 1)
        single_mask_logits = single_mask_logits.reshape(-1, vocab_size)

        single_sub_mask_labels = torch.LongTensor(single_sub_mask_labels).to(device)
        single_sub_mask_labels = single_sub_mask_labels.reshape(-1, 1).squeeze()
        if not single_sub_mask_labels.size():  # restore the missing dimension in the single-token case
            single_sub_mask_labels = single_sub_mask_labels.unsqueeze(dim=0)

        cur_loss = cross_entropy_criterion(single_mask_logits, single_sub_mask_labels)
        cur_loss = cur_loss / len(single_sub_mask_labels)

        if loss is None:
            loss = cur_loss
        else:
            loss += cur_loss
    loss = loss / batch_size  # average over the batch
    return loss
```
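A quick shape sanity check with random tensors (all sizes below are illustrative: a batch of 2, sequence length 6, a toy vocabulary of 10, and one 2-token mask span per sample; the second sample has two candidate sub-labels):

```python
import torch
from torch.nn import CrossEntropyLoss

logits = torch.randn(2, 6, 10)                       # (batch, seq_len, vocab_size)
mask_positions = torch.LongTensor([[2, 3], [2, 3]])  # 2 mask tokens per sample
sub_mask_labels = [
    [[1, 4]],          # sample 1: one candidate sub-label
    [[1, 4], [7, 2]],  # sample 2: two candidate sub-labels
]
loss = mlm_loss(logits, mask_positions, sub_mask_labels, CrossEntropyLoss(), 'cpu')
print(loss)  # a scalar tensor
```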
- Define the convert_logits_to_ids() function:
```python
def convert_logits_to_ids(logits: torch.tensor, mask_positions: torch.tensor):
    """
    Given the vocabulary distribution output by the LM (the LM model's logits),
    convert the token logits at the mask positions into token ids.

    Args:
        logits (torch.tensor): model output -> (batch, seq_len, vocab_size)
        mask_positions (torch.tensor): positions of the mask tokens -> (batch, mask_label_num)

    Returns:
        torch.LongTensor: most probable predicted token at each mask position -> (batch, mask_label_num)
    """
    label_length = mask_positions.size()[1]  # label length
    batch_size, seq_len, vocab_size = logits.size()

    # map each (batch, position) pair to an index in the flattened logits
    mask_positions_after_reshaped = []
    for batch, mask_pos in enumerate(mask_positions.detach().cpu().numpy().tolist()):
        for pos in mask_pos:
            mask_positions_after_reshaped.append(batch * seq_len + pos)

    logits = logits.reshape(batch_size * seq_len, -1)          # (batch * seq_len, vocab_size)
    mask_logits = logits[mask_positions_after_reshaped]        # (batch * label_num, vocab_size)
    predict_tokens = mask_logits.argmax(dim=-1)                # (batch * label_num)
    predict_tokens = predict_tokens.reshape(-1, label_length)  # (batch, label_num)
    return predict_tokens
```
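Continuing the toy shapes from the mlm_loss example above, the prediction path takes the same kind of logits and returns one token id per mask position; these ids are what the verbalizer later maps back to a main label:

```python
logits = torch.randn(2, 6, 10)                       # (batch, seq_len, vocab_size)
mask_positions = torch.LongTensor([[2, 3], [2, 3]])
predict_tokens = convert_logits_to_ids(logits, mask_positions)
print(predict_tokens.shape)  # torch.Size([2, 2]): one id per mask position
```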
1.3 metirc_utils.py
- Purpose: define metric evaluation (accuracy, precision, recall, f1) for (multi-class) classification problems.
- Import the required packages:
```python
from typing import List

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, f1_score
from sklearn.metrics import recall_score, confusion_matrix
```
- Define the ClassEvaluator class:
```python
class ClassEvaluator(object):

    def __init__(self):
        self.goldens = []
        self.predictions = []

    def add_batch(self, pred_batch: List[List], gold_batch: List[List]):
        """
        Add the prediction and gold lists of one batch, to be aggregated later.

        Args:
            pred_batch (list): predicted labels, e.g. -> [0, 0, 1, 2, 0, ...]
                or [['体', '育'], ['财', '经'], ...]
            gold_batch (list): ground-truth labels, e.g. -> [1, 0, 1, 2, 0, ...]
                or [['体', '育'], ['财', '经'], ...]
        """
        assert len(pred_batch) == len(gold_batch)
        # if labels are token lists, join them into single strings
        if type(gold_batch[0]) in [list, tuple]:
            pred_batch = [','.join([str(e) for e in ele]) for ele in pred_batch]
            gold_batch = [','.join([str(e) for e in ele]) for ele in gold_batch]
        self.goldens.extend(gold_batch)
        self.predictions.extend(pred_batch)

    def compute(self, round_num=2) -> dict:
        """
        Compute P, R, and F1 from the values accumulated so far.

        Args:
            round_num (int): number of decimal places to keep; defaults to 2.

        Returns:
            dict -> {
                'accuracy': overall accuracy,
                'precision': overall precision,
                'recall': overall recall,
                'f1': overall f1,
                'class_metrics': {
                    '0': {
                        'precision': precision for this class,
                        'recall': recall for this class,
                        'f1': f1 for this class
                    },
                    ...
                }
            }
        """
        classes, class_metrics, res = sorted(list(set(self.goldens) | set(self.predictions))), {}, {}
        res['accuracy'] = round(accuracy_score(self.goldens, self.predictions), round_num)
        res['precision'] = round(precision_score(self.goldens, self.predictions, average='weighted'), round_num)
        res['recall'] = round(recall_score(self.goldens, self.predictions, average='weighted'), round_num)
        res['f1'] = round(f1_score(self.goldens, self.predictions, average='weighted'), round_num)
        try:
            conf_matrix = np.array(confusion_matrix(self.goldens, self.predictions))  # (n_class, n_class)
            assert conf_matrix.shape[0] == len(classes)
            for i in range(conf_matrix.shape[0]):  # build per-class metrics
                precision = 0 if sum(conf_matrix[:, i]) == 0 else conf_matrix[i, i] / sum(conf_matrix[:, i])
                recall = 0 if sum(conf_matrix[i, :]) == 0 else conf_matrix[i, i] / sum(conf_matrix[i, :])
                f1 = 0 if (precision + recall) == 0 else 2 * precision * recall / (precision + recall)
                class_metrics[classes[i]] = {
                    'precision': round(precision, round_num),
                    'recall': round(recall, round_num),
                    'f1': round(f1, round_num)
                }
            res['class_metrics'] = class_metrics
        except Exception as e:
            print(f'[Warning] Something went wrong while calculating class_metrics: {e}')
            print(f'-> goldens: {set(self.goldens)}')
            print(f'-> predictions: {set(self.predictions)}')
            print(f'-> diff elements: {set(self.predictions) - set(self.goldens)}')
            res['class_metrics'] = {}
        return res

    def reset(self):
        """ Reset the accumulated values. """
        self.goldens = []
        self.predictions = []
```
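A minimal usage sketch (the label values below are illustrative): accumulate batches with add_batch(), call compute() once per evaluation pass, and reset() before the next one:

```python
evaluator = ClassEvaluator()
evaluator.add_batch(pred_batch=[0, 1, 1, 2], gold_batch=[0, 1, 2, 2])
evaluator.add_batch(pred_batch=[2, 0], gold_batch=[2, 0])
res = evaluator.compute()
print(res['accuracy'], res['f1'])  # overall accuracy / weighted f1
print(res['class_metrics'])        # per-class precision / recall / f1
evaluator.reset()                  # clear the accumulators between epochs
```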
Continued in BERT+PET Model Training (Part 2): https://developer.aliyun.com/article/1544776?spm=a2c6h.13148508.setting.30.22454f0eHFZZj3