PyTorch搭建RNN联合嵌入模型(LSTM GRU)实现视觉问答(VQA)实战(超详细 附数据集和源码)

简介: PyTorch搭建RNN联合嵌入模型(LSTM GRU)实现视觉问答(VQA)实战(超详细 附数据集和源码)

需要源码和数据集请点赞关注收藏后评论区留言私信~~~

一、视觉问题简介

视觉问答(VQA)是一种同时设计计算机视觉和自然语言处理的学习任务。简单来说,VQA就是对给定的图片进行问答,一个VQA系统以一张图片和一个关于这张图片形式自由,开放式的自然语言问题作为输入,生成一条自然语言答案作为输出,视觉问题系统综合运用到了目前的计算机视觉和自然语言处理的技术,并设计模型设计,实验,以及可视化。

VQA问题的一种典型模型是联合嵌入模型,这种方法首先学习视觉与自然语言的两个不同模态特征在一个共同的特征空间的嵌入表示,然后根据这种嵌入表示产生回答。

二、数据集的准备

1:下载数据

这里使用VQA2.0数据集进行训练和验证,VQA2.0是一个公认有难度,并且语言验证得到了有效控制的数据集

本次使用到的图片为MSCOCO数据集中train2014子集和val2014子集,图片可以在官网下载

数据集网址

本次用到的图像特征是由目标检测网络Faster-RCNN检测并生成的,可评论区留言私信博主要

2:安装依赖

确保安装好PyTorch,然后在程序目录下运行pip install -r requirements.txt安装其他依赖项

三、关键模块简介

1:FCnet模块

FCnet即一系列的全连接层,各个层的输入输出大小在模块构建时给出,这个模块默认使其中的全连接层具有bias,并以ReLU作为激活函数 并使用weight normalization

2:SimpleClassifier模块

它的作用是:在视觉问答系统的末端,根据融合的特征得到最终答案

3:问题嵌入模块

在联合嵌入模型中,需要使用RNN将输入的问题编码成向量,LSTM和GRU使两种代表性的RNN,由于实践中GRU与LSTM表现相近且占用显存较少,所以这里选用GRU

4:词嵌入

要获得问题句子的嵌入表示,首先应该获得词嵌入表示,每一个词需要用一个唯一的数字表示

baseline代码如下

import torch
import torch.nn as nn
from lib.module import topdown_attention
from lib.module.language_model import WordEmbedding, QuestionEmbedding
from lib.module.classifier import SimpleClassifier
from lib.module.fc import FCNet
class Baseline(nn.Module):
    def __init__(self, w_emb, q_emb, v_att, q_net, v_net, classifer, need_internals=False):
        super(Baseline, self).__init__()
        self.need_internals = need_internals
        self.w_emb = w_emb
        self.q_emb = q_emb
        self.v_att = v_att
        self.q_net = q_net
        self.v_net = v_net
        self.classifier = classifer
    def forward(self, q_tokens, ent_features):
        w_emb = self.w_emb(q_tokens)
        q_emb = self.q_emb(w_emb)
        att = self.v_att(q_emb, ent_features) # [ B, n_ent, 1 ]
        v_emb = (att * ent_features).sum(1)  # [ B, hid_dim ]
        internals = [att.squeeze()] if self.need_internals else None
        q_repr = self.q_net(q_emb)
        v_repr = self.v_net(v_emb)
        joint_repr = q_repr * v_repr
        logits = self.classifier(joint_repr)
        return logits, internals
    @classmethod
    def build_from_config(cls, cfg, dataset, need_internals):
        w_emb = WordEmbedding(dataset.word_dict.n_tokens, cfg.lm.word_emb_dim, 0.0)
        q_emb = QuestionEmbedding(cfg.lm.word_emb_dim, cfg.hid_dim, cfg.lm.n_layers, cfg.lm.bidirectional, cfg.lm.dropout, cfg.lm.rnn_type)
        q_dim = cfg.hid_dim
        att_cls = topdown_attention.classes[cfg.topdown_att.type]
        v_att = att_cls(1, q_dim, cfg.ent_dim, cfg.topdown_att.hid_dim, cfg.topdown_att.dropout)
        q_net = FCNet([q_dim, cfg.hid_dim])
        v_net = FCNet([cfg.ent_dim, cfg.hid_dim])
        classifier = SimpleClassifier(cfg.hid_dim, cfg.mlp.hid_dim, dataset.ans_dict.n_tokens, cfg.mlp.dropout)
        return cls(w_emb, q_emb, v_att, q_net, v_net, classifier, need_internals)

数据集目录如下

四、结果可视化

读取了之前训练好的模型之后,使用数据为配置文件中的val,程序运行完成后结果可视化如下

机器对于给出的图片会输出对于的问答结果

五、代码

部分代码如下

训练类

import os
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.lr_scheduler import LambdaLR
from torch.nn.utils import clip_grad_norm_
from bisect import bisect
from tqdm import tqdm
def bce_with_logits(logits, labels):
    assert logits.dim() == 2
    loss = F.binary_cross_entropy_with_logits(logits, labels)
    loss *= labels.size(1) # multiply by number of QAs
    return loss
