前言
由于原作业需要实现的函数过多,本文先不打算从算法原理讲解,而是直接贴上结果代码,提供大家参考,该实验按照此文章的标准来构建:
提供成品代码文件
文件获取:
链接:https://pan.baidu.com/s/1Fw_7thL5PxR79zI6XbpnYQ
提取码:txqe
文件结构:
| - hw2 | - code | - Beras | - 8个.py文件用于实现实验要求函数 | - assignment.py | - preprocess.py | - visualize.py | - data | - mnist | - 四个数据集文件 | - Iris (可以忽略,不在本实验中使用)
1. 预处理的数据
该文件为实验自带,主要实现功能为:从../data/mnist/中的4个.gz文件中读取MNIST数据集,分别得到训练集(Train)和测试集(Test)的图像与标签(2*2=四个文件)。
preprocess.py
import gzip
import pickle
from unicodedata import numeric

import numpy as np

# TODO: Same as HW1. Feel free to copy and paste your old implementation here.
# It's a good time to vectorize it, while you're at it!
# No need to include CIFAR-specific methods.


def get_data_MNIST(subset, data_path="../data", is_reshape=True):
    """
    Load one MNIST split from the gzip-compressed IDX files.

    :param subset: string indicating whether we want the training or testing data
        (only accepted values are 'train' and 'test'; case and surrounding
        whitespace are ignored)
    :param data_path: directory containing the ``mnist`` folder with the
        training and testing inputs and labels
    :param is_reshape: if True the images are flattened to (N, 784),
        otherwise they are returned as (N, 28, 28, 1)
    :return: NumPy array of inputs (float32, scaled to [0, 1]) and labels (uint8)
    :raises ValueError: if a file holds fewer bytes than the split requires
    """
    ## http://yann.lecun.com/exdb/mnist/
    subset = subset.lower().strip()
    assert subset in ("test", "train"), f"unknown data subset {subset} requested"

    inputs_file_path, labels_file_path, num_examples = {
        "train": ("train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz", 60000),
        "test": ("t10k-images-idx3-ubyte.gz", "t10k-labels-idx1-ubyte.gz", 10000),
    }[subset]
    inputs_file_path = f"{data_path}/mnist/{inputs_file_path}"
    labels_file_path = f"{data_path}/mnist/{labels_file_path}"

    image_header, label_header = 16, 8  # IDX header sizes in bytes
    pixels_per_image = 28 * 28

    ## Read the image file, then normalize, reshape, and type-convert it
    with gzip.open(inputs_file_path, "rb") as bytestream:
        buf = bytestream.read(image_header + num_examples * pixels_per_image)
    if len(buf) != image_header + num_examples * pixels_per_image:
        raise ValueError(f"{inputs_file_path} is truncated or not the '{subset}' image file")
    image = np.frombuffer(buf, dtype=np.uint8, offset=image_header)
    if is_reshape:
        image = image.reshape((num_examples, pixels_per_image))
    else:
        image = image.reshape((num_examples, 28, 28, 1))
    # Convert before dividing so the result is float32, as documented.
    image = image.astype(np.float32) / 255.0

    ## Read the label file
    with gzip.open(labels_file_path, "rb") as bytestream:
        buf = bytestream.read(label_header + num_examples)
    if len(buf) != label_header + num_examples:
        raise ValueError(f"{labels_file_path} is truncated or not the '{subset}' label file")
    label = np.frombuffer(buf, dtype=np.uint8, offset=label_header)

    return image, label


## THE REST ARE OPTIONAL!
'''
def shuffle_data(image_full, label_full, seed):
    pass

def get_subset(image_full, label_full, class_list=list(range(10)), num=100):
    pass
'''
2、独热编码
此文件用于实现独热编码,需要手写的地方如下:
● fit(): [TODO]在这个函数中,你需要获取data中所有唯一的标签(将其存储在self.uniq中),并创建一个以标签为键、以对应的独热编码为值的字典。提示:可以查看np.eye()来生成独热编码。最终,将该字典存储在self.uniq2oh中。
● forward():在这个函数中,我们传递一个向量,包含对象中所有实际的标签训练集并调用fit()来用unique填充uniq2oh字典标签及其对应的one-hot编码,然后使用它返回一个针对训练集中每个标签的单热编码标签数组。
这个函数已经为您填好了!
●inverse():在函数中,我们将one-hot编码反转为实际编码标签。
这已经为你做过了。
例如,如果我们有标签X和Y,其单热编码为[1,0]和[0,1],我们将{X: [1,0], Y:[0,1]}。
对于MNIST,你将有10个标签,所以你的字典应该有10个条目!
onehot.py
import numpy as np

from .core import Callable


class OneHotEncoder(Callable):
    """
    Encode labels as one-hot vectors.

    The encoder is first fitted on a candidate set to learn which labels
    exist; later inputs are then encoded in the forward pass.

    SIMPLIFICATIONS:
     - Implementation assumes that entries are individual elements.
     - Forward will call fit if it hasn't been done yet; most implementations will just error.
     - keras does not have OneHotEncoder; has LabelEncoder, CategoricalEncoder, and to_categorical()
    """

    def fit(self, data):
        """
        Learn the label vocabulary from a candidate dataset.

        :param data: 1D array containing labels, e.g. [0, 1, 3, 3, 1, 9, ...].
            It should contain every label that will be encoded later.
        """
        # Sorted unique labels; position i in this array owns row i of the identity.
        self.uniq = np.unique(data)
        identity = np.eye(len(self.uniq))
        # Lookup table: label -> its one-hot row.
        self.uniq2oh = dict(zip(self.uniq, identity))

    def forward(self, data):
        """Return the one-hot rows for `data`, fitting on `data` first if not yet fitted."""
        if not hasattr(self, "uniq2oh"):
            self.fit(data)
        encoded = [self.uniq2oh[label] for label in data]
        return np.array(encoded)

    def inverse(self, data):
        """Map one-hot rows back to the labels they encode."""
        assert hasattr(self, "uniq"), \
            "forward() or fit() must be called before attempting to invert"
        decoded = []
        for row in data:
            # The position holding 1 selects the matching label.
            decoded.append(self.uniq[row == 1][0])
        return np.array(decoded)
