【网安AIGC专题11.1】12 CODEIE用于NER和RE:顶刊OpenAI API调用、CodeX比chatgpt更好:提示工程设计+控制变量对比实验(格式一致性、模型忠实度、细粒度性能)(下)

本文涉及的产品
NLP自然语言处理_基础版,每接口每天50万次
NLP 自学习平台,3个模型定制额度 1个月
NLP自然语言处理_高级版,每接口累计50万次
简介: 【网安AIGC专题11.1】12 CODEIE用于NER和RE:顶刊OpenAI API调用、CodeX比chatgpt更好:提示工程设计+控制变量对比实验(格式一致性、模型忠实度、细粒度性能)

structure_converter.py:抽象基类用于定义结构转换器的接口

class StructureConverter(object):
    def structure_to_input(self, input_dict: dict, prompt_part_only: bool = False) -> str:
        raise NotImplementedError()
    def output_to_structure(self, input_dict: dict, output_str: str):
        raise NotImplementedError()
    @staticmethod
    def to_function_head(s,input=''):
        return f'def {s}({input}):'
    @staticmethod
    def to_function_name(s):
        s = s.replace(".", "").replace(",", "")
        # remove DT
        tok = s.lower().split()
        tok = [x for x in tok if x not in ['the', 'a', 'an']]
        return '_'.join(tok)
    @staticmethod
    def list_to_str(l):
        # remove \n
        l = [x.replace("\n", " ") if x != '\n' else '' for x in l]
        l = '\n'.join(l)
        return l

这是一个名为 StructureConverter 的Python类,它是一个抽象基类(Abstract Base Class)用于定义结构转换器的接口。这个类包含以下几个方法和静态方法:

  1. structure_to_input(self, input_dict: dict, prompt_part_only: bool = False) -> str:这是一个抽象方法,用于将输入数据结构转换为文本输入。派生类应该实现这个方法来定义特定任务的输入结构到文本输入的转换逻辑。
  2. output_to_structure(self, input_dict: dict, output_str: str):这是另一个抽象方法,用于将输出文本转换回数据结构。与 structure_to_input 类似,它的实现应该由派生类来完成。
  3. to_function_head(s, input=''):这是一个静态方法,用于生成Python函数的头部,其中 s 是函数的名称,input 是函数的参数。它返回一个字符串,表示函数定义。
  4. to_function_name(s):这是另一个静态方法,用于生成合法的Python函数名。给定一个字符串 s,它将字符串处理为合适的Python函数名称格式,删除标点符号并将单词连接成下划线分隔的格式。
  5. list_to_str(l):这也是一个静态方法,用于将字符串列表 l 转换为一个字符串,并处理换行符 \n。它将列表中的每个字符串连接成一个字符串,换行符替换为空格,返回一个多行的字符串。

StructureConverter 类本身是一个抽象基类,无法直接实例化。相反,它提供了接口和一些实用方法,供派生类实现和使用,以定义特定任务的结构转换逻辑。这个类可以作为其他具体结构转换器类的基类,以提供通用的接口和方法。

utils.py:字符串处理、列表操作和代码编译的实用工具函数

import re
import sys
from typing import Tuple
def match_sublist(the_list, to_match):
    """
    :param the_list: [1, 2, 3, 4, 5, 6, 1, 2, 4, 5]
    :param to_match: [1, 2]
    :return:
        [(0, 1), (6, 7)]
    """
    len_to_match = len(to_match)
    matched_list = list()
    for index in range(len(the_list) - len_to_match + 1):
        if to_match == the_list[index:index + len_to_match]:
            matched_list += [(index, index + len_to_match - 1)]
    return matched_list
def check_overlap(x, y):
    if x[0] > y[1] or y[0] > x[1]:
        return False
    else:
        return True
def get_index_tuple(matched: Tuple[int, int]):
    return tuple(range(matched[0], matched[1] + 1))
def span_to_token(text, span_to_token_strategy='space'):
    if span_to_token_strategy == 'space':
        return text.split(' ')
    elif span_to_token_strategy == 'list':
        return list(text)
    else:
        raise NotImplementedError(
            f"The span to token strategy {span_to_token_strategy} is not implemented.")
def to_camel_case(title: str) -> str:
    """Converts a proscript title to a camel case string.
    Example:
        title: travel to the theme park
        camel_case: TravelToThemePark
    """
    if "." == title[-1]:
        title = title[:-1]
    title_tokens = title.split(" ")
    title_camel_case = ""
    for token in title_tokens:
        title_camel_case += token.capitalize()
    return title_camel_case
def to_snake_case(name):
    # replace all space and punctuation with underscore
    if name[-1] == ".":
        name = name[:-1]
    name = re.sub(r'[\s\W]', '_', name)
    return name.lower().strip()
def from_snake_to_normal_str(snake_str: str) -> str:
    """Converts a snake case string to a normal string.
    Example:
        snake_str: travel_to_the_theme_park
        normal_str: travel to the theme park
    """
    return " ".join(snake_str.split("_"))
def compile_code_get_object(py_code_str: str):
    """Given python code as a string, compiles it 
    and returns an object of the class contained in the string.
    Args:
        code (str): _description_
    """
    # compile the code
    try:
        py_code = compile(py_code_str, "<string>", "exec")
    except SyntaxError:
        # try without the last k lines in py_code_str: usually the last line is incomplete
        for k in range(1, 3):
            try:
                lines = py_code_str.split("\n")
                lines = "\n".join(lines[:-k])
                py_code = compile(lines, "<string>", "exec")
            except SyntaxError as e:
                print(f"Error compiling python code:\n{py_code_str}")
                raise e
    # instantiate the class
    py_code_dict = {}
    exec(py_code, py_code_dict)
    # the newly instantiated class will be last in the scope
    py_code_class = py_code_dict[list(py_code_dict.keys())[-1]]()
    return py_code_class

这些函数是一组用于字符串处理、列表操作和代码编译的实用工具函数。以下是这些函数的简要描述:

  1. match_sublist(the_list, to_match):查找列表 the_list 中所有与 to_match 匹配的子列表,并返回它们的索引范围。例如,the_list[1, 2, 3, 4, 5, 6, 1, 2, 4, 5]to_match[1, 2],则返回 [(0, 1), (6, 7)],表示匹配的子列表的起始和结束索引。
  2. check_overlap(x, y):检查两个索引范围 xy 是否有重叠,如果有重叠返回 True,否则返回 False
  3. get_index_tuple(matched: Tuple[int, int]):将匹配的索引范围 matched 转换为一个元组,包含范围内的所有索引。
  4. span_to_token(text, span_to_token_strategy='space'):将文本 text 按照指定策略 span_to_token_strategy 分割为单词或字符列表。默认使用空格分割策略,可以选择 'list' 策略以将文本拆分为字符列表。
  5. to_camel_case(title: str):将一个标题字符串 title 转换为驼峰命名法格式。例如,将 “travel to the theme park” 转换为 “TravelToThemePark”。
  6. to_snake_case(name):将字符串 name 转换为蛇形命名法(snake_case)格式,用下划线分隔单词。
  7. from_snake_to_normal_str(snake_str: str):将蛇形命名法字符串 snake_str 转换为普通字符串,通过删除下划线并用空格分隔单词。
  8. compile_code_get_object(py_code_str: str):编译给定的 Python 代码字符串 py_code_str 并返回该代码中包含的类的对象。这个函数首先编译代码,然后实例化类并返回其对象。

