自然语言情感分析
众所周知,人类自然语言中包含了丰富的情感色彩:表达人的情绪(如悲伤、快乐)、表达人的心情(如倦怠、忧郁)、表达人的喜好(如喜欢、讨厌)。利用机器自动分析这些情感倾向,不但有助于帮助企业了解消费者对其产品的感受,为产品改进提供依据;同时还有助于企业分析商业伙伴们的态度,一遍更好地进行商业决策。
我们可以将情感分析任务定义为一个分类问题,即指定一个文本输入,机器通过对文本进行分析、处理、归纳和推理后自动输出结论。
常见情感分析任务
- 正向:表示正面积极的情感,如高兴,幸福,惊喜,期待等
- 负向:表示负面消极的情感,如难过,伤心,愤怒,惊恐等
- 其它:表示其它类型的情感
深度神经网络完成情感分析任务
常见的方法是首先将单词转化为向量,然后将句子中的每个单词进行向量化,并使用这个向量表示情感分析任务。
首先把一个句子所有词向量的Embedding及逆行加和平均,将得到的加权embedding向量作为整个句子的向量表示,但是这种方式会面临一些问题
- 变长的句子:自然语言中语句往往是不同长度,然而大部分神经网络接受的输入都是等长张量
- 组合语义:使用加权embedding向量会丧失语义信息,比如不同词的前后顺序影响,例如
我喜欢你
和你喜欢我
,如果采用加权这两个句子的表达向量是一样的,但是显然这两个句子语义大不相同
处理变长语句
通过设定max_seq_len
用来控制神经网络最大可以处理文本的长度,如果我们语句长度不够,我们可以使用填充,如果语句过长可以使用截断
- 对于长度超过max_seq_len的句子,我们通常会把这个句子进行截断,以便可以输入到一个张量中。句子截断的过程是有技巧的,有时截取句子的前一部分会比后一部分好,有时则恰好相反。当然也存在其他的截断方式,有兴趣的读者可以翻阅一下相关资料,这里不做赘述。
- 对于句子长度不足max_seq_len的句子,我们一般会使用一个特殊的词语对这个句子进行填充,这个过程称为Padding。假设给定一个句子“我,爱,人工,智能”,max_seq_len=6,那么可能得到两种填充方式:
- 前向填充: “[pad],[pad],我,爱,人工,智能”
- 后向填充:“我,爱,人工,智能,[pad],[pad]”
循环神经网络RNN
RNN网络是常见的用户面向序列的模型,可以对自然语言句子或是其它时序信号进行建模,RNN网络结构如图所示:
它是把整个语句向量进行切分成不同的词向量,每个词向量用一个向量进行表示,首先图中的我会输入到网络进行记忆,然后将输出传出到下个cell中和下个时间片的词向量爱共同建模,直到最后一个cell,然后输出向量。
长短时记忆网络(LSTM)
RNN可以对时序数据或者自然语言进行处理,但是它有个问题就是网络模型小,且不能保留更多的有效信息,如果我们的语句过长的话,LSTM就可以解决这个问题,这个网络模型可以学习遗忘和记忆,选择性的进行学习,对之前的语句记忆与遗忘。
- 输入门: ,控制有多少输入信号会被融合。
- 遗忘门: ,控制有多少过去的记忆会被融合。
- 输出门: ,控制最终输出多少记忆。
- 单元状态:
LSTM实现情感分析任务
借助长短时记忆网络,我们可以非常轻松地完成情感分析任务。如下图所示。对于每个句子,我们首先通过截断和填充的方式,把这些句子变成固定长度的向量。然后,利用长短时记忆网络,从左到右开始阅读每个句子。在完成阅读之后,我们使用长短时记忆网络的最后一个输出记忆,作为整个句子的语义信息,并直接把这个向量作为输入,送入一个分类层进行分类,从而完成对情感分析问题的神经网络建模。
完整代码
1.加载语料数据
""" * Created with PyCharm * 作者: 阿光 * 日期: 2022/1/13 * 时间: 23:29 * 描述: """ import random import re import tarfile import numpy as np import requests def download(): corpus_url = "https://dataset.bj.bcebos.com/imdb%2FaclImdb_v1.tar.gz" web_request = requests.get(corpus_url) corpus = web_request.content with open("./aclImdb_v1.tar.gz", "wb") as f: f.write(corpus) f.close() # download() def load_imdb(is_training): data_set = [] for label in ["pos", "neg"]: with tarfile.open("./aclImdb_v1.tar.gz") as tarf: path_pattern = "aclImdb/train/" + label + "/.*\.txt$" if is_training \ else "aclImdb/test/" + label + "/.*\.txt$" path_pattern = re.compile(path_pattern) tf = tarf.next() while tf != None: if bool(path_pattern.match(tf.name)): sentence = tarf.extractfile(tf).read().decode() sentence_label = 0 if label == 'neg' else 1 data_set.append((sentence, sentence_label)) tf = tarf.next() return data_set def data_preprocess(corpus): data_set = [] for sentence, sentence_label in corpus: sentence = sentence.strip().lower() sentence = sentence.split(" ") data_set.append((sentence, sentence_label)) return data_set # 构造词典,统计每个词的频率,并根据频率将每个词转换为一个整数id def build_dict(corpus): word_freq_dict = dict() for sentence, _ in corpus: for word in sentence: if word not in word_freq_dict: word_freq_dict[word] = 0 word_freq_dict[word] += 1 word_freq_dict = sorted(word_freq_dict.items(), key=lambda x: x[1], reverse=True) word2id_dict = dict() word2id_freq = dict() word2id_dict['[oov]'] = 0 word2id_freq[0] = 1e10 word2id_dict['[pad]'] = 1 word2id_freq[1] = 1e10 for word, freq in word_freq_dict: word2id_dict[word] = len(word2id_dict) word2id_freq[word2id_dict[word]] = freq return word2id_freq, word2id_dict # 把语料转换为id序列 def convert_corpus_to_id(corpus, word2id_dict): data_set = [] for sentence, sentence_label in corpus: sentence = [word2id_dict[word] if word in word2id_dict \ else word2id_dict['[oov]'] for word in sentence] data_set.append((sentence, sentence_label)) return data_set # 编写一个迭代器,每次调用这个迭代器都会返回一个新的batch,用于训练或者预测 def build_batch(word2id_dict, corpus, batch_size, epoch_num, max_seq_len, shuffle=True): sentence_batch = [] sentence_label_batch = [] for _ in range(epoch_num): if shuffle: random.shuffle(corpus) for sentence, sentence_label in corpus: sentence_sample = sentence[:min(max_seq_len, len(sentence))] if len(sentence_sample) < max_seq_len: for _ in range(max_seq_len - len(sentence_sample)): sentence_sample.append(word2id_dict['[pad]']) sentence_batch.append(sentence_sample) sentence_label_batch.append([sentence_label]) if len(sentence_batch) == batch_size: yield np.array(sentence_batch).astype("int64"), np.array(sentence_label_batch).astype("int64") sentence_batch = [] sentence_label_batch = [] if len(sentence_batch) == batch_size: yield np.array(sentence_batch).astype("int64"), np.array(sentence_label_batch).astype("int64") def get_data(): train_corpus = load_imdb(True) test_corpus = load_imdb(False) train_corpus = data_preprocess(train_corpus) test_corpus = data_preprocess(test_corpus) word2id_freq, word2id_dict = build_dict(train_corpus) vocab_size = len(word2id_freq) train_corpus = convert_corpus_to_id(train_corpus, word2id_dict) test_corpus = convert_corpus_to_id(test_corpus, word2id_dict) train_datasets = build_batch(word2id_dict, train_corpus[:1000], batch_size=64, epoch_num=64, max_seq_len=30) return train_datasets
2.定义LSTM网络模型
""" * Created with PyCharm * 作者: 阿光 * 日期: 2022/1/13 * 时间: 23:45 * 描述: """ import keras from tensorflow import nn from tensorflow.keras.layers import * class Model(keras.Model): def __init__(self): super(Model, self).__init__() self.embedding = Embedding(input_dim=252173, output_dim=256) self.lstm = LSTM(128) self.fc = Dense(2, activation=nn.softmax) def call(self, inputs): x = self.embedding(inputs) x = self.lstm(x) x = self.fc(x) return x
3.训练数据
""" * Created with PyCharm * 作者: 阿光 * 日期: 2022/1/13 * 时间: 23:57 * 描述: """ import tensorflow as tf from keras import Input import lstm from model import Model models = Model() models.build(input_shape=(1, 50)) models.call(Input(shape=50)) models.summary() train_datasets = lstm.get_data() models.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy']) # 权重保存路径 checkpoint_path = "./weight/cp.ckpt" # 回调函数,用户保存权重 save_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path, save_best_only=True, save_weights_only=True, monitor='loss', verbose=1) history = models.fit(train_datasets, epochs=5, callbacks=[save_callback])