写在最前面
论文 STIOCS: Active learning-based semi-supervised training framework for IOC extraction
代码解读:https://github.com/MuscleFish/SeqMask.
【论文代码】① DataDeal.py
DataPreprocess.py
clean_text(text)方法
clean_text方法是用于清理和预处理文本数据的。它接受一个参数text,并执行以下操作:
- 将text转换为字符串,并将所有字符转换为小写。
- 使用正则表达式替换文本中的各种缩写和常见短语,例如将"what’s"替换为"what is",将"can’t"替换为"can not"等。
- 使用正则表达式识别并替换文本中的特定模式,例如IP地址、电子邮件地址、MD5哈希值、SHA1哈希值、SHA250哈希值、URL、文件路径等。
- 去除十六进制数。
- 替换一些特定的缩写,例如将"no.\d+“替换为"number”,将"c2"替换为"C2"等。
- 去除单个的小写字母和两位数的十六进制数。
- 去除所有的数字。
- 去除所有的非字母、非数字和非撇号(')的字符。
- 去除多余的空格,并去除文本前后的空格。
最后,这个方法返回清理后的文本。
# 数据清理 def clean_text(text): text = str(text) text = text.lower() text = re.sub(r"what's", "what is ", text) text = re.sub(r"\'s", " ", text) text = re.sub(r"\'ve", " have ", text) text = re.sub(r"can't", "can not ", text) text = re.sub(r"n't", " not ", text) text = re.sub(r"i'm", " i am ", text) text = re.sub(r"\'re", " are ", text) text = re.sub(r"\'d", " would ", text) text = re.sub(r"\'ll", " will ", text) text = re.sub(r"\'scuse", " excuse ", text) text = re.sub('(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.)\{3\}(?:25[0-5] |2[0-4][0-9]|[01]?[0-9][0-9]?)(/([0-2][0-9]|3[0-2]|[0-9]))?', 'IPv4', text) text = re.sub('(CVE\-[0-9]{4}\-[0-9]{4,6})', ' CVE ', text) text = re.sub('([a-z][_a-z0-9-.]+@[a-z0-9-]+\.[a-z]+)', ' email ', text) text = re.sub('(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', ' IP ', text) text = re.sub('([a-f0-9]{32}|[A-F0-9]{32})', ' MD5 ', text) text = re.sub('((HKLM|HKCU)\\[\\A-Za-z0-9-_]+)', ' registry ', text) text = re.sub('([a-f0-9]{40}|[A-F0-9]{40})', ' SHA1 ', text) text = re.sub('([a-f0-9]{64}|[A-F0-9]{64})', ' SHA250 ', text) text = re.sub('http(s)?:\\[0-9a-zA-Z_\.\-\\]+.', ' URL ', text) text = re.sub('cve-[0-9]{4}-[0-9]{4,6}', ' vulnerability ', text) text = re.sub('[a-zA-Z]{1}:\\[0-9a-zA-Z_\.\-\\]+', ' file ', text) text = re.sub(r'\b[a-fA-F\d]{32}\b|\b[a-fA-F\d]{40}\b|\b[a-fA-F\d]{64}', ' hash ', text) # 去除十六进制 text = re.sub('\\bx[A-Fa-f0-9]{2}', ' ', text) # 缩写 text = re.sub('\\bno.\d+\\b',' number ',text) # y text = re.sub('\\bc2\\b','C2',text) text = re.sub('\\b[b-z]{1}\\b',' ',text) text = re.sub('\\b[a-f0-9]{2}\\b', ' ', text) # text = re.sub(r'\b\d+\b',' ',text) text = re.sub('\\n', ' ', text) # text = re.sub(r'(\.)\1', ' ', text) text = re.sub('\d+',' ',text) text = re.sub('\'s',' ',text) # 去除稀奇古怪的符号 text = re.sub('[^\w\']', ' ', text) # text = re.sub('\W', ' ', text) text = re.sub("\s+"," ",text) text = text.strip(' ') return text
cut_sentences方法
cut_sentences方法是用于处理和清理文本数据的。它接受一个参数doc,并执行以下操作:
- 使用正则表达式替换doc中的特定模式,例如将"no.\d+“替换为” number "。
- 去除多余的空格,并去除doc前后的空格。
- 使用sent_tokenize函数将doc分割为句子列表,然后使用clean_text函数清理每个句子。
- 创建一个空列表result,用于存储处理后的句子。
- 遍历清理后的句子列表。对于每个句子,它首先使用word_tokenize函数将其分割为单词列表。然后,如果单词列表的长度小于5或大于128,那么它将跳过这个句子。否则,它将使用WordNetLemmatizer对单词列表中的每个单词进行词性还原,然后将还原后的单词列表连接为一个字符串,并将其添加到result列表中。
最后,这个方法返回result列表,它包含了处理和清理后的句子。
def cut_sentences(doc): doc = re.sub('\\bno.\d+\\b',' number ',doc) doc = re.sub("\s+"," ",doc) doc = doc.strip(' ') sent_tokenize_list = sent_tokenize(doc) sent_tokenize_list = [clean_text(w) for w in sent_tokenize_list] result = [] for sen in sent_tokenize_list: word_list = word_tokenize(sen) # 删除单词数小于5与大于128的句子 if len(word_list) < 5 or len(word_list) > 128: continue # 词性还原 from nltk.stem import WordNetLemmatizer lemmatizer = WordNetLemmatizer() _sen = [] for word in word_list: word = lemmatizer.lemmatize(word,pos='v') word = lemmatizer.lemmatize(word,pos='n') _sen.append(word) _sen = ' '.join(_sen) result.append(_sen) return result
read_csv方法
read_csv方法是DataIo类的一部分,用于读取CSV文件并对数据进行预处理。
这个方法接受三个参数:path(CSV文件的路径)、headers(要读取的列的名称)和encoding(文件的编码,默认为’UTF-8’)。
首先,它使用pandas的read_csv函数读取CSV文件,并选择headers参数指定的列。
然后,它创建一个空列表result,用于存储处理后的数据。
接着,它获取数据的形状(行数和列数)。如果数据有两个维度,那么它将列数设置为形状的第二个元素。否则,列数默认为1。
然后,它打印一个进度条,用于显示数据处理的进度。进度条的长度等于行数乘以列数除以10,再加1。
接着,它遍历数据的每一列和每一行。对于每个元素,它使用cut_sentences函数处理这个元素,并将结果添加到result列表中。每处理一个元素,它就更新一次进度条。
最后,当所有的数据都处理完毕,它打印一个消息表示数据处理完成,并返回result列表。
cut_sentences方法中的sent_tokenize函数是如何工作的?
sent_tokenize是nltk库中的一个函数,用于将文本分割为句子。它基于Punkt句子分割器,这是一个无监督的机器学习的句子分割器。它使用预先训练的模型来识别句子的边界。
sent_tokenize函数的工作原理如下:
- 首先,它将文本分割为可能的句子边界。可能的句子边界通常是句号、问号和感叹号等标点符号。
- 然后,它使用预先训练的模型来确定这些可能的边界是否真的是句子边界。这个模型使用各种信息,例如边界附近的单词、边界前后的字符等。
- 最后,它返回识别出的句子列表。
例如,如果你有一个文本"Hello world. How are you?",sent_tokenize函数将返回一个列表,包含两个元素:"Hello world.“和"How are you?”。
sent_tokenize函数是如何训练的?
sent_tokenize函数使用的是预训练的Punkt句子分割器,这是一个无监督的机器学习算法。它的训练过程如下:
- 首先,Punkt算法需要一个大量的未标记的平原文本数据。这些数据不需要人工标记句子边界,但需要有正确的句子边界标点(如句号、问号和感叹号)。
- 然后,Punkt算法会扫描这些文本数据,学习标点符号后面的词的分布。例如,如果一个词经常出现在句号后面,那么这个词可能是一个句子的开始。
- 同时,Punkt算法也会学习缩写词的分布。因为在英文中,缩写词后面通常会有一个句号,但这个句号并不表示句子的结束。
- 在学习过程中,Punkt算法会使用一些启发式规则,例如,如果一个句号后面跟着的是小写字母,那么这个句号可能不是句子的结束。
- 最后,Punkt算法会使用这些学习到的信息,生成一个模型,这个模型可以用来预测句子的边界。
需要注意的是,虽然sent_tokenize函数使用的是预训练的Punkt模型,但你也可以使用你自己的文本数据,训练一个新的Punkt模型。
# 读取数据,并处理数据 class DataIo: def __init__(self): pass def read_list(self, sentences): return [cut_sentences(s) for s in sentences] # 读取数据后,对数据进行清理、分句、词性变换,返回文章*句子的二维列表 def read_csv(self,path,headers,encoding='UTF-8'): data = pd.read_csv(path,encoding=encoding)[headers] result = [] shape = data.shape row = shape[0] col = 1 if len(shape) == 2: col = shape[1] print('[',end='') bar = int(row * col / 10) + 1 count = 0 for j in range(col): for i in range(row): result.append(cut_sentences(data.iloc[i,j])) if count % bar == 0: print('==',end='') count += 1 print('] 数据处理完成') return result
read_json方法
read_json方法是用于读取JSON文件并对数据进行预处理的。
这个方法接受两个参数:path(JSON文件的路径)和encoding(文件的编码,默认为’UTF-8’)。
首先,它打开并读取指定路径的JSON文件,然后使用simplejson.loads函数将读取的内容转换为Python数据结构。
然后,它创建一个空列表result,用于存储处理后的数据。
接着,它打印一个进度条,用于显示数据处理的进度。进度条的长度等于数据的长度除以10,再加1。
然后,它遍历数据的每一个元素。对于每个元素,它尝试获取其’articles’字段,然后遍历这个字段的每一个元素。对于每个元素,它尝试获取其’content’字段,然后使用cut_sentences函数处理这个字段的内容,并将结果添加到result列表中。如果在这个过程中发生任何异常,它将忽略这个异常并继续处理下一个元素。每处理一个元素,它就更新一次进度条。
最后,当所有的数据都处理完毕,它打印一个消息表示数据处理完成,并返回result列表。
def read_json(self,path,encoding='UTF-8'): result = [] with open(path,'r',encoding=encoding)as fp: data = simplejson.loads(fp.read()) print('[',end='') bar = int(len(data) / 10) + 1 count = 0 for val in data: try: articles = val['articles'] for art in articles: try: content = art['content'] result.append(cut_sentences(content)) except BaseException: continue except BaseException: continue if count % bar == 0: print('==',end='') count += 1 print('] 数据处理完成') return result