【Multi-NN】解析参考:Numpy手写的多层神经网络

简介: 【Multi-NN】解析参考:Numpy手写的多层神经网络

前言


       由于原作业需要实现的函数过多,本文先不打算从算法原理讲解,而是直接贴上结果代码,提供大家参考,该实验按照此文章的标准来构建:

image.png


提供成品代码文件


文件获取:


链接:https://pan.baidu.com/s/1Fw_7thL5PxR79zI6XbpnYQ

提取码:txqe


文件结构:


| - hw2 
        | - code
                | - Beras
                        | -  8个.py文件用于实现实验要求函数
                | - assignment.py
                | - preprocess.py
                | - visualize.py
        | - data
                | - mnist
                        | - 四个数据集文件
                | - Iris (可以忽略,不在本实验中使用)


1. 预处理的数据


       该文件为实验自带,主要实现功能为:从../data/mnist/中的4个.gz文件中读取到mnist数据集的分别用于Tran和Test训练集和测试集(2*2=四个)。


preprocess.py


import gzip
import pickle
from unicodedata import numeric
import numpy as np
"""
TODO: 
Same as HW1. Feel free to copy and paste your old implementation here.
It's a good time to vectorize it, while you're at it!
No need to include CIFAR-specific methods.
"""
def get_data_MNIST(subset, data_path="../data", is_reshape=True):
    """
    :param subset: string indicating whether we want the training or testing data 
        (only accepted values are 'train' and 'test')
    :param data_path: directory containing the training and testing inputs and labels
    :return: NumPy array of inputs (float32) and labels (uint8)
    """
    ## http://yann.lecun.com/exdb/mnist/
    subset = subset.lower().strip()
    assert subset in ("test", "train"), f"unknown data subset {subset} requested"
    inputs_file_path, labels_file_path, num_examples = {
        "train": ("train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz", 60000),
        "test": ("t10k-images-idx3-ubyte.gz", "t10k-labels-idx1-ubyte.gz", 10000),
    }[subset]
    inputs_file_path = f"{data_path}/mnist/{inputs_file_path}"
    labels_file_path = f"{data_path}/mnist/{labels_file_path}"
    ## TODO: read the image file and normalize, flatten, and type-convert image
    with open(inputs_file_path, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
        buf = bytestream.read(num_examples*28*28 + 16)
        dt = np.dtype(np.uint8)
        temp = np.frombuffer(buf, dtype=dt) 
        image = temp[16:]
        if is_reshape:
            image = image.reshape((num_examples,28*28))
        else:
            image = image.reshape((num_examples, 28, 28, 1))
        image = image/255.0
    print(image.shape)
    ## TODO: read the label file
    with open(labels_file_path, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
        buf = bytestream.read(num_examples + 8)
        dt = np.dtype(np.uint8)
        temp = np.frombuffer(buf, dtype=dt) 
        label = temp[8:]
    return image, label
## THE REST ARE OPTIONAL!
'''
def shuffle_data(image_full, label_full, seed):
    pass
def get_subset(image_full, label_full, class_list=list(range(10)), num=100):
    pass
'''


2、独热编码


       此文件用于实现独热编码,需要手写的地方如下:


● fit(): [TODO]在这个函数中,你需要在Data(将其存储在self.uniq中)并创建一个以标签作为键的字典和它们对应的一个热编码作为值。提示:你可能想这么做查看np.eye()以获得单热编码。最终,您将存储它在self.uniq2oh字典。


● forward():在这个函数中,我们传递一个向量,包含对象中所有实际的标签训练集并调用fit()来用unique填充uniq2oh字典标签及其对应的one-hot编码,然后使用它返回一个针对训练集中每个标签的单热编码标签数组。


这个函数已经为您填好了!


●inverse():在函数中,我们将one-hot编码反转为实际编码标签。


这已经为你做过了。


例如,如果我们有标签X和Y,其单热编码为[1,0]和[0,1],我们将{X: [1,0], Y:[0,1]}。


对于MNIST,你将有10个标签,所以你的字典应该有10个条目!


onehot.py


import numpy as np
from .core import Callable
class OneHotEncoder(Callable):
    """
    One-Hot Encodes labels. First takes in a candidate set to figure out what elements it
    needs to consider, and then one-hot encodes subsequent input datasets in the
    forward pass.
    SIMPLIFICATIONS:
     - Implementation assumes that entries are individual elements.
     - Forward will call fit if it hasn't been done yet; most implementations will just error.
     - keras does not have OneHotEncoder; has LabelEncoder, CategoricalEncoder, and to_categorical()
    """
    def fit(self, data):
        """
        Fits the one-hot encoder to a candidate dataset. Said dataset should contain
        all encounterable elements.
        :param data: 1D array containing labels.
            For example, data = [0, 1, 3, 3, 1, 9, ...]
        """
        ## TODO: Fetch all the unique labels and create a dictionary with
        ## the unique labels as keys and their one hot encodings as values
        ## HINT: look up np.eye() and see if you can utilize it!
        ## HINT: Wouldn't it be nice if we just gave you the implementation somewhere...
        self.uniq = np.unique(data)  # all the unique labels from `data`
        self.uniq2oh = {}  # a lookup dictionary with labels and corresponding encodings
        eye = np.eye(len(self.uniq))
        for i in range(len(self.uniq)):
            self.uniq2oh[self.uniq[i]] = eye[i]
    def forward(self, data):
        if not hasattr(self, "uniq2oh"):
            self.fit(data)
        return np.array([self.uniq2oh[x] for x in data])
    def inverse(self, data):
        assert hasattr(self, "uniq"), \
            "forward() or fit() must be called before attempting to invert"
        return np.array([self.uniq[x == 1][0] for x in data])


3、核心抽象


       本文件为实验给定代码,无需做出修改,


core.py


from abc import ABC, abstractmethod  # # For abstract method support
from typing import Tuple
import numpy as np
## DO NOT MODIFY THIS CLASS
class Callable(ABC):
    """
    Callable Sub-classes:
     - CategoricalAccuracy (./metrics.py)       - TODO
     - OneHotEncoder       (./preprocess.py)    - TODO
     - Diffable            (.)                  - DONE
    """
    def __call__(self, *args, **kwargs) -> np.array:
        """Lets `self()` and `self.forward()` be the same"""
        return self.forward(*args, **kwargs)
    @abstractmethod
    def forward(self, *args, **kwargs) -> np.array:
        """Pass inputs through function. Can store inputs and outputs as instance variables"""
        pass
## DO NOT MODIFY THIS CLASS
class Diffable(Callable):
    """
    Diffable Sub-classes:
     - Dense            (./layers.py)           - TODO
     - LeakyReLU, ReLU  (./activations.py)      - TODO
     - Softmax          (./activations.py)      - TODO
     - MeanSquaredError (./losses.py)           - TODO
    """
    """Stores whether the operation being used is inside a gradient tape scope"""
    gradient_tape = None  ## All-instance-shared variable
    def __init__(self):
        """Is the layer trainable"""
        super().__init__()
        self.trainable = True  ## self-only instance variable
    def __call__(self, *args, **kwargs) -> np.array:
        """
        If there is a gradient tape scope in effect, perform AND RECORD the operation.
        Otherwise... just perform the operation and don't let the gradient tape know.
        """
        if Diffable.gradient_tape is not None:
            Diffable.gradient_tape.operations += [self]
        return self.forward(*args, **kwargs)
    @abstractmethod
    def input_gradients(self: np.array) -> np.array:
        """Returns gradient for input (this part gets specified for all diffables)"""
        pass
    def weight_gradients(self: np.array) -> Tuple[np.array, np.array]:
        """Returns gradient for weights (this part gets specified for SOME diffables)"""
        return ()
    def compose_to_input(self, J: np.array) -> np.array:
        """
        Compose the inputted cumulative jacobian with the input jacobian for the layer.
        Implemented with batch-level vectorization.
        Requires `input_gradients` to provide either batched or overall jacobian.
        Assumes input/cumulative jacobians are matrix multiplied
        """
        #  print(f"Composing to input in {self.__class__.__name__}")
        ig = self.input_gradients()
        batch_size = J.shape[0]
        n_out, n_in = ig.shape[-2:]
        j_new = np.zeros((batch_size, n_out), dtype=ig.dtype)
        for b in range(batch_size):
            ig_b = ig[b] if len(ig.shape) == 3 else ig
            j_new[b] = ig_b @ J[b]
        return j_new
    def compose_to_weight(self, J: np.array) -> list:
        """
        Compose the inputted cumulative jacobian with the weight jacobian for the layer.
        Implemented with batch-level vectorization.
        Requires `weight_gradients` to provide either batched or overall jacobian.
        Assumes weight/cumulative jacobians are element-wise multiplied (w/ broadcasting)
        and the resulting per-batch statistics are averaged together for avg per-param gradient.
        """
        # print(f'Composing to weight in {self.__class__.__name__}')
        assert hasattr(
            self, "weights"
        ), f"Layer {self.__class__.__name__} cannot compose along weight path"
        J_out = []
        ## For every weight/weight-gradient pair...
        for w, wg in zip(self.weights, self.weight_gradients()):
            batch_size = J.shape[0]
            ## Make a cumulative jacobian which will contribute to the final jacobian
            j_new = np.zeros((batch_size, *w.shape), dtype=wg.dtype)
            ## For every element in the batch (for a single batch-level gradient updates)
            for b in range(batch_size):
                ## If the weight gradient is a batch of transform matrices, get the right entry.
                ## Allows gradient methods to give either batched or non-batched matrices
                wg_b = wg[b] if len(wg.shape) == 3 else wg
                ## Update the batch's Jacobian update contribution
                j_new[b] = wg_b * J[b]
            ## The final jacobian for this weight is the average gradient update for the batch
            J_out += [np.mean(j_new, axis=0)]
        ## After new jacobian is computed for each weight set, return the list of gradient updatates
        return J_out
class GradientTape:
    def __init__(self):
        ## Log of operations that were performed inside tape scope
        self.operations = []
    def __enter__(self):
        # When tape scope is entered, let Diffable start recording to self.operation
        Diffable.gradient_tape = self
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # When tape scope is exited, stop letting Diffable record
        Diffable.gradient_tape = None
    def gradient(self) -> list:
        """Get the gradient from first to last recorded operation"""
        ## TODO:
        ##
        ##  Compute weight gradients for all operations.
        ##  If the model has trainable weights [w1, b1, w2, b2] and ends at a loss L.
        ##  the model should return: [dL/dw1, dL/db1, dL/dw2, dL/db2]
        ##
        ##  Recall that self.operations is populated by Diffable class instances...
        ##
        ##  Start from the last operation and compute jacobian w.r.t input.
        ##  Continue to propagate the cumulative jacobian through the layer inputs
        ##  until all operations have been differentiated through.
        ##
        ##  If an operation that has weights is encountered along the way,
        ##  compute the weight gradients and add them to the return list.
        ##  Remember to check if the layer is trainable before doing this though...
        grads = []
        return grads


4、网络层


       此层仿造Keras中的Dense,需要手写函数为:


● forward() : [TODO] 实现向前传递和返回输出。


● weight_gradients() : [TODO] 计算关于的梯度权重和偏差。这将用于优化图层。


● input_gradients() : [TODO] 计算关于的梯度层的输入。这将用于将渐变传播到前面的层。


● _initialize_weight() : [TODO]


初始化致密层的权重值默认情况下,将所有权重初始化为零(顺便说一下,这通常是个坏主意)。你也需要允许更复杂的选项(当初始化式为设置为normal, xavier和kaiing)。遵循Keras的数学假设!


〇 Normal:不言自明,单位正态分布。


〇 Xavier Normal:基于keras.GlorotNormal。


〇 Kaiing He Normal:基于Keras.HeNormal。


在实现这些时,你可能会发现np.random.normal很有帮助。的行动计划说明为什么这些不同的初始化方法是必要的,但是欲了解更多细节,请查看这个网站!请随意添加更多初始化器选项!


.layers


import numpy as np
from .core import Diffable
class Dense(Diffable):
    # https://towardsdatascience.com/weight-initialization-in-neural-networks-a-journey-from-the-basics-to-kaiming-954fb9b47c79
    def __init__(self, input_size, output_size, learning_rate=0.01, initializer="kaiming"):
        super().__init__()
        self.w, self.b = self.__class__._initialize_weight(
            initializer, input_size, output_size
        )
        self.weights = [self.w, self.b]
        self.learning_rate = learning_rate
        self.inputs  = None
        self.outputs = None
    def forward(self, inputs):
        """Forward pass for a dense layer! Refer to lecture slides for how this is computed."""
        self.inputs = inputs
        # TODO: implement the forward pass and return the outputs
        self.outputs = np.matmul(inputs, self.w) + self.b
        return self.outputs
    def weight_gradients(self, eta):
        """Calculating the gradients wrt weights and biases!"""
        # TODO: Implement calculation of gradients
        wgrads = np.dot(self.inputs.T, eta)
        bgrads = np.sum(eta, axis=0)
        return wgrads, bgrads
    def input_gradients(self, eta):
        """Calculating the gradients wrt inputs!"""
        # TODO: Implement calculation of gradients
        inputgrads = np.dot(eta, self.w.T)
        wgrads, bgrads = self.weight_gradients(eta)
        self.w = self.w - self.learning_rate*wgrads
        self.b = self.b - self.learning_rate*bgrads
        return inputgrads
    @staticmethod
    def _initialize_weight(initializer, input_size, output_size):
        """
        Initializes the values of the weights and biases. The bias weights should always start at zero.
        However, the weights should follow the given distribution defined by the initializer parameter
        (zero, normal, xavier, or kaiming). You can do this with an if statement
        cycling through each option!
        Details on each weight initialization option:
            - Zero: Weights and biases contain only 0's. Generally a bad idea since the gradient update
            will be the same for each weight so all weights will have the same values.
            - Normal: Weights are initialized according to a normal distribution.
            - Xavier: Goal is to initialize the weights so that the variance of the activations are the
            same across every layer. This helps to prevent exploding or vanishing gradients. Typically
            works better for layers with tanh or sigmoid activation.
            - Kaiming: Similar purpose as Xavier initialization. Typically works better for layers
            with ReLU activation.
        """
        initializer = initializer.lower()
        assert initializer in (
            "zero",
            "normal",
            "xavier",
            "kaiming",
        ), f"Unknown dense weight initialization strategy '{initializer}' requested"
        io_size = (input_size, output_size)
        # TODO: Implement default assumption: zero-init for weights and bias
        initial_b = np.zeros((1,output_size))
        if initializer=="zero":
            initial_w = np.zeros(io_size)
        # TODO: Implement remaining options (normal, xavier, kaiming initializations). Note that
        # strings must be exactly as written in the assert above
        elif initializer=="normal":
            initial_w = np.random.randn(input_size, output_size)
        elif initializer=="xavier":
            initial_w = np.random.randn(input_size, output_size) * np.sqrt(1 / output_size)
        elif initializer=="kaiming":
            initial_w = np.random.randn(input_size, output_size) * np.sqrt(2 / output_size)
        return initial_w, initial_b


5、激活函数


       该文件用于实现LeakRelu激活函数和SoftMax激活函数,手写了他们的前向传播[def forward]和反向传播[def input_fradients]:


● LeakyReLU ()


       〇 forward() : [TODO]给定输入x,计算并返回LeakyReLU(x)。


       〇 input_gradients() : [TODO]计算并返回与通过对LeakyReLU求导得到输入。


● Softmax():(2470 ONLY)


       〇 forward(): [TODO]给定输入x,计算并返回Softmax(x)。确保使用的是稳定的softmax,即减去所有项的最大值防止溢出/undvim erflow问题。


       〇 input_gradients(): [TODO] Softmax()的部分w.r.t输入。


.activations.py


import numpy as np
from .core import Diffable
class LeakyReLU(Diffable):
    def __init__(self, alpha=0.3):
        super().__init__()
        self.alpha = alpha
        self.inputs = None
        self.outputs = None
    def forward(self, inputs):
        # TODO: Given an input array `x`, compute LeakyReLU(x)
        self.inputs = inputs
        # Your code here:
        self.outputs = inputs if inputs.all()>=0 else inputs*self.alpha
        return self.outputs
    def input_gradients(self, eta):
        # TODO: Compute and return the gradients
        eta[self.inputs<=0] = 0
        return eta
    def compose_to_input(self, J):
        # TODO: Maybe you'll want to override the default?
        return super().compose_to_input(J)
class ReLU(LeakyReLU):
    def __init__(self):
        super().__init__(alpha=0)
class Softmax(Diffable):
    def __init__(self):
        super().__init__()
        self.inputs = None
        self.outputs = None
    def forward(self, inputs):
        """Softmax forward pass!"""
        # TODO: Implement
        # HINT: Use stable softmax, which subtracts maximum from
        # all entries to prevent overflow/underflow issues
        self.inputs = inputs
        # Your code here:
        z = inputs - np.max(inputs, axis=-1,keepdims=True)
        numerator = np.exp(z)
        denominator = np.sum(numerator)
        self.outputs = numerator/denominator
        return self.outputs
    def input_gradients(self, etc):
        """Softmax backprop!"""
        # TODO: Compute and return the gradients
        return etc


☆6、填充函数


       本文用于手写Keras中的序列模型SequentialModel类,SequentialModel继承Model类,从而我们先实现Model类具体内容如下:


● compile() : 初始化模型优化器,损失函数和精度函数,它们作为参数输入,供SequentialModel实例使用。


● fit() : 训练模型将输入和输出关联起来。重复训练每个时代,数据是基于参数的批处理。它还计算Batch_metrics、epoch_metrics和聚合的agg_metrics可以用来跟踪模型的训练进度。


● evaluate() : [TODO] 评估最终模型的性能使用测试阶段中提到的指标。它几乎和符合()函数;想想培训和测试之间会发生什么变化)。


● call() : [TODO] 提示:调用顺序模型意味着什么?还记得顺序模型是一堆层,每一层只有一个输入向量和一个输出向量。你可以在在assignment.py中的SequentialModel类。


● batch_step() : [TODO] 您将看到fit()为每一个都调用了这个函数批处理。您将首先计算输入批处理的模型预测。在训练阶段,你需要计算梯度和更新你的权重根据您正在使用的优化器。对于训练过程中的反向传播,你将使用GradientTape从核心抽象(core.py)来记录操作和中间值。然后您将使用模型的优化器来将梯度应用到模型的可训练变量上。最后,计算和返回该批次的损耗和精度。你可以在在assignment.py中的SequentialModel类。


model.py


from abc import ABC, abstractmethod
from collections import defaultdict
import numpy as np
from .core import Diffable
def print_stats(stat_dict, b=None, b_num=None, e=None, avg=False):
    """
    Given a dictionary of names statistics and batch/epoch info,
    print them in an appealing manner. If avg, display stat averages.
    """
    title_str = " - "
    if e is not None:
        title_str += f"Epoch {e+1:2}: "
    if b is not None:
        title_str += f"Batch {b+1:3}"
        if b_num is not None:
            title_str += f"/{b_num}"
    if avg:
        title_str += f"Average Stats"
    print(f"\r{title_str} : ", end="")
    op = np.mean if avg else lambda x: x
    print({k: np.round(op(v), 4) for k, v in stat_dict.items()}, end="")
    print("   ", end="" if not avg else "\n")
def update_metric_dict(super_dict, sub_dict):
    """
    Appends the average of the sub_dict metrics to the super_dict's metric list
    """
    for k, v in sub_dict.items():
        super_dict[k] += [np.mean(v)]
class Model(ABC):
    ###############################################################################################
    ## BEGIN GIVEN
    def __init__(self, layers):
        """
        Initialize all trainable parameters and take layers as inputs
        """
        # Initialize all trainable parameters
        assert all([issubclass(layer.__class__, Diffable) for layer in layers])
        self.layers = layers[:-1]
        self.trainable_variables = []
        for layer in layers:
            if hasattr(layer, "weights") and layer.trainable:
                for weight in layer.weights:
                    self.trainable_variables += [weight]
    def compile(self, optimizer, loss_fn, acc_fn):
        """
        "Compile" the model by taking in the optimizers, loss, and accuracy functions.
        In more optimized DL implementations, this will have more involved processes
        that make the components extremely efficient but very inflexible.
        """
        self.optimizer = optimizer
        self.compiled_loss = loss_fn
        self.compiled_acc = acc_fn
    def fit(self, x, y, epochs, batch_size):
        """
        Trains the model by iterating over the input dataset and feeding input batches
        into the batch_step method with training. At the end, the metrics are returned.
        """
        agg_metrics = defaultdict(lambda: [])
        batch_num = x.shape[0] // batch_size
        for e in range(epochs):
            epoch_metrics = defaultdict(lambda: [])
            for b, b1 in enumerate(range(batch_size, x.shape[0] + 1, batch_size)):
                b0 = b1 - batch_size
                batch_metrics = self.batch_step(x[b0:b1], y[b0:b1], training=True)
                update_metric_dict(epoch_metrics, batch_metrics)
                print_stats(batch_metrics, b, batch_num, e)
            update_metric_dict(agg_metrics, epoch_metrics)
            print_stats(epoch_metrics, e=e, avg=True)
        return agg_metrics
    def evaluate(self, x, y, batch_size):
        """
        X is the dataset inputs, Y is the dataset labels.
        Evaluates the model by iterating over the input dataset in batches and feeding input batches
        into the batch_step method. At the end, the metrics are returned. Should be called on
        the testing set to evaluate accuracy of the model using the metrics output from the fit method.
        NOTE: This method is almost identical to fit (think about how training and testing differ --
        the core logic should be the same)
        """
        # TODO: Implement evaluate similarly to fit.
        agg_metrics = defaultdict(lambda: [])
        batch_num = x.shape[0] // batch_size
        for e in range(1):
            epoch_metrics = defaultdict(lambda: [])
            for b, b1 in enumerate(range(batch_size, x.shape[0] + 1, batch_size)):
                b0 = b1 - batch_size
                batch_metrics = self.batch_step(x[b0:b1], y[b0:b1], training=False)
                update_metric_dict(epoch_metrics, batch_metrics)
                print_stats(batch_metrics, b, batch_num, e)
            update_metric_dict(agg_metrics, epoch_metrics)
            print_stats(epoch_metrics, e=e, avg=True)
        return agg_metrics
    @abstractmethod
    def call(self, inputs):
        """You will implement this in the SequentialModel class in assignment.py"""
        return
    @abstractmethod
    def batch_step(self, x, y, training=True):
        """You will implement this in the SequentialModel class in assignment.py"""
        return


☆def batch_step() 解析:


y_pre = self.call(x)

:通过前向传播得到网络传播一次后的预测值,


loss = self.compiled_loss.forward(y_pre, y)

:将预测值与真实值放入损失函数中通过前向传播得到损失值。


acc = self.compiled_acc(y_pre, y)

:将预测值与真实值放入精度函数中通过前向传播得到精度值。


各函数反向传播的意义:


激活函数:将神经网络上一层的输入,经过神经网络层的非线性变换转换后,通过激活函数,得到输出。常见的激活函数包括:sigmoid, tanh, relu等。


损失函数:度量神经网络的输出的预测值,与实际值之间的差距的一种方式。常见的损失函数包括:最小二乘损失函数、交叉熵损失函数、回归中使用的smooth L1损失函数等。


优化函数:也就是如何把损失值从神经网络的最外层传递到最前面。如最基础的梯度下降算法,随机梯度下降算法,批量梯度下降算法,带动量的梯度下降算法,Adagrad,Adadelta,Adam等。


损失函数


eta = self.compiled_loss.input_gradients()

:通过损失函数的反向传播得到梯度。


激活函数


for layer in self.layers[::-1]:

       eta = layer.input_gradients(eta)

:将梯度传播各个网络层进行反向传播。


优化函数


if training:

           self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])