这些函数提供了一些用于文本处理、字符串格式转换和代码执行的常见功能,可在不同的上下文中使用。

eval

eval_extraction.py:评估NLP任务的结果与金标准之间的差异

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import argparse
import json
import os
import sys
import numpy as np
from pprint import pprint
from src.eval.scorer import EntityScorer, RelationScorer, EventScorer
def read_file(file_name):
    return [line for line in open(file_name).readlines()]
def write_to_file(result, output_filename, prefix=None):
    with open(output_filename, 'w') as output:
        for key, value in result.items():
            if prefix:
                key = '%s_%s' % (prefix, key)
            output.write("%s=%s\n" % (key, value))
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-g', dest='gold_folder', help="Golden Dataset folder")
    parser.add_argument('-gf', dest='gold_file', help="Golden Dataset File")
    parser.add_argument('-p', dest='pred_folder', nargs='+', help="Predicted model folder")
    parser.add_argument('-pf', dest='pred_file', help="Predicted model file")
    parser.add_argument('-sf', dest='saved_file', help="Saved result file")
    parser.add_argument('-v', dest='verbose', action='store_true', help='Show more information during running')
    parser.add_argument('-w', dest='write_to_file', action='store_true', help="Write evaluation results to predicted folder")
    parser.add_argument('-m', dest='match_mode', default='normal', choices=['set', 'normal', 'multimatch'])
    parser.add_argument('-case', dest='case', action='store_true', help='Show case study')
    options = parser.parse_args()
    data_dict = {
        'test': [options.pred_file, options.gold_file],
    }
    task_dict = {
        'entity': EntityScorer,
        'relation': RelationScorer,
        'event': EventScorer,
    }
    result_list = {'eval': list(), 'test': list()}
    for pred_folder in options.pred_folder:
        gold_folder = options.gold_folder
        for data_key, (generation, gold_file) in data_dict.items():
            gold_filename = os.path.join(gold_folder, gold_file)
            pred_filename = os.path.join(pred_folder, generation)
            if not os.path.exists(pred_filename):
                sys.stderr.write("%s not found.\n" % pred_filename)
                continue
            print("pred:", pred_filename)
            print("gold:", gold_filename)
            if options.case:
                for pred_line, gold_line in zip(read_file(pred_filename), read_file(gold_filename)):
                    gold_instance = json.loads(gold_line)
                    pred_instance = json.loads(pred_line)
                    print('=========================')
                    print(gold_instance['text'])
                    for task in task_dict:
                        scorer = task_dict[task]
                        gold = scorer.load_gold_list([gold_instance[task]])[0]
                        pred = scorer.load_pred_list([pred_instance[task]])[0]
                        min_length = max(
                            len(gold['string']),
                            len(pred['string']),
                            len(gold.get('string_trigger', [])),
                            len(pred.get('string_trigger', [])),
                            len(gold.get('string_role', [])),
                            len(pred.get('string_role', [])),
                        )
                        if min_length == 0:
                            continue
                        if task == 'entity':
                            print("Entity Gold:", sorted(gold['string']))
                            print("Entity Pred:", sorted(pred['string']))
                        if task == 'relation':
                            print("Relation Gold:", sorted(gold['string']))
                            print("Relation Pred:", sorted(pred['string']))
                        if task == 'event':
                            print("Event Gold Trigger:", sorted(gold['string_trigger']))
                            print("Event Pred Trigger:", sorted(pred['string_trigger']))
                            print("Event Gold Role   :", sorted(gold['string_role']))
                            print("Event Pred Role   :", sorted(pred['string_role']))
            results = dict()
            for task in task_dict:
                if task not in json.loads(read_file(pred_filename)[0]):
                    continue
                scorer = task_dict[task]
                gold_list = [json.loads(line)[task] for line in read_file(gold_filename)]
                pred_list = [json.loads(line)[task] for line in read_file(pred_filename)]
                ##########  23-01-07
                ill_formed = [json.loads(line)['statistic']['ill-formed'] for line in read_file(pred_filename)]
                assert len(pred_list) == len(gold_list)
                gold_instance_list = scorer.load_gold_list(gold_list)
                pred_instance_list = scorer.load_pred_list(pred_list)
                assert len(pred_instance_list) == len(gold_instance_list)
                sub_results = scorer.eval_instance_list(
                    gold_instance_list=gold_instance_list,
                    pred_instance_list=pred_instance_list,
                    verbose=options.verbose,
                    match_mode=options.match_mode,
                )
                results.update(sub_results)
            result_list[data_key] += [results]
            if options.write_to_file:
                output_filename = "%s/%s" % (pred_folder, options.saved_file)
                write_to_file(
                    result=results,
                    output_filename=output_filename,
                    prefix=data_key,
                )
if __name__ == "__main__":
    main()

这段代码是一个命令行工具,用于评估NLP任务的结果与金标准之间的差异。具体来说,它有以下功能:

  1. 从命令行参数中获取要评估的模型输出(predicted results)和金标准数据(golden dataset)。
  2. 支持对不同任务(entity、relation、event)的评估。
  3. 支持设置不同的评估模式,包括"set"、"normal"和"multimatch"等。
  4. 可以在评估结果中输出更多信息,以便进行更详细的分析。
  5. 可以将评估结果写入文件,保存到预测结果文件夹中。

主要的功能包括:

  • 读取模型的预测结果和金标准数据文件。
  • 对模型输出和金标准数据进行解析,提取相应任务(entity、relation、event)的信息。
  • 调用评估工具类(EntityScorerRelationScorerEventScorer)对预测结果和金标准数据进行评估。
  • 计算各项评估指标,包括精确度、召回率、F1分数等,并将结果输出到控制台或保存到文件中。
  • 如果设置了"case"选项,还会输出详细的案例分析信息,包括模型的预测结果和金标准数据。

该工具主要用于评估NLP任务的结果,帮助研究人员和从业者了解模型的性能和改进方向。

extract_results.py:评估NLP任务中的模型预测结果,并将评估后的结果存储到指定的输出文件中

