一、数据集介绍
数据集下载地址:https://www.kaggle.com/datasets/carlosmiao/dogbreedidentification
狗的品种共120种10222张训练图片
train和test文件夹里面是图片
labels.csv种记录着图片对应的标签值。
关于mobileNet模型,这里不做过多介绍,只是迁移学习用到了,当然,你完全可以自定义网络
二、实战
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import datetime
import glob
import wget
import PIL
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.model_selection import train_test_split
import tensorflow as tf
import tensorflow_hub as hub
# from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import load_model
from plot_model import plot_model
import warnings
warnings.filterwarnings('ignore')
2.1 数据预处理
# 数据集所在路径
dataset_dir = "dog-breed-identification/"
2.1.1 第1步:对labels标签的预处理
# 读取labels.csv文件
# 说明:
# labels.csv文件中的每一行数据 与 文件夹train中的图片一一对应
labels = pd.read_csv(os.path.join(dataset_dir, 'labels.csv'))
labels.shape
# 默认显示前5行
labels.head()
# 说明:
# id : 每张图片的文件名;
# bread : 每张图片所对应狗的品种;
# 数据表描述
labels.describe()
# 解释:
# ① 一共有10222张训练图片;
# ② 不同的breed(品种)共有120种,即狗的不同类别一共120种;
# 狗的不同品种名称(标签类别)
classes_names = np.unique(labels['breed'])
print("一共有{}种类型的狗".format(len(classes_names)))
# 狗的类别名称
classes_names
# 一共有10222张狗的图片,一共120个品种,统计每个品种的狗对应的图片数量
plt.figure(figsize=(16, 8))
sns.countplot(labels['breed'])
plt.xticks([])
plt.show()
# 对labels进行labelencoding和onehotencoding
lb_encoder = LabelEncoder()
new_labels = lb_encoder.fit_transform(labels['breed']).reshape(-1, 1) # 转换后的结果变换为一列
new_labels.shape
# 显示前10个标签值
new_labels[:10]
ot_encoder = OneHotEncoder()
# 注意:这里需要转换成array类型,否则是matrix
new_labels = ot_encoder.fit_transform(new_labels).toarray()
new_labels.shape
type(new_labels)
new_labels[0] # 显示第一个标签对应的onehot向量
2.1.2 第2步:对图片数据的预处理
# 图片数据集的路径
train_path = os.path.join(dataset_dir, 'train')
train_path
# 统计train文件夹下一共有多少图片
images_paths = glob.glob(train_path+'/*.jpg')
len(images_paths)
# 显示前5张图片的路径
images_paths[:5]
# 加载每一张图片,转换为tensor类型,并进行归一化处理
IMG_SIZE = 224 # 由于MobileNet v2的输入shape=(224,224,3)
def process_image(img_path):
# 加载图片
img = tf.io.read_file(img_path)
# 将jpg格式转换为tensor
img = tf.image.decode_jpeg(img, channels=3)
# 数据归一化,(0,255) --> [0,1)
img = tf.image.convert_image_dtype(img, dtype=tf.float32)
# resize 调整图像大小
img = tf.image.resize(img, size=[IMG_SIZE, IMG_SIZE])
# 返回
return img
# 将每一张图片与labels中的标签一一关联
def get_img_label(img_path, label):
image = process_image(img_path) # 调用函数 process_image
return image, label # 按元组形式返回
# 将图片数据与标签数据进行切分,分离成:train 和 val 数据集
def train_data_split(images, labels, ratio=0.2):
'''
images : 图片数据路径
labels : 标签数据
ratio : 验证数据所占比例
random_state:随机数种子
'''
x_train, x_val, y_train, y_val = train_test_split(images, labels, test_size=ratio, random_state=666)
return x_train, x_val, y_train, y_val
x_train, x_val, y_train, y_val = train_data_split(images_paths, new_labels, ratio=0.2)
len(x_train) # 训练集图片路径数量
x_train[:5] # 注意:目前训练集数据仍然是图片的路径
验证机图片数量:
len(x_val) # 验证集图片路径数量
# 创建批数据,一批一批读取数据
BATCH_SIZE = 32 # 每批32张图片
def create_batch_data(X, y=None, batch_size = BATCH_SIZE, data_type=3):
'''参数说明:
① X : 图片数据集(路径)
② y :标签数据集(OneHot向量)
③ batch_size :一批数据的大小
④ data_type :数据集类型,1 :表示测试集,2 :表示验证数据,3 :表示训练数据
进一步说明:
1. 如果是测试集,图片格式转换为tensor类型;
2. 如果是验证集,图片格式转换为tensor类型,同时,images与labels同时返回;
3. 如果是训练集,图片格式转换为tensor类型,打乱数据集,images与labels同时返回;
'''
if data_type == 1: # 测试集
# 创建一个数据集,其元素是给定张量的切片。
print("**** 创建测试集的batches ****")
dataset = tf.data.Dataset.from_tensor_slices((tf.constant(X)))
data_batch = dataset.map(process_image).batch(batch_size)
return data_batch
elif data_type == 2: # 验证集
print("**** 创建验证集的batches ****")
dataset = tf.data.Dataset.from_tensor_slices((tf.constant(X), tf.constant(y)))
data_batch = dataset.map(get_img_label).batch(batch_size)
return data_batch
else: # 训练集
print("**** 创建训练集的batches ****")
dataset = tf.data.Dataset.from_tensor_slices((tf.constant(X), tf.constant(y)))
dataset = dataset.shuffle(buffer_size=len(X)) # 打乱训练集顺序
data_batch = dataset.map(get_img_label).batch(batch_size)
return data_batch
# 示例:演示from_tensor_slices()的效果
t = tf.data.Dataset.from_tensor_slices((tf.constant(x_train)))
it=iter(t)
print(next(it))
# 训练集:创建batches
train_data = create_batch_data(x_train, y_train, data_type=3)
# 验证集:创建batches
val_data = create_batch_data(x_val, y_val, data_type=2)
# 显示一批数据集
def show_images(images, labels):
batches = len(images) # 一个batch包含的图片数量
plt.figure(figsize=(16,8))
for i in range(batches):
plt.subplot(6, 6, i+1) # 指定图片显示的位置
plt.imshow(images[i])
plt.axis('off')
# 获取每个label向量中最大值的索引,找到对应的标签名
plt.title(classes_names[np.argmax(labels[i])])
# 从train中获取一个batch_size的样本数据
# samples_data, samples_label = next(train_data.as_numpy_iterator())
it=iter(train_data)
samples_data, samples_label=next(it)
samples_data, samples_label
samples_data.shape
samples_label.shape
# 显示一批数据集
show_images(samples_data, samples_label)
2.2 迁移学习、模型微调
# 超参数设置
INPUT_SHAPE = (None, IMG_SIZE, IMG_SIZE, 3) # batch_size, height, width, channel
OUTPUT_SHAPE = len(classes_names) # 即120个类别
MODEL_URL = "https://tfhub.dev/google/imagenet/mobilenet_v3_large_100_224/classification/5" # 模型URL
# 定义创建模型对象的函数
def create_model(input_shape=INPUT_SHAPE, output_shape=OUTPUT_SHAPE, model_url=MODEL_URL):
model = tf.keras.Sequential([
hub.KerasLayer(model_url), # 下载预训练模型
tf.keras.layers.Dense(units=output_shape, activation='softmax') # 全连接层,多分类,计算每个类别的概率值
])
# 模型编译
model.compile(loss=tf.keras.losses.CategoricalCrossentropy(), # 多分类损失函数
optimizer=tf.keras.optimizers.Adam(), # 优化器
metrics=['accuracy']) # 评估指标
# 模型构建
model.build(input_shape)
return model
# 模型编译
model = create_model()
# 模型结构
model.summary()
%load_ext tensorboard
!rm -rf ./logdir/ # 清楚之前的日志记录
# 定义TensorBoard
logdir = os.path.join('logdir', datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) # 日志保存文件夹的格式
tensorboar_callback = tf.keras.callbacks.TensorBoard(logdir) # 定义callback
# 定义earlystop callback
# 如果在验证集上,在3轮后,accuracy没有大幅提升,则中止训练
early_stopping_callback = tf.keras.callbacks.EarlyStopping(monitor='accuracy', patience=3)
2.3 模型训练、验证、测试
# 模型训练、验证
# EPOCHS = 30 # 轮数
EPOCHS=10
model.fit(x=train_data, # 训练集
epochs=EPOCHS, # 循环次数
validation_data=val_data, # 验证集
callbacks=[tensorboar_callback, early_stopping_callback] # callbacks
)
这里我电脑显卡太垃圾了,只设置epochs=10,你可以根据自己硬件适当调整
# 显示 训练结果
%tensorboard --logdir logdir
# 模型测试
test_path = os.path.join(dataset_dir, 'test')
test_images_paths = glob.glob(test_path+'/*.jpg')
test_images_paths[:5]
test_data = create_batch_data(test_images_paths, data_type=1) # 测试集类型,生成一批一批的数据
test_preditions = model.predict(test_data, verbose=1)
test_preditions.shape # 预测了10357张图片,每个标签对应一个120维的向量
# 获取第一张test图片,其120个类别的预测概率值如下:
test_preditions[0] # 第一张图片的预测类别概率值
# 第一张test图片的最大概率值
max_test_01_prob = np.max(test_preditions[0])
print(f'最大概率值:{max_test_01_prob}')
# 第一张test图片的最大概率值的索引
max_test_01_index = np.argmax(test_preditions[0])
print(f'最大概率值对应的索引:{max_test_01_index}')
# 第一张test图片的最大概率值对应的标签
max_test_01_label = classes_names[max_test_01_index]
print(f'最大概率值对应的label:{max_test_01_label}')
# 函数:显示测试集图片,预测概率,预测标签名称
def show_test_imgs(img_path, prob):
test_image = process_image(img_path) # 图片预处理
pred_label = classes_names[np.argmax(prob)] # 模型预测结果
max_prob = round(np.max(prob),2) # 最大概率值
title_name = f'prob:{str(max_prob)},label:{pred_label}' # title : 概率,标签名
plt.imshow(test_image)
plt.axis('off')
plt.title(title_name)
# 函数:显示测试图片排名前10的概率值和标签名
def show_top10_prob_label(pred_probs):
top_10_probs_indices = pred_probs.argsort()[-10:][::-1] # 先升序排序,获取最后10个概率值的索引,再降序(由高到低)
top_10_probs_values = pred_probs[top_10_probs_indices] # 获取10个最大的概率值(由高到低)
top_10_probs_labels = classes_names[top_10_probs_indices] # 获取10个概率值对应的标签
# 显示
plt.bar(np.arange(len(top_10_probs_indices)), # 范围
top_10_probs_values, # 显示的概率值
color='blue')
# 显示横轴信息
plt.xticks(np.arange(len(top_10_probs_indices)), # 范围
labels=top_10_probs_labels, # 标签
rotation='vertical') # 垂直显示
# 显示3张图片以及对应的预测概率值(排名前10)
def show_test_prediction(numbers = 3):
plt.figure(figsize=(16,8))
for i in range(numbers):
plt.subplot(numbers, 2, 2*i+1) # 指定位置(第一列)
show_test_imgs(test_images_paths[i], # 指定test图片
test_preditions[i]) # 指定test概率值(120个类别)
plt.subplot(numbers, 2, 2*i+2) # 指定位置(第二列)
show_top10_prob_label(test_preditions[i]) # 显示排名前10个预测概率值
plt.show()
show_test_prediction(numbers=3)
2.4 模型预测
这里从网上下载一张柯基的图片来测试
from PIL import Image
try:
img=Image.open('Corgi.jpg')
except:
print('error')
# 数据类型转换
test_img_01_data = create_batch_data(X=['Corgi.jpg'], y=None, batch_size=1, data_type=1)
# 模型预测
test_result = model.predict(test_img_01_data)
# 预测结果维度
test_result.shape
# 输出结果
best_prob = np.max(test_result)
prob_label = classes_names[np.argmax(test_result)]
print(f'预测的最大概率:{round(best_prob*100, 2)}%,对应的标签名:{prob_label}')
# 另一种直接的方式:model.predict_proba(test_img_01_data)
可以看到,预测结果确实是柯基,证明我们的模型还是挺不错的。但是硬件太拉垮了,==略微出手已是显卡极限==。
2.5 模型保存
# 模型保存
model.save("model.h5")
# 加载模型
model = load_model('model.h5', custom_objects={"KerasLayer" : hub.KerasLayer})
model.summary()
保存预加载都没什么问题,到此,迁移学习就搞完了。