:将前向传播一次,反向传播一次之后更新的weights, bias放入优化器中,把损失值从神经网络的最外层传递到最前面。


☆ class SequentialModel in assgnment.py


class SequentialModel(Beras.Model):
    """
    Implemented in Beras/model.py
    def __init__(self, layers):
    def compile(self, optimizer, loss_fn, acc_fn):
    def fit(self, x, y, epochs, batch_size):
    def evaluate(self, x, y, batch_size):           ## <- TODO
    """
    def call(self, inputs):
        """
        Forward pass in sequential model. It's helpful to note that layers are initialized in Beras.Model, and
        you can refer to them with self.layers. You can call a layer by doing var = layer(input).
        """
        # TODO: The call function!
        for layer in self.layers:
            inputs = layer.forward(inputs)
        return inputs
    def batch_step(self, x, y, training=True):
        """
        Computes loss and accuracy for a batch. This step consists of both a forward and backward pass.
        If training=false, don't apply gradients to update the model! 
        Most of this method (forward, loss, applying gradients)
        will take place within the scope of Beras.GradientTape()
        """
        # TODO: Compute loss and accuracy for a batch.
        # If training, then also update the gradients according to the optimizer
        y_pre = self.call(x)
        loss = self.compiled_loss.forward(y_pre, y)
        acc = self.compiled_acc(y_pre, y)
        eta = self.compiled_loss.input_gradients()
        # backwarding...
        for layer in self.layers[::-1]:
            #print(type(layer))
            eta = layer.input_gradients(eta)
        if training:
            self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])
        return {"loss": loss, "acc": acc}