3、核心抽象
本文件为实验给定代码,无需做出修改,
core.py
from abc import ABC, abstractmethod  # # For abstract method support
from typing import Tuple

import numpy as np


## DO NOT MODIFY THIS CLASS
class Callable(ABC):
    """
    Base class for objects that can be invoked like functions.

    Callable Sub-classes:
     - CategoricalAccuracy (./metrics.py)       - TODO
     - OneHotEncoder       (./preprocess.py)    - TODO
     - Diffable            (.)                  - DONE
    """

    def __call__(self, *args, **kwargs) -> np.array:
        """Lets `self()` and `self.forward()` be the same"""
        return self.forward(*args, **kwargs)

    @abstractmethod
    def forward(self, *args, **kwargs) -> np.array:
        """Pass inputs through function. Can store inputs and outputs as instance variables"""
        pass


## DO NOT MODIFY THIS CLASS
class Diffable(Callable):
    """
    Callable that can also report gradients.

    Diffable Sub-classes:
     - Dense            (./layers.py)           - TODO
     - LeakyReLU, ReLU  (./activations.py)      - TODO
     - Softmax          (./activations.py)      - TODO
     - MeanSquaredError (./losses.py)           - TODO
    """

    """Stores whether the operation being used is inside a gradient tape scope"""
    gradient_tape = None  ## All-instance-shared variable

    def __init__(self):
        """Is the layer trainable"""
        super().__init__()
        self.trainable = True  ## self-only instance variable

    def __call__(self, *args, **kwargs) -> np.array:
        """
        If there is a gradient tape scope in effect, perform AND RECORD the operation.
        Otherwise... just perform the operation and don't let the gradient tape know.
        """
        if Diffable.gradient_tape is not None:
            # Append this object to the active tape's operation log.
            Diffable.gradient_tape.operations += [self]
        return self.forward(*args, **kwargs)

    @abstractmethod
    def input_gradients(self: np.array) -> np.array:
        """Returns gradient for input (this part gets specified for all diffables)"""
        pass

    def weight_gradients(self: np.array) -> Tuple[np.array, np.array]:
        """Returns gradient for weights (this part gets specified for SOME diffables)"""
        # Default: an empty tuple, i.e. no weight gradients.
        return ()

    def compose_to_input(self, J: np.array) -> np.array:
        """
        Compose the inputted cumulative jacobian with the input jacobian for the layer.
        Implemented with batch-level vectorization.

        Requires `input_gradients` to provide either batched or overall jacobian.
        Assumes input/cumulative jacobians are matrix multiplied

        :param J: cumulative jacobian; its first axis is used as the batch axis
        :return: array of shape (batch_size, n_out)
        """
        # print(f"Composing to input in {self.__class__.__name__}")
        ig = self.input_gradients()
        batch_size = J.shape[0]
        n_out, n_in = ig.shape[-2:]
        j_new = np.zeros((batch_size, n_out), dtype=ig.dtype)
        for b in range(batch_size):
            # A 3-D jacobian is treated as one matrix per batch entry;
            # otherwise the same matrix is reused for every entry.
            ig_b = ig[b] if len(ig.shape) == 3 else ig
            j_new[b] = ig_b @ J[b]
        return j_new

    def compose_to_weight(self, J: np.array) -> list:
        """
        Compose the inputted cumulative jacobian with the weight jacobian for the layer.
        Implemented with batch-level vectorization.

        Requires `weight_gradients` to provide either batched or overall jacobian.
        Assumes weight/cumulative jacobians are element-wise multiplied (w/ broadcasting)
        and the resulting per-batch statistics are averaged together for avg per-param gradient.

        :param J: cumulative jacobian; its first axis is used as the batch axis
        :return: list with one batch-averaged gradient per entry of `self.weights`
        """
        # print(f'Composing to weight in {self.__class__.__name__}')
        assert hasattr(
            self, "weights"
        ), f"Layer {self.__class__.__name__} cannot compose along weight path"

        J_out = []
        ## For every weight/weight-gradient pair...
        for w, wg in zip(self.weights, self.weight_gradients()):
            batch_size = J.shape[0]
            ## Make a cumulative jacobian which will contribute to the final jacobian
            j_new = np.zeros((batch_size, *w.shape), dtype=wg.dtype)
            ## For every element in the batch (for a single batch-level gradient updates)
            for b in range(batch_size):
                ## If the weight gradient is a batch of transform matrices, get the right entry.
                ## Allows gradient methods to give either batched or non-batched matrices
                wg_b = wg[b] if len(wg.shape) == 3 else wg
                ## Update the batch's Jacobian update contribution
                j_new[b] = wg_b * J[b]
            ## The final jacobian for this weight is the average gradient update for the batch
            J_out += [np.mean(j_new, axis=0)]
        ## After new jacobian is computed for each weight set, return the list of gradient updates
        return J_out


