如图所示,我这里随便找了一个网络上的玉米病斑数据集,用ResNet训练并测试了一下效果,目标是识别图片中的病斑。模型训练好之后,输入图片进行预测,然后将网络最后一个卷积层的输出送入Grad-CAM进行可视化。
import os
import numpy as np
from PIL import Image
from torchvision import transforms
from utils import GradCAM, show_cam_on_image, center_crop_img
import torch
from matplotlib import pyplot as plt
from torch import nn
# NOTE: this rebinds ``transforms`` to the ``torchvision.transforms.transforms``
# submodule; Compose / ToTensor / Normalize are available from either binding.
from torchvision.transforms import transforms


def main():
    """Run Grad-CAM on one image with a trained ResNet-34 and save the heatmap.

    The function builds the network, loads ``./transfer-learning-resnet.pth``
    on the CPU, preprocesses ``./38.jpg``, computes the Grad-CAM map for the
    last block of ``layer4``, overlays it on the image, writes the figure to
    ``./result.png`` and shows it.

    Raises:
        FileNotFoundError: if the test image does not exist.
    """

    # Put your network definition below. Loading the weights needs the network
    # code, so the simplest approach is to copy it unchanged from your training
    # script. This demo was trained with ResNet, so the ResNet code lives here.
    class BasicBlock(nn.Module):
        """Two 3x3 convolutions with a residual connection (ResNet-18/34)."""

        expansion = 1

        def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
            # **kwargs absorbs ``groups`` / ``width_per_group`` that
            # ``ResNet._make_layer`` passes to every block type.
            super().__init__()
            self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=out_channel,
                                   kernel_size=3, stride=stride, padding=1, bias=False)
            self.bn1 = nn.BatchNorm2d(out_channel)
            self.relu = nn.ReLU()
            self.conv2 = nn.Conv2d(in_channels=out_channel, out_channels=out_channel,
                                   kernel_size=3, stride=1, padding=1, bias=False)
            self.bn2 = nn.BatchNorm2d(out_channel)
            self.downsample = downsample

        def forward(self, x):
            identity = x
            if self.downsample is not None:
                identity = self.downsample(x)

            out = self.conv1(x)
            out = self.bn1(out)
            out = self.relu(out)

            out = self.conv2(out)
            out = self.bn2(out)

            out += identity
            out = self.relu(out)

            return out

    class Bottleneck(nn.Module):
        """1x1 -> 3x3 -> 1x1 bottleneck residual block (ResNet-50 and deeper).

        Note: in the original paper, on the main branch of the dashed
        (downsampling) residual structure, the first 1x1 convolution has
        stride 2 and the 3x3 convolution has stride 1. The official PyTorch
        implementation instead uses stride 1 for the first 1x1 convolution and
        stride 2 for the 3x3 convolution, which improves top-1 accuracy by
        roughly 0.5%. See ResNet v1.5:
        https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch
        """

        expansion = 4

        def __init__(self, in_channel, out_channel, stride=1, downsample=None,
                     groups=1, width_per_group=64):
            super().__init__()

            width = int(out_channel * (width_per_group / 64.)) * groups

            self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=width,
                                   kernel_size=1, stride=1, bias=False)  # squeeze channels
            self.bn1 = nn.BatchNorm2d(width)
            # -----------------------------------------
            self.conv2 = nn.Conv2d(in_channels=width, out_channels=width, groups=groups,
                                   kernel_size=3, stride=stride, bias=False, padding=1)
            self.bn2 = nn.BatchNorm2d(width)
            # -----------------------------------------
            self.conv3 = nn.Conv2d(in_channels=width, out_channels=out_channel * self.expansion,
                                   kernel_size=1, stride=1, bias=False)  # unsqueeze channels
            self.bn3 = nn.BatchNorm2d(out_channel * self.expansion)
            self.relu = nn.ReLU(inplace=True)
            self.downsample = downsample

        def forward(self, x):
            identity = x
            if self.downsample is not None:
                identity = self.downsample(x)

            out = self.conv1(x)
            out = self.bn1(out)
            out = self.relu(out)

            out = self.conv2(out)
            out = self.bn2(out)
            out = self.relu(out)

            out = self.conv3(out)
            out = self.bn3(out)

            out += identity
            out = self.relu(out)

            return out

    class ResNet(nn.Module):
        """ResNet / ResNeXt backbone with an optional classification head."""

        def __init__(self, block, blocks_num, num_classes=5, include_top=True,
                     groups=1, width_per_group=64):
            super().__init__()
            self.include_top = include_top
            self.in_channel = 64

            self.groups = groups
            self.width_per_group = width_per_group

            self.conv1 = nn.Conv2d(3, self.in_channel, kernel_size=7, stride=2,
                                   padding=3, bias=False)
            self.bn1 = nn.BatchNorm2d(self.in_channel)
            self.relu = nn.ReLU(inplace=True)
            self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
            self.layer1 = self._make_layer(block, 64, blocks_num[0])
            self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)
            self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
            self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)
            if self.include_top:
                self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # output size = (1, 1)
                self.fc = nn.Linear(512 * block.expansion, num_classes)

            for m in self.modules():
                if isinstance(m, nn.Conv2d):
                    nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

        def _make_layer(self, block, channel, block_num, stride=1):
            """Stack ``block_num`` blocks; only the first one may downsample."""
            downsample = None
            # A projection shortcut is needed whenever the spatial size or the
            # channel count of the identity branch changes.
            if stride != 1 or self.in_channel != channel * block.expansion:
                downsample = nn.Sequential(
                    nn.Conv2d(self.in_channel, channel * block.expansion,
                              kernel_size=1, stride=stride, bias=False),
                    nn.BatchNorm2d(channel * block.expansion))

            layers = []
            layers.append(block(self.in_channel,
                                channel,
                                downsample=downsample,
                                stride=stride,
                                groups=self.groups,
                                width_per_group=self.width_per_group))
            self.in_channel = channel * block.expansion

            for _ in range(1, block_num):
                layers.append(block(self.in_channel,
                                    channel,
                                    groups=self.groups,
                                    width_per_group=self.width_per_group))

            return nn.Sequential(*layers)

        def forward(self, x):
            x = self.conv1(x)
            x = self.bn1(x)
            x = self.relu(x)
            x = self.maxpool(x)

            x = self.layer1(x)
            x = self.layer2(x)
            x = self.layer3(x)
            x = self.layer4(x)

            if self.include_top:
                x = self.avgpool(x)
                x = torch.flatten(x, 1)
                x = self.fc(x)

            return x

    def resnet34(num_classes=1000, include_top=True):
        # https://download.pytorch.org/models/resnet34-333f7ec4.pth
        return ResNet(BasicBlock, [3, 4, 6, 3], num_classes=num_classes, include_top=include_top)

    def resnet50(num_classes=1000, include_top=True):
        # https://download.pytorch.org/models/resnet50-19c8e357.pth
        return ResNet(Bottleneck, [3, 4, 6, 3], num_classes=num_classes, include_top=include_top)

    def resnet101(num_classes=1000, include_top=True):
        # https://download.pytorch.org/models/resnet101-5d3b4d8f.pth
        return ResNet(Bottleneck, [3, 4, 23, 3], num_classes=num_classes, include_top=include_top)

    def resnext50_32x4d(num_classes=1000, include_top=True):
        # https://download.pytorch.org/models/resnext50_32x4d-7cdf4587.pth
        groups = 32
        width_per_group = 4
        return ResNet(Bottleneck, [3, 4, 6, 3],
                      num_classes=num_classes,
                      include_top=include_top,
                      groups=groups,
                      width_per_group=width_per_group)

    def resnext101_32x8d(num_classes=1000, include_top=True):
        # https://download.pytorch.org/models/resnext101_32x8d-8ba56ff5.pth
        groups = 32
        width_per_group = 8
        return ResNet(Bottleneck, [3, 4, 23, 3],
                      num_classes=num_classes,
                      include_top=include_top,
                      groups=groups,
                      width_per_group=width_per_group)

    # NOTE(review): resnet34() builds a 1000-class head; if the checkpoint was
    # trained with a different number of classes, load_state_dict will raise a
    # size-mismatch error -- pass the training-time num_classes here. TODO confirm.
    net = resnet34()
    device = torch.device("cpu")
    # Load the trained ResNet weights; put the weight file in the current folder.
    # SECURITY: torch.load unpickles the file, so only load checkpoints you trust.
    net.load_state_dict(torch.load("./transfer-learning-resnet.pth", map_location=device))

    # Choose which layer's output to visualize. Here it is the last block of
    # the last ResNet stage; change it to whatever layer you want to inspect.
    target_layers = [net.layer4[-1]]
    print(target_layers)

    data_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

    # Load the image to test.
    img_path = "./38.jpg"
    # Size of the training images: use the same value you trained with.
    image_size = 500
    # Raise explicitly instead of using ``assert`` so that the check still runs
    # under ``python -O``.
    if not os.path.exists(img_path):
        raise FileNotFoundError("file: '{}' dose not exist.".format(img_path))

    img = Image.open(img_path).convert('RGB')  # force 3-channel RGB
    img = np.array(img, dtype=np.uint8)  # convert to a numpy array
    # Crop the test image to the same size as the training images.
    img = center_crop_img(img, image_size)

    # [C, H, W]
    img_tensor = data_transform(img)  # basic preprocessing: image -> tensor
    # expand batch dimension
    # [C, H, W] -> [N, C, H, W]
    input_tensor = torch.unsqueeze(img_tensor, dim=0)

    cam = GradCAM(model=net, target_layers=target_layers, use_cuda=False)

    grayscale_cam = cam(input_tensor=input_tensor)

    # Only one image was fed in, so take the first (and only) map.
    grayscale_cam = grayscale_cam[0, :]
    visualization = show_cam_on_image(img.astype(dtype=np.float32) / 255.,
                                      grayscale_cam,
                                      use_rgb=True)
    plt.imshow(visualization)
    plt.savefig('./result.png')  # save the heatmap to the current folder
    plt.show()


