structure_converter.py: abstract base class that defines the structure converter interface
class StructureConverter(object):

    def structure_to_input(self, input_dict: dict, prompt_part_only: bool = False) -> str:
        raise NotImplementedError()

    def output_to_structure(self, input_dict: dict, output_str: str):
        raise NotImplementedError()

    @staticmethod
    def to_function_head(s, input=''):
        return f'def {s}({input}):'

    @staticmethod
    def to_function_name(s):
        s = s.replace(".", "").replace(",", "")
        # remove DT
        tok = s.lower().split()
        tok = [x for x in tok if x not in ['the', 'a', 'an']]
        return '_'.join(tok)

    @staticmethod
    def list_to_str(l):
        # remove \n
        l = [x.replace("\n", " ") if x != '\n' else '' for x in l]
        l = '\n'.join(l)
        return l
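`StructureConverter` is a Python class that plays the role of an abstract base class: it defines the interface every structure converter must provide. It contains two interface methods and three static helpers:
- `structure_to_input(self, input_dict: dict, prompt_part_only: bool = False) -> str`: an abstract method that converts an input data structure into text input. Subclasses implement it to define the task-specific conversion from structure to text.
- `output_to_structure(self, input_dict: dict, output_str: str)`: the second abstract method, which converts output text back into a data structure. Like `structure_to_input`, it is implemented by subclasses.
- `to_function_head(s, input='')`: a static method that builds the header of a Python function, where `s` is the function name and `input` is the parameter list. It returns the string `def s(input):`.
- `to_function_name(s)`: a static method that turns a string into a function name. It removes periods and commas, lowercases the text, drops the articles `the`, `a`, and `an`, and joins the remaining words with underscores. Other characters are left untouched, so the result is only a valid identifier if the input contains nothing else that Python rejects.
- `list_to_str(l)`: a static method that converts a list of strings `l` into one multi-line string. Newlines inside an element are replaced with spaces, an element that is exactly `\n` becomes an empty string, and the elements are then joined with `\n`.
`StructureConverter` is intended to be subclassed rather than used directly. Because it inherits from `object` rather than `abc.ABC`, Python does not prevent instantiation, but both interface methods raise `NotImplementedError` until a subclass overrides them. The class therefore supplies a common interface plus a few shared utilities for the concrete, task-specific converters built on top of it.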
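To make the behaviour of the static helpers concrete, here is a minimal sketch that copies them into a stand-alone class and calls them on made-up inputs. The class name `HelperDemo` and the sample strings are assumptions for this illustration and do not appear in the repository.

```python
class HelperDemo:
    """Stand-alone copy of the three static helpers, for illustration only."""

    @staticmethod
    def to_function_head(s, input=''):
        return f'def {s}({input}):'

    @staticmethod
    def to_function_name(s):
        # Strip periods and commas, lowercase, then drop articles.
        s = s.replace(".", "").replace(",", "")
        tok = s.lower().split()
        tok = [x for x in tok if x not in ['the', 'a', 'an']]
        return '_'.join(tok)

    @staticmethod
    def list_to_str(l):
        # Newlines inside an element become spaces; a bare "\n" element becomes "".
        l = [x.replace("\n", " ") if x != '\n' else '' for x in l]
        return '\n'.join(l)


name = HelperDemo.to_function_name("Travel to the theme park.")
head = HelperDemo.to_function_head(name, input="input_text")
body = HelperDemo.list_to_str(["first\nline", "\n", "second"])

print(name)        # travel_to_theme_park
print(head)        # def travel_to_theme_park(input_text):
print(repr(body))  # 'first line\n\nsecond'
```

The article "the" disappears from the function name, while the bare `"\n"` element survives as an empty line in the joined string.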
utils.py: utility functions for string processing, list operations, and code compilation
import re
import sys
from typing import Tuple


def match_sublist(the_list, to_match):
    """
    :param the_list: [1, 2, 3, 4, 5, 6, 1, 2, 4, 5]
    :param to_match: [1, 2]
    :return: [(0, 1), (6, 7)]
    """
    len_to_match = len(to_match)
    matched_list = list()
    for index in range(len(the_list) - len_to_match + 1):
        if to_match == the_list[index:index + len_to_match]:
            matched_list += [(index, index + len_to_match - 1)]
    return matched_list


def check_overlap(x, y):
    if x[0] > y[1] or y[0] > x[1]:
        return False
    else:
        return True


def get_index_tuple(matched: Tuple[int, int]):
    return tuple(range(matched[0], matched[1] + 1))


def span_to_token(text, span_to_token_strategy='space'):
    if span_to_token_strategy == 'space':
        return text.split(' ')
    elif span_to_token_strategy == 'list':
        return list(text)
    else:
        raise NotImplementedError(
            f"The span to token strategy {span_to_token_strategy} is not implemented.")


def to_camel_case(title: str) -> str:
    """Converts a proscript title to a camel case string.
    Example:
        title: travel to the theme park
        camel_case: TravelToThemePark
    """
    if "." == title[-1]:
        title = title[:-1]
    title_tokens = title.split(" ")
    title_camel_case = ""
    for token in title_tokens:
        title_camel_case += token.capitalize()
    return title_camel_case


def to_snake_case(name):
    # replace all space and punctuation with underscore
    if name[-1] == ".":
        name = name[:-1]
    name = re.sub(r'[\s\W]', '_', name)
    return name.lower().strip()


def from_snake_to_normal_str(snake_str: str) -> str:
    """Converts a snake case string to a normal string.
    Example:
        snake_str: travel_to_the_theme_park
        normal_str: travel to the theme park
    """
    return " ".join(snake_str.split("_"))


def compile_code_get_object(py_code_str: str):
    """Given python code as a string, compiles it and returns an object of the class contained in the string.

    Args:
        code (str): _description_
    """
    # compile the code
    try:
        py_code = compile(py_code_str, "<string>", "exec")
    except SyntaxError:
        # try without the last k lines in py_code_str: usually the last line is incomplete
        for k in range(1, 3):
            try:
                lines = py_code_str.split("\n")
                lines = "\n".join(lines[:-k])
                py_code = compile(lines, "<string>", "exec")
            except SyntaxError as e:
                print(f"Error compiling python code:\n{py_code_str}")
                raise e

    # instantiate the class
    py_code_dict = {}
    exec(py_code, py_code_dict)
    # the newly instantiated class will be last in the scope
    py_code_class = py_code_dict[list(py_code_dict.keys())[-1]]()
    return py_code_class
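These functions form a small toolkit for string processing, list operations, and code compilation. In brief:
- `match_sublist(the_list, to_match)`: finds every occurrence of the sublist `to_match` inside `the_list` and returns the inclusive index range of each match. For example, with `the_list = [1, 2, 3, 4, 5, 6, 1, 2, 4, 5]` and `to_match = [1, 2]` it returns `[(0, 1), (6, 7)]`, the start and end index of each matched sublist.
- `check_overlap(x, y)`: checks whether two inclusive index ranges `x` and `y` overlap, returning `True` if they do and `False` otherwise.
- `get_index_tuple(matched: Tuple[int, int])`: expands a matched index range `matched` into a tuple containing every index in that range.
- `span_to_token(text, span_to_token_strategy='space')`: splits `text` into a list of words or characters according to `span_to_token_strategy`. The default `'space'` strategy splits on single spaces; the `'list'` strategy splits the text into individual characters. Any other value raises `NotImplementedError`.
- `to_camel_case(title: str)`: converts a title string to camel case by stripping a trailing period and capitalizing each space-separated word. Note that the code keeps every word, so "travel to the theme park" becomes "TravelToTheThemePark"; the docstring's example output "TravelToThemePark" omits "The".
- `to_snake_case(name)`: converts `name` to snake_case by stripping a trailing period, replacing every whitespace or non-word character with an underscore, and lowercasing the result.
- `from_snake_to_normal_str(snake_str: str)`: converts a snake_case string back to a normal string by replacing underscores with spaces.
- `compile_code_get_object(py_code_str: str)`: compiles the Python source in `py_code_str`, executes it, and returns an instance of the class it defines. The class is taken to be the last name added to the execution namespace. If the first compilation raises `SyntaxError`, the function retries with trailing lines removed, on the assumption that the last line of generated code is often incomplete.
Together these functions cover common needs in text processing, string format conversion, and code execution, and they can be reused in different contexts.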
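The span helpers are easiest to understand on a tokenized sentence. The sketch below uses compact rewrites of `match_sublist`, `check_overlap`, and `to_snake_case` that are intended to behave like the originals; the sample sentence is made up for this illustration.

```python
import re


def match_sublist(the_list, to_match):
    # Inclusive (start, end) index pairs for every occurrence of to_match.
    n = len(to_match)
    return [(i, i + n - 1)
            for i in range(len(the_list) - n + 1)
            if the_list[i:i + n] == to_match]


def check_overlap(x, y):
    # Two inclusive ranges overlap unless one starts after the other ends.
    return not (x[0] > y[1] or y[0] > x[1])


def to_snake_case(name):
    if name[-1] == ".":
        name = name[:-1]
    return re.sub(r'[\s\W]', '_', name).lower().strip()


tokens = "the city of new york is in new york state".split(' ')
spans = match_sublist(tokens, ["new", "york"])

print(spans)                                       # [(3, 4), (7, 8)]
print(check_overlap(spans[0], (4, 5)))             # True: both ranges contain index 4
print(check_overlap(spans[0], spans[1]))           # False
print(to_snake_case("Travel to the theme park."))  # travel_to_the_theme_park
```

Because the ranges are inclusive on both ends, `(3, 4)` and `(4, 5)` count as overlapping.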
eval
eval_extraction.py: scores NLP task predictions against the gold standard
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import argparse
import json
import os
import sys
import numpy as np
from pprint import pprint
from src.eval.scorer import EntityScorer, RelationScorer, EventScorer


def read_file(file_name):
    return [line for line in open(file_name).readlines()]


def write_to_file(result, output_filename, prefix=None):
    with open(output_filename, 'w') as output:
        for key, value in result.items():
            if prefix:
                key = '%s_%s' % (prefix, key)
            output.write("%s=%s\n" % (key, value))


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-g', dest='gold_folder', help="Golden Dataset folder")
    parser.add_argument('-gf', dest='gold_file', help="Golden Dataset File")
    parser.add_argument('-p', dest='pred_folder', nargs='+', help="Predicted model folder")
    parser.add_argument('-pf', dest='pred_file', help="Predicted model file")
    parser.add_argument('-sf', dest='saved_file', help="Saved result file")
    parser.add_argument('-v', dest='verbose', action='store_true', help='Show more information during running')
    parser.add_argument('-w', dest='write_to_file', action='store_true',
                        help="Write evaluation results to predicted folder")
    parser.add_argument('-m', dest='match_mode', default='normal', choices=['set', 'normal', 'multimatch'])
    parser.add_argument('-case', dest='case', action='store_true', help='Show case study')
    options = parser.parse_args()

    data_dict = {
        'test': [options.pred_file, options.gold_file],
    }

    task_dict = {
        'entity': EntityScorer,
        'relation': RelationScorer,
        'event': EventScorer,
    }

    result_list = {'eval': list(), 'test': list()}

    for pred_folder in options.pred_folder:
        gold_folder = options.gold_folder

        for data_key, (generation, gold_file) in data_dict.items():
            gold_filename = os.path.join(gold_folder, gold_file)
            pred_filename = os.path.join(pred_folder, generation)

            if not os.path.exists(pred_filename):
                sys.stderr.write("%s not found.\n" % pred_filename)
                continue

            print("pred:", pred_filename)
            print("gold:", gold_filename)

            if options.case:
                for pred_line, gold_line in zip(read_file(pred_filename), read_file(gold_filename)):
                    gold_instance = json.loads(gold_line)
                    pred_instance = json.loads(pred_line)
                    print('=========================')
                    print(gold_instance['text'])
                    for task in task_dict:
                        scorer = task_dict[task]
                        gold = scorer.load_gold_list([gold_instance[task]])[0]
                        pred = scorer.load_pred_list([pred_instance[task]])[0]
                        min_length = max(
                            len(gold['string']),
                            len(pred['string']),
                            len(gold.get('string_trigger', [])),
                            len(pred.get('string_trigger', [])),
                            len(gold.get('string_role', [])),
                            len(pred.get('string_role', [])),
                        )
                        if min_length == 0:
                            continue
                        if task == 'entity':
                            print("Entity Gold:", sorted(gold['string']))
                            print("Entity Pred:", sorted(pred['string']))
                        if task == 'relation':
                            print("Relation Gold:", sorted(gold['string']))
                            print("Relation Pred:", sorted(pred['string']))
                        if task == 'event':
                            print("Event Gold Trigger:", sorted(gold['string_trigger']))
                            print("Event Pred Trigger:", sorted(pred['string_trigger']))
                            print("Event Gold Role :", sorted(gold['string_role']))
                            print("Event Pred Role :", sorted(pred['string_role']))

            results = dict()
            for task in task_dict:
                if task not in json.loads(read_file(pred_filename)[0]):
                    continue
                scorer = task_dict[task]
                gold_list = [json.loads(line)[task] for line in read_file(gold_filename)]
                pred_list = [json.loads(line)[task] for line in read_file(pred_filename)]
                ########## 23-01-07
                ill_formed = [json.loads(line)['statistic']['ill-formed'] for line in read_file(pred_filename)]

                assert len(pred_list) == len(gold_list)
                gold_instance_list = scorer.load_gold_list(gold_list)
                pred_instance_list = scorer.load_pred_list(pred_list)
                assert len(pred_instance_list) == len(gold_instance_list)

                sub_results = scorer.eval_instance_list(
                    gold_instance_list=gold_instance_list,
                    pred_instance_list=pred_instance_list,
                    verbose=options.verbose,
                    match_mode=options.match_mode,
                )
                results.update(sub_results)

            result_list[data_key] += [results]

            if options.write_to_file:
                output_filename = "%s/%s" % (pred_folder, options.saved_file)
                write_to_file(
                    result=results,
                    output_filename=output_filename,
                    prefix=data_key,
                )


if __name__ == "__main__":
    main()
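This script is a command-line tool that scores the results of NLP tasks against the gold standard. Its options provide the following:
- The model output to evaluate (predicted results) and the gold-standard data (golden dataset) are taken from command-line arguments.
- Three tasks can be evaluated: entity, relation, and event.
- The match mode can be set to "set", "normal", or "multimatch".
- The `-v` flag prints more information during evaluation for more detailed analysis.
- The `-w` flag writes the evaluation results to a file inside the prediction folder.
The main steps are:
- Read the model's prediction file and the gold-standard data file.
- Parse both and extract the information for each task (entity, relation, event) that is present in the predictions.
- Call the scorer classes (`EntityScorer`, `RelationScorer`, `EventScorer`) to compare predictions with the gold standard.
- Compute the evaluation metrics, including precision, recall, and F1. The scores are saved to a file when `-w` is set; the code shown does not print the final scores itself.
- If the `-case` option is set, also print a per-sentence case study showing the gold annotations next to the model's predictions.
The tool is mainly used to score NLP task results, helping researchers and practitioners understand a model's performance and where it can be improved.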
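The file written with `-w` is a plain list of `key=value` lines. As a small illustration, the sketch below reuses `write_to_file` from the script and feeds it made-up scores; the score values, the file name `eval_results.txt`, and the temporary directory are assumptions for this example only.

```python
import os
import tempfile


def write_to_file(result, output_filename, prefix=None):
    # Same logic as the script: one "prefix_key=value" line per metric.
    with open(output_filename, 'w') as output:
        for key, value in result.items():
            if prefix:
                key = '%s_%s' % (prefix, key)
            output.write("%s=%s\n" % (key, value))


# Made-up scores shaped like the output of an entity scorer.
fake_results = {'string-ent-P': 50.0, 'string-ent-R': 100.0, 'string-ent-F1': 66.7}

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, 'eval_results.txt')
    write_to_file(fake_results, path, prefix='test')
    with open(path) as f:
        written = f.read()

print(written)
```

With the `test` prefix used by the script's `data_dict`, each metric name comes out as `test_string-ent-P`, `test_string-ent-R`, and so on.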
extract_results.py: converts raw model predictions into structured results, checks their validity, and stores them in the specified output file
import argparse
import json
import random
from tqdm import tqdm
import subprocess
from src.converters.get_converter import ConverterFactory
import pandas as pd


def eval(src_file, pred_file, save_file, job_type, schema_path, map_config_path, pred_key='generated_code'):
    src_d = pd.read_json(src_file, orient='records', lines=True)
    with open(pred_file, 'r') as f:
        pred_d = []
        for line in f:
            data = json.loads(line.strip())
            pred_d.append(data)

    converter = ConverterFactory.get_converter(job_type=job_type, schema_folder=schema_path,
                                               map_config_path=map_config_path)

    prediction_list = []
    ill_formed = 0
    invalid_label = 0
    invalid_text_span = 0
    invalid_label_asoc = 0
    invalid_text_span_asoc = 0

    for qid, src_data in tqdm(src_d.iterrows(), total=len(src_d)):
        pred = pred_d[qid]
        predictions = converter.output_to_structure(src_data, pred[pred_key])
        if predictions['statistic']['ill-formed'] == True:
            ill_formed += 1
        if predictions['statistic']['Invalid-Label'] == True:
            invalid_label += 1
        if predictions['statistic']['Invalid-Text-Span'] == True:
            invalid_text_span += 1
        if predictions['statistic']['Invalid-Label-asoc'] == True:
            invalid_label_asoc += 1
        if predictions['statistic']['Invalid-Text-Span-asoc'] == True:
            invalid_text_span_asoc += 1
        prediction_list.append(predictions)

    pd.DataFrame(prediction_list).to_json(save_file, orient='records', lines=True)

    print ("ill_formed number: ", ill_formed)
    print ("Invalid-Label number: ", invalid_label)
    print ("Invalid-Text-Span number: ", invalid_text_span)
    print ("Invalid-Label-asoc number: ", invalid_label_asoc)
    print ("Invalid-Text-Span-asoc number: ", invalid_text_span_asoc)


def config():
    parser = argparse.ArgumentParser()
    parser.add_argument('--raw_output_file', type=str)
    parser.add_argument('--output_file', type=str)
    parser.add_argument('--src_file', type=str)
    parser.add_argument('--job_type', type=str)
    parser.add_argument("--schema_path", type=str, required=True)
    parser.add_argument("--map_config_path", type=str, required=True)
    parser.add_argument("--pred_key", type=str, default='generated_code')
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    args = config()
    save_file = args.output_file
    eval(args.src_file, args.raw_output_file, save_file, args.job_type, args.schema_path,
         args.map_config_path, args.pred_key)
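This script post-processes the model's predictions for an NLP task and stores the structured results in the specified output file. Its main steps are:
- Read the input files and configuration from command-line arguments: the raw output file (`raw_output_file`), the output file (`output_file`), the source data file (`src_file`), the task type (`job_type`), the schema path (`schema_path`), the mapping configuration path (`map_config_path`), and the prediction key (`pred_key`).
- Create the appropriate converter (for example NER or RE) through `ConverterFactory`; the converter turns each raw model prediction into a structured result.
- Iterate over every example in the source file and convert its raw prediction into a structure with `output_to_structure`.
- Inspect the validity flags that the converter attaches to each result: ill-formed output (`ill-formed`), invalid labels (`Invalid-Label`), invalid text spans (`Invalid-Text-Span`), invalid associated labels (`Invalid-Label-asoc`), and invalid associated text spans (`Invalid-Text-Span-asoc`).
- Count how many predictions raise each flag.
- Finally, save the structured results to the output file in JSON Lines format and print the counts to the console.
The script automates the step between raw generation and scoring, and its counts help researchers and practitioners see how often the model's output is malformed before any further analysis.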
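The counting loop can be sketched without the real converters. In the example below, `StubConverter` is a hypothetical stand-in that only fills in the `statistic` dictionary, and its rule for `ill-formed` (output that does not start with `def `) is an assumption made purely for the demonstration; the real converters decide these flags themselves.

```python
from collections import Counter

FLAGS = ['ill-formed', 'Invalid-Label', 'Invalid-Text-Span',
         'Invalid-Label-asoc', 'Invalid-Text-Span-asoc']


class StubConverter:
    """Hypothetical converter used only for this demo."""

    def output_to_structure(self, input_dict, output_str):
        statistic = {flag: False for flag in FLAGS}
        # Assumption for the demo: anything not starting with "def " is ill-formed.
        statistic['ill-formed'] = not output_str.startswith('def ')
        return {'text': input_dict['text'], 'statistic': statistic}


sources = [{'text': 'first sentence'}, {'text': 'second sentence'}]
raw_outputs = [{'generated_code': 'def f():'}, {'generated_code': '???'}]

converter = StubConverter()
counts = Counter()
predictions = []
for src, raw in zip(sources, raw_outputs):
    pred = converter.output_to_structure(src, raw['generated_code'])
    # Count every flag that is set on this prediction.
    counts.update(flag for flag in FLAGS if pred['statistic'][flag])
    predictions.append(pred)

for flag in FLAGS:
    print(flag, 'number:', counts[flag])
```

Using a `Counter` keyed by flag name replaces the five separate integer variables in the script; the counts themselves are tallied in the same way.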
src\eval\scorer.py: a general evaluation framework that makes it easy to compute metrics for different tasks
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# adapted from https://raw.githubusercontent.com/universal-ie/UIE/main/uie/extraction/scorer.py
from collections import defaultdict
from copy import deepcopy
from typing import Dict, List
import sys


def tuple_offset(offset):
    if isinstance(offset, tuple):
        return offset
    else:
        return tuple(offset)


class Metric:
    """ Tuple Metric """

    def __init__(self, verbose=False, match_mode='normal'):
        self.tp = 0.
        self.gold_num = 0.
        self.pred_num = 0.
        self.verbose = verbose
        self.match_mode = match_mode
        assert self.match_mode in {'set', 'normal', 'multimatch'}

    def __repr__(self) -> str:
        return f"tp: {self.tp}, gold: {self.gold_num}, pred: {self.pred_num}"

    @staticmethod
    def safe_div(a, b):
        if b == 0.:
            return 0.
        else:
            return a / b

    def compute_f1(self, prefix=''):
        tp = self.tp
        pred_num = self.pred_num
        gold_num = self.gold_num
        p, r = self.safe_div(tp, pred_num), self.safe_div(tp, gold_num)
        return {prefix + 'tp': tp,
                prefix + 'gold': gold_num,
                prefix + 'pred': pred_num,
                prefix + 'P': p * 100,
                prefix + 'R': r * 100,
                prefix + 'F1': self.safe_div(2 * p * r, p + r) * 100
                }

    def count_instance(self, gold_list, pred_list):
        if self.match_mode == 'set':
            gold_list = set(gold_list)
            pred_list = set(pred_list)
            if self.verbose:
                print("Gold:", gold_list)
                print("Pred:", pred_list)
            self.gold_num += len(gold_list)
            self.pred_num += len(pred_list)
            self.tp += len(gold_list & pred_list)
        else:
            if self.verbose:
                print("Gold:", gold_list)
                print("Pred:", pred_list)
            self.gold_num += len(gold_list)
            self.pred_num += len(pred_list)
            if len(gold_list) > 0 and len(pred_list) > 0:
                # guarantee length same
                assert len(gold_list[0]) == len(pred_list[0])
            dup_gold_list = deepcopy(gold_list)
            for pred in pred_list:
                if pred in dup_gold_list:
                    self.tp += 1
                    if self.match_mode == 'normal':
                        # Each Gold Instance can be matched one time
                        dup_gold_list.remove(pred)

    def count_batch_instance(self, batch_gold_list, batch_pred_list):
        for gold_list, pred_list in zip(batch_gold_list, batch_pred_list):
            self.count_instance(gold_list=gold_list, pred_list=pred_list)


class RecordMetric(Metric):
    """ Ignores the order of different Roles, e.g. event arguments """

    @staticmethod
    def is_equal(gold, pred):
        if gold['type'] != pred['type']:
            return False
        if gold['spot'] != pred['spot']:
            return False
        if len(gold['asocs']) != len(pred['asocs']):
            return False
        for gold_role, pred_role in zip(sorted(gold['asocs']), sorted(pred['asocs'])):
            if gold_role != pred_role:
                return False
        return True

    def count_instance(self, gold_list, pred_list):
        if self.match_mode == 'set':
            raise NotImplementedError(f'{self.__class__.__name__} do not support the match model `set`')
        if self.verbose:
            print("Gold:", gold_list)
            print("Pred:", pred_list)
        self.gold_num += len(gold_list)
        self.pred_num += len(pred_list)
        gold_indexes = list(range(len(gold_list)))
        non_found = [True] * len(gold_list)
        for pred in pred_list:
            for gold_index in gold_indexes:
                if non_found[gold_index] and self.is_equal(gold_list[gold_index], pred):
                    self.tp += 1
                    non_found[gold_index] = False
                    if self.match_mode == 'normal':
                        break


class OrderedRecordMetric(RecordMetric):
    """ Respects the order of different Roles, e.g. relations """

    @staticmethod
    def is_equal(gold, pred):
        if gold['type'] != pred['type']:
            return False
        if gold['spot'] != pred['spot']:
            return False
        if len(gold['asocs']) != len(pred['asocs']):
            return False
        for gold_role, pred_role in zip(gold['asocs'], pred['asocs']):
            if gold_role != pred_role:
                return False
        return True


def warning_tp_increment(gold, pred, prefix):
    sys.stderr.write(f"{prefix} TP Increment Warning, Gold Offset: {gold['offset']}\n")
    sys.stderr.write(f"{prefix} TP Increment Warning, Pred Offset: {pred['offset']}\n")
    sys.stderr.write(f"{prefix} TP Increment Warning, Gold String: {gold['string']}\n")
    sys.stderr.write(f"{prefix} TP Increment Warning, Pred String: {pred['string']}\n")
    sys.stderr.write(f"===============\n")


class Scorer:
    @staticmethod
    def load_gold_list(gold_list, offset_key=None):
        raise NotImplementedError

    @staticmethod
    def load_pred_list(pred_list):
        raise NotImplementedError

    @staticmethod
    def eval_instance_list(gold_instance_list, pred_instance_list, verbose=False, match_mode='normal'):
        raise NotImplementedError


class EntityScorer(Scorer):
    @staticmethod
    def load_gold_list(gold_list: List[List[Dict]]):
        """ Load gold instance to `string` and `offset`

        Args:
            gold_list (List[List[Dict]]): [description]
                [
                    [
                        {'type': 'Geo-political', 'offset': [7], 'text': 'seattle'},
                        {'type': 'Location', 'offset': [11], 'text': 'lot'},
                        {'type': 'Geo-political', 'offset': [14], 'text': 'city'}
                    ],
                    [...]
                ]

        Returns:
            List[Dict]: each instance has `offset` and `string`
                [
                    {
                        'offset': [('Geo-political', (7,)), ('Location', (11,)), ('Geo-political', (14,))],
                        'string': [('Geo-political', 'seattle'), ('Location', 'lot'), ('Geo-political', 'city')]
                    },
                    {...}, ...
                ]
        """
        gold_instance_list = []
        for gold in gold_list:
            gold_offset = list()
            gold_string = list()
            for span in gold:
                span_label = span['type']
                span_offset = span['offset']
                span_text = span['text']
                gold_offset += [(span_label, tuple_offset(span_offset))]
                gold_string += [(span_label, span_text)]
            gold_instance = {
                'offset': gold_offset,
                'string': gold_string,
            }
            gold_instance_list += [gold_instance]
        return gold_instance_list

    @staticmethod
    def load_pred_list(pred_list: List[Dict]):
        """[summary]

        Args:
            pred_list (List[Dict]): [description]
                [
                    {
                        'offset': [['Geo-political', [7]], ['Geo-political', [14]]],
                        'string': [['Geo-political', 'seattle'], ['Geo-political', 'city']]
                    },
                    {...},
                ]

        Returns:
            List[Dict] : each relation instance has `offset` and `string`
                [
                    {
                        'offset': [('Geo-political', (7,)), ('Geo-political', (14,))],
                        'string': [('Geo-political', 'seattle'), ('Geo-political', 'city')]
                    }
                ]
        """
        pred_instance_list = list()
        for pred in pred_list:
            for offset_pred in pred['offset']:
                if not isinstance(offset_pred[1], tuple):
                    offset_pred[1] = tuple_offset(offset_pred[1])
            pred['offset'] = [tuple_offset(p) for p in pred['offset']]
            pred['string'] = [tuple_offset(p) for p in pred['string']]
            pred_instance_list += [pred]
        return pred_instance_list

    @staticmethod
    def eval_instance_list(gold_instance_list: List[Dict], pred_instance_list: List[Dict], verbose=False,
                           match_mode='normal'):
        """[summary]

        Args:
            gold_instance_list (List[Dict]): [description]
                [
                    {
                        'offset': [('Geo-political', (7,)), ('Location', (11,)), ('Geo-political', (14,))],
                        'string': [('Geo-political', 'seattle'), ('Location', 'lot'), ('Geo-political', 'city')]
                    },
                    {...}, ...
                ]
            pred_instance_list (List[Dict]): [description]
                [
                    {
                        'offset': [('Geo-political', (7,)), ('Geo-political', (14,))],
                        'string': [('Geo-political', 'seattle'), ('Geo-political', 'city')]
                    }
                ]
            verbose (bool, optional): [description]. Defaults to False.
            match_mode (string, optional): [description]. Defaults to `normal` .

        Returns:
            Dict: Result of Evaluation
                (offset, string) X (gold, pred, tp, P, R, F1)
        """
        metrics = {
            'string': Metric(verbose=verbose, match_mode=match_mode),
            'offset': Metric(verbose=verbose, match_mode=match_mode),
        }
        for pred, gold in zip(pred_instance_list, gold_instance_list):
            pre_string_tp, pre_offset_tp = metrics['string'].tp, metrics['offset'].tp
            for eval_key in metrics:
                metrics[eval_key].count_instance(
                    gold_list=gold.get(eval_key, []),
                    pred_list=pred.get(eval_key, [])
                )
            post_string_tp, post_offset_tp = metrics['string'].tp, metrics['offset'].tp
            if verbose and post_offset_tp - pre_offset_tp != post_string_tp - pre_string_tp:
                warning_tp_increment(gold=gold, pred=pred, prefix='Entity')

        results = dict()
        for eval_key in metrics:
            results.update(metrics[eval_key].compute_f1(prefix=eval_key + '-ent-'))
        return results


class RelationScorer(Scorer):
    @staticmethod
    def load_gold_list(gold_list: List[List[Dict]]):
        """[summary]

        Args:
            gold_list (List[List[Dict]]): List of Sentence, each sentence contains a List of Relation Dict
                [
                    [
                        {
                            'type': 'Part-whole',
                            'args': [{'type': 'Location', 'offset': [11], 'text': 'lot'},
                                     {'type': 'Geo-political', 'offset': [14], 'text': 'city'}]
                        }, ...
                    ],
                    [...],
                ]

        Returns:
            List[Dict]: List of Sentence, each sentence contains two List (offset, string) of Relation Tuple
                [
                    {
                        'offset': [('Part-whole', 'Geo-political', (0,), 'Geo-political', (2,)), ... ],
                        'string': [('Part-whole', 'Geo-political', 'MULTAN', 'Geo-political', 'Pakistan'), ...]
                    }
                ]
        """
        gold_instance_list = []
        for gold in gold_list:
            gold_instance = defaultdict(list)
            for record in gold:
                assert len(record['args']) == 2
                gold_instance['offset'] += [(
                    record['type'],
                    record['args'][0]['type'],
                    tuple_offset(record['args'][0]['offset']),
                    record['args'][1]['type'],
                    tuple_offset(record['args'][1]['offset']),
                )]
                gold_instance['string'] += [(
                    record['type'],
                    record['args'][0]['type'],
                    record['args'][0]['text'],
                    record['args'][1]['type'],
                    record['args'][1]['text'],
                )]
            gold_instance_list += [gold_instance]
        return gold_instance_list

    @staticmethod
    def load_pred_list(pred_list):
        """[summary]

        Args:
            pred_list (List[Dict]): List of Sentence, each sentence contains two List (offset, string) of Relation List
                [
                    {
                        'offset': [['Part-whole', 'Geo-political', [0], 'Geo-political', [2]]],
                        'string': [['Part-whole', 'Geo-political', 'MULTAN', 'Geo-political', 'Pakistan']],
                    }, ...
                ]

        Returns:
            List[Dict]: List of Sentence, each sentence contains two List (offset, string) of Relation Tuple
                [
                    {
                        'offset': [('Part-whole', 'Geo-political', (0,), 'Geo-political', (2,))],
                        'string': [('Part-whole', 'Geo-political', 'MULTAN', 'Geo-political', 'Pakistan')]
                    }, ...
                ]
        """
        pred_instance_list = list()
        for pred in pred_list:
            for offset_pred in pred['offset']:
                if not isinstance(offset_pred[2], tuple):
                    offset_pred[2] = tuple_offset(offset_pred[2])
                if not isinstance(offset_pred[4], tuple):
                    offset_pred[4] = tuple_offset(offset_pred[4])
            pred['offset'] = [tuple_offset(p) for p in pred['offset']]
            pred['string'] = [tuple_offset(p) for p in pred['string']]
            pred_instance_list += [pred]
        return pred_instance_list

    @staticmethod
    def eval_instance_list(gold_instance_list, pred_instance_list, verbose=False, match_mode='normal'):
        """[summary]

        Args:
            gold_instance_list (List[Dict]): List of Sentence, each sentence contains two List (offset, string) of Relation Tuple
                [
                    {
                        'offset': [('Part-whole', 'Geo-political', (0,), 'Geo-political', (2,)), ... ],
                        'string': [('Part-whole', 'Geo-political', 'MULTAN', 'Geo-political', 'Pakistan'), ...]
                    }
                ]
            pred_instance_list ([type]): List of Sentence, each sentence contains two List (offset, string) of Relation Tuple
                [
                    {
                        'offset': [('Part-whole', 'Geo-political', (0,), 'Geo-political', (2,))],
                        'string': [('Part-whole', 'Geo-political', 'MULTAN', 'Geo-political', 'Pakistan')]
                    }, ...
                ]
            verbose (bool, optional): Defaults to False.
            match_mode (string, optional): [description]. Defaults to `normal` .

        Returns:
            Dict: Result of Evaluation
                (offset, string) X (boundary, strict) X (gold, pred, tp, P, R, F1)
        """
        # Span Boundary and Type
        metrics = {
            'offset': Metric(verbose=verbose, match_mode=match_mode),
            'string': Metric(verbose=verbose, match_mode=match_mode),
        }
        # Span Boundary Only
        boundary_metrics = {
            'offset': Metric(verbose=verbose, match_mode=match_mode),
            'string': Metric(verbose=verbose, match_mode=match_mode),
        }
        for pred, gold in zip(pred_instance_list, gold_instance_list):
            pre_string_tp, pre_offset_tp = metrics['string'].tp, metrics['offset'].tp
            for eval_key in metrics:
                # Span Boundary and Type
                metrics[eval_key].count_instance(
                    gold_list=gold.get(eval_key, []),
                    pred_list=pred.get(eval_key, []),
                )
            post_string_tp, post_offset_tp = metrics['string'].tp, metrics['offset'].tp
            if verbose and (post_offset_tp - pre_offset_tp != post_string_tp - pre_string_tp):
                warning_tp_increment(gold=gold, pred=pred, prefix='Relation Strict')

            pre_string_tp, pre_offset_tp = boundary_metrics['string'].tp, boundary_metrics['offset'].tp
            for eval_key in boundary_metrics:
                # Span Boundary Only
                boundary_metrics[eval_key].count_instance(
                    gold_list=[(x[0], x[2], x[4]) for x in gold.get(eval_key, [])],
                    pred_list=[(x[0], x[2], x[4]) for x in pred.get(eval_key, [])],
                )
            post_string_tp, post_offset_tp = boundary_metrics['string'].tp, boundary_metrics['offset'].tp
            if verbose and post_offset_tp - pre_offset_tp != post_string_tp - pre_string_tp:
                warning_tp_increment(gold=gold, pred=pred, prefix='Relation Boundary')

        results = dict()
        for eval_key in metrics:
            results.update(metrics[eval_key].compute_f1(prefix=eval_key + '-rel-strict-'))
        for eval_key in boundary_metrics:
            results.update(boundary_metrics[eval_key].compute_f1(prefix=eval_key + '-rel-boundary-'))
        return results


class EventScorer(Scorer):
    @staticmethod
    def load_gold_list(gold_list):
        """[summary]

        Args:
            gold_list (List[List[Dict]]): List of Sentence, each sentence contains a List of Event Dict
                [
                    [  # Sentence
                        {  # Event Record
                            'type': 'Die',
                            'offset': [16],
                            'text': 'shot',
                            'args': [
                                {'type': 'Victim', 'offset': [17], 'text': 'himself'},
                                {'type': 'Agent', 'offset': [5, 6], 'text': 'John Joseph'},
                                {'type': 'Place', 'offset': [23], 'text': 'court'}
                            ]
                        },
                    ]
                ]

        Returns:
            List[Dict]: List of Sentence, each sentence contains Four List of Event Tuple
                [
                    {
                        'offset_trigger': [('Die', (16,)), ('Convict', (30,))],
                        'string_trigger': [('Die', 'shot'), ('Convict', 'convicted')],
                        'offset_role': [('Die', 'Victim', (17,)), ('Die', 'Agent', (5, 6)), ('Die', 'Place', (23,))],
                        'string_role': [('Die', 'Victim', 'himself'), ('Die', 'Agent', 'John Joseph'), ('Die', 'Place', 'court')]
                    }, ...
                ]
        """
        gold_instance_list = []
        for gold in gold_list:
            gold_instance = defaultdict(list)
            for record in gold:
                gold_instance['offset_trigger'] += [(record['type'], tuple_offset(record['offset']))]
                gold_instance['string_trigger'] += [(record['type'], record['text'])]
                for arg in record['args']:
                    gold_instance['offset_role'] += [(record['type'], arg['type'], tuple_offset(arg['offset']))]
                    gold_instance['string_role'] += [(record['type'], arg['type'], arg['text'])]
            gold_instance_list += [gold_instance]
        return gold_instance_list

    @staticmethod
    def load_pred_list(pred_list):
        """[summary]

        Args:
            pred_list (List[Dict]): List of Sentence, each sentence contains two List (offset, string) of Event List
                [
                    {
                        'offset': [{'type': 'Attack', 'roles': [['Attacker', [5, 6]], ['Place', [23]], ['Target', [17]]], 'trigger': [16]}],
                        'string': [{'roles': [['Attacker', 'John Joseph'], ['Place', 'court'], ['Target', 'himself']], 'type': 'Attack', 'trigger': 'shot'}],
                    }, ...
                ]

        Returns:
            List[Dict]: List of Sentence, each sentence contains four List (offset, string) X (trigger, role) of Event List
                [
                    {
                        'offset_trigger': [('Attack', (16,))],
                        'offset_role': [('Attack', 'Attacker', (5, 6)), ('Attack', 'Place', (23,)), ('Attack', 'Target', (17,))],
                        'string_trigger': [('Attack', 'shot')],
                        'string_role': [('Attack', 'Attacker', 'John Joseph'), ('Attack', 'Place', 'court'), ('Attack', 'Target', 'himself')],
                    }, ...
                ]
        """
        pred_instance_list = list()
        for pred in pred_list:
            pred_instance = defaultdict(list)
            for offset_pred in pred['offset']:
                event_type, trigger_offset = offset_pred['type'], tuple_offset(offset_pred['trigger'])
                pred_instance['offset_trigger'] += [(event_type, trigger_offset)]
                for role_type, role_offset in offset_pred['roles']:
                    pred_instance['offset_role'] += [(event_type, role_type, tuple_offset(role_offset))]
            for string_pred in pred['string']:
                event_type, trigger_string = string_pred['type'], string_pred['trigger']
                pred_instance['string_trigger'] += [(event_type, trigger_string)]
                for role_type, role_string in string_pred['roles']:
                    pred_instance['string_role'] += [(event_type, role_type, role_string)]
            pred_instance_list += [pred_instance]
        return pred_instance_list

    @staticmethod
    def eval_instance_list(gold_instance_list, pred_instance_list, verbose=False, match_mode='normal'):
        """[summary]

        Args:
            gold_instance_list (List[Dict]): List of Sentence, each sentence contains Four List of Event Tuple
                [
                    {
                        'offset_trigger': [('Die', (16,)), ('Convict', (30,))],
                        'string_trigger': [('Die', 'shot'), ('Convict', 'convicted')],
                        'offset_role': [('Die', 'Victim', (17,)), ('Die', 'Agent', (5, 6)), ('Die', 'Place', (23,))],
                        'string_role': [('Die', 'Victim', 'himself'), ('Die', 'Agent', 'John Joseph'), ('Die', 'Place', 'court')]
                    }, ...
                ]
            pred_instance_list (List[Dict]): List of Sentence, each sentence contains four List (offset, string) X (trigger, role) of Event List
                [
                    {
                        'offset_trigger': [('Attack', (16,))],
                        'offset_role': [('Attack', 'Attacker', (5, 6)), ('Attack', 'Place', (23,)), ('Attack', 'Target', (17,))],
                        'string_trigger': [('Attack', 'shot')],
                        'string_role': [('Attack', 'Attacker', 'John Joseph'), ('Attack', 'Place', 'court'), ('Attack', 'Target', 'himself')],
                    }, ...
                ]
            verbose (bool, optional): [description]. Defaults to False.
            match_mode (string, optional): [description]. Defaults to `normal`.

        Returns:
            Dict: Result of Evaluation
                (offset, string) X (trigger, role) X (gold, pred, tp, P, R, F1)
        """
        trigger_metrics = {
            'offset': Metric(verbose=verbose, match_mode=match_mode),
            'string': Metric(verbose=verbose, match_mode=match_mode),
        }
        role_metrics = {
            'offset': Metric(verbose=verbose, match_mode=match_mode),
            'string': Metric(verbose=verbose, match_mode=match_mode),
        }
        for pred, gold in zip(pred_instance_list, gold_instance_list):
            pre_string_tp, pre_offset_tp = trigger_metrics['string'].tp, trigger_metrics['offset'].tp
            for eval_key in trigger_metrics:
                trigger_metrics[eval_key].count_instance(
                    gold_list=gold.get(eval_key + '_trigger', []),
                    pred_list=pred.get(eval_key + '_trigger', [])
                )
            post_string_tp, post_offset_tp = trigger_metrics['string'].tp, trigger_metrics['offset'].tp
            if verbose and post_offset_tp - pre_offset_tp != post_string_tp - pre_string_tp:
                warning_tp_increment(gold=gold, pred=pred, prefix='Trigger')

            pre_string_tp, pre_offset_tp = role_metrics['string'].tp, role_metrics['offset'].tp
            for eval_key in role_metrics:
                role_metrics[eval_key].count_instance(
                    gold_list=gold.get(eval_key + '_role', []),
                    pred_list=pred.get(eval_key + '_role', [])
                )
            post_string_tp, post_offset_tp = role_metrics['string'].tp, role_metrics['offset'].tp
            if verbose and post_offset_tp - pre_offset_tp != post_string_tp - pre_string_tp:
                warning_tp_increment(gold=gold, pred=pred, prefix='Role')

        results = dict()
        for eval_key in trigger_metrics:
            results.update(trigger_metrics[eval_key].compute_f1(prefix=f'{eval_key}-evt-trigger-'))
        for eval_key in role_metrics:
            results.update(role_metrics[eval_key].compute_f1(prefix=f'{eval_key}-evt-role-'))
        return results
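The three match modes of `Metric.count_instance` differ only in how duplicates are treated, which is easiest to see on a tiny example. The sketch below is a simplified re-implementation written for this illustration: `count_tp` and `prf` are hypothetical helper names, the gold and predicted tuples are made up, and the length assertion and verbose printing of the original are omitted.

```python
from copy import deepcopy


def count_tp(gold_list, pred_list, match_mode='normal'):
    """Return (tp, gold_num, pred_num) following the logic of Metric.count_instance."""
    if match_mode == 'set':
        # Duplicates collapse on both sides before counting.
        gold_set, pred_set = set(gold_list), set(pred_list)
        return len(gold_set & pred_set), len(gold_set), len(pred_set)
    tp = 0
    remaining_gold = deepcopy(gold_list)
    for pred in pred_list:
        if pred in remaining_gold:
            tp += 1
            if match_mode == 'normal':
                # Each gold item may be matched only once.
                remaining_gold.remove(pred)
    return tp, len(gold_list), len(pred_list)


def prf(tp, gold_num, pred_num):
    """Precision, recall, and F1 in percent, with zero-safe division."""
    p = tp / pred_num if pred_num else 0.
    r = tp / gold_num if gold_num else 0.
    f1 = 2 * p * r / (p + r) if p + r else 0.
    return p * 100, r * 100, f1 * 100


gold = [('PER', 'Ann'), ('PER', 'Ann'), ('LOC', 'Oslo')]
pred = [('PER', 'Ann'), ('PER', 'Ann'), ('PER', 'Ann')]

for mode in ('normal', 'multimatch', 'set'):
    tp, gold_num, pred_num = count_tp(gold, pred, mode)
    print(mode, tp, gold_num, pred_num, prf(tp, gold_num, pred_num))
```

With these inputs, `normal` credits two of the three identical predictions because only two matching gold items exist, `multimatch` credits all three, and `set` reduces the comparison to one predicted item against two gold items.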
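The `scorer.py` code defines an evaluation (scoring) module that computes metrics for different tasks such as entity recognition, relation extraction, and event extraction. Its main components are:
- `Metric`: a general metric class that tracks true positives (`tp`), the number of gold items (`gold_num`), and the number of predicted items (`pred_num`), and derives precision (P), recall (R), and F1 from them. `safe_div` performs division while avoiding division by zero, `compute_f1` returns the counts together with P, R, and F1 as percentages, and `count_instance` updates the three counts for one instance according to the match mode.
- `RecordMetric`: a subclass of `Metric` for tasks where the order of roles does not matter (for example event argument extraction). It adds an `is_equal` method that decides whether a gold record and a predicted record are equal, and it does not support the `set` match mode.
- `OrderedRecordMetric`: a subclass of `RecordMetric` for tasks where the order of roles does matter (for example relation extraction). It overrides `is_equal` to compare roles in their original order.
- `Scorer`: the base scorer class. It declares three static methods, `load_gold_list`, `load_pred_list`, and `eval_instance_list`, which load gold instances, load predicted instances, and compute the metrics. Subclasses implement these methods for each task.
- `EntityScorer`, `RelationScorer`, and `EventScorer`: the scorers for entity recognition, relation extraction, and event extraction. Each inherits from `Scorer` and implements the loading of gold and predicted instances as well as the metric computation for its task.
The purpose of this module is to provide a general evaluation framework so that metrics such as F1 can be computed conveniently for different tasks: pick the scorer class that matches the task, load the data with it, and compute the metrics.
prompt
constants.py
END = "# END"
END_LINE = "\n----------------------------------------"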
make_prompt.py: generates the n-shot prompt text that is supplied to the model
import pandas as pd
import os
import shutil
import random
import argparse
from collections import defaultdict
import json
import sys
from src.prompt.constants import END
from src.utils.record_schema import RecordSchema


def make_prompt(file_path: str, out_file, n_examples, seed: int = 0):
    random.seed(seed)
    data = [json.loads(line.strip()) for line in open(file_path)]
    if n_examples != -1:
        samples = random.sample(data, n_examples)
    else:
        samples = data
    random.shuffle(samples)

    prompt = ""
    for sample in samples:
        prompt += sample["reference_output"]
        prompt += f"{END}\n\n"

    with open(out_file,'w',encoding='utf-8') as fout:
        fout.write(prompt)
    print ("saved prompt to ", out_file)
    return 0


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-src', help='Source File Name', required=True)
    parser.add_argument('-tgt', help='Target File Name, n shot sampled', required=True)
    parser.add_argument('-schema_file', help='schema_file', required=True)
    parser.add_argument('-task', help='N-Shot Task name', required=True, choices=['entity', 'relation', 'event'])
    parser.add_argument('-n_examples', help='n_examples',type=int)
    parser.add_argument('-seed', help='Default is None, no random')
    parser.add_argument('-min_len', dest='min_len', help='Default is None', type=int)
    options = parser.parse_args()

    source_file = options.src
    target_file = options.tgt
    make_prompt(file_path=source_file, out_file=target_file, n_examples=options.n_examples)


if __name__ == "__main__":
    main()
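The main job of this script is to generate the prompt text for the model. Its steps are:
- Parse the command-line arguments: the source file path (-src), the target file path (-tgt), the number of examples to put in the prompt (n_examples), the random seed (seed), and the remaining options (-schema_file, -task, -min_len).
- Read the source file, which stores one JSON object per line. Either sample n_examples records at random or, when n_examples is -1, keep every record.
- Shuffle the selected samples.
- Start from an empty prompt string.
- For each sample, append its reference_output field to the prompt, followed by the END marker and a blank line, so that consecutive samples stay separated.
- Write the finished prompt string to the output file.
- Print the path the prompt was saved to, so the user can inspect it.
The script is typically used to assemble an n-shot prompt for an NLP model: it picks some samples at random from the data and lays them out in a fixed format for the later model stage.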
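The sampling and concatenation steps can be tried on in-memory data with the sketch below. It is an illustration under assumptions, not the script itself: build_prompt is a hypothetical helper, the END value "# END" is a placeholder (the real constant lives in src.prompt.constants), and a local random.Random instance replaces the global random.seed call so that the example has no side effects.

```python
import random

END = "# END"  # assumed placeholder for the marker defined in src.prompt.constants


def build_prompt(records, n_examples=-1, seed=0, end_marker=END):
    """Return the prompt string built from the records' reference_output fields."""
    rng = random.Random(seed)
    # -1 keeps every record; otherwise draw n_examples without replacement.
    samples = list(records) if n_examples == -1 else rng.sample(records, n_examples)
    rng.shuffle(samples)
    return "".join(f"{r['reference_output']}{end_marker}\n\n" for r in samples)


# Hypothetical records, shaped like the lines of the source file.
records = [{"reference_output": f"def example_{i}():\n    pass\n"} for i in range(5)]

prompt = build_prompt(records, n_examples=2, seed=0)
print(prompt.count(END))  # 2
```

Because the seed is fixed, calling build_prompt again with the same arguments returns the same prompt, which keeps few-shot experiments reproducible.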
src\prompt\make_task_file.py: converting the given data into a task file for the model
import argparse

import pandas as pd
from tqdm import tqdm

from src.converters.get_converter import ConverterFactory
from src.utils.file_utils import read_data


def make_task_file(args):
    data = read_data(args.inpath)
    converter = ConverterFactory.get_converter(args.job_type,
                                               schema_folder=args.schema_path,
                                               map_config_path=args.map_config_path)
    res = []
    for i, row in tqdm(data.iterrows(), total=len(data)):
        # prompt_part_only=False: the full converted text, stored as the reference output.
        struct_input = converter.structure_to_input(row, prompt_part_only=False)
        if struct_input is None:
            continue  # the converter could not handle this row
        # Keep every original column except the raw 'record'.
        tmp = {k: v for (k, v) in row.items() if k not in ['record']}
        tmp["input_idx"] = i
        tmp["input_prompt"] = converter.structure_to_input(row, prompt_part_only=True)
        tmp["reference_output"] = struct_input
        res.append(tmp)

    # Share of rows that were converted successfully.
    conversion_rate = len(res) / len(data)
    pd.DataFrame(res).to_json(args.outpath, orient='records', lines=True)
    print(f"Converted {len(res)} out of {len(data)} rows ({conversion_rate:.2%})")
    print("Saved to ", args.outpath)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--inpath", type=str, required=True)
    parser.add_argument("--outpath", type=str, required=True)
    parser.add_argument("--job_type", type=str, required=True)
    parser.add_argument("--schema_path", type=str, required=True)
    parser.add_argument("--map_config_path", type=str, required=True)
    args = parser.parse_args()
    make_task_file(args)
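The main job of this script is to convert the given data into a task file for the model. Its steps are:
- Parse the command-line arguments: the input file path (inpath), the output file path (outpath), the job type (job_type), the schema path (schema_path), and the mapping config path (map_config_path).
- Read the input file with read_data, which returns a pandas DataFrame.
- Ask ConverterFactory for the converter that matches the job type and configuration files (for example, an NER or RE converter).
- For every row, call the converter's structure_to_input method twice: once with prompt_part_only=False to obtain the reference output, and once with prompt_part_only=True to obtain the input prompt. The row index and all other fields except record are kept as metadata.
- Rows for which the converter returns None are skipped; every other row is appended to the result list.
- Finally, compute the share of rows that were converted and save the result as a JSON-lines task file for later use.
In short, the script turns raw data into a task-file format that can serve different NLP tasks (NER, RE, and so on). Each entry holds the input, the output, and the accompanying metadata needed for later training and evaluation.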
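The per-row logic can be sketched without pandas or the real converters. Everything named here is hypothetical: ToyConverter stands in for whatever ConverterFactory returns, make_task_records mirrors the loop on plain dicts, and the generated text format is invented for the example.

```python
class ToyConverter:
    """Hypothetical stand-in for a converter returned by ConverterFactory."""

    def structure_to_input(self, row, prompt_part_only=False):
        text = row.get("text")
        if not text:
            return None  # signals that the row cannot be converted
        head = f'def extract(text="{text}"):\n    entities = []\n'
        if prompt_part_only:
            return head
        body = "".join(f'    entities.append("{e}")\n' for e in row["record"])
        return head + body


def make_task_records(rows, converter):
    """Build task entries: original fields minus 'record', plus prompt and reference."""
    res = []
    for i, row in enumerate(rows):
        full = converter.structure_to_input(row, prompt_part_only=False)
        if full is None:
            continue
        item = {k: v for k, v in row.items() if k != "record"}
        item["input_idx"] = i
        item["input_prompt"] = converter.structure_to_input(row, prompt_part_only=True)
        item["reference_output"] = full
        res.append(item)
    return res


rows = [
    {"text": "Steve works at Apple", "record": ["Steve", "Apple"]},
    {"text": "", "record": []},  # skipped: the toy converter returns None
]
tasks = make_task_records(rows, ToyConverter())
print(f"Converted {len(tasks)} out of {len(rows)} rows ({len(tasks) / len(rows):.2%})")
# Converted 1 out of 2 rows (50.00%)
```

In this toy setup the reference output starts with the input prompt, so a model shown the prompt is expected to produce the remaining lines.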
utils
file_utils.py: loading data and configuration for task conversion and generation
import ast

import pandas as pd
import yaml


def load_schema(schema_path):
    # The first three lines hold the entity, relation and spot-asoc schemas.
    with open(schema_path, encoding='utf8') as fin:
        entity_line = fin.readline().strip()
        relation_line = fin.readline().strip()
        spot_asoc_line = fin.readline().strip()
    # literal_eval parses Python literals only, so the file cannot run arbitrary code.
    return {'entity_schema': ast.literal_eval(entity_line),
            'relation_schema': ast.literal_eval(relation_line),
            'spot_asoc_schema': ast.literal_eval(spot_asoc_line)}


def load_yaml(yaml_path):
    with open(yaml_path, 'r') as fin:
        map_config = yaml.load(fin.read(), Loader=yaml.FullLoader)
    return map_config


def read_data(inpath):
    # Only JSON-lines input is supported; the check is on the path string.
    if "json" in inpath:
        data = pd.read_json(inpath, orient='records', lines=True)
    else:
        raise ValueError(f"Unknown input format: {inpath}")
    return data
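This file is a set of helper functions for loading data and configuration files:
- load_schema(schema_path) loads the schema file at the given path. The file holds the entity, relation, and association-label information. The function reads the first three lines, which define the entity schema, the relation schema, and the association (spot_asoc) schema, and returns them in a dict.
- load_yaml(yaml_path) loads the YAML configuration file at the given path. It opens the file, parses the content with the PyYAML library, and returns the resulting configuration dict.
- read_data(inpath) loads a data file; only JSON is supported. It checks whether the path contains the string json and, if so, calls pandas' pd.read_json with orient='records' and lines=True, so that each line is parsed as one record. The function returns the data as a DataFrame; any other path raises a ValueError.
The main code uses these functions to load data and configuration before converting and generating tasks: load_schema supplies the schema information a task needs, load_yaml supplies the mapping configuration, and read_data supplies the task's input data. Keeping them in one helper module makes the main code more modular and easier to maintain.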
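The three-line schema layout can be tried out with the sketch below. The file content is invented toy data, and load_schema_lines is a hypothetical name; it parses each line with ast.literal_eval, which accepts Python literals only.

```python
import ast
import os
import tempfile


def load_schema_lines(schema_path):
    """Read the first three lines of a schema file and parse each as a literal."""
    with open(schema_path, encoding="utf8") as fin:
        entity_line = fin.readline().strip()
        relation_line = fin.readline().strip()
        spot_asoc_line = fin.readline().strip()
    return {
        "entity_schema": ast.literal_eval(entity_line),
        "relation_schema": ast.literal_eval(relation_line),
        "spot_asoc_schema": ast.literal_eval(spot_asoc_line),
    }


# Write a toy schema file (assumed layout: one literal per line), then load it.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "toy.schema")
    with open(path, "w", encoding="utf8") as fout:
        fout.write('["person", "organization"]\n')
        fout.write('["work for"]\n')
        fout.write('{"person": ["work for"], "organization": []}\n')
    schema = load_schema_lines(path)

print(schema["entity_schema"])     # ['person', 'organization']
print(schema["spot_asoc_schema"])  # {'person': ['work for'], 'organization': []}
```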
record_schema.py: managing a task's record schema so that it can be merged and stored during processing
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
from collections import defaultdict
from typing import List


class RecordSchema:
    def __init__(self, type_list, role_list, type_role_dict):
        self.type_list = type_list
        self.role_list = role_list
        self.type_role_dict = type_role_dict

    @staticmethod
    def read_from_file(filename):
        # One JSON value per line: types, roles, then the type -> roles mapping.
        with open(filename) as fin:
            lines = fin.readlines()
        type_list = json.loads(lines[0])
        role_list = json.loads(lines[1])
        type_role_dict = json.loads(lines[2])
        return RecordSchema(type_list, role_list, type_role_dict)

    def write_to_file(self, filename):
        with open(filename, 'w') as output:
            output.write(json.dumps(self.type_list, ensure_ascii=False) + '\n')
            output.write(json.dumps(self.role_list, ensure_ascii=False) + '\n')
            output.write(json.dumps(self.type_role_dict, ensure_ascii=False) + '\n')


def merge_schema(schema_list: List[RecordSchema]):
    type_set = set()
    role_set = set()
    type_role_dict = defaultdict(list)

    # Collect the union of types, roles and per-type roles over all schemas.
    for schema in schema_list:
        for type_name in schema.type_list:
            type_set.add(type_name)
        for role_name in schema.role_list:
            role_set.add(role_name)
        for type_name in schema.type_role_dict:
            type_role_dict[type_name] += schema.type_role_dict[type_name]

    # Remove duplicate roles within each type.
    for type_name in type_role_dict:
        type_role_dict[type_name] = list(set(type_role_dict[type_name]))

    return RecordSchema(type_list=list(type_set),
                        role_list=list(role_set),
                        type_role_dict=type_role_dict,
                        )
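This code defines a class named RecordSchema that manages a task's record schema. A record schema usually consists of the entity types (type), the relation roles (role), and a type-to-role dict (type_role_dict).
- The __init__ method initializes a RecordSchema instance from a type list (type_list), a role list (role_list), and a type-to-role dict (type_role_dict).
- The read_from_file method reads a schema definition from a file. It parses the first three lines as the type list, the role list, and the type-to-role dict, and builds a RecordSchema instance from them.
- The write_to_file method writes the schema definition to a file, putting the type list, the role list, and the type-to-role dict on three separate lines.
The code also defines a function named merge_schema that merges several record schemas. It takes a list of schemas, schema_list, takes the union of their types, roles, and type-to-role entries, and returns the merged RecordSchema instance.
These helpers manage a task's record schema information so that it can be merged and stored during task processing.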
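The merge amounts to a union over types, roles, and per-type role lists. The sketch below shows the idea on plain dicts rather than RecordSchema instances; merge_schema_dicts is a hypothetical helper, the schemas are toy data, and the results are sorted (unlike the original, which returns lists in arbitrary set order) so that the output is deterministic.

```python
from collections import defaultdict


def merge_schema_dicts(schemas):
    """Union the types, roles and type -> roles mappings of several schemas."""
    type_set, role_set = set(), set()
    type_role = defaultdict(set)
    for schema in schemas:
        type_set.update(schema["type_list"])
        role_set.update(schema["role_list"])
        for type_name, roles in schema["type_role_dict"].items():
            type_role[type_name].update(roles)
    return {
        "type_list": sorted(type_set),
        "role_list": sorted(role_set),
        "type_role_dict": {t: sorted(r) for t, r in type_role.items()},
    }


# Two toy schemas that overlap on the "attack" type.
schema_a = {
    "type_list": ["attack"],
    "role_list": ["attacker", "target"],
    "type_role_dict": {"attack": ["attacker", "target"]},
}
schema_b = {
    "type_list": ["attack", "transport"],
    "role_list": ["instrument", "target", "vehicle"],
    "type_role_dict": {"attack": ["target", "instrument"], "transport": ["vehicle"]},
}

merged = merge_schema_dicts([schema_a, schema_b])
print(merged["type_list"])                 # ['attack', 'transport']
print(merged["type_role_dict"]["attack"])  # ['attacker', 'instrument', 'target']
```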