class GradientTape:
    """Context manager that logs the Diffable operations called inside its scope."""

    def __init__(self):
        ## Log of operations that were performed inside tape scope
        self.operations = []

    def __enter__(self):
        # When tape scope is entered, let Diffable start recording to self.operation
        Diffable.gradient_tape = self
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # When tape scope is exited, stop letting Diffable record
        Diffable.gradient_tape = None

    def gradient(self) -> list:
        """Get the gradient from first to last recorded operation"""
        ## TODO:
        ##
        ##  Compute weight gradients for all operations.
        ##  If the model has trainable weights [w1, b1, w2, b2] and ends at a loss L.
        ##  the model should return: [dL/dw1, dL/db1, dL/dw2, dL/db2]
        ##
        ##  Recall that self.operations is populated by Diffable class instances...
        ##
        ##  Start from the last operation and compute jacobian w.r.t input.
        ##  Continue to propagate the cumulative jacobian through the layer inputs
        ##  until all operations have been differentiated through.
        ##
        ##  If an operation that has weights is encountered along the way,
        ##  compute the weight gradients and add them to the return list.
        ##  Remember to check if the layer is trainable before doing this though...

        # NOTE(review): still a stub -- an empty list is returned whatever was recorded.
        grads = []
        return grads
4、网络层
此层仿造Keras中的Dense,需要手写函数为:
● forward() : [TODO] 实现向前传递和返回输出。
● weight_gradients() : [TODO] 计算关于的梯度权重和偏差。这将用于优化图层。
● input_gradients() : [TODO] 计算关于的梯度层的输入。这将用于将渐变传播到前面的层。
● _initialize_weight() : [TODO]
初始化Dense(全连接)层的权重值。默认情况下,将所有权重初始化为零(顺便说一下,这通常是个坏主意)。你也需要支持更复杂的选项(当initializer设置为normal、xavier和kaiming时)。遵循Keras的数学假设!
〇 Normal:不言自明,单位正态分布。
〇 Xavier Normal:基于keras.GlorotNormal。
〇 Kaiming He Normal:基于keras.HeNormal。
在实现这些时,你可能会发现np.random.normal很有帮助。课程资料中说明了为什么这些不同的初始化方法是必要的,欲了解更多细节,请查看这个网站!请随意添加更多初始化器选项!
.layers
import numpy as np

from .core import Diffable


class Dense(Diffable):
    """
    Fully connected layer computing ``inputs @ w + b``.

    NOTE: this layer does not use the gradient-tape protocol. Its
    ``weight_gradients`` / ``input_gradients`` take the upstream gradient
    ``eta`` as an argument, and ``input_gradients`` applies a plain
    gradient-descent step to the layer's own parameters.
    """

    # https://towardsdatascience.com/weight-initialization-in-neural-networks-a-journey-from-the-basics-to-kaiming-954fb9b47c79

    def __init__(self, input_size, output_size, learning_rate=0.01, initializer="kaiming"):
        """
        :param input_size: number of input features
        :param output_size: number of output units
        :param learning_rate: step size of the update done in `input_gradients`
        :param initializer: one of "zero", "normal", "xavier", "kaiming"
        """
        super().__init__()
        self.w, self.b = self.__class__._initialize_weight(
            initializer, input_size, output_size
        )
        self.weights = [self.w, self.b]
        self.learning_rate = learning_rate
        self.inputs = None
        self.outputs = None

    def forward(self, inputs):
        """Forward pass for a dense layer: cache the inputs and return ``inputs @ w + b``."""
        self.inputs = inputs
        self.outputs = np.matmul(inputs, self.w) + self.b
        return self.outputs

    def weight_gradients(self, eta):
        """
        Gradients w.r.t. the weights and bias, summed over the batch.

        :param eta: upstream gradient w.r.t. this layer's outputs
        :return: tuple ``(wgrads, bgrads)``
        """
        wgrads = np.dot(self.inputs.T, eta)
        bgrads = np.sum(eta, axis=0)
        return wgrads, bgrads

    def input_gradients(self, eta):
        """
        Gradient w.r.t. the layer inputs; also updates the layer's parameters.

        :param eta: upstream gradient w.r.t. this layer's outputs
        :return: gradient w.r.t. the cached inputs
        """
        # Use the pre-update weights for the gradient that flows backwards.
        inputgrads = np.dot(eta, self.w.T)
        # Frozen layers still propagate gradients but keep their parameters.
        if self.trainable:
            wgrads, bgrads = self.weight_gradients(eta)
            self.w = self.w - self.learning_rate * wgrads
            self.b = self.b - self.learning_rate * bgrads
            # Keep the public list pointing at the current parameters.
            self.weights = [self.w, self.b]
        return inputgrads

    @staticmethod
    def _initialize_weight(initializer, input_size, output_size):
        """
        Initializes the values of the weights and biases. The bias weights should always start at zero.
        However, the weights should follow the given distribution defined by the initializer parameter
        (zero, normal, xavier, or kaiming). You can do this with an if statement
        cycling through each option!

        Details on each weight initialization option:
            - Zero: Weights and biases contain only 0's. Generally a bad idea since the gradient update
            will be the same for each weight so all weights will have the same values.
            - Normal: Weights are initialized according to a unit normal distribution.
            - Xavier: Goal is to initialize the weights so that the variance of the activations are the
            same across every layer. This helps to prevent exploding or vanishing gradients. Typically
            works better for layers with tanh or sigmoid activation.
            Standard deviation: sqrt(2 / (input_size + output_size)).
            - Kaiming: Similar purpose as Xavier initialization. Typically works better for layers
            with ReLU activation. Standard deviation: sqrt(2 / input_size).

        :return: tuple ``(weights, bias)`` with shapes (input_size, output_size) and (1, output_size)
        """
        initializer = initializer.lower()
        assert initializer in (
            "zero",
            "normal",
            "xavier",
            "kaiming",
        ), f"Unknown dense weight initialization strategy '{initializer}' requested"
        io_size = (input_size, output_size)

        initial_b = np.zeros((1, output_size))
        if initializer == "zero":
            return np.zeros(io_size), initial_b

        # The remaining options are zero-mean normals that differ only in scale.
        # For a (input_size, output_size) kernel, fan_in is input_size.
        if initializer == "normal":
            stddev = 1.0
        elif initializer == "xavier":
            stddev = np.sqrt(2.0 / (input_size + output_size))
        else:  # "kaiming"
            stddev = np.sqrt(2.0 / input_size)
        initial_w = np.random.randn(input_size, output_size) * stddev
        return initial_w, initial_b
