Changes made:
1. Function names and data formats now match sklearn.
2. Added a check that stops training when the network performance no longer changes (otherwise there is an occasional chance of a NaN error).
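For reference, here is a minimal usage sketch of the sklearn-style interface (fit / predict / score) mentioned in point 1. It assumes the module below is saved as neuralnetwork.py; every name used comes from the module itself.

from neuralnetwork import NetStruct, NeuralNetwork, peaksSamples

X, z = peaksSamples(20)                           # training data from the built-in generator
ns = NetStruct(2, [10, 10], 1, ['sigm', 'sigm'])  # 2 inputs, two 10-neuron hidden layers, 1 output
nn = NeuralNetwork(ns)                            # output activation defaults to 'line'
nn.fit(X, z)                                      # Levenberg-Marquardt training
z_pred = nn.predict(X, predict_type='reg')        # continuous (regression) output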
# neuralnetwork.py
# modified by Robin 2016/04/20
import numpy as np
from math import exp, pow
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import copy
from scipy.linalg import norm, pinv

class Layer:
    '''A single network layer.'''
    def __init__(self, w, b, neure_number, transfer_function, layer_index):
        self.transfer_function = transfer_function
        self.neure_number = neure_number
        self.layer_index = layer_index
        self.w = w
        self.b = b

class NetStruct:
    '''Network structure.'''
    def __init__(self, ni, nh, no, active_fun_list):
        # ni  number of input nodes (int)
        # nh  number of hidden nodes (int or list)
        # no  number of output nodes (int)
        # active_fun_list  activation types of the hidden layers (list)
        # ==> 1: neuron count of every layer
        self.neurals = []
        self.neurals.append(ni)
        if isinstance(nh, list):
            self.neurals.extend(nh)
        else:
            self.neurals.append(nh)
        self.neurals.append(no)
        # ==> 2: if activations were given for the hidden layers only,
        # default the output layer to a linear activation
        if len(self.neurals) - 2 == len(active_fun_list):
            active_fun_list.append('line')
        self.active_fun_list = active_fun_list
        # ==> 3: build all layers
        self.layers = []
        for i in range(0, len(self.neurals)):
            if i == 0:
                self.layers.append(Layer([], [], self.neurals[i], 'none', i))
                continue
            f = self.neurals[i - 1]
            s = self.neurals[i]
            self.layers.append(Layer(np.random.randn(s, f),
                                     np.random.randn(s, 1),
                                     self.neurals[i],
                                     self.active_fun_list[i - 1],
                                     i))

class NeuralNetwork:
    '''Neural network trained with Levenberg-Marquardt.'''
    def __init__(self, net_struct, mu=1e-3, beta=10, tol=0.1):
        '''Initialization.'''
        self.net_struct = net_struct
        self.mu = mu
        self.beta = beta
        self.tol = tol

    def fit(self, x, y, n_iter=100, method='lm'):
        '''Training.'''
        self.net_struct.x = x.T
        self.net_struct.y = y.reshape(1, -1)
        self.n_iter = n_iter
        if method == 'lm':
            self.lm()

    def predict(self, x, predict_type='clf'):
        '''Prediction.'''
        self.net_struct.x = x.T
        self.forward()
        layer_num = len(self.net_struct.layers)
        pred = self.net_struct.layers[layer_num - 1].output_val
        pred = pred.reshape(-1,)
        if predict_type == 'clf':    # classification
            return np.where(pred > pred.mean(), 1, -1)
        elif predict_type == 'reg':  # regression
            return pred

    def score(self, X, y):
        '''Accuracy; only meaningful for classification, not for regression.'''
        y0 = self.predict(X, predict_type='clf')
        return np.sum(y0 == y) / X.shape[0]

    def actFun(self, z, active_type='sigm'):
        '''Activation function.'''
        # active_type: one of sigm, tanh, radb, line
        if active_type == 'sigm':
            f = 1.0 / (1.0 + np.exp(-z))
        elif active_type == 'tanh':
            # fixed: the numerator is exp(z) - exp(-z); the original code
            # repeated the denominator, making "tanh" a constant 1
            f = (np.exp(z) - np.exp(-z)) / (np.exp(z) + np.exp(-z))
        elif active_type == 'radb':
            f = np.exp(-z * z)
        elif active_type == 'line':
            f = z
        return f

    def actFunGrad(self, z, active_type='sigm'):
        '''Derivative of the activation function.'''
        # active_type: one of sigm, tanh, radb, line
        y = self.actFun(z, active_type)
        if active_type == 'sigm':
            grad = y * (1.0 - y)
        elif active_type == 'tanh':
            grad = 1.0 - y * y
        elif active_type == 'radb':
            grad = -2.0 * z * y
        elif active_type == 'line':
            m = z.shape[0]
            n = z.shape[1]
            grad = np.ones((m, n))
        return grad

    def forward(self):
        '''Forward pass.'''
        layer_num = len(self.net_struct.layers)
        for i in range(0, layer_num):
            if i == 0:
                curr_layer = self.net_struct.layers[i]
                curr_layer.input_val = self.net_struct.x
                curr_layer.output_val = self.net_struct.x
                continue
            before_layer = self.net_struct.layers[i - 1]
            curr_layer = self.net_struct.layers[i]
            curr_layer.input_val = curr_layer.w.dot(before_layer.output_val) + curr_layer.b
            curr_layer.output_val = self.actFun(curr_layer.input_val,
                                                self.net_struct.active_fun_list[i - 1])

    def backward(self):
        '''Backward pass.'''
        layer_num = len(self.net_struct.layers)
        last_layer = self.net_struct.layers[layer_num - 1]
        last_layer.error = -self.actFunGrad(last_layer.input_val,
                                            self.net_struct.active_fun_list[layer_num - 2])
        layer_index = list(range(1, layer_num - 1))
        layer_index.reverse()
        for i in layer_index:
            curr_layer = self.net_struct.layers[i]
            curr_layer.error = (last_layer.w.transpose().dot(last_layer.error)) \
                * self.actFunGrad(curr_layer.input_val,
                                  self.net_struct.active_fun_list[i - 1])
            last_layer = curr_layer
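    # The recursion in backward() is standard backpropagation on the node
    # inputs: for the output layer L, error_L = -f'(z_L); for an inner layer i,
    # error_i = (W_{i+1}^T . error_{i+1}) * f'(z_i).  parDeriv() below turns
    # these per-node errors into per-sample derivatives with respect to every
    # weight and bias, which jacobian() then assembles into the matrix J used
    # by the Levenberg-Marquardt update.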
    def parDeriv(self):
        '''Per-sample partial derivatives (standard gradients).'''
        layer_num = len(self.net_struct.layers)
        for i in range(1, layer_num):
            before_layer = self.net_struct.layers[i - 1]
            before_input_val = before_layer.output_val.transpose()
            curr_layer = self.net_struct.layers[i]
            curr_error = curr_layer.error
            curr_error = curr_error.reshape(curr_error.shape[0] * curr_error.shape[1], 1, order='F')
            row = curr_error.shape[0]
            col = before_input_val.shape[1]
            a = np.zeros((row, col))
            num = before_input_val.shape[0]
            neure_number = curr_layer.neure_number
            # inner loops renamed to k: the original reused i and shadowed the outer loop variable
            for k in range(0, num):
                a[neure_number * k:neure_number * k + neure_number, :] = \
                    np.repeat([before_input_val[k, :]], neure_number, axis=0)
            tmp_w_par_deriv = curr_error * a
            curr_layer.w_par_deriv = np.zeros((num, before_layer.neure_number * curr_layer.neure_number))
            for k in range(0, num):
                tmp = tmp_w_par_deriv[neure_number * k:neure_number * k + neure_number, :]
                tmp = tmp.reshape(tmp.shape[0] * tmp.shape[1], order='C')
                curr_layer.w_par_deriv[k, :] = tmp
            curr_layer.b_par_deriv = curr_layer.error.transpose()

    def jacobian(self):
        '''Jacobian matrix: one row per sample, one column per weight/bias.'''
        layers = self.net_struct.neurals
        row = self.net_struct.x.shape[1]
        col = 0
        for i in range(0, len(layers) - 1):
            col = col + layers[i] * layers[i + 1] + layers[i + 1]
        j = np.zeros((row, col))
        layer_num = len(self.net_struct.layers)
        index = 0
        for i in range(1, layer_num):
            curr_layer = self.net_struct.layers[i]
            w_col = curr_layer.w_par_deriv.shape[1]
            b_col = curr_layer.b_par_deriv.shape[1]
            j[:, index:index + w_col] = curr_layer.w_par_deriv
            index = index + w_col
            j[:, index:index + b_col] = curr_layer.b_par_deriv
            index = index + b_col
        return j

    def gradCheck(self):
        '''Numerical gradient check; hard-coded for a 2-input,
        one-hidden-layer, single-output network.'''
        W1 = self.net_struct.layers[1].w
        b1 = self.net_struct.layers[1].b
        n = self.net_struct.layers[1].neure_number
        W2 = self.net_struct.layers[2].w
        b2 = self.net_struct.layers[2].b
        x = self.net_struct.x
        p = []
        p.extend(W1.reshape(1, W1.shape[0] * W1.shape[1], order='C')[0])
        p.extend(b1.reshape(1, b1.shape[0] * b1.shape[1], order='C')[0])
        p.extend(W2.reshape(1, W2.shape[0] * W2.shape[1], order='C')[0])
        p.extend(b2.reshape(1, b2.shape[0] * b2.shape[1], order='C')[0])
        old_p = list(p)  # fixed: copy instead of alias, so the restore below works
        jac = []
        for i in range(0, x.shape[1]):
            xi = np.array([x[:, i]])
            xi = xi.transpose()
            ji = []
            for j in range(0, len(p)):
                W1 = np.array(p[0:2 * n]).reshape(n, 2, order='C')
                b1 = np.array(p[2 * n:2 * n + n]).reshape(n, 1, order='C')
                W2 = np.array(p[3 * n:4 * n]).reshape(1, n, order='C')
                b2 = np.array(p[4 * n:4 * n + 1]).reshape(1, 1, order='C')
                z2 = W1.dot(xi) + b1
                a2 = self.actFun(z2)
                z3 = W2.dot(a2) + b2
                h1 = self.actFun(z3)
                p[j] = p[j] + 0.00001
                W1 = np.array(p[0:2 * n]).reshape(n, 2, order='C')
                b1 = np.array(p[2 * n:2 * n + n]).reshape(n, 1, order='C')
                W2 = np.array(p[3 * n:4 * n]).reshape(1, n, order='C')
                b2 = np.array(p[4 * n:4 * n + 1]).reshape(1, 1, order='C')
                z2 = W1.dot(xi) + b1
                a2 = self.actFun(z2)
                z3 = W2.dot(a2) + b2
                h = self.actFun(z3)
                g = (h[0][0] - h1[0][0]) / 0.00001
                ji.append(g)
                p[j] = p[j] - 0.00001  # fixed: undo the perturbation before the next parameter
            jac.append(ji)
            p = list(old_p)
        return jac

    def jjje(self):
        '''Compute jj = J^T J and je = -J^T e.'''
        layer_num = len(self.net_struct.layers)
        e = self.net_struct.y - self.net_struct.layers[layer_num - 1].output_val
        e = e.transpose()
        j = self.jacobian()
        # check gradient:
        #j1 = -np.array(self.gradCheck())
        #jk = j.reshape(1, j.shape[0] * j.shape[1])
        #jk1 = j1.reshape(1, j1.shape[0] * j1.shape[1])
        #plt.plot(jk[0])
        #plt.plot(jk1[0], '.')
        #plt.show()
        jj = j.transpose().dot(j)
        je = -j.transpose().dot(e)
        return [jj, je]
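    # Levenberg-Marquardt in brief: with the Jacobian J and error vector e from
    # jjje(), each step solves (J^T J + mu*I) . delta = -J^T e and adds delta to
    # the weights and biases.  A small mu approximates Gauss-Newton, a large mu
    # approximates gradient descent; mu is divided by beta after a successful
    # step and multiplied by beta after a rejected one.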
    def lm(self):
        '''Levenberg-Marquardt training algorithm.'''
        mu = self.mu
        beta = self.beta
        n_iter = self.n_iter
        tol = self.tol
        y = self.net_struct.y
        layer_num = len(self.net_struct.layers)
        self.forward()
        pred = self.net_struct.layers[layer_num - 1].output_val
        pref = self.performance(y, pred)
        for i in range(0, n_iter):
            print('iter:', i, 'error:', pref)
            # 1) stop once the error is below tolerance
            if pref < tol:
                break
            # 2) backpropagate and build the Jacobian
            self.backward()
            self.parDeriv()
            [jj, je] = self.jjje()
            while 1:
                # 3) damped step: solve (J^T J + mu*I) delta = -J^T e
                A = jj + mu * np.diag(np.ones(jj.shape[0]))
                delta_w_b = pinv(A).dot(je)
                # 4) try the step and keep it only if the error decreases
                old_net_struct = copy.deepcopy(self.net_struct)
                self.updateNetStruct(delta_w_b)
                self.forward()
                pred1 = self.net_struct.layers[layer_num - 1].output_val
                pref1 = self.performance(y, pred1)
                if pref1 < pref:
                    mu = mu / beta
                    pref = pref1
                    break
                if pref - pref1 == 0:  # could also use abs(pref - pref1) < 1e-12
                    # modification: stop once the performance no longer changes;
                    # otherwise there is an occasional chance of a NaN error
                    return
                mu = mu * beta
                self.net_struct = copy.deepcopy(old_net_struct)

    def updateNetStruct(self, delta_w_b):
        '''Update the network weights and biases.'''
        layer_num = len(self.net_struct.layers)
        index = 0
        for i in range(1, layer_num):
            before_layer = self.net_struct.layers[i - 1]
            curr_layer = self.net_struct.layers[i]
            w_num = before_layer.neure_number * curr_layer.neure_number
            b_num = curr_layer.neure_number
            w = delta_w_b[index:index + w_num]
            w = w.reshape(curr_layer.neure_number, before_layer.neure_number, order='C')
            index = index + w_num
            b = delta_w_b[index:index + b_num]
            index = index + b_num
            curr_layer.w += w
            curr_layer.b += b

    def performance(self, y, pred):
        '''Performance (error) function.'''
        error = y - pred
        return norm(error) / len(y)
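    # Note on the scale of performance(): fit() reshapes y to (1, n_samples),
    # so len(y) is 1 and the returned value is effectively just norm(error);
    # the default tol=0.1 should be read against that scale.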
# The functions below are test examples.

def plotSamples(n=40):
    x = np.array([np.linspace(0, 3, n)])
    x = x.repeat(n, axis=0)
    y = x.transpose()
    z = np.zeros((n, n))
    for i in range(0, x.shape[0]):
        for j in range(0, x.shape[1]):
            z[i][j] = sampleFun(x[i][j], y[i][j])
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')  # fig.gca(projection='3d') was removed in recent matplotlib
    ax.plot_surface(x, y, z, cmap='autumn', cstride=2, rstride=2)
    ax.set_xlabel("X-Label")
    ax.set_ylabel("Y-Label")
    ax.set_zlabel("Z-Label")
    plt.show()

def sinSamples(n):
    '''n points of z = sin(x*y) along a line segment; X is (n, 2), z is (n,).'''
    x = np.array([np.linspace(-0.5, 0.5, n)])
    y = x + 0.2
    z = np.zeros((n, 1))
    for i in range(0, x.shape[1]):
        z[i] = np.sin(x[0][i] * y[0][i])
    X = np.zeros((n, 2))
    k = 0
    for xi, yi in zip(x.transpose(), y.transpose()):
        X[k][0] = xi
        X[k][1] = yi
        k = k + 1
    return X, z.reshape(-1,)

def peaksSamples(n):
    '''n*n grid samples of the peaks surface; X is (n*n, 2), z is (n*n,).'''
    x = np.array([np.linspace(-3, 3, n)])
    x = x.repeat(n, axis=0)
    y = x.transpose()
    z = np.zeros((n, n))
    for i in range(0, x.shape[0]):
        for j in range(0, x.shape[1]):
            z[i][j] = sampleFun(x[i][j], y[i][j])
    X = np.zeros((n * n, 2))
    x_list = x.reshape(n * n, 1)
    y_list = y.reshape(n * n, 1)
    k = 0
    for xi, yi in zip(x_list, y_list):
        X[k][0] = xi
        X[k][1] = yi
        k = k + 1
    return X, z.reshape(-1,)

def sampleFun(x, y):
    '''MATLAB's "peaks" surface.'''
    z = 3 * pow(1 - x, 2) * exp(-pow(x, 2) - pow(y + 1, 2)) \
        - 10 * (x / 5 - pow(x, 3) - pow(y, 5)) * exp(-pow(x, 2) - pow(y, 2)) \
        - 1 / 3 * exp(-pow(x + 1, 2) - pow(y, 2))
    return z

def test():
    # [required] set the activation type of each hidden layer; tanh, radb and
    # line are also available. The output layer defaults to 'line' when it is
    # not listed explicitly.
    active_fun_list = ['sigm', 'sigm', 'sigm']
    ns = NetStruct(2, [10, 10, 10], 1, active_fun_list)  # three hidden layers of 10 neurons each
    nn = NeuralNetwork(ns)
    #[X, z] = peaksSamples(20)      # training data, option 1
    [X, z] = sinSamples(20)         # training data, option 2
    nn.fit(X, z)                    # train
    #[X0, z0] = peaksSamples(40)    # test data, option 1
    [X0, z0] = sinSamples(40)       # test data, option 2
    z1 = nn.predict(X0, predict_type='reg')  # predict (fixed: regression output, not class labels)
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.plot(z0)        # real data
    ax.plot(z1, 'r.')  # predicted data
    plt.legend(('real data', 'predict data'))
    plt.show()

def test_lotto():
    import lotto
    import kline
    lt = lotto.Lotto()
    lt.make_real_history()
    series = lt.mod(7, m=2)
    kl = kline.KLine(series)
    X, y, x_last = kl.data_for_predict()
    # [required] activations for the two hidden layers plus a sigm output layer
    active_fun_list = ['sigm', 'sigm', 'sigm']
    ns = NetStruct(4, [10, 10], 1, active_fun_list)  # two hidden layers of 10 neurons each
    nn = NeuralNetwork(ns)
    nn.fit(X, y)        # train
    y0 = nn.predict(X)  # predict
    print(y == y0)
    print(nn.score(X, y))  # 0.63...

def test3():
    X = np.array([[1, 1, 0],
                  [1, 0, 0],
                  [0, 1, 1],
                  [0, 0, 1]])
    y = np.array([1, -1, -1, 1])
    # [required] activation for the single hidden layer plus a sigm output layer
    active_fun_list = ['sigm', 'sigm']
    ns = NetStruct(3, [10], 1, active_fun_list)  # one hidden layer of 10 neurons
    nn = NeuralNetwork(ns)
    nn.fit(X, y)        # train
    y0 = nn.predict(X)  # predict
    print(y == y0)
    print(nn.score(X, y))

# tests
if __name__ == '__main__':
    #test()
    #test_lotto()
    test3()
    plotSamples()
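Running python neuralnetwork.py executes test3() (the small ±1 classification demo) followed by plotSamples(), which renders the sampleFun ("peaks") surface; swap the commented calls in the __main__ block to run the sin regression demo in test() instead.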