7、损失函数


       这是模型训练中最关键的方面之一。在这次作业中,我没有按照实验中的描述那样子去实现MSE或者说均方误差损失函数,而是选择了CrossEntropyLoss损失函数。因为经过实验,其余两个损失函数效果都不太如人意。


注意:一般SoftMax的反向传播是与CrossEntropyLoss损失函数一起进行的,所以不填写SoftMax的方向传播部分。


● forward() : [TODO] 编写一个计算并返回平均值的函数给出预测和实际标签的平方误差。


提示:什么是MSE?在给出预测和实际标签的情况下,均方误差是预测值与实际值之间的差异。


● input_gradients() : [TODO] 计算并返回梯度。使用用微分法推导出这些梯度的公式。


losses.py


import numpy as np
from .core import Diffable
from abc import ABCMeta, abstractmethod
import numpy as np
class CrossEntropyLoss(Diffable):
    def __init__(self):
        self.classifier = Softmax()
    def input_gradients(self):
        return self.grad
    def forward(self, a, y):
        a = self.classifier.forward(a)
        self.grad = a - y
        loss = -1 * np.einsum('ij,ij->', y, np.log(a), optimize=True) / y.shape[0]
        return loss
class Layer(metaclass=ABCMeta):
    @abstractmethod
    def forward(self, *args):
        pass
    @abstractmethod
    def backward(self, *args):
        pass
