BERT+PET方式模型训练(一)

简介: • 本项目中完成BERT+PET模型搭建、训练及应用的步骤如下(注意:因为本项目中使用的是BERT预训练模型,所以直接加载即可,无需重复搭建模型架构):• 一、实现模型工具类函数• 二、实现模型训练函数,验证函数• 三、实现模型预测函数

基于BERT+PET方式文本分类模型搭建


模型搭建


  • 本项目中完成BERT+PET模型搭建、训练及应用的步骤如下(注意:因为本项目中使用的是BERT预训练模型,所以直接加载即可,无需重复搭建模型架构):


  • 一、实现模型工具类函数
  • 二、实现模型训练函数,验证函数
  • 三、实现模型预测函数


一、实现模型工具类函数


  • 目的:模型在训练、验证、预测时需要的函数
  • 代码路径:/Users/**/PycharmProjects/llm/prompt_tasks/PET/utils
  • utils文件夹共包含3个py脚本:verbalizer.py、metirc_utils.py以及common_utils.py


1.1 verbalizer.py


  • 目的:定义一个Verbalizer类,用于将一个Label对应到其子Label的映射。
  • 导入必备的工具包


# -*- coding:utf-8 -*-
import os
from typing import Union, List
from pet_config import *
pc = ProjectConfig()


  • 具体实现代码


