原理介绍
图片1
图片2
图片3
更多资料:
https://distill.pub/2016/augmented-rnns/#attentional-interfaces
https://www.cnblogs.com/shixiangwan/p/7573589.html#top
http://baijiahao.baidu.com/s?id=1587926245504773589&wfr=spider&for=pc
https://talbaumel.github.io/blog/attention/
论文阅读
Hierarchical Attention Networks for Document Classification
这篇文章主要讲述了基于Attention机制实现文本分类
假设我们有很多新闻文档,这些文档属于三类:军事、体育、娱乐。其中有一个文档D有L个句子si(i代表s是文档D的第i个句子),每个句子包含Ti个词(word),wit代表第i个句子的第t个词,t∈[1,Ti]
Word Encoder:
①给定一个句子si,例如 The superstar is walking in the street,由下面表示[wi1,wi2,wi3,wi4,wi5,wi6,wi7],我们使用一个词嵌入矩阵W将单词编码为向量:$x_{it} = W w_{it}$
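②使用双向GRU编码整个句子关于单词wit的隐含向量(其中$x_{it}$为单词wit的词向量):
$\overrightarrow{h}_{it} = \overrightarrow{\mathrm{GRU}}(x_{it})$,$\overleftarrow{h}_{it} = \overleftarrow{\mathrm{GRU}}(x_{it})$
那么最终隐含向量为前向隐含向量和后向隐含向量拼接在一起:$h_{it} = [\overrightarrow{h}_{it}, \overleftarrow{h}_{it}]$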
Word Attention:
给定一句话,并不是这个句子中所有的单词对整个句子的语义起同等大小的“贡献”,比如上面那句话中的“The”、“is”等,这些词没有太大作用,因此我们需要使用attention机制来提炼那些比较重要的单词,通过赋予权重以提高它们的重要性。
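①通过一个MLP获取hit的隐含表示:$u_{it} = \tanh(W_w h_{it} + b_w)$
②通过一个softmax函数获取归一化的权重:$\alpha_{it} = \frac{\exp(u_{it}^{\top} u_w)}{\sum_{t} \exp(u_{it}^{\top} u_w)}$,其中$u_w$是随机初始化、随训练一起学习的词级上下文向量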
③计算句子向量:
将每个单词的隐含向量hit与对应权重αit相乘后求和,得到句子向量:$s_i = \sum_{t} \alpha_{it} h_{it}$
代码实现
attention.py
import tensorflow as tf


def attention(inputs, attention_size, time_major=False, return_alphas=False):
    """Reduce a sequence of RNN outputs to one vector with additive attention.

    Each timestep is projected through a tanh layer of width
    ``attention_size``, scored against a trainable vector, and the scores
    are normalized with a softmax over the time axis. The result is the
    weighted sum of the inputs over time.

    Args:
        inputs: RNN outputs of shape (B, T, D), or (T, B, D) when
            ``time_major`` is true. A tuple (e.g. the forward and backward
            outputs of a Bi-RNN) is concatenated along axis 2 first.
        attention_size: width A of the hidden attention projection.
        time_major: set when ``inputs`` is laid out as (T, B, D).
        return_alphas: when true, also return the attention weights.

    Returns:
        The attended output of shape (B, D); or the tuple
        ``(output, alphas)`` with ``alphas`` of shape (B, T) when
        ``return_alphas`` is true.
    """
    if isinstance(inputs, tuple):
        # In case of Bi-RNN, concatenate the forward and the backward RNN outputs.
        inputs = tf.concat(inputs, 2)

    if time_major:
        # (T,B,D) => (B,T,D)
        # Use the public tf.transpose; tf.array_ops is not an exported module.
        inputs = tf.transpose(inputs, [1, 0, 2])

    hidden_size = inputs.shape[2].value  # D value - hidden size of the RNN layer

    # Trainable parameters
    w_omega = tf.Variable(tf.random_normal([hidden_size, attention_size], stddev=0.1))
    b_omega = tf.Variable(tf.random_normal([attention_size], stddev=0.1))
    u_omega = tf.Variable(tf.random_normal([attention_size], stddev=0.1))

    with tf.name_scope('v'):
        # Applying fully connected layer with non-linear activation to each of the B*T timestamps;
        # the shape of `v` is (B,T,D)*(D,A)=(B,T,A), where A=attention_size
        v = tf.tanh(tf.tensordot(inputs, w_omega, axes=1) + b_omega)

    # For each of the timestamps its vector of size A from `v` is reduced with `u` vector
    vu = tf.tensordot(v, u_omega, axes=1, name='vu')  # (B,T) shape
    # NOTE: the softmax runs over every timestep; no padding mask is applied here.
    alphas = tf.nn.softmax(vu, name='alphas')  # (B,T) shape

    # Output of (Bi-)RNN is reduced with attention vector; the result has (B,D) shape
    output = tf.reduce_sum(inputs * tf.expand_dims(alphas, -1), 1)

    if return_alphas:
        return output, alphas
    return output
train.py(训练脚本:在IMDB数据集上训练带Attention的双向GRU二分类模型)
# Training script: builds a bidirectional GRU + attention binary classifier
# on the Keras IMDB data set (TensorFlow 1.x graph API), trains it for
# NUM_EPOCHS epochs, logs summaries under ./logdir and saves the model
# to MODEL_PATH.
from __future__ import print_function, division

import numpy as np
import tensorflow as tf
from keras.datasets import imdb
from tensorflow.contrib.rnn import GRUCell
from tensorflow.python.ops.rnn import bidirectional_dynamic_rnn as bi_rnn
from tqdm import tqdm

from attention import attention
from utils import get_vocabulary_size, fit_in_vocabulary, zero_pad, batch_generator

NUM_WORDS = 10000  # passed to imdb.load_data as num_words
INDEX_FROM = 3  # passed to imdb.load_data as index_from
SEQUENCE_LENGTH = 250  # every sequence is padded/truncated to this length
EMBEDDING_DIM = 100
HIDDEN_SIZE = 150  # units per GRU direction
ATTENTION_SIZE = 50
KEEP_PROB = 0.8  # dropout keep probability fed during training
BATCH_SIZE = 256
NUM_EPOCHS = 3  # Model easily overfits without pre-trained words embeddings, that's why train for a few epochs
DELTA = 0.5  # weight of the newest batch in the running training-loss average
MODEL_PATH = './model'

# Load the data set
(X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=NUM_WORDS, index_from=INDEX_FROM)

# Sequences pre-processing
# The vocabulary size is derived from the training set only; test tokens
# outside of it are dropped before padding.
vocabulary_size = get_vocabulary_size(X_train)
X_test = fit_in_vocabulary(X_test, vocabulary_size)
X_train = zero_pad(X_train, SEQUENCE_LENGTH)
X_test = zero_pad(X_test, SEQUENCE_LENGTH)

# Different placeholders
with tf.name_scope('Inputs'):
    batch_ph = tf.placeholder(tf.int32, [None, SEQUENCE_LENGTH], name='batch_ph')
    target_ph = tf.placeholder(tf.float32, [None], name='target_ph')
    seq_len_ph = tf.placeholder(tf.int32, [None], name='seq_len_ph')
    keep_prob_ph = tf.placeholder(tf.float32, name='keep_prob_ph')

# Embedding layer
# Embeddings are randomly initialized and trained with the model.
with tf.name_scope('Embedding_layer'):
    embeddings_var = tf.Variable(tf.random_uniform([vocabulary_size, EMBEDDING_DIM], -1.0, 1.0), trainable=True)
    tf.summary.histogram('embeddings_var', embeddings_var)
    batch_embedded = tf.nn.embedding_lookup(embeddings_var, batch_ph)

# (Bi-)RNN layer(-s)
rnn_outputs, _ = bi_rnn(GRUCell(HIDDEN_SIZE), GRUCell(HIDDEN_SIZE),
                        inputs=batch_embedded, sequence_length=seq_len_ph, dtype=tf.float32)
tf.summary.histogram('RNN_outputs', rnn_outputs)

# Attention layer
with tf.name_scope('Attention_layer'):
    attention_output, alphas = attention(rnn_outputs, ATTENTION_SIZE, return_alphas=True)
    tf.summary.histogram('alphas', alphas)

# Dropout
drop = tf.nn.dropout(attention_output, keep_prob_ph)

# Fully connected layer
# Produces a single logit per example.
with tf.name_scope('Fully_connected_layer'):
    W = tf.Variable(tf.truncated_normal([HIDDEN_SIZE * 2, 1], stddev=0.1))  # Hidden size is multiplied by 2 for Bi-RNN
    b = tf.Variable(tf.constant(0., shape=[1]))
    y_hat = tf.nn.xw_plus_b(drop, W, b)
    y_hat = tf.squeeze(y_hat)
    tf.summary.histogram('W', W)

with tf.name_scope('Metrics'):
    # Cross-entropy loss and optimizer initialization
    loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_hat, labels=target_ph))
    tf.summary.scalar('loss', loss)
    optimizer = tf.train.AdamOptimizer(learning_rate=1e-3).minimize(loss)

    # Accuracy metric
    # A prediction is the sigmoid of the logit rounded to 0 or 1.
    accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.round(tf.sigmoid(y_hat)), target_ph), tf.float32))
    tf.summary.scalar('accuracy', accuracy)

merged = tf.summary.merge_all()

# Batch generators
train_batch_generator = batch_generator(X_train, y_train, BATCH_SIZE)
test_batch_generator = batch_generator(X_test, y_test, BATCH_SIZE)

train_writer = tf.summary.FileWriter('./logdir/train', accuracy.graph)
test_writer = tf.summary.FileWriter('./logdir/test', accuracy.graph)

session_conf = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True))

saver = tf.train.Saver()

if __name__ == "__main__":
    with tf.Session(config=session_conf) as sess:
        sess.run(tf.global_variables_initializer())
        print("Start learning...")
        for epoch in range(NUM_EPOCHS):
            loss_train = 0
            loss_test = 0
            accuracy_train = 0
            accuracy_test = 0

            print("epoch: {}\t".format(epoch), end="")

            # Training
            num_batches = X_train.shape[0] // BATCH_SIZE
            # NOTE: the loop variable `b` rebinds the module-level name of the
            # bias variable; the graph already holds a reference to it.
            for b in tqdm(range(num_batches)):
                x_batch, y_batch = next(train_batch_generator)
                # Position of the first 0 in each row plus one, i.e. the
                # count of leading non-zero tokens plus one padding position.
                seq_len = np.array([list(x).index(0) + 1 for x in x_batch])  # actual lengths of sequences
                loss_tr, acc, _, summary = sess.run([loss, accuracy, optimizer, merged],
                                                    feed_dict={batch_ph: x_batch,
                                                               target_ph: y_batch,
                                                               seq_len_ph: seq_len,
                                                               keep_prob_ph: KEEP_PROB})
                accuracy_train += acc
                # Exponentially weighted running average of the batch loss.
                loss_train = loss_tr * DELTA + loss_train * (1 - DELTA)
                train_writer.add_summary(summary, b + num_batches * epoch)
            accuracy_train /= num_batches

            # Testing
            num_batches = X_test.shape[0] // BATCH_SIZE
            for b in tqdm(range(num_batches)):
                x_batch, y_batch = next(test_batch_generator)
                seq_len = np.array([list(x).index(0) + 1 for x in x_batch])  # actual lengths of sequences
                # The optimizer op is not run here and dropout is disabled
                # (keep probability 1.0).
                loss_test_batch, acc, summary = sess.run([loss, accuracy, merged],
                                                         feed_dict={batch_ph: x_batch,
                                                                    target_ph: y_batch,
                                                                    seq_len_ph: seq_len,
                                                                    keep_prob_ph: 1.0})
                accuracy_test += acc
                loss_test += loss_test_batch
                test_writer.add_summary(summary, b + num_batches * epoch)
            accuracy_test /= num_batches
            loss_test /= num_batches

            # The training loss printed is the running average; the test loss
            # and both accuracies are plain means over the batches.
            print("loss: {:.3f}, val_loss: {:.3f}, acc: {:.3f}, val_acc: {:.3f}".format(
                loss_train, loss_test, accuracy_train, accuracy_test
            ))
        train_writer.close()
        test_writer.close()
        saver.save(sess, MODEL_PATH)
        print("Run 'tensorboard --logdir=./logdir' to checkout tensorboard logs.")
utils.py
from __future__ import print_function

import numpy as np


def zero_pad(X, seq_len):
    """Pad or truncate every sequence in ``X`` to exactly ``seq_len`` items.

    At most ``seq_len - 1`` leading items of each sequence are kept, so
    every returned row ends with at least one ``0``.

    Args:
        X: iterable of sequences (lists, tuples or 1-D arrays) of token ids.
        seq_len: target length of every row; must be at least 1.

    Returns:
        A NumPy array with one row of length ``seq_len`` per input sequence.

    Raises:
        ValueError: if ``seq_len`` is smaller than 1.
    """
    if seq_len < 1:
        raise ValueError("seq_len must be at least 1, got {}".format(seq_len))
    # list(...) keeps `+` a concatenation even when a sequence is a tuple or
    # an ndarray (where `+` would fail or turn into element-wise addition).
    return np.array([list(x[:seq_len - 1]) + [0] * max(seq_len - len(x), 1) for x in X])


def get_vocabulary_size(X):
    """Return the largest token id found in ``X`` plus one.

    Empty sequences are skipped; if no sequence has any token, ``max``
    raises ``ValueError``.
    """
    return max(max(x) for x in X if len(x) > 0) + 1  # plus the 0th word


def fit_in_vocabulary(X, voc_size):
    """Return ``X`` as a list of lists with ids ``>= voc_size`` removed."""
    return [[w for w in x if w < voc_size] for x in X]


def batch_generator(X, y, batch_size):
    """Yield shuffled ``(X_batch, y_batch)`` pairs forever.

    The data is reshuffled at the start of every pass. Only full batches
    are produced: the trailing ``len(X) % batch_size`` samples of each pass
    are skipped.

    Args:
        X: array-like of samples, indexed along the first axis.
        y: array-like of targets aligned with ``X``.
        batch_size: number of samples per batch; between 1 and ``len(X)``.

    Raises:
        ValueError: on the first ``next()`` call if ``batch_size`` is not in
            ``[1, len(X)]``. Without this check the generator would loop
            forever without yielding (or yield empty batches).
    """
    X_shuffled = np.asarray(X)
    y_shuffled = np.asarray(y)
    size = X_shuffled.shape[0]
    if batch_size < 1 or batch_size > size:
        raise ValueError(
            "batch_size must be between 1 and the number of samples ({}), got {}".format(size, batch_size))

    while True:
        # Fancy indexing returns new arrays, so the caller's data is never
        # modified; each pass permutes the previous pass's order.
        indices = np.arange(size)
        np.random.shuffle(indices)
        X_shuffled = X_shuffled[indices]
        y_shuffled = y_shuffled[indices]
        for start in range(0, size - batch_size + 1, batch_size):
            stop = start + batch_size
            yield X_shuffled[start:stop], y_shuffled[start:stop]


if __name__ == "__main__":
    # Test batch generator
    gen = batch_generator(np.array(['a', 'b', 'c', 'd']), np.array([1, 2, 3, 4]), 2)
    for _ in range(8):
        xx, yy = next(gen)
        print(xx, yy)
代码地址:https://github.com/ilivans/tf-rnn-attention
运行结果:
在训练集上准确率达到96%,测试集达到86%,效果还是很强大