传统的垃圾邮件过滤方法通常基于规则、关键词匹配和黑名单,但这些方法往往无法应对不断变化的垃圾邮件形式和策略。基于深度学习的垃圾邮件过滤系统可以通过学习邮件的语义和上下文特征,具备较高的适应性和准确性,能够不断适应新的垃圾邮件形式和策略。
深度学习在垃圾邮件分类的潜力主要表现在以下几个方面:
- 特征学习:深度学习模型,尤其是卷积神经网络(CNN)和循环神经网络(RNN),能够从原始数据中自动学习复杂的特征。这意味着它们可以识别垃圾邮件中的模式,如特定的单词、短语或语言结构,而无需人工设计特征。
- 处理复杂性:垃圾邮件往往包含复杂的结构和语义信息,深度学习模型能够处理这种复杂性,捕捉到传统机器学习方法可能忽略的细微差别。
- 泛化能力:深度学习模型通常具有良好的泛化能力,能够在未见过的数据上做出准确的预测,这对于不断变化的垃圾邮件模式尤为重要。
- 适应性:随着时间的推移,垃圾邮件发送者会采用新的策略来规避过滤系统。深度学习模型可以通过继续学习新的数据来适应这些变化。
- 大规模数据处理能力:深度学习模型可以处理大规模数据集,这对于训练强大的垃圾邮件过滤模型是必要的。
卷积神经网络(Convolutional Neural Networks, CNN)
卷积神经网络是一类包含卷积计算且具有深度结构的前馈神经网络(Feedforward Neural Networks),是深度学习(deep learning)的代表算法之一 。
卷积神经网络应用的领域:
- 图像分类:CNN是处理图像数据的主流方法,用于将图像分类到特定的类别。
- 目标检测:在图像中识别和定位多个目标,例如,识别图像中的所有猫并围绕它们绘制边界框。
- 文本分类:尽管文本数据不像图像那样具有网格状结构,但CNN也可以用于文本数据的分类任务,如情感分析、主题分类等。
自然语言处理(NLP):在NLP中,CNN可以用于句子或文档级别的分类任务,以及某些类型的语言模型。
在文本分类任务中,CNN通常由输入层、卷积层、池化层、全连接层和Softmax层构成。相比传统神经网络,CNN具有强大的局部特征提取能力,并且参数共享的特点大大减少了网络训练参数,提高了模型效率。
在文本分类中,传统机器学习方法容易丢失原始文本序列中的顺序信息。而使用卷积神经网络时,它可以将文本转化为词向量,并在扩展到相同维度后形成词向量矩阵。然后,利用卷积窗口对向量矩阵中的特征进行提取,从而保留原始文本的局部特征和顺序信息。
CNN的总体架构总共有四个部分:输入层,卷积层,池化层,全连接层 。
- 输入层是CNN的第一层,负责接收原始输入数据。在图像处理任务中,输入层通常接收图像的原始像素值,形成三维数据(高度、宽度和颜色通道数)。
- 卷积层是CNN的核心,用于从输入数据中提取局部特征。通过在输入数据上滑动一系列可学习的卷积核(或滤波器),卷积层能够捕捉到图像中的局部模式,如线条、角点等。
卷积层通常包含多个不同的卷积核,每个卷积核负责提取不同的特征,输出结果称为特征映射(Feature Maps)。 - 池化层(Pooling Layer),通常跟在卷积层之后,用于降低特征的空间维度,减少参数数量和计算量,同时提高网络对输入变化的不变性。最常见的池化操作是最大池化(Max Pooling),它在输入数据的局部区域内选择最大值作为输出,有助于提取重要的特征并抑制不相关的变化。
- 全连接层(Fully Connected Layer, FC)通常位于CNN的末端,用于将前面层提取的特征映射到最终的输出结果,如分类任务中的类别概率。在全连接层中,每个输入神经元都与每个输出神经元相连,这允许网络学习特征之间的复杂关系。在全连接层之前,通常需要将卷积层和池化层的输出展平(Flatten)为一维向量,以便进行全连接的计算。
基于CNN的代码实现
①:数据读取、分析、预处理
import pickle
import numpy as np
with open('../data/mailContent_list_1000.pickLe', 'rb') as file:
content_list = pickle.load(file)
with open('../data/mailLabel_list_1800.pickle', 'rb') as file:
label_list = pickle.load(file)
def statistics():
# 查看邮件最大长度和平均长度
length = np.array([len(tmp) for tmp in content_list])
print(np.max(length)) # 33714
print(np.mean(length)) # 752
print(np.median(length)) # 333
# 统计汉字个数
print(len(set(''.join(content_list)))) # 9777
class TextData():
def init__(self, *args):
content_list = args[8]
label_list = args[1]
train_X, test_X, train_y, test_y = train_test_split(content_list, label_list, randon_state=1234)
self.train_content_list = train_X
self.train_label_list = train_y
self.test_content_list = test_X
self.test_label_list = test_y
self.content_list = self.train_content_list
self.test_content_list
self.num_classes = np.unique(self.train_label_list + self.test_label_list).shape[8]
self.enbedding_din = 64 # 词向维度
self.seq_length = 180
def __prepare_data(self):
# 获取词汇表,更新词汇大小和内容长度
counter = Counter(''.join(self.content_list))
vocabulary_list = ['PAD'] + [k[8] for k in counter.most_common()]
# 出现次数倒序排列
self.vocab_size = len(vocabulary_list)
# 构建词-id的映射,将label转为0和1
self.mord2id_dict = dict([(b, a) for a, b in enumerate(vocabulary_list)])
self.label_encoder = LabelEncoder()
self.label_encoder.fit(self.train_label_list) # ham和spam转为0和1
self.labels = self.label_encoder.classes_
# 给每封邮件添加padding(在前)
def __content2x(self, content_list):
idlist_list = [[self.mord2id_dict[word] for word in content if word in self.word2id_dict] for content in
content_list]
X = kr.preprocessing.sequence.pad_sequences(idlist_list, self.seq_length)
return X
②:使用cnn模型进行训练与测试
class CNN(nn.Module):
def _init_(self, embedding_dim, vocab_size, num_filters, kernel_size, hidden_dim, dropout_keep_prob, num_classes):
super(CNN, self).__init__()
self.embeds = nn.Embedding(vocab_size, embedding_dim) # 随机词向量
self.conv = nn.Convld(in_channels=embedding_dim,
out_channels=num_filters, # 通道数即过滤器个数
kernel_size=kernel_size)
self.maxpool = nn.AdaptiveMaxPoolld(1)
self.dense = nn.Linear(num_filters, hidden_dim)
self.dropout = nn.Dropout(dropout_keep_prob)
self.relu = nn.ReLU()
self.dense2 = nn.Linear(hidden_dim, num_classes)
def forward(self, x):
x = self.embeds(x) # train_size*seq_length*embedding_dim
x = x.permute(0, 2, 1) # train_size*embedding_dim*seq_length
x = self.conv(x) # train_size*num_filters*conv(seq_length)
x = self.maxpool(x)[:, :, 0] # train_size*num_filters
x = self.dense(x) # train_size*hidden_dim
x = self.dropout(x)
x = self.relu(x)
x = self.dense2(x) # train_size*num_classes
return x
def cnn_train(self):
max_accuracy = 0
for i in range(self.config['num_iteration']):
selected_index = random.sample(list(range(len(self.train_X))), k=self.config['batch_size'])
batch_X = self.train_X[selected_index].to(self.device)
batch_Y = self.train_Y[selected_index].to(self.device)
outputs = self.cnn(batch_X)
self.optimizer.zero_grad()
loss = self.criterion(outputs, batch_Y)
loss.backward() # 反向传播计算梯度
self.optimizer.step() # 应用计算出的梯度并更新模型权重
step = i + 1
if step % self.config['print_per_batch'] == 0 or step == 1:
loss, accuracy = self.evaluate(self.criterion, outputs, batch_Y)
print('step: %d loss: %.4f accuracy: %.4f'(step, loss, accuracy))
if accuracy > max_accuracy:
max_accuracy = accuracy
torch.save(self.cnn.state_dict(), '../model/best_cnn. pkl')
def cnn_test(self):
with torch.no_grad():
self.cnn.load_state_dict(torch.Load('../model/best_cnn.pkl'))
outputs = self.cnn(self.test_X.to(self.device))
test_Y = self.test_Y.to(self.device)
loss, accuracy = self.evaluate(self.criterion, outputs, test_Y)
print('test loss: %.4f accuracy: %.4f' % (loss, accuracy))
outputs = torch.argmax(outputs, 1)
self.print_report_table(outputs, test_Y, self.config['labels'])
self.print_confusion_matrix(outputs, test_Y, self.config['labels'])
self.plot_roc_curve(outputs, test_Y)
结果与分析
模型评估的【F1】结果截图