需要源码和数据集请点赞关注收藏后评论区留言~~~
一、文本情感分析简介
文本情感分析是指利用自然语言处理和文本挖掘技术,对带有情感色彩的主观性文本进行分析,处理和抽取的过程。
接下来主要实现情感分类,情感分类又称为情感倾向性分析,是指对给定的文本,识别其中主观性文本的倾向是肯定的还是否定的,或者说是正面的还是负面的,这是情感分析领域研究最多的内容。通常,网络中存在大量的主观性文本和客观性文本,客观性文本是对事务的客观性描述,不带有感情色彩和情感倾向。情感分类的对象是带有情感倾向的主观性文本,因此情感分类首先要进行文本的主客观雷芬,以情感词识别为主,利用不同的文本特征表示方法和分类器进行识别研究,对网络文本事先主客观分类,能够提高情感分类的速度和准确度
二、数据集简介
本篇博客使用的数据集是IMDB的数据,IMDB数据集包含来自互联网的50000条严重两级分化的评论,该数据被分为用于训练的25000条评论和用于测试的25000条评论
里面给出了数据对应网址
三、数据预处理
因为这个数据集非常小,所以如果用这个数据集做word embedding有可能过拟合,而且模型没有通用性,所以传入一个已经学好的word embedding,用的是glove的6B 100维的预训练数据
如下图
四、算法模型
1:循环神经网络(RNN)
它是一类专门处理时序数据样本的神经网络,它的每一层不仅输出给下一层,而同时还输出一个隐状态,供当前层在处理下一个样本时使用
网络结构图如下
2:长短期记忆神经网络(LSTM)
然而 RNN在处理长期依赖时会遇到巨大的困难,因此计算距离较远的节点之间的联系会设计雅可比矩阵的多次相乘,这会带来梯度消失或者梯度爆炸。LSTM可以有效的解决这个问题 它的主要思想是
门控单元以及线性连接的引入
门控单元:有选择性的保存和输出历史信息
线性连接:LSTM可以更好的捕捉时序数据中间间隔较大的依赖关系
工作图如下
五、模型训练
下面使用基于PyTorch的LSTM模型 效果如下
建议使用GPU或者cuda 单纯用cpu训练时间比较长~~~
训练结果图如下
测试集上的损失与精确度的变化
由下图可见 当训练到4-5次左右时模型已经逐渐收敛 不必训练过多次
训练集上的损失与精确度变化
六、代码
部分源码如下
# coding: utf-8 # In[1]: import torch.autograd as autograd import torchtext.vocab as torchvocab from torch.autograd import Variable import tqdm import os import time import re import pandas as pd import string import gensim import time import random import snowballstemmer import collections from collections import Counter from nltk.corpus import stopwords from itertools import chain from sklearn.metrics import accuracy_score from gensim.test.utils import datapath, get_tmpfile from gensim.models import KeyedVectors # In[2]: def clean_text(text): ## Remove puncuation text = text.translate(string.punctuation) ## Convert words to lower case and split them text = text.lower().split() # Remove stop words stops = set(stopwords.words("english")) text = [w for w in text if not w in stops and len(w) >= 3] text = " ".join(text) ## Clean the text text = re.sub(r"[^A-Za-z0-9^,!.\/'+-=]", " ", text) text = re.sub(r"what's", "what is ", text) text = re.sub(r"\'s", " ", text) text = re.sub(r"\'ve", " have ", text) text = re.sub(r"n't", " not ", text) text = re.sub(r"i'm", "i am ", text) text = re.sub(r"\'re", " are ", text) text = re.sub(r"\'d", " would ", text) text = re.sub(r"\'ll", " will ", text) text = re.sub(r",", " ", text) text = re.sub(r"\.", " ", text) text = re.sub(r"!", " ! ", text) text = re.sub(r"\/", " ", text) text = re.sub(r"\^", " ^ ", text) text = re.sub(r"\+", " + ", text) text = re.sub(r"\-", " - ", text) text = re.sub(r"\=", " = ", text) text = re.sub(r"'", " ", text) text = re.sub(r"(\d+)(k)", r"\g<1>000", text) text = re.sub(r":", " : ", text) tan ", text) text = re.sub(r"\0s", "0", text) text = re.sub(r" 9 11 ", "911", text) text = re.sub(r"e - mail", "email", text) text = re.sub(r"j k", "jk", text) text = re.sub(r"\s{2,}", " ", text) ## Stemming text = text.split() stemmer = snowballstemmer.stemmer('english') stemmed_words = [stemmer.stemWord(word) for word in text] text = " ".join(stemmed_words) print(text) return text # In[3]: def readIMDB(path, seg='train'): pos_or_neg = ['pos', 'neg'] data = [] for label in pos_or_neg: files = os.listdir(os.path.join(path, seg, label)) for file in files: with open(os.path.join(path, seg, label, file), 'r', encoding='utf8') as rf: review = rf.read().replace('\n', '') if label == 'pos': data.append([review, 1]) elif label == 'neg': data.append([review, 0]) return data # In[3]: root = r'C:\Users\Admin\Desktop\aclImdb\aclImdb' train_data = readIMDB(root) test_data = readIMDB(root, 'test') # In[4]: def tokenizer(text): return [tok.lower() for tok in text.split(' ')] train_tokenized = [] test_tokenized = [] for review, score in train_data: train_tokenized.append(tokenizer(review)) for review, score in test_data: test_tokenized.append(tokenizer(review)) # In[5]: vocab = set(chain(*train_tokenized)) vocab_size = len(vocab) # In[6]: # 输入文件 glove_file = datapath(r'C:\Users\Admin\Desktop\glove.6B.100d.txt') # 输出文件 tmp_file = get_tmpfile(r'C:\Users\Admin\Desktop\wv.6B.100d.txt') # call glove2word2vec script # default way (through CLI): python -m gensim.scripts.glove2word2vec --input <glove_file> --output <w2v_file> # 开始转换 from gensim.scripts.glove2word2vec import glove2word2vec glove2word2vec(glove_file, tmp_file) # 加载转化后的文件 wvmodel = KeyedVectors.load_word2vec_format(tmp_file) # In[7]: word_to_idx = {word: i + 1 for i, word in enumerate(vocab)} word_to_idx['<unk>'] = 0 idx_to_word = {i + 1: word for i, word in enumerate(vocab)} idx_to_word[0] = '<unk>' # In[8]: def encode_samples(tokenized_samples, vocab): features = [] for sample in tokenized_samples: feature = [] for token in sample: if token in word_to_idx: feature.append(word_to_idx[token]) else: feature.append(0) features.append(feature) return features # In[9]: def pad_samples(features, maxlen=500, PAD=0): padded_features = [] for feature in features: if len(feature) >= maxlen: padded_feature = feature[:maxlen] else: padded_feature = feature while (len(padded_feature) < maxlen): padded_feature.append(PAD) padded_features.append(padded_feature) return padded_features # In[10]: train_features = torch.tensor(pad_samples(encode_samples(train_tokenized, vocab))) train_labels = torch.tensor([score for _, score in train_data]) test_features = torch.tensor(pad_samples(encode_samples(test_tokenized, vocab))) test_labels = torch.tensor([score for _, score in test_data]) # In[13]: class SentimentNet(nn.Module): def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, bidirectional, weight, labels, use_gpu, **kwargs): super(SentimentNet, self).__init__(**kwargs) self.num_hiddens = num_hiddens self.num_layers = num_layers self.use_gpu = use_gpu self.bidirectional = bidirectional self.embedding = nn.Embedding.from_pretrained(weight) self.embedding.weight.requires_grad = False self.encoder = nn.LSTM(input_size=embed_size, hidden_size=self.num_hiddens, num_layers=num_layers, bidirectional=self.bidirectional, dropout=0) if self.bidirectional: self.decoder = nn.Linear(num_hiddens * 4, labels) else: self.decoder = nn.Linear(num_hiddens * 2, labels) def forward(self, inputs): embeddings = self.embedding(inputs) states, hidden = self.encoder(embeddings.permute([1, 0, 2])) encoding = torch.cat([states[0], states[-1]], dim=1) outputs = self.decoder(encoding) return outputs # In[16]: num_epochs = 5 embed_size = 100 num_hiddens = 100 num_layers = 2 bidirectional = True batch_size = 64 labels = 2 lr = 0.8 device = torch.device('cpu') use_gpu = True weight = torch.zeros(vocab_size + 1, embed_size) for i in range(len(wvmodel.index_to_key)): try: index = word_to_idx[wvmodel.index_to_key[i]] except: continue weight[index, :] = torch.from_numpy(wvmodel.get_vector( idx_to_word[word_to_idx[wvmodel.index_to_key[i]]])) # In[17]: net = SentimentNet(vocab_size=(vocab_size + 1), embed_size=embed_size, num_hiddens=num_hiddens, num_layers=num_layers, bidirectional=bidirectional, weight=weight, labels=labels, use_gpu=use_gpu) net.to(device) loss_function = nn.CrossEntropyLoss() optimizer = optim.SGD(net.parameters(), lr=lr) # In[18]: train_set = torch.utils.data.TensorDataset(train_features, train_labels) test_set = torch.utils.data.TensorDataset(test_features, test_labels) train_iter = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True) test_iter = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False) # In[20]: num_epochs = 20 # In[ ]: for epoch in range(num_epochs): start = time.time() train_loss, test_losses = 0, 0 train_acc, test_acc = 0, 0 n, m = 0, 0 for feature, label in train_iter: n += 1 net.zero_grad() feature = Variable(feature.cpu()) label = Variable(label.cpu()) score = net(feature) loss = loss_function(score, label) loss.backward() optimizer.step() train_acc += accuracy_score(torch.argmax(score.cpu().data, dim=1), label.cpu()) train_loss += loss with torch.no_grad(): for test_feature, test_label in test_iter: m += 1 test_feature = test_feature.cpu() net(test_feature) test_loss = loss_function(test_score, test_label) test_acc += accuracy_score(torch.argmax(test_score.cpu().data, dim=1), test_label.cpu()) test_losses += test_loss end = time.time() runtime = end - start epoch: %d, train loss: %.4f, train acc: %.2f, test loss: %.4f, test acc: %.2f, time: %.2f' % (epoch, train_loss.data / n, train_acc / n, test_losses.data / m, test_acc / m, runtime)) # In[ ]:
创作不易 觉得有帮助请点赞关注收藏~~~