class Softmax(Layer):
    def forward(self, x):
        v = np.exp(x - x.max(axis=-1, keepdims=True))    
        return v / v.sum(axis=-1, keepdims=True)
    def backward(self, eta):
        pass


8、优化函数


       对于Mnist数据集来讲,单单只是RMSProp :已经完全足够,所以本文只实现了这一个优化函数。


● RMSProp : [TODO] 误差传播的均方根。


.optimizer.py


from collections import defaultdict
import numpy as np
class RMSProp:
    def __init__(self, learning_rate, beta=0.9, epsilon=1e-6):
        self.learning_rate = learning_rate
        self.beta = beta
        self.epsilon = epsilon
        self.v = defaultdict(lambda: 0)
    def apply_gradients(self, weights, grads):
        # TODO: Implement RMSProp optimization
        # Refer to the lab on Optimizers for a better understanding!
        self.mean_square = self.v['mean_square']
        self.mean_square = self.beta*self.mean_square + (1-self.beta)*(grads)**2
        self.v['mean_square'] = self.mean_square
        weights = weights - self.learning_rate/(np.sqrt(self.mean_square) + self.epsilon)*grads
        return


9、精度指标


       本文件简单的实现了一个精度模型,用于测量模型精度:


● forward() : [TODO] 返回模型的分类精度预测概率和真标签。你应该返回的比例预测标签等于真实标签,其中图像的预测标签为与最高概率对应的标签。参考网络或讲座幻灯片的分类精度数学!


