def query_codex: sends a query to Codex (the AI engine) to obtain generated code
```python
def query_codex(task: dict, prompt_text: str, engine: str, max_tokens: int):
    prompt = f"{prompt_text} {task['input_prompt']}"
    response = OpenaiAPIWrapper.call(
        prompt=prompt, max_tokens=max_tokens, engine=engine)
    return response
```
This code defines a function named `query_codex` that sends a query to Codex (likely GPT-3 or a similar AI engine) to obtain generated code. Its main parameters:

- `task` (dict): the task input, a dictionary holding the task information.
- `prompt_text` (str): the prompt text used for the query.
- `engine` (str): the name of the engine that runs the task.
- `max_tokens` (int): the maximum number of tokens in the generated code.

The function does the following:

- Builds the full prompt by appending the task's input prompt (`task['input_prompt']`) to the given prompt text (`prompt_text`).
- Calls the `call` method of `OpenaiAPIWrapper` to send the query to the Codex engine, passing the assembled prompt, the token limit (`max_tokens`), and the engine name (`engine`).
- Returns Codex's response as the result.

The main purpose of this function is to issue a query that obtains generated code for the given task and prompt. It sends the request with the supplied engine and parameters and returns Codex's response for downstream processing.
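`OpenaiAPIWrapper` itself is not shown in this section. For orientation, here is a minimal sketch of what its two methods used here might look like, assuming the legacy (pre-1.0) `openai` Python SDK; the method names and call signature are taken from the usage above, everything inside is an assumption:

```python
import openai  # legacy pre-1.0 SDK assumed


class OpenaiAPIWrapper:
    """Hypothetical stand-in for the project's wrapper around the OpenAI API."""

    @staticmethod
    def call(prompt: str, max_tokens: int, engine: str) -> dict:
        # Forward the query to the legacy completions endpoint.
        return openai.Completion.create(
            engine=engine,
            prompt=prompt,
            max_tokens=max_tokens,
            temperature=0.0,  # deterministic decoding; an assumption, not from the source
        )

    @staticmethod
    def parse_response(response: dict) -> str:
        # Take the first completion's text (consumed by get_completed_code below).
        return response["choices"][0]["text"]
```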
def get_completed_code: extracts the generated code from Codex's response and joins it with the task's input prompt to obtain the complete generated code
```python
def get_completed_code(task: dict, codex_response: dict) -> str:
    completed_code = OpenaiAPIWrapper.parse_response(codex_response)
    all_code = f"{task['input_prompt']}{completed_code}"
    # NOTE: space is already taken care of, no need to add it again,
    # otherwise indentation will be off
    return all_code
```
This code defines a function named `get_completed_code` that extracts the generated code from Codex's response. Its main parameters:

- `task` (dict): the task input, a dictionary holding the task information.
- `codex_response` (dict): Codex's response, which may contain the generated code.

The function does the following:

- Calls the `parse_response` method of `OpenaiAPIWrapper` to extract the generated code from Codex's response (`codex_response`) and stores it in the `completed_code` variable.
- Joins the generated code with the task's input prompt (`task['input_prompt']`) to obtain the complete code. The two strings are concatenated directly; no extra space is needed, since adding one would throw the indentation off.
- Returns the string containing the complete code (`all_code`).

The purpose of this function is to extract the generated code from Codex's response and merge it with the task's input prompt into the complete generated code, which later steps can use or record.
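Because the input prompt ends exactly where the model is expected to continue writing (the NOTE in the code), plain concatenation already yields a well-indented function. A small illustration with a hypothetical prompt/completion pair, borrowing the Steve/Apple example that appears in a docstring later in this section:

```python
# Hypothetical prompt/completion pair, for illustration only.
task = {
    "input_prompt": 'def named_entity_extraction(input_text):\n'
                    '\t""" extract named entities from the input_text . """\n'
                    '\tinput_text = "Steve became CEO of Apple in 1998"\n'
                    '\t# extracted named entities\n'
}
completed_code = '\tperson = [ "Steve" ]\n\torganization = [ "Apple" ]\n'

all_code = f"{task['input_prompt']}{completed_code}"
print(all_code)  # a complete, consistently indented function
```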
def get_request_per_minute: computes the per-minute request rate from the number of requests issued so far and the elapsed time, for use in rate limiting
```python
def get_request_per_minute(num_request: int, begin_time: float) -> float:
    elapsed_time = time.time() - begin_time
    request_per_minute = (num_request / elapsed_time) * 60
    return request_per_minute
```
This code defines a function named `get_request_per_minute` that computes the per-minute request rate. Its main parameters:

- `num_request` (int): the number of requests issued so far.
- `begin_time` (float): the timestamp at which timing began.

The function does the following:

- Computes the time elapsed from `begin_time` to now (`elapsed_time`), using `time.time()` to obtain the current timestamp.
- Divides the number of requests issued (`num_request`) by the elapsed time (`elapsed_time`) and multiplies by 60 to obtain the per-minute request rate.
- Returns the computed rate (`request_per_minute`).

The purpose of this function is to compute the per-minute request rate from the number of requests already issued and the elapsed time, so the caller can keep the request rate within bounds.
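The `run` driver presumably uses this value to throttle itself against `--max_requests_per_min`. A minimal sketch of such a throttle, under the assumption that sleeping while the observed rate is above the cap is acceptable:

```python
import time


def wait_for_rate_limit(num_request: int, begin_time: float,
                        max_requests_per_min: int) -> None:
    # Block until the observed request rate falls back below the configured cap.
    while get_request_per_minute(num_request, begin_time) > max_requests_per_min:
        time.sleep(1)
```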
Main entry point: (a generic template) configures the run from command-line arguments and then executes the task accordingly
```python
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--task_file_path", type=str, required=True)
    parser.add_argument("--num_tasks", type=int, required=True)
    parser.add_argument("--start_idx", type=int, required=True)
    parser.add_argument("--output_file_path", type=str, required=True)
    parser.add_argument("--prompt_path", type=str, required=False, default=None)
    parser.add_argument("--job_type", type=str, required=True,
                        choices=ConverterFactory.supported_converters)
    parser.add_argument("--keep_writing_output", action="store_true", default=True)
    parser.add_argument("--engine", type=str, required=True)
    parser.add_argument("--max_requests_per_min", type=int, default=10)
    parser.add_argument("--max_tokens", type=int, default=280)
    parser.add_argument("--schema_path", type=str, required=True)
    parser.add_argument("--map_config_path", type=str, required=True)
    parser.add_argument("--start_cut_num", type=int, default=0)
    args = parser.parse_args()

    run(task_file_path=args.task_file_path, num_tasks=args.num_tasks,
        start_idx=args.start_idx, output_file_path=args.output_file_path,
        prompt_path=args.prompt_path, keep_writing_output=args.keep_writing_output,
        engine=args.engine, max_requests_per_min=args.max_requests_per_min,
        max_tokens=args.max_tokens, schema_path=args.schema_path,
        map_config_path=args.map_config_path, start_cut_num=args.start_cut_num)
```
This code is the program's main entry point for executing tasks. It uses the `argparse` module to parse command-line arguments and calls the `run` function with them. Its main functionality and parameters:

- It creates a command-line parser (`parser`) with `argparse`.
- It adds a series of command-line arguments, including:
  - `task_file_path`: path to the task file.
  - `num_tasks`: number of tasks.
  - `start_idx`: start index of the tasks.
  - `output_file_path`: path to the output file.
  - `prompt_path`: path to the prompt file (optional, default `None`).
  - `job_type`: the job type, chosen from the supported converter types.
  - `keep_writing_output`: a boolean flag indicating whether to keep writing output (default `True`).
  - `engine`: name of the engine used to run the task.
  - `max_requests_per_min`: maximum number of requests per minute (default 10).
  - `max_tokens`: maximum number of tokens in the generated code (default 280).
  - `schema_path`: path to the schema file.
  - `map_config_path`: path to the mapping-config file.
  - `start_cut_num`: an integer giving the number of examples to cut from the input prompt (default 0).
- It parses the arguments with `parser.parse_args()` and stores the result in `args`.
- It calls the `run` function with the parsed arguments to execute the task.

The purpose of this entry point is to let the user configure a run from the command line and then execute the task with those parameters. It is a generic template that can be adapted to different needs and tasks; an example invocation is sketched below.
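A hedged example of what an invocation could look like, driven from Python via `subprocess`; the script name, engine, and output path are hypothetical, while the data and config paths reuse the ones that appear in the converter examples later in this section:

```python
import subprocess

subprocess.run([
    "python", "api_request.py",            # hypothetical script name
    "--task_file_path", "data/conll03/val.json",
    "--num_tasks", "100",
    "--start_idx", "0",
    "--output_file_path", "outputs/conll03_pred.jsonl",   # hypothetical path
    "--job_type", "ner-pl-func",
    "--engine", "code-davinci-002",        # hypothetical engine name
    "--schema_path", "data/conll03",
    "--map_config_path", "config/offset_map/first_offset_en.yaml",
], check=True)
```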
converters
ner
structure2nl_sel_v1.py: structures text into input suitable for NLP tasks, then converts the model's output back into structured data
```python
import json
import re
from collections import OrderedDict
from typing import List, Union, Dict, Tuple

from src.converters.structure_converter import StructureConverter
from src.converters.record import EntityRecord, RelationRecord
from src.utils.file_utils import load_yaml, load_schema
from uie.sel2record.record import MapConfig
from uie.sel2record.sel2record import SEL2Record


class NLSELPromptCreator(StructureConverter):

    def __init__(self, schema_folder=None, map_config_path=None):
        self.schema_dict = SEL2Record.load_schema_dict(schema_folder)
        self.decoding = 'spotasoc'

        record_schema = self.schema_dict['record']
        self.entity_schema = record_schema.type_list
        self.relation_schema = record_schema.role_list
        self.spot_asoc = record_schema.type_role_dict

        self.map_config = MapConfig.load_from_yaml(map_config_path)

    def structure_to_input(self, input_dict: dict, prompt_part_only: bool = False):
        """
        {'text': 'CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .',
         'entity': [{'type': 'organization', 'offset': [2], 'text': 'LEICESTERSHIRE'}],
         'spot': ['organization'],
        """
        text = input_dict['text']
        record = input_dict['record']

        prompt = []
        input = ['The text is : ',
                 "\"" + text + "\". ",
                 "The named entities in the text: "
                 ]
        prompt.extend(input)

        if prompt_part_only:
            return ''.join(prompt)

        return ''.join(prompt) + '\n'

    def output_to_structure(self, input_dict, output_str):
        """
        sample:
        {'text': 'CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .',
         'tokens': ['CRICKET', '-', 'LEICESTERSHIRE', 'TAKE', 'OVER', 'AT', 'TOP', 'AFTER', 'INNINGS', 'VICTORY', '.'],
         'record': '<extra_id_0> <extra_id_0> organization <extra_id_5> LEICESTERSHIRE <extra_id_1> <extra_id_1>',
         'entity': [{'type': 'organization', 'offset': [2], 'text': 'LEICESTERSHIRE'}],
         'relation': [], 'event': [], 'spot': ['organization'], 'asoc': [],
         'spot_asoc': [{'span': 'LEICESTERSHIRE', 'label': 'organization', 'asoc': []}]}

        code:
        The text is : "CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .".
        Find named entities such as organization, person, miscellaneous, location in the text.
        The organization "LEICESTERSHIRE" exist in the text.

        :param sample:
        :param code:
        :return:
        """
        text = input_dict['text']
        tokens = input_dict['tokens']

        sel2record = SEL2Record(
            schema_dict=self.schema_dict,
            decoding_schema=self.decoding,
            map_config=self.map_config,
        )

        pattern = re.compile(r"The named entities in the text:\s*(.*)")
        pred = re.search(pattern, output_str).group(1)
        # print ("pred: ")
        # print (pred)

        pred_record = sel2record.sel2record(pred, text, tokens)
        return pred_record


if __name__ == "__main__":
    schema_folder = 'data/conll03'
    map_config_path = 'config/offset_map/first_offset_en.yaml'
    val_path = 'data/conll03/val.json'
    with open(val_path) as fin:
        line = fin.readline()
        line = eval(line.strip())
        data = line
    # print ("dev data:\n", data)

    converter = NLSELPromptCreator(schema_folder=schema_folder,
                                   map_config_path=map_config_path)

    # convert the whole sample
    prompt = converter.structure_to_input(data, prompt_part_only=False)
    # print ("prompt:\n", prompt)

    # we have to provide the init state to the sample
    # prompt = converter.generate_sample_head(data)
    # print("sample head: ", prompt)

    code = """
    The text is : "CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .".
    The named entities in the text: <extra_id_0> <extra_id_0> organization <extra_id_5> LEICESTERSHIRE <extra_id_1> <extra_id_1>
    """

    data = {"text":"Enterprises from domestic coastal provinces and cities increased , and there are altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions coming to this meeting .","tokens":["Enterprises","from","domestic","coastal","provinces","and","cities","increased",",","and","there","are","altogether","30","enterprise","representatives","from","30","provinces",",","cities","and","autonomous","regions","coming","to","this","meeting","."],"entity":[{"type":"geographical social political","offset":[2,3,4,5,6],"text":"domestic coastal provinces and cities"},{"type":"geographical social political","offset":[17,18,19,20,21,22,23],"text":"30 provinces , cities and autonomous regions"},{"type":"organization","offset":[0,1,2,3,4,5,6],"text":"Enterprises from domestic coastal provinces and cities"},{"type":"person","offset":[12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27],"text":"altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions coming to this meeting"}],"relation":[],"event":[],"spot":["person","organization","geographical social political"],"asoc":[],"spot_asoc":[{"span":"Enterprises from domestic coastal provinces and cities","label":"organization","asoc":[]},{"span":"domestic coastal provinces and cities","label":"geographical social political","asoc":[]},{"span":"altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions coming to this meeting","label":"person","asoc":[]},{"span":"30 provinces , cities and autonomous regions","label":"geographical social political","asoc":[]}]}
    code = r'The text is : \"Enterprises from domestic coastal provinces and cities increased , and there are altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions coming to this meeting .\". The named entities in the text: <extra_id_0> <extra_id_0> organization <extra_id_5> Enterprises from domestic coastal provinces and cities <extra_id_1> <extra_id_0> geographical social political <extra_id_5> domestic coastal provinces and cities <extra_id_1> <extra_id_0> person <extra_id_5> altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions <extra_id_1> <extra_id_0> geographical social political <extra_id_5> provinces , cities and autonomous regions <extra_id_1> <extra_id_0> geographical social political <extra_id_5> provinces <extra_id_1> <extra_id_0> geographical social political <extra_id_5> cities <extra_id_1> <extra_id_0> geographical social political <extra_id_5> autonomous regions <extra_id_1> <extra_id_0> person <extra_id_5> this meeting <extra_id_1> <extra_id_1>\n'

    # conver the prediction to the answers
    predictions = converter.output_to_structure(data, code)
    print (predictions)
```
This code structures text into input suitable for an NLP task and converts the model's output back into structured data. Its core parts:

- It imports the required libraries and modules, including `json`, `re`, `collections`, and `typing`, plus some project modules such as `StructureConverter` and other data-handling helpers.
- It defines a class named `NLSELPromptCreator`, inheriting from `StructureConverter`, which structures text data into NLP-task input and restores model output into structured data.
- The class's `__init__` method initializes its attributes, loading the schema files, the decoding scheme, the entity schema, the relation schema, and the remaining configuration.
- The `structure_to_input` method structures the input data into the model's input format. It takes the input dict and a boolean flag `prompt_part_only` and builds the prompt text for the model from the input data.
- The `output_to_structure` method restores the model's output into structured data. It takes the input dict and the model's output string and parses them with the `SEL2Record` class to recover the structured records (see the regex walk-through below).
- The `if __name__ == "__main__":` section shows how to use `NLSELPromptCreator` to process input data and restore model output. Concretely, it loads the schema files, the config file, and a sample, calls `structure_to_input` to build the model's prompt text, and then feeds a model output string into `output_to_structure` to recover the structured data.

Overall, this code is a tool for data processing and conversion, particularly suited to turning natural-language text into task-specific input and restoring model output into structured data, which is very useful for pre- and post-processing in NLP tasks.
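To make the round trip concrete, here is the regex step from `output_to_structure` applied to the example completion used in the `__main__` block; only `re` is required:

```python
import re

output_str = ('The text is : "CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .". '
              'The named entities in the text: <extra_id_0> <extra_id_0> organization '
              '<extra_id_5> LEICESTERSHIRE <extra_id_1> <extra_id_1>')

pred = re.search(r"The named entities in the text:\s*(.*)", output_str).group(1)
print(pred)
# -> <extra_id_0> <extra_id_0> organization <extra_id_5> LEICESTERSHIRE <extra_id_1> <extra_id_1>
# This SEL string is then handed to SEL2Record.sel2record to recover token offsets.
```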
structure2nl_sel_v2.py: structures text into input suitable for NLP tasks, then converts the model's output back into structured data
```python
import json
import re
from collections import OrderedDict
from typing import List, Union, Dict, Tuple

import numpy as np

from src.converters.structure_converter import StructureConverter
from src.converters.record import EntityRecord, RelationRecord
from src.utils.file_utils import load_yaml, load_schema
from uie.sel2record.record import MapConfig
from uie.sel2record.sel2record import SEL2Record


class NLSELPromptCreator(StructureConverter):

    def __init__(self, schema_folder=None, map_config_path=None):
        self.schema_dict = SEL2Record.load_schema_dict(schema_folder)
        self.decoding = 'spotasoc'

        record_schema = self.schema_dict['record']
        self.entity_schema = record_schema.type_list
        self.relation_schema = record_schema.role_list
        self.spot_asoc = record_schema.type_role_dict

        self.map_config = MapConfig.load_from_yaml(map_config_path)

    def structure_to_input(self, input_dict: dict, prompt_part_only: bool = False):
        """
        {'text': 'CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .',
         'entity': [{'type': 'organization', 'offset': [2], 'text': 'LEICESTERSHIRE'}],
         'spot': ['organization'],
        """
        text = input_dict['text']
        record = input_dict['record']

        prompt = []
        input = ['The text is : ',
                 "\"" + text + "\". ",
                 "The named entities in the text: "
                 ]
        prompt.extend(input)

        if prompt_part_only:
            return ''.join(prompt)

        record = record.replace('extra_id_', '')
        prompt.append(record)
        return ''.join(prompt) + '\n'

    def existing_nested(self, entity_dict_list):
        entity_offset = []
        for ent in entity_dict_list:
            tmp_offset = ent['offset']
            entity_offset.append(tmp_offset)

        sorted_offset = sorted(entity_offset)
        start = -1
        end = -1
        for so in sorted_offset:
            temp_s, temp_e = so[0], so[-1]
            if temp_s <= end:
                return True
            start = temp_s
            end = temp_e
        return False

    def output_to_structure(self, input_dict, output_str):
        """
        sample:
        {'text': 'CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .',
         'tokens': ['CRICKET', '-', 'LEICESTERSHIRE', 'TAKE', 'OVER', 'AT', 'TOP', 'AFTER', 'INNINGS', 'VICTORY', '.'],
         'record': '<extra_id_0> <extra_id_0> organization <extra_id_5> LEICESTERSHIRE <extra_id_1> <extra_id_1>',
         'entity': [{'type': 'organization', 'offset': [2], 'text': 'LEICESTERSHIRE'}],
         'relation': [], 'event': [], 'spot': ['organization'], 'asoc': [],
         'spot_asoc': [{'span': 'LEICESTERSHIRE', 'label': 'organization', 'asoc': []}]}

        code:
        The text is : "CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .".
        Find named entities such as organization, person, miscellaneous, location in the text.
        The organization "LEICESTERSHIRE" exist in the text.

        :param sample:
        :param code:
        :return:
        """
        text = input_dict['text']
        tokens = input_dict['tokens']
        entity = input_dict['entity']
        exist_nested = self.existing_nested(entity)

        sel2record = SEL2Record(
            schema_dict=self.schema_dict,
            decoding_schema=self.decoding,
            map_config=self.map_config,
        )

        pattern = re.compile(r"The named entities in the text:\s*(.*)")
        pred = re.search(pattern, output_str).group(1)
        pred = pred.strip()
        #
        # print ("text: ", text)
        # print ("output_str: ", output_str)
        # print ("pred: ", pred)

        pred_record = sel2record.sel2record(pred, text, tokens)
        pred_record['statistic']['complex'] = exist_nested
        return pred_record


if __name__ == "__main__":
    schema_folder = 'data/conll03'
    map_config_path = 'config/offset_map/first_offset_en.yaml'
    val_path = 'data/conll03/val.json'
    with open(val_path) as fin:
        line = fin.readline()
        line = eval(line.strip())
        data = line
    # print ("dev data:\n", data)

    converter = NLSELPromptCreator(schema_folder=schema_folder,
                                   map_config_path=map_config_path)

    # convert the whole sample
    prompt = converter.structure_to_input(data, prompt_part_only=False)
    print ("prompt:\n", prompt)

    # we have to provide the init state to the sample
    # prompt = converter.generate_sample_head(data)
    # print("sample head: ", prompt)

    # code = """
    # The text is : "CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .". The named entities in the text: <extra_id_0> <extra_id_0> organization <extra_id_5> LEICESTERSHIRE <extra_id_1> <extra_id_1>
    # """
    # data = {"text":"Enterprises from domestic coastal provinces and cities increased , and there are altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions coming to this meeting .","tokens":["Enterprises","from","domestic","coastal","provinces","and","cities","increased",",","and","there","are","altogether","30","enterprise","representatives","from","30","provinces",",","cities","and","autonomous","regions","coming","to","this","meeting","."],"entity":[{"type":"geographical social political","offset":[2,3,4,5,6],"text":"domestic coastal provinces and cities"},{"type":"geographical social political","offset":[17,18,19,20,21,22,23],"text":"30 provinces , cities and autonomous regions"},{"type":"organization","offset":[0,1,2,3,4,5,6],"text":"Enterprises from domestic coastal provinces and cities"},{"type":"person","offset":[12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27],"text":"altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions coming to this meeting"}],"relation":[],"event":[],"spot":["person","organization","geographical social political"],"asoc":[],"spot_asoc":[{"span":"Enterprises from domestic coastal provinces and cities","label":"organization","asoc":[]},{"span":"domestic coastal provinces and cities","label":"geographical social political","asoc":[]},{"span":"altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions coming to this meeting","label":"person","asoc":[]},{"span":"30 provinces , cities and autonomous regions","label":"geographical social political","asoc":[]}]}
    # code = r'The text is : "CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .". The named entities in the text: <0> <0> organization <5> LEICESTERSHIRE <1> <1>\n'

    code = repr(prompt)

    # conver the prediction to the answers
    predictions = converter.output_to_structure(data, code)
    print (predictions)
```
This Python script is very similar to the previous one: it also structures text into NLP-task input and restores model output into structured data. This version adds and changes a few things:

- It imports an extra library, `numpy`, for some numeric operations.
- The `NLSELPromptCreator` class gains a new method, `existing_nested`, which detects whether the input contains nested entities (demonstrated below).
- In `structure_to_input`, the generated prompt now also includes the `record` information taken from the input data.
- In `output_to_structure`, the new `existing_nested` method checks the input for nested entities, and the result is stored under `pred_record['statistic']['complex']` in the returned structured data.
- The `if __name__ == "__main__":` section loads the schema files, the config file, and a sample, structures the input into a prompt, and restores the model output into structured data; the inputs are also tweaked a little to exercise the new functionality.

Overall, this code closely mirrors the previous file, with detail changes and additions. It remains a tool for processing and converting text data, particularly for pre- and post-processing in NLP tasks.
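A quick check of `existing_nested` on the overlapping spans from the sample above (`converter` is an `NLSELPromptCreator` instance):

```python
entities = [
    {"type": "organization", "offset": [0, 1, 2, 3, 4, 5, 6],
     "text": "Enterprises from domestic coastal provinces and cities"},
    {"type": "geographical social political", "offset": [2, 3, 4, 5, 6],
     "text": "domestic coastal provinces and cities"},
]
# The second span (tokens 2..6) starts inside the first (tokens 0..6),
# so the sample counts as nested.
print(converter.existing_nested(entities))  # True
```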
structure2nl_sel_v3.py
```python
import json
import re
from collections import OrderedDict
from typing import List, Union, Dict, Tuple

from src.converters.structure_converter import StructureConverter
from src.converters.record import EntityRecord, RelationRecord
from src.utils.file_utils import load_yaml, load_schema
from uie.sel2record.record import MapConfig
from uie.sel2record.sel2record import SEL2Record


class NLSELPromptCreator(StructureConverter):

    def __init__(self, schema_folder=None, map_config_path=None):
        self.schema_dict = SEL2Record.load_schema_dict(schema_folder)
        self.decoding = 'spotasoc'

        record_schema = self.schema_dict['record']
        self.entity_schema = record_schema.type_list
        self.relation_schema = record_schema.role_list
        self.spot_asoc = record_schema.type_role_dict

        self.map_config = MapConfig.load_from_yaml(map_config_path)

    def structure_to_input(self, input_dict: dict, prompt_part_only: bool = False):
        """
        {'text': 'CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .',
         'entity': [{'type': 'organization', 'offset': [2], 'text': 'LEICESTERSHIRE'}],
         'spot': ['organization'],
        """
        text = input_dict['text']
        record = input_dict['record']

        prompt = []
        input = ['The text is : ',
                 "\"" + text + "\". ",
                 "The named entities in the text: "
                 ]
        prompt.extend(input)

        if prompt_part_only:
            return ''.join(prompt)

        record = record.replace('extra_id_', '')
        record = record.lstrip('<0>').rstrip('<1>').strip()
        record = record.split('<1>')
        record = [rec.strip().lstrip('<0>').strip() for rec in record]

        record_new = []
        for rec in record:
            if rec != '':
                temp_str = rec
                temp_tuple = temp_str.split('<5>')
                assert len(temp_tuple) == 2
                temp_tuple = [tt.strip() for tt in temp_tuple]
                new_str = f'"{temp_tuple[1]}" is "{temp_tuple[0]}" .'
                record_new.append(new_str)
        record = ' '.join(record_new)
        prompt.append(record)
        return ''.join(prompt) + '\n'

    def output_to_structure(self, input_dict, output_str):
        """
        sample:
        {'text': 'CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .',
         'tokens': ['CRICKET', '-', 'LEICESTERSHIRE', 'TAKE', 'OVER', 'AT', 'TOP', 'AFTER', 'INNINGS', 'VICTORY', '.'],
         'record': '<extra_id_0> <extra_id_0> organization <extra_id_5> LEICESTERSHIRE <extra_id_1> <extra_id_1>',
         'entity': [{'type': 'organization', 'offset': [2], 'text': 'LEICESTERSHIRE'}],
         'relation': [], 'event': [], 'spot': ['organization'], 'asoc': [],
         'spot_asoc': [{'span': 'LEICESTERSHIRE', 'label': 'organization', 'asoc': []}]}

        code:
        The text is : "CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .".
        Find named entities such as organization, person, miscellaneous, location in the text.
        The organization "LEICESTERSHIRE" exist in the text.

        :param sample:
        :param code:
        :return:
        """
        text = input_dict['text']
        tokens = input_dict['tokens']

        sel2record = SEL2Record(
            schema_dict=self.schema_dict,
            decoding_schema=self.decoding,
            map_config=self.map_config,
        )

        pattern = re.compile(r"The named entities in the text:\s*(.*)")
        pred = re.search(pattern, output_str).group(1)

        pattern = re.compile(r"\"(.*?)\"\sis\s\"(.*?)\"\s.")
        pred = pattern.findall(pred)
        pred = [(p[1], p[0]) for p in pred]
        pred = [' <5> '.join(p) for p in pred]
        pred = ['<0> ' + p + ' <1>' for p in pred]
        pred = ' '.join(pred)
        pred = '<0> ' + pred + ' <1>'

        pred_record = sel2record.sel2record(pred, text, tokens)
        return pred_record


if __name__ == "__main__":
    schema_folder = 'data/conll03'
    map_config_path = 'config/offset_map/first_offset_en.yaml'
    val_path = 'data/conll03/val.json'
    with open(val_path) as fin:
        line = fin.readline()
        line = fin.readline()
        line = eval(line.strip())
        data = line
    # print ("dev data:\n", data)

    converter = NLSELPromptCreator(schema_folder=schema_folder,
                                   map_config_path=map_config_path)

    # convert the whole sample
    prompt = converter.structure_to_input(data, prompt_part_only=False)
    print ("prompt:\n", prompt)

    # we have to provide the init state to the sample
    # prompt = converter.generate_sample_head(data)
    # print("sample head: ", prompt)

    # code = """
    # The text is : "CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .". The named entities in the text: <extra_id_0> <extra_id_0> organization <extra_id_5> LEICESTERSHIRE <extra_id_1> <extra_id_1>
    # """
    # data = {"text":"Enterprises from domestic coastal provinces and cities increased , and there are altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions coming to this meeting .","tokens":["Enterprises","from","domestic","coastal","provinces","and","cities","increased",",","and","there","are","altogether","30","enterprise","representatives","from","30","provinces",",","cities","and","autonomous","regions","coming","to","this","meeting","."],"entity":[{"type":"geographical social political","offset":[2,3,4,5,6],"text":"domestic coastal provinces and cities"},{"type":"geographical social political","offset":[17,18,19,20,21,22,23],"text":"30 provinces , cities and autonomous regions"},{"type":"organization","offset":[0,1,2,3,4,5,6],"text":"Enterprises from domestic coastal provinces and cities"},{"type":"person","offset":[12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27],"text":"altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions coming to this meeting"}],"relation":[],"event":[],"spot":["person","organization","geographical social political"],"asoc":[],"spot_asoc":[{"span":"Enterprises from domestic coastal provinces and cities","label":"organization","asoc":[]},{"span":"domestic coastal provinces and cities","label":"geographical social political","asoc":[]},{"span":"altogether 30 enterprise representatives from 30 provinces , cities and autonomous regions coming to this meeting","label":"person","asoc":[]},{"span":"30 provinces , cities and autonomous regions","label":"geographical social political","asoc":[]}]}
    # code = r'The text is : "CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .". The named entities in the text: <0> <0> organization <5> LEICESTERSHIRE <1> <1>\n'

    code = repr(prompt)

    # conver the prediction to the answers
    predictions = converter.output_to_structure(data, code)
    print (predictions)
```
This code is again very similar to the previous versions: it structures text into NLP-task input and restores model output into structured data. The main changes here:

- `structure_to_input` now reformats the `record` information into more readable natural-language statements of the form `"span" is "type" .` before appending them to the prompt.
- `output_to_structure` parses the model output under this new formatting rule, turning the natural-language statements back into entities (see the round trip below).
- The `if __name__ == "__main__":` section loads the schema files, the config file, and a sample, structures the input into a prompt, and restores the model output into structured data, again with small tweaks to exercise the new functionality.

Overall, this remains a tool for processing and converting text data, particularly for pre- and post-processing in NLP tasks. It adds the more elaborate handling needed to render the `record` information in the new format and to parse it back out.
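The two regexes in `output_to_structure`, applied back to back, reverse the formatting performed by `structure_to_input`; a self-contained round trip on the CoNLL03 example:

```python
import re

nl = 'The named entities in the text: "LEICESTERSHIRE" is "organization" .'
pred = re.search(r"The named entities in the text:\s*(.*)", nl).group(1)

pairs = re.findall(r"\"(.*?)\"\sis\s\"(.*?)\"\s.", pred)
# pairs == [('LEICESTERSHIRE', 'organization')]

sel = ' '.join('<0> ' + ' <5> '.join((label, span)) + ' <1>' for span, label in pairs)
sel = '<0> ' + sel + ' <1>'
print(sel)  # <0> <0> organization <5> LEICESTERSHIRE <1> <1>
```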
structure2pl_func_v1.py: a structure converter that turns text data into a format suitable for training and model input, and restores model output into structured data
```python
import json
import re
from collections import OrderedDict
from typing import List, Union, Dict, Tuple

from src.converters.structure_converter import StructureConverter
from src.utils.file_utils import load_yaml, load_schema
from uie.sel2record.record import EntityRecord, RelationRecord
from uie.sel2record.record import MapConfig
from uie.sel2record.sel2record import SEL2Record


class PLFuncPromptCreator(StructureConverter):

    def __init__(self, schema_folder=None, map_config_path=None):
        self.schema_dict = SEL2Record.load_schema_dict(schema_folder)
        self.decoding = 'spotasoc'

        record_schema = self.schema_dict['record']
        self.entity_schema = record_schema.type_list
        self.relation_schema = record_schema.role_list
        self.spot_asoc = record_schema.type_role_dict

        self.map_config = MapConfig.load_from_yaml(map_config_path)

    def structure_to_input(self, input_dict: dict, prompt_part_only: bool = False):
        """
        {'text': 'CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .',
         'entity': [{'type': 'organization', 'offset': [2], 'text': 'LEICESTERSHIRE'}],
         'spot': ['organization'],
        """
        text = input_dict['text']
        entity_list = input_dict['entity']
        spot_list = input_dict['spot']

        prompt = []

        goal = 'named entity extraction'
        func_head = self.to_function_head(self.to_function_name(goal), input='input_text')
        prompt.append(func_head)

        docstring = '\t""" extract named entities from the input_text . """'
        prompt.append(docstring)

        input_text = f'\tinput_text = "{text}"'
        prompt.append(input_text)

        inline_annotation = '\t# extracted named entity list'
        prompt.append(inline_annotation)

        if prompt_part_only:
            return self.list_to_str(prompt)

        for spot in spot_list:
            entity_list_name = self.to_function_name(spot) + '_list'
            tmp_entity_text = []
            for ent in entity_list:
                if ent['type'] == spot:
                    ent_text = ent['text']
                    tmp_entity_text.append(f'"{ent_text}"')
            prompt.append(f'\t{entity_list_name} = [' + ', '.join(tmp_entity_text) + ']')

        prompt = self.list_to_str(prompt)
        return prompt + '\n'

    def output_to_structure(self, input_dict, output_str):
        """
        input_dict:
        {'text': 'West Indian all-rounder Phil Simmons took four for 38 on Friday as Leicestershire beat Somerset by an innings and 39 runs in two days to take over at the head of the county championship .',
         'tokens': ['West', 'Indian', 'all-rounder', 'Phil', 'Simmons', 'took', 'four', 'for', '38', 'on', 'Friday', 'as', 'Leicestershire', 'beat', 'Somerset', 'by', 'an', 'innings', 'and', '39', 'runs', 'in', 'two', 'days', 'to', 'take', 'over', 'at', 'the', 'head', 'of', 'the', 'county', 'championship', '.'],
         'record': '<extra_id_0> <extra_id_0> miscellaneous <extra_id_5> West Indian <extra_id_1> <extra_id_0> person <extra_id_5> Phil Simmons <extra_id_1> <extra_id_0> organization <extra_id_5> Leicestershire <extra_id_1> <extra_id_0> organization <extra_id_5> Somerset <extra_id_1> <extra_id_1>',
         'entity': [{'type': 'organization', 'offset': [12], 'text': 'Leicestershire'}, {'type': 'person', 'offset': [3, 4], 'text': 'Phil Simmons'}, {'type': 'organization', 'offset': [14], 'text': 'Somerset'}, {'type': 'miscellaneous', 'offset': [0, 1], 'text': 'West Indian'}],
         'relation': [], 'event': [], 'spot': ['person', 'organization', 'miscellaneous'], 'asoc': [],
         'spot_asoc': [{'span': 'West Indian', 'label': 'miscellaneous', 'asoc': []}, {'span': 'Phil Simmons', 'label': 'person', 'asoc': []}, {'span': 'Leicestershire', 'label': 'organization', 'asoc': []}, {'span': 'Somerset', 'label': 'organization', 'asoc': []}]}

        output_str:
        def extract_named_entity(input_text):
            # extract named entities from the input_text.
            input_text = "West Indian all-rounder Phil Simmons took four for 38 on Friday as Leicestershire beat Somerset by an innings and 39 runs in two days to take over at the head of the county championship ."
            # extracted named entity list
            person_list = ["Phil Simmons"]
            organization_list = ["Leicestershire", "Somerset"]
            miscellaneous_list = ["West Indian"]

        :return:
        """
        tokens = input_dict['tokens']

        sent_records = {}
        sent_records['entity'] = []
        for entity_s in self.entity_schema:
            temp_entities = re.findall(
                f'{self.to_function_name(entity_s)}_list' + r' = \[(.*?)\]', output_str)
            if len(temp_entities) != 0:
                temp_entities = temp_entities[0].split(", ")
                temp_entity_list = [
                    {'text': e.strip(r'\"'), 'type': entity_s}
                    for e in temp_entities
                ]
                sent_records['entity'].extend(temp_entity_list)

        offset_records = {}
        record_map = EntityRecord(map_config=self.map_config)

        offset_records['offset'] = record_map.to_offset(
            instance=sent_records.get('entity', []),
            tokens=tokens,
        )
        offset_records['string'] = record_map.to_string(
            sent_records.get('entity', []),
        )
        """
        {'offset': [('opinion', (10,)), ('aspect', (11, 12)), ('opinion', (32,)), ('aspect', (34,))],
         'string': [('opinion', 'soft'), ('aspect', 'rubber enclosure'), ('opinion', 'break'), ('aspect', 'seal')]}
        """
        return {"entity": offset_records,
                "relation": {"offset": [], "string": []},
                "event": {"offset": [], "string": []}}


if __name__ == "__main__":
    schema_path = 'data/conll03'
    map_config_path = 'config/offset_map/first_offset_en.yaml'
    val_path = 'data/conll03/val.json'
    with open(val_path) as fin:
        line0 = fin.readline()
        line1 = fin.readline()
        line = fin.readline()
        line = eval(line.strip())
        data = line

    converter = PLFuncPromptCreator(schema_folder=schema_path,
                                    map_config_path=map_config_path)

    # convert the whole sample
    prompt = converter.structure_to_input(data, prompt_part_only=False)
    # convert the whole sample
    # prompt = converter.structure_to_input(data, prompt_part_only=True)
    # print ("prompt:\n", prompt)

    data = {"text":"Two goals from defensive errors in the last six minutes allowed Japan to come from behind and collect all three points from their opening meeting against Syria .","tokens":["Two","goals","from","defensive","errors","in","the","last","six","minutes","allowed","Japan","to","come","from","behind","and","collect","all","three","points","from","their","opening","meeting","against","Syria","."],"entity":[{"type":"location","offset":[26],"text":"Syria"},{"type":"location","offset":[11],"text":"Japan"}],"relation":[],"event":[],"spot":["location"],"asoc":[],"spot_asoc":[{"span":"Japan","label":"location","asoc":[]},{"span":"Syria","label":"location","asoc":[]}]}
    code = r'def named_entity_extraction(input_text):\n\t\"\"\" extract named entities from the input_text . \"\"\"\n\tinput_text = \"Two goals from defensive errors in the last six minutes allowed Japan to come from behind and collect all three points from their opening meeting against Syria .\"\n\t# extracted named entity list\n\tlocation_list = [\"Syria\"]\n'

    print (data)
    print (code)
    # conver the prediction to the answers
    predictions = converter.output_to_structure(data, code)
    print ("output: \n")
    print (predictions)
```
This code is a structure converter that turns text data into a format suitable for training and model input and restores model output into structured data. In this particular file it is designed to extract named entities from text.

Key parts of the code:

- The `structure_to_input` method renders the input text data as a Python function that extracts named entities from the input text. It emits a function head with the name and parameter (`input_text`), then a docstring stating the function's purpose, then the input text itself, and finally one Python list per entity type holding the entities extracted from the text; everything is joined into a complete Python function (sketched below).
- The `output_to_structure` method restores model-generated code into structured data. It pulls each entity type's list out of the output string and maps the entities back onto token offsets in the original text, returning a dict with both offset and string representations of the entity information.
- The `if __name__ == "__main__":` section loads the schema file, the config file, and example data. It then uses the `PLFuncPromptCreator` class to turn the example into the model's prompt text and to restore an output code string into structured data; a hard-coded sample and its corresponding output code are also included.

Overall, this is a general structure converter for turning text data into model-ready input and restoring model output into structured data, which is very useful in NLP, especially for named-entity extraction tasks.
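For the "Japan vs Syria" sample hard-coded in the `__main__` block, `structure_to_input` emits a prompt along the following lines (reconstructed by hand from the emission logic above, so treat the exact whitespace as approximate):

```python
prompt = converter.structure_to_input(data, prompt_part_only=False)
# Roughly:
# def named_entity_extraction(input_text):
# 	""" extract named entities from the input_text . """
# 	input_text = "Two goals from defensive errors ... against Syria ."
# 	# extracted named entity list
# 	location_list = ["Syria", "Japan"]
```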
structure2pl_func_v2.py: converts structured data into the model's input prompt text and extracts structured data back out of the generated code
```python
import json
import re
from collections import OrderedDict
from typing import List, Union, Dict, Tuple

from src.converters.structure_converter import StructureConverter
from src.utils.file_utils import load_yaml, load_schema
from uie.sel2record.record import EntityRecord, RelationRecord
from uie.sel2record.record import MapConfig
from uie.sel2record.sel2record import SEL2Record

"""
def extract_named_entity(input_text):
    # extract named entities from the input_text .
    input_text = "Steve became CEO of Apple in 1998"
    # extracted named entities
    person = ["Steve"]
    organization = ["Apple"]

person = ["Steve"]
organization = ["Apple"]
"""


class PLFuncPromptCreator(StructureConverter):

    def __init__(self, schema_folder=None, map_config_path=None):
        self.schema_dict = SEL2Record.load_schema_dict(schema_folder)
        self.decoding = 'spotasoc'

        record_schema = self.schema_dict['record']
        self.entity_schema = record_schema.type_list
        self.relation_schema = record_schema.role_list
        self.spot_asoc = record_schema.type_role_dict

        self.map_config = MapConfig.load_from_yaml(map_config_path)

    def structure_to_input(self, input_dict: dict, prompt_part_only: bool = False):
        """
        {'text': 'CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .',
         'entity': [{'type': 'organization', 'offset': [2], 'text': 'LEICESTERSHIRE'}],
         'spot': ['organization'],
         "spot_asoc":[{"span":"Japan","label":"location","asoc":[]},{"span":"Syria","label":"location","asoc":[]}]
        """
        text = input_dict['text']
        spot_asoc_list = input_dict['spot_asoc']

        prompt = []

        goal = 'named entity extraction'
        func_head = self.to_function_head(self.to_function_name(goal), input='input_text')
        prompt.append(func_head)

        docstring = '\t""" extract named entities from the input_text . """'
        prompt.append(docstring)

        input_text = f'\tinput_text = "{text}"'
        prompt.append(input_text)

        inline_annotation = '\t# extracted named entities'
        prompt.append(inline_annotation)

        if prompt_part_only:
            return self.list_to_str(prompt)

        for sc in spot_asoc_list:
            entity_text = sc['span']
            entity_type = self.to_function_name(sc['label'])
            prompt.append(f'\t{entity_type} = [ {entity_text} ]')

        prompt = self.list_to_str(prompt)
        return prompt + '\n'

    def output_to_structure(self, input_dict, output_str):
        """
        input_dict:
        {'text': 'West Indian all-rounder Phil Simmons took four for 38 on Friday as Leicestershire beat Somerset by an innings and 39 runs in two days to take over at the head of the county championship .',
         'tokens': ['West', 'Indian', 'all-rounder', 'Phil', 'Simmons', 'took', 'four', 'for', '38', 'on', 'Friday', 'as', 'Leicestershire', 'beat', 'Somerset', 'by', 'an', 'innings', 'and', '39', 'runs', 'in', 'two', 'days', 'to', 'take', 'over', 'at', 'the', 'head', 'of', 'the', 'county', 'championship', '.'],
         'record': '<extra_id_0> <extra_id_0> miscellaneous <extra_id_5> West Indian <extra_id_1> <extra_id_0> person <extra_id_5> Phil Simmons <extra_id_1> <extra_id_0> organization <extra_id_5> Leicestershire <extra_id_1> <extra_id_0> organization <extra_id_5> Somerset <extra_id_1> <extra_id_1>',
         'entity': [{'type': 'organization', 'offset': [12], 'text': 'Leicestershire'}, {'type': 'person', 'offset': [3, 4], 'text': 'Phil Simmons'}, {'type': 'organization', 'offset': [14], 'text': 'Somerset'}, {'type': 'miscellaneous', 'offset': [0, 1], 'text': 'West Indian'}],
         'relation': [], 'event': [], 'spot': ['person', 'organization', 'miscellaneous'], 'asoc': [],
         'spot_asoc': [{'span': 'West Indian', 'label': 'miscellaneous', 'asoc': []}, {'span': 'Phil Simmons', 'label': 'person', 'asoc': []}, {'span': 'Leicestershire', 'label': 'organization', 'asoc': []}, {'span': 'Somerset', 'label': 'organization', 'asoc': []}]}

        output_str:
        def extract_named_entity(input_text):
            # extract named entities from the input_text.
            input_text = "West Indian all-rounder Phil Simmons took four for 38 on Friday as Leicestershire beat Somerset by an innings and 39 runs in two days to take over at the head of the county championship ."
            # extracted named entity list
            person_list = ["Phil Simmons"]
            organization_list = ["Leicestershire", "Somerset"]
            miscellaneous_list = ["West Indian"]

        :return:
        """
        tokens = input_dict['tokens']

        sent_records = {}
        sent_records['entity'] = []
        for entity_s in self.entity_schema:
            temp_entities = re.findall(
                f'{self.to_function_name(entity_s)}' + r' = \[(.*?)\]', output_str)
            if len(temp_entities) != 0:
                temp_entity_list = [
                    {'text': e.strip(), 'type': entity_s}
                    for e in temp_entities
                ]
                sent_records['entity'].extend(temp_entity_list)

        offset_records = {}
        record_map = EntityRecord(map_config=self.map_config)

        offset_records['offset'] = record_map.to_offset(
            instance=sent_records.get('entity', []),
            tokens=tokens,
        )
        offset_records['string'] = record_map.to_string(
            sent_records.get('entity', []),
        )
        """
        {'offset': [('opinion', (10,)), ('aspect', (11, 12)), ('opinion', (32,)), ('aspect', (34,))],
         'string': [('opinion', 'soft'), ('aspect', 'rubber enclosure'), ('opinion', 'break'), ('aspect', 'seal')]}
        """
        return {"entity": offset_records,
                "relation": {"offset": [], "string": []},
                "event": {"offset": [], "string": []}}


if __name__ == "__main__":
    schema_path = 'data/conll03'
    map_config_path = 'config/offset_map/first_offset_en.yaml'
    val_path = 'data/conll03/val.json'
    with open(val_path) as fin:
        line0 = fin.readline()
        line1 = fin.readline()
        line = fin.readline()
        line = eval(line.strip())
        data = line
    print ('data: ', data)
    print ('data keys: ', data.keys())

    converter = PLFuncPromptCreator(schema_folder=schema_path,
                                    map_config_path=map_config_path)

    # convert the whole sample
    prompt = converter.structure_to_input(data, prompt_part_only=False)
    print ("prompt:\n", prompt)

    code = repr(prompt)

    # conver the prediction to the answers
    predictions = converter.output_to_structure(data, code)
    print ("output: \n")
    print (predictions)
```
This code performs two main tasks:

- It renders structured data (text plus entities) as a Python function that extracts named entities from the input text. The function head is produced by `to_function_head(to_function_name('named entity extraction'))`, i.e. `named_entity_extraction(input_text)` (the docstring at the top of the file shows an older `extract_named_entity` form), where `input_text` is the parameter carrying the text to analyze. Inside the function, the named entities are stored in per-type lists (such as `person` and `organization`), alongside comments about how the entities are extracted.
- It reverses the generated code string (the Python function) back into structured data by scanning the code for the per-type entity lists. The return value is a dict carrying offset and string representations of the named entities.

Execution proceeds as follows:

- It first loads the schema and config files, then reads one example from the sample data.
- It then uses the `PLFuncPromptCreator` class to render the example as the model's prompt text; the generated Python function contains the entity types and entities mentioned in the example.
- It also performs the inverse operation, extracting structured data back out of the generated code string.

In short, this code demonstrates how to render structured data as a model input prompt and how to extract structured data back out of generated code, which is particularly useful in information-extraction scenarios. One detail worth noting: unlike the next version, v2 emits the entity spans without quotes, as contrasted below.
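The quoting difference in one place, with both f-strings taken from the respective `structure_to_input` implementations (v2 here, v3 in the next file):

```python
sc = {"span": "Japan", "label": "location"}

line_v2 = f'\t{sc["label"]} = [ {sc["span"]} ]'      # v2 emits:  location = [ Japan ]
line_v3 = f'\t{sc["label"]} = [ "{sc["span"]}" ]'    # v3 emits:  location = [ "Japan" ]
```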
structure2pl_func_v3.py
```python
import json
import re
from collections import OrderedDict
from typing import List, Union, Dict, Tuple

from src.converters.structure_converter import StructureConverter
from src.utils.file_utils import load_yaml, load_schema
from uie.sel2record.record import EntityRecord, RelationRecord
from uie.sel2record.record import MapConfig
from uie.sel2record.sel2record import SEL2Record

"""
def extract_named_entity(input_text):
    # extract named entities from the input_text .
    input_text = "Steve became CEO of Apple in 1998"
    # extracted named entities
    person = ["Steve"]
    organization = ["Apple"]

person = ["Steve"]
organization = ["Apple"]
"""


class PLFuncPromptCreator(StructureConverter):

    def __init__(self, schema_folder=None, map_config_path=None):
        self.schema_dict = SEL2Record.load_schema_dict(schema_folder)
        self.decoding = 'spotasoc'

        record_schema = self.schema_dict['record']
        self.entity_schema = record_schema.type_list
        self.relation_schema = record_schema.role_list
        self.spot_asoc = record_schema.type_role_dict

        self.map_config = MapConfig.load_from_yaml(map_config_path)

    def structure_to_input(self, input_dict: dict, prompt_part_only: bool = False):
        """
        {'text': 'CRICKET - LEICESTERSHIRE TAKE OVER AT TOP AFTER INNINGS VICTORY .',
         'entity': [{'type': 'organization', 'offset': [2], 'text': 'LEICESTERSHIRE'}],
         'spot': ['organization'],
         "spot_asoc":[{"span":"Japan","label":"location","asoc":[]},{"span":"Syria","label":"location","asoc":[]}]
        """
        text = input_dict['text']
        spot_asoc_list = input_dict['spot_asoc']

        prompt = []

        goal = 'named entity extraction'
        func_head = self.to_function_head(self.to_function_name(goal), input='input_text')
        prompt.append(func_head)

        docstring = '\t""" extract named entities from the input_text . """'
        prompt.append(docstring)

        input_text = f'\tinput_text = "{text}"'
        prompt.append(input_text)

        inline_annotation = '\t# extracted named entities'
        prompt.append(inline_annotation)

        if prompt_part_only:
            return self.list_to_str(prompt)

        for sc in spot_asoc_list:
            entity_text = sc['span']
            entity_type = self.to_function_name(sc['label'])
            prompt.append(f'\t{entity_type} = [ "{entity_text}" ]')

        prompt = self.list_to_str(prompt)
        return prompt + '\n'

    def output_to_structure(self, input_dict, output_str):
        """
        input_dict:
        {'text': 'West Indian all-rounder Phil Simmons took four for 38 on Friday as Leicestershire beat Somerset by an innings and 39 runs in two days to take over at the head of the county championship .',
         'tokens': ['West', 'Indian', 'all-rounder', 'Phil', 'Simmons', 'took', 'four', 'for', '38', 'on', 'Friday', 'as', 'Leicestershire', 'beat', 'Somerset', 'by', 'an', 'innings', 'and', '39', 'runs', 'in', 'two', 'days', 'to', 'take', 'over', 'at', 'the', 'head', 'of', 'the', 'county', 'championship', '.'],
         'record': '<extra_id_0> <extra_id_0> miscellaneous <extra_id_5> West Indian <extra_id_1> <extra_id_0> person <extra_id_5> Phil Simmons <extra_id_1> <extra_id_0> organization <extra_id_5> Leicestershire <extra_id_1> <extra_id_0> organization <extra_id_5> Somerset <extra_id_1> <extra_id_1>',
         'entity': [{'type': 'organization', 'offset': [12], 'text': 'Leicestershire'}, {'type': 'person', 'offset': [3, 4], 'text': 'Phil Simmons'}, {'type': 'organization', 'offset': [14], 'text': 'Somerset'}, {'type': 'miscellaneous', 'offset': [0, 1], 'text': 'West Indian'}],
         'relation': [], 'event': [], 'spot': ['person', 'organization', 'miscellaneous'], 'asoc': [],
         'spot_asoc': [{'span': 'West Indian', 'label': 'miscellaneous', 'asoc': []}, {'span': 'Phil Simmons', 'label': 'person', 'asoc': []}, {'span': 'Leicestershire', 'label': 'organization', 'asoc': []}, {'span': 'Somerset', 'label': 'organization', 'asoc': []}]}

        output_str:
        def extract_named_entity(input_text):
            # extract named entities from the input_text.
            input_text = "West Indian all-rounder Phil Simmons took four for 38 on Friday as Leicestershire beat Somerset by an innings and 39 runs in two days to take over at the head of the county championship ."
            # extracted named entity list
            person_list = ["Phil Simmons"]
            organization_list = ["Leicestershire", "Somerset"]
            miscellaneous_list = ["West Indian"]

        :return:
        """
        tokens = input_dict['tokens']

        sent_records = {}
        sent_records['entity'] = []
        for entity_s in self.entity_schema:
            temp_entities = re.findall(
                f'{self.to_function_name(entity_s)}' + r' = \[(.*?)\]', output_str)
            if len(temp_entities) != 0:
                temp_entity_list = [
                    {'text': e.strip().strip(r'\"'), 'type': entity_s}
                    for e in temp_entities
                ]
                sent_records['entity'].extend(temp_entity_list)

        offset_records = {}
        record_map = EntityRecord(map_config=self.map_config)

        offset_records['offset'] = record_map.to_offset(
            instance=sent_records.get('entity', []),
            tokens=tokens,
        )
        offset_records['string'] = record_map.to_string(
            sent_records.get('entity', []),
        )
        """
        {'offset': [('opinion', (10,)), ('aspect', (11, 12)), ('opinion', (32,)), ('aspect', (34,))],
         'string': [('opinion', 'soft'), ('aspect', 'rubber enclosure'), ('opinion', 'break'), ('aspect', 'seal')]}
        """
        return {"entity": offset_records,
                "relation": {"offset": [], "string": []},
                "event": {"offset": [], "string": []}}


if __name__ == "__main__":
    schema_path = 'data/conll03'
    map_config_path = 'config/offset_map/first_offset_en.yaml'
    val_path = 'data/conll03/val.json'
    with open(val_path) as fin:
        line0 = fin.readline()
        line1 = fin.readline()
        line = fin.readline()
        line = eval(line.strip())
        data = line
    # print ('data: ', data)
    # print ('data keys: ', data.keys())

    converter = PLFuncPromptCreator(schema_folder=schema_path,
                                    map_config_path=map_config_path)

    # convert the whole sample
    prompt = converter.structure_to_input(data, prompt_part_only=False)
    # print ("prompt:\n", prompt)
    code = repr(prompt)

    data = {"text":"China controlled most of the match and saw several chances missed until the 78th minute when Uzbek striker Igor Shkvyrin took advantage of a misdirected defensive header to lob the ball over the advancing Chinese keeper and into an empty net .","tokens":["China","controlled","most","of","the","match","and","saw","several","chances","missed","until","the","78th","minute","when","Uzbek","striker","Igor","Shkvyrin","took","advantage","of","a","misdirected","defensive","header","to","lob","the","ball","over","the","advancing","Chinese","keeper","and","into","an","empty","net","."],"entity":[{"type":"miscellaneous","offset":[16],"text":"Uzbek"},{"type":"miscellaneous","offset":[34],"text":"Chinese"},{"type":"person","offset":[18,19],"text":"Igor Shkvyrin"},{"type":"location","offset":[0],"text":"China"}],"relation":[],"event":[],"spot":["person","miscellaneous","location"],"asoc":[],"spot_asoc":[{"span":"China","label":"location","asoc":[]},{"span":"Uzbek","label":"miscellaneous","asoc":[]},{"span":"Igor Shkvyrin","label":"person","asoc":[]},{"span":"Chinese","label":"miscellaneous","asoc":[]}],"input_idx":5,"input_prompt":"def named_entity_extraction(input_text):\n\t\"\"\" extract named entities from the input_text . \"\"\"\n\tinput_text = \"China controlled most of the match and saw several chances missed until the 78th minute when Uzbek striker Igor Shkvyrin took advantage of a misdirected defensive header to lob the ball over the advancing Chinese keeper and into an empty net .\"\n\t# extracted named entities","reference_output":"def named_entity_extraction(input_text):\n\t\"\"\" extract named entities from the input_text . \"\"\"\n\tinput_text = \"China controlled most of the match and saw several chances missed until the 78th minute when Uzbek striker Igor Shkvyrin took advantage of a misdirected defensive header to lob the ball over the advancing Chinese keeper and into an empty net .\"\n\t# extracted named entities\n\tlocation = [ \"China\" ]\n\tmiscellaneous = [ \"Uzbek\" ]\n\tperson = [ \"Igor Shkvyrin\" ]\n\tmiscellaneous = [ \"Chinese\" ]\n"}
    code = r'def named_entity_extraction(input_text):\n\t\"\"\" extract named entities from the input_text . \"\"\"\n\tinput_text = \"China controlled most of the match and saw several chances missed until the 78th minute when Uzbek striker Igor Shkvyrin took advantage of a misdirected defensive header to lob the ball over the advancing Chinese keeper and into an empty net .\"\n\t# extracted named entities\n\tlocation = [ \"China\" ]\n\tperson = [ \"Igor Shkvyrin\" ]\n\tlocation = [ \"Uzbek\" ]\n'

    # conver the prediction to the answers
    print (data)
    print (code)
    predictions = converter.output_to_structure(data, code)
    print ("output: \n")
    print (predictions)
```
This code is an example of converting structured data into a Python function and back again. Its purpose is to render the named entities of an input text as a Python function and then to reverse the operation, recovering the entities' structured data from the Python function.

First, the `PLFuncPromptCreator` constructor loads the schema and config files, which define the named-entity types and the relations between them.

Then, in the `structure_to_input` method, the structured input (the text plus its named-entity annotations) is rendered as a Python function named `named_entity_extraction(input_text)`, where `input_text` carries the text whose entities are to be extracted. The function body contains a comment describing the extraction together with the extraction results themselves, embedded as Python variables such as `person = [ "Steve" ]` and `organization = [ "Apple" ]`; unlike v2, the spans are wrapped in quotes here.

Next, the `output_to_structure` method performs the reverse operation, recovering the entities' structured data from the generated Python function. It scans the generated code string for the per-type variables and their entities and assembles them into a structured format containing each entity's type, text, and token offsets.

Finally, the example supplies sample data together with code of the kind generated above and runs `output_to_structure` on that code to verify that the reverse operation successfully recovers the entity information.

Overall, this is a simple end-to-end example of rendering structured data as a Python function and extracting structured data back out of it, which is very useful for NLP and information-extraction tasks.
structure2pl_func_v4.py
structure2pl_func_v5.py
structure2pl_func_v6.py
structure2pl_func_v7.py
structure2pl_func_v8.py
get_converter.py: creates the different types of converters from text structures to prompts (Python functions or NL-SEL)
```python
# NER tasks
from src.converters.ner.structure2pl_func_v5 import PLFuncPromptCreator as NERPLFuncPromptCreator
from src.converters.ner.structure2nl_sel_v2 import NLSELPromptCreator as NERNLSELPromptCreator
# RE tasks
from src.converters.re.structure2pl_func_v5 import PLFuncPromptCreator as REPLFuncPromptCreator
from src.converters.re.structure2nl_sel_v2 import NLSELPromptCreator as RENLSELPromptCreator


class ConverterFactory:
    converter_to_class = {
        # ner
        "ner-pl-func": NERPLFuncPromptCreator,
        "ner-nl-sel": NERNLSELPromptCreator,
        # re
        "re-pl-func": REPLFuncPromptCreator,
        "re-nl-sel": RENLSELPromptCreator,
    }
    supported_converters = list(converter_to_class.keys())

    @staticmethod
    def get_converter(job_type: str, **kwargs):
        if job_type not in ConverterFactory.supported_converters:
            raise ValueError(f"Unsupported job type: {job_type}")
        return ConverterFactory.converter_to_class[job_type](**kwargs)
```
This code defines a class named `ConverterFactory` that creates the different converters between text structures and prompts. The converters serve NLP tasks such as named entity recognition (NER) and relation extraction (RE). Key details:

`ConverterFactory` wires up the following converter classes, one per task variant:

- Converters for NER (named entity recognition) tasks:
  - `NERPLFuncPromptCreator` for NER rendered as Python functions.
  - `NERNLSELPromptCreator` for NER rendered as NL-SEL (natural language to structured extraction language).
- Converters for RE (relation extraction) tasks:
  - `REPLFuncPromptCreator` for RE rendered as Python functions.
  - `RENLSELPromptCreator` for RE rendered as NL-SEL.

Further points:

- `ConverterFactory` maintains a dict named `converter_to_class` that maps a job type (for example "ner-pl-func") to the corresponding converter class, which makes selecting the right converter by job type straightforward.
- The `supported_converters` list contains all supported job types; checking membership in this list validates whether a requested job type is supported.
- The `get_converter` method is the factory's core: it returns an instance of the appropriate converter for the given job type and raises a `ValueError` if the job type is unsupported.

In short, `ConverterFactory` provides a single interface for selecting the right converter by job type, which keeps the use of the different text-structure conversions across NLP tasks convenient and modular. Typical usage is sketched below.
record.py: provides the different mapping strategies and utility functions needed for information extraction, relation extraction, and event extraction tasks
This code defines classes and functions for converting and mapping text. Specifically, it includes the following parts:

- The `MapConfig` class configures the mapping strategy, including `map_strategy` (the mapping strategy), `de_duplicate` (whether to de-duplicate), and `span_to_token` (the strategy used to turn a span of text into tokens).
- The `Record` class is the base class from which the specific record classes (`EntityRecord`, `RelationRecord`, `EventRecord`) inherit. It provides a `span_to_token` method that tokenizes a text span.
- The `EntityRecord` class converts generated strings into records carrying entity information (type and span). Its methods turn a list of generated records into entity information and map that information onto token offsets in the text.
- The `RelationRecord` class converts generated strings into records carrying relation information (relation type, argument-1 type and span, argument-2 type and span), with methods to build the relation information and map it onto token offsets.
- The `EventRecord` class converts generated strings into records carrying event information (event type, trigger span, and role information), with methods to build the event information and map it onto token offsets.

In short, this code supports information extraction, relation extraction, and event extraction over natural-language text, providing the different mapping strategies and utility functions those tasks need. A usage sketch follows the source below.
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from asyncio.log import logger

import numpy

from src.converters.utils import span_to_token, match_sublist, check_overlap, get_index_tuple

import logging

logger = logging.getLogger("__main__")


class MapConfig:
    def __init__(self,
                 map_strategy: str = 'first',
                 de_duplicate: bool = True,
                 span_to_token: str = 'space') -> None:
        self.map_strategy = map_strategy
        self.de_duplicate = de_duplicate
        self.span_to_token = span_to_token

    def __repr__(self) -> str:
        repr_list = [
            f"map_strategy: {self.map_strategy}",
            f"de_duplicate: {self.de_duplicate}",
            f"span_to_token: {self.span_to_token}",
        ]
        return ', '.join(repr_list)

    @staticmethod
    def load_from_yaml(config_file):
        import yaml
        with open(config_file) as fin:
            config = yaml.load(fin, Loader=yaml.FullLoader)
        return MapConfig(
            map_strategy=config['map_strategy'],
            de_duplicate=config['de_duplicate'],
            span_to_token=config['span_to_token'],
        )


class Record:
    def __init__(self, map_config) -> None:
        self._map_config = map_config

    def span_to_token(self, text):
        return span_to_token(text, span_to_token_strategy=self._map_config['span_to_token'])


class EntityRecord(Record):
    """ Record for converting generated string to information record <type, span> """

    @staticmethod
    def to_string(pred_record_list):
        entity_list = list()
        for pred_record in pred_record_list:
            record_type, record_text = pred_record['type'], pred_record['text']
            if record_text == "":
                logger.warning(f"Empty Extraction {pred_record}")
                continue
            entity_list += [(record_type, record_text)]
        return entity_list

    def to_offset(self, instance, tokens):
        map_strategy_dict = {
            'first': self.record_to_offset_first_role,
            'closest': self.record_to_offset_closest_role,
            'longer_first': self.record_to_offset_longer_first,
        }
        if self._map_config['map_strategy'] in map_strategy_dict:
            map_function = map_strategy_dict[self._map_config['map_strategy']]
            return map_function(
                instance=instance,
                token_list=tokens,
            )
        else:
            raise NotImplementedError(
                f"The map strategy {self._map_config.map_strategy} in {self.__class__} is not implemented."
            )

    def record_to_offset_closest_role(self, instance, token_list):
        """
        Find Role's offset using closest matched with trigger word.
        :param instance:
        :return:
        """
        return self.record_to_offset_first_role(instance, token_list=token_list)

    def record_to_offset_first_role(self, instance, token_list):
        """
        Find Entity's offset using first matched in the sentence.
        :param instance:
        :return:
        """
        entity_list = list()
        entity_matched_set = set()
        for pred_record in instance:
            record_type, record_text = pred_record['type'], pred_record['text']
            if record_text == "":
                logger.warning(f"Empty Extraction {pred_record}")
                continue
            matched_list = match_sublist(token_list, self.span_to_token(record_text))
            for matched in matched_list:
                if (record_type, matched) not in entity_matched_set:
                    entity_list += [(record_type, tuple(range(matched[0], matched[1] + 1)))]
                    entity_matched_set.add((record_type, matched))
                    break
        return entity_list

    def record_to_offset_longer_first(self, instance, token_list):
        """
        Find Entity's offset using first matched in the sentence.
        :param instance:
        :return:
        """
        entity_list = list()
        entity_matched_set = set()
        for x in instance:
            x['length'] = len(x['text'])
        instance.sort(reverse=True, key=lambda x: x['length'])
        for pred_record in instance:
            record_type, record_text = pred_record['type'], pred_record['text']
            if record_text == "":
                logger.warning(f"Empty Extraction {pred_record}")
                continue
            matched_list = match_sublist(token_list, self.span_to_token(record_text))
            for matched in matched_list:
                flag = False
                for _, g in entity_matched_set:
                    if check_overlap(g, matched):
                        flag = True
                if flag:
                    continue
                if (record_type, matched) not in entity_matched_set:
                    entity_list += [(record_type, tuple(range(matched[0], matched[1] + 1)))]
                    entity_matched_set.add((record_type, matched))
                    break
        return entity_list


class RelationRecord(Record):
    """ Record for converting generated string to information record
    <type, arg1_type, arg1_span, arg2_type, arg2_span> """

    def to_offset(self, instance, tokens):
        map_strategy_dict = {
            'first': self.record_to_offset_first_role,
            'closest': self.record_to_offset_closest_role,
            'longer_first': self.record_to_offset_closest_role,
        }
        if self._map_config['map_strategy'] in map_strategy_dict:
            map_function = map_strategy_dict[self._map_config['map_strategy']]
            return map_function(
                instance=instance,
                token_list=tokens,
            )
        else:
            raise NotImplementedError(
                f"The map strategy {self._map_config['map_strategy']} in {self.__class__} is not implemented."
            )

    @staticmethod
    def to_string(instance):
        relation_list = list()
        for record in instance:
            relation_type = record['type']
            relation = [relation_type]
            if len(record['roles']) < 2:
                continue
            for role_type, text_str in record['roles'][:2]:
                relation += [role_type, text_str]
            relation_list += [tuple(relation)]
        return relation_list

    def record_to_offset_first_role(self, instance, token_list):
        """
        Find Role's offset using first matched in the sentence.
        :param instance:
        :return:
        """
        relation_list = list()
        for record in instance:
            relation_type = record['type']
            if len(record['roles']) < 2:
                continue
            relation = [relation_type]
            for role_type, text_str in record['roles'][:2]:
                matched_list = match_sublist(token_list, self.span_to_token(text_str))
                if len(matched_list) == 0:
                    logger.warning("[Cannot reconstruct]: %s %s\n" % (text_str, token_list))
                    break
                relation += [role_type, get_index_tuple(matched_list[0])]
            if len(relation) != 5 or (self._map_config.de_duplicate and tuple(relation) in relation_list):
                continue
            relation_list += [tuple(relation)]
        return relation_list

    def record_to_offset_closest_role(self, instance, token_list):
        """
        Find Role's offset using closest matched with trigger word.
        :param instance:
        :return:
        """
        relation_list = list()
        for record in instance:
            relation_type = record['type']
            if len(record['roles']) < 2:
                continue
            arg1_type, arg1_text = record['roles'][0]
            arg2_type, arg2_text = record['roles'][1]
            arg1_matched_list = match_sublist(token_list, self.span_to_token(arg1_text))
            if len(arg1_matched_list) == 0:
                logger.warning("[Retry]: %s %s\n" % (arg1_text, token_list))
                arg1_matched_list = match_sublist(token_list, self.span_to_token(arg1_text + '.'))
            arg2_matched_list = match_sublist(token_list, self.span_to_token(arg2_text))
            if len(arg2_matched_list) == 0:
                logger.warning("[Retry]: %s %s\n" % (arg2_text, token_list))
                arg2_matched_list = match_sublist(token_list, self.span_to_token(arg2_text + '.'))
            if len(arg1_matched_list) == 0:
                logger.warning("[Cannot reconstruct]: %s %s\n" % (arg1_text, token_list))
                break
            if len(arg2_matched_list) == 0:
                logger.warning("[Cannot reconstruct]: %s %s\n" % (arg2_text, token_list))
                break
            distance_tuple = list()
            for arg1_match in arg1_matched_list:
                for arg2_match in arg2_matched_list:
                    distance = abs(arg1_match[0] - arg2_match[0])
                    distance_tuple += [(distance, arg1_match, arg2_match)]
            distance_tuple.sort()
            relation = [
                relation_type,
                arg1_type,
                get_index_tuple(distance_tuple[0][1]),
                arg2_type,
                get_index_tuple(distance_tuple[0][2]),
            ]
            if self._map_config['de_duplicate'] and tuple(relation) in relation_list:
                continue
            relation_list += [tuple(relation)]
        return relation_list


class EventRecord(Record):
    """ Record for converting generated string to information record in predicate-arguments
    {
        type: pred_type,
        trigger: predicate_span,
        args: [(arg_type, arg_span), ...]
    }
    """

    def to_offset(self, instance, tokens):
        map_strategy_dict = {
            'first': self.record_to_offset_first_role,
            'closest': self.record_to_offset_closest_role,
            'longer_first': self.record_to_offset_closest_role,
        }
        if self._map_config.map_strategy in map_strategy_dict:
            map_function = map_strategy_dict[self._map_config.map_strategy]
            return map_function(
                instance=instance,
                token_list=tokens,
            )
        else:
            raise NotImplementedError(
                f"The map strategy {self._map_config.map_strategy} in {self.__class__} is not implemented."
            )

    @staticmethod
    def to_string(instance):
        """
        {'type': 'Justice:Appeal',
         'trigger': 'appeal',
         'roles': [
            ('Adjudicator', 'court'),
            ('Plaintiff', 'Anwar')
         ], }
        """
        return instance

    def record_to_offset_first_role(self, instance, token_list):
        """
        Find Role's offset using first matched in the sentence.
        """
        record_list = list()
        trigger_matched_set = set()
        for record in instance:
            event_type = record['type']
            trigger = record['trigger']
            matched_list = match_sublist(token_list, self.span_to_token(trigger))
            if len(matched_list) == 0:
                logger.warning("[Cannot reconstruct]: %s %s\n" % (trigger, token_list))
                continue
            trigger_offset = None
            for matched in matched_list:
                if matched not in trigger_matched_set:
                    trigger_offset = get_index_tuple(matched)
                    trigger_matched_set.add(matched)
                    break
            # No trigger word, skip the record
            if trigger_offset is None:
                break
            pred_record = {
                'type': event_type,
                'roles': [],
                'trigger': trigger_offset
            }
            for role_type, text_str in record['roles']:
                matched_list = match_sublist(token_list, self.span_to_token(text_str))
                if len(matched_list) == 0:
                    logger.warning("[Cannot reconstruct]: %s %s\n" % (text_str, token_list))
                    continue
                pred_record['roles'] += [(role_type, get_index_tuple(matched_list[0]))]
            record_list += [pred_record]
        return record_list

    def record_to_offset_closest_role(self, instance, token_list):
        """
        Find Role's offset using closest matched with trigger word.
        """
        record_list = list()
        trigger_matched_set = set()
        for record in instance:
            event_type = record['type']
            trigger = record['trigger']
            matched_list = match_sublist(token_list, self.span_to_token(trigger))
            if len(matched_list) == 0:
                logger.warning("[Cannot reconstruct]: %s %s\n" % (trigger, token_list))
                continue
            trigger_offset = None
            for matched in matched_list:
                if matched not in trigger_matched_set:
                    trigger_offset = get_index_tuple(matched)
                    trigger_matched_set.add(matched)
                    break
            # No trigger word, skip the record
            if trigger_offset is None or len(trigger_offset) == 0:
                break
            pred_record = {
                'type': event_type,
                'roles': [],
                'trigger': trigger_offset
            }
            for role_type, text_str in record['roles']:
                matched_list = match_sublist(token_list, self.span_to_token(text_str))
                if len(matched_list) == 0:
                    logger.warning("[Cannot reconstruct]: %s %s\n" % (text_str, token_list))
                else:
                    abs_distances = [
                        abs(match[0] - trigger_offset[0]) for match in matched_list
                    ]
                    closest_index = numpy.argmin(abs_distances)
                    pred_record['roles'] += [(
                        role_type, get_index_tuple(matched_list[closest_index]))]
            record_list += [pred_record]
        return record_list
```