import argparse
import json
import random
from tqdm import tqdm
import subprocess
from src.converters.get_converter import ConverterFactory
import pandas as pd
def eval(src_file, pred_file, save_file, job_type,
         schema_path, map_config_path, pred_key='generated_code'):
    src_d = pd.read_json(src_file, orient='records', lines=True)
    with open(pred_file, 'r') as f:
        pred_d = []
        for line in f:
            data = json.loads(line.strip())
            pred_d.append(data)
    converter = ConverterFactory.get_converter(job_type=job_type, schema_folder=schema_path, map_config_path=map_config_path)
    prediction_list = []
    ill_formed = 0
    invalid_label = 0
    invalid_text_span = 0
    invalid_label_asoc = 0
    invalid_text_span_asoc = 0
    for qid, src_data in tqdm(src_d.iterrows(), total=len(src_d)):
        pred = pred_d[qid]
        predictions = converter.output_to_structure(src_data, pred[pred_key])
        if predictions['statistic']['ill-formed'] == True:
            ill_formed += 1
        if predictions['statistic']['Invalid-Label'] == True:
            invalid_label += 1
        if predictions['statistic']['Invalid-Text-Span'] == True:
            invalid_text_span += 1
        if predictions['statistic']['Invalid-Label-asoc'] == True:
            invalid_label_asoc += 1
        if predictions['statistic']['Invalid-Text-Span-asoc'] == True:
            invalid_text_span_asoc += 1
        prediction_list.append(predictions)
    pd.DataFrame(prediction_list).to_json(save_file, orient='records', lines=True)
    print ("ill_formed number: ", ill_formed)
    print ("Invalid-Label number: ", invalid_label)
    print ("Invalid-Text-Span number: ", invalid_text_span)
    print ("Invalid-Label-asoc number: ", invalid_label_asoc)
    print ("Invalid-Text-Span-asoc number: ", invalid_text_span_asoc)
def config():
    parser = argparse.ArgumentParser()
    parser.add_argument('--raw_output_file', type=str)
    parser.add_argument('--output_file', type=str)
    parser.add_argument('--src_file', type=str)
    parser.add_argument('--job_type', type=str)
    parser.add_argument("--schema_path", type=str, required=True)
    parser.add_argument("--map_config_path", type=str, required=True)
    parser.add_argument("--pred_key", type=str, default='generated_code')
    args = parser.parse_args()
    return args
if __name__ == "__main__":
    args = config()
    save_file = args.output_file
    eval(args.src_file, args.raw_output_file,
         save_file, args.job_type,
         args.schema_path, args.map_config_path, args.pred_key)

这段代码主要用于评估NLP任务中的模型预测结果,并将评估后的结果存储到指定的输出文件中。以下是其主要功能:

  1. 从命令行参数中读取各种输入文件和配置参数,包括原始输出文件(raw_output_file)、输出文件(output_file)、数据源文件(src_file)、任务类型(job_type)、模式配置路径(schema_path)、映射配置路径(map_config_path)、预测键名(pred_key)等。
  2. 通过Converter工厂类创建适当的转换器(如NER或RE),用于将原始模型预测结果转换为结构化的评估结果。
  3. 遍历数据源文件中的每个示例,对每个示例的原始模型预测结果进行结构化转换和评估。
  4. 计算各种评估指标,包括识别标签不合法(Invalid-Label)、文本跨度不合法(Invalid-Text-Span)、标签关联不合法(Invalid-Label-asoc)、文本跨度关联不合法(Invalid-Text-Span-asoc)等。
  5. 统计不合法的预测结果数量,包括不合法的标签、文本跨度和它们的关联。
  6. 最后,将评估后的结果以JSON格式保存到输出文件中,并在控制台输出统计信息。

这段代码用于自动化评估模型预测结果,帮助研究人员和从业者了解模型性能,以便进行进一步的改进和分析。

src\eval\scorer.py:提供一个通用的评估框架,使用户能够方便地计算不同任务的评估指标

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# adapted from https://raw.githubusercontent.com/universal-ie/UIE/main/uie/extraction/scorer.py
from collections import defaultdict
from copy import deepcopy
from typing import Dict, List
import sys
def tuple_offset(offset):
    if isinstance(offset, tuple):
        return offset
    else:
        return tuple(offset)
class Metric:
    """ Tuple Metric """
    def __init__(self, verbose=False, match_mode='normal'):
        self.tp = 0.
        self.gold_num = 0.
        self.pred_num = 0.
        self.verbose = verbose
        self.match_mode = match_mode
        assert self.match_mode in {'set', 'normal', 'multimatch'}
    def __repr__(self) -> str:
        return f"tp: {self.tp}, gold: {self.gold_num}, pred: {self.pred_num}"
    @staticmethod
    def safe_div(a, b):
        if b == 0.:
            return 0.
        else:
            return a / b
    def compute_f1(self, prefix=''):
        tp = self.tp
        pred_num = self.pred_num
        gold_num = self.gold_num
        p, r = self.safe_div(tp, pred_num), self.safe_div(tp, gold_num)
        return {prefix + 'tp': tp,
                prefix + 'gold': gold_num,
                prefix + 'pred': pred_num,
                prefix + 'P': p * 100,
                prefix + 'R': r * 100,
                prefix + 'F1': self.safe_div(2 * p * r, p + r) * 100
                }
    def count_instance(self, gold_list, pred_list):
        if self.match_mode == 'set':
            gold_list = set(gold_list)
            pred_list = set(pred_list)
            if self.verbose:
                print("Gold:", gold_list)
                print("Pred:", pred_list)
            self.gold_num += len(gold_list)
            self.pred_num += len(pred_list)
            self.tp += len(gold_list & pred_list)
        else:
            if self.verbose:
                print("Gold:", gold_list)
                print("Pred:", pred_list)
            self.gold_num += len(gold_list)
            self.pred_num += len(pred_list)
            if len(gold_list) > 0 and len(pred_list) > 0:
                # guarantee length same
                assert len(gold_list[0]) == len(pred_list[0])
            dup_gold_list = deepcopy(gold_list)
            for pred in pred_list:
                if pred in dup_gold_list:
                    self.tp += 1
                    if self.match_mode == 'normal':
                        # Each Gold Instance can be matched one time
                        dup_gold_list.remove(pred)
    def count_batch_instance(self, batch_gold_list, batch_pred_list):
        for gold_list, pred_list in zip(batch_gold_list, batch_pred_list):
            self.count_instance(gold_list=gold_list, pred_list=pred_list)
class RecordMetric(Metric):
    """ 不考虑不同 Role 之间的顺序,例如事件论元"""
    @staticmethod
    def is_equal(gold, pred):
        if gold['type'] != pred['type']:
            return False
        if gold['spot'] != pred['spot']:
            return False
        if len(gold['asocs']) != len(pred['asocs']):
            return False
        for gold_role, pred_role in zip(sorted(gold['asocs']), sorted(pred['asocs'])):
            if gold_role != pred_role:
                return False
        return True
    def count_instance(self, gold_list, pred_list):
        if self.match_mode == 'set':
            raise NotImplementedError(f'{self.__class__.__name__} do not support the match model `set`')
        if self.verbose:
            print("Gold:", gold_list)
            print("Pred:", pred_list)
        self.gold_num += len(gold_list)
        self.pred_num += len(pred_list)
        gold_indexes = list(range(len(gold_list)))
        non_found = [True] * len(gold_list)
        for pred in pred_list:
            for gold_index in gold_indexes:
                if non_found[gold_index] and self.is_equal(gold_list[gold_index], pred):
                    self.tp += 1
                    non_found[gold_index] = False
                    if self.match_mode == 'normal':
                        break