class Verbalizer(object):
    """
    Verbalizer类,用于将一个Label对应到其子Label的映射。
    """

    def __init__(self, verbalizer_file: str, tokenizer, max_label_len: int):
        """
        Args:
            verbalizer_file (str): verbalizer文件存放地址。
            tokenizer: 分词器,用于文本和id之间的转换。
            max_label_len (int): 标签长度,若大于则截断,若小于则补齐
        """
        self.tokenizer = tokenizer
        self.label_dict = self.load_label_dict(verbalizer_file)
        self.max_label_len = max_label_len

    def load_label_dict(self, verbalizer_file: str):
        """
        读取本地文件,构建verbalizer字典。
        Args:
            verbalizer_file (str): verbalizer文件存放地址。
        Returns:
            dict -> {
                '体育': ['篮球', '足球','网球', '排球',  ...],
                '酒店': ['宾馆', '旅馆', '旅店', '酒店', ...],
                ...
            }
        """
        label_dict = {}
        with open(verbalizer_file, 'r', encoding='utf8') as f:
            for line in f.readlines():
                label, sub_labels = line.strip().split('\t')
                label_dict[label] = list(set(sub_labels.split(',')))
        return label_dict
    
    def find_sub_labels(self, label: Union[list, str]):
        """
        通过标签找到对应所有的子标签。
      Args:
            label (Union[list, str]): 标签, 文本型  id_list, e.g. -> '体育' or [860, 5509]
     
      Returns:
            dict -> {
                'sub_labels': ['足球', '网球'], 
                'token_ids': [[6639, 4413], [5381, 4413]]
            }
        """
        if type(label) == list:    # 如果传入为id_list, 则通过tokenizer进行文本转换
            while self.tokenizer.pad_token_id in label:
                label.remove(self.tokenizer.pad_token_id)
            label = ''.join(self.tokenizer.convert_ids_to_tokens(label))
        # print(f'label-->{label}')
        if label not in self.label_dict:
            raise ValueError(f'Lable Error: "{label}" not in label_dict')
        
        sub_labels = self.label_dict[label]
        ret = {'sub_labels': sub_labels}
        token_ids = [_id[1:-1] for _id in self.tokenizer(sub_labels)['input_ids']]
        # print(f'token_ids-->{token_ids}')
        for i in range(len(token_ids)):
            token_ids[i] = token_ids[i][:self.max_label_len]  # 对标签进行截断与补齐
            if len(token_ids[i]) < self.max_label_len:
                token_ids[i] = token_ids[i] + [self.tokenizer.pad_token_id] * (self.max_label_len - len(token_ids[i]))
        ret['token_ids'] = token_ids
        return ret
    
    def batch_find_sub_labels(self, label: List[Union[list, str]]):
        """
        批量找到子标签。

        Args:
        label (List[list, str]): 标签列表, [[4510, 5554], [860, 5509]] or ['体育', '电脑']

        Returns:
            list -> [
                        {
                         'sub_labels': ['足球', '网球'], 
                                 'token_ids': [[6639, 4413], [5381, 4413]]
                        },
                        ...
                    ]
        """
        return [self.find_sub_labels(l) for l in label]

    def get_common_sub_str(self, str1: str, str2: str):
        """
        寻找最大公共子串。
        str1:abcd
        str2:abadbcdba
        """
        lstr1, lstr2 = len(str1), len(str2)
        # 生成0矩阵,为方便后续计算,比字符串长度多了一列
        record = [[0 for i in range(lstr2 + 1)] for j in range(lstr1 + 1)]
        p = 0  # 最长匹配对应在str1中的最后一位
        maxNum = 0  # 最长匹配长度

        for i in range(lstr1):
            for j in range(lstr2):
                if str1[i] == str2[j]:
                    record[i+1][j+1] = record[i][j] + 1
                    if record[i+1][j+1] > maxNum:
                        maxNum = record[i+1][j+1]
                        p = i + 1

        return str1[p-maxNum:p], maxNum


    
    def hard_mapping(self, sub_label: str):
        """
        强匹配函数,当模型生成的子label不存在时,通过最大公共子串找到重合度最高的主label。

        Args:
            sub_label (str): 子label。

        Returns:
            str: 主label。
        """
        label, max_overlap_str = '', 0
        # print(self.label_dict.items())
        for main_label, sub_labels in self.label_dict.items():
            overlap_num = 0
            for s_label in sub_labels:  # 求所有子label与当前推理label之间的最长公共子串长度
                overlap_num += self.get_common_sub_str(sub_label, s_label)[1]
            if overlap_num >= max_overlap_str:
                max_overlap_str = overlap_num
                label = main_label
        return label

    def find_main_label(self, sub_label: List[Union[list, str]], hard_mapping=True):
        """
        通过子标签找到父标签。

        Args:
            sub_label (List[Union[list, str]]): 子标签, 文本型  id_list, e.g. -> '苹果' or [5741, 3362]
            hard_mapping (bool): 当生成的词语不存在时,是否一定要匹配到一个最相似的label。

        Returns:
            dict -> {
                'label': '水果', 
                'token_ids': [3717, 3362]
            }
        """
        if type(sub_label) == list:     # 如果传入为id_list, 则通过tokenizer转回来
            pad_token_id = self.tokenizer.pad_token_id
            while pad_token_id in sub_label:           # 移除[PAD]token
                sub_label.remove(pad_token_id)
            sub_label = ''.join(self.tokenizer.convert_ids_to_tokens(sub_label))
        # print(sub_label)
        main_label = '无'
        for label, s_labels in self.label_dict.items():
            if sub_label in s_labels:
                main_label = label
                break

        if main_label == '无' and hard_mapping:
            main_label = self.hard_mapping(sub_label)
        # print(main_label)
        ret = {
            'label': main_label,
            'token_ids': self.tokenizer(main_label)['input_ids'][1:-1]
        }
        return ret

    def batch_find_main_label(self, sub_label: List[Union[list, str]], hard_mapping=True):
        """
        批量通过子标签找父标签。

        Args:
            sub_label (List[Union[list, str]]): 子标签列表, ['苹果', ...] or [[5741, 3362], ...]

        Returns:
            list: [
                    {
                    'label': '水果', 
                    'token_ids': [3717, 3362]
                    },
                    ...
            ]
        """
        return [self.find_main_label(l, hard_mapping) for l in sub_label]


if __name__ == '__main__':
    from rich import print
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)
    verbalizer = Verbalizer(
        verbalizer_file=pc.verbalizer,
        tokenizer=tokenizer,
        max_label_len=2
    )
    print(verbalizer.label_dict)
    # label = [4510, 5554]
    # ret = verbalizer.find_sub_labels(label)
    # label = ['电脑', '衣服']
    label = [[4510, 5554], [6132, 3302]]
    ret = verbalizer.batch_find_sub_labels(label)
    print(ret)