.metrics.py


import numpy as np
from .core import Callable
class CategoricalAccuracy(Callable):
    def forward(self, probs, labels):
        """Categorical accuracy forward pass!"""
        super().__init__()
        # TODO: Compute and return the categorical accuracy of your model given the output probabilities and true labels
        probsArg = np.argmax(probs, axis=1)
        labelsArg = np.argmax(labels, axis=1)
        return sum(probsArg==labelsArg)/len(labels)


10、训练和测试


       构建了两个模型,仿造Keras:


● get_simple_model()中的一个简单模型,最多只有一个扩散层(例如:density - ./layers.py)和一个激活函数(在/ activation.py)。虽然可以这样做,但默认情况下为您提供了这个选项。如果你愿意,可以改一下。自动评分器将评估原始的一个!


● get_advanced_model()中稍微复杂一点的模型,有两个或更多扩散层和两个或两个以上的激活函数。我们推荐使用Adam该模型的优化器具有相当低的学习率。


def  get_simple_model() in assgnment.py
def get_simple_model_components():
    """
    Returns a simple single-layer model.
    """
    ## DO NOT CHANGE IN FINAL SUBMISSION
    from Beras.activations import Softmax
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    # TODO: create a model and compile it with layers and functions of your choice
    model = SequentialModel([Dense(784, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=10, batch_size=100)
 get_advanced_model() in assgnment.py
def get_advanced_model_components():
    from Beras.activations import Softmax, LeakyReLU
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.batchnorm import BatchNorm
    """
    Returns a multi-layered model with more involved components.
    """
    # TODO: create/compile a model with layers and functions of your choice.
    model = SequentialModel([Dense(784, 398), BatchNorm(398), LeakyReLU(0), Dense(398, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=12, batch_size=100)


11、可视化的结果


  我们为您提供了visualize_metrics方法来可视化您的损失和每次使用matplotlib后,精确度都会发生变化。


.visualize.py


import matplotlib.pyplot as plt
import numpy as np
def visualize_metrics(losses=[], accuracies=[]):
    """
    param losses: a 1D array of loss values
    param accuracies: a 1D array of accuracy values
    Displays a plot with loss and accuracy values on the y-axis and batch number/epoch number on the
    x-axis
    """
    if not losses or not accuracies:
        return print("Must provide a list of losses/accuracies to visualize")
    x = np.arange(1, max(len(losses), len(accuracies)) + 1)
    plt.plot(x, losses)
    plt.plot(x, accuracies)
    plt.ylabel("Loss/Acc Value")
    plt.show()
def visualize_images(model, train_inputs, train_labels_ohe, num_searching=500):
    """
    param model: a neural network model (i.e. SequentialModel)
    param train_inputs: sample training inputs for the model to predict
    param train_labels_ohe: one-hot encoded training labels corresponding to train_inputs
    Displays 10 sample outputs the model correctly classifies and 10 sample outputs the model
    incorrectly classifies
    """
    rand_idx = np.random.choice(len(train_inputs), num_searching)
    rand_batch = train_inputs[rand_idx]
    probs = model.call(rand_batch)
    pred_classes = np.argmax(probs, axis=1)
    true_classes = np.argmax(train_labels_ohe[rand_idx], axis=1)
    right_idx = np.where(pred_classes == true_classes)
    wrong_idx = np.where(pred_classes != true_classes)
    right = np.reshape(rand_batch[right_idx], (-1, 28, 28))
    wrong = np.reshape(rand_batch[wrong_idx], (-1, 28, 28))
    right_pred_labels = true_classes[right_idx]
    wrong_pred_labels = pred_classes[wrong_idx]
    assert len(right) >= 10, f"Found less than 10 correct predictions!"
    assert len(wrong) >= 10, f"Found less than 10 correct predictions!"
    fig, axs = plt.subplots(2, 10)
    fig.suptitle("Classigications\n(PL = Predicted Label)")
    subsets = [right, wrong]
    pred_labs = [right_pred_labels, wrong_pred_labels]
    for r in range(2):
        for c in range(10):
            axs[r, c].imshow(subsets[r][c], cmap="Greys")
            axs[r, c].set(title=f"PL: {pred_labs[r][c]}")
            plt.setp(axs[r, c].get_xticklabels(), visible=False)
            plt.setp(axs[r, c].get_yticklabels(), visible=False)
            axs[r, c].tick_params(axis="both", which="both", length=0)
    plt.show()


12、 调用前面11步写好的代码,对模型进行训练并且测试


.assignment.py


from types import SimpleNamespace
import Beras
import numpy as np
class SequentialModel(Beras.Model):
    """
    Implemented in Beras/model.py
    def __init__(self, layers):
    def compile(self, optimizer, loss_fn, acc_fn):
    def fit(self, x, y, epochs, batch_size):
    def evaluate(self, x, y, batch_size):           ## <- TODO
    """
    def call(self, inputs):
        """
        Forward pass in sequential model. It's helpful to note that layers are initialized in Beras.Model, and
        you can refer to them with self.layers. You can call a layer by doing var = layer(input).
        """
        # TODO: The call function!
        for layer in self.layers:
            inputs = layer.forward(inputs)
        return inputs
    def batch_step(self, x, y, training=True):
        """
        Computes loss and accuracy for a batch. This step consists of both a forward and backward pass.
        If training=false, don't apply gradients to update the model! 
        Most of this method (forward, loss, applying gradients)
        will take place within the scope of Beras.GradientTape()
        """
        # TODO: Compute loss and accuracy for a batch.
        # If training, then also update the gradients according to the optimizer
        y_pre = self.call(x)
        loss = self.compiled_loss.forward(y_pre, y)
        acc = self.compiled_acc(y_pre, y)
        eta = self.compiled_loss.input_gradients()
        # backwarding...
        for layer in self.layers[::-1]:
            #print(type(layer))
            eta = layer.input_gradients(eta)
        if training:
            self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])
        return {"loss": loss, "acc": acc}
def get_simple_model_components():
    """
    Returns a simple single-layer model.
    """
    ## DO NOT CHANGE IN FINAL SUBMISSION
    from Beras.activations import Softmax
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    # TODO: create a model and compile it with layers and functions of your choice
    model = SequentialModel([Dense(784, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=10, batch_size=100)
def get_advanced_model_components():
    from Beras.activations import Softmax, LeakyReLU
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.batchnorm import BatchNorm
    """
    Returns a multi-layered model with more involved components.
    """
    # TODO: create/compile a model with layers and functions of your choice.
    model = SequentialModel([Dense(784, 398), BatchNorm(398), LeakyReLU(0), Dense(398, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=12, batch_size=100)
if __name__ == "__main__":
    """
    Read in MNIST data and initialize/train/test your model.
    """
    from Beras.onehot import OneHotEncoder
    import preprocess
    ## Read in MNIST data,
    train_inputs, train_labels = preprocess.get_data_MNIST("train", "../data")
    test_inputs,  test_labels  = preprocess.get_data_MNIST("test",  "../data")
    ## TODO: Use the OneHotEncoder class to one hot encode the labels
    # ohe = lambda x: 0  ## placeholder function: returns zero for a given input
    ohe = OneHotEncoder()
    ohe.fit(train_labels)
    ## Get your model to train and test
    simple = False
    args = get_simple_model_components() if simple else get_advanced_model_components()
    model = args.model
    ## REMINDER: Threshold of accuracy: 
    ##  1470: >85% on testing accuracy from get_simple_model_components
    ##  2470: >95% on testing accuracy from get_advanced_model_components
    # TODO: Fit your model to the training input and the one hot encoded labels
    # Remember to pass all the arguments that SequentialModel.fit() requires
    # such as number of epochs and the batch size
    print('---------------------------[[[Train]]]]---------------------------')
    train_agg_metrics = model.fit(
        train_inputs, 
        ohe(train_labels), 
        epochs     = args.epochs, 
        batch_size = args.batch_size
    )
    print('-------------------------------------------------------------------')
    ## Feel free to use the visualize_metrics function to view your accuracy and loss.
    ## The final accuracy returned during evaluation must be > 80%.
    # from visualize import visualize_images, visualize_metrics
    # visualize_metrics(train_agg_metrics["loss"], train_agg_metrics["acc"])
    # visualize_images(model, train_inputs, ohe(tr  ain_labels))
    ## TODO: Evaluate your model using your testing inputs and one hot encoded labels.
    ## This is the number you will be using!
    print('---------------------------[[[Evaluate]]]---------------------------')
    test_agg_metrics = model.evaluate(test_inputs, ohe(test_labels), batch_size=100)
    print('Testing Performance:', test_agg_metrics)
    print('-----------------------------------------------------------------')


自认为算是一次我做的勉强合格(不够好的意思)的作业,提供的答案也仅供参考,祝大家玩的开心!


相关文章
|
6天前
|
机器学习/深度学习 数据采集 自然语言处理
理解并应用机器学习算法:神经网络深度解析
【5月更文挑战第15天】本文深入解析了神经网络的基本原理和关键组成,包括神经元、层、权重、偏置及损失函数。介绍了神经网络在图像识别、NLP等领域的应用,并涵盖了从数据预处理、选择网络结构到训练与评估的实践流程。理解并掌握这些知识,有助于更好地运用神经网络解决实际问题。随着技术发展,神经网络未来潜力无限。
|
6天前
|
SQL 安全 网络安全
构筑网络长城:网络安全漏洞解析与防御策略
【4月更文挑战第30天】 在数字化时代,网络安全已成为维护信息完整性、确保数据流通安全和保障用户隐私的关键。本文将深入探讨网络安全的核心问题——安全漏洞,并分享关于加密技术的最新进展以及提升个人和企业安全意识的有效方法。通过对常见网络威胁的剖析,我们旨在提供一套综合性的网络防御策略,以助力读者构建更为坚固的信息安全防线。
|
3天前
|
域名解析 网络协议 网络性能优化
如何提升自建DNS服务下的网络体验
网络质量和网络体验是通信过程中的两个不同层面,质量涉及设备上下行表现,而体验关乎端到端通信效果。衡量质量常用带宽、延迟、丢包率等指标;体验则关注可访问性,DNS解析速度和服务位置等。现代路由器能自动调整网络质量,普通用户无需过多干预。自建DNS服务时,选择权威DNS能解决可访问性,但可能不提供最优体验。AdguardHome和Clash等工具能进一步优化DNS解析,提升网络体验。
27 6
如何提升自建DNS服务下的网络体验
|
4天前
|
Linux 网络安全
CentOS系统openssh-9,网络安全大厂面试真题解析大全
CentOS系统openssh-9,网络安全大厂面试真题解析大全
|
4天前
|
Linux 网络安全 Windows
网络安全笔记-day8,DHCP部署_dhcp搭建部署,源码解析
网络安全笔记-day8,DHCP部署_dhcp搭建部署,源码解析
|
4天前
|
运维 网络协议 Linux
Docker网络_docker 网络,来看看这份超全面的《Linux运维面试题及解析》
Docker网络_docker 网络,来看看这份超全面的《Linux运维面试题及解析》
|
6天前
|
机器学习/深度学习 算法 Go
YOLOv5网络结构解析
YOLOv5网络结构解析
|
6天前
|
机器学习/深度学习 存储 算法
卷积神经网络(CNN)的数学原理解析
卷积神经网络(CNN)的数学原理解析
36 1
卷积神经网络(CNN)的数学原理解析
|
6天前
|
安全 算法 网络安全
构筑网络长城:网络安全漏洞解析与防御策略深入理解操作系统:进程管理与调度策略
【4月更文挑战第30天】 在数字化时代,网络安全已成为维护信息完整性、确保数据流通安全和保障用户隐私的关键。本文将深入探讨网络安全的核心问题——安全漏洞,并分享关于加密技术的最新进展以及提升个人和企业安全意识的有效方法。通过对常见网络威胁的剖析,我们旨在提供一套综合性的网络防御策略,以助力读者构建更为坚固的信息安全防线。 【4月更文挑战第30天】 在现代操作系统的核心,进程管理是维持多任务环境稳定的关键。本文将深入探讨操作系统中的进程概念、进程状态转换及进程调度策略。通过分析不同的调度算法,我们将了解操作系统如何平衡各进程的执行,确保系统资源的高效利用和响应时间的最优化。文中不仅剖析了先来先
|
6天前
|
机器学习/深度学习 算法 PyTorch
python手把手搭建图像多分类神经网络-代码教程(手动搭建残差网络、mobileNET)
python手把手搭建图像多分类神经网络-代码教程(手动搭建残差网络、mobileNET)
56 0

推荐镜像

更多