class OrderedRecordMetric(RecordMetric):
    """ 考虑不同 Role 之间的顺序,例如关系 """
    @staticmethod
    def is_equal(gold, pred):
        if gold['type'] != pred['type']:
            return False
        if gold['spot'] != pred['spot']:
            return False
        if len(gold['asocs']) != len(pred['asocs']):
            return False
        for gold_role, pred_role in zip(gold['asocs'], pred['asocs']):
            if gold_role != pred_role:
                return False
        return True
def warning_tp_increment(gold, pred, prefix):
    sys.stderr.write(f"{prefix} TP Increment Warning, Gold Offset: {gold['offset']}\n")
    sys.stderr.write(f"{prefix} TP Increment Warning, Pred Offset: {pred['offset']}\n")
    sys.stderr.write(f"{prefix} TP Increment Warning, Gold String: {gold['string']}\n")
    sys.stderr.write(f"{prefix} TP Increment Warning, Pred String: {pred['string']}\n")
    sys.stderr.write(f"===============\n")
class Scorer:
    @staticmethod
    def load_gold_list(gold_list, offset_key=None):
        raise NotImplementedError
    @staticmethod
    def load_pred_list(pred_list):
        raise NotImplementedError
    @staticmethod
    def eval_instance_list(gold_instance_list, pred_instance_list, verbose=False, match_mode='normal'):
        raise NotImplementedError
class EntityScorer(Scorer):
    @staticmethod
    def load_gold_list(gold_list: List[List[Dict]]):
        """ Load gold instance to `string` and `offset`
        Args:
            gold_list (List[List[Dict]]): [description]
                [
                    [
                        {'type': 'Geo-political', 'offset': [7], 'text': 'seattle'},
                        {'type': 'Location', 'offset': [11], 'text': 'lot'},
                        {'type': 'Geo-political', 'offset': [14], 'text': 'city'}
                    ],
                    [...]
                ]
        Returns:
            List[Dict]: each instance has `offset` and `string`
                [
                    {
                        'offset': [('Geo-political', (7,)), ('Location', (11,)), ('Geo-political', (14,))],
                        'string': [('Geo-political', 'seattle'), ('Location', 'lot'), ('Geo-political', 'city')]
                    },
                    {...}, ...
                ]
        """
        gold_instance_list = []
        for gold in gold_list:
            gold_offset = list()
            gold_string = list()
            for span in gold:
                span_label = span['type']
                span_offset = span['offset']
                span_text = span['text']
                gold_offset += [(span_label, tuple_offset(span_offset))]
                gold_string += [(span_label, span_text)]
            gold_instance = {
                'offset': gold_offset,
                'string': gold_string,
            }
            gold_instance_list += [gold_instance]
        return gold_instance_list
    @staticmethod
    def load_pred_list(pred_list: List[Dict]):
        """[summary]
        Args:
            pred_list (List[Dict]): [description]
                [
                    {
                        'offset': [['Geo-political', [7]], ['Geo-political', [14]]],
                        'string': [['Geo-political', 'seattle'], ['Geo-political', 'city']]
                    },
                    {...},
                ]
        Returns:
            List[Dict] : each relation instance has `offset` and `string`
                [
                    {
                        'offset': [('Geo-political', (7,)), ('Geo-political', (14,))],
                        'string': [('Geo-political', 'seattle'), ('Geo-political', 'city')]
                    }
                ]
        """
        pred_instance_list = list()
        for pred in pred_list:
            for offset_pred in pred['offset']:
                if not isinstance(offset_pred[1], tuple):
                    offset_pred[1] = tuple_offset(offset_pred[1])
            pred['offset'] = [tuple_offset(p) for p in pred['offset']]
            pred['string'] = [tuple_offset(p) for p in pred['string']]
            pred_instance_list += [pred]
        return pred_instance_list
    @staticmethod
    def eval_instance_list(gold_instance_list: List[Dict], pred_instance_list: List[Dict], verbose=False, match_mode='normal'):
        """[summary]
        Args:
            gold_instance_list (List[Dict]): [description]
                [
                    {
                        'offset': [('Geo-political', (7,)), ('Location', (11,)), ('Geo-political', (14,))],
                        'string': [('Geo-political', 'seattle'), ('Location', 'lot'), ('Geo-political', 'city')]
                    },
                    {...}, ...
                ]
            pred_instance_list (List[Dict]): [description]
                [
                    {
                        'offset': [('Geo-political', (7,)), ('Geo-political', (14,))],
                        'string': [('Geo-political', 'seattle'), ('Geo-political', 'city')]
                    }
                ]
            verbose (bool, optional): [description]. Defaults to False.
            match_mode (string, optional): [description]. Defaults to `normal` .
        Returns:
            Dict: Result of Evaluation
                (offset, string) X (gold, pred, tp, P, R, F1)
        """
        metrics = {
            'string': Metric(verbose=verbose, match_mode=match_mode),
            'offset': Metric(verbose=verbose, match_mode=match_mode),
        }
        for pred, gold in zip(pred_instance_list, gold_instance_list):
            pre_string_tp, pre_offset_tp = metrics['string'].tp, metrics['offset'].tp
            for eval_key in metrics:
                metrics[eval_key].count_instance(
                    gold_list=gold.get(eval_key, []),
                    pred_list=pred.get(eval_key, [])
                )
            post_string_tp, post_offset_tp = metrics['string'].tp, metrics['offset'].tp
            if verbose and post_offset_tp - pre_offset_tp != post_string_tp - pre_string_tp:
                warning_tp_increment(gold=gold, pred=pred, prefix='Entity')
        results = dict()
        for eval_key in metrics:
            results.update(metrics[eval_key].compute_f1(prefix=eval_key + '-ent-'))
        return results