5、激活函数
该文件用于实现LeakyReLU激活函数和Softmax激活函数,手写了它们的前向传播[def forward]和反向传播[def input_gradients]:
● LeakyReLU ()
〇 forward() : [TODO]给定输入x,计算并返回LeakyReLU(x)。
〇 input_gradients() : [TODO]计算并返回与通过对LeakyReLU求导得到输入。
● Softmax():(2470 ONLY)
〇 forward(): [TODO]给定输入x,计算并返回Softmax(x)。确保使用的是数值稳定的softmax,即先减去所有项的最大值,以防止溢出/下溢(overflow/underflow)问题。
〇 input_gradients(): [TODO] Softmax()的部分w.r.t输入。
.activations.py
import numpy as np

from .core import Diffable


class LeakyReLU(Diffable):
    """Leaky ReLU: ``x`` for positive inputs, ``alpha * x`` otherwise."""

    def __init__(self, alpha=0.3):
        """:param alpha: slope applied to non-positive inputs"""
        super().__init__()
        self.alpha = alpha
        self.inputs = None
        self.outputs = None

    def forward(self, inputs):
        """Apply LeakyReLU element-wise and cache the inputs for the backward pass."""
        self.inputs = inputs
        # Element-wise choice. A whole-array test such as `inputs.all() >= 0`
        # is always true and would return the inputs untouched.
        self.outputs = np.where(inputs > 0, inputs, self.alpha * inputs)
        return self.outputs

    def input_gradients(self, eta):
        """
        Scale the upstream gradient by the local slope.

        :param eta: upstream gradient, same shape as the cached inputs
        :return: new array; `eta` itself is left unmodified
        """
        slope = np.where(self.inputs > 0, 1.0, self.alpha)
        return eta * slope

    def compose_to_input(self, J):
        # TODO: Maybe you'll want to override the default?
        return super().compose_to_input(J)


class ReLU(LeakyReLU):
    """LeakyReLU with a zero negative slope."""

    def __init__(self):
        super().__init__(alpha=0)


class Softmax(Diffable):
    """Softmax over the last axis."""

    def __init__(self):
        super().__init__()
        self.inputs = None
        self.outputs = None

    def forward(self, inputs):
        """Softmax forward pass, normalised separately along the last axis."""
        self.inputs = inputs
        # Stable softmax: subtracting the maximum keeps exp() from overflowing.
        z = inputs - np.max(inputs, axis=-1, keepdims=True)
        numerator = np.exp(z)
        # Normalise along the last axis so each row sums to 1.
        denominator = np.sum(numerator, axis=-1, keepdims=True)
        self.outputs = numerator / denominator
        return self.outputs

    def input_gradients(self, etc):
        """
        Softmax backprop: the incoming gradient is returned unchanged.

        The accompanying article states that the softmax derivative is handled
        together with the cross-entropy loss, so no Jacobian is applied here.
        """
        return etc
☆6、填充函数
本文用于手写Keras中的序列模型SequentialModel类,SequentialModel继承Model类,从而我们先实现Model类具体内容如下:
● compile() : 初始化模型优化器,损失函数和精度函数,它们作为参数输入,供SequentialModel实例使用。
● fit() : 训练模型将输入和输出关联起来。重复训练每个时代,数据是基于参数的批处理。它还计算Batch_metrics、epoch_metrics和聚合的agg_metrics可以用来跟踪模型的训练进度。
● evaluate() : [TODO] 使用测试阶段中提到的指标评估最终模型的性能。它几乎和fit()函数相同(想想训练和测试之间有什么区别)。
● call() : [TODO] 提示:调用顺序模型意味着什么?还记得顺序模型是一堆层,每一层只有一个输入向量和一个输出向量。你可以在在assignment.py中的SequentialModel类。
● batch_step() : [TODO] 您将看到fit()为每一个都调用了这个函数批处理。您将首先计算输入批处理的模型预测。在训练阶段,你需要计算梯度和更新你的权重根据您正在使用的优化器。对于训练过程中的反向传播,你将使用GradientTape从核心抽象(core.py)来记录操作和中间值。然后您将使用模型的优化器来将梯度应用到模型的可训练变量上。最后,计算和返回该批次的损耗和精度。你可以在在assignment.py中的SequentialModel类。
model.py
from abc import ABC, abstractmethod
from collections import defaultdict