1.2 common_utils.py


  • 目的:定义损失函数、将mask_position位置的token logits转换为token的id。
  • 脚本里面包含两个函数:mlm_loss()以及convert_logits_to_ids()
  • 导入必备的工具包:


# coding:utf-8
# 导入必备工具包
import torch
from rich import print


  • 定义损失函数mlm_loss()


def mlm_loss(logits, mask_positions, sub_mask_labels,
             cross_entropy_criterion, device):
    """
    计算指定位置的mask token的output与label之间的cross entropy loss。

    Args:
        logits (torch.tensor): 模型原始输出 -> (batch, seq_len, vocab_size)
        mask_positions (torch.tensor): mask token的位置  -> (batch, mask_label_num)
        sub_mask_labels (list): mask token的sub label, 由于每个label的sub_label数目不同,所以  这里是个变长的list,
                                    e.g. -> [
                                        [[2398, 3352]],
                                        [[2398, 3352], [3819, 3861]]
                                    ]
        cross_entropy_criterion (CrossEntropyLoss): CE Loss计算器
        device (str): cpu还是gpu

    Returns:
        torch.tensor: CE Loss
    """
    batch_size, seq_len, vocab_size = logits.size()
    loss = None
    for single_value in zip(logits, sub_mask_labels, mask_positions):
        single_logits = single_value[0]
                single_sub_mask_labels = single_value[1]
        single_mask_positions = single_value[2]
        
        # single_mask_logits形状:(mask_label_num, vocab_size)
        single_mask_logits = single_logits[single_mask_positions] 
        
        # single_mask_logits按照子标签的长度进行复制:
        # single_mask_logits形状-->(sub_label_num, mask_label_num, vocab_size)
        single_mask_logits = single_mask_logits.repeat(len(single_sub_mask_labels), 1,
                                                       1)  
        
        #single_mask_logits改变形状:(sub_label_num * mask_label_num, vocab_size)
        #模型预测的结果
        single_mask_logits = single_mask_logits.reshape(-1, vocab_size)
                
        # single_sub_mask_labels形状:(sub_label_num, mask_label_num)
        single_sub_mask_labels = torch.LongTensor(single_sub_mask_labels).to(device)  
        
        # single_sub_mask_labels形状: # (sub_label_num * mask_label_num)
        single_sub_mask_labels = single_sub_mask_labels.reshape(-1, 1).squeeze() 
        
        if not single_sub_mask_labels.size():  # 处理单token维度下维度缺失的问题
            single_sub_mask_labels = single_sub_mask_labels.unsqueeze(dim=0)
            
        cur_loss = cross_entropy_criterion(single_mask_logits, single_sub_mask_labels)
        cur_loss = cur_loss / len(single_sub_mask_labels)

        if not loss:
            loss = cur_loss
        else:
            loss += cur_loss

    loss = loss / batch_size
    return loss


  • 定义convert_logits_to_ids()函数


def convert_logits_to_ids(
        logits: torch.tensor,
        mask_positions: torch.tensor):
    """
    输入LM的词表概率分布(LMModel的logits),将mask_position位置的
    token logits转换为token的id。

    Args:
        logits (torch.tensor): model output -> (batch, seq_len, vocab_size)
        mask_positions (torch.tensor): mask token的位置 -> (batch, mask_label_num)

    Returns:
        torch.LongTensor: 对应mask position上最大概率的推理token -> (batch, mask_label_num)
    """
    label_length = mask_positions.size()[1]  # 标签长度
    # print(f'label_length--》{label_length}')
    batch_size, seq_len, vocab_size = logits.size()

    mask_positions_after_reshaped = []

    for batch, mask_pos in enumerate(mask_positions.detach().cpu().numpy().tolist()):
        for pos in mask_pos:
            mask_positions_after_reshaped.append(batch * seq_len + pos)
            
    # logits形状:(batch_size * seq_len, vocab_size)
    logits = logits.reshape(batch_size * seq_len, -1) 
    
    # mask_logits形状:(batch * label_num, vocab_size)
    mask_logits = logits[mask_positions_after_reshaped]
    
    # predict_tokens形状: (batch * label_num)
    predict_tokens = mask_logits.argmax(dim=-1)
    
    # 改变后的predict_tokens形状: (batch, label_num)
    predict_tokens = predict_tokens.reshape(-1, label_length)  # (batch, label_num)

    return predict_tokens