class RelationScorer(Scorer):
    @staticmethod
    def load_gold_list(gold_list: List[List[Dict]]):
        """[summary]
        Args:
            gold_list (List[List[Dict]]): List of Sentece, each sentence contains a List of Relation Dict
                [
                    [
                        {
                            'type': 'Part-whole',
                            'args': [{'type': 'Location', 'offset': [11], 'text': 'lot'}, {'type': 'Geo-political', 'offset': [14], 'text': 'city'}]
                        }, ...
                    ],
                    [...],
                ]
        Returns:
            List[Dict]: List of Sentece, each sentence contains two List (offset, string) of Relation Tuple
                [
                    {
                        'offset': [('Part-whole', 'Geo-political', (0,), 'Geo-political', (2,)), ... ],
                        'string': [('Part-whole', 'Geo-political', 'MULTAN', 'Geo-political', 'Pakistan'), ...]
                    }
                ]
        """
        gold_instance_list = []
        for gold in gold_list:
            gold_instance = defaultdict(list)
            for record in gold:
                assert len(record['args']) == 2
                gold_instance['offset'] += [(
                    record['type'],
                    record['args'][0]['type'],
                    tuple_offset(record['args'][0]['offset']),
                    record['args'][1]['type'],
                    tuple_offset(record['args'][1]['offset']),
                )]
                gold_instance['string'] += [(
                    record['type'],
                    record['args'][0]['type'],
                    record['args'][0]['text'],
                    record['args'][1]['type'],
                    record['args'][1]['text'],
                )]
            gold_instance_list += [gold_instance]
        return gold_instance_list
    @staticmethod
    def load_pred_list(pred_list):
        """[summary]
        Args:
            pred_list (List[Dict]): List of Sentece, each sentence contains two List (offset, string) of Relation List
                [
                    {
                        'offset': [['Part-whole', 'Geo-political', [0], 'Geo-political', [2]]],
                        'string': [['Part-whole', 'Geo-political', 'MULTAN', 'Geo-political', 'Pakistan']],
                    }, ...
                ]
        Returns:
            List[Dict]: List of Sentece, each sentence contains two List (offset, string) of Relation Tuple
                [
                    {
                        'offset': [('Part-whole', 'Geo-political', (0,), 'Geo-political', (2,))],
                        'string': [('Part-whole', 'Geo-political', 'MULTAN', 'Geo-political', 'Pakistan')]
                    }, ...
                ]
        """
        pred_instance_list = list()
        for pred in pred_list:
            for offset_pred in pred['offset']:
                if not isinstance(offset_pred[2], tuple):
                    offset_pred[2] = tuple_offset(offset_pred[2])
                if not isinstance(offset_pred[4], tuple):
                    offset_pred[4] = tuple_offset(offset_pred[4])
            pred['offset'] = [tuple_offset(p) for p in pred['offset']]
            pred['string'] = [tuple_offset(p) for p in pred['string']]
            pred_instance_list += [pred]
        return pred_instance_list
    @staticmethod
    def eval_instance_list(gold_instance_list, pred_instance_list, verbose=False, match_mode='normal'):
        """[summary]
        Args:
            gold_instance_list (List[Dict]): List of Sentece, each sentence contains two List (offset, string) of Relation Tuple
                [
                    {
                        'offset': [('Part-whole', 'Geo-political', (0,), 'Geo-political', (2,)), ... ],
                        'string': [('Part-whole', 'Geo-political', 'MULTAN', 'Geo-political', 'Pakistan'), ...]
                    }
                ]
            pred_instance_list ([type]): List of Sentece, each sentence contains two List (offset, string) of Relation Tuple
                [
                    {
                        'offset': [('Part-whole', 'Geo-political', (0,), 'Geo-political', (2,))],
                        'string': [('Part-whole', 'Geo-political', 'MULTAN', 'Geo-political', 'Pakistan')]
                    }, ...
                ]
            verbose (bool, optional): Defaults to False.
            match_mode (string, optional): [description]. Defaults to `normal` .
        Returns:
            Dict: Result of Evaluation
                (offset, string) X (boundary, strict) X (gold, pred, tp, P, R, F1)
        """
        # Span Boundary and Type
        metrics = {
            'offset': Metric(verbose=verbose, match_mode=match_mode),
            'string': Metric(verbose=verbose, match_mode=match_mode),
        }
        # Span Boundary Only
        boundary_metrics = {
            'offset': Metric(verbose=verbose, match_mode=match_mode),
            'string': Metric(verbose=verbose, match_mode=match_mode),
        }
        for pred, gold in zip(pred_instance_list, gold_instance_list):
            pre_string_tp, pre_offset_tp = metrics['string'].tp, metrics['offset'].tp
            for eval_key in metrics:
                # Span Boundary and Type
                metrics[eval_key].count_instance(
                    gold_list=gold.get(eval_key, []),
                    pred_list=pred.get(eval_key, []),
                )
            post_string_tp, post_offset_tp = metrics['string'].tp, metrics['offset'].tp
            if verbose and (post_offset_tp - pre_offset_tp != post_string_tp - pre_string_tp):
                warning_tp_increment(gold=gold, pred=pred, prefix='Relation Strict')
            pre_string_tp, pre_offset_tp = boundary_metrics['string'].tp, boundary_metrics['offset'].tp
            for eval_key in boundary_metrics:
                # Span Boundary Only
                boundary_metrics[eval_key].count_instance(
                    gold_list=[(x[0], x[2], x[4]) for x in gold.get(eval_key, [])],
                    pred_list=[(x[0], x[2], x[4]) for x in pred.get(eval_key, [])],
                )
            post_string_tp, post_offset_tp = boundary_metrics['string'].tp, boundary_metrics['offset'].tp
            if verbose and post_offset_tp - pre_offset_tp != post_string_tp - pre_string_tp:
                warning_tp_increment(gold=gold, pred=pred, prefix='Relation Boundary')
        results = dict()
        for eval_key in metrics:
            results.update(metrics[eval_key].compute_f1(prefix=eval_key + '-rel-strict-'))
        for eval_key in boundary_metrics:
            results.update(boundary_metrics[eval_key].compute_f1(prefix=eval_key + '-rel-boundary-'))
        return results