if __name__ == '__main__':
    main()
# Grad-CAM helpers: hook-based activation/gradient capture, the GradCAM driver,
# and two image utilities (heatmap overlay and center crop).
import cv2
import numpy as np


class ActivationsAndGradients:
    """ Class for extracting activations and
    registering gradients from targeted intermediate layers """

    def __init__(self, model, target_layers, reshape_transform):
        """Register a forward and a backward hook on every target layer.

        :param model: The model that is called by ``__call__``.
        :param target_layers: Iterable of modules to hook.
        :param reshape_transform: Optional callable applied to every captured
            activation and gradient, or None.
        """
        self.model = model
        self.gradients = []
        self.activations = []
        self.reshape_transform = reshape_transform
        self.handles = []
        for target_layer in target_layers:
            self.handles.append(
                target_layer.register_forward_hook(
                    self.save_activation))
            # Backward compatibility with older pytorch versions:
            if hasattr(target_layer, 'register_full_backward_hook'):
                self.handles.append(
                    target_layer.register_full_backward_hook(
                        self.save_gradient))
            else:
                self.handles.append(
                    target_layer.register_backward_hook(
                        self.save_gradient))

    def save_activation(self, module, input, output):
        """Forward hook: store a detached CPU copy of the layer output."""
        activation = output
        if self.reshape_transform is not None:
            activation = self.reshape_transform(activation)
        self.activations.append(activation.cpu().detach())

    def save_gradient(self, module, grad_input, grad_output):
        """Backward hook: store a detached CPU copy of the output gradient."""
        # Gradients are computed in reverse order, so prepend to keep the list
        # aligned with ``self.activations``.
        grad = grad_output[0]
        if self.reshape_transform is not None:
            grad = self.reshape_transform(grad)
        self.gradients = [grad.cpu().detach()] + self.gradients

    def __call__(self, x):
        """Clear previously captured tensors and run the model on ``x``."""
        self.gradients = []
        self.activations = []
        return self.model(x)

    def release(self):
        """Remove every hook registered by this object."""
        for handle in self.handles:
            handle.remove()