def sce_with_logits(logits, labels):
    assert logits.dim() == 2
    loss = F.cross_entropy(logits, labels.nonzero()[:, 1])
    loss *= labels.size(1)
    return loss
def compute_score_with_logits(logits, labels):
    with torch.no_grad():
        logits = torch.max(logits, 1)[1] # argmax
        one_hots = torch.zeros(*labels.size()).cuda()
        one_hots.scatter_(1, logits.view(-1, 1), 1)
        scores = (one_hots * labels)
        return scores
def lr_schedule_func_builder(cfg):
    def func(step_idx):
        if step_idx <= cfg.train.warmup_steps:
            alpha = float(step_idx) / float(cfg.train.warmup_steps)
            return cfg.train.warmup_factor * (1. - alpha) + alpha
        else:
            idx = bisect(cfg.train.lr_steps, step_idx)
            return pow(cfg.train.lr_ratio, idx)
    return func
def train(model, cfg, train_loader, val_loader, n_epochs, val_freq, out_dir):
    os.makedirs(out_dir, exist_ok=True)
    optim = torch.optim.Adamax(model.parameters(), **cfg.train.optim)
    n_train_batches = len(train_loader)
    train_score = 0.0
    loss_fn = bce_with_logits if cfg.model.loss == "logistic" else sce_with_logits
    for epoch in range(n_epochs):
        epoch_loss = 0.0
        tic_0 = time.time()
        for i, data in enumerate(train_loader):
            tic_1 = time.time()
            q_tokens = data[2].cuda()
            a_targets = data[3].cuda()
            v_features = [_.cuda() for _ in data[4:]]
            tic_2 = time.time()
            optim.zero_grad()
            logits, _ = model(q_tokens, *v_features)
            loss = loss_fn(logits, a_targets)
            tic_3 = time.time()
            loss.backward()
            if cfg.train.clip_grad: clip_grad_norm_(model.parameters(), cfg.train.max_grad_norm)
            optim.step()
            tic_4 = time.time()
            batch_score = compute_score_with_logits(logits, a_targets).sum()
            epoch_loss += float(loss.data.item() * logits.size(0))
            train_score += float(batch_score)
            del loss
            logstr = "epoch %2d batch %4d/%4d | ^ %4dms | => %4dms | <= %4dms" % \
                  (epoch + 1, i + 1, n_train_batches, 1000*(tic_2-tic_0), 1000*(tic_3-tic_2), 1000*(tic_4-tic_3))
            print("%-80s" % logstr, end="\r")
            tic_0 = time.time()
        epoch_loss /= len(train_loader.dataset)
        train_score = 100 * train_score / len(train_loader.dataset)
        logstr = "epoch %2d | train_loss: %5.2f train_score: %5.2f" % (epoch + 1, epoch_loss, train_score)
        if (epoch + 1) % val_freq == 0:
            model.eval()
            val_score, upper_bound = validate(model, val_loader)
            model.train()
            logstr += " | val_score: %5.2f (%5.2f)" % (100 * val_score, 100 * upper_bound)
        print("%-80s" % logstr)
        model_path = os.path.join(out_dir, 'model_%d.pth' % (epoch + 1))
        torch.save(model.state_dict(), model_path)
def validate(model, loader):
    score = 0
    upper_bound = 0
    n_qas = 0
    with torch.no_grad():
        for i, data in enumerate(loader):
            q_tokens = data[2].cuda()
            a_targets = data[3].cuda()
            v_features = [_.cuda() for _ in data[4:]]
            logits, _ = model(q_tokens, *v_features)
            batch_score = compute_score_with_logits(logits, a_targets)
            score += batch_score.sum()
            upper_bound += (a_targets.max(1)[0]).sum()
            n_qas += logits.size(0)
            logstr = "val batch %5d/%5d" % (i + 1, len(loader))
            print("%-80s" % logstr, end='\r')
    score = score / n_qas
    upper_bound = upper_bound / n_qas
    return score, upper_bound

infer类

import os
import time
import json
import torch
import cv2
import shutil
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
colors = [ (175, 84, 65), (68, 194, 246), (136, 147, 65), (92, 192, 151) ]
def attention_map(im, boxes, atts, p=0.8, bgc=1.0, compress=0.85, box_color=(65, 81, 226)):
    height, width, channel = im.shape
    im = im / 255.0
    att_map = np.zeros([height, width])
    boxes = boxes.astype(np.int)
    for box, att in zip(boxes, atts):
        x1, y1, x2, y2 = box
        roi = att_map[y1:y2, x1:x2]
        roi[roi < att] = att
    att_map /= att_map.max()
    att_map = att_map ** p
    att_map = att_map * compress + (1-compress)
    att_map = cv2.resize(att_map, (int(width/16), int(height/16)))
    att_map = cv2.resize(att_map, (width, height))
    att_map = np.expand_dims(att_map, axis=2)
    bg = np.ones_like(att_map) * bgc
    att_im = im * att_map + bg * (1-att_map)
    att_im = (att_im * 255).astype(np.uint8)
    center = np.argmax(atts)
    x1, y1, x2, y2 = boxes[center]
    cv2.rectangle(att_im, (x1, y1), (x2, y2), box_color, 5)
    return att_im