1.3 metirc_utils.py


  • 目的:定义(多)分类问题下的指标评估(acc, precision, recall, f1)。
  • 导入必备的工具包:


from typing import List

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, f1_score
from sklearn.metrics import recall_score, confusion_matrix


  • 定义ClassEvaluator类


class ClassEvaluator(object):

    def __init__(self):
        self.goldens = []
        self.predictions = []

    def add_batch(self, pred_batch: List[List], gold_batch: List[List]):
        """
        添加一个batch中的prediction和gold列表,用于后续统一计算。

        Args:
            pred_batch (list): 模型预测标签列表, e.g. -> [0, 0, 1, 2, 0, ...] or [['体', '育'], ['财', '经'], ...]
            gold_batch (list): 真实标签标签列表, e.g. -> [1, 0, 1, 2, 0, ...] or [['体', '育'], ['财', '经'], ...]
        """
        assert len(pred_batch) == len(gold_batch)
                
        # 若遇到多个子标签构成一个标签的情况
        if type(gold_batch[0]) in [list, tuple]:  
            # 将所有的label拼接为一个整label: ['体', '育'] -> '体育'
            pred_batch = [','.join([str(e) for e in ele]) for ele in pred_batch]  
            gold_batch = [','.join([str(e) for e in ele]) for ele in gold_batch]
            
        self.goldens.extend(gold_batch)
        self.predictions.extend(pred_batch)

    def compute(self, round_num=2) -> dict:
        """
        根据当前类中累积的变量值,计算当前的P, R, F1。

        Args:
            round_num (int): 计算结果保留小数点后几位, 默认小数点后2位。

        Returns:
            dict -> {
                'accuracy': 准确率,
                'precision': 精准率,
                'recall': 召回率,
                'f1': f1值,
                'class_metrics': {
                    '0': {
                            'precision': 该类别下的precision,
                            'recall': 该类别下的recall,
                            'f1': 该类别下的f1
                        },
                    ...
                }
            }
        """
        classes, class_metrics, res = sorted(list(set(self.goldens) | set(self.predictions))), {}, {}
        
        # 构建全局指标
        res['accuracy'] = round(accuracy_score(self.goldens, self.predictions), round_num)  
        
        res['precision'] = round(precision_score(self.goldens, self.predictions, average='weighted'), round_num)
        
        # average='weighted'代表:考虑类别的不平衡性,需要计算类别的加权平均。如果是二分类问题则选择参数‘binary‘
        res['recall'] = round(recall_score(self.goldens, self.predictions, average='weighted'), round_num)
        
        res['f1'] = round(f1_score(self.goldens, self.predictions, average='weighted'), round_num)

        try:
            conf_matrix = np.array(confusion_matrix(self.goldens, self.predictions))  # (n_class, n_class)
            assert conf_matrix.shape[0] == len(classes)
            for i in range(conf_matrix.shape[0]):  # 构建每个class的指标
                precision = 0 if sum(conf_matrix[:, i]) == 0 else conf_matrix[i, i] / sum(conf_matrix[:, i])
                recall = 0 if sum(conf_matrix[i, :]) == 0 else conf_matrix[i, i] / sum(conf_matrix[i, :])
                f1 = 0 if (precision + recall) == 0 else 2 * precision * recall / (precision + recall)
                class_metrics[classes[i]] = {
                    'precision': round(precision, round_num),
                    'recall': round(recall, round_num),
                    'f1': round(f1, round_num)
                }
            res['class_metrics'] = class_metrics
        except Exception as e:
            print(f'[Warning] Something wrong when calculate class_metrics: {e}')
            print(f'-> goldens: {set(self.goldens)}')
            print(f'-> predictions: {set(self.predictions)}')
            print(f'-> diff elements: {set(self.predictions) - set(self.goldens)}')
            res['class_metrics'] = {}

        return res

    def reset(self):
        """
        重置积累的数值。
        """
        self.goldens = []
        self.predictions = []