import numpy as np

from .core import Diffable


def print_stats(stat_dict, b=None, b_num=None, e=None, avg=False):
    """
    Print a dictionary of named statistics together with batch/epoch info.

    The output starts with a carriage return so successive batch lines
    overwrite each other. With ``avg`` set, each statistic is averaged
    and the line is terminated with a newline.
    """
    pieces = [" - "]
    if e is not None:
        pieces.append(f"Epoch {e+1:2}: ")
    if b is not None:
        pieces.append(f"Batch {b+1:3}")
    if b_num is not None:
        pieces.append(f"/{b_num}")
    if avg:
        pieces.append("Average Stats")
    title_str = "".join(pieces)
    print(f"\r{title_str} : ", end="")

    rounded = {}
    for name, values in stat_dict.items():
        rounded[name] = np.round(np.mean(values) if avg else values, 4)
    print(rounded, end="")
    print(" ", end="\n" if avg else "")


def update_metric_dict(super_dict, sub_dict):
    """
    Appends the average of the sub_dict metrics to the super_dict's metric list
    """
    for name, values in sub_dict.items():
        super_dict[name].append(np.mean(values))


class Model(ABC):
    """Base class holding layers, compile settings and the fit/evaluate loops."""

    ###############################################################################################
    ## BEGIN GIVEN

    def __init__(self, layers):
        """
        Store the layers and collect the trainable parameters.

        Every entry must be a Diffable. The final entry is NOT kept in
        ``self.layers`` (only ``layers[:-1]`` is stored), but weights are
        collected from all entries.
        """
        assert all(isinstance(layer, Diffable) for layer in layers)
        self.layers = layers[:-1]
        self.trainable_variables = [
            weight
            for layer in layers
            if hasattr(layer, "weights") and layer.trainable
            for weight in layer.weights
        ]

    def compile(self, optimizer, loss_fn, acc_fn):
        """
        "Compile" the model by taking in the optimizers, loss, and accuracy functions.
        In more optimized DL implementations, this will have more involved processes
        that make the components extremely efficient but very inflexible.
        """
        self.optimizer = optimizer
        self.compiled_loss = loss_fn
        self.compiled_acc = acc_fn

    def _run_epoch(self, x, y, batch_size, batch_num, e, training):
        """Feed every full batch to `batch_step` and return the per-batch metric averages."""
        epoch_metrics = defaultdict(list)
        # Only complete batches are used; a trailing partial batch is skipped.
        starts = range(0, x.shape[0] - batch_size + 1, batch_size)
        for b, b0 in enumerate(starts):
            b1 = b0 + batch_size
            batch_metrics = self.batch_step(x[b0:b1], y[b0:b1], training=training)
            update_metric_dict(epoch_metrics, batch_metrics)
            print_stats(batch_metrics, b, batch_num, e)
        return epoch_metrics

    def fit(self, x, y, epochs, batch_size):
        """
        Trains the model by iterating over the input dataset and feeding input batches
        into the batch_step method with training. At the end, the metrics are returned.
        """
        agg_metrics = defaultdict(list)
        batch_num = x.shape[0] // batch_size
        for e in range(epochs):
            epoch_metrics = self._run_epoch(x, y, batch_size, batch_num, e, True)
            update_metric_dict(agg_metrics, epoch_metrics)
            print_stats(epoch_metrics, e=e, avg=True)
        return agg_metrics

    def evaluate(self, x, y, batch_size):
        """
        X is the dataset inputs, Y is the dataset labels.
        Evaluates the model by iterating over the input dataset in batches and feeding input batches
        into the batch_step method. At the end, the metrics are returned. Should be called on
        the testing set to evaluate accuracy of the model using the metrics output from the fit method.

        NOTE: This method is almost identical to fit (think about how training and testing differ --
        the core logic should be the same)
        """
        agg_metrics = defaultdict(list)
        batch_num = x.shape[0] // batch_size
        # A single pass, reported as epoch index 0.
        e = 0
        epoch_metrics = self._run_epoch(x, y, batch_size, batch_num, e, False)
        update_metric_dict(agg_metrics, epoch_metrics)
        print_stats(epoch_metrics, e=e, avg=True)
        return agg_metrics

    @abstractmethod
    def call(self, inputs):
        """You will implement this in the SequentialModel class in assignment.py"""
        return

    @abstractmethod
    def batch_step(self, x, y, training=True):
        """You will implement this in the SequentialModel class in assignment.py"""
        return