def infer_visualize(model, args, cfg, ans_dict, loader):
    _, ckpt = os.path.split(args.checkpoint)
    ckpt, _ = os.path.splitext(ckpt)
    out_dir = os.path.join(args.out_dir, "%s_%s_%s_visualization" % (args.cfg_name, ckpt, args.data))
    os.makedirs(out_dir, exist_ok=True)
    model.eval()
    questions_path = cfg.data[args.data].composition[0].q_jsons[0]
    questions = json.load(open(questions_path))
    pbar = tqdm(total=args.n_batches * loader.batch_size)
    with torch.no_grad():
        for i, data in enumerate(loader):
            if i == args.n_batches: break
            question_ids = data[0]
            image_ids = data[1]
            q_tokens = data[2].cuda()
            obj_featuers = data[4].cuda()
            batch_boxes = data[5].numpy()
            logits, internals = model(q_tokens, obj_featuers)
            topdown_atts = internals[0]
            topdown_atts = topdown_atts.data.cpu().numpy()
            _, predictions = logits.max(dim=1)
            for idx in range(len(question_ids)):
                question_id = question_ids[idx]
                image_id = image_ids[idx]
                boxes = batch_boxes[idx]
                answer = ans_dict.idx2ans[predictions[idx]]
                q_entry = questions[question_id]
                topdown_att = topdown_atts[idx]
                question = q_entry["question"]
                gts = list(q_entry["answers"].items())
                gts = sorted(gts, reverse=True, key=lambda x: x[1])
                gt = gts[0][0]
                q_out_dir = os.path.join(out_dir, question_id)
                os.makedirs(q_out_dir, exist_ok=True)
                q_str = question + "\n" + "gt: %s\n" % gt + "answer: %s\n" % answer
                with open(os.path.join(q_out_dir, "qa.txt"), "w") as f: f.write(q_str)
                image_path = os.path.join(args.images_dir, "%s.jpg" % image_id)
                shutil.copy(image_path, os.path.join(q_out_dir, "original.jpg"))
                im = cv2.imread(image_path)
                att_map = attention_map(im.copy(), boxes, topdown_att)
                cv2.imwrite(os.path.join(q_out_dir, "topdown_att.jpg"), att_map)
                pbar.update(1)

创作不易 觉得有帮助请点赞关注收藏~~~

相关文章
|
2月前
|
机器学习/深度学习 编解码 PyTorch
Pytorch实现手写数字识别 | MNIST数据集(CNN卷积神经网络)
Pytorch实现手写数字识别 | MNIST数据集(CNN卷积神经网络)
|
1月前
|
机器学习/深度学习 PyTorch 算法框架/工具
【PyTorch实战演练】AlexNet网络模型构建并使用Cifar10数据集进行批量训练(附代码)
【PyTorch实战演练】AlexNet网络模型构建并使用Cifar10数据集进行批量训练(附代码)
61 0
|
1月前
|
机器学习/深度学习 PyTorch 算法框架/工具
【PyTorch实战演练】使用Cifar10数据集训练LeNet5网络并实现图像分类(附代码)
【PyTorch实战演练】使用Cifar10数据集训练LeNet5网络并实现图像分类(附代码)
61 0
|
4天前
|
机器学习/深度学习 算法 Python
Python用RNN神经网络:LSTM、GRU、回归和ARIMA对COVID19新冠疫情人数时间序列预测
Python用RNN神经网络:LSTM、GRU、回归和ARIMA对COVID19新冠疫情人数时间序列预测
47 12
|
10天前
|
机器学习/深度学习
HAR-RV-J与递归神经网络(RNN)混合模型预测和交易大型股票指数的高频波动率
HAR-RV-J与递归神经网络(RNN)混合模型预测和交易大型股票指数的高频波动率
19 0
|
18天前
|
机器学习/深度学习 数据可视化 PyTorch
利用PyTorch实现基于MNIST数据集的手写数字识别
利用PyTorch实现基于MNIST数据集的手写数字识别
20 2
|
1月前
|
机器学习/深度学习 自然语言处理 PyTorch
【PyTorch实战演练】基于全连接网络构建RNN并生成人名
【PyTorch实战演练】基于全连接网络构建RNN并生成人名
23 0
|
1月前
|
机器学习/深度学习 自然语言处理 并行计算
神经网络结构——CNN、RNN、LSTM、Transformer !!
神经网络结构——CNN、RNN、LSTM、Transformer !!
136 0
|
1月前
|
机器学习/深度学习 PyTorch 算法框架/工具
使用PyTorch加载数据集:简单指南
使用PyTorch加载数据集:简单指南
使用PyTorch加载数据集:简单指南
|
2月前
|
机器学习/深度学习 算法 PyTorch
pytorch实现手写数字识别 | MNIST数据集(全连接神经网络)
pytorch实现手写数字识别 | MNIST数据集(全连接神经网络)