Whose Little Kitty Are You?
Xiao Ming: Ergouzi's cat keeps sneaking over to steal food, and our own little tabby is starving. What do we do?!
What do we do?! Recognizing pets sounds like something only a university research team can pull off. What is a kindergartner like me supposed to do?!
Xiao Ming scratched his head and let out an anguished "aaaah"...
Then Xiao Ming found the news report "Monkey-face recognition is here!" (new.qq.com/omn/2021022…)
Don't panic, kindergartner Xiao Ming: I'll walk you through building cat recognition with MegEngine, Megvii's deep learning framework, so that unfamiliar cats are kept out.
1. Data collection
All cat videos come from publicly available footage; the cats' face shots are taken as screenshots from the videos, so no separate photo sessions are needed.
!unzip -q data/data71411/cat.zip
replace 1.mp4? [y]es, [n]o, [A]ll, [N]one, [r]ename: ^C
1.1 Use Python with OpenCV to grab one frame per second from each video, then number and save the frames.
import os

import cv2

for i in range(1, 5):
    # Create a directory for each video's frames
    print(i)
    mp4_file = str(i) + '.mp4'
    dir_path = os.path.join('dataset', str(i))
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    # Save one frame per second
    vidcap = cv2.VideoCapture(mp4_file)
    success, image = vidcap.read()
    fps = int(vidcap.get(cv2.CAP_PROP_FPS))
    count = 0
    while success:
        if count % fps == 0:
            cv2.imwrite("{}/{}.jpg".format(dir_path, int(count / fps)), image)
            print('Process %dth seconds: ' % int(count / fps), success)
        success, image = vidcap.read()
        count += 1
1.2 Clean up the generated images
Delete abnormal frames such as end-credit shots.
Doing this by hand for now... a scripted alternative is sketched below.
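If you would rather script part of the cleanup, here is a minimal sketch that drops the last few frames of each clip, on the assumption that end credits sit at the tail. The helper name drop_tail_frames and the tail_seconds cutoff are made-up illustrations, not part of the original workflow; check each folder before actually deleting anything.

import os

# Hypothetical helper: remove the last `tail_seconds` one-per-second frames
# of each clip, since end credits usually sit at the tail of a video.
def drop_tail_frames(root='dataset', num_videos=4, tail_seconds=3):
    for i in range(1, num_videos + 1):
        dir_path = os.path.join(root, str(i))
        # Frames are named 0.jpg, 1.jpg, ... so sort them numerically
        frames = sorted(os.listdir(dir_path), key=lambda n: int(n.split('.')[0]))
        for name in frames[-tail_seconds:]:
            os.remove(os.path.join(dir_path, name))

# drop_tail_frames()  # uncomment after eyeballing the folders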
import matplotlib.pyplot as plt
%matplotlib inline
import cv2 as cv
import numpy as np

# Display inside Jupyter Notebook
def visualize_images():
    img = cv.imread('dataset/1/1.jpg')
    img = cv.cvtColor(img, cv.COLOR_BGR2RGB)  # OpenCV reads BGR; convert for matplotlib
    plt.imshow(img)
    plt.show()

visualize_images()
1.3 Inspecting the dataset
Four different cats. The snippet below shows the first saved frame of each one.
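As a quick check, a sketch that reuses the dataset/1 … dataset/4 folders from step 1.1 (it assumes each folder contains a 0.jpg, which the capture loop above writes for the first frame):

import os

import cv2 as cv
import matplotlib.pyplot as plt

# Show frame 0 of each of the four cats side by side
plt.figure(figsize=(12, 3))
for i in range(1, 5):
    img = cv.imread(os.path.join('dataset', str(i), '0.jpg'))
    img = cv.cvtColor(img, cv.COLOR_BGR2RGB)  # BGR -> RGB for matplotlib
    plt.subplot(1, 4, i)
    plt.title('cat {}'.format(i))
    plt.axis('off')
    plt.imshow(img)
plt.show()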
1.4 Generating the lists
For a custom dataset, the first step is to generate an image list that splits the images into a training set and a test set, with a label attached to each entry. The program below can run on its own: pass it the root folder of a top-level category, e.g. ./dataset, and it iterates over every subclass inside, producing lists in a fixed format. It writes three files to the target directory: readme.json, train.txt and test.txt.
import json
import os

# Root directory of the data (the frames were saved under ./dataset in step 1.1)
data_root_path = 'dataset'
# Details for every class
class_detail = []
# Folder names of all classes, here ['1', '2', '3', '4']
class_dirs = os.listdir(data_root_path)
# Class label
class_label = 0
# Name of the parent directory
father_paths = data_root_path.split('/')
while True:
    if father_paths[len(father_paths) - 1] == '':
        del father_paths[len(father_paths) - 1]
    else:
        break
father_path = father_paths[len(father_paths) - 1]

data_list_path = './'
# Clear the old lists
with open("test.txt", 'w') as f:
    pass
with open("train.txt", 'w') as f:
    pass

# Total number of images
all_class_images = 0
# Walk through every class
for class_dir in class_dirs:
    # Details for this class
    class_detail_list = {}
    test_sum = 0
    trainer_sum = 0
    # Number of images in this class
    class_sum = 0
    # Path of this class
    path = data_root_path + "/" + class_dir
    # All images of this class
    img_paths = os.listdir(path)
    for img_path in img_paths:  # iterate over every image in the folder
        name_path = path + '/' + img_path  # path of this image
        if class_sum % 10 == 0:  # every 10th image goes to the test set
            test_sum += 1  # number of test images
            with open(data_list_path + "test.txt", 'a') as f:
                f.write(name_path + "\t%d" % class_label + "\n")  # class_label: 0,1,2,3
        else:
            trainer_sum += 1  # number of training images
            with open(data_list_path + "train.txt", 'a') as f:
                f.write(name_path + "\t%d" % class_label + "\n")  # class_label: 0,1,2,3
        class_sum += 1  # images in this class
        all_class_images += 1  # images across all classes
    # class_detail entry for the readme.json file
    class_detail_list['class_name'] = class_dir  # class name, e.g. '1'
    class_detail_list['class_label'] = class_label  # class label: 0,1,2,3
    class_detail_list['class_test_images'] = test_sum  # test images in this class
    class_detail_list['class_trainer_images'] = trainer_sum  # training images in this class
    class_detail.append(class_detail_list)
    class_label += 1

# Number of classes
all_class_sum = len(class_dirs)
# Summary information for the readme.json file
readjson = {}
readjson['all_class_name'] = father_path  # parent directory of the data
readjson['all_class_sum'] = all_class_sum
readjson['all_class_images'] = all_class_images
readjson['class_detail'] = class_detail
jsons = json.dumps(readjson, sort_keys=True, indent=4, separators=(',', ': '))
with open(data_list_path + "readme.json", 'w') as f:
    f.write(jsons)
print('Data list generation complete!')
Data list generation complete!
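To sanity-check the result, peek at the first few lines of the generated list (same shell-magic style as the unzip step above):

!head -3 train.txt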
1.5 Building the Dataset
import numpy as np
from PIL import Image

import megengine.data.transform as T
from megengine.data.dataset import Dataset


class MiaoMiaoDataset(Dataset):
    """Dataset of the 4 cat classes."""

    def __init__(self, mode='train'):
        """Read the image list for the given mode."""
        self.data = []
        with open('{}.txt'.format(mode)) as f:
            for line in f.readlines():
                info = line.strip().split('\t')
                if len(info) > 0:
                    self.data.append([info[0].strip(), info[1].strip()])

        if mode == 'train':
            self.transforms = T.Compose([
                T.Resize((224, 224)),          # resize
                T.RandomHorizontalFlip(0.5),   # random horizontal flip
                T.ToMode("CHW"),               # layout conversion HWC => CHW
                T.Normalize(mean=[0.485, 0.456, 0.406],
                            std=[0.229, 0.224, 0.225])  # normalization
            ])
        else:
            self.transforms = T.Compose([
                T.Resize((224, 224)),          # resize
                # T.RandomCrop(IMAGE_SIZE),    # random crop
                T.ToMode("CHW"),               # layout conversion HWC => CHW
                T.Normalize(mean=[0.485, 0.456, 0.406],
                            std=[0.229, 0.224, 0.225])  # normalization
            ])

    def get_origin_data(self):
        return self.data

    def __getitem__(self, index):
        """Fetch a single sample by index."""
        image_file, label = self.data[index]
        image = Image.open(image_file)
        if image.mode != 'RGB':
            image = image.convert('RGB')
        # MegEngine's vision transforms operate on numpy arrays
        image = self.transforms(np.array(image))
        return image, np.array(label, dtype='int64')

    def __len__(self):
        """Total number of samples."""
        return len(self.data)
train_dataset = MiaoMiaoDataset(mode='train')
test_dataset = MiaoMiaoDataset(mode='test')
print('train_data len: {}, test_data len:{}'.format(len(train_dataset), len(test_dataset)))
train_data len: 45, test_data len:7
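The training code in section 3 consumes DataLoaders. A minimal sketch of wrapping the two datasets with MegEngine's DataLoader follows; the batch size of 8 is an arbitrary assumption for this tiny dataset, not a tuned value.

from megengine.data import DataLoader, RandomSampler, SequentialSampler

# Random order for training, sequential order for evaluation
train_sampler = RandomSampler(train_dataset, batch_size=8)
test_sampler = SequentialSampler(test_dataset, batch_size=8)

train_dataloader = DataLoader(train_dataset, sampler=train_sampler)
valid_dataloader = DataLoader(test_dataset, sampler=test_sampler)

for image, label in train_dataloader:
    print(image.shape, label.shape)  # e.g. (8, 3, 224, 224) (8,)
    break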
2. Model definition
The data is now split into train and test sets, and the number of classes is known.
Next we define the model; time to review the ResNet architecture.
import math

import megengine.functional as F
import megengine.hub as hub
import megengine.module as M


class BasicBlock(M.Module):
    expansion = 1

    def __init__(
        self,
        in_channels,
        channels,
        stride=1,
        groups=1,
        base_width=64,
        dilation=1,
        norm=M.BatchNorm2d,
    ):
        super().__init__()
        if groups != 1 or base_width != 64:
            raise ValueError("BasicBlock only supports groups=1 and base_width=64")
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        self.conv1 = M.Conv2d(
            in_channels, channels, 3, stride, padding=dilation, bias=False
        )
        self.bn1 = norm(channels)
        self.conv2 = M.Conv2d(channels, channels, 3, 1, padding=1, bias=False)
        self.bn2 = norm(channels)
        self.downsample = (
            M.Identity()
            if in_channels == channels and stride == 1
            else M.Sequential(
                M.Conv2d(in_channels, channels, 1, stride, bias=False),
                norm(channels),
            )
        )

    def forward(self, x):
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        identity = self.downsample(identity)
        x += identity
        x = F.relu(x)
        return x


class Bottleneck(M.Module):
    expansion = 4

    def __init__(
        self,
        in_channels,
        channels,
        stride=1,
        groups=1,
        base_width=64,
        dilation=1,
        norm=M.BatchNorm2d,
    ):
        super().__init__()
        width = int(channels * (base_width / 64.0)) * groups
        self.conv1 = M.Conv2d(in_channels, width, 1, 1, bias=False)
        self.bn1 = norm(width)
        self.conv2 = M.Conv2d(
            width,
            width,
            3,
            stride,
            padding=dilation,
            groups=groups,
            dilation=dilation,
            bias=False,
        )
        self.bn2 = norm(width)
        self.conv3 = M.Conv2d(width, channels * self.expansion, 1, 1, bias=False)
        self.bn3 = norm(channels * self.expansion)
        self.downsample = (
            M.Identity()
            if in_channels == channels * self.expansion and stride == 1
            else M.Sequential(
                M.Conv2d(in_channels, channels * self.expansion, 1, stride, bias=False),
                norm(channels * self.expansion),
            )
        )

    def forward(self, x):
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = F.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)
        identity = self.downsample(identity)
        x += identity
        x = F.relu(x)
        return x


class ResNet(M.Module):
    def __init__(
        self,
        block,
        layers,
        num_classes=1000,
        zero_init_residual=False,
        groups=1,
        width_per_group=64,
        replace_stride_with_dilation=None,
        norm=M.BatchNorm2d,
    ):
        super().__init__()
        self.in_channels = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError(
                "replace_stride_with_dilation should be None "
                "or a 3-element tuple, got {}".format(replace_stride_with_dilation)
            )
        self.groups = groups
        self.base_width = width_per_group
        self.conv1 = M.Conv2d(
            3, self.in_channels, kernel_size=7, stride=2, padding=3, bias=False
        )
        self.bn1 = norm(self.in_channels)
        self.maxpool = M.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0], norm=norm)
        self.layer2 = self._make_layer(
            block, 128, layers[1], stride=2,
            dilate=replace_stride_with_dilation[0], norm=norm,
        )
        self.layer3 = self._make_layer(
            block, 256, layers[2], stride=2,
            dilate=replace_stride_with_dilation[1], norm=norm,
        )
        self.layer4 = self._make_layer(
            block, 512, layers[3], stride=2,
            dilate=replace_stride_with_dilation[2], norm=norm,
        )
        self.fc = M.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, M.Conv2d):
                M.init.msra_normal_(m.weight, mode="fan_out", nonlinearity="relu")
                if m.bias is not None:
                    fan_in, _ = M.init.calculate_fan_in_and_fan_out(m.weight)
                    bound = 1 / math.sqrt(fan_in)
                    M.init.uniform_(m.bias, -bound, bound)
            elif isinstance(m, M.BatchNorm2d):
                M.init.ones_(m.weight)
                M.init.zeros_(m.bias)
            elif isinstance(m, M.Linear):
                M.init.msra_uniform_(m.weight, a=math.sqrt(5))
                if m.bias is not None:
                    fan_in, _ = M.init.calculate_fan_in_and_fan_out(m.weight)
                    bound = 1 / math.sqrt(fan_in)
                    M.init.uniform_(m.bias, -bound, bound)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block
        # behaves like an identity. According to https://arxiv.org/abs/1706.02677
        # This improves the model by 0.2~0.3%.
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    M.init.zeros_(m.bn3.weight)
                elif isinstance(m, BasicBlock):
                    M.init.zeros_(m.bn2.weight)

    def _make_layer(
        self, block, channels, blocks, stride=1, dilate=False, norm=M.BatchNorm2d
    ):
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1
        layers = []
        layers.append(
            block(
                self.in_channels,
                channels,
                stride,
                groups=self.groups,
                base_width=self.base_width,
                dilation=previous_dilation,
                norm=norm,
            )
        )
        self.in_channels = channels * block.expansion
        for _ in range(1, blocks):
            layers.append(
                block(
                    self.in_channels,
                    channels,
                    groups=self.groups,
                    base_width=self.base_width,
                    dilation=self.dilation,
                    norm=norm,
                )
            )
        return M.Sequential(*layers)

    def extract_features(self, x):
        outputs = {}
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.maxpool(x)
        outputs["stem"] = x
        x = self.layer1(x)
        outputs["res2"] = x
        x = self.layer2(x)
        outputs["res3"] = x
        x = self.layer3(x)
        outputs["res4"] = x
        x = self.layer4(x)
        outputs["res5"] = x
        return outputs

    def forward(self, x):
        x = self.extract_features(x)["res5"]
        x = F.avg_pool2d(x, 7)
        x = F.flatten(x, 1)
        x = self.fc(x)
        return x


@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnet18_naiveaug_70312_78a63ca6.pkl"
)
def resnet18(**kwargs):
    r"""ResNet-18 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    """
    return ResNet(BasicBlock, [2, 2, 2, 2], **kwargs)


@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnet34_naiveaug_73960_fd9d869d.pkl"
)
def resnet34(**kwargs):
    r"""ResNet-34 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    """
    return ResNet(BasicBlock, [3, 4, 6, 3], **kwargs)


@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnet50_fbaug_76254_4e14b7d1.pkl"
)
def resnet50(**kwargs):
    r"""ResNet-50 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    """
    return ResNet(Bottleneck, [3, 4, 6, 3], **kwargs)


@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnet101_fbaug_77944_b7932921.pkl"
)
def resnet101(**kwargs):
    r"""ResNet-101 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    """
    return ResNet(Bottleneck, [3, 4, 23, 3], **kwargs)


@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnet152_fbaug_78582_7551aff3.pkl"
)
def resnet152(**kwargs):
    r"""ResNet-152 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    """
    return ResNet(Bottleneck, [3, 8, 36, 3], **kwargs)


@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnext50_32x4d_fbaug_77592_c4b04e5e.pkl"
)
def resnext50_32x4d(**kwargs):
    r"""ResNeXt-50 32x4d model from
    `"Aggregated Residual Transformation for Deep Neural Networks" <https://arxiv.org/pdf/1611.05431.pdf>`_

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
    """
    kwargs["groups"] = 32
    kwargs["width_per_group"] = 4
    return ResNet(Bottleneck, [3, 4, 6, 3], **kwargs)


@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnext101_32x8d_fbaug_79520_80efb344.pkl"
)
def resnext101_32x8d(**kwargs):
    r"""ResNeXt-101 32x8d model from
    `"Aggregated Residual Transformation for Deep Neural Networks" <https://arxiv.org/pdf/1611.05431.pdf>`_

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
    """
    kwargs["groups"] = 32
    kwargs["width_per_group"] = 8
    return ResNet(Bottleneck, [3, 4, 23, 3], **kwargs)
class AverageMeter:
    """Computes and stores the average and current value."""

    def __init__(self, name, fmt=":.3f"):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = "{name} {val" + self.fmt + "} ({avg" + self.fmt + "})"
        return fmtstr.format(**self.__dict__)
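For example, after two updates the meter reports the latest value alongside the running average:

# Tiny usage example of AverageMeter
meter = AverageMeter("Loss")
meter.update(0.8, n=8)
meter.update(0.4, n=8)
print(meter)  # Loss 0.400 (0.600)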
import logging
import time

import megengine
import megengine.distributed as dist


def valid(func, data_queue, args):
    objs = AverageMeter("Loss")
    top1 = AverageMeter("Acc@1")
    top5 = AverageMeter("Acc@5")
    clck = AverageMeter("Time")

    t = time.time()
    for step, (image, label) in enumerate(data_queue):
        image = megengine.tensor(image, dtype="float32")
        label = megengine.tensor(label, dtype="int32")
        n = image.shape[0]

        loss, acc1, acc5 = func(image, label)

        objs.update(loss.item(), n)
        top1.update(100 * acc1.item(), n)
        top5.update(100 * acc5.item(), n)
        clck.update(time.time() - t, n)
        t = time.time()

        if step % args.print_freq == 0 and dist.get_rank() == 0:
            logging.info("Test step %d, %s %s %s %s", step, objs, top1, top5, clck)

    return objs.avg, top1.avg, top5.avg
3. Model training
import os

import megengine.autodiff as autodiff
import megengine.optimizer as optim


def worker(args):
    # pylint: disable=too-many-statements
    if dist.get_rank() == 0:
        os.makedirs(os.path.join(args.save, args.arch), exist_ok=True)
        megengine.logger.set_log_file(os.path.join(args.save, args.arch, "log.txt"))

    # build dataset
    train_dataloader, valid_dataloader = build_dataset(args)
    train_queue = iter(train_dataloader)  # infinite
    # 1280000 is the ImageNet training-set size from the reference script;
    # adjust it to your own dataset size
    steps_per_epoch = 1280000 // (dist.get_world_size() * args.batch_size)

    # build model (the reference script selects the architecture by name;
    # here we build the ResNet-18 defined above with a 4-class head)
    model = resnet18(num_classes=4)

    # Sync parameters and buffers
    if dist.get_world_size() > 1:
        dist.bcast_list_(model.parameters())
        dist.bcast_list_(model.buffers())

    # Autodiff gradient manager
    gm = autodiff.GradManager().attach(
        model.parameters(),
        callbacks=dist.make_allreduce_cb("mean") if dist.get_world_size() > 1 else None,
    )

    # Optimizer: apply weight decay only to >1-D weight tensors
    params_wd = []
    params_nwd = []
    for n, p in model.named_parameters():
        if n.find("weight") >= 0 and len(p.shape) > 1:
            print("include ", n, p.shape)
            params_wd.append(p)
        else:
            print("NOT include ", n, p.shape)
            params_nwd.append(p)
    opt = optim.SGD(
        [
            {"params": params_wd},
            {"params": params_nwd, "weight_decay": 0},
        ],
        lr=args.lr * dist.get_world_size(),
        momentum=args.momentum,
        weight_decay=args.weight_decay,
    )

    # train and valid functions
    def train_step(image, label):
        with gm:
            logits = model(image)
            loss = F.nn.cross_entropy(logits, label, label_smooth=0.1)
            acc1, acc5 = F.topk_accuracy(logits, label, topk=(1, 5))
            gm.backward(loss)
            opt.step().clear_grad()
        return loss, acc1, acc5

    def valid_step(image, label):
        logits = model(image)
        loss = F.nn.cross_entropy(logits, label, label_smooth=0.1)
        acc1, acc5 = F.topk_accuracy(logits, label, topk=(1, 5))
        # calculate mean values
        if dist.get_world_size() > 1:
            loss = F.distributed.all_reduce_sum(loss) / dist.get_world_size()
            acc1 = F.distributed.all_reduce_sum(acc1) / dist.get_world_size()
            acc5 = F.distributed.all_reduce_sum(acc5) / dist.get_world_size()
        return loss, acc1, acc5

    # linear learning rate scheduler
    def adjust_learning_rate(step):
        lr = args.lr * dist.get_world_size() * (1 - step / (args.epochs * steps_per_epoch))
        for param_group in opt.param_groups:
            param_group["lr"] = lr
        return lr

    # start training
    objs = AverageMeter("Loss")
    top1 = AverageMeter("Acc@1")
    top5 = AverageMeter("Acc@5")
    clck = AverageMeter("Time")

    for step in range(0, args.epochs * steps_per_epoch):
        lr = adjust_learning_rate(step)

        t = time.time()
        image, label = next(train_queue)
        image = megengine.tensor(image, dtype="float32")
        label = megengine.tensor(label, dtype="int32")

        loss, acc1, acc5 = train_step(image, label)

        objs.update(loss.item())
        top1.update(100 * acc1.item())
        top5.update(100 * acc5.item())
        clck.update(time.time() - t)

        if step % args.print_freq == 0 and dist.get_rank() == 0:
            logging.info(
                "Epoch %d Step %d, LR %.4f, %s %s %s %s",
                step // steps_per_epoch, step, lr, objs, top1, top5, clck,
            )
            objs.reset()
            top1.reset()
            top5.reset()
            clck.reset()

        if (step + 1) % steps_per_epoch == 0:
            model.eval()
            _, valid_acc1, valid_acc5 = valid(valid_step, valid_dataloader, args)
            model.train()
            logging.info(
                "Epoch %d Test Acc@1 %.3f, Acc@5 %.3f",
                (step + 1) // steps_per_epoch, valid_acc1, valid_acc5,
            )
            if dist.get_rank() == 0:
                megengine.save(
                    {
                        "epoch": (step + 1) // steps_per_epoch,
                        "state_dict": model.state_dict(),
                    },
                    os.path.join(args.save, args.arch, "checkpoint.pkl"),
                )
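worker expects an args namespace (save, arch, batch_size, lr, momentum, weight_decay, epochs, print_freq) plus a build_dataset helper that the post never shows. Below is a minimal single-process launch sketch; the hyper-parameter values are illustrative assumptions, not tuned settings, and the build_dataset stub simply reuses the DataLoaders from the section 1.5 sketch.

import argparse

# Hypothetical stub: return the DataLoaders built in section 1.5
def build_dataset(args):
    return train_dataloader, valid_dataloader

parser = argparse.ArgumentParser()
parser.add_argument("--save", default="./output")
parser.add_argument("--arch", default="resnet18")
parser.add_argument("--batch-size", type=int, default=8)
parser.add_argument("--lr", type=float, default=0.0125)
parser.add_argument("--momentum", type=float, default=0.9)
parser.add_argument("--weight-decay", type=float, default=1e-4)
parser.add_argument("--epochs", type=int, default=10)
parser.add_argument("--print-freq", type=int, default=10)
args = parser.parse_args(args=[])  # empty argv so it also runs in a notebook

worker(args)  # single process; megengine.distributed.launcher handles multi-GPU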
4. Prediction
Run predictions on test_dataset.
print('Test set sample count: {}'.format(len(test_dataset)))
Test set sample count: 7
# Run inference. MegEngine modules have no Keras-style predict() method,
# so we loop over the samples ourselves. (Assumes `model` holds the trained
# network, e.g. rebuilt with resnet18(num_classes=4) and loaded from
# checkpoint.pkl.)
import megengine
import megengine.functional as F

model.eval()
probs = []
for i in range(len(test_dataset)):
    image, _ = test_dataset[i]
    logits = model(F.expand_dims(megengine.Tensor(image), 0))  # add batch dim
    probs.append(F.softmax(logits, axis=1).numpy()[0])
result = [np.stack(probs)]  # keep the result[0][idx] layout used below
# Print the result for each of the 7 test samples
for idx in range(7):
    predict_label = str(np.argmax(result[0][idx]))
    real_label = str(test_dataset[idx][1])
    print('Sample ID: {}, true label: {}, predicted: {}'.format(idx, real_label, predict_label))
Sample ID: 0, true label: 0, predicted: 0
Sample ID: 1, true label: 0, predicted: 0
Sample ID: 2, true label: 2, predicted: 2
Sample ID: 3, true label: 3, predicted: 3
Sample ID: 4, true label: 3, predicted: 3
Sample ID: 5, true label: 4, predicted: 0
Sample ID: 6, true label: 4, predicted: 1
# Plotting helper
import matplotlib.font_manager as font_manager
import matplotlib.pyplot as plt
from PIL import Image
%matplotlib inline

fontpath = 'MINGHEI_R.TTF'
font = font_manager.FontProperties(fname=fontpath, size=10)

def show_img(img, predict):
    plt.figure()
    plt.title(predict, fontproperties=font)
    plt.imshow(img, cmap=plt.cm.binary)
    plt.show()

# Show every test sample with its prediction
origin_data = test_dataset.get_origin_data()
for i in range(7):
    img_path = origin_data[i][0]
    real_label = str(origin_data[i][1])
    predict_label = str(np.argmax(result[0][i]))
    img = Image.open(img_path)
    title = 'Sample ID: {}, true label: {}, predicted: {}'.format(i, real_label, predict_label)
    show_img(img, title)