[Multi-NN] Reference solution: a multi-layer neural network written from scratch in NumPy

Summary: a reference solution for the Multi-NN assignment, a multi-layer neural network written from scratch in NumPy.

Preface


       Since the original assignment requires implementing quite a few functions, this post does not start from the algorithmic theory; instead it presents the finished code directly for reference. The lab is built against the spec of the original assignment handout.



Finished code files


Download:


Link: https://pan.baidu.com/s/1Fw_7thL5PxR79zI6XbpnYQ

Extraction code: txqe


File structure:


| - hw2 
        | - code
                | - Beras
                        | - 8 .py files implementing the functions required by the assignment
                | - assignment.py
                | - preprocess.py
                | - visualize.py
        | - data
                | - mnist
                        | - the four MNIST dataset files
                | - Iris (can be ignored; not used in this assignment)


1. Data preprocessing


       This file ships with the assignment. It reads the four .gz files under ../data/mnist/ and returns the MNIST images and labels for the Train and Test subsets (2 × 2 = four arrays).


preprocess.py


import gzip
import numpy as np
"""
TODO: 
Same as HW1. Feel free to copy and paste your old implementation here.
It's a good time to vectorize it, while you're at it!
No need to include CIFAR-specific methods.
"""
def get_data_MNIST(subset, data_path="../data", is_reshape=True):
    """
    :param subset: string indicating whether we want the training or testing data 
        (only accepted values are 'train' and 'test')
    :param data_path: directory containing the training and testing inputs and labels
    :return: NumPy array of inputs (float32) and labels (uint8)
    """
    ## http://yann.lecun.com/exdb/mnist/
    subset = subset.lower().strip()
    assert subset in ("test", "train"), f"unknown data subset {subset} requested"
    inputs_file_path, labels_file_path, num_examples = {
        "train": ("train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz", 60000),
        "test": ("t10k-images-idx3-ubyte.gz", "t10k-labels-idx1-ubyte.gz", 10000),
    }[subset]
    inputs_file_path = f"{data_path}/mnist/{inputs_file_path}"
    labels_file_path = f"{data_path}/mnist/{labels_file_path}"
    ## TODO: read the image file and normalize, flatten, and type-convert image
    with open(inputs_file_path, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
        buf = bytestream.read(num_examples*28*28 + 16)
        dt = np.dtype(np.uint8)
        temp = np.frombuffer(buf, dtype=dt) 
        image = temp[16:]
        if is_reshape:
            image = image.reshape((num_examples,28*28))
        else:
            image = image.reshape((num_examples, 28, 28, 1))
        image = (image/255.0).astype(np.float32)  # normalize to [0, 1] as float32, per the docstring
    print(image.shape)
    ## TODO: read the label file
    with open(labels_file_path, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
        buf = bytestream.read(num_examples + 8)
        dt = np.dtype(np.uint8)
        temp = np.frombuffer(buf, dtype=dt) 
        label = temp[8:]
    return image, label
## THE REST ARE OPTIONAL!
'''
def shuffle_data(image_full, label_full, seed):
    pass
def get_subset(image_full, label_full, class_list=list(range(10)), num=100):
    pass
'''
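

As a quick sanity check, the loader can be exercised like this (a hedged sketch; it assumes the four MNIST .gz files sit under ../data/mnist exactly as in the file tree above):

from preprocess import get_data_MNIST

# Expect (60000, 784) images scaled to [0, 1] and (60000,) uint8 labels
train_x, train_y = get_data_MNIST("train", "../data")
print(train_x.shape, train_y.shape)   # (60000, 784) (60000,)
print(train_x.min(), train_x.max())   # 0.0 1.0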


2. One-hot encoding


       This file implements one-hot encoding. The parts to write by hand are:


● fit(): [TODO] In this function you need to find the unique labels in the data (store them in self.uniq) and build a dictionary with the labels as keys and their corresponding one-hot encodings as values. Hint: you may want to look at np.eye() to get the one-hot encodings. Finally, store the dictionary in self.uniq2oh.


● forward(): In this function we pass in a vector containing all of the actual labels in the training set and call fit() to populate the uniq2oh dictionary with the unique labels and their one-hot encodings, then use it to return an array of one-hot encoded labels for every label in the training set.


This function is already filled in for you!


● inverse(): In this function we reverse the one-hot encoding back to the actual label.


This has already been done for you.


For example, if we have labels X and Y with one-hot encodings [1, 0] and [0, 1], we would end up with {X: [1, 0], Y: [0, 1]}.


For MNIST you will have 10 labels, so your dictionary should have 10 entries!


onehot.py


import numpy as np
from .core import Callable
class OneHotEncoder(Callable):
    """
    One-Hot Encodes labels. First takes in a candidate set to figure out what elements it
    needs to consider, and then one-hot encodes subsequent input datasets in the
    forward pass.
    SIMPLIFICATIONS:
     - Implementation assumes that entries are individual elements.
     - Forward will call fit if it hasn't been done yet; most implementations will just error.
     - keras does not have OneHotEncoder; has LabelEncoder, CategoricalEncoder, and to_categorical()
    """
    def fit(self, data):
        """
        Fits the one-hot encoder to a candidate dataset. Said dataset should contain
        all encounterable elements.
        :param data: 1D array containing labels.
            For example, data = [0, 1, 3, 3, 1, 9, ...]
        """
        ## TODO: Fetch all the unique labels and create a dictionary with
        ## the unique labels as keys and their one hot encodings as values
        ## HINT: look up np.eye() and see if you can utilize it!
        ## HINT: Wouldn't it be nice if we just gave you the implementation somewhere...
        self.uniq = np.unique(data)  # all the unique labels from `data`
        self.uniq2oh = {}  # a lookup dictionary with labels and corresponding encodings
        eye = np.eye(len(self.uniq))
        for i in range(len(self.uniq)):
            self.uniq2oh[self.uniq[i]] = eye[i]
    def forward(self, data):
        if not hasattr(self, "uniq2oh"):
            self.fit(data)
        return np.array([self.uniq2oh[x] for x in data])
    def inverse(self, data):
        assert hasattr(self, "uniq"), \
            "forward() or fit() must be called before attempting to invert"
        return np.array([self.uniq[x == 1][0] for x in data])
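

A minimal round-trip sketch of the encoder (assuming the Beras package layout described above):

import numpy as np
from Beras.onehot import OneHotEncoder

labels = np.array([3, 1, 4, 1])
ohe = OneHotEncoder()
onehots = ohe(labels)          # fit() is called lazily inside forward()
print(onehots)                 # each row is a row of the 3x3 identity matrix
print(ohe.inverse(onehots))    # [3 1 4 1]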


3. Core abstractions


       This file is given with the assignment and needs no modification. (The GradientTape.gradient() TODO it contains is not filled in by this solution; backpropagation is instead done by hand in batch_step, see section 6.)


core.py


from abc import ABC, abstractmethod  # # For abstract method support
from typing import Tuple
import numpy as np
## DO NOT MODIFY THIS CLASS
class Callable(ABC):
    """
    Callable Sub-classes:
     - CategoricalAccuracy (./metrics.py)       - TODO
     - OneHotEncoder       (./preprocess.py)    - TODO
     - Diffable            (.)                  - DONE
    """
    def __call__(self, *args, **kwargs) -> np.array:
        """Lets `self()` and `self.forward()` be the same"""
        return self.forward(*args, **kwargs)
    @abstractmethod
    def forward(self, *args, **kwargs) -> np.array:
        """Pass inputs through function. Can store inputs and outputs as instance variables"""
        pass
## DO NOT MODIFY THIS CLASS
class Diffable(Callable):
    """
    Diffable Sub-classes:
     - Dense            (./layers.py)           - TODO
     - LeakyReLU, ReLU  (./activations.py)      - TODO
     - Softmax          (./activations.py)      - TODO
     - MeanSquaredError (./losses.py)           - TODO
    """
    """Stores whether the operation being used is inside a gradient tape scope"""
    gradient_tape = None  ## All-instance-shared variable
    def __init__(self):
        """Is the layer trainable"""
        super().__init__()
        self.trainable = True  ## self-only instance variable
    def __call__(self, *args, **kwargs) -> np.array:
        """
        If there is a gradient tape scope in effect, perform AND RECORD the operation.
        Otherwise... just perform the operation and don't let the gradient tape know.
        """
        if Diffable.gradient_tape is not None:
            Diffable.gradient_tape.operations += [self]
        return self.forward(*args, **kwargs)
    @abstractmethod
    def input_gradients(self: np.array) -> np.array:
        """Returns gradient for input (this part gets specified for all diffables)"""
        pass
    def weight_gradients(self: np.array) -> Tuple[np.array, np.array]:
        """Returns gradient for weights (this part gets specified for SOME diffables)"""
        return ()
    def compose_to_input(self, J: np.array) -> np.array:
        """
        Compose the inputted cumulative jacobian with the input jacobian for the layer.
        Implemented with batch-level vectorization.
        Requires `input_gradients` to provide either batched or overall jacobian.
        Assumes input/cumulative jacobians are matrix multiplied
        """
        #  print(f"Composing to input in {self.__class__.__name__}")
        ig = self.input_gradients()
        batch_size = J.shape[0]
        n_out, n_in = ig.shape[-2:]
        j_new = np.zeros((batch_size, n_out), dtype=ig.dtype)
        for b in range(batch_size):
            ig_b = ig[b] if len(ig.shape) == 3 else ig
            j_new[b] = ig_b @ J[b]
        return j_new
    def compose_to_weight(self, J: np.array) -> list:
        """
        Compose the inputted cumulative jacobian with the weight jacobian for the layer.
        Implemented with batch-level vectorization.
        Requires `weight_gradients` to provide either batched or overall jacobian.
        Assumes weight/cumulative jacobians are element-wise multiplied (w/ broadcasting)
        and the resulting per-batch statistics are averaged together for avg per-param gradient.
        """
        # print(f'Composing to weight in {self.__class__.__name__}')
        assert hasattr(
            self, "weights"
        ), f"Layer {self.__class__.__name__} cannot compose along weight path"
        J_out = []
        ## For every weight/weight-gradient pair...
        for w, wg in zip(self.weights, self.weight_gradients()):
            batch_size = J.shape[0]
            ## Make a cumulative jacobian which will contribute to the final jacobian
            j_new = np.zeros((batch_size, *w.shape), dtype=wg.dtype)
            ## For every element in the batch (for a single batch-level gradient updates)
            for b in range(batch_size):
                ## If the weight gradient is a batch of transform matrices, get the right entry.
                ## Allows gradient methods to give either batched or non-batched matrices
                wg_b = wg[b] if len(wg.shape) == 3 else wg
                ## Update the batch's Jacobian update contribution
                j_new[b] = wg_b * J[b]
            ## The final jacobian for this weight is the average gradient update for the batch
            J_out += [np.mean(j_new, axis=0)]
        ## After the new jacobian is computed for each weight set, return the list of gradient updates
        return J_out
class GradientTape:
    def __init__(self):
        ## Log of operations that were performed inside tape scope
        self.operations = []
    def __enter__(self):
        # When tape scope is entered, let Diffable start recording to self.operation
        Diffable.gradient_tape = self
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # When tape scope is exited, stop letting Diffable record
        Diffable.gradient_tape = None
    def gradient(self) -> list:
        """Get the gradient from first to last recorded operation"""
        ## TODO:
        ##
        ##  Compute weight gradients for all operations.
        ##  If the model has trainable weights [w1, b1, w2, b2] and ends at a loss L.
        ##  the model should return: [dL/dw1, dL/db1, dL/dw2, dL/db2]
        ##
        ##  Recall that self.operations is populated by Diffable class instances...
        ##
        ##  Start from the last operation and compute jacobian w.r.t input.
        ##  Continue to propagate the cumulative jacobian through the layer inputs
        ##  until all operations have been differentiated through.
        ##
        ##  If an operation that has weights is encountered along the way,
        ##  compute the weight gradients and add them to the return list.
        ##  Remember to check if the layer is trainable before doing this though...
        grads = []
        return grads
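

The gradient() TODO above is left unimplemented in this solution, because backpropagation is done by hand in batch_step (see section 6). For completeness, here is a rough sketch of what a tape-based version might look like under the framework's original design, in which input_gradients()/weight_gradients() take no arguments and return jacobians. It is illustrative only and is not compatible with the input_gradients(eta) signature used by the layers later in this article.

def tape_gradient_sketch(operations):
    """Hypothetical tape walk. `operations` is GradientTape.operations, ordered
    from the first recorded op (e.g. the first Dense layer) to the last (the loss)."""
    grads = []
    ops = operations[::-1]           # walk backwards, starting from the loss
    J = ops[0].input_gradients()     # seed: gradient of the loss w.r.t. its input
    for op in ops[1:]:
        # If the op has trainable weights, compose the cumulative jacobian with its
        # weight jacobians and prepend them, keeping the [w1, b1, w2, b2, ...] order.
        if getattr(op, "trainable", False) and len(op.weight_gradients()) > 0:
            grads = op.compose_to_weight(J) + grads
        # Keep propagating the cumulative jacobian through the op's input.
        J = op.compose_to_input(J)
    return grads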


4. Layers


       This layer mimics Dense in Keras. The functions to write by hand are:


● forward(): [TODO] Implement the forward pass and return the outputs.


● weight_gradients(): [TODO] Compute the gradients with respect to the weights and biases. These will be used to optimize the layer.


● input_gradients(): [TODO] Compute the gradients with respect to the layer's inputs. These will be used to propagate the gradient to earlier layers.


● _initialize_weight(): [TODO]


Initialize the dense layer's weight values. By default, initialize all weights to zero (which, by the way, is usually a bad idea). You also need to support the more sophisticated options (when the initializer is set to normal, xavier, or kaiming). Follow Keras's mathematical conventions!


〇 Normal: self-explanatory, the unit normal distribution.


〇 Xavier Normal: based on keras.GlorotNormal.


〇 Kaiming He Normal: based on keras.HeNormal.


When implementing these you may find np.random.normal helpful. The assignment handout explains why these different initialization methods are necessary; for more details, check out this website! Feel free to add more initializer options! A short reference sketch of the Keras conventions follows below.
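

For reference, a small sketch of the Keras conventions the handout points to (the stddev formulas follow the documented GlorotNormal and HeNormal defaults; Keras actually draws from a truncated normal, but a plain normal is close enough here):

import numpy as np

def glorot_normal(fan_in, fan_out):
    # keras.initializers.GlorotNormal: stddev = sqrt(2 / (fan_in + fan_out))
    return np.random.randn(fan_in, fan_out) * np.sqrt(2.0 / (fan_in + fan_out))

def he_normal(fan_in, fan_out):
    # keras.initializers.HeNormal: stddev = sqrt(2 / fan_in)
    return np.random.randn(fan_in, fan_out) * np.sqrt(2.0 / fan_in)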


layers.py


import numpy as np
from .core import Diffable
class Dense(Diffable):
    # https://towardsdatascience.com/weight-initialization-in-neural-networks-a-journey-from-the-basics-to-kaiming-954fb9b47c79
    def __init__(self, input_size, output_size, learning_rate=0.01, initializer="kaiming"):
        super().__init__()
        self.w, self.b = self.__class__._initialize_weight(
            initializer, input_size, output_size
        )
        self.weights = [self.w, self.b]
        self.learning_rate = learning_rate
        self.inputs  = None
        self.outputs = None
    def forward(self, inputs):
        """Forward pass for a dense layer! Refer to lecture slides for how this is computed."""
        self.inputs = inputs
        # TODO: implement the forward pass and return the outputs
        self.outputs = np.matmul(inputs, self.w) + self.b
        return self.outputs
    def weight_gradients(self, eta):
        """Calculating the gradients wrt weights and biases!"""
        # TODO: Implement calculation of gradients
        wgrads = np.dot(self.inputs.T, eta)
        bgrads = np.sum(eta, axis=0)
        return wgrads, bgrads
    def input_gradients(self, eta):
        """Calculating the gradients wrt inputs!"""
        # TODO: Implement calculation of gradients
        inputgrads = np.dot(eta, self.w.T)
        # NOTE: in this solution the SGD update is applied here as well, so the
        # optimizer's apply_gradients call in batch_step is effectively redundant.
        wgrads, bgrads = self.weight_gradients(eta)
        self.w = self.w - self.learning_rate*wgrads
        self.b = self.b - self.learning_rate*bgrads
        return inputgrads
    @staticmethod
    def _initialize_weight(initializer, input_size, output_size):
        """
        Initializes the values of the weights and biases. The bias weights should always start at zero.
        However, the weights should follow the given distribution defined by the initializer parameter
        (zero, normal, xavier, or kaiming). You can do this with an if statement
        cycling through each option!
        Details on each weight initialization option:
            - Zero: Weights and biases contain only 0's. Generally a bad idea since the gradient update
            will be the same for each weight so all weights will have the same values.
            - Normal: Weights are initialized according to a normal distribution.
            - Xavier: Goal is to initialize the weights so that the variance of the activations are the
            same across every layer. This helps to prevent exploding or vanishing gradients. Typically
            works better for layers with tanh or sigmoid activation.
            - Kaiming: Similar purpose as Xavier initialization. Typically works better for layers
            with ReLU activation.
        """
        initializer = initializer.lower()
        assert initializer in (
            "zero",
            "normal",
            "xavier",
            "kaiming",
        ), f"Unknown dense weight initialization strategy '{initializer}' requested"
        io_size = (input_size, output_size)
        # TODO: Implement default assumption: zero-init for weights and bias
        initial_b = np.zeros((1,output_size))
        if initializer=="zero":
            initial_w = np.zeros(io_size)
        # TODO: Implement remaining options (normal, xavier, kaiming initializations). Note that
        # strings must be exactly as written in the assert above
        elif initializer=="normal":
            initial_w = np.random.randn(input_size, output_size)
        elif initializer=="xavier":
            initial_w = np.random.randn(input_size, output_size) * np.sqrt(1 / output_size)
        elif initializer=="kaiming":
            initial_w = np.random.randn(input_size, output_size) * np.sqrt(2 / output_size)
        return initial_w, initial_b
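

A quick shape check for the layer (a hedged sketch; it assumes the Beras package above is importable from the current directory):

import numpy as np
from Beras.layers import Dense

layer = Dense(784, 10, initializer="kaiming")
x = np.random.rand(32, 784)           # a batch of 32 flattened MNIST images
out = layer.forward(x)                # (32, 10)
eta = np.ones_like(out)               # stand-in upstream gradient
back = layer.input_gradients(eta)     # (32, 784); note this also applies the SGD update to w and b
print(out.shape, back.shape)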


5. Activation functions


       This file implements the LeakyReLU and Softmax activation functions; their forward pass (forward) and backward pass (input_gradients) are written by hand:


● LeakyReLU()


       〇 forward(): [TODO] Given an input x, compute and return LeakyReLU(x).


       〇 input_gradients(): [TODO] Compute and return the gradient with respect to the inputs by differentiating LeakyReLU.


● Softmax(): (2470 ONLY)


       〇 forward(): [TODO] Given an input x, compute and return Softmax(x). Make sure you use the stable softmax, i.e. subtract the maximum from all entries to prevent overflow/underflow issues.


       〇 input_gradients(): [TODO] The partial derivative of Softmax() with respect to its inputs.


activations.py


import numpy as np
from .core import Diffable
class LeakyReLU(Diffable):
    def __init__(self, alpha=0.3):
        super().__init__()
        self.alpha = alpha
        self.inputs = None
        self.outputs = None
    def forward(self, inputs):
        # TODO: Given an input array `x`, compute LeakyReLU(x)
        self.inputs = inputs
        # Element-wise: keep positive entries, scale negative entries by alpha
        self.outputs = np.where(inputs > 0, inputs, inputs * self.alpha)
        return self.outputs
    def input_gradients(self, eta):
        # TODO: Compute and return the gradients
        # d LeakyReLU / dx is 1 for positive inputs and alpha otherwise
        return np.where(self.inputs > 0, eta, eta * self.alpha)
    def compose_to_input(self, J):
        # TODO: Maybe you'll want to override the default?
        return super().compose_to_input(J)
class ReLU(LeakyReLU):
    def __init__(self):
        super().__init__(alpha=0)
class Softmax(Diffable):
    def __init__(self):
        super().__init__()
        self.inputs = None
        self.outputs = None
    def forward(self, inputs):
        """Softmax forward pass!"""
        # TODO: Implement
        # HINT: Use stable softmax, which subtracts maximum from
        # all entries to prevent overflow/underflow issues
        self.inputs = inputs
        # Your code here:
        z = inputs - np.max(inputs, axis=-1, keepdims=True)
        numerator = np.exp(z)
        denominator = np.sum(numerator, axis=-1, keepdims=True)
        self.outputs = numerator/denominator
        return self.outputs
    def input_gradients(self, eta):
        """Softmax backprop!"""
        # The softmax derivative is folded into CrossEntropyLoss (see section 7),
        # so the gradient is simply passed through here.
        return eta
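

A tiny check of the stable softmax (hedged sketch): logits large enough to overflow a naive exp() still produce rows that sum to 1.

import numpy as np
from Beras.activations import Softmax

logits = np.array([[1000.0, 1001.0, 1002.0],
                   [-5.0,      0.0,    5.0]])
probs = Softmax().forward(logits)
print(probs.sum(axis=-1))   # [1. 1.]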


☆6. The model


       This section hand-writes the SequentialModel class, mimicking Keras's sequential model. SequentialModel inherits from Model, so we implement the Model class first. Its contents are as follows:


● compile(): Initializes the model's optimizer, loss function, and accuracy function, which are passed in as arguments, for the SequentialModel instance to use.


● fit(): Trains the model to associate inputs with outputs. Training repeats for each epoch, and the data is batched according to the parameters. It also computes batch_metrics, epoch_metrics, and the aggregated agg_metrics, which can be used to track the model's training progress.


● evaluate(): [TODO] Evaluates the performance of the final model, using the metrics mentioned above, during the testing phase. It is almost identical to the fit() function; think about what changes between training and testing.


● call(): [TODO] Hint: what does it mean to call a sequential model? Remember that a sequential model is a stack of layers, where each layer has exactly one input vector and one output vector. You implement this in the SequentialModel class in assignment.py.


● batch_step(): [TODO] You will see that fit() calls this function for every batch. You first compute the model's predictions for the input batch. During the training phase, you need to compute the gradients and update your weights according to the optimizer you are using. For backpropagation during training, you would use GradientTape from the core abstractions (core.py) to record operations and intermediate values, then use the model's optimizer to apply the gradients to the model's trainable variables. Finally, compute and return the loss and accuracy for the batch. You implement this in the SequentialModel class in assignment.py.


model.py


from abc import ABC, abstractmethod
from collections import defaultdict
import numpy as np
from .core import Diffable
def print_stats(stat_dict, b=None, b_num=None, e=None, avg=False):
    """
    Given a dictionary of names statistics and batch/epoch info,
    print them in an appealing manner. If avg, display stat averages.
    """
    title_str = " - "
    if e is not None:
        title_str += f"Epoch {e+1:2}: "
    if b is not None:
        title_str += f"Batch {b+1:3}"
        if b_num is not None:
            title_str += f"/{b_num}"
    if avg:
        title_str += f"Average Stats"
    print(f"\r{title_str} : ", end="")
    op = np.mean if avg else lambda x: x
    print({k: np.round(op(v), 4) for k, v in stat_dict.items()}, end="")
    print("   ", end="" if not avg else "\n")
def update_metric_dict(super_dict, sub_dict):
    """
    Appends the average of the sub_dict metrics to the super_dict's metric list
    """
    for k, v in sub_dict.items():
        super_dict[k] += [np.mean(v)]
class Model(ABC):
    ###############################################################################################
    ## BEGIN GIVEN
    def __init__(self, layers):
        """
        Initialize all trainable parameters and take layers as inputs
        """
        # Initialize all trainable parameters
        assert all([issubclass(layer.__class__, Diffable) for layer in layers])
        self.layers = layers[:-1]  # the final Softmax layer is dropped here; CrossEntropyLoss applies its own softmax (see section 7)
        self.trainable_variables = []
        for layer in layers:
            if hasattr(layer, "weights") and layer.trainable:
                for weight in layer.weights:
                    self.trainable_variables += [weight]
    def compile(self, optimizer, loss_fn, acc_fn):
        """
        "Compile" the model by taking in the optimizers, loss, and accuracy functions.
        In more optimized DL implementations, this will have more involved processes
        that make the components extremely efficient but very inflexible.
        """
        self.optimizer = optimizer
        self.compiled_loss = loss_fn
        self.compiled_acc = acc_fn
    def fit(self, x, y, epochs, batch_size):
        """
        Trains the model by iterating over the input dataset and feeding input batches
        into the batch_step method with training. At the end, the metrics are returned.
        """
        agg_metrics = defaultdict(lambda: [])
        batch_num = x.shape[0] // batch_size
        for e in range(epochs):
            epoch_metrics = defaultdict(lambda: [])
            for b, b1 in enumerate(range(batch_size, x.shape[0] + 1, batch_size)):
                b0 = b1 - batch_size
                batch_metrics = self.batch_step(x[b0:b1], y[b0:b1], training=True)
                update_metric_dict(epoch_metrics, batch_metrics)
                print_stats(batch_metrics, b, batch_num, e)
            update_metric_dict(agg_metrics, epoch_metrics)
            print_stats(epoch_metrics, e=e, avg=True)
        return agg_metrics
    def evaluate(self, x, y, batch_size):
        """
        X is the dataset inputs, Y is the dataset labels.
        Evaluates the model by iterating over the input dataset in batches and feeding input batches
        into the batch_step method. At the end, the metrics are returned. Should be called on
        the testing set to evaluate accuracy of the model using the metrics output from the fit method.
        NOTE: This method is almost identical to fit (think about how training and testing differ --
        the core logic should be the same)
        """
        # TODO: Implement evaluate similarly to fit.
        agg_metrics = defaultdict(lambda: [])
        batch_num = x.shape[0] // batch_size
        for e in range(1):
            epoch_metrics = defaultdict(lambda: [])
            for b, b1 in enumerate(range(batch_size, x.shape[0] + 1, batch_size)):
                b0 = b1 - batch_size
                batch_metrics = self.batch_step(x[b0:b1], y[b0:b1], training=False)
                update_metric_dict(epoch_metrics, batch_metrics)
                print_stats(batch_metrics, b, batch_num, e)
            update_metric_dict(agg_metrics, epoch_metrics)
            print_stats(epoch_metrics, e=e, avg=True)
        return agg_metrics
    @abstractmethod
    def call(self, inputs):
        """You will implement this in the SequentialModel class in assignment.py"""
        return
    @abstractmethod
    def batch_step(self, x, y, training=True):
        """You will implement this in the SequentialModel class in assignment.py"""
        return


☆ batch_step() explained:


y_pre = self.call(x)

: a forward pass through the network produces the predictions.


loss = self.compiled_loss.forward(y_pre, y)

: the predictions and the true labels are fed into the loss function's forward pass to obtain the loss value.


acc = self.compiled_acc(y_pre, y)

: the predictions and the true labels are fed into the accuracy function to obtain the accuracy.


What each function's backward pass means:


Activation function: the output of the previous layer is transformed by the layer and passed through the activation's nonlinearity to produce the output. Common activation functions include sigmoid, tanh, and ReLU.


Loss function: a way of measuring the gap between the network's predicted values and the actual values. Common loss functions include least-squares (mean squared error) loss, cross-entropy loss, and the smooth L1 loss used in regression.


Optimizer: how the loss signal is carried from the outermost layer of the network back to the front, for example basic gradient descent, stochastic gradient descent, batch gradient descent, gradient descent with momentum, Adagrad, Adadelta, Adam, and so on.


Loss function


eta = self.compiled_loss.input_gradients()

: backpropagating through the loss function yields the initial gradient.


Layers and activation functions


for layer in self.layers[::-1]:

       eta = layer.input_gradients(eta)

: the gradient is propagated backwards through each layer in turn.


Optimizer


if training:

           self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])

: after one forward pass and one backward pass, the weights and bias are handed to the optimizer, carrying the loss signal from the outermost layer of the network back to the front. (In this particular solution the per-layer SGD updates have already been applied inside Dense.input_gradients, so this call is effectively a formality; see sections 4 and 8.)


☆ class SequentialModel in assignment.py


class SequentialModel(Beras.Model):
    """
    Implemented in Beras/model.py
    def __init__(self, layers):
    def compile(self, optimizer, loss_fn, acc_fn):
    def fit(self, x, y, epochs, batch_size):
    def evaluate(self, x, y, batch_size):           ## <- TODO
    """
    def call(self, inputs):
        """
        Forward pass in sequential model. It's helpful to note that layers are initialized in Beras.Model, and
        you can refer to them with self.layers. You can call a layer by doing var = layer(input).
        """
        # TODO: The call function!
        for layer in self.layers:
            inputs = layer.forward(inputs)
        return inputs
    def batch_step(self, x, y, training=True):
        """
        Computes loss and accuracy for a batch. This step consists of both a forward and backward pass.
        If training=false, don't apply gradients to update the model! 
        Most of this method (forward, loss, applying gradients)
        will take place within the scope of Beras.GradientTape()
        """
        # TODO: Compute loss and accuracy for a batch.
        # If training, then also update the gradients according to the optimizer
        y_pre = self.call(x)
        loss = self.compiled_loss.forward(y_pre, y)
        acc = self.compiled_acc(y_pre, y)
        eta = self.compiled_loss.input_gradients()
        # backwarding...
        for layer in self.layers[::-1]:
            #print(type(layer))
            eta = layer.input_gradients(eta)
        if training:
            self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])
        return {"loss": loss, "acc": acc}


7. Loss function


       This is one of the most critical parts of model training. For this assignment, instead of implementing MSE (mean squared error) as described in the handout, I chose a cross-entropy loss (CrossEntropyLoss), because in my experiments the other two loss functions did not perform satisfactorily.


Note: the softmax backward pass is usually combined with the cross-entropy loss, so the Softmax backward pass (input_gradients) is not filled in separately.


● forward(): [TODO] Write a function that computes and returns the mean squared error given the predictions and the actual labels.


Hint: what is MSE? Given the predictions and the actual labels, the mean squared error is the average of the squared differences between the predicted and actual values.


● input_gradients(): [TODO] Compute and return the gradients. Use differentiation to derive the formulas for these gradients.


losses.py


import numpy as np
from .core import Diffable
from abc import ABCMeta, abstractmethod
class CrossEntropyLoss(Diffable):
    def __init__(self):
        super().__init__()
        self.classifier = Softmax()  # softmax is applied inside the loss
    def input_gradients(self):
        return self.grad
    def forward(self, a, y):
        a = self.classifier.forward(a)
        self.grad = a - y
        loss = -1 * np.einsum('ij,ij->', y, np.log(a), optimize=True) / y.shape[0]
        return loss
class Layer(metaclass=ABCMeta):
    @abstractmethod
    def forward(self, *args):
        pass
    @abstractmethod
    def backward(self, *args):
        pass
class Softmax(Layer):
    def forward(self, x):
        v = np.exp(x - x.max(axis=-1, keepdims=True))    
        return v / v.sum(axis=-1, keepdims=True)
    def backward(self, eta):
        pass
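

A hedged numeric check that the gradient stored in forward() really is softmax(logits) - y, which is what makes skipping a separate Softmax backward pass legitimate:

import numpy as np
from Beras.losses import CrossEntropyLoss

logits = np.array([[2.0, 1.0, 0.1]])
y      = np.array([[1.0, 0.0, 0.0]])   # one-hot true label
loss_fn = CrossEntropyLoss()
print(loss_fn.forward(logits, y))       # scalar cross-entropy loss
print(loss_fn.input_gradients())        # softmax(logits) - y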


8. Optimizer


       For the MNIST dataset, RMSProp alone is entirely sufficient, so this article implements only this one optimizer.


● RMSProp: [TODO] Root Mean Square Propagation.


optimizers.py


from collections import defaultdict
import numpy as np
class RMSProp:
    def __init__(self, learning_rate, beta=0.9, epsilon=1e-6):
        self.learning_rate = learning_rate
        self.beta = beta
        self.epsilon = epsilon
        self.v = defaultdict(lambda: 0)
    def apply_gradients(self, weights, grads):
        # TODO: Implement RMSProp optimization
        # Refer to the lab on Optimizers for a better understanding!
        self.mean_square = self.v['mean_square']
        self.mean_square = self.beta*self.mean_square + (1-self.beta)*(grads)**2
        self.v['mean_square'] = self.mean_square
        # NOTE: this line only rebinds the local name `weights`; it does not modify the
        # array that was passed in. In this solution the real parameter updates are
        # applied inside Dense.input_gradients, so this method is effectively a no-op.
        weights = weights - self.learning_rate/(np.sqrt(self.mean_square) + self.epsilon)*grads
        return
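

As the comments above note, apply_gradients as written only rebinds a local variable, and in this solution the real parameter updates happen inside Dense.input_gradients. For reference, a standalone sketch of the standard RMSProp update (hypothetical, not wired into the pipeline above) would keep one accumulator per parameter and update each array in place:

import numpy as np

class RMSPropReference:
    """Standard RMSProp: v <- beta*v + (1-beta)*g^2;  w <- w - lr * g / (sqrt(v) + eps)."""
    def __init__(self, learning_rate, beta=0.9, epsilon=1e-6):
        self.learning_rate, self.beta, self.epsilon = learning_rate, beta, epsilon
        self.v = {}   # one running mean-square accumulator per parameter

    def apply_gradients(self, weights, grads):
        # `weights` and `grads` are parallel lists of NumPy arrays
        for w, g in zip(weights, grads):
            key = id(w)
            self.v[key] = self.beta * self.v.get(key, 0.0) + (1 - self.beta) * g ** 2
            w -= self.learning_rate * g / (np.sqrt(self.v[key]) + self.epsilon)   # in-place update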


9. Accuracy metric


       This file implements a simple accuracy metric used to measure the model's accuracy:


● forward(): [TODO] Return the model's categorical accuracy given the predicted probabilities and the true labels. You should return the proportion of predicted labels that equal the true labels, where an image's predicted label is the one corresponding to the highest probability. Refer to the web or the lecture slides for the categorical-accuracy math!


metrics.py


import numpy as np
from .core import Callable
class CategoricalAccuracy(Callable):
    def forward(self, probs, labels):
        """Categorical accuracy forward pass!"""
        # TODO: Compute and return the categorical accuracy of your model given the output probabilities and true labels
        # Accuracy = fraction of samples whose argmax prediction matches the argmax label
        probsArg = np.argmax(probs, axis=1)
        labelsArg = np.argmax(labels, axis=1)
        return np.mean(probsArg == labelsArg)
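

For example (a hedged sketch), two of the three argmax predictions below match the labels, so the accuracy is 2/3:

import numpy as np
from Beras.metrics import CategoricalAccuracy

probs  = np.array([[0.8, 0.2], [0.3, 0.7], [0.6, 0.4]])
labels = np.array([[1, 0],     [0, 1],     [0, 1]])
print(CategoricalAccuracy()(probs, labels))   # 0.666...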


10. Training and testing


       Two models are constructed, mimicking Keras:


● get_simple_model(): a simple model with at most one Diffable layer (e.g. Dense from ./layers.py) and one activation function (from ./activations.py). This one is provided for you by default, although you can change it if you'd like. The autograder will evaluate the original one!


● get_advanced_model(): a slightly more complex model with two or more Diffable layers and two or more activation functions. We recommend the Adam optimizer for this model, with a fairly low learning rate.


get_simple_model_components() in assignment.py
def get_simple_model_components():
    """
    Returns a simple single-layer model.
    """
    ## DO NOT CHANGE IN FINAL SUBMISSION
    from Beras.activations import Softmax
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    # TODO: create a model and compile it with layers and functions of your choice
    model = SequentialModel([Dense(784, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=10, batch_size=100)
get_advanced_model_components() in assignment.py
def get_advanced_model_components():
    from Beras.activations import Softmax, LeakyReLU
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.batchnorm import BatchNorm
    """
    Returns a multi-layered model with more involved components.
    """
    # TODO: create/compile a model with layers and functions of your choice.
    model = SequentialModel([Dense(784, 398), BatchNorm(398), LeakyReLU(0), Dense(398, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=12, batch_size=100)
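

The advanced model imports BatchNorm from Beras/batchnorm.py, one of the eight Beras files in the download that is not reproduced in this article. For readers without the download, a minimal sketch of such a layer, written in the same self-updating style as the Dense layer above (a forward pass plus an input_gradients(eta) that also applies the parameter update), might look roughly like this. It illustrates the idea only and is not the file from the archive:

import numpy as np
from Beras.core import Diffable

class BatchNormSketch(Diffable):
    """Hypothetical per-feature batch normalization with learnable gamma/beta.
    A real implementation would also track running statistics for inference."""
    def __init__(self, size, learning_rate=0.01, epsilon=1e-5):
        super().__init__()
        self.gamma = np.ones((1, size))
        self.beta = np.zeros((1, size))
        self.weights = [self.gamma, self.beta]
        self.learning_rate = learning_rate
        self.epsilon = epsilon

    def forward(self, inputs):
        self.inputs = inputs
        self.mean = inputs.mean(axis=0, keepdims=True)
        self.var = inputs.var(axis=0, keepdims=True)
        self.x_hat = (inputs - self.mean) / np.sqrt(self.var + self.epsilon)
        return self.gamma * self.x_hat + self.beta

    def input_gradients(self, eta):
        m = eta.shape[0]
        # Gradients for the learnable parameters
        dgamma = np.sum(eta * self.x_hat, axis=0, keepdims=True)
        dbeta = np.sum(eta, axis=0, keepdims=True)
        # Gradient w.r.t. the inputs (standard batchnorm backward formula)
        dx_hat = eta * self.gamma
        std_inv = 1.0 / np.sqrt(self.var + self.epsilon)
        dx = (std_inv / m) * (m * dx_hat
                              - np.sum(dx_hat, axis=0, keepdims=True)
                              - self.x_hat * np.sum(dx_hat * self.x_hat, axis=0, keepdims=True))
        # Apply the update here, mirroring how Dense.input_gradients updates its weights
        self.gamma -= self.learning_rate * dgamma
        self.beta -= self.learning_rate * dbeta
        return dx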


11. Visualizing the results


  The visualize_metrics method is provided for you to visualize how your loss and accuracy change over the course of training, using matplotlib.


visualize.py


import matplotlib.pyplot as plt
import numpy as np
def visualize_metrics(losses=[], accuracies=[]):
    """
    param losses: a 1D array of loss values
    param accuracies: a 1D array of accuracy values
    Displays a plot with loss and accuracy values on the y-axis and batch number/epoch number on the
    x-axis
    """
    if not losses or not accuracies:
        return print("Must provide a list of losses/accuracies to visualize")
    x = np.arange(1, max(len(losses), len(accuracies)) + 1)
    plt.plot(x, losses)
    plt.plot(x, accuracies)
    plt.ylabel("Loss/Acc Value")
    plt.show()
def visualize_images(model, train_inputs, train_labels_ohe, num_searching=500):
    """
    param model: a neural network model (i.e. SequentialModel)
    param train_inputs: sample training inputs for the model to predict
    param train_labels_ohe: one-hot encoded training labels corresponding to train_inputs
    Displays 10 sample outputs the model correctly classifies and 10 sample outputs the model
    incorrectly classifies
    """
    rand_idx = np.random.choice(len(train_inputs), num_searching)
    rand_batch = train_inputs[rand_idx]
    probs = model.call(rand_batch)
    pred_classes = np.argmax(probs, axis=1)
    true_classes = np.argmax(train_labels_ohe[rand_idx], axis=1)
    right_idx = np.where(pred_classes == true_classes)
    wrong_idx = np.where(pred_classes != true_classes)
    right = np.reshape(rand_batch[right_idx], (-1, 28, 28))
    wrong = np.reshape(rand_batch[wrong_idx], (-1, 28, 28))
    right_pred_labels = true_classes[right_idx]
    wrong_pred_labels = pred_classes[wrong_idx]
    assert len(right) >= 10, f"Found less than 10 correct predictions!"
    assert len(wrong) >= 10, f"Found less than 10 incorrect predictions!"
    fig, axs = plt.subplots(2, 10)
    fig.suptitle("Classifications\n(PL = Predicted Label)")
    subsets = [right, wrong]
    pred_labs = [right_pred_labels, wrong_pred_labels]
    for r in range(2):
        for c in range(10):
            axs[r, c].imshow(subsets[r][c], cmap="Greys")
            axs[r, c].set(title=f"PL: {pred_labs[r][c]}")
            plt.setp(axs[r, c].get_xticklabels(), visible=False)
            plt.setp(axs[r, c].get_yticklabels(), visible=False)
            axs[r, c].tick_params(axis="both", which="both", length=0)
    plt.show()


12. Putting it all together: training and testing the model with the code from the previous 11 steps


assignment.py


from types import SimpleNamespace
import Beras
import numpy as np
class SequentialModel(Beras.Model):
    """
    Implemented in Beras/model.py
    def __init__(self, layers):
    def compile(self, optimizer, loss_fn, acc_fn):
    def fit(self, x, y, epochs, batch_size):
    def evaluate(self, x, y, batch_size):           ## <- TODO
    """
    def call(self, inputs):
        """
        Forward pass in sequential model. It's helpful to note that layers are initialized in Beras.Model, and
        you can refer to them with self.layers. You can call a layer by doing var = layer(input).
        """
        # TODO: The call function!
        for layer in self.layers:
            inputs = layer.forward(inputs)
        return inputs
    def batch_step(self, x, y, training=True):
        """
        Computes loss and accuracy for a batch. This step consists of both a forward and backward pass.
        If training=false, don't apply gradients to update the model! 
        Most of this method (forward, loss, applying gradients)
        will take place within the scope of Beras.GradientTape()
        """
        # TODO: Compute loss and accuracy for a batch.
        # If training, then also update the gradients according to the optimizer
        y_pre = self.call(x)
        loss = self.compiled_loss.forward(y_pre, y)
        acc = self.compiled_acc(y_pre, y)
        eta = self.compiled_loss.input_gradients()
        # backwarding...
        for layer in self.layers[::-1]:
            #print(type(layer))
            eta = layer.input_gradients(eta)
        if training:
            self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])
        return {"loss": loss, "acc": acc}
def get_simple_model_components():
    """
    Returns a simple single-layer model.
    """
    ## DO NOT CHANGE IN FINAL SUBMISSION
    from Beras.activations import Softmax
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    # TODO: create a model and compile it with layers and functions of your choice
    model = SequentialModel([Dense(784, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=10, batch_size=100)
def get_advanced_model_components():
    from Beras.activations import Softmax, LeakyReLU
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.batchnorm import BatchNorm
    """
    Returns a multi-layered model with more involved components.
    """
    # TODO: create/compile a model with layers and functions of your choice.
    model = SequentialModel([Dense(784, 398), BatchNorm(398), LeakyReLU(0), Dense(398, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=12, batch_size=100)
if __name__ == "__main__":
    """
    Read in MNIST data and initialize/train/test your model.
    """
    from Beras.onehot import OneHotEncoder
    import preprocess
    ## Read in MNIST data,
    train_inputs, train_labels = preprocess.get_data_MNIST("train", "../data")
    test_inputs,  test_labels  = preprocess.get_data_MNIST("test",  "../data")
    ## TODO: Use the OneHotEncoder class to one hot encode the labels
    # ohe = lambda x: 0  ## placeholder function: returns zero for a given input
    ohe = OneHotEncoder()
    ohe.fit(train_labels)
    ## Get your model to train and test
    simple = False
    args = get_simple_model_components() if simple else get_advanced_model_components()
    model = args.model
    ## REMINDER: Threshold of accuracy: 
    ##  1470: >85% on testing accuracy from get_simple_model_components
    ##  2470: >95% on testing accuracy from get_advanced_model_components
    # TODO: Fit your model to the training input and the one hot encoded labels
    # Remember to pass all the arguments that SequentialModel.fit() requires
    # such as number of epochs and the batch size
    print('---------------------------[[[Train]]]]---------------------------')
    train_agg_metrics = model.fit(
        train_inputs, 
        ohe(train_labels), 
        epochs     = args.epochs, 
        batch_size = args.batch_size
    )
    print('-------------------------------------------------------------------')
    ## Feel free to use the visualize_metrics function to view your accuracy and loss.
    ## The final accuracy returned during evaluation must be > 80%.
    # from visualize import visualize_images, visualize_metrics
    # visualize_metrics(train_agg_metrics["loss"], train_agg_metrics["acc"])
    # visualize_images(model, train_inputs, ohe(train_labels))
    ## TODO: Evaluate your model using your testing inputs and one hot encoded labels.
    ## This is the number you will be using!
    print('---------------------------[[[Evaluate]]]---------------------------')
    test_agg_metrics = model.evaluate(test_inputs, ohe(test_labels), batch_size=100)
    print('Testing Performance:', test_agg_metrics)
    print('-----------------------------------------------------------------')


I consider this a barely passing (read: not great) piece of homework on my part, and the answers provided are for reference only. Have fun!