☆def batch_step() 解析:
y_pre = self.call(x)
:通过前向传播得到网络传播一次后的预测值,
loss = self.compiled_loss.forward(y_pre, y)
:将预测值与真实值放入损失函数中通过前向传播得到损失值。
acc = self.compiled_acc(y_pre, y)
:将预测值与真实值放入精度函数中通过前向传播得到精度值。
各函数反向传播的意义:
激活函数:将神经网络上一层的输入,经过神经网络层的非线性变换转换后,通过激活函数,得到输出。常见的激活函数包括:sigmoid, tanh, relu等。
损失函数:度量神经网络的输出的预测值,与实际值之间的差距的一种方式。常见的损失函数包括:最小二乘损失函数、交叉熵损失函数、回归中使用的smooth L1损失函数等。
优化函数:也就是如何把损失值从神经网络的最外层传递到最前面。如最基础的梯度下降算法,随机梯度下降算法,批量梯度下降算法,带动量的梯度下降算法,Adagrad,Adadelta,Adam等。
损失函数
eta = self.compiled_loss.input_gradients()
:通过损失函数的反向传播得到梯度。
激活函数
for layer in self.layers[::-1]:
eta = layer.input_gradients(eta)
:将梯度传播各个网络层进行反向传播。
优化函数
if training:
self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])
:将前向传播一次,反向传播一次之后更新的weights, bias放入优化器中,把损失值从神经网络的最外层传递到最前面。
☆ class SequentialModel in assignment.py
class SequentialModel(Beras.Model):
    """
    Implemented in Beras/model.py

    def __init__(self, layers):
    def compile(self, optimizer, loss_fn, acc_fn):
    def fit(self, x, y, epochs, batch_size):
    def evaluate(self, x, y, batch_size):           ## <- TODO
    """

    def call(self, inputs):
        """
        Forward pass in sequential model. It's helpful to note that layers are initialized in
        Beras.Model, and you can refer to them with self.layers. You can call a layer by doing
        var = layer(input).
        """
        for layer in self.layers:
            inputs = layer.forward(inputs)
        return inputs

    def batch_step(self, x, y, training=True):
        """
        Computes loss and accuracy for a batch.

        The forward pass always runs. The backward pass runs only when
        ``training`` is true: Dense.input_gradients updates the layer weights
        as a side effect, so running it during evaluation would train the
        model on the evaluation data.

        :return: dict with keys "loss" and "acc"
        """
        y_pre = self.call(x)
        loss = self.compiled_loss.forward(y_pre, y)
        acc = self.compiled_acc(y_pre, y)

        if training:
            eta = self.compiled_loss.input_gradients()
            # Propagate from the last stored layer back to the first.
            for layer in reversed(self.layers):
                eta = layer.input_gradients(eta)
            # NOTE(review): this passes the first layer's weight matrix and bias
            # as (weights, grads), i.e. the bias stands in for a gradient. It is
            # kept for parity with the existing flow; the actual parameter
            # update happens inside Dense.input_gradients.
            self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])

        return {"loss": loss, "acc": acc}
7、损失函数
这是模型训练中最关键的方面之一。在这次作业中,我没有按照实验中的描述那样子去实现MSE或者说均方误差损失函数,而是选择了CrossEntropyLoss损失函数。因为经过实验,其余两个损失函数效果都不太如人意。
注意:一般Softmax的反向传播是与CrossEntropyLoss损失函数一起进行的,所以不填写Softmax的反向传播部分。
● forward() : [TODO] 编写一个计算并返回平均值的函数给出预测和实际标签的平方误差。
提示:什么是MSE?在给出预测和实际标签的情况下,均方误差是预测值与实际值之间的差异。
● input_gradients() : [TODO] 计算并返回梯度。使用用微分法推导出这些梯度的公式。
losses.py
import numpy as np
from .core import Diffable
from abc import ABCMeta, abstractmethod


class Layer(metaclass=ABCMeta):
    """Minimal forward/backward interface used by the helper Softmax below."""

    @abstractmethod
    def forward(self, *args):
        pass

    @abstractmethod
    def backward(self, *args):
        pass


class Softmax(Layer):
    """Numerically stable softmax over the last axis."""

    def forward(self, x):
        # Subtract the row maximum so exp() cannot overflow.
        v = np.exp(x - x.max(axis=-1, keepdims=True))
        return v / v.sum(axis=-1, keepdims=True)

    def backward(self, eta):
        # Intentionally empty: the gradient is produced by CrossEntropyLoss.
        pass


class CrossEntropyLoss(Diffable):
    """
    Softmax followed by categorical cross-entropy.

    `forward` expects raw scores (logits); it applies softmax itself and
    caches the combined gradient ``softmax(a) - y`` for `input_gradients`.
    """

    def __init__(self):
        # Run the base initialiser so `trainable` is set like on other Diffables.
        super().__init__()
        self.classifier = Softmax()

    def input_gradients(self):
        """Return the gradient cached by the latest `forward` call."""
        return self.grad

    def forward(self, a, y):
        """
        :param a: scores of shape (batch, classes), before softmax
        :param y: one-hot labels of the same shape
        :return: cross-entropy averaged over the batch
        """
        a = self.classifier.forward(a)
        self.grad = a - y
        # Clip before the log: a probability that underflows to 0 would give
        # -inf, and 0 * -inf turns the whole loss into NaN.
        log_probs = np.log(np.clip(a, np.finfo(float).tiny, 1.0))
        loss = -1 * np.einsum('ij,ij->', y, log_probs, optimize=True) / y.shape[0]
        return loss
8、优化函数
对于Mnist数据集来讲,单单只是RMSProp :已经完全足够,所以本文只实现了这一个优化函数。
● RMSProp : [TODO] 误差传播的均方根。
.optimizer.py
from collections import defaultdict

import numpy as np


