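Contents
Summary
Install the required libraries
Import the required libraries
Define functions to download and extract the data
Set the dataset directory
Download the dataset
Split the data into training, validation, and test sets
Define a function to preprocess masks
Define a function to visualize images and their labels
Image sizes for training and prediction
Approach 1. Resize all images and masks to a fixed size (e.g., 256x256 pixels).
Define a PyTorch dataset class
Define training helpers
Define the training and validation functions
Define the training parameters
Train the model
Predict labels for images and visualize the predictions
Complete code for approach 1 (tested and runnable)
Approach 2. Predict masks for full-sized images
Approach 3. Use original images.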
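Summary
This example shows how to use Albumentations for binary semantic segmentation. We will use the Oxford-IIIT Pet Dataset. The task is to classify each pixel of an input image as either pet or background.
Install the required libraries
We will use TernausNet, a library that provides pretrained UNet models for semantic segmentation tasks.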
pip install ternausnet
ternaus/TernausNet: UNet model with VGG11 encoder pre-trained on Kaggle Carvana dataset (github.com)
Import the required libraries
from collections import defaultdict
import copy
import random
import os
import shutil
from urllib.request import urlretrieve
import albumentations as A
import albumentations.augmentations.functional as F
from albumentations.pytorch import ToTensorV2
import cv2
import matplotlib.pyplot as plt
import numpy as np
import ternausnet.models
from tqdm import tqdm
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.optim
from torch.utils.data import Dataset, DataLoader
cudnn.benchmark = True
Define functions to download and extract the data
class TqdmUpTo(tqdm):
    # Adapts tqdm to the (block count, block size, total size) hook used by urlretrieve.
    def update_to(self, b=1, bsize=1, tsize=None):
        if tsize is not None:
            self.total = tsize
        self.update(b * bsize - self.n)

def download_url(url, filepath):
    directory = os.path.dirname(os.path.abspath(filepath))
    os.makedirs(directory, exist_ok=True)
    # Skip the download if the archive is already on disk.
    if os.path.exists(filepath):
        print("Dataset already exists on the disk. Skipping download.")
        return
    with TqdmUpTo(unit="B", unit_scale=True, unit_divisor=1024, miniters=1, desc=os.path.basename(filepath)) as t:
        urlretrieve(url, filename=filepath, reporthook=t.update_to, data=None)
        t.total = t.n

def extract_archive(filepath):
    # Unpack the archive next to the archive file itself.
    extract_dir = os.path.dirname(os.path.abspath(filepath))
    shutil.unpack_archive(filepath, extract_dir)
Set the dataset directory
dataset_directory = "datasets/oxford-iiit-pet"
Download the dataset
filepath = os.path.join(dataset_directory, "images.tar.gz")
download_url(
    url="https://www.robots.ox.ac.uk/~vgg/data/pets/data/images.tar.gz", filepath=filepath,
)
extract_archive(filepath)
Dataset already exists on the disk. Skipping download.
filepath = os.path.join(dataset_directory, "annotations.tar.gz")
download_url(
    url="https://www.robots.ox.ac.uk/~vgg/data/pets/data/annotations.tar.gz", filepath=filepath,
)
extract_archive(filepath)
Dataset already exists on the disk. Skipping download.
Split the data into training, validation, and test sets
Some files in the dataset are corrupted, so we will use only those image files that OpenCV can load correctly. We will use 6000 images for training, 1374 images for validation, and 10 images for testing.
root_directory = os.path.join(dataset_directory)
images_directory = os.path.join(root_directory, "images")
masks_directory = os.path.join(root_directory, "annotations", "trimaps")
images_filenames = list(sorted(os.listdir(images_directory)))
# Keep only the files that OpenCV can actually decode.
correct_images_filenames = [i for i in images_filenames if cv2.imread(os.path.join(images_directory, i)) is not None]
random.seed(42)
random.shuffle(correct_images_filenames)
train_images_filenames = correct_images_filenames[:6000]
val_images_filenames = correct_images_filenames[6000:-10]
# Take the test images from the shuffled list of loadable files so that they
# cannot overlap with the training or validation images.
test_images_filenames = correct_images_filenames[-10:]
print(len(train_images_filenames), len(val_images_filenames), len(test_images_filenames))
6000 1374 10
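The split logic can be checked without downloading anything. The sketch below is only an illustration: the file names are hypothetical stand-ins, and the total of 7384 loadable files is taken from the counts quoted in this tutorial. It shows that slicing one shuffled list yields three subsets of the stated sizes that share no files.

```python
import random

# Hypothetical stand-ins for the 7384 image files that load correctly.
filenames = [f"pet_{i:04d}.jpg" for i in range(7384)]

random.seed(42)  # a fixed seed makes the shuffle reproducible
random.shuffle(filenames)

train = filenames[:6000]     # first 6000 files
val = filenames[6000:-10]    # everything between train and test
test = filenames[-10:]       # last 10 files

print(len(train), len(val), len(test))

# Slices of a single list with unique entries cannot overlap.
overlap = (set(train) & set(val)) | (set(train) & set(test)) | (set(val) & set(test))
print(len(overlap))
```

Taking all three subsets from the same shuffled list is what guarantees that no test image is also seen during training.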
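Define a function to preprocess masks
The dataset contains pixel-level trimap segmentation. For each image, there is an associated PNG file with a mask. The size of a mask equals the size of the related image. Each pixel in a mask image can take one of three values: 1, 2, or 3. 1 means that the pixel belongs to the class "pet", 2 to the class "background", and 3 to the class "border". Since this example demonstrates binary segmentation (that is, assigning one of two classes to each pixel), we will preprocess the mask so that it contains only two unique values: 0.0 if a pixel is background and 1.0 if a pixel is a pet or a border.
def preprocess_mask(mask):
    mask = mask.astype(np.float32)
    mask[mask == 2.0] = 0.0  # background
    mask[(mask == 1.0) | (mask == 3.0)] = 1.0  # pet or border
    return mask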
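To make the mapping concrete, here is a small self-contained sketch that applies the same rule to a hand-written 3x3 trimap. The function name to_binary_mask and the sample array are made up for this illustration; the logic mirrors preprocess_mask above.

```python
import numpy as np

def to_binary_mask(trimap):
    """Map trimap labels to a binary mask: 2 -> 0.0, 1 and 3 -> 1.0."""
    mask = trimap.astype(np.float32)
    mask[mask == 2.0] = 0.0
    mask[(mask == 1.0) | (mask == 3.0)] = 1.0
    return mask

# A made-up trimap: 1 = pet, 2 = background, 3 = border.
trimap = np.array(
    [
        [2, 2, 3],
        [2, 3, 1],
        [3, 1, 1],
    ],
    dtype=np.uint8,
)

binary = to_binary_mask(trimap)
print(binary)
print(np.unique(binary))
```

Casting to float32 first matters for training later on: the loss used in this tutorial compares the model output with a floating-point target.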
Define a function to visualize images and their labels
Let's define a visualization function that takes a list of image filenames, a path to the directory with images, a path to the directory with masks, and an optional argument with predicted masks (we will use this argument later to show the model's predictions).
def display_image_grid(images_filenames, images_directory, masks_directory, predicted_masks=None):
    # Add a third column only when predictions are supplied.
    cols = 3 if predicted_masks else 2
    rows = len(images_filenames)
    figure, ax = plt.subplots(nrows=rows, ncols=cols, figsize=(10, 24))
    for i, image_filename in enumerate(images_filenames):
        image = cv2.imread(os.path.join(images_directory, image_filename))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(os.path.join(masks_directory, image_filename.replace(".jpg", ".png")), cv2.IMREAD_UNCHANGED,)
        mask = preprocess_mask(mask)
        ax[i, 0].imshow(image)
        ax[i, 1].imshow(mask, interpolation="nearest")
        ax[i, 0].set_title("Image")
        ax[i, 1].set_title("Ground truth mask")
        ax[i, 0].set_axis_off()
        ax[i, 1].set_axis_off()
        if predicted_masks:
            predicted_mask = predicted_masks[i]
            ax[i, 2].imshow(predicted_mask, interpolation="nearest")
            ax[i, 2].set_title("Predicted mask")
            ax[i, 2].set_axis_off()
    plt.tight_layout()
    plt.show()

display_image_grid(test_images_filenames, images_directory, masks_directory)
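Image sizes for training and prediction
Often, images used for training and inference have different heights and widths and different aspect ratios. This poses two challenges for a deep learning pipeline:
- PyTorch requires all images in a batch to have the same height and width.
- If a neural network is not fully convolutional, all images must have the same width and height during both training and inference. Fully convolutional architectures, such as UNet, can work with images of any size.
There are three common ways to deal with those challenges:
1. Resize all images and masks to a fixed size (e.g., 256x256 pixels) during training. After the model predicts a fixed-size mask during inference, resize the mask to the original image size. This approach is simple, but it has a few drawbacks:
- The predicted mask is smaller than the image, so the mask may lose some context and important details of the original image.
- This approach can be problematic if images in the dataset have different aspect ratios. For example, suppose you resize an image of 1024x512 pixels (aspect ratio 2:1) to 256x256 pixels (aspect ratio 1:1). The transformation will distort the image and may also affect the quality of predictions.
2. If you use a fully convolutional network, you can train the model on image crops but use original images for inference. This option usually offers the best trade-off between quality, training speed, and hardware requirements.
3. Do not alter image sizes; use source images for both training and inference. With this approach, you won't lose any information. However, original images can be quite large, so they may require a lot of GPU memory. This approach also needs more training time to reach good results.
Some architectures, such as UNet, require that an image's size be divisible by the network's downsampling factor (usually 32), so you may also need to pad images with a border. Albumentations provides a dedicated transformation for that case.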
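The divisibility requirement comes down to rounding each side up to the next multiple of the downsampling factor. The helper below is a hypothetical illustration of that arithmetic, not part of Albumentations; the factor of 32 is the typical value mentioned above and should be checked for the network actually used.

```python
def padded_size(size, factor=32):
    """Return the smallest multiple of `factor` that is >= `size`."""
    return ((size + factor - 1) // factor) * factor

# Example side lengths in pixels and the sizes they would be padded to.
for side in (500, 512, 375):
    print(side, "->", padded_size(side))
```

A side that is already a multiple of the factor is left unchanged, so padding only adds the missing border pixels.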
The following example shows how the different types of images look.
example_image_filename = correct_images_filenames[0]
image = cv2.imread(os.path.join(images_directory, example_image_filename))
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
resized_image = F.resize(image, height=256, width=256)
padded_image = F.pad(image, min_height=512, min_width=512)
padded_constant_image = F.pad(image, min_height=512, min_width=512, border_mode=cv2.BORDER_CONSTANT)
cropped_image = F.center_crop(image, crop_height=256, crop_width=256)
figure, ax = plt.subplots(nrows=1, ncols=5, figsize=(18, 10))
ax.ravel()[0].imshow(image)
ax.ravel()[0].set_title("Original image")
ax.ravel()[1].imshow(resized_image)
ax.ravel()[1].set_title("Resized image")
ax.ravel()[2].imshow(cropped_image)
ax.ravel()[2].set_title("Cropped image")
ax.ravel()[3].imshow(padded_image)
ax.ravel()[3].set_title("Image padded with reflection")
ax.ravel()[4].imshow(padded_constant_image)
ax.ravel()[4].set_title("Image padded with constant padding")
plt.tight_layout()
plt.show()
In this tutorial, we will explore all three approaches for dealing with image sizes.
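Approach 1. Resize all images and masks to a fixed size (e.g., 256x256 pixels).
Define a PyTorch dataset class
Next, we define a PyTorch dataset. If you are not familiar with PyTorch datasets, see this tutorial: https://pytorch.org/tutorials/beginner/data_loading_tutorial.html. __init__ receives an optional transform argument, which is the transformation function of an Albumentations augmentation pipeline. Then, in __getitem__, the Dataset class uses that function to augment an image and a mask and returns their augmented versions.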
class OxfordPetDataset(Dataset):
    def __init__(self, images_filenames, images_directory, masks_directory, transform=None):
        self.images_filenames = images_filenames
        self.images_directory = images_directory
        self.masks_directory = masks_directory
        self.transform = transform

    def __len__(self):
        return len(self.images_filenames)

    def __getitem__(self, idx):
        image_filename = self.images_filenames[idx]
        image = cv2.imread(os.path.join(self.images_directory, image_filename))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(
            os.path.join(self.masks_directory, image_filename.replace(".jpg", ".png")), cv2.IMREAD_UNCHANGED,
        )
        mask = preprocess_mask(mask)
        # Apply the same augmentation to the image and its mask.
        if self.transform is not None:
            transformed = self.transform(image=image, mask=mask)
            image = transformed["image"]
            mask = transformed["mask"]
        return image, mask
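The important detail in __getitem__ is that the image and its mask go through the transform together, so every spatial change hits both. The sketch below illustrates that calling convention with in-memory arrays and no PyTorch dependency. ToyPairDataset and horizontal_flip are hypothetical names invented for this example; horizontal_flip only imitates the image=..., mask=... keyword interface and the dictionary result used above.

```python
import numpy as np

def horizontal_flip(image, mask):
    """Hypothetical transform: flip image and mask the same way."""
    return {"image": image[:, ::-1].copy(), "mask": mask[:, ::-1].copy()}

class ToyPairDataset:
    """Minimal dataset exposing __len__ and __getitem__ like the class above."""

    def __init__(self, images, masks, transform=None):
        self.images = images
        self.masks = masks
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image, mask = self.images[idx], self.masks[idx]
        if self.transform is not None:
            transformed = self.transform(image=image, mask=mask)
            image, mask = transformed["image"], transformed["mask"]
        return image, mask

image = np.array([[10, 20, 30], [40, 50, 60]], dtype=np.uint8)
mask = np.array([[0, 0, 1], [0, 1, 1]], dtype=np.float32)

dataset = ToyPairDataset([image], [mask], transform=horizontal_flip)
flipped_image, flipped_mask = dataset[0]
print(flipped_image)
print(flipped_mask)
```

After the flip, the mask value 1.0 still sits on the same pixels as before relative to the image content, which is the property a segmentation pipeline needs.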
Next, we create augmentation pipelines for the training and validation datasets. Note that we use A.Resize(256, 256) to resize input images and masks to 256x256 pixels.
train_transform = A.Compose(
    [
        A.Resize(256, 256),
        A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2, rotate_limit=30, p=0.5),
        A.RGBShift(r_shift_limit=25, g_shift_limit=25, b_shift_limit=25, p=0.5),
        A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, p=0.5),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)
train_dataset = OxfordPetDataset(train_images_filenames, images_directory, masks_directory, transform=train_transform,)
val_transform = A.Compose(
    [A.Resize(256, 256), A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)), ToTensorV2()]
)
val_dataset = OxfordPetDataset(val_images_filenames, images_directory, masks_directory, transform=val_transform,)
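The mean and std values passed to A.Normalize are per-channel statistics for pixel values scaled to the range 0 to 1. The sketch below shows the arithmetic those values imply, assuming 8-bit input with a maximum pixel value of 255; it is an illustration of the formula, and the library's own implementation may differ in details.

```python
import numpy as np

MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)

def normalize(image, mean=MEAN, std=STD, max_pixel_value=255.0):
    """Scale pixels to [0, 1], then subtract the mean and divide by std per channel."""
    scaled = image.astype(np.float32) / max_pixel_value
    return (scaled - mean) / std

# One white and one black RGB pixel, stored as height x width x channels.
white = np.full((1, 1, 3), 255, dtype=np.uint8)
black = np.zeros((1, 1, 3), dtype=np.uint8)

print(normalize(white)[0, 0])
print(normalize(black)[0, 0])
```

The output is no longer a displayable image, which is why the visualization helper later in this tutorial strips the normalization step before plotting.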
Let's define a function that takes a dataset and visualizes different augmentations applied to the same image and its associated mask.
def visualize_augmentations(dataset, idx=0, samples=5):
    dataset = copy.deepcopy(dataset)
    # Drop Normalize and ToTensorV2 so that the result can still be shown as an image.
    dataset.transform = A.Compose([t for t in dataset.transform if not isinstance(t, (A.Normalize, ToTensorV2))])
    figure, ax = plt.subplots(nrows=samples, ncols=2, figsize=(10, 24))
    for i in range(samples):
        image, mask = dataset[idx]
        ax[i, 0].imshow(image)
        ax[i, 1].imshow(mask, interpolation="nearest")
        ax[i, 0].set_title("Augmented image")
        ax[i, 1].set_title("Augmented mask")
        ax[i, 0].set_axis_off()
        ax[i, 1].set_axis_off()
    plt.tight_layout()
    plt.show()

random.seed(42)
visualize_augmentations(train_dataset, idx=55)
Define training helpers
MetricMonitor helps to track metrics such as accuracy or loss during training and validation.
class MetricMonitor:
    def __init__(self, float_precision=3):
        self.float_precision = float_precision
        self.reset()

    def reset(self):
        self.metrics = defaultdict(lambda: {"val": 0, "count": 0, "avg": 0})

    def update(self, metric_name, val):
        # Accumulate the value and refresh the running average.
        metric = self.metrics[metric_name]
        metric["val"] += val
        metric["count"] += 1
        metric["avg"] = metric["val"] / metric["count"]

    def __str__(self):
        return " | ".join(
            [
                "{metric_name}: {avg:.{float_precision}f}".format(
                    metric_name=metric_name, avg=metric["avg"], float_precision=self.float_precision
                )
                for (metric_name, metric) in self.metrics.items()
            ]
        )
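The number shown in the progress bar is therefore a running mean over the batches seen so far, not the loss of the latest batch. The sketch below reproduces that bookkeeping in a few lines; the update helper, the totals dictionary, and the loss values are made up for illustration.

```python
from collections import defaultdict

# One accumulator per metric name, created on first use.
totals = defaultdict(lambda: {"sum": 0.0, "count": 0})

def update(name, value):
    """Add a value to the named metric and return its running average."""
    entry = totals[name]
    entry["sum"] += value
    entry["count"] += 1
    return entry["sum"] / entry["count"]

# Hypothetical per-batch losses.
batch_losses = [0.5, 0.25, 0.75]
averages = [update("Loss", loss) for loss in batch_losses]
print(averages)
```

Because the accumulator is created anew for each epoch in the training code, the average restarts at every epoch.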
Define the training and validation functions
def train(train_loader, model, criterion, optimizer, epoch, params):
    metric_monitor = MetricMonitor()
    model.train()
    stream = tqdm(train_loader)
    for i, (images, target) in enumerate(stream, start=1):
        images = images.to(params["device"], non_blocking=True)
        target = target.to(params["device"], non_blocking=True)
        # The model returns one channel of logits; drop the channel axis to match the mask.
        output = model(images).squeeze(1)
        loss = criterion(output, target)
        metric_monitor.update("Loss", loss.item())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        stream.set_description(
            "Epoch: {epoch}. Train. {metric_monitor}".format(epoch=epoch, metric_monitor=metric_monitor)
        )

def validate(val_loader, model, criterion, epoch, params):
    metric_monitor = MetricMonitor()
    model.eval()
    stream = tqdm(val_loader)
    with torch.no_grad():
        for i, (images, target) in enumerate(stream, start=1):
            images = images.to(params["device"], non_blocking=True)
            target = target.to(params["device"], non_blocking=True)
            output = model(images).squeeze(1)
            loss = criterion(output, target)
            metric_monitor.update("Loss", loss.item())
            stream.set_description(
                "Epoch: {epoch}. Validation. {metric_monitor}".format(epoch=epoch, metric_monitor=metric_monitor)
            )

def create_model(params):
    model = getattr(ternausnet.models, params["model"])(pretrained=True)
    model = model.to(params["device"])
    return model

def train_and_validate(model, train_dataset, val_dataset, params):
    train_loader = DataLoader(
        train_dataset,
        batch_size=params["batch_size"],
        shuffle=True,
        num_workers=params["num_workers"],
        pin_memory=True,
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=params["batch_size"],
        shuffle=False,
        num_workers=params["num_workers"],
        pin_memory=True,
    )
    criterion = nn.BCEWithLogitsLoss().to(params["device"])
    optimizer = torch.optim.Adam(model.parameters(), lr=params["lr"])
    for epoch in range(1, params["epochs"] + 1):
        train(train_loader, model, criterion, optimizer, epoch, params)
        validate(val_loader, model, criterion, epoch, params)
    return model

def predict(model, params, test_dataset, batch_size):
    test_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False, num_workers=params["num_workers"], pin_memory=True,
    )
    model.eval()
    predictions = []
    with torch.no_grad():
        for images, (original_heights, original_widths) in test_loader:
            images = images.to(params["device"], non_blocking=True)
            output = model(images)
            # Turn logits into probabilities, then threshold them at 0.5.
            probabilities = torch.sigmoid(output.squeeze(1))
            predicted_masks = (probabilities >= 0.5).float() * 1
            predicted_masks = predicted_masks.cpu().numpy()
            for predicted_mask, original_height, original_width in zip(
                predicted_masks, original_heights.numpy(), original_widths.numpy()
            ):
                predictions.append((predicted_mask, original_height, original_width))
    return predictions
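The last step of predict turns raw model outputs (logits) into a binary mask by applying a sigmoid and thresholding at 0.5. The pure-Python sketch below shows that step on a handful of made-up logit values, without any PyTorch dependency.

```python
import math

def sigmoid(x):
    """Logistic function: maps any real number into the interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-x))

# Hypothetical logits for five pixels.
logits = [-2.0, -0.1, 0.0, 0.3, 4.0]

probabilities = [sigmoid(x) for x in logits]
mask = [1.0 if p >= 0.5 else 0.0 for p in probabilities]

print([round(p, 3) for p in probabilities])
print(mask)
```

Since sigmoid(0) is exactly 0.5 and the function is increasing, thresholding the probability at 0.5 gives the same mask as checking whether the logit is non-negative.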
Define the training parameters
Here we define a few training parameters such as model architecture, learning rate, batch size, epochs, etc.
params = {
    "model": "UNet11",
    "device": "cuda",
    "lr": 0.001,
    "batch_size": 16,
    "num_workers": 4,
    "epochs": 10,
}
Train the model
model = create_model(params)
model = train_and_validate(model, train_dataset, val_dataset, params)
Epoch: 1. Train. Loss: 0.415: 100%|██████████| 375/375 [01:42<00:00, 3.66it/s]
Epoch: 1. Validation. Loss: 0.210: 100%|██████████| 86/86 [00:09<00:00, 9.55it/s]
Epoch: 2. Train. Loss: 0.257: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 2. Validation. Loss: 0.178: 100%|██████████| 86/86 [00:08<00:00, 10.62it/s]
Epoch: 3. Train. Loss: 0.221: 100%|██████████| 375/375 [01:39<00:00, 3.75it/s]
Epoch: 3. Validation. Loss: 0.168: 100%|██████████| 86/86 [00:08<00:00, 10.58it/s]
Epoch: 4. Train. Loss: 0.209: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 4. Validation. Loss: 0.156: 100%|██████████| 86/86 [00:08<00:00, 10.57it/s]
Epoch: 5. Train. Loss: 0.190: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 5. Validation. Loss: 0.149: 100%|██████████| 86/86 [00:08<00:00, 10.57it/s]
Epoch: 6. Train. Loss: 0.179: 100%|██████████| 375/375 [01:39<00:00, 3.75it/s]
Epoch: 6. Validation. Loss: 0.155: 100%|██████████| 86/86 [00:08<00:00, 10.55it/s]
Epoch: 7. Train. Loss: 0.175: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 7. Validation. Loss: 0.147: 100%|██████████| 86/86 [00:08<00:00, 10.59it/s]
Epoch: 8. Train. Loss: 0.167: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 8. Validation. Loss: 0.146: 100%|██████████| 86/86 [00:08<00:00, 10.61it/s]
Epoch: 9. Train. Loss: 0.165: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 9. Validation. Loss: 0.131: 100%|██████████| 86/86 [00:08<00:00, 10.56it/s]
Epoch: 10. Train. Loss: 0.156: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 10. Validation. Loss: 0.140: 100%|██████████| 86/86 [00:08<00:00, 10.60it/s]
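The 375 and 86 in the progress bars are the number of batches per epoch. A data loader that keeps the final, smaller batch produces the ceiling of the dataset size divided by the batch size. The helper name below is made up for this small check.

```python
import math

def batches_per_epoch(num_samples, batch_size):
    """Number of batches when the last, possibly smaller, batch is kept."""
    return math.ceil(num_samples / batch_size)

# Training and validation set sizes from this tutorial, batch size 16.
print(batches_per_epoch(6000, 16))
print(batches_per_epoch(1374, 16))

# The same datasets with batch size 8.
print(batches_per_epoch(6000, 8))
print(batches_per_epoch(1374, 8))
```

With 1374 validation images and a batch size of 16, the last batch holds only 14 images, which is why the count rounds up to 86.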
Predict labels for images and visualize the predictions
Now that we have a trained model, let's try to predict masks for some images. Note that the __getitem__ method returns not only an image but also the original height and width of the image. We will use those values to resize a predicted mask from 256x256 pixels to the original image's size.
class OxfordPetInferenceDataset(Dataset):
    def __init__(self, images_filenames, images_directory, transform=None):
        self.images_filenames = images_filenames
        self.images_directory = images_directory
        self.transform = transform

    def __len__(self):
        return len(self.images_filenames)

    def __getitem__(self, idx):
        image_filename = self.images_filenames[idx]
        image = cv2.imread(os.path.join(self.images_directory, image_filename))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Remember (height, width) before the transform changes the size.
        original_size = tuple(image.shape[:2])
        if self.transform is not None:
            transformed = self.transform(image=image)
            image = transformed["image"]
        return image, original_size

test_transform = A.Compose(
    [A.Resize(256, 256), A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)), ToTensorV2()]
)
test_dataset = OxfordPetInferenceDataset(test_images_filenames, images_directory, transform=test_transform,)
predictions = predict(model, params, test_dataset, batch_size=16)
Next, we will resize the predicted 256x256 masks to the original image sizes.
predicted_masks = []
for predicted_256x256_mask, original_height, original_width in predictions:
    full_sized_mask = F.resize(
        predicted_256x256_mask, height=original_height, width=original_width, interpolation=cv2.INTER_NEAREST
    )
    predicted_masks.append(full_sized_mask)
display_image_grid(test_images_filenames, images_directory, masks_directory, predicted_masks=predicted_masks)
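Nearest-neighbor interpolation is used for masks because it only copies existing values, so a binary mask stays binary after resizing. The NumPy sketch below illustrates the idea with a simplified index mapping. It is not the OpenCV implementation, and its output may differ from cv2.INTER_NEAREST by a pixel at region boundaries; resize_nearest is a name invented for this example.

```python
import numpy as np

def resize_nearest(mask, height, width):
    """Resize a 2D array by copying, for each output pixel, one source pixel."""
    in_h, in_w = mask.shape
    rows = (np.arange(height) * in_h) // height  # source row for each output row
    cols = (np.arange(width) * in_w) // width    # source column for each output column
    return mask[rows[:, None], cols[None, :]]

small = np.array([[0.0, 1.0], [1.0, 0.0]], dtype=np.float32)
large = resize_nearest(small, height=4, width=6)

print(large)
print(np.unique(large))
```

An interpolation that averages neighbors, such as bilinear, would introduce values between 0 and 1 along the edges, and the mask would then need to be thresholded again.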
Complete code for approach 1 (tested and runnable)
from collections import defaultdict
import copy
import random
import os
import shutil
from urllib.request import urlretrieve
import albumentations as A
import albumentations.augmentations.functional as F
from albumentations.pytorch import ToTensorV2
import cv2
import matplotlib.pyplot as plt
import numpy as np
import ternausnet.models
from tqdm import tqdm
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.optim
from torch.utils.data import Dataset, DataLoader
cudnn.benchmark = True
class TqdmUpTo(tqdm):
    def update_to(self, b=1, bsize=1, tsize=None):
        if tsize is not None:
            self.total = tsize
        self.update(b * bsize - self.n)

def download_url(url, filepath):
    directory = os.path.dirname(os.path.abspath(filepath))
    os.makedirs(directory, exist_ok=True)
    if os.path.exists(filepath):
        print("Dataset already exists on the disk. Skipping download.")
        return
    with TqdmUpTo(unit="B", unit_scale=True, unit_divisor=1024, miniters=1, desc=os.path.basename(filepath)) as t:
        urlretrieve(url, filename=filepath, reporthook=t.update_to, data=None)
        t.total = t.n

def extract_archive(filepath):
    extract_dir = os.path.dirname(os.path.abspath(filepath))
    shutil.unpack_archive(filepath, extract_dir)

def preprocess_mask(mask):
    mask = mask.astype(np.float32)
    mask[mask == 2.0] = 0.0
    mask[(mask == 1.0) | (mask == 3.0)] = 1.0
    return mask

def display_image_grid(images_filenames, images_directory, masks_directory, predicted_masks=None):
    cols = 3 if predicted_masks else 2
    rows = len(images_filenames)
    figure, ax = plt.subplots(nrows=rows, ncols=cols, figsize=(10, 24))
    for i, image_filename in enumerate(images_filenames):
        image = cv2.imread(os.path.join(images_directory, image_filename))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(os.path.join(masks_directory, image_filename.replace(".jpg", ".png")), cv2.IMREAD_UNCHANGED, )
        mask = preprocess_mask(mask)
        ax[i, 0].imshow(image)
        ax[i, 1].imshow(mask, interpolation="nearest")
        ax[i, 0].set_title("Image")
        ax[i, 1].set_title("Ground truth mask")
        ax[i, 0].set_axis_off()
        ax[i, 1].set_axis_off()
        if predicted_masks:
            predicted_mask = predicted_masks[i]
            ax[i, 2].imshow(predicted_mask, interpolation="nearest")
            ax[i, 2].set_title("Predicted mask")
            ax[i, 2].set_axis_off()
    plt.tight_layout()
    plt.show()

class OxfordPetDataset(Dataset):
    def __init__(self, images_filenames, images_directory, masks_directory, transform=None):
        self.images_filenames = images_filenames
        self.images_directory = images_directory
        self.masks_directory = masks_directory
        self.transform = transform

    def __len__(self):
        return len(self.images_filenames)

    def __getitem__(self, idx):
        image_filename = self.images_filenames[idx]
        image = cv2.imread(os.path.join(self.images_directory, image_filename))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(
            os.path.join(self.masks_directory, image_filename.replace(".jpg", ".png")), cv2.IMREAD_UNCHANGED,
        )
        mask = preprocess_mask(mask)
        if self.transform is not None:
            transformed = self.transform(image=image, mask=mask)
            image = transformed["image"]
            mask = transformed["mask"]
        return image, mask

def visualize_augmentations(dataset, idx=0, samples=5):
    dataset = copy.deepcopy(dataset)
    dataset.transform = A.Compose([t for t in dataset.transform if not isinstance(t, (A.Normalize, ToTensorV2))])
    figure, ax = plt.subplots(nrows=samples, ncols=2, figsize=(10, 24))
    for i in range(samples):
        image, mask = dataset[idx]
        ax[i, 0].imshow(image)
        ax[i, 1].imshow(mask, interpolation="nearest")
        ax[i, 0].set_title("Augmented image")
        ax[i, 1].set_title("Augmented mask")
        ax[i, 0].set_axis_off()
        ax[i, 1].set_axis_off()
    plt.tight_layout()
    plt.show()

class MetricMonitor:
    def __init__(self, float_precision=3):
        self.float_precision = float_precision
        self.reset()

    def reset(self):
        self.metrics = defaultdict(lambda: {"val": 0, "count": 0, "avg": 0})

    def update(self, metric_name, val):
        metric = self.metrics[metric_name]
        metric["val"] += val
        metric["count"] += 1
        metric["avg"] = metric["val"] / metric["count"]

    def __str__(self):
        return " | ".join(
            [
                "{metric_name}: {avg:.{float_precision}f}".format(
                    metric_name=metric_name, avg=metric["avg"], float_precision=self.float_precision
                )
                for (metric_name, metric) in self.metrics.items()
            ]
        )

def train(train_loader, model, criterion, optimizer, epoch, params):
    metric_monitor = MetricMonitor()
    model.train()
    stream = tqdm(train_loader)
    for i, (images, target) in enumerate(stream, start=1):
        images = images.to(params["device"], non_blocking=True)
        target = target.to(params["device"], non_blocking=True)
        output = model(images).squeeze(1)
        loss = criterion(output, target)
        metric_monitor.update("Loss", loss.item())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        stream.set_description(
            "Epoch: {epoch}. Train. {metric_monitor}".format(epoch=epoch, metric_monitor=metric_monitor)
        )

def validate(val_loader, model, criterion, epoch, params):
    metric_monitor = MetricMonitor()
    model.eval()
    stream = tqdm(val_loader)
    with torch.no_grad():
        for i, (images, target) in enumerate(stream, start=1):
            images = images.to(params["device"], non_blocking=True)
            target = target.to(params["device"], non_blocking=True)
            output = model(images).squeeze(1)
            loss = criterion(output, target)
            metric_monitor.update("Loss", loss.item())
            stream.set_description(
                "Epoch: {epoch}. Validation. {metric_monitor}".format(epoch=epoch, metric_monitor=metric_monitor)
            )

def create_model(params):
    model = getattr(ternausnet.models, params["model"])(pretrained=True)
    model = model.to(params["device"])
    return model

def train_and_validate(model, train_dataset, val_dataset, params):
    train_loader = DataLoader(
        train_dataset,
        batch_size=params["batch_size"],
        shuffle=True,
        num_workers=params["num_workers"],
        pin_memory=True,
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=params["batch_size"],
        shuffle=False,
        num_workers=params["num_workers"],
        pin_memory=True,
    )
    criterion = nn.BCEWithLogitsLoss().to(params["device"])
    optimizer = torch.optim.Adam(model.parameters(), lr=params["lr"])
    for epoch in range(1, params["epochs"] + 1):
        train(train_loader, model, criterion, optimizer, epoch, params)
        validate(val_loader, model, criterion, epoch, params)
    return model

def predict(model, params, test_dataset, batch_size):
    test_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False, num_workers=params["num_workers"], pin_memory=True,
    )
    model.eval()
    predictions = []
    with torch.no_grad():
        for images, (original_heights, original_widths) in test_loader:
            images = images.to(params["device"], non_blocking=True)
            output = model(images)
            probabilities = torch.sigmoid(output.squeeze(1))
            predicted_masks = (probabilities >= 0.5).float() * 1
            predicted_masks = predicted_masks.cpu().numpy()
            for predicted_mask, original_height, original_width in zip(
                predicted_masks, original_heights.numpy(), original_widths.numpy()
            ):
                predictions.append((predicted_mask, original_height, original_width))
    return predictions

class OxfordPetInferenceDataset(Dataset):
    def __init__(self, images_filenames, images_directory, transform=None):
        self.images_filenames = images_filenames
        self.images_directory = images_directory
        self.transform = transform

    def __len__(self):
        return len(self.images_filenames)

    def __getitem__(self, idx):
        image_filename = self.images_filenames[idx]
        image = cv2.imread(os.path.join(self.images_directory, image_filename))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        original_size = tuple(image.shape[:2])
        if self.transform is not None:
            transformed = self.transform(image=image)
            image = transformed["image"]
        return image, original_size
if __name__ == '__main__':
    dataset_directory = "datasets/oxford-iiit-pet"
    filepath = os.path.join(dataset_directory, "images.tar.gz")
    download_url(
        url="https://www.robots.ox.ac.uk/~vgg/data/pets/data/images.tar.gz", filepath=filepath,
    )
    extract_archive(filepath)
    filepath = os.path.join(dataset_directory, "annotations.tar.gz")
    download_url(
        url="https://www.robots.ox.ac.uk/~vgg/data/pets/data/annotations.tar.gz", filepath=filepath,
    )
    extract_archive(filepath)
    root_directory = os.path.join(dataset_directory)
    images_directory = os.path.join(root_directory, "images")
    masks_directory = os.path.join(root_directory, "annotations", "trimaps")
    images_filenames = list(sorted(os.listdir(images_directory)))
    correct_images_filenames = [i for i in images_filenames if
                                cv2.imread(os.path.join(images_directory, i)) is not None]
    random.seed(42)
    random.shuffle(correct_images_filenames)
    train_images_filenames = correct_images_filenames[:6000]
    val_images_filenames = correct_images_filenames[6000:-10]
    # Take the test images from the shuffled list of loadable files so that they
    # cannot overlap with the training or validation images.
    test_images_filenames = correct_images_filenames[-10:]
    print(len(train_images_filenames), len(val_images_filenames), len(test_images_filenames))
    display_image_grid(test_images_filenames, images_directory, masks_directory)
    example_image_filename = correct_images_filenames[0]
    image = cv2.imread(os.path.join(images_directory, example_image_filename))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    resized_image = F.resize(image, height=256, width=256)
    padded_image = F.pad(image, min_height=512, min_width=512)
    padded_constant_image = F.pad(image, min_height=512, min_width=512, border_mode=cv2.BORDER_CONSTANT)
    cropped_image = F.center_crop(image, crop_height=256, crop_width=256)
    figure, ax = plt.subplots(nrows=1, ncols=5, figsize=(18, 10))
    ax.ravel()[0].imshow(image)
    ax.ravel()[0].set_title("Original image")
    ax.ravel()[1].imshow(resized_image)
    ax.ravel()[1].set_title("Resized image")
    ax.ravel()[2].imshow(cropped_image)
    ax.ravel()[2].set_title("Cropped image")
    ax.ravel()[3].imshow(padded_image)
    ax.ravel()[3].set_title("Image padded with reflection")
    ax.ravel()[4].imshow(padded_constant_image)
    ax.ravel()[4].set_title("Image padded with constant padding")
    plt.tight_layout()
    plt.show()
    train_transform = A.Compose(
        [
            A.Resize(256, 256),
            A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2, rotate_limit=30, p=0.5),
            A.RGBShift(r_shift_limit=25, g_shift_limit=25, b_shift_limit=25, p=0.5),
            A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, p=0.5),
            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            ToTensorV2(),
        ]
    )
    train_dataset = OxfordPetDataset(train_images_filenames, images_directory, masks_directory,
                                     transform=train_transform, )
    val_transform = A.Compose(
        [A.Resize(256, 256), A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)), ToTensorV2()]
    )
    val_dataset = OxfordPetDataset(val_images_filenames, images_directory, masks_directory, transform=val_transform, )
    random.seed(42)
    visualize_augmentations(train_dataset, idx=55)
    params = {
        "model": "UNet11",
        "device": "cuda",
        "lr": 0.001,
        "batch_size": 8,
        "num_workers": 0,
        "epochs": 10,
    }
    model = create_model(params)
    model = train_and_validate(model, train_dataset, val_dataset, params)
    test_transform = A.Compose(
        [A.Resize(256, 256), A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)), ToTensorV2()]
    )
    test_dataset = OxfordPetInferenceDataset(test_images_filenames, images_directory, transform=test_transform, )
    predictions = predict(model, params, test_dataset, batch_size=16)
    predicted_masks = []
    for predicted_256x256_mask, original_height, original_width in predictions:
        full_sized_mask = F.resize(
            predicted_256x256_mask, height=original_height, width=original_width, interpolation=cv2.INTER_NEAREST
        )
        predicted_masks.append(full_sized_mask)
    display_image_grid(test_images_filenames, images_directory, masks_directory, predicted_masks=predicted_masks)
Approach 2. Predict masks for full-sized images
We will reuse most of the code from the previous example.
The height and width of some images in the dataset are smaller than the crop size (256x256 pixels), so we first apply A.PadIfNeeded(min_height=256, min_width=256), which pads an image if its height or width is less than 256 pixels.
train_transform = A.Compose(
    [
        A.PadIfNeeded(min_height=256, min_width=256),
        A.RandomCrop(256, 256),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=15, p=0.5),
        A.RGBShift(r_shift_limit=15, g_shift_limit=15, b_shift_limit=15, p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)
train_dataset = OxfordPetDataset(train_images_filenames, images_directory, masks_directory, transform=train_transform,)
val_transform = A.Compose(
    [
        A.PadIfNeeded(min_height=256, min_width=256),
        A.CenterCrop(256, 256),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)
val_dataset = OxfordPetDataset(val_images_filenames, images_directory, masks_directory, transform=val_transform,)
params = {
    "model": "UNet11",
    "device": "cuda",
    "lr": 0.001,
    "batch_size": 16,
    "num_workers": 4,
    "epochs": 10,
}
model = create_model(params)
model = train_and_validate(model, train_dataset, val_dataset, params)
Epoch: 1. Train. Loss: 0.445: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 1. Validation. Loss: 0.279: 100%|██████████| 86/86 [00:08<00:00, 10.49it/s]
Epoch: 2. Train. Loss: 0.311: 100%|██████████| 375/375 [01:39<00:00, 3.75it/s]
Epoch: 2. Validation. Loss: 0.238: 100%|██████████| 86/86 [00:08<00:00, 10.51it/s]
Epoch: 3. Train. Loss: 0.259: 100%|██████████| 375/375 [01:39<00:00, 3.75it/s]
Epoch: 3. Validation. Loss: 0.206: 100%|██████████| 86/86 [00:08<00:00, 10.54it/s]
Epoch: 4. Train. Loss: 0.244: 100%|██████████| 375/375 [01:39<00:00, 3.75it/s]
Epoch: 4. Validation. Loss: 0.211: 100%|██████████| 86/86 [00:08<00:00, 10.54it/s]
Epoch: 5. Train. Loss: 0.224: 100%|██████████| 375/375 [01:40<00:00, 3.74it/s]
Epoch: 5. Validation. Loss: 0.270: 100%|██████████| 86/86 [00:08<00:00, 10.47it/s]
Epoch: 6. Train. Loss: 0.207: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 6. Validation. Loss: 0.169: 100%|██████████| 86/86 [00:08<00:00, 10.56it/s]
Epoch: 7. Train. Loss: 0.212: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 7. Validation. Loss: 0.169: 100%|██████████| 86/86 [00:08<00:00, 10.56it/s]
Epoch: 8. Train. Loss: 0.189: 100%|██████████| 375/375 [01:40<00:00, 3.75it/s]
Epoch: 8. Validation. Loss: 0.201: 100%|██████████| 86/86 [00:08<00:00, 10.52it/s]
Epoch: 9. Train. Loss: 0.185: 100%|██████████| 375/375 [01:39<00:00, 3.75it/s]
Epoch: 9. Validation. Loss: 0.162: 100%|██████████| 86/86 [00:08<00:00, 10.54it/s]
Epoch: 10. Train. Loss: 0.187: 100%|██████████| 375/375 [01:39<00:00, 3.75it/s]
Epoch: 10. Validation. Loss: 0.159: 100%|██████████| 86/86 [00:08<00:00, 10.49it/s]
All images in the test dataset have a maximum side of 500 pixels. Since PyTorch requires that all images in a batch have the same dimensions, and UNet requires that an image's size be divisible by 16, we will apply A.PadIfNeeded(min_height=512, min_width=512, border_mode=cv2.BORDER_CONSTANT). That augmentation pads image borders with zeros so that the image size becomes 512x512 pixels.
test_transform = A.Compose(
    [
        A.PadIfNeeded(min_height=512, min_width=512, border_mode=cv2.BORDER_CONSTANT),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)
test_dataset = OxfordPetInferenceDataset(test_images_filenames, images_directory, transform=test_transform,)
predictions = predict(model, params, test_dataset, batch_size=16)
Since we received masks for padded images, we need to crop the part with the original image size from each padded mask.
predicted_masks = []
for predicted_padded_mask, original_height, original_width in predictions:
    cropped_mask = F.center_crop(predicted_padded_mask, original_height, original_width)
    predicted_masks.append(cropped_mask)
display_image_grid(test_images_filenames, images_directory, masks_directory, predicted_masks=predicted_masks)
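Cropping the center recovers the original region only if the padding was placed symmetrically around the image. The NumPy sketch below illustrates that round trip on a small array. The helpers pad_center and crop_center are hypothetical, and the sketch assumes that padding is centered with any odd leftover pixel going to the bottom or right; that is an assumption about the padding layout, not a statement about the library's internals.

```python
import numpy as np

def pad_center(array, min_height, min_width):
    """Zero-pad a 2D array to (min_height, min_width), keeping it centered."""
    h, w = array.shape
    top = (min_height - h) // 2
    left = (min_width - w) // 2
    padded = np.zeros((min_height, min_width), dtype=array.dtype)
    padded[top:top + h, left:left + w] = array
    return padded

def crop_center(array, height, width):
    """Cut a (height, width) window out of the center of a 2D array."""
    h, w = array.shape
    top = (h - height) // 2
    left = (w - width) // 2
    return array[top:top + height, left:left + width]

# A made-up 3x5 "mask" padded to 8x8 and then cropped back.
original = np.arange(1, 16, dtype=np.float32).reshape(3, 5)
padded = pad_center(original, 8, 8)
restored = crop_center(padded, 3, 5)

print(padded.shape, restored.shape)
print(np.array_equal(restored, original))
```

Both helpers compute the offset with the same floor division, which is what makes the crop land exactly on the padded content.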
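Approach 3. Use original images.
We could also use original images without resizing or cropping them. However, there is a problem with this dataset. A few images in the dataset are so large that even with batch_size=1 they require more than 11 GB of GPU memory for training. So, as a trade-off, we will first apply the A.LongestMaxSize(512) augmentation, which ensures that an image's longest side is no larger than 512 pixels. That augmentation will affect only 137 of the 7384 dataset images.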
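The size arithmetic behind this step can be sketched as follows. The function fit_longest_side is a hypothetical helper that follows the description above: it shrinks an image whose longest side exceeds the limit and keeps the aspect ratio. It deliberately leaves smaller images untouched; whether A.LongestMaxSize also enlarges smaller images may depend on the library version and is worth checking against its documentation.

```python
def fit_longest_side(height, width, max_size=512):
    """Return the (height, width) after shrinking so that the longest side is <= max_size."""
    longest = max(height, width)
    if longest <= max_size:
        return height, width  # already small enough, nothing to do
    scale = max_size / longest
    return round(height * scale), round(width * scale)

# Made-up image sizes given as (height, width).
print(fit_longest_side(1024, 768))
print(fit_longest_side(375, 500))
print(fit_longest_side(2000, 1000))
```

Scaling both sides by the same factor preserves the aspect ratio, so this step avoids the distortion discussed for approach 1.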
Next, we will use A.PadIfNeeded(min_height=512, min_width=512) to ensure that all images in a batch are 512x512 pixels.
train_transform = A.Compose(
    [
        A.LongestMaxSize(512),
        A.PadIfNeeded(min_height=512, min_width=512),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=15, p=0.5),
        A.RGBShift(r_shift_limit=15, g_shift_limit=15, b_shift_limit=15, p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)
train_dataset = OxfordPetDataset(train_images_filenames, images_directory, masks_directory, transform=train_transform,)
val_transform = A.Compose(
    [
        A.LongestMaxSize(512),
        A.PadIfNeeded(min_height=512, min_width=512, border_mode=cv2.BORDER_CONSTANT),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)
val_dataset = OxfordPetDataset(val_images_filenames, images_directory, masks_directory, transform=val_transform,)
params = {
    "model": "UNet11",
    "device": "cuda",
    "lr": 0.001,
    "batch_size": 8,
    "num_workers": 4,
    "epochs": 10,
}
model = create_model(params)
model = train_and_validate(model, train_dataset, val_dataset, params)
Epoch: 1. Train. Loss: 0.442: 100%|██████████| 750/750 [06:58<00:00, 1.79it/s]
Epoch: 1. Validation. Loss: 0.225: 100%|██████████| 172/172 [00:35<00:00, 4.80it/s]
Epoch: 2. Train. Loss: 0.283: 100%|██████████| 750/750 [06:54<00:00, 1.81it/s]
Epoch: 2. Validation. Loss: 0.188: 100%|██████████| 172/172 [00:34<00:00, 4.99it/s]
Epoch: 3. Train. Loss: 0.234: 100%|██████████| 750/750 [06:53<00:00, 1.81it/s]
Epoch: 3. Validation. Loss: 0.154: 100%|██████████| 172/172 [00:34<00:00, 4.96it/s]
Epoch: 4. Train. Loss: 0.211: 100%|██████████| 750/750 [06:53<00:00, 1.81it/s]
Epoch: 4. Validation. Loss: 0.136: 100%|██████████| 172/172 [00:34<00:00, 4.99it/s]
Epoch: 5. Train. Loss: 0.196: 100%|██████████| 750/750 [06:53<00:00, 1.81it/s]
Epoch: 5. Validation. Loss: 0.131: 100%|██████████| 172/172 [00:34<00:00, 4.96it/s]
Epoch: 6. Train. Loss: 0.187: 100%|██████████| 750/750 [06:53<00:00, 1.81it/s]
Epoch: 6. Validation. Loss: 0.151: 100%|██████████| 172/172 [00:34<00:00, 4.98it/s]
Epoch: 7. Train. Loss: 0.177: 100%|██████████| 750/750 [06:53<00:00, 1.81it/s]
Epoch: 7. Validation. Loss: 0.127: 100%|██████████| 172/172 [00:34<00:00, 4.98it/s]
Epoch: 8. Train. Loss: 0.171: 100%|██████████| 750/750 [06:53<00:00, 1.81it/s]
Epoch: 8. Validation. Loss: 0.113: 100%|██████████| 172/172 [00:34<00:00, 4.99it/s]
Epoch: 9. Train. Loss: 0.162: 100%|██████████| 750/750 [06:54<00:00, 1.81it/s]
Epoch: 9. Validation. Loss: 0.143: 100%|██████████| 172/172 [00:34<00:00, 4.94it/s]
Epoch: 10. Train. Loss: 0.157: 100%|██████████| 750/750 [06:53<00:00, 1.81it/s]
Epoch: 10. Validation. Loss: 0.115: 100%|██████████| 172/172 [00:34<00:00, 4.97it/s]
Next, we will use the same code as in approach 2 to make predictions.
test_transform = A.Compose(
    [
        A.PadIfNeeded(min_height=512, min_width=512, border_mode=cv2.BORDER_CONSTANT),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)
test_dataset = OxfordPetInferenceDataset(test_images_filenames, images_directory, transform=test_transform,)
predictions = predict(model, params, test_dataset, batch_size=16)
predicted_masks = []
for predicted_padded_mask, original_height, original_width in predictions:
    cropped_mask = F.center_crop(predicted_padded_mask, original_height, original_width)
    predicted_masks.append(cropped_mask)
display_image_grid(test_images_filenames, images_directory, masks_directory, predicted_masks=predicted_masks)