Convolutional Layers: Forward and Backward Propagation
Conv2d Forward and Backward
class Conv2d():
    def __init__(self, in_channels, n_filter, filter_size, padding, stride):
        """
        parameters:
            in_channels: number of channels of the input feature map
            n_filter: number of convolution kernels
            filter_size: kernel size, (h_filter, w_filter)
            padding: amount of zero padding
            stride: stride of the sliding kernel
        """
        self.in_channels = in_channels
        self.n_filter = n_filter
        self.h_filter, self.w_filter = filter_size
        self.padding = padding
        self.stride = stride

        # Initialize the parameters; their sizes are independent of the input size
        self.W = np.random.randn(n_filter, self.in_channels, self.h_filter, self.w_filter) / np.sqrt(n_filter / 2.)
        self.b = np.zeros((n_filter, 1))
        self.params = [self.W, self.b]

    def __call__(self, X):
        # Compute the size of the output feature map
        self.n_x, _, self.h_x, self.w_x = X.shape
        self.h_out = (self.h_x + 2 * self.padding - self.h_filter) / self.stride + 1
        self.w_out = (self.w_x + 2 * self.padding - self.w_filter) / self.stride + 1
        if not self.h_out.is_integer() or not self.w_out.is_integer():
            raise Exception("Invalid dimensions!")
        self.h_out, self.w_out = int(self.h_out), int(self.w_out)

        # Create an Img2colIndices instance
        self.img2col_indices = Img2colIndices((self.h_filter, self.w_filter), self.padding, self.stride)

        return self.forward(X)

    def forward(self, X):
        # Convert X into column form
        self.x_col = self.img2col_indices.img2col(X)

        # Reshape W so it can be multiplied with the column form of X
        self.w_row = self.W.reshape(self.n_filter, -1)

        # Forward pass; @ is matrix multiplication in numpy, equivalent to numpy.matmul()
        out = self.w_row @ self.x_col + self.b
        out = out.reshape(self.n_filter, self.h_out, self.w_out, self.n_x)
        out = out.transpose(3, 0, 1, 2)
        return out

    def backward(self, d_out):
        """
        parameters:
            d_out: gradient of the loss with respect to the convolution output
        """
        # Reshape d_out into column form
        d_out_col = d_out.transpose(1, 2, 3, 0)
        d_out_col = d_out_col.reshape(self.n_filter, -1)

        d_w = d_out_col @ self.x_col.T
        d_w = d_w.reshape(self.W.shape)  # shape=(n_filter, in_channels, h_filter, w_filter)
        d_b = d_out_col.sum(axis=1).reshape(self.n_filter, 1)

        d_x = self.w_row.T @ d_out_col
        # Convert d_x from column form back to image form
        d_x = self.img2col_indices.col2img(d_x)

        return d_x, [d_w, d_b]
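A quick way to sanity-check the layer is to push a small random batch through it and inspect the shapes. This is only a sketch with made-up sizes; it assumes numpy has been imported as np and that the Img2colIndices helper used by Conv2d is already defined.

# Shape check for Conv2d forward and backward (illustrative values only).
X = np.random.randn(2, 3, 8, 8)   # a batch of 2 three-channel 8x8 feature maps
conv = Conv2d(in_channels=3, n_filter=4, filter_size=(3, 3), padding=1, stride=1)
out = conv(X)                     # (8 + 2*1 - 3) / 1 + 1 = 8, so out.shape == (2, 4, 8, 8)
d_out = np.ones_like(out)         # pretend upstream gradient
d_x, (d_w, d_b) = conv.backward(d_out)
print(out.shape, d_x.shape, d_w.shape, d_b.shape)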
MaxPool2d
class Maxpool():
    def __init__(self, size, stride):
        """
        parameters:
            size: size of the pooling window, int
            stride: stride of the pooling window; usually chosen equal to size
        """
        self.size = size      # pooling window size
        self.stride = stride

    def __call__(self, X):
        """
        parameters:
            X: input feature map, shape=(batch_size, channels, height, width)
        """
        self.n_x, self.c_x, self.h_x, self.w_x = X.shape

        # Compute the output size of the max pooling
        self.h_out = (self.h_x - self.size) / self.stride + 1
        self.w_out = (self.w_x - self.size) / self.stride + 1
        if not self.h_out.is_integer() or not self.w_out.is_integer():
            raise Exception("Invalid dimensions!")
        self.h_out, self.w_out = int(self.h_out), int(self.w_out)

        # Create an Img2colIndices instance; max pooling needs no padding
        self.img2col_indices = Img2colIndices((self.size, self.size), padding=0, stride=self.stride)

        return self.forward(X)

    def forward(self, X):
        """
        parameters:
            X: input feature map, shape=(batch_size, channels, height, width)
        """
        x_reshaped = X.reshape(self.n_x * self.c_x, 1, self.h_x, self.w_x)
        self.x_col = self.img2col_indices.img2col(x_reshaped)

        self.max_indices = np.argmax(self.x_col, axis=0)
        out = self.x_col[self.max_indices, range(self.max_indices.size)]

        out = out.reshape(self.h_out, self.w_out, self.n_x, self.c_x).transpose(2, 3, 0, 1)
        return out

    def backward(self, d_out):
        """
        parameters:
            d_out: gradient of the loss with respect to the maxpool output, shape=(batch_size, channels, h_out, w_out)
        """
        d_x_col = np.zeros_like(self.x_col)  # shape=(size*size, h_out*w_out*batch_size*channels)
        d_out_flat = d_out.transpose(2, 3, 0, 1).ravel()

        # Route the gradient only to the positions that produced the maxima
        d_x_col[self.max_indices, range(self.max_indices.size)] = d_out_flat
        # Convert d_x from column form back to image form
        d_x = self.img2col_indices.col2img(d_x_col)
        d_x = d_x.reshape(self.n_x, self.c_x, self.h_x, self.w_x)

        return d_x
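As with the convolution layer, a small shape check is a useful smoke test. This sketch uses made-up sizes and assumes the Maxpool and Img2colIndices definitions above are in scope.

# Shape check for Maxpool forward and backward (illustrative values only).
X = np.random.randn(2, 3, 8, 8)
pool = Maxpool(size=2, stride=2)
out = pool(X)                              # (8 - 2) / 2 + 1 = 4, so out.shape == (2, 3, 4, 4)
d_x = pool.backward(np.ones_like(out))     # gradient is routed back to the argmax positions
print(out.shape, d_x.shape)                # (2, 3, 4, 4) (2, 3, 8, 8)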
BatchNorm2d
class BatchNorm2d():
    """
    For a convolutional layer, batch normalization is applied after the convolution and before the activation function.
    If the convolution outputs multiple channels, each channel is normalized separately, and each channel has its own
    scale and shift parameters, both scalars.
    Let the mini-batch contain m samples. On a single channel whose convolution output has height p and width q,
    batch normalization is applied jointly to the m*p*q elements of that channel: they are standardized with the same
    mean and variance, namely the mean and variance of those m*p*q elements.
    When the trained model is used for prediction, we want a deterministic output for any input, so the output of a
    single sample should not depend on the mean and variance of a random mini-batch. A common approach is to estimate
    the mean and variance of the whole training set with moving averages and use them at prediction time.
    """
    def __init__(self, n_channel, momentum):
        """
        parameters:
            n_channel: number of channels of the input feature map
            momentum: coefficient for updating moving_mean / moving_var
        """
        self.n_channel = n_channel
        self.momentum = momentum

        # Scale and shift parameters that are trained by gradient descent, initialized to 1 and 0
        self.gamma = np.ones((1, n_channel, 1, 1))
        self.beta = np.zeros((1, n_channel, 1, 1))

        # Statistics used at test time, initialized to 0 and updated dynamically during training
        self.moving_mean = np.zeros((1, n_channel, 1, 1))
        self.moving_var = np.zeros((1, n_channel, 1, 1))

        self.params = [self.gamma, self.beta]

    def __call__(self, X, mode):
        """
        X: shape = (N, C, H, W)
        mode: 'train' or 'test', must be passed in when calling
        """
        self.X = X  # kept for computing the gradient of gamma
        return self.forward(X, mode)

    def forward(self, X, mode):
        """
        X: shape = (N, C, H, W)
        mode: 'train' or 'test', must be passed in when calling
        """
        if mode != 'train':
            # In prediction mode, use the moving-average mean and variance directly
            self.x_norm = (X - self.moving_mean) / np.sqrt(self.moving_var + 1e-5)
        else:
            # For a 2D convolutional layer, compute the mean and variance over the batch, height and width
            # dimensions, keeping the channel dimension so the result broadcasts against X later
            mean = X.mean(axis=(0, 2, 3), keepdims=True)
            self.var = X.var(axis=(0, 2, 3), keepdims=True)  # stored on self because backward needs it

            # In training mode, standardize with the current mean and variance; store x_norm for backward
            self.x_norm = (X - mean) / (np.sqrt(self.var + 1e-5))

            # Update the moving averages of the mean and variance
            self.moving_mean = self.momentum * self.moving_mean + (1 - self.momentum) * mean
            self.moving_var = self.momentum * self.moving_var + (1 - self.momentum) * self.var

        # Scale and shift
        out = self.x_norm * self.gamma + self.beta
        return out

    def backward(self, d_out):
        """
        d_out has the same shape as the input
        """
        d_gamma = (d_out * self.x_norm).sum(axis=(0, 2, 3), keepdims=True)
        d_beta = d_out.sum(axis=(0, 2, 3), keepdims=True)
        # Simplified input gradient: the mini-batch mean and variance are treated as constants,
        # i.e. their own dependence on X is ignored
        d_x = (d_out * self.gamma) / np.sqrt(self.var + 1e-5)

        return d_x, [d_gamma, d_beta]
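The claim that each channel is normalized over its m*p*q elements can be checked directly in NumPy. The short sketch below (made-up sizes, numpy imported as np) computes the per-channel statistics exactly the way the training branch of forward does.

# Per-channel normalization over the batch, height and width dimensions.
X = np.random.randn(4, 3, 5, 5)                # m=4 samples, 3 channels, p=q=5
mean = X.mean(axis=(0, 2, 3), keepdims=True)   # shape (1, 3, 1, 1): one mean per channel
var = X.var(axis=(0, 2, 3), keepdims=True)     # each statistic is computed over 4*5*5 = 100 elements
x_norm = (X - mean) / np.sqrt(var + 1e-5)
# After normalization every channel has (approximately) zero mean and unit variance.
print(x_norm.mean(axis=(0, 2, 3)), x_norm.var(axis=(0, 2, 3)))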
Flatten Layer
class Flatten():
    """
    The feature map produced by the last convolutional layer must be flattened before it can be fed into a
    fully connected layer. Making this a separate module keeps the backward pass simple.
    """
    def __init__(self):
        pass

    def __call__(self, X):
        self.x_shape = X.shape  # (batch_size, channels, height, width)
        return self.forward(X)

    def forward(self, X):
        out = X.ravel().reshape(self.x_shape[0], -1)
        return out

    def backward(self, d_out):
        d_x = d_out.reshape(self.x_shape)
        return d_x
Fully Connected Layer: Forward and Backward
import numpy as np

# Linear (fully connected) layer
class Linear():
    """
    Linear fully connected layer
    """
    def __init__(self, dim_in, dim_out):
        """
        parameters:
            dim_in: input dimension
            dim_out: output dimension
        """
        # Initialize the parameters
        scale = np.sqrt(dim_in / 2)
        self.weight = np.random.standard_normal((dim_in, dim_out)) / scale
        self.bias = np.random.standard_normal(dim_out) / scale
        # self.weight = np.random.randn(dim_in, dim_out)
        # self.bias = np.zeros(dim_out)

        self.params = [self.weight, self.bias]

    def __call__(self, X):
        """
        parameters:
            X: input of this layer, shape=(batch_size, dim_in)
        return: X W + b
        """
        self.X = X
        return self.forward()

    def forward(self):
        return np.dot(self.X, self.weight) + self.bias

    def backward(self, d_out):
        """
        parameters:
            d_out: gradient of the loss with respect to this layer's output, shape=(batch_size, dim_out)
        return: gradient of the loss with respect to the input X (the activations of layer l-1)
        """
        # The gradient w.r.t. the input keeps its batch dimension; the parameter gradients are summed over
        # the batch (the loss gradient coming from CrossEntropyLoss is already averaged over the batch)
        d_x = np.dot(d_out, self.weight.T)   # gradient w.r.t. the input, i.e. the previous layer's activations
        d_w = np.dot(self.X.T, d_out)        # gradient of the weights
        d_b = d_out.sum(axis=0)              # gradient of the bias, summed over the batch to match d_w

        return d_x, [d_w, d_b]
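A finite-difference check is a convenient way to validate the backward pass. The sketch below uses made-up sizes and the sum of the outputs as a stand-in loss, so the upstream gradient is simply a matrix of ones.

# Finite-difference check of the Linear backward pass (illustrative only).
np.random.seed(0)
fc = Linear(dim_in=4, dim_out=3)
X = np.random.randn(5, 4)

out = fc(X)
d_out = np.ones_like(out)            # gradient of loss = out.sum()
_, (d_w, d_b) = fc.backward(d_out)

eps = 1e-6
i, j = 1, 2                          # check a single weight entry
fc.weight[i, j] += eps
loss_plus = fc(X).sum()
fc.weight[i, j] -= 2 * eps
loss_minus = fc(X).sum()
fc.weight[i, j] += eps               # restore the original weight
numeric = (loss_plus - loss_minus) / (2 * eps)
print(d_w[i, j], numeric)            # the two values should be very close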
Dropout Forward and Backward
class Dropout():
    """
    Randomly sets part of the features to 0 during training
    """
    def __init__(self, p):
        """
        parameters:
            p: keep probability (fraction of activations that are retained)
        """
        self.p = p

    def __call__(self, X, mode):
        """
        mode: 'train' or 'test'
        """
        return self.forward(X, mode)

    def forward(self, X, mode):
        if mode == 'train':
            # Inverted dropout: scale the kept activations by 1/p so no rescaling is needed at test time
            self.mask = np.random.binomial(1, self.p, X.shape) / self.p
            out = self.mask * X
        else:
            out = X
        return out

    def backward(self, d_out):
        """
        d_out: gradient of the loss with respect to the dropout output
        """
        return d_out * self.mask
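Because the kept activations are scaled by 1/p (inverted dropout), the expected activation stays the same in training and test mode. A small illustration with a made-up input:

# Inverted dropout keeps the expected activation unchanged (illustrative only).
np.random.seed(0)
drop = Dropout(p=0.8)                      # keep 80% of the activations
X = np.ones((2, 10))
train_out = drop(X, mode='train')          # zeros appear, survivors are scaled to 1/0.8 = 1.25
test_out = drop(X, mode='test')            # identity at test time
print(train_out.mean(), test_out.mean())   # both are close to 1.0 on average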
Activation Functions
ReLU
import numpy as np

# ReLU layer
class Relu(object):
    def __init__(self):
        self.X = None

    def __call__(self, X):
        self.X = X
        return self.forward(self.X)

    def forward(self, X):
        return np.maximum(0, X)

    def backward(self, grad_output):
        """
        grad_output: gradient of the loss with respect to the ReLU output
        return: gradient of the loss with respect to the ReLU input
        """
        grad_relu = self.X > 0            # the gradient is 1 where the input is > 0 and 0 elsewhere
        return grad_relu * grad_output    # * is element-wise multiplication in numpy
Tanh
class Tanh():
    def __init__(self):
        self.X = None

    def __call__(self, X):
        self.X = X
        return self.forward(self.X)

    def forward(self, X):
        return np.tanh(X)

    def backward(self, grad_output):
        grad_tanh = 1 - (np.tanh(self.X)) ** 2
        return grad_output * grad_tanh
Sigmoid
class Sigmoid():
    def __init__(self):
        self.X = None

    def __call__(self, X):
        self.X = X
        return self.forward(self.X)

    def forward(self, X):
        return self._sigmoid(X)

    def backward(self, grad_output):
        sigmoid_grad = self._sigmoid(self.X) * (1 - self._sigmoid(self.X))
        return grad_output * sigmoid_grad

    def _sigmoid(self, X):
        return 1.0 / (1 + np.exp(-X))
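All three activations follow the same backward pattern: multiply the upstream gradient by the local derivative computed from the input saved in the forward pass. A quick finite-difference check of the Sigmoid derivative (Relu and Tanh can be checked the same way; the input values are made up):

# Numerical check of the Sigmoid derivative sigma(x) * (1 - sigma(x)).
act = Sigmoid()
x = np.array([[0.5, -1.2, 2.0]])
out = act(x)
analytic = act.backward(np.ones_like(x))   # local derivative, since the upstream gradient is all ones
eps = 1e-6
numeric = (act._sigmoid(x + eps) - act._sigmoid(x - eps)) / (2 * eps)
print(analytic, numeric)                   # the two rows should match closely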
Loss Function
Here we take the cross-entropy loss as an example:
import numpy as np

# Cross-entropy loss
class CrossEntropyLoss():
    """
    Computes the cross-entropy loss on the outputs of the last layer
    """
    def __init__(self):
        self.X = None
        self.labels = None

    def __call__(self, X, labels):
        """
        parameters:
            X: output of the model's last fc layer
            labels: one-hot labels, shape=(batch_size, num_class)
        """
        self.X = X
        self.labels = labels
        return self.forward(self.X)

    def forward(self, X):
        """
        Compute the cross-entropy loss
        parameters:
            X: output of the last layer, shape=(batch_size, C)
            self.labels: one-hot labels, shape=(batch_size, C)
        return: cross-entropy loss
        """
        self.softmax_x = self.softmax(X)
        log_softmax = self.log_softmax(self.softmax_x)
        cross_entropy_loss = np.sum(-(self.labels * log_softmax), axis=1).mean()
        return cross_entropy_loss

    def backward(self):
        grad_x = (self.softmax_x - self.labels)  # the returned gradient is divided by batch_size
        return grad_x / self.X.shape[0]

    def log_softmax(self, softmax_x):
        """
        parameters:
            softmax_x: X after the softmax has been applied
        return: log_softmax result, shape=(m, C)
        """
        return np.log(softmax_x + 1e-5)

    def softmax(self, X):
        """
        Compute the softmax of the input.
        Uses the property softmax(x) = softmax(x + c) for numerical stability.
        """
        batch_size = X.shape[0]
        # axis=1 takes the maximum along each row of the 2D array
        max_value = X.max(axis=1)
        # Subtract each row's maximum to avoid inf after exponentiation; softmax(x) = softmax(x + c).
        # A new variable must be used instead of -=, otherwise the input X would be modified;
        # the loss computation calls softmax more than once, so the input must stay intact.
        tmp = X - max_value.reshape(batch_size, 1)
        exp_input = np.exp(tmp)                           # shape=(m, C)
        exp_sum = exp_input.sum(axis=1, keepdims=True)    # shape=(m, 1)
        return exp_input / exp_sum
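The analytic gradient (softmax_x - labels) / batch_size can be compared against a finite difference of the loss itself. The sketch below uses made-up logits and labels; the 1e-5 smoothing inside log_softmax causes only a tiny discrepancy between the two values.

# Finite-difference check of the cross-entropy gradient (illustrative only).
np.random.seed(0)
criterion = CrossEntropyLoss()
X = np.random.randn(4, 3)                       # 4 samples, 3 classes
labels = np.eye(3)[[0, 2, 1, 0]]                # one-hot targets

loss = criterion(X, labels)
grad = criterion.backward()                     # shape (4, 3)

eps = 1e-6
i, j = 2, 1
X_pert = X.copy()
X_pert[i, j] += eps
loss_plus = CrossEntropyLoss()(X_pert, labels)
X_pert[i, j] -= 2 * eps
loss_minus = CrossEntropyLoss()(X_pert, labels)
numeric = (loss_plus - loss_minus) / (2 * eps)
print(grad[i, j], numeric)                      # should agree to several decimal places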
Optimizer
SGD
class SGD():
    """
    Stochastic gradient descent
    parameters:
        parameters: the model parameters to train
        lr: float, learning rate
        momentum: float, momentum factor; defaults to None (no momentum)
    """
    def __init__(self, parameters, lr, momentum=None):
        self.parameters = parameters
        self.lr = lr
        self.momentum = momentum
        if momentum is not None:
            self.velocity = self.velocity_initial()

    def update_parameters(self, grads):
        """
        grads: the gradients returned by the network's backward method
        """
        if self.momentum is None:
            for param, grad in zip(self.parameters, grads):
                param -= self.lr * grad
        else:
            for i in range(len(self.parameters)):
                self.velocity[i] = self.momentum * self.velocity[i] - self.lr * grads[i]
                self.parameters[i] += self.velocity[i]

    def velocity_initial(self):
        """
        Initialize the velocities to 0, in the same order as the parameters
        """
        velocity = []
        for param in self.parameters:
            velocity.append(np.zeros_like(param))
        return velocity
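How these modules are wired together depends on a network wrapper that is not part of this section. The sketch below shows one possible single training step on random data, assuming the Linear, Relu, CrossEntropyLoss and SGD classes above are in scope; the layer sizes, hyperparameters and data are made up.

# One hypothetical training step wiring the layers above together.
np.random.seed(0)
fc1, act, fc2 = Linear(10, 8), Relu(), Linear(8, 3)
criterion = CrossEntropyLoss()
optimizer = SGD(parameters=fc1.params + fc2.params, lr=0.1, momentum=0.9)

X = np.random.randn(16, 10)                     # a made-up mini-batch
y = np.eye(3)[np.random.randint(0, 3, 16)]      # random one-hot labels

# forward
out = fc2(act(fc1(X)))
loss = criterion(out, y)

# backward: propagate the gradient layer by layer and collect the parameter gradients
d_out = criterion.backward()
d_out, grads_fc2 = fc2.backward(d_out)
d_out = act.backward(d_out)
d_out, grads_fc1 = fc1.backward(d_out)

# the gradient order must match the parameter order passed to SGD
optimizer.update_parameters(grads_fc1 + grads_fc2)
print(loss)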