class GradCAM:
    """Compute Grad-CAM saliency maps for one or more target layers."""

    def __init__(self,
                 model,
                 target_layers,
                 reshape_transform=None,
                 use_cuda=False):
        """Put the model in eval mode and hook the target layers.

        :param model: The network to explain.
        :param target_layers: List of modules whose outputs are visualized.
        :param reshape_transform: Optional callable applied to the captured
            activations and gradients.
        :param use_cuda: When True, the model and the inputs are moved to CUDA.
        """
        self.model = model.eval()
        self.target_layers = target_layers
        self.reshape_transform = reshape_transform
        self.cuda = use_cuda
        if self.cuda:
            self.model = model.cuda()
        self.activations_and_grads = ActivationsAndGradients(
            self.model, target_layers, reshape_transform)

    @staticmethod
    def get_cam_weights(grads):
        """ Get a vector of weights for every channel in the target layer.
        Methods that return weights channels,
        will typically need to only implement this function.

        The weights are the mean of ``grads`` over axes 2 and 3.
        """
        return np.mean(grads, axis=(2, 3), keepdims=True)

    @staticmethod
    def get_loss(output, target_category):
        """Sum ``output[i, target_category[i]]`` over all samples ``i``."""
        loss = 0
        for i, category in enumerate(target_category):
            loss = loss + output[i, category]
        return loss

    def get_cam_image(self, activations, grads):
        """Weight the activations per channel and sum over axis 1."""
        weights = self.get_cam_weights(grads)
        weighted_activations = weights * activations
        cam = weighted_activations.sum(axis=1)

        return cam

    @staticmethod
    def get_target_width_height(input_tensor):
        """Return ``(width, height)`` taken from the last two tensor dims."""
        width, height = input_tensor.size(-1), input_tensor.size(-2)
        return width, height

    def compute_cam_per_layer(self, input_tensor):
        """Build one scaled CAM per hooked layer, resized to the input size."""
        activations_list = [a.cpu().data.numpy()
                            for a in self.activations_and_grads.activations]
        grads_list = [g.cpu().data.numpy()
                      for g in self.activations_and_grads.gradients]
        target_size = self.get_target_width_height(input_tensor)

        cam_per_target_layer = []
        # Loop over the saliency image from every layer

        for layer_activations, layer_grads in zip(activations_list, grads_list):
            cam = self.get_cam_image(layer_activations, layer_grads)
            # works like mute the min-max scale in the function of scale_cam_image
            cam[cam < 0] = 0
            scaled = self.scale_cam_image(cam, target_size)
            # Insert a "layer" axis so the per-layer maps can be concatenated.
            cam_per_target_layer.append(scaled[:, None, :])

        return cam_per_target_layer

    def aggregate_multi_layers(self, cam_per_target_layer):
        """Average the per-layer CAMs (clipped at 0) and rescale the result."""
        cam_per_target_layer = np.concatenate(cam_per_target_layer, axis=1)
        cam_per_target_layer = np.maximum(cam_per_target_layer, 0)
        result = np.mean(cam_per_target_layer, axis=1)
        return self.scale_cam_image(result)

    @staticmethod
    def scale_cam_image(cam, target_size=None):
        """Min-max scale every map in ``cam`` and optionally resize it.

        :param cam: Iterable of 2-D maps.
        :param target_size: ``(width, height)`` passed to ``cv2.resize``, or
            None to keep the original size.
        :returns: float32 array stacking the scaled maps.
        """
        result = []
        for img in cam:
            img = img - np.min(img)
            # The small epsilon avoids division by zero for constant maps.
            img = img / (1e-7 + np.max(img))
            if target_size is not None:
                img = cv2.resize(img, target_size)
            result.append(img)
        result = np.float32(result)

        return result

    def __call__(self, input_tensor, target_category=None):
        """Compute the aggregated CAM for ``input_tensor``.

        :param input_tensor: Batch fed to the model.
        :param target_category: Class index (int), a per-sample sequence of
            class indices, or None to use the model's arg-max prediction.
        :returns: The value of ``aggregate_multi_layers`` for this batch.
        """
        if self.cuda:
            input_tensor = input_tensor.cuda()

        # Forward pass: network output logits (before softmax).
        output = self.activations_and_grads(input_tensor)
        if isinstance(target_category, int):
            target_category = [target_category] * input_tensor.size(0)

        if target_category is None:
            target_category = np.argmax(output.cpu().data.numpy(), axis=-1)
            print(f"category id: {target_category}")
        else:
            assert (len(target_category) == input_tensor.size(0))

        self.model.zero_grad()
        loss = self.get_loss(output, target_category)
        loss.backward(retain_graph=True)

        # In most of the saliency attribution papers, the saliency is
        # computed with a single target layer.
        # Commonly it is the last convolutional layer.
        # Here we support passing a list with multiple target layers.
        # It will compute the saliency image for every image,
        # and then aggregate them (with a default mean aggregation).
        # This gives you more flexibility in case you just want to
        # use all conv layers for example, all Batchnorm layers,
        # or something else.
        cam_per_layer = self.compute_cam_per_layer(input_tensor)
        return self.aggregate_multi_layers(cam_per_layer)

    def __del__(self):
        # ``__init__`` may have failed before the hooks object was created
        # (e.g. ``model.cuda()`` raised); do not raise again during finalization.
        hooks = getattr(self, "activations_and_grads", None)
        if hooks is not None:
            hooks.release()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.activations_and_grads.release()
        if isinstance(exc_value, IndexError):
            # Handle IndexError here...
            print(
                f"An exception occurred in CAM with block: {exc_type}. Message: {exc_value}")
            # Returning True suppresses the IndexError; anything else propagates.
            return True


