[Multi-NN] Reference walkthrough: a multi-layer neural network hand-written in NumPy


Preface


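       Because the original assignment requires so many functions to be implemented, this post does not start from the algorithmic principles. Instead it posts the finished code directly for reference. The experiment is built to the standard of this article:

[Figure: image.png]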


Finished code files


Download:


Link: https://pan.baidu.com/s/1Fw_7thL5PxR79zI6XbpnYQ

Extraction code: txqe


File structure:


| - hw2 
        | - code
                | - Beras
                        | -  8 .py files implementing the functions the assignment requires
                | - assignment.py
                | - preprocess.py
                | - visualize.py
        | - data
                | - mnist
                        | - four dataset files
                | - Iris (can be ignored; not used in this experiment)


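1. Preprocessing the data


       This file ships with the assignment. Its main job is to read the MNIST training and test sets (inputs and labels for each, 2*2 = four files) from the four .gz files in ../data/mnist/.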
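To make the byte offsets used in preprocess.py concrete, here is a minimal sketch of the same idea on a tiny synthetic file. An IDX image file starts with a 16-byte header (magic number, image count, rows, columns, each a big-endian 32-bit integer) followed by raw uint8 pixels. The helper name `read_idx_images` and the in-memory fake file are assumptions made for illustration; they are not part of the assignment code.

```python
import gzip
import io
import struct

import numpy as np


def read_idx_images(fileobj):
    """Read a gzip-compressed IDX3 image file into a float32 array of shape (n, rows*cols)."""
    with gzip.GzipFile(fileobj=fileobj) as stream:
        # 16-byte header: magic, count, rows, cols as big-endian unsigned 32-bit ints
        magic, count, rows, cols = struct.unpack(">IIII", stream.read(16))
        pixels = np.frombuffer(stream.read(count * rows * cols), dtype=np.uint8)
    # Flatten each image and scale pixel values from [0, 255] to [0, 1]
    return (pixels.reshape(count, rows * cols) / 255.0).astype(np.float32)


# Build a fake two-image 2x2 "dataset" in memory (hypothetical data, not MNIST)
raw = struct.pack(">IIII", 2051, 2, 2, 2) + bytes([0, 255, 0, 255, 255, 0, 255, 0])
fake_file = io.BytesIO(gzip.compress(raw))

images = read_idx_images(fake_file)
print(images.shape, images.dtype)
```

Reading the header with `struct` rather than hard-coding the example count is one possible design; the assignment code instead passes the known counts (60000 and 10000) and skips the header by slicing.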


preprocess.py


import gzip
import numpy as np
"""
TODO: 
Same as HW1. Feel free to copy and paste your old implementation here.
It's a good time to vectorize it, while you're at it!
No need to include CIFAR-specific methods.
"""
def get_data_MNIST(subset, data_path="../data", is_reshape=True):
    """
    :param subset: string indicating whether we want the training or testing data 
        (only accepted values are 'train' and 'test')
    :param data_path: directory containing the training and testing inputs and labels
    :return: NumPy array of inputs (float32) and labels (uint8)
    """
    ## http://yann.lecun.com/exdb/mnist/
    subset = subset.lower().strip()
    assert subset in ("test", "train"), f"unknown data subset {subset} requested"
    inputs_file_path, labels_file_path, num_examples = {
        "train": ("train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz", 60000),
        "test": ("t10k-images-idx3-ubyte.gz", "t10k-labels-idx1-ubyte.gz", 10000),
    }[subset]
    inputs_file_path = f"{data_path}/mnist/{inputs_file_path}"
    labels_file_path = f"{data_path}/mnist/{labels_file_path}"
    ## TODO: read the image file and normalize, flatten, and type-convert image
    with open(inputs_file_path, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
        buf = bytestream.read(num_examples*28*28 + 16)
        dt = np.dtype(np.uint8)
        temp = np.frombuffer(buf, dtype=dt) 
        image = temp[16:]
        if is_reshape:
            image = image.reshape((num_examples,28*28))
        else:
            image = image.reshape((num_examples, 28, 28, 1))
        image = (image / 255.0).astype(np.float32)  # scale to [0, 1]; float32 as the docstring promises
    print(image.shape)
    ## TODO: read the label file
    with open(labels_file_path, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
        buf = bytestream.read(num_examples + 8)
        dt = np.dtype(np.uint8)
        temp = np.frombuffer(buf, dtype=dt) 
        label = temp[8:]
    return image, label
## THE REST ARE OPTIONAL!
'''
def shuffle_data(image_full, label_full, seed):
    pass
def get_subset(image_full, label_full, class_list=list(range(10)), num=100):
    pass
'''


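2. One-hot encoding


       This file implements one-hot encoding. The parts to write by hand are:


● fit(): [TODO] In this function you fetch all the unique labels in the data (store them in self.uniq) and create a dictionary with the labels as keys and their corresponding one-hot encodings as values. Hint: you may want to look at np.eye() to get the one-hot encodings. In the end you store the result in the self.uniq2oh dictionary.


● forward(): In this function we pass in a vector containing all the actual labels in the training set and call fit() to fill the uniq2oh dictionary with the unique labels and their corresponding one-hot encodings, then use it to return an array of one-hot encoded labels, one for each label in the training set.


This function is already filled in for you!


● inverse(): In this function we invert the one-hot encodings back into the actual labels.


This has already been done for you.


For example, if we have labels X and Y with one-hot encodings [1,0] and [0,1], we build {X: [1,0], Y: [0,1]}.


For MNIST you will have 10 labels, so your dictionary should have 10 entries!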
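The lookup-table idea described above can be sketched on a toy label vector. The labels here are made up for illustration; the variable names mirror self.uniq and self.uniq2oh but this is a standalone sketch, not the class itself.

```python
import numpy as np

labels = np.array([3, 0, 9, 3])   # hypothetical label vector
uniq = np.unique(labels)          # sorted unique labels: [0, 3, 9]
eye = np.eye(len(uniq))           # row i is the one-hot code of uniq[i]
uniq2oh = {label: eye[i] for i, label in enumerate(uniq)}

# Encode: look every label up in the dictionary
encoded = np.array([uniq2oh[x] for x in labels])
# Decode: the position of the 1 selects the label back out of uniq
decoded = np.array([uniq[row == 1][0] for row in encoded])
print(encoded)
print(decoded)
```

Note that the width of the encoding equals the number of distinct labels seen by fit(), not the largest label value.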


onehot.py


import numpy as np
from .core import Callable
class OneHotEncoder(Callable):
    """
    One-Hot Encodes labels. First takes in a candidate set to figure out what elements it
    needs to consider, and then one-hot encodes subsequent input datasets in the
    forward pass.
    SIMPLIFICATIONS:
     - Implementation assumes that entries are individual elements.
     - Forward will call fit if it hasn't been done yet; most implementations will just error.
     - keras does not have OneHotEncoder; has LabelEncoder, CategoricalEncoder, and to_categorical()
    """
    def fit(self, data):
        """
        Fits the one-hot encoder to a candidate dataset. Said dataset should contain
        all encounterable elements.
        :param data: 1D array containing labels.
            For example, data = [0, 1, 3, 3, 1, 9, ...]
        """
        ## TODO: Fetch all the unique labels and create a dictionary with
        ## the unique labels as keys and their one hot encodings as values
        ## HINT: look up np.eye() and see if you can utilize it!
        ## HINT: Wouldn't it be nice if we just gave you the implementation somewhere...
        self.uniq = np.unique(data)  # all the unique labels from `data`
        self.uniq2oh = {}  # a lookup dictionary with labels and corresponding encodings
        eye = np.eye(len(self.uniq))
        for i in range(len(self.uniq)):
            self.uniq2oh[self.uniq[i]] = eye[i]
    def forward(self, data):
        if not hasattr(self, "uniq2oh"):
            self.fit(data)
        return np.array([self.uniq2oh[x] for x in data])
    def inverse(self, data):
        assert hasattr(self, "uniq"), \
            "forward() or fit() must be called before attempting to invert"
        return np.array([self.uniq[x == 1][0] for x in data])


3. Core abstractions


       This file is code provided with the assignment and needs no modification.


core.py


from abc import ABC, abstractmethod  # # For abstract method support
from typing import Tuple
import numpy as np
## DO NOT MODIFY THIS CLASS
class Callable(ABC):
    """
    Callable Sub-classes:
     - CategoricalAccuracy (./metrics.py)       - TODO
     - OneHotEncoder       (./preprocess.py)    - TODO
     - Diffable            (.)                  - DONE
    """
    def __call__(self, *args, **kwargs) -> np.array:
        """Lets `self()` and `self.forward()` be the same"""
        return self.forward(*args, **kwargs)
    @abstractmethod
    def forward(self, *args, **kwargs) -> np.array:
        """Pass inputs through function. Can store inputs and outputs as instance variables"""
        pass
## DO NOT MODIFY THIS CLASS
class Diffable(Callable):
    """
    Diffable Sub-classes:
     - Dense            (./layers.py)           - TODO
     - LeakyReLU, ReLU  (./activations.py)      - TODO
     - Softmax          (./activations.py)      - TODO
     - MeanSquaredError (./losses.py)           - TODO
    """
    """Stores whether the operation being used is inside a gradient tape scope"""
    gradient_tape = None  ## All-instance-shared variable
    def __init__(self):
        """Is the layer trainable"""
        super().__init__()
        self.trainable = True  ## self-only instance variable
    def __call__(self, *args, **kwargs) -> np.array:
        """
        If there is a gradient tape scope in effect, perform AND RECORD the operation.
        Otherwise... just perform the operation and don't let the gradient tape know.
        """
        if Diffable.gradient_tape is not None:
            Diffable.gradient_tape.operations += [self]
        return self.forward(*args, **kwargs)
    @abstractmethod
    def input_gradients(self: np.array) -> np.array:
        """Returns gradient for input (this part gets specified for all diffables)"""
        pass
    def weight_gradients(self: np.array) -> Tuple[np.array, np.array]:
        """Returns gradient for weights (this part gets specified for SOME diffables)"""
        return ()
    def compose_to_input(self, J: np.array) -> np.array:
        """
        Compose the inputted cumulative jacobian with the input jacobian for the layer.
        Implemented with batch-level vectorization.
        Requires `input_gradients` to provide either batched or overall jacobian.
        Assumes input/cumulative jacobians are matrix multiplied
        """
        #  print(f"Composing to input in {self.__class__.__name__}")
        ig = self.input_gradients()
        batch_size = J.shape[0]
        n_out, n_in = ig.shape[-2:]
        j_new = np.zeros((batch_size, n_out), dtype=ig.dtype)
        for b in range(batch_size):
            ig_b = ig[b] if len(ig.shape) == 3 else ig
            j_new[b] = ig_b @ J[b]
        return j_new
    def compose_to_weight(self, J: np.array) -> list:
        """
        Compose the inputted cumulative jacobian with the weight jacobian for the layer.
        Implemented with batch-level vectorization.
        Requires `weight_gradients` to provide either batched or overall jacobian.
        Assumes weight/cumulative jacobians are element-wise multiplied (w/ broadcasting)
        and the resulting per-batch statistics are averaged together for avg per-param gradient.
        """
        # print(f'Composing to weight in {self.__class__.__name__}')
        assert hasattr(
            self, "weights"
        ), f"Layer {self.__class__.__name__} cannot compose along weight path"
        J_out = []
        ## For every weight/weight-gradient pair...
        for w, wg in zip(self.weights, self.weight_gradients()):
            batch_size = J.shape[0]
            ## Make a cumulative jacobian which will contribute to the final jacobian
            j_new = np.zeros((batch_size, *w.shape), dtype=wg.dtype)
            ## For every element in the batch (for a single batch-level gradient updates)
            for b in range(batch_size):
                ## If the weight gradient is a batch of transform matrices, get the right entry.
                ## Allows gradient methods to give either batched or non-batched matrices
                wg_b = wg[b] if len(wg.shape) == 3 else wg
                ## Update the batch's Jacobian update contribution
                j_new[b] = wg_b * J[b]
            ## The final jacobian for this weight is the average gradient update for the batch
            J_out += [np.mean(j_new, axis=0)]
        ## After new jacobian is computed for each weight set, return the list of gradient updates
        return J_out
class GradientTape:
    def __init__(self):
        ## Log of operations that were performed inside tape scope
        self.operations = []
    def __enter__(self):
        # When tape scope is entered, let Diffable start recording to self.operation
        Diffable.gradient_tape = self
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # When tape scope is exited, stop letting Diffable record
        Diffable.gradient_tape = None
    def gradient(self) -> list:
        """Get the gradient from first to last recorded operation"""
        ## TODO:
        ##
        ##  Compute weight gradients for all operations.
        ##  If the model has trainable weights [w1, b1, w2, b2] and ends at a loss L.
        ##  the model should return: [dL/dw1, dL/db1, dL/dw2, dL/db2]
        ##
        ##  Recall that self.operations is populated by Diffable class instances...
        ##
        ##  Start from the last operation and compute jacobian w.r.t input.
        ##  Continue to propagate the cumulative jacobian through the layer inputs
        ##  until all operations have been differentiated through.
        ##
        ##  If an operation that has weights is encountered along the way,
        ##  compute the weight gradients and add them to the return list.
        ##  Remember to check if the layer is trainable before doing this though...
        grads = []
        return grads


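4. Network layers


       This layer imitates Dense in Keras. The functions to write by hand are:


● forward() : [TODO] Implement the forward pass and return the outputs.


● weight_gradients() : [TODO] Compute the gradients with respect to the weights and biases. These are used to optimize the layer.


● input_gradients() : [TODO] Compute the gradients with respect to the layer's inputs. These are used to propagate the gradient to the preceding layers.


● _initialize_weight() : [TODO]


Initialize the dense layer's weight values. By default, initialize all weights to zero (which, by the way, is usually a bad idea). You also need to allow more sophisticated options (when the initializer is set to normal, xavier, or kaiming). Follow Keras's mathematical assumptions!


〇 Normal: self-explanatory, the unit normal distribution.


〇 Xavier Normal: based on keras.GlorotNormal.


〇 Kaiming He Normal: based on Keras.HeNormal.


When implementing these, you may find np.random.normal helpful. The assignment handout explains why these different initialization methods are necessary; for more details, check out this website! Feel free to add more initializer options!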
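As a rough illustration of the two scaled initializers named above: Keras documents GlorotNormal with standard deviation sqrt(2 / (fan_in + fan_out)) and HeNormal with standard deviation sqrt(2 / fan_in). Keras draws from a truncated normal; the sketch below uses a plain normal for simplicity, which is an assumption of this example. The layer sizes are borrowed from the advanced model later in this post.

```python
import numpy as np

rng = np.random.default_rng(0)
fan_in, fan_out = 784, 398  # input and output sizes of one dense layer

# Glorot (Xavier) normal: stddev = sqrt(2 / (fan_in + fan_out))
xavier_std = np.sqrt(2.0 / (fan_in + fan_out))
w_xavier = rng.normal(0.0, xavier_std, size=(fan_in, fan_out))

# He (Kaiming) normal: stddev = sqrt(2 / fan_in)
kaiming_std = np.sqrt(2.0 / fan_in)
w_kaiming = rng.normal(0.0, kaiming_std, size=(fan_in, fan_out))

# The empirical spread of the samples should sit close to the target stddev
print(round(float(w_xavier.std()), 4), round(float(xavier_std), 4))
print(round(float(w_kaiming.std()), 4), round(float(kaiming_std), 4))
```

Both scales shrink as the layer gets wider, which is what keeps the variance of the activations roughly constant from layer to layer.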


layers.py


import numpy as np
from .core import Diffable
class Dense(Diffable):
    # https://towardsdatascience.com/weight-initialization-in-neural-networks-a-journey-from-the-basics-to-kaiming-954fb9b47c79
    def __init__(self, input_size, output_size, learning_rate=0.01, initializer="kaiming"):
        super().__init__()
        self.w, self.b = self.__class__._initialize_weight(
            initializer, input_size, output_size
        )
        self.weights = [self.w, self.b]
        self.learning_rate = learning_rate
        self.inputs  = None
        self.outputs = None
    def forward(self, inputs):
        """Forward pass for a dense layer! Refer to lecture slides for how this is computed."""
        self.inputs = inputs
        # TODO: implement the forward pass and return the outputs
        self.outputs = np.matmul(inputs, self.w) + self.b
        return self.outputs
    def weight_gradients(self, eta):
        """Calculating the gradients wrt weights and biases!"""
        # TODO: Implement calculation of gradients
        wgrads = np.dot(self.inputs.T, eta)
        bgrads = np.sum(eta, axis=0)
        return wgrads, bgrads
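    def input_gradients(self, eta):
        """Calculating the gradients wrt inputs!"""
        # TODO: Implement calculation of gradients
        # The gradient wrt the inputs must use the weights from the forward pass,
        # so it is computed before the update below.
        inputgrads = np.dot(eta, self.w.T)
        wgrads, bgrads = self.weight_gradients(eta)
        # Plain gradient-descent step, applied right here during the backward pass.
        # These assignments rebind self.w / self.b, so self.weights keeps the old arrays.
        self.w = self.w - self.learning_rate*wgrads
        self.b = self.b - self.learning_rate*bgrads
        return inputgrads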
    @staticmethod
    def _initialize_weight(initializer, input_size, output_size):
        """
        Initializes the values of the weights and biases. The bias weights should always start at zero.
        However, the weights should follow the given distribution defined by the initializer parameter
        (zero, normal, xavier, or kaiming). You can do this with an if statement
        cycling through each option!
        Details on each weight initialization option:
            - Zero: Weights and biases contain only 0's. Generally a bad idea since the gradient update
            will be the same for each weight so all weights will have the same values.
            - Normal: Weights are initialized according to a normal distribution.
            - Xavier: Goal is to initialize the weights so that the variance of the activations are the
            same across every layer. This helps to prevent exploding or vanishing gradients. Typically
            works better for layers with tanh or sigmoid activation.
            - Kaiming: Similar purpose as Xavier initialization. Typically works better for layers
            with ReLU activation.
        """
        initializer = initializer.lower()
        assert initializer in (
            "zero",
            "normal",
            "xavier",
            "kaiming",
        ), f"Unknown dense weight initialization strategy '{initializer}' requested"
        io_size = (input_size, output_size)
        # TODO: Implement default assumption: zero-init for weights and bias
        initial_b = np.zeros((1,output_size))
        if initializer=="zero":
            initial_w = np.zeros(io_size)
        # TODO: Implement remaining options (normal, xavier, kaiming initializations). Note that
        # strings must be exactly as written in the assert above
        elif initializer=="normal":
            initial_w = np.random.randn(input_size, output_size)
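        elif initializer=="xavier":
            # Glorot normal, as in Keras: stddev = sqrt(2 / (fan_in + fan_out))
            initial_w = np.random.randn(input_size, output_size) * np.sqrt(2 / (input_size + output_size))
        elif initializer=="kaiming":
            # He normal, as in Keras: stddev = sqrt(2 / fan_in)
            initial_w = np.random.randn(input_size, output_size) * np.sqrt(2 / input_size)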
        return initial_w, initial_b


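5. Activation functions


       This file implements the LeakyReLU and Softmax activation functions, with hand-written forward passes [def forward] and backward passes [def input_gradients]:


● LeakyReLU()


       〇 forward() : [TODO] Given an input x, compute and return LeakyReLU(x).


       〇 input_gradients() : [TODO] Compute and return the partial derivatives with respect to the inputs, obtained by differentiating LeakyReLU.


● Softmax(): (2470 ONLY)


       〇 forward(): [TODO] Given an input x, compute and return Softmax(x). Make sure to use the stable softmax, which subtracts the maximum from all entries to prevent overflow/underflow issues.


       〇 input_gradients(): [TODO] The partial derivatives of Softmax() w.r.t. its inputs.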
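The two forward passes described above can be sketched as standalone functions. The function names and the sample input are assumptions for illustration; the point is that LeakyReLU acts element-wise and that the stable softmax normalizes each row (each sample) on its own.

```python
import numpy as np


def leaky_relu(x, alpha=0.3):
    # Element-wise: keep positive entries, scale the rest by alpha
    return np.where(x > 0, x, alpha * x)


def stable_softmax(x):
    # Subtract each row's maximum so np.exp never sees a large positive number
    z = x - np.max(x, axis=-1, keepdims=True)
    e = np.exp(z)
    # Normalize per row (per sample), not over the whole batch
    return e / np.sum(e, axis=-1, keepdims=True)


x = np.array([[1000.0, 1001.0, 1002.0], [-1.0, 0.0, 2.0]])
print(leaky_relu(x))
print(stable_softmax(x).sum(axis=-1))
```

Without the max subtraction, np.exp(1000.0) would overflow to inf and the first row would come out as nan.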


activations.py


import numpy as np
from .core import Diffable
class LeakyReLU(Diffable):
    def __init__(self, alpha=0.3):
        super().__init__()
        self.alpha = alpha
        self.inputs = None
        self.outputs = None
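    def forward(self, inputs):
        # TODO: Given an input array `x`, compute LeakyReLU(x)
        self.inputs = inputs
        # Your code here:
        # Element-wise: keep positive entries, scale the rest by alpha
        self.outputs = np.where(inputs > 0, inputs, inputs * self.alpha)
        return self.outputs
    def input_gradients(self, eta):
        # TODO: Compute and return the gradients
        # Slope is 1 where the input was positive and alpha elsewhere;
        # build a new array instead of modifying eta in place
        return eta * np.where(self.inputs > 0, 1.0, self.alpha)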
    def compose_to_input(self, J):
        # TODO: Maybe you'll want to override the default?
        return super().compose_to_input(J)
class ReLU(LeakyReLU):
    def __init__(self):
        super().__init__(alpha=0)
class Softmax(Diffable):
    def __init__(self):
        super().__init__()
        self.inputs = None
        self.outputs = None
    def forward(self, inputs):
        """Softmax forward pass!"""
        # TODO: Implement
        # HINT: Use stable softmax, which subtracts maximum from
        # all entries to prevent overflow/underflow issues
        self.inputs = inputs
        # Your code here:
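        z = inputs - np.max(inputs, axis=-1, keepdims=True)
        numerator = np.exp(z)
        # Normalize each row (sample) separately, not over the whole batch
        denominator = np.sum(numerator, axis=-1, keepdims=True)
        self.outputs = numerator / denominator
        return self.outputs
    def input_gradients(self, eta):
        """Softmax backprop!"""
        # TODO: Compute and return the gradients
        # Passed through unchanged: the softmax gradient is folded into
        # CrossEntropyLoss, which returns (softmax(x) - y) directly
        return eta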


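☆ 6. Filling in the model


       This file hand-writes the Keras-style sequential model class SequentialModel. SequentialModel inherits from Model, so we implement the Model class first. The details are as follows:


● compile() : Initializes the model's optimizer, loss function, and accuracy function, which are passed in as arguments for the SequentialModel instance to use.


● fit() : Trains the model to associate inputs with outputs. Training repeats for each epoch, and the data is batched according to the batch_size argument. It also computes batch_metrics, epoch_metrics, and the aggregated agg_metrics, which can be used to track the model's training progress.


● evaluate() : [TODO] Evaluates the final model's performance in the testing phase using the metrics mentioned above. It is almost identical to the fit() function; think about what changes between training and testing.


● call() : [TODO] Hint: what does it mean to call a sequential model? Remember that a sequential model is a stack of layers, each with exactly one input vector and one output vector. You implement this in the SequentialModel class in assignment.py.


● batch_step() : [TODO] You will see that fit() calls this function for every batch. You first compute the model's predictions for the input batch. During the training phase, you need to compute the gradients and update your weights according to the optimizer you are using. For backpropagation during training, you use GradientTape from the core abstractions (core.py) to record operations and intermediate values. You then use the model's optimizer to apply the gradients to the model's trainable variables. Finally, compute and return the loss and accuracy for the batch. You implement this in the SequentialModel class in assignment.py.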
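The batching that fit() and evaluate() perform before handing each slice to batch_step() can be sketched in isolation. The generator name `iter_batches` and the toy arrays are assumptions for illustration; the range arithmetic is the same as in model.py.

```python
import numpy as np


def iter_batches(x, y, batch_size):
    """Yield consecutive (x, y) mini-batches; a trailing partial batch is skipped."""
    for end in range(batch_size, x.shape[0] + 1, batch_size):
        start = end - batch_size
        yield x[start:end], y[start:end]


x = np.arange(10).reshape(10, 1)   # 10 hypothetical samples
y = np.arange(10)
sizes = [len(xb) for xb, _ in iter_batches(x, y, batch_size=4)]
print(sizes)  # the last 2 samples do not form a full batch
```

Because the loop only emits full batches, any samples left over when the dataset size is not a multiple of batch_size are never seen. With MNIST (60000 and 10000 examples) and a batch size of 100, nothing is dropped.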


model.py


from abc import ABC, abstractmethod
from collections import defaultdict
import numpy as np
from .core import Diffable
def print_stats(stat_dict, b=None, b_num=None, e=None, avg=False):
    """
    Given a dictionary of names statistics and batch/epoch info,
    print them in an appealing manner. If avg, display stat averages.
    """
    title_str = " - "
    if e is not None:
        title_str += f"Epoch {e+1:2}: "
    if b is not None:
        title_str += f"Batch {b+1:3}"
        if b_num is not None:
            title_str += f"/{b_num}"
    if avg:
        title_str += "Average Stats"
    print(f"\r{title_str} : ", end="")
    op = np.mean if avg else lambda x: x
    print({k: np.round(op(v), 4) for k, v in stat_dict.items()}, end="")
    print("   ", end="" if not avg else "\n")
def update_metric_dict(super_dict, sub_dict):
    """
    Appends the average of the sub_dict metrics to the super_dict's metric list
    """
    for k, v in sub_dict.items():
        super_dict[k] += [np.mean(v)]
class Model(ABC):
    ###############################################################################################
    ## BEGIN GIVEN
    def __init__(self, layers):
        """
        Initialize all trainable parameters and take layers as inputs
        """
        # Initialize all trainable parameters
        assert all([issubclass(layer.__class__, Diffable) for layer in layers])
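        # The final layer (Softmax in this post's models) is dropped on purpose:
        # CrossEntropyLoss applies softmax itself, so call() returns raw scores.
        self.layers = layers[:-1]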
        self.trainable_variables = []
        for layer in layers:
            if hasattr(layer, "weights") and layer.trainable:
                for weight in layer.weights:
                    self.trainable_variables += [weight]
    def compile(self, optimizer, loss_fn, acc_fn):
        """
        "Compile" the model by taking in the optimizers, loss, and accuracy functions.
        In more optimized DL implementations, this will have more involved processes
        that make the components extremely efficient but very inflexible.
        """
        self.optimizer = optimizer
        self.compiled_loss = loss_fn
        self.compiled_acc = acc_fn
    def fit(self, x, y, epochs, batch_size):
        """
        Trains the model by iterating over the input dataset and feeding input batches
        into the batch_step method with training. At the end, the metrics are returned.
        """
        agg_metrics = defaultdict(lambda: [])
        batch_num = x.shape[0] // batch_size
        for e in range(epochs):
            epoch_metrics = defaultdict(lambda: [])
            for b, b1 in enumerate(range(batch_size, x.shape[0] + 1, batch_size)):
                b0 = b1 - batch_size
                batch_metrics = self.batch_step(x[b0:b1], y[b0:b1], training=True)
                update_metric_dict(epoch_metrics, batch_metrics)
                print_stats(batch_metrics, b, batch_num, e)
            update_metric_dict(agg_metrics, epoch_metrics)
            print_stats(epoch_metrics, e=e, avg=True)
        return agg_metrics
    def evaluate(self, x, y, batch_size):
        """
        X is the dataset inputs, Y is the dataset labels.
        Evaluates the model by iterating over the input dataset in batches and feeding input batches
        into the batch_step method. At the end, the metrics are returned. Should be called on
        the testing set to evaluate accuracy of the model using the metrics output from the fit method.
        NOTE: This method is almost identical to fit (think about how training and testing differ --
        the core logic should be the same)
        """
        # TODO: Implement evaluate similarly to fit.
        agg_metrics = defaultdict(lambda: [])
        batch_num = x.shape[0] // batch_size
        for e in range(1):
            epoch_metrics = defaultdict(lambda: [])
            for b, b1 in enumerate(range(batch_size, x.shape[0] + 1, batch_size)):
                b0 = b1 - batch_size
                batch_metrics = self.batch_step(x[b0:b1], y[b0:b1], training=False)
                update_metric_dict(epoch_metrics, batch_metrics)
                print_stats(batch_metrics, b, batch_num, e)
            update_metric_dict(agg_metrics, epoch_metrics)
            print_stats(epoch_metrics, e=e, avg=True)
        return agg_metrics
    @abstractmethod
    def call(self, inputs):
        """You will implement this in the SequentialModel class in assignment.py"""
        return
    @abstractmethod
    def batch_step(self, x, y, training=True):
        """You will implement this in the SequentialModel class in assignment.py"""
        return


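☆ Walkthrough of batch_step():


y_pre = self.call(x)

: runs one forward pass through the network to obtain the predictions.


loss = self.compiled_loss.forward(y_pre, y)

: feeds the predictions and the true labels into the loss function's forward pass to obtain the loss value.


acc = self.compiled_acc(y_pre, y)

: feeds the predictions and the true labels into the accuracy function's forward pass to obtain the accuracy.


What each kind of function does:


Activation function: takes the output of a layer's linear transformation and passes it through a nonlinearity to produce the layer's output. Common activation functions include sigmoid, tanh, and ReLU.


Loss function: a way of measuring the gap between the network's predictions and the actual values. Common loss functions include the least-squares loss, the cross-entropy loss, and the smooth L1 loss used in regression.


Optimizer: decides how the gradients, propagated back from the outermost layer to the first layer, are turned into parameter updates. Examples include basic gradient descent, stochastic gradient descent, batch gradient descent, gradient descent with momentum, Adagrad, Adadelta, and Adam.


Loss function


eta = self.compiled_loss.input_gradients()

: obtains the gradient through the loss function's backward pass.


Layers and activation functions


for layer in self.layers[::-1]:

       eta = layer.input_gradients(eta)

: propagates the gradient backward through each network layer in turn, from the last layer to the first.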
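For a dense layer y = x @ w + b, the quantities passed along in this backward loop are the standard ones: the weight gradient is x.T @ eta, the bias gradient is the column sum of eta, and the gradient handed to the previous layer is eta @ w.T. A minimal sketch that checks one entry against a finite difference follows; the shapes and random values are made up for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.normal(size=(5, 3))      # batch of 5 inputs with 3 features
w = rng.normal(size=(3, 2))
b = np.zeros((1, 2))
eta = rng.normal(size=(5, 2))    # upstream gradient dL/d(outputs), hypothetical values


def scalar_loss(x, w, b):
    # A loss whose gradient w.r.t. the layer output is exactly `eta`
    return np.sum((x @ w + b) * eta)


# Analytic gradients of the dense layer
grad_w = x.T @ eta
grad_b = eta.sum(axis=0, keepdims=True)
grad_x = eta @ w.T

# Central finite-difference check on one weight entry
h = 1e-6
w_plus, w_minus = w.copy(), w.copy()
w_plus[1, 0] += h
w_minus[1, 0] -= h
numeric = (scalar_loss(x, w_plus, b) - scalar_loss(x, w_minus, b)) / (2 * h)
print(np.isclose(numeric, grad_w[1, 0]))
```

Each gradient has the same shape as the array it belongs to, which is a quick sanity check when wiring layers together.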


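Optimizer


if training:

           self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])

: when training, passes the first two trainable variables (the first Dense layer's weights and bias) to the optimizer. Note that in this implementation the parameter update itself has already been applied inside Dense.input_gradients during the backward loop.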


☆ class SequentialModel in assignment.py


class SequentialModel(Beras.Model):
    """
    Implemented in Beras/model.py
    def __init__(self, layers):
    def compile(self, optimizer, loss_fn, acc_fn):
    def fit(self, x, y, epochs, batch_size):
    def evaluate(self, x, y, batch_size):           ## <- TODO
    """
    def call(self, inputs):
        """
        Forward pass in sequential model. It's helpful to note that layers are initialized in Beras.Model, and
        you can refer to them with self.layers. You can call a layer by doing var = layer(input).
        """
        # TODO: The call function!
        for layer in self.layers:
            inputs = layer.forward(inputs)
        return inputs
    def batch_step(self, x, y, training=True):
        """
        Computes loss and accuracy for a batch. This step consists of both a forward and backward pass.
        If training=false, don't apply gradients to update the model! 
        Most of this method (forward, loss, applying gradients)
        will take place within the scope of Beras.GradientTape()
        """
        # TODO: Compute loss and accuracy for a batch.
        # If training, then also update the gradients according to the optimizer
        y_pre = self.call(x)
        loss = self.compiled_loss.forward(y_pre, y)
        acc = self.compiled_acc(y_pre, y)
        if training:
            # Backward pass. Dense.input_gradients also applies the weight update,
            # so the loop must be skipped during evaluation.
            eta = self.compiled_loss.input_gradients()
            for layer in self.layers[::-1]:
                eta = layer.input_gradients(eta)
            self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])
        return {"loss": loss, "acc": acc}


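7. Loss function


       This is one of the most critical aspects of model training. In this assignment I did not implement MSE (the mean squared error loss) as the handout describes, but chose the CrossEntropyLoss loss function instead, because in my experiments the other two loss functions did not perform well.


Note: Softmax's backward pass is usually carried out together with the CrossEntropyLoss loss function, so the Softmax backward pass is left unfilled.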
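The reason the two backward passes are usually merged is that the gradient of cross-entropy applied to softmax, taken with respect to the raw scores z, collapses to softmax(z) - y. A minimal numerical check for a single sample follows; the scores and target are made-up values.

```python
import numpy as np


def softmax(z):
    e = np.exp(z - z.max())
    return e / e.sum()


def cross_entropy(z, y):
    # Loss for one sample: -sum_k y_k * log(softmax(z)_k)
    return -np.sum(y * np.log(softmax(z)))


z = np.array([2.0, -1.0, 0.5])   # hypothetical raw scores for one sample
y = np.array([0.0, 0.0, 1.0])    # one-hot target

analytic = softmax(z) - y        # the combined gradient dL/dz

# Central finite differences, one score at a time
h = 1e-6
numeric = np.zeros_like(z)
for k in range(len(z)):
    step = np.zeros_like(z)
    step[k] = h
    numeric[k] = (cross_entropy(z + step, y) - cross_entropy(z - step, y)) / (2 * h)

print(np.allclose(analytic, numeric, atol=1e-6))
```

Differentiating softmax on its own would require a full Jacobian per sample; the merged form avoids that entirely.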


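● forward() : [TODO] Write a function that computes and returns the mean squared error given the predictions and the actual labels.


Hint: what is MSE? Given the predictions and the actual labels, the mean squared error is the average of the squared differences between the predicted and actual values.


● input_gradients() : [TODO] Compute and return the gradients. Use differentiation to derive the formulas for these gradients.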


losses.py


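import numpy as np
from .core import Diffable
from abc import ABCMeta, abstractmethod
class CrossEntropyLoss(Diffable):
    """Softmax followed by cross-entropy, differentiated as one unit."""
    def __init__(self):
        super().__init__()
        self.classifier = Softmax()
    def input_gradients(self):
        # Gradient of the summed (not averaged) loss wrt the raw scores, cached by forward()
        return self.grad
    def forward(self, a, y):
        a = self.classifier.forward(a)  # raw scores -> probabilities
        self.grad = a - y
        # Mean cross-entropy over the batch; the small constant guards against log(0)
        loss = -1 * np.einsum('ij,ij->', y, np.log(a + 1e-12), optimize=True) / y.shape[0]
        return loss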
class Layer(metaclass=ABCMeta):
    @abstractmethod
    def forward(self, *args):
        pass
    @abstractmethod
    def backward(self, *args):
        pass
class Softmax(Layer):
    def forward(self, x):
        v = np.exp(x - x.max(axis=-1, keepdims=True))    
        return v / v.sum(axis=-1, keepdims=True)
    def backward(self, eta):
        pass


8. Optimizer


       For the MNIST dataset, RMSProp alone is entirely sufficient, so this post implements only this one optimizer.


● RMSProp : [TODO] Root mean square propagation.
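As a rough illustration of the RMSProp rule: keep an exponential moving average of the squared gradient, then divide each step by its square root. The function `rmsprop_step`, the toy objective f(w) = sum(w ** 2), and the starting point are assumptions made for this sketch.

```python
import numpy as np


def rmsprop_step(w, grad, mean_square, lr=0.01, beta=0.9, eps=1e-6):
    """One RMSProp update; modifies `w` and `mean_square` in place."""
    # Exponential moving average of the squared gradient
    mean_square *= beta
    mean_square += (1 - beta) * grad ** 2
    # Scale each coordinate's step by the root of that running average
    w -= lr * grad / (np.sqrt(mean_square) + eps)


w = np.array([1.0, -0.5])          # hypothetical starting point
ms = np.zeros_like(w)
for _ in range(500):
    grad = 2 * w                   # gradient of f(w) = sum(w ** 2)
    rmsprop_step(w, grad, ms)

print(np.abs(w).max())
```

The update is written in place (`w -= ...`) on purpose: assigning to a local name inside the function would leave the caller's array untouched. Because the step size is normalized by the gradient's own magnitude, the iterate ends up hovering within a few multiples of the learning rate around the minimum rather than converging exactly.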


optimizers.py


from collections import defaultdict
import numpy as np
class RMSProp:
    def __init__(self, learning_rate, beta=0.9, epsilon=1e-6):
        self.learning_rate = learning_rate
        self.beta = beta
        self.epsilon = epsilon
        self.v = defaultdict(lambda: 0)
    def apply_gradients(self, weights, grads):
        # TODO: Implement RMSProp optimization
        # Refer to the lab on Optimizers for a better understanding!
        # Keep one running average of squared gradients per weight array
        key = id(weights)
        self.v[key] = self.beta * self.v[key] + (1 - self.beta) * grads ** 2
        # Update in place; rebinding the local name would leave the caller's array unchanged
        weights -= self.learning_rate / (np.sqrt(self.v[key]) + self.epsilon) * grads
        return


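9. Accuracy metric


       This file implements a simple accuracy metric, used to measure the model's accuracy:


● forward() : [TODO] Return the categorical accuracy of the model given the predicted probabilities and the true labels. You should return the proportion of predicted labels that equal the true labels, where the predicted label for an image is the label corresponding to the highest probability. Refer to the web or the lecture slides for the categorical accuracy math!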
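A minimal sketch of that computation on four made-up predictions: take the argmax of each probability row, take the argmax of each one-hot label row, and average the matches.

```python
import numpy as np

# Hypothetical predicted probabilities for 4 samples over 3 classes
probs = np.array([[0.7, 0.2, 0.1],
                  [0.1, 0.8, 0.1],
                  [0.3, 0.3, 0.4],
                  [0.6, 0.3, 0.1]])
# One-hot true labels for classes 0, 1, 2, 1
labels = np.eye(3)[[0, 1, 2, 1]]

predicted = np.argmax(probs, axis=1)   # class with the highest probability
actual = np.argmax(labels, axis=1)     # position of the 1 in each one-hot row
accuracy = np.mean(predicted == actual)
print(accuracy)
```

Since argmax only depends on the ordering within a row, the same code works on unnormalized scores as well as on probabilities.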


metrics.py


import numpy as np
from .core import Callable
class CategoricalAccuracy(Callable):
    def forward(self, probs, labels):
        """Categorical accuracy forward pass!"""
        # TODO: Compute and return the categorical accuracy of your model given the output probabilities and true labels
        # Predicted class = index of the highest score; labels are one-hot
        probsArg = np.argmax(probs, axis=1)
        labelsArg = np.argmax(labels, axis=1)
        return np.mean(probsArg == labelsArg)


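10. Training and testing


       Two models are built, imitating Keras:


● A simple model in get_simple_model(), with at most one Diffable layer (e.g. Dense in ./layers.py) and one activation function (in ./activations.py). This one is provided for you by default. You can change it if you like, but the autograder will evaluate the original one!


● A slightly more complex model in get_advanced_model(), with two or more Diffable layers and two or more activation functions. We recommend using the Adam optimizer with a fairly low learning rate for this model.


get_simple_model_components() in assignment.py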
def get_simple_model_components():
    """
    Returns a simple single-layer model.
    """
    ## DO NOT CHANGE IN FINAL SUBMISSION
    from Beras.activations import Softmax
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    # TODO: create a model and compile it with layers and functions of your choice
    model = SequentialModel([Dense(784, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=10, batch_size=100)
get_advanced_model_components() in assignment.py
def get_advanced_model_components():
    from Beras.activations import Softmax, LeakyReLU
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.batchnorm import BatchNorm
    """
    Returns a multi-layered model with more involved components.
    """
    # TODO: create/compile a model with layers and functions of your choice.
    model = SequentialModel([Dense(784, 398), BatchNorm(398), LeakyReLU(0), Dense(398, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=12, batch_size=100)


11. Visualizing the results


  We provide the visualize_metrics method, which uses matplotlib to visualize how your loss and accuracy change after each batch.


visualize.py


import matplotlib.pyplot as plt
import numpy as np
def visualize_metrics(losses=[], accuracies=[]):
    """
    param losses: a 1D array of loss values
    param accuracies: a 1D array of accuracy values
    Displays a plot with loss and accuracy values on the y-axis and batch number/epoch number on the
    x-axis
    """
    if not losses or not accuracies:
        return print("Must provide a list of losses/accuracies to visualize")
    x = np.arange(1, max(len(losses), len(accuracies)) + 1)
    plt.plot(x, losses)
    plt.plot(x, accuracies)
    plt.ylabel("Loss/Acc Value")
    plt.show()
def visualize_images(model, train_inputs, train_labels_ohe, num_searching=500):
    """
    param model: a neural network model (i.e. SequentialModel)
    param train_inputs: sample training inputs for the model to predict
    param train_labels_ohe: one-hot encoded training labels corresponding to train_inputs
    Displays 10 sample outputs the model correctly classifies and 10 sample outputs the model
    incorrectly classifies
    """
    rand_idx = np.random.choice(len(train_inputs), num_searching)
    rand_batch = train_inputs[rand_idx]
    probs = model.call(rand_batch)
    pred_classes = np.argmax(probs, axis=1)
    true_classes = np.argmax(train_labels_ohe[rand_idx], axis=1)
    right_idx = np.where(pred_classes == true_classes)
    wrong_idx = np.where(pred_classes != true_classes)
    right = np.reshape(rand_batch[right_idx], (-1, 28, 28))
    wrong = np.reshape(rand_batch[wrong_idx], (-1, 28, 28))
    right_pred_labels = true_classes[right_idx]
    wrong_pred_labels = pred_classes[wrong_idx]
    assert len(right) >= 10, "Found fewer than 10 correct predictions!"
    assert len(wrong) >= 10, "Found fewer than 10 incorrect predictions!"
    fig, axs = plt.subplots(2, 10)
    fig.suptitle("Classifications\n(PL = Predicted Label)")
    subsets = [right, wrong]
    pred_labs = [right_pred_labels, wrong_pred_labels]
    for r in range(2):
        for c in range(10):
            axs[r, c].imshow(subsets[r][c], cmap="Greys")
            axs[r, c].set(title=f"PL: {pred_labs[r][c]}")
            plt.setp(axs[r, c].get_xticklabels(), visible=False)
            plt.setp(axs[r, c].get_yticklabels(), visible=False)
            axs[r, c].tick_params(axis="both", which="both", length=0)
    plt.show()
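
The split into correctly and incorrectly classified samples in visualize_images comes down to two np.argmax calls and two np.where calls. As a minimal sketch on made-up data (the probabilities and labels below are invented for illustration and do not come from the trained model), the selection logic looks roughly like this:

```python
import numpy as np

# Made-up model outputs for 4 samples over 3 classes (illustrative values only).
probs = np.array([
    [0.7, 0.2, 0.1],
    [0.1, 0.8, 0.1],
    [0.3, 0.3, 0.4],
    [0.6, 0.3, 0.1],
])
# Made-up one-hot labels for the same samples (true classes 0, 2, 2, 1).
labels_ohe = np.array([
    [1, 0, 0],
    [0, 0, 1],
    [0, 0, 1],
    [0, 1, 0],
])

# The most probable class per row is the prediction; the position of the 1 is the truth.
pred_classes = np.argmax(probs, axis=1)
true_classes = np.argmax(labels_ohe, axis=1)

# np.where returns a tuple of index arrays; [0] takes the only axis.
right_idx = np.where(pred_classes == true_classes)[0]
wrong_idx = np.where(pred_classes != true_classes)[0]

print("correct samples:", right_idx)
print("incorrect samples:", wrong_idx)
print("predicted labels of the incorrect samples:", pred_classes[wrong_idx])
```

With only 500 randomly drawn samples, a very accurate model may produce fewer than 10 mistakes, in which case the assertion in visualize_images fails; raising num_searching is one way around that.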


12. Call the code written in the previous 11 steps to train and test the model


assignment.py


from types import SimpleNamespace
import Beras
import numpy as np
class SequentialModel(Beras.Model):
    """
    Implemented in Beras/model.py
    def __init__(self, layers):
    def compile(self, optimizer, loss_fn, acc_fn):
    def fit(self, x, y, epochs, batch_size):
    def evaluate(self, x, y, batch_size):           ## <- TODO
    """
    def call(self, inputs):
        """
        Forward pass in sequential model. It's helpful to note that layers are initialized in Beras.Model, and
        you can refer to them with self.layers. You can call a layer by doing var = layer(input).
        """
        # TODO: The call function!
        for layer in self.layers:
            inputs = layer.forward(inputs)
        return inputs
    def batch_step(self, x, y, training=True):
        """
        Computes loss and accuracy for a batch. This step consists of both a forward and backward pass.
        If training=False, don't apply gradients to update the model!
        Most of this method (forward, loss, applying gradients)
        will take place within the scope of Beras.GradientTape()
        """
        # TODO: Compute loss and accuracy for a batch.
        # If training, then also update the gradients according to the optimizer
        y_pre = self.call(x)
        loss = self.compiled_loss.forward(y_pre, y)
        acc = self.compiled_acc(y_pre, y)
        eta = self.compiled_loss.input_gradients()
        # backwarding...
        for layer in self.layers[::-1]:
            #print(type(layer))
            eta = layer.input_gradients(eta)
        if training:
            self.optimizer.apply_gradients(self.trainable_variables[0], self.trainable_variables[1])
        return {"loss": loss, "acc": acc}
def get_simple_model_components():
    """
    Returns a simple single-layer model.
    """
    ## DO NOT CHANGE IN FINAL SUBMISSION
    from Beras.activations import Softmax
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    # TODO: create a model and compile it with layers and functions of your choice
    model = SequentialModel([Dense(784, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=10, batch_size=100)
def get_advanced_model_components():
    """
    Returns a multi-layered model with more involved components.
    """
    from Beras.activations import Softmax, LeakyReLU
    from Beras.layers import Dense
    from Beras.metrics import CategoricalAccuracy
    from Beras.losses import CrossEntropyLoss, MeanSquaredError, CategoricalCrossentropy
    from Beras.optimizers import BasicOptimizer, RMSProp
    from Beras.batchnorm import BatchNorm
    # TODO: create/compile a model with layers and functions of your choice.
    model = SequentialModel([Dense(784, 398), BatchNorm(398), LeakyReLU(0), Dense(398, 10), Softmax()])
    model.compile(
        optimizer=RMSProp(0.02),
        loss_fn=CrossEntropyLoss(),
        acc_fn=CategoricalAccuracy(),
    )
    return SimpleNamespace(model=model, epochs=12, batch_size=100)
if __name__ == "__main__":
    """
    Read in MNIST data and initialize/train/test your model.
    """
    from Beras.onehot import OneHotEncoder
    import preprocess
    ## Read in MNIST data,
    train_inputs, train_labels = preprocess.get_data_MNIST("train", "../data")
    test_inputs,  test_labels  = preprocess.get_data_MNIST("test",  "../data")
    ## TODO: Use the OneHotEncoder class to one hot encode the labels
    # ohe = lambda x: 0  ## placeholder function: returns zero for a given input
    ohe = OneHotEncoder()
    ohe.fit(train_labels)
    ## Get your model to train and test
    simple = False
    args = get_simple_model_components() if simple else get_advanced_model_components()
    model = args.model
    ## REMINDER: Threshold of accuracy: 
    ##  1470: >85% on testing accuracy from get_simple_model_components
    ##  2470: >95% on testing accuracy from get_advanced_model_components
    # TODO: Fit your model to the training input and the one hot encoded labels
    # Remember to pass all the arguments that SequentialModel.fit() requires
    # such as number of epochs and the batch size
    print('---------------------------[[[Train]]]---------------------------')
    train_agg_metrics = model.fit(
        train_inputs, 
        ohe(train_labels), 
        epochs     = args.epochs, 
        batch_size = args.batch_size
    )
    print('-------------------------------------------------------------------')
    ## Feel free to use the visualize_metrics function to view your accuracy and loss.
    ## The final accuracy returned during evaluation must be > 80%.
    # from visualize import visualize_images, visualize_metrics
    # visualize_metrics(train_agg_metrics["loss"], train_agg_metrics["acc"])
    # visualize_images(model, train_inputs, ohe(train_labels))
    ## TODO: Evaluate your model using your testing inputs and one hot encoded labels.
    ## This is the number you will be using!
    print('---------------------------[[[Evaluate]]]---------------------------')
    test_agg_metrics = model.evaluate(test_inputs, ohe(test_labels), batch_size=100)
    print('Testing Performance:', test_agg_metrics)
    print('-----------------------------------------------------------------')
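
The call and batch_step methods above follow a common pattern: run the layers in order for the forward pass, then walk them in reverse order to propagate the gradient, and update the weights only when training. The sketch below illustrates that pattern with a hypothetical toy layer. The Scale class, its backward method, the squared-error loss and the plain gradient-descent update are all assumptions made for illustration; they are not the Beras API used in this assignment.

```python
import numpy as np

class Scale:
    """Hypothetical toy layer computing y = w * x with one scalar weight."""

    def __init__(self, w):
        self.w = float(w)
        self.grad_w = 0.0
        self.x = None

    def forward(self, x):
        self.x = x  # cache the input for the backward pass
        return self.w * x

    def backward(self, upstream):
        # Gradient of the loss w.r.t. the weight, summed over the batch.
        self.grad_w = float(np.sum(upstream * self.x))
        # Gradient of the loss w.r.t. the layer input, passed to the previous layer.
        return upstream * self.w

layers = [Scale(2.0), Scale(3.0)]
x = np.array([1.0, 2.0])
target = np.array([5.0, 10.0])

# Forward pass: feed each layer's output into the next layer.
out = x
for layer in layers:
    out = layer.forward(out)

# Squared-error loss (an assumption for this sketch) and its gradient w.r.t. the output.
loss = 0.5 * np.sum((out - target) ** 2)
eta = out - target

# Backward pass: visit the layers in reverse order.
for layer in layers[::-1]:
    eta = layer.backward(eta)

grads = [layer.grad_w for layer in layers]

# Plain gradient-descent update, applied only when training.
training = True
learning_rate = 0.01
if training:
    for layer in layers:
        layer.w -= learning_rate * layer.grad_w

print("loss:", loss, "grads:", grads)
```

Caching the input inside forward is what lets each layer compute its own weight gradient later; the reverse loop only has to hand the upstream gradient from one layer to the one before it.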


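I consider this a barely passing assignment on my part (that is, not good enough), so the answers provided here are for reference only. Have fun, everyone!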