BERT+PET方式模型训练(二)+https://developer.aliyun.com/article/1544776?spm=a2c6h.13148508.setting.30.22454f0eHFZZj3

相关文章
|
6月前
BERT+PET方式模型训练(二)
• 本项目中完成BERT+PET模型搭建、训练及应用的步骤如下(注意:因为本项目中使用的是BERT预训练模型,所以直接加载即可,无需重复搭建模型架构): • 一、实现模型工具类函数 • 二、实现模型训练函数,验证函数 • 三、实现模型预测函数
|
7月前
|
机器学习/深度学习 人工智能 开发工具
如何快速部署本地训练的 Bert-VITS2 语音模型到 Hugging Face
Hugging Face是一个机器学习(ML)和数据科学平台和社区,帮助用户构建、部署和训练机器学习模型。它提供基础设施,用于在实时应用中演示、运行和部署人工智能(AI)。用户还可以浏览其他用户上传的模型和数据集。Hugging Face通常被称为机器学习界的GitHub,因为它让开发人员公开分享和测试他们所训练的模型。 本次分享如何快速部署本地训练的 Bert-VITS2 语音模型到 Hugging Face。
如何快速部署本地训练的 Bert-VITS2 语音模型到 Hugging Face
|
7月前
|
PyTorch 算法框架/工具
Bert Pytorch 源码分析:五、模型架构简图 REV1
Bert Pytorch 源码分析:五、模型架构简图 REV1
100 0
|
7月前
|
PyTorch 算法框架/工具
Bert Pytorch 源码分析:五、模型架构简图
Bert Pytorch 源码分析:五、模型架构简图
77 0
|
2月前
|
自然语言处理 PyTorch 算法框架/工具
掌握从零到一的进阶攻略:让你轻松成为BERT微调高手——详解模型微调全流程,含实战代码与最佳实践秘籍,助你应对各类NLP挑战!
【10月更文挑战第1天】随着深度学习技术的进步,预训练模型已成为自然语言处理(NLP)领域的常见实践。这些模型通过大规模数据集训练获得通用语言表示,但需进一步微调以适应特定任务。本文通过简化流程和示例代码,介绍了如何选择预训练模型(如BERT),并利用Python库(如Transformers和PyTorch)进行微调。文章详细说明了数据准备、模型初始化、损失函数定义及训练循环等关键步骤,并提供了评估模型性能的方法。希望本文能帮助读者更好地理解和实现模型微调。
88 2
掌握从零到一的进阶攻略:让你轻松成为BERT微调高手——详解模型微调全流程,含实战代码与最佳实践秘籍,助你应对各类NLP挑战!
|
2月前
|
机器学习/深度学习 自然语言处理 知识图谱
|
2月前
|
机器学习/深度学习 自然语言处理 算法
[大语言模型-工程实践] 手把手教你-基于BERT模型提取商品标题关键词及优化改进
[大语言模型-工程实践] 手把手教你-基于BERT模型提取商品标题关键词及优化改进
195 0
|
3月前
|
搜索推荐 算法
模型小,还高效!港大最新推荐系统EasyRec:零样本文本推荐能力超越OpenAI、Bert
【9月更文挑战第21天】香港大学研究者开发了一种名为EasyRec的新推荐系统,利用语言模型的强大文本理解和生成能力,解决了传统推荐算法在零样本学习场景中的局限。EasyRec通过文本-行为对齐框架,结合对比学习和协同语言模型调优,提升了推荐准确性。实验表明,EasyRec在多个真实世界数据集上的表现优于现有模型,但其性能依赖高质量文本数据且计算复杂度较高。论文详见:http://arxiv.org/abs/2408.08821
82 7
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
【AI大模型】BERT模型:揭秘LLM主要类别架构(上)
【AI大模型】BERT模型:揭秘LLM主要类别架构(上)
|
4月前
|
机器学习/深度学习 存储 自然语言处理
【NLP-新闻文本分类】3 Bert模型的对抗训练
详细介绍了使用BERT模型进行新闻文本分类的过程,包括数据集预处理、使用预处理数据训练BERT语料库、加载语料库和词典后用原始数据训练BERT模型,以及模型测试。
81 1

热门文章

最新文章

下一篇
DataWorks