def show_cam_on_image(img: np.ndarray,
                      mask: np.ndarray,
                      use_rgb: bool = False,
                      colormap: int = cv2.COLORMAP_JET) -> np.ndarray:
    """ This function overlays the cam mask on the image as an heatmap.
    By default the heatmap is in BGR format.

    :param img: The base image in RGB or BGR format.
    :param mask: The cam mask.
    :param use_rgb: Whether to use an RGB or BGR heatmap, this should be set to True if 'img' is in RGB format.
    :param colormap: The OpenCV colormap to be used.
    :returns: The default image with the cam overlay.
    :raises ValueError: If ``img`` contains values greater than 1.
    """

    heatmap = cv2.applyColorMap(np.uint8(255 * mask), colormap)
    if use_rgb:
        heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
    heatmap = np.float32(heatmap) / 255

    if np.max(img) > 1:
        # ValueError is still an ``Exception``, so existing handlers keep working.
        raise ValueError(
            "The input image should np.float32 in the range [0, 1]")

    cam = heatmap + img
    cam = cam / np.max(cam)
    return np.uint8(255 * cam)


def center_crop_img(img: np.ndarray, size: int):
    """Resize the shorter side of ``img`` to ``size`` and center-crop a square.

    :param img: Image array with three dimensions, unpacked as (h, w, c).
    :param size: Side length of the returned square crop.
    :returns: ``img`` itself when it is already ``size`` x ``size``; otherwise
        the resized image cropped to ``size`` along its longer side.
    """
    h, w, _ = img.shape

    if w == h == size:
        return img

    # Exact integer arithmetic: ``int(side * (size / other))`` can land one
    # below the true value through float rounding (e.g. 49 * (1 / 49)), which
    # for square inputs produced a side of ``size - 1`` and a negative crop
    # offset.
    if w < h:
        new_w = size
        new_h = h * size // w
    else:
        new_h = size
        new_w = w * size // h

    img = cv2.resize(img, dsize=(new_w, new_h))

    if new_w == size:
        # Width already matches: trim the excess rows equally from both ends.
        top = (new_h - size) // 2
        img = img[top: top + size]
    else:
        # Height already matches: trim the excess columns instead.
        left = (new_w - size) // 2
        img = img[:, left: left + size]

    return img