class EventScorer(Scorer):
    @staticmethod
    def load_gold_list(gold_list):
        """[summary]
        Args:
            gold_list (List[List[Dict]]): List of Sentece, each sentence contains a List of Event Dict
                [
                    [ # Sentance
                        { # Event Record
                            'type': 'Die',
                            'offset': [16],
                            'text': 'shot',
                            'args': [
                                {'type': 'Victim', 'offset': [17], 'text': 'himself'},
                                {'type': 'Agent', 'offset': [5, 6], 'text': 'John Joseph'},
                                {'type': 'Place', 'offset': [23], 'text': 'court'}
                            ]
                        },
                    ]
                ]
        Returns:
            List[Dict]: List of Sentece, each sentence contains Four List of Event Tuple
                [
                    {
                        'offset_trigger': [('Die', (16,)), ('Convict', (30,))],
                        'string_trigger': [('Die', 'shot'), ('Convict', 'convicted')],
                        'offset_role': [('Die', 'Victim', (17,)), ('Die', 'Agent', (5, 6)), ('Die', 'Place', (23,))],
                        'string_role': [('Die', 'Victim', 'himself'), ('Die', 'Agent', 'John Joseph'), ('Die', 'Place', 'court')]
                    },
                    ...
                ]
        """
        gold_instance_list = []
        for gold in gold_list:
            gold_instance = defaultdict(list)
            for record in gold:
                gold_instance['offset_trigger'] += [(record['type'], tuple_offset(record['offset']))]
                gold_instance['string_trigger'] += [(record['type'], record['text'])]
                for arg in record['args']:
                    gold_instance['offset_role'] += [(record['type'], arg['type'], tuple_offset(arg['offset']))]
                    gold_instance['string_role'] += [(record['type'], arg['type'], arg['text'])]
            gold_instance_list += [gold_instance]
        return gold_instance_list
    @staticmethod
    def load_pred_list(pred_list):
        """[summary]
        Args:
            pred_list (List[Dict]): List of Sentece, each sentence contains two List (offset, string) of Event List
                [
                    {
                        'offset': [{'type': 'Attack', 'roles': [['Attacker', [5, 6]], ['Place', [23]], ['Target', [17]]], 'trigger': [16]}],
                        'string': [{'roles': [['Attacker', 'John Joseph'], ['Place', 'court'], ['Target', 'himself']], 'type': 'Attack', 'trigger': 'shot'}],
                    },
                    ...
                ]
        Returns:
            List[Dict]: List of Sentece, each sentence contains four List (offset, string) X (trigger, role) of Event List
                [
                    {
                        'offset_trigger': [('Attack', (16,))],
                        'offset_role': [('Attack', 'Attacker', (5, 6)), ('Attack', 'Place', (23,)), ('Attack', 'Target', (17,))],
                        'string_trigger': [('Attack', 'shot')],
                        'string_role': [('Attack', 'Attacker', 'John Joseph'), ('Attack', 'Place', 'court'), ('Attack', 'Target', 'himself')],
                    },
                    ...
                ]
        """
        pred_instance_list = list()
        for pred in pred_list:
            pred_instance = defaultdict(list)
            for offset_pred in pred['offset']:
                event_type, trigger_offset = offset_pred['type'], tuple_offset(offset_pred['trigger'])
                pred_instance['offset_trigger'] += [(event_type, trigger_offset)]
                for role_type, role_offset in offset_pred['roles']:
                    pred_instance['offset_role'] += [(event_type, role_type, tuple_offset(role_offset))]
            for string_pred in pred['string']:
                event_type, trigger_string = string_pred['type'], string_pred['trigger']
                pred_instance['string_trigger'] += [(event_type, trigger_string)]
                for role_type, role_string in string_pred['roles']:
                    pred_instance['string_role'] += [(event_type, role_type, role_string)]
            pred_instance_list += [pred_instance]
        return pred_instance_list
    @staticmethod
    def eval_instance_list(gold_instance_list, pred_instance_list, verbose=False, match_mode='normal'):
        """[summary]
        Args:
            gold_instance_list (List[Dict]): List of Sentece, each sentence contains Four List of Event Tuple
                [
                    {
                        'offset_trigger': [('Die', (16,)), ('Convict', (30,))],
                        'string_trigger': [('Die', 'shot'), ('Convict', 'convicted')],
                        'offset_role': [('Die', 'Victim', (17,)), ('Die', 'Agent', (5, 6)), ('Die', 'Place', (23,))],
                        'string_role': [('Die', 'Victim', 'himself'), ('Die', 'Agent', 'John Joseph'), ('Die', 'Place', 'court')]
                    },
                    ...
                ]
            pred_instance_list (List[Dict]): List of Sentece, each sentence contains four List (offset, string) X (trigger, role) of Event List
                [
                    {
                        'offset_trigger': [('Attack', (16,))],
                        'offset_role': [('Attack', 'Attacker', (5, 6)), ('Attack', 'Place', (23,)), ('Attack', 'Target', (17,))],
                        'string_trigger': [('Attack', 'shot')],
                        'string_role': [('Attack', 'Attacker', 'John Joseph'), ('Attack', 'Place', 'court'), ('Attack', 'Target', 'himself')],
                    },
                    ...
                ]
            verbose (bool, optional): [description]. Defaults to False.
            match_mode (string, optional): [description]. Defaults to `normal`.
        Returns:
            Dict: Result of Evaluation
                (offset, string) X (trigger, role) X (gold, pred, tp, P, R, F1)
        """
        trigger_metrics = {
            'offset': Metric(verbose=verbose, match_mode=match_mode),
            'string': Metric(verbose=verbose, match_mode=match_mode),
        }
        role_metrics = {
            'offset': Metric(verbose=verbose, match_mode=match_mode),
            'string': Metric(verbose=verbose, match_mode=match_mode),
        }
        for pred, gold in zip(pred_instance_list, gold_instance_list):
            pre_string_tp, pre_offset_tp = trigger_metrics['string'].tp, trigger_metrics['offset'].tp
            for eval_key in trigger_metrics:
                trigger_metrics[eval_key].count_instance(
                    gold_list=gold.get(eval_key + '_trigger', []),
                    pred_list=pred.get(eval_key + '_trigger', [])
                )
            post_string_tp, post_offset_tp = trigger_metrics['string'].tp, trigger_metrics['offset'].tp
            if verbose and post_offset_tp - pre_offset_tp != post_string_tp - pre_string_tp:
                warning_tp_increment(gold=gold, pred=pred, prefix='Trigger')
            pre_string_tp, pre_offset_tp = role_metrics['string'].tp, role_metrics['offset'].tp
            for eval_key in role_metrics:
                role_metrics[eval_key].count_instance(
                    gold_list=gold.get(eval_key + '_role', []),
                    pred_list=pred.get(eval_key + '_role', [])
                )
            post_string_tp, post_offset_tp = role_metrics['string'].tp, role_metrics['offset'].tp
            if verbose and post_offset_tp - pre_offset_tp != post_string_tp - pre_string_tp:
                warning_tp_increment(gold=gold, pred=pred, prefix='Role')
        results = dict()
        for eval_key in trigger_metrics:
            results.update(trigger_metrics[eval_key].compute_f1(prefix=f'{eval_key}-evt-trigger-'))
        for eval_key in role_metrics:
            results.update(role_metrics[eval_key].compute_f1(prefix=f'{eval_key}-evt-role-'))
        return results

prompt

constants.py

END = "# END"
END_LINE = "\n----------------------------------------"

这段代码定义了一个评估(scoring)模块,用于计算不同任务(如实体识别、关系抽取、事件抽取)的评估指标。下面是代码的主要组成部分:

  1. Metric 类:这是一个通用的评估指标类,用于计算真阳性(true positives,tp)、金标样本数(gold_num)、预测样本数(pred_num)以及相关的评估指标,如精确度(P)、召回率(R)和 F1 分数。safe_div 方法用于进行除法计算,以避免除以零的情况。compute_f1 方法计算 F1 分数。count_instance 方法用于计算指标的真阳性、金标样本数和预测样本数。
  2. RecordMetric 类:这是继承自 Metric 类的一个子类,用于处理不考虑不同角色之间的顺序的任务(例如事件论元抽取)。它包括了一个额外的 is_equal 方法,用于判断金标和预测是否相等。
  3. OrderedRecordMetric 类:这是继承自 RecordMetric 类的子类,用于处理考虑不同角色之间的顺序的任务(例如关系抽取)。它重写了 is_equal 方法,以考虑不同角色之间的顺序。
  4. Scorer 类:这是一个基本的评估类,定义了三个静态方法 load_gold_listload_pred_listeval_instance_list,分别用于加载金标样本、预测样本,以及计算评估指标。子类可以继承这个类并实现这些方法来适应不同的任务。
  5. EntityScorerRelationScorerEventScorer 类:这些类分别用于实体识别、关系抽取和事件抽取任务。它们通过继承 Scorer 类,实现了加载金标样本和预测样本的方法,以及计算相应任务的评估指标的方法。

这段代码的目的是提供一个通用的评估框架,使用户能够方便地计算不同任务的评估指标,例如 F1 分数。根据任务的特点,可以选择适当的评估类来加载数据并计算指标。