class RMSProp:
    """RMSProp: scale each step by a running root-mean-square of past gradients."""

    def __init__(self, learning_rate, beta=0.9, epsilon=1e-6):
        """
        :param learning_rate: base step size
        :param beta: decay rate of the running average of squared gradients
        :param epsilon: small constant added to the denominator
        """
        self.learning_rate = learning_rate
        self.beta = beta
        self.epsilon = epsilon
        # Running averages, one entry per variable (keyed by id(weights)).
        self.v = defaultdict(lambda: 0)

    def apply_gradients(self, weights, grads):
        """
        Apply one RMSProp step to `weights`.

        NumPy arrays are updated in place so the caller's variable actually
        changes. Other inputs cannot be mutated, so use the return value.

        :param weights: parameter array to update
        :param grads: gradient of the loss w.r.t. `weights`
        :return: the updated weights (the same object for NumPy arrays)
        """
        grads = np.asarray(grads)
        # Separate state per variable, so parameters of different shapes do
        # not share one running average. The key is the object's identity, so
        # callers should pass the same array object on every step.
        key = id(weights)
        self.mean_square = self.beta * self.v[key] + (1 - self.beta) * grads ** 2
        self.v[key] = self.mean_square

        step = self.learning_rate / (np.sqrt(self.mean_square) + self.epsilon) * grads
        if isinstance(weights, np.ndarray):
            weights -= step
            return weights
        return weights - step
9、精度指标
本文件简单的实现了一个精度模型,用于测量模型精度:
● forward() : [TODO] 返回模型的分类精度预测概率和真标签。你应该返回的比例预测标签等于真实标签,其中图像的预测标签为与最高概率对应的标签。参考网络或讲座幻灯片的分类精度数学!
.metrics.py
import numpy as np

from .core import Callable


class CategoricalAccuracy(Callable):
    """Fraction of samples whose highest-scoring class matches the label."""

    def forward(self, probs, labels):
        """
        Categorical accuracy forward pass!

        :param probs: array of shape (batch, classes) with scores or probabilities
        :param labels: one-hot array of the same shape
        :return: share of rows where the argmax of `probs` equals the argmax of `labels`
        """
        predicted = np.argmax(probs, axis=1)
        actual = np.argmax(labels, axis=1)
        # Mean of the boolean matches = correct / total, computed in NumPy.
        return np.mean(predicted == actual)