make_prompt.py:生成提示文本(prompt)用于模型的训练

import pandas as pd
import os
import shutil
import random
import argparse
from collections import defaultdict
import json
import sys
from src.prompt.constants import END
from src.utils.record_schema import RecordSchema
def make_prompt(file_path: str, out_file, n_examples, seed: int = 0):
    random.seed(seed)
    data = [json.loads(line.strip()) for line in open(file_path)]
    if n_examples != -1:
        samples = random.sample(data, n_examples)
    else:
        samples = data
    random.shuffle(samples)
    prompt = ""
    for sample in samples:
        prompt += sample["reference_output"]
        prompt += f"{END}\n\n"
    with open(out_file,'w',encoding='utf-8') as fout:
        fout.write(prompt)
    print ("saved prompt to ", out_file)
    return 0
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-src', help='Source File Name', required=True)
    parser.add_argument('-tgt', help='Target File Name, n shot sampled',
                        required=True)
    parser.add_argument('-schema_file', help='schema_file', required=True)
    parser.add_argument('-task', help='N-Shot Task name', required=True,
                        choices=['entity', 'relation', 'event'])
    parser.add_argument('-n_examples', help='n_examples',type=int)
    parser.add_argument('-seed', help='Default is None, no random')
    parser.add_argument('-min_len', dest='min_len', help='Default is None', type=int)
    options = parser.parse_args()
    source_file = options.src
    target_file = options.tgt
    make_prompt(file_path=source_file, out_file=target_file, n_examples=options.n_examples)
if __name__ == "__main__":
    main()

这段代码的主要功能是生成提示文本(prompt)用于模型的训练。以下是代码的主要步骤和功能:

  1. 代码通过命令行参数接受输入文件的路径、输出文件的路径、生成的提示文本的样本数(n_examples)、种子值(seed)、以及其他必要参数。
  2. 从输入文件中读取数据,数据以JSON格式存储。可以选择从中随机采样一定数量的样本,也可以使用所有的样本。
  3. 随机打乱数据的顺序。
  4. 创建一个空的提示文本字符串。
  5. 对每个样本,将样本的reference_output字段添加到提示文本字符串中,同时在每个样本之间添加特定标记(END)以分隔不同的样本。
  6. 将生成的提示文本字符串写入输出文件中。
  7. 输出提示文本的保存路径,以便用户查看。

这段代码通常用于生成用于训练NLP模型的提示文本,可以从数据中随机选择一些样本,并按一定格式组织成提示文本,以供后续模型训练使用。

src\prompt\make_task_file.py:将给定的数据转换成适用于模型训练的任务文件

import pandas as pd
from tqdm import tqdm
from src.converters.get_converter import ConverterFactory
from src.utils.file_utils import load_yaml, load_schema, read_data
def make_task_file(args):
    data = read_data(args.inpath)
    converter = ConverterFactory.get_converter(args.job_type,schema_folder=args.schema_path, map_config_path=args.map_config_path)
    res = []
    for i, row in tqdm(data.iterrows(), total=len(data)):
        try:
            struct_input = converter.structure_to_input(row, prompt_part_only=False)
            if struct_input is None:
                continue
            tmp = {k: v for (k, v) in row.items() if k not in ['record']}
            tmp["input_idx"] = i
            tmp["input_prompt"] = converter.structure_to_input(row, prompt_part_only=True)
            tmp["reference_output"] = struct_input
        except Exception as e:
            raise e
        res.append(tmp)
    # successfully converted
    conversion_rate = len(res) / len(data)
    pd.DataFrame(res).to_json(args.outpath, orient='records', lines=True)
    print(f"Converted {len(res)} out of {len(data)} rows ({conversion_rate:.2%})")
    print ("Saved to ", args.outpath)
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--inpath", type=str, required=True)
    parser.add_argument("--outpath", type=str, required=True)
    parser.add_argument("--job_type", type=str, required=True)
    parser.add_argument("--schema_path", type=str, required=True)
    parser.add_argument("--map_config_path", type=str, required=True)
    args = parser.parse_args()
    make_task_file(args)

这段代码的主要功能是将给定的数据转换成适用于模型训练的任务文件。以下是代码的主要步骤和功能:

  1. 代码通过命令行参数接受输入文件的路径、输出文件的路径、任务类型(job_type)、模式配置文件的路径(schema_path)、映射配置文件的路径(map_config_path),以及其他必要参数。
  2. 从输入文件中读取数据,数据通常以DataFrame格式存储。
  3. 利用ConverterFactory从给定的任务类型和配置文件加载适当的转换器(例如,NER或RE转换器)。
  4. 对于数据中的每一行,调用转换器的structure_to_input方法将原始数据转换为适用于模型训练的输入。此外,还提取一些其他字段,如索引、输入提示等,用于创建任务文件的元信息。
  5. 如果数据的转换成功,将转换后的数据添加到结果列表中。
  6. 最后,计算成功转换的数据比例,并将结果保存为JSON格式的任务文件,以供后续模型训练使用。

总的来说,这段代码用于将原始数据转换为可用于不同NLP任务(如NER、RE等)的任务文件格式,以便模型能够理解和学习这些任务。生成的任务文件包含了输入、输出和其他元信息,以便于后续的训练和评估。

utils

file_utils.py:加载数据和配置信息,以便进行任务的转换和生成

import yaml
import pandas as pd
def load_schema(schema_path):
    with open(schema_path,encoding='utf8') as fin:
        entity_line = fin.readline().strip()
        relation_line = fin.readline().strip()
        spot_asoc_line = fin.readline().strip()
    return {'entity_schema': eval(entity_line),
            'relation_schema': eval(relation_line),
            'spot_asoc_schema': eval(spot_asoc_line)}
def load_yaml(yaml_path):
    with open(yaml_path,'r') as fin:
        map_config = yaml.load(fin.read(), Loader=yaml.FullLoader)
    return map_config
def read_data(inpath):
    if "json" in inpath:
        data = pd.read_json(inpath, orient='records', lines=True)
    else:
        raise ValueError(f"Unknown input format: {inpath}")
    return data

这段代码是一组用于加载数据和配置文件的辅助函数,包括:

  1. load_schema(schema_path) 函数用于加载给定路径的模式(schema)文件。该文件通常包含了实体、关系和关联标签的信息。函数读取文件的前三行,分别包含了实体模式、关系模式和关联标签模式的定义,然后返回这些模式的字典。
  2. load_yaml(yaml_path) 函数用于加载给定路径的YAML格式配置文件。它打开指定路径的文件,使用PyYAML库加载文件内容,并返回加载后的配置字典。
  3. read_data(inpath) 函数用于加载数据文件,支持JSON格式的数据文件。它检查文件类型,如果文件类型为JSON,它使用Pandas库的pd.read_json函数加载文件内容为DataFrame,并以行的方式解释为记录。函数返回加载后的数据作为DataFrame。

这些函数用于在主要代码中加载数据和配置信息,以便进行任务的转换和生成。例如,load_schema函数加载了任务所需的模式信息,而load_yaml函数加载了映射配置文件。 read_data 函数用于加载任务的输入数据。这些辅助函数有助于使主要代码更模块化和易于维护。