10、训练和测试
构建了两个模型,仿造Keras:
● get_simple_model()中的一个简单模型,最多只有一个Diffable层(例如:Dense - ./layers.py)和一个激活函数(在./activations.py中)。默认情况下已经为您提供了这个模型,如果你愿意,可以修改它。自动评分器将评估原始的那个!
● get_advanced_model()中稍微复杂一点的模型,有两个或更多Diffable层和两个或两个以上的激活函数。我们推荐该模型使用学习率相当低的Adam优化器。
# def get_simple_model() in assignment.py
def get_simple_model_components():
    """
    Returns a simple single-layer model.

    The result is a SimpleNamespace with the compiled model plus the
    `epochs` and `batch_size` to train it with.
    """
    ## DO NOT CHANGE IN FINAL SUBMISSION

    from Beras.activations import Softmax
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy

    # TODO: create a model and compile it with layers and functions of your choice
    # One 784 -> 10 Dense layer followed by Softmax.
    model = SequentialModel([Dense(784, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=10, batch_size=100)


# get_advanced_model() in assignment.py
def get_advanced_model_components():
    from Beras.activations import Softmax, LeakyReLU
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.batchnorm import BatchNorm
    """
    Returns a multi-layered model with more involved components.
    """
    # TODO: create/compile a model with layers and functions of your choice.
    # Dense 784 -> 398, BatchNorm, LeakyReLU with alpha 0, Dense 398 -> 10, Softmax.
    model = SequentialModel([Dense(784, 398), BatchNorm(398), LeakyReLU(0), Dense(398, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=12, batch_size=100)
11、可视化的结果
我们为您提供了visualize_metrics方法来可视化您的损失和每次使用matplotlib后,精确度都会发生变化。
.visualize.py
import matplotlib.pyplot as plt
import numpy as np


def visualize_metrics(losses=None, accuracies=None):
    """
    Plot loss and accuracy curves on a shared axis.

    param losses: a 1D array (or list) of loss values
    param accuracies: a 1D array (or list) of accuracy values

    Displays a plot with loss and accuracy values on the y-axis and batch number/epoch number on the x-axis.
    If either series is missing or empty, a message is printed and nothing is plotted.
    """
    # Use len() rather than truthiness so NumPy arrays are accepted too.
    if losses is None or accuracies is None or len(losses) == 0 or len(accuracies) == 0:
        return print("Must provide a list of losses/accuracies to visualize")

    # Each series gets its own x-range, so differing lengths still plot.
    plt.plot(np.arange(1, len(losses) + 1), losses)
    plt.plot(np.arange(1, len(accuracies) + 1), accuracies)
    plt.ylabel("Loss/Acc Value")
    plt.show()


def visualize_images(model, train_inputs, train_labels_ohe, num_searching=500):
    """
    param model: a neural network model (i.e. SequentialModel)
    param train_inputs: sample training inputs for the model to predict
    param train_labels_ohe: one-hot encoded training labels corresponding to train_inputs
    param num_searching: how many random samples to draw when looking for examples

    Displays 10 sample outputs the model correctly classifies and 10 sample outputs the model
    incorrectly classifies
    """
    rand_idx = np.random.choice(len(train_inputs), num_searching)
    rand_batch = train_inputs[rand_idx]
    probs = model.call(rand_batch)

    pred_classes = np.argmax(probs, axis=1)
    true_classes = np.argmax(train_labels_ohe[rand_idx], axis=1)

    right_idx = np.where(pred_classes == true_classes)
    wrong_idx = np.where(pred_classes != true_classes)

    right = np.reshape(rand_batch[right_idx], (-1, 28, 28))
    wrong = np.reshape(rand_batch[wrong_idx], (-1, 28, 28))

    right_pred_labels = true_classes[right_idx]
    wrong_pred_labels = pred_classes[wrong_idx]

    assert len(right) >= 10, f"Found less than 10 correct predictions!"
    assert len(wrong) >= 10, f"Found less than 10 incorrect predictions!"

    fig, axs = plt.subplots(2, 10)
    fig.suptitle("Classigications\n(PL = Predicted Label)")

    # Row 0 shows correct predictions, row 1 shows incorrect ones.
    subsets = [right, wrong]
    pred_labs = [right_pred_labels, wrong_pred_labels]

    for r in range(2):
        for c in range(10):
            axs[r, c].imshow(subsets[r][c], cmap="Greys")
            axs[r, c].set(title=f"PL: {pred_labs[r][c]}")
            plt.setp(axs[r, c].get_xticklabels(), visible=False)
            plt.setp(axs[r, c].get_yticklabels(), visible=False)
            axs[r, c].tick_params(axis="both", which="both", length=0)

    plt.show()
12、 调用前面11步写好的代码,对模型进行训练并且测试
.assignment.py
from types import SimpleNamespace

import Beras
import numpy as np


class SequentialModel(Beras.Model):
    """
    Implemented in Beras/model.py

    def __init__(self, layers):
    def compile(self, optimizer, loss_fn, acc_fn):
    def fit(self, x, y, epochs, batch_size):
    def evaluate(self, x, y, batch_size):           ## <- TODO
    """

    def call(self, inputs):
        """
        Forward pass in sequential model. It's helpful to note that layers are initialized in
        Beras.Model, and you can refer to them with self.layers. You can call a layer by doing
        var = layer(input).
        """
        for layer in self.layers:
            inputs = layer.forward(inputs)
        return inputs

    def batch_step(self, x, y, training=True):
        """
        Computes loss and accuracy for a batch.

        The forward pass always runs. The backward pass runs only when
        ``training`` is true: Dense.input_gradients updates the layer weights
        as a side effect, so running it during evaluation would train the
        model on the evaluation data.

        :return: dict with keys "loss" and "acc"
        """
        y_pre = self.call(x)
        loss = self.compiled_loss.forward(y_pre, y)
        acc = self.compiled_acc(y_pre, y)

        if training:
            eta = self.compiled_loss.input_gradients()
            # Propagate from the last stored layer back to the first.
            for layer in reversed(self.layers):
                eta = layer.input_gradients(eta)
            # NOTE(review): this passes the first layer's weight matrix and bias
            # as (weights, grads), i.e. the bias stands in for a gradient. It is
            # kept for parity with the existing flow; the actual parameter
            # update happens inside Dense.input_gradients.
            self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])

        return {"loss": loss, "acc": acc}


def get_simple_model_components():
    """
    Returns a simple single-layer model.
    """
    ## DO NOT CHANGE IN FINAL SUBMISSION

    from Beras.activations import Softmax
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy

    # TODO: create a model and compile it with layers and functions of your choice
    model = SequentialModel([Dense(784, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=10, batch_size=100)


def get_advanced_model_components():
    """
    Returns a multi-layered model with more involved components.
    """
    from Beras.activations import Softmax, LeakyReLU
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.batchnorm import BatchNorm

    # TODO: create/compile a model with layers and functions of your choice.
    model = SequentialModel([Dense(784, 398), BatchNorm(398), LeakyReLU(0), Dense(398, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=12, batch_size=100)


if __name__ == "__main__":
    """
    Read in MNIST data and initialize/train/test your model.
    """
    from Beras.onehot import OneHotEncoder
    import preprocess

    ## Read in MNIST data,
    train_inputs, train_labels = preprocess.get_data_MNIST("train", "../data")
    test_inputs, test_labels = preprocess.get_data_MNIST("test", "../data")

    ## One-hot encode the labels; fit on the training labels to learn the label set
    ohe = OneHotEncoder()
    ohe.fit(train_labels)

    ## Get your model to train and test
    simple = False
    args = get_simple_model_components() if simple else get_advanced_model_components()
    model = args.model

    ## REMINDER: Threshold of accuracy:
    ##  1470: >85% on testing accuracy from get_simple_model_components
    ##  2470: >95% on testing accuracy from get_advanced_model_components

    ## Fit the model to the training inputs and the one-hot encoded labels
    print('---------------------------[[[Train]]]]---------------------------')
    train_agg_metrics = model.fit(
        train_inputs,
        ohe(train_labels),
        epochs=args.epochs,
        batch_size=args.batch_size,
    )
    print('-------------------------------------------------------------------')

    ## Feel free to use the visualize_metrics function to view your accuracy and loss.
    ## The final accuracy returned during evaluation must be > 80%.
    ## To plot, import visualize_images / visualize_metrics from visualize and call
    ## visualize_metrics(train_agg_metrics["loss"], train_agg_metrics["acc"]) and
    ## visualize_images(model, train_inputs, ohe(train_labels)).

    ## Evaluate the model on the testing inputs and one-hot encoded labels.
    ## This is the number you will be using!
    print('---------------------------[[[Evaluate]]]---------------------------')
    test_agg_metrics = model.evaluate(test_inputs, ohe(test_labels), batch_size=100)
    print('Testing Performance:', test_agg_metrics)
    print('-----------------------------------------------------------------')
自认为算是一次我做的勉强合格(不够好的意思)的作业,提供的答案也仅供参考,祝大家玩的开心!