record_schema.py:管理任务的记录模式信息,以便在任务处理过程中进行合并和存储

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
from collections import defaultdict
from typing import List
class RecordSchema:
    def __init__(self, type_list, role_list, type_role_dict):
        self.type_list = type_list
        self.role_list = role_list
        self.type_role_dict = type_role_dict
    @staticmethod
    def read_from_file(filename):
        lines = open(filename).readlines()
        type_list = json.loads(lines[0])
        role_list = json.loads(lines[1])
        type_role_dict = json.loads(lines[2])
        return RecordSchema(type_list, role_list, type_role_dict)
    def write_to_file(self, filename):
        with open(filename, 'w') as output:
            output.write(json.dumps(self.type_list, ensure_ascii=False) + '\n')
            output.write(json.dumps(self.role_list, ensure_ascii=False) + '\n')
            output.write(json.dumps(self.type_role_dict, ensure_ascii=False) + '\n')
def merge_schema(schema_list: List[RecordSchema]):
    type_set = set()
    role_set = set()
    type_role_dict = defaultdict(list)
    for schema in schema_list:
        for type_name in schema.type_list:
            type_set.add(type_name)
        for role_name in schema.role_list:
            role_set.add(role_name)
        for type_name in schema.type_role_dict:
            type_role_dict[type_name] += schema.type_role_dict[type_name]
    for type_name in type_role_dict:
        type_role_dict[type_name] = list(set(type_role_dict[type_name]))
    return RecordSchema(type_list=list(type_set),
                        role_list=list(role_set),
                        type_role_dict=type_role_dict,
                        )

这段代码定义了一个名为 RecordSchema 的类,该类用于管理任务的记录模式(schema)。记录模式通常包括实体类型(type)、关系角色(role)和类型-角色字典(type_role_dict)。

  • __init__ 方法初始化 RecordSchema 类的实例,需要传入类型列表(type_list)、角色列表(role_list)和类型-角色字典(type_role_dict)。
  • read_from_file 方法从文件中读取记录模式的定义。它读取文件的前三行,分别包含类型列表、角色列表和类型-角色字典的定义,并使用这些信息创建 RecordSchema 的实例。
  • write_to_file 方法将记录模式的定义写入文件。它将类型列表、角色列表和类型-角色字典的信息写入文件的三行中。

此外,代码还定义了一个名为 merge_schema 的函数,用于合并多个记录模式。它接受一个记录模式列表 schema_list,并合并这些记录模式的类型、角色和类型-角色字典信息,最终返回一个合并后的 RecordSchema 实例。

这些工具函数和类用于管理任务的记录模式信息,以便在任务处理过程中进行合并和存储。

目录
相关文章
|
5月前
|
JSON JavaScript API
「AIGC」NodeJs使用openai流式请求与非流式请求
本文档是关于使用Node.js与OpenAI API交互的教程,涵盖了非流式和流式请求。非流式请求示例展示了如何一次性返回所有数据,适用于兼容性但可能需要较长时间。流式请求则演示了如何即时响应数据,提高交互体验,但可能不适用于所有系统。代码示例使用了axios库和http模块,展示了如何处理数据流。
325 0
|
23天前
|
JSON API 数据安全/隐私保护
拍立淘按图搜索API接口返回数据的JSON格式示例
拍立淘按图搜索API接口允许用户通过上传图片来搜索相似的商品,该接口返回的通常是一个JSON格式的响应,其中包含了与上传图片相似的商品信息。以下是一个基于淘宝平台的拍立淘按图搜索API接口返回数据的JSON格式示例,同时提供对其关键字段的解释
|
3月前
|
API PHP
ThinkPHP 通用的API格式封装
本文介绍了在ThinkPHP框架中如何统一封装API返回格式的方法,包括创建状态码枚举类、编写统一格式化函数以及在BaseController和Error控制器中重写`__call`方法来处理不存在的方法或控制器调用,以实现统一的错误处理和返回格式。
ThinkPHP 通用的API格式封装
|
3月前
|
人工智能 安全 测试技术
忘掉GPT-5!OpenAI推出全新AI模型系列o1,声称性能达到博士级
忘掉GPT-5!OpenAI推出全新AI模型系列o1,声称性能达到博士级
|
2月前
|
JSON API 数据格式
商品详情数据JSON格式示例参考(api接口)
JSON数据格式的商品详情数据通常包含商品的多个层级信息,以下是一个综合多个来源信息的JSON数据格式的商品详情数据示例参考:
|
3月前
|
设计模式 Java API
Quarkus RESTful API性能揭秘:如何打造极速响应的应用程序?
在互联网高速发展的背景下,企业对应用性能的要求日益提升。Quarkus作为一款基于Java的开源框架,以出色的性能和简洁的设计模式成为开发者的首选。本文通过实例展示如何利用Quarkus构建响应迅速的RESTful API应用。首先创建Maven项目并配置Quarkus依赖,接着逐步实现用户管理系统的各个模块,包括实体类、数据访问层、服务层及资源层,最终完成一个高性能的RESTful API。通过Quarkus,开发者可以更高效地开发出轻量级且响应快速的应用程序。
60 1
|
4月前
|
JSON 前端开发 API
构建前端防腐策略问题之更新getMemoryUsagePercent函数以适应新的API返回格式的问题如何解决
构建前端防腐策略问题之更新getMemoryUsagePercent函数以适应新的API返回格式的问题如何解决
构建前端防腐策略问题之更新getMemoryUsagePercent函数以适应新的API返回格式的问题如何解决
|
3月前
|
安全 Java API
【性能与安全的双重飞跃】JDK 22外部函数与内存API:JNI的继任者,引领Java新潮流!
【9月更文挑战第7天】JDK 22外部函数与内存API的发布,标志着Java在性能与安全性方面实现了双重飞跃。作为JNI的继任者,这一新特性不仅简化了Java与本地代码的交互过程,还提升了程序的性能和安全性。我们有理由相信,在外部函数与内存API的引领下,Java将开启一个全新的编程时代,为开发者们带来更加高效、更加安全的编程体验。让我们共同期待Java在未来的辉煌成就!
71 11
|
3月前
|
安全 Java API
【本地与Java无缝对接】JDK 22外部函数和内存API:JNI终结者,性能与安全双提升!
【9月更文挑战第6天】JDK 22的外部函数和内存API无疑是Java编程语言发展史上的一个重要里程碑。它不仅解决了JNI的诸多局限和挑战,还为Java与本地代码的互操作提供了更加高效、安全和简洁的解决方案。随着FFM API的逐渐成熟和完善,我们有理由相信,Java将在更多领域展现出其强大的生命力和竞争力。让我们共同期待Java编程新纪元的到来!
105 11
|
4月前
|
程序员 数据库连接 API
分享一个解决 EF 性能低的思路,通过 Python 访问心跳侦测 API 保持 EF 在线
分享一个解决 EF 性能低的思路,通过 Python 访问心跳侦测 API 保持 EF 在线