TF的模型搭建
总的来说常见带监督的机器学习问题分为两类:分类和回归,我们使用Tensorflow来解决这些问题的时候就得自己搭建网络模型,但是对于TensorFlow不同级别的API也就产生了不同的模型搭建方式。越底层的API灵活性越大,可以更加自由地添加自己想加入的内容,但是编码难度就有所提高;反之,越高阶的api具有更好的封装性,简单的几行代码就能实现模型的搭建,但是灵活性难以避免的有所下降。今天就讲讲几种不同层次API搭建网络的方式。
1.回归问题
1.1 数据生成
首先我们得自己设计一个回归问题,也就是建一个方程,然后训练网络去拟合它。
我们熟知的线性方程:Y=W∗X+bY=W*X+bY=W∗X+b
我们这里生成200个数据,X是在(-10,10)之间的均匀分布,W为(2,-2),b=3,另外添加噪音
# 设置随机数种子 tf.random.set_seed(0) # 样本数 n=200 # 生成测试用数据集 # Y=WX+b+noise # 相当于两个方程: Y=2X+3+noise;Y=-3X+3+noise X = tf.random.uniform([n,2],minval=-10,maxval=10) w0 = tf.constant([[2.0],[-2.0]]) b0 = tf.constant([[3.0]]) Y = X@w0 + b0 + tf.random.normal([n,1],mean = 0.0,stddev= 2.0) 复制代码
生成的数据长这样
plt.figure(figsize = (12,5)) ax1 = plt.subplot(121) ax1.scatter(X[:,0],Y[:,0], c = "b") plt.xlabel("x1") plt.ylabel("y",rotation = 0) ax2 = plt.subplot(122) ax2.scatter(X[:,1],Y[:,0], c = "g") plt.xlabel("x2") plt.ylabel("y",rotation = 0) plt.show() 复制代码
在这里为了简单起见,我就不划分训练集、测试集啥的,直接用全部数据训练加预测。接下来我们还需要构造一个数据生成器,用来生成每一个batch_size中的X和Y。
数据生成器的大致思路如下:
- 首先随机打乱数据下标
- 遍历数据,每一个batch_size作为一个分隔,得到打乱后的下标切片(大小为batch_size)
- 使用tf.gather()函数将X,Y分别和上一步得到的随机下标组合,yield返回生成器
tf.gather(params,indices,axis=0)函数是根据indices下标从params中返回对应元素的切片
# 构建数据生成器 # 其中tf.gather(params,indices,axis=0)是根据indices下标从params中返回对应元素的切片 def data_iter(features, labels, batch_size=8): num_examples = len(features) indices = list(range(num_examples)) np.random.shuffle(indices) for i in range(0, num_examples, batch_size): indexs = indices[i: min(i + batch_size, num_examples)] yield tf.gather(features,indexs), tf.gather(labels,indexs) # 测试数据生成器效果 batch_size = 8 (datas,labels) = next(data_iter(X,Y,batch_size)) print(datas) 复制代码
到这里,数据生成以及构建生成器已经实现了,接下来就是定义并训练模型,也是今天的关键。
1.2 高阶API实现
首先,我想用大家最常见的写法来实现模型构建,这样可能不用涉及太多细节知识就可以简单搭建起来网络,放在开始更能让人接受。
在这里我设置学习率为0.001,batch_size=20,epochs=100,优化器选择SGD
在这里关于优化器,我想额外说几句。优化器作为参数优化的关键,在模型训练中的重要程度可想而知。大多时候我们选择Adam作为优化器,但是就最近我自己的实践结果来看,Adam拟合慢而且效果并不太好(这里的效果指的是最终的loss大小),达到同样的loss需要更多次的迭代训练,而且往往loss也高于SGD。然后我就去研究了一下近几年关于优化器所提出的新方法,比如AggMo、Apollo、diffGrad、Lamb、MADGRAD……我对于优化器有了更深的理解,同时也在思考有没有更好的优化器方法。当然,上面提到的一些方法不论是在Tensorflow还是Pytorch中的optimizers中都是没有对应API的,不过有人专门收集整理建了一个Pytorch兼容的优化器第三方库pytorch-optimizer,大家可以去试试新的优化器与经典的优化器之间效果上有什么不同。
好,回到今天的主题,开始搭建网络
这里我们已知方程是一个线性方程,所以只需要一层线性层拟合就可以了,也不用添加其他非线性的激活函数
lr3=0.001 optimizer=optimizers.SGD(learning_rate=lr3) model3=models.Sequential() model3.add(layers.Dense(1,input_shape=(2,))) model3.compile(optimizer=optimizer,loss='mse',metrics=['mae']) model3.fit(X,Y,batch_size=20,epochs=100) tf.print(f"w={model3.layers[0].kernel}") tf.print(f"b={model3.layers[0].bias}") 复制代码
最终loss在3.62,模型拟合效果如下
w,b = model3.variables plt.figure(figsize = (12,5)) ax1 = plt.subplot(121) ax1.scatter(X[:,0],Y[:,0], c = "b",label = "samples") ax1.plot(X[:,0],w[0]*X[:,0]+b[0],"-r",linewidth = 5.0,label = "model") ax1.legend() plt.xlabel("x1") plt.ylabel("y",rotation = 0) ax2 = plt.subplot(122) ax2.scatter(X[:,1],Y[:,0], c = "g",label = "samples") ax2.plot(X[:,1],w[1]*X[:,1]+b[0],"-r",linewidth = 5.0,label = "model") ax2.legend() plt.xlabel("x2") plt.ylabel("y",rotation = 0) plt.show()
1.3 中阶API实现
高阶API实现最主要的特点就是方便简洁,几行代码就完成了模型的搭建训练,但是中阶的API使用就不再那么依赖封装好的接口,部分功能开始可以自己实现。
# 学习率 lr2=0.001 # 批次大小 batch_size2=30 model2=layers.Dense(1,input_shape=(2,)) model2.loss_func=losses.mean_squared_error model2.optimizer=optimizers.SGD(learning_rate=lr2) 复制代码
和高级API实现一样,这里只是不再需要models,而是简单一层。设置好损失函数和优化器,接下来就是自己编写训练过程。
首先写一个epoch训练的函数,然后再用循环训练epochs次
@tf.function def train_step(model, features, labels): with tf.GradientTape() as tape: predictions = model(features) loss = model.loss_func(tf.reshape(labels,[-1]), tf.reshape(predictions,[-1])) grads = tape.gradient(loss,model.variables) model.optimizer.apply_gradients(zip(grads,model.variables)) return loss # 测试train_step效果 features,labels = next(data_iter(X,Y,batch_size2)) train_step(model2,features,labels) 复制代码
借助之前说的自动求微分,我们正向得到每一次的预测值以及损失,然后根据损失对模型的参数求偏导数,得到每一个参数的梯度,最后优化器根据每个参数的梯度进行更新
有了对每一次epoch训练,然后只用循环就可以迭代训练模型了
def train_model(model,epochs): for epoch in tf.range(1,epochs+1): loss = tf.constant(0.0) for features, labels in data_iter(X,Y,batch_size2): loss = train_step(model,features,labels) if epoch%50==0: tf.print(f"========================================================= {time.asctime(time.localtime(time.time()))}") tf.print("epoch =",epoch,"loss = ",loss) tf.print("w =",model.variables[0]) tf.print("b =",model.variables[1]) train_model(model2,epochs = 200) w,b = model2.variables plt.figure(figsize = (12,5)) ax1 = plt.subplot(121) ax1.scatter(X[:,0],Y[:,0], c = "b",label = "samples") ax1.plot(X[:,0],w[0]*X[:,0]+b[0],"-r",linewidth = 5.0,label = "model") ax1.legend() plt.xlabel("x1") plt.ylabel("y",rotation = 0) ax2 = plt.subplot(122) ax2.scatter(X[:,1],Y[:,0], c = "g",label = "samples") ax2.plot(X[:,1],w[1]*X[:,1]+b[0],"-r",linewidth = 5.0,label = "model") ax2.legend() plt.xlabel("x2") plt.ylabel("y",rotation = 0) plt.show()
最终的loss在3.46左右
1.4 最基础API的实现
上面两种实现中Dense帮我们省去了构建y=w*x+by=w∗x+b
optimizer让我们可以不用自己去实现优化算法,下面不用任何封装,就用最基本的API以及一些基础知识去实现试试。
首先自己构造y=w∗x+by=w*x+by=w∗x+b,声明两个变量w,b,然后定义正向传播的计算公式x@w+bx@w+bx@w+b,这就是Dense中去掉激活函数而已;再定义损失函数,还是使用均方误差(groudtruth−predict)2N\frac{(groudtruth-predict)^2}{N}N(groudtruth−predict)2,为了求导之后把指数消掉,通常前面再乘以一个12\frac{1}{2}21
# 构建wx+b拟合函数 w = tf.Variable(tf.random.normal(w0.shape)) b = tf.Variable(tf.zeros_like(b0,dtype = tf.float32)) # 定义模型 class LinearRegression: #正向传播 def __call__(self,x): return x@w + b # 损失函数 def loss_func(self,y_true,y_pred): return tf.reduce_mean((y_true - y_pred)**2/2) model = LinearRegression() 复制代码
这样,一个基本的线性回归模型就建立好了,接下来就是优化模型参数的阶段,也就是训练模型。模型的参数量不大,所以我们这里就用最基础的梯度下降法。其中梯度依靠自动求导,梯度更新公式为:w=w−α∗∂∂wJ(w,b)w=w-\alpha*\frac{\partial}{\partial w}J(w,b)w=w−α∗∂w∂J(w,b)
其中www为权重,α\alphaα为学习率也就是每次更新下降的步长,J(w,b)J(w,b)J(w,b)是损失函数即上面的均方误差,其他训练部分和上面的训练部分类似
# 学习率 lr=0.001 # 批次大小 batch_size=20 @tf.function def train_step(model, features, labels): # 用于自动微分 with tf.GradientTape() as tape: predictions = model(features) loss = model.loss_func(labels, predictions) # 反向传播求梯度,即各系数的偏导数 dloss_dw,dloss_db = tape.gradient(loss,[w,b]) # 梯度下降法更新参数 w.assign(w - lr*dloss_dw) b.assign(b - lr*dloss_db) return loss def train_model(model,epochs): for epoch in tf.range(1,epochs+1): for features, labels in data_iter(X,Y,batch_size): loss = train_step(model,features,labels) if epoch%50==0: tf.print(f"========================================================= {time.asctime(time.localtime(time.time()))}") tf.print("epoch =",epoch,"loss = ",loss) tf.print("w =",w) tf.print("b =",b) train_model(model,epochs = 200) 复制代码
最终的loss竟然只有2.00左右,自己一个简单的梯度下降效果比其他高级的优化器效果更好,也许是方程太简单。
2. 分类问题
分类问题和回归问题流程大致类似,还是先生成数据,然后用API搭建模型并训练。唯一的不同之处在于回归问题由于是线性分布,不需要激活函数,训练时使用梯度下降也能很好的收敛拟合;但是分类时是非线性的,所以必须在每一层添加激活函数,并且训练的时候损失函数就不再是MSE了,而是使用交叉熵作为损失函数。
为什么MSE不再适用?
这是Sigmoid激活函数的原函数曲线和它的导数曲线,如果我们使用MSE作为损失函数,一开始MSE如果很大(往往都是这样),那么在一开始Sigmoid函数的导数值几乎为0,这个时候梯度下降几乎没有梯度(梯度消失),也就无法进行参数更新,最终训练失败。
这个时候交叉熵就是一个很好的分类损失函数
H(p,q)=−∑i=1np(xi)log(q(xi))H(p,q)=-\sum_{i=1}^{n}p(x_i)log(q(x_i))H(p,q)=−∑i=1np(xi)log(q(xi))
其中p(xi)p(x_i)p(xi)是事件发生的概率,q(xi)q(x_i)q(xi)是预测概率
对于二分类来说,只有0或者1两种标签,然后q=1-p,于是上面的公式可以简化为
Cross_Entropy(p,q)=−(plogq+(1−p)log(1−q))Cross\_Entropy(p,q)=-(plog{q}+(1-p)log(1-q))Cross_Entropy(p,q)=−(plogq+(1−p)log(1−q))
说了基本的原理,现在就开始实现
2.1 数据生成
#正负样本数量 n_positive,n_negative = 2000,2000 #生成正样本, 小圆环分布 r_p = 5.0 + tf.random.truncated_normal([n_positive,1],0.0,1.0) theta_p = tf.random.uniform([n_positive,1],0.0,2*np.pi) Xp = tf.concat([r_p*tf.cos(theta_p),r_p*tf.sin(theta_p)],axis = 1) Yp = tf.ones_like(r_p) #生成负样本, 大圆环分布 r_n = 8.0 + tf.random.truncated_normal([n_negative,1],0.0,1.0) theta_n = tf.random.uniform([n_negative,1],0.0,2*np.pi) Xn = tf.concat([r_n*tf.cos(theta_n),r_n*tf.sin(theta_n)],axis = 1) Yn = tf.zeros_like(r_n) #汇总样本 X = tf.concat([Xp,Xn],axis = 0) Y = tf.concat([Yp,Yn],axis = 0) #可视化 plt.figure(figsize = (6,6)) plt.scatter(Xp[:,0].numpy(),Xp[:,1].numpy(),c = "r") plt.scatter(Xn[:,0].numpy(),Xn[:,1].numpy(),c = "g") plt.legend(["正样本","负样本"]) plt.show() 复制代码
tf.random.truncated_normal()函数是截断正态分布,也就是限定随机生成正态分布数据的范围在(μ−2δ,μ+2δ)(\mu-2\delta,\mu+2\delta)(μ−2δ,μ+2δ),最终的样本长这样
2.2 高阶API实现
使用高级API依然还是几行代码就可以实现
model3=models.Sequential() model3.add(layers.Dense(4,input_shape=(2,),activation='relu')) model3.add(layers.Dense(8,activation='relu')) model3.add(layers.Dense(1,activation='sigmoid')) model3.summary() 复制代码
optimizer = optimizers.SGD(learning_rate=0.001) loss_func = tf.keras.losses.BinaryCrossentropy() model3.compile(optimizer=optimizer,loss=loss_func,metrics=['acc']) model3.fit(X,Y,batch_size=100,epochs=50) fig, (ax1,ax2) = plt.subplots(nrows=1,ncols=2,figsize = (12,5)) ax1.scatter(Xp[:,0].numpy(),Xp[:,1].numpy(),c = "r") ax1.scatter(Xn[:,0].numpy(),Xn[:,1].numpy(),c = "g") ax1.legend(["positive","negative"]); ax1.set_title("y_true"); Xp_pred = tf.boolean_mask(X,tf.squeeze(model3(X)>=0.5),axis = 0) Xn_pred = tf.boolean_mask(X,tf.squeeze(model3(X)<0.5),axis = 0) ax2.scatter(Xp_pred[:,0].numpy(),Xp_pred[:,1].numpy(),c = "r") ax2.scatter(Xn_pred[:,0].numpy(),Xn_pred[:,1].numpy(),c = "g") ax2.legend(["positive","negative"]); ax2.set_title("y_pred") plt.show() 复制代码
2.3 中阶API实现
使用中阶API主要就是不再依赖models.Sequential(),自己定义深度神经网络,然后写好里面的各层以及正向传播。实例化之后再用二元交叉熵以及优化器去优化,训练模型
class DNNModel2(tf.Module): def __init__(self,name = None): super(DNNModel2, self).__init__(name=name) self.dense1 = layers.Dense(4,activation = "relu") self.dense2 = layers.Dense(8,activation = "relu") self.dense3 = layers.Dense(1,activation = "sigmoid") # 正向传播 @tf.function(input_signature=[tf.TensorSpec(shape = [None,2], dtype = tf.float32)]) def __call__(self,x): x = self.dense1(x) x = self.dense2(x) y = self.dense3(x) return y model2 = DNNModel2() model2.loss_func = losses.binary_crossentropy model2.metric_func = metrics.binary_accuracy model2.optimizer = optimizers.Adam(learning_rate=0.001) (features,labels) = next(data_iter(X,Y,batch_size)) predictions = model2(features) loss = model2.loss_func(tf.reshape(labels,[-1]),tf.reshape(predictions,[-1])) metric = model2.metric_func(tf.reshape(labels,[-1]),tf.reshape(predictions,[-1])) tf.print("初始损失:",loss) tf.print("初始化准确率",metric) 复制代码
模型的训练方式和上面的回归问题相似
@tf.function def train_step(model, features, labels): with tf.GradientTape() as tape: predictions = model(features) loss = model.loss_func(tf.reshape(labels,[-1]), tf.reshape(predictions,[-1])) grads = tape.gradient(loss,model.trainable_variables) model.optimizer.apply_gradients(zip(grads,model.trainable_variables)) metric = model.metric_func(tf.reshape(labels,[-1]), tf.reshape(predictions,[-1])) return loss,metric # 测试train_step效果 (features,labels) = next(data_iter(X,Y,batch_size)) train_step(model2,features,labels) def train_model(model,epochs): for epoch in tf.range(1,epochs+1): loss, metric = tf.constant(0.0),tf.constant(0.0) for features, labels in data_iter(X,Y,batch_size): loss,metric = train_step(model,features,labels) if epoch%10==0: tf.print(f"========================================================= {time.asctime(time.localtime(time.time()))}") tf.print("epoch =",epoch,"loss = ",loss, "accuracy = ",metric) train_model(model2,epochs = 50) fig, (ax1,ax2) = plt.subplots(nrows=1,ncols=2,figsize = (12,5)) ax1.scatter(Xp[:,0].numpy(),Xp[:,1].numpy(),c = "r") ax1.scatter(Xn[:,0].numpy(),Xn[:,1].numpy(),c = "g") ax1.legend(["positive","negative"]); ax1.set_title("y_true"); Xp_pred = tf.boolean_mask(X,tf.squeeze(model2(X)>=0.5),axis = 0) Xn_pred = tf.boolean_mask(X,tf.squeeze(model2(X)<0.5),axis = 0) ax2.scatter(Xp_pred[:,0].numpy(),Xp_pred[:,1].numpy(),c = "r") ax2.scatter(Xn_pred[:,0].numpy(),Xn_pred[:,1].numpy(),c = "g") ax2.legend(["positive","negative"]); ax2.set_title("y_pred") plt.show()
2.4 低阶API实现
上面实现中除了最后分类层以外每一个层其实都只是在完成一件事,即计算Y=relu(w∗x+b)Y=relu(w*x+b)Y=relu(w∗x+b),最后一层为了分类将relu改成sigmoid,其他不变;所以我们在低级实现中,老老实实定义变量,然后前向传播,反向计算更新这些参数。然后loss_func也就是二元交叉熵就用我们一开始讲的简化公式
class DNNModel(tf.Module): def __init__(self,name = None): super(DNNModel, self).__init__(name=name) self.w1 = tf.Variable(tf.random.truncated_normal([2,4]),dtype = tf.float32) self.b1 = tf.Variable(tf.zeros([1,4]),dtype = tf.float32) self.w2 = tf.Variable(tf.random.truncated_normal([4,8]),dtype = tf.float32) self.b2 = tf.Variable(tf.zeros([1,8]),dtype = tf.float32) self.w3 = tf.Variable(tf.random.truncated_normal([8,1]),dtype = tf.float32) self.b3 = tf.Variable(tf.zeros([1,1]),dtype = tf.float32) # 正向传播 @tf.function(input_signature=[tf.TensorSpec(shape = [None,2], dtype = tf.float32)]) def __call__(self,x): x = tf.nn.relu(x@self.w1 + self.b1) x = tf.nn.relu(x@self.w2 + self.b2) y = tf.nn.sigmoid(x@self.w3 + self.b3) return y # 损失函数(二元交叉熵) @tf.function(input_signature=[tf.TensorSpec(shape = [None,1], dtype = tf.float32), tf.TensorSpec(shape = [None,1], dtype = tf.float32)]) def loss_func(self,y_true,y_pred): #将预测值限制在 1e-7 以上, 1 - 1e-7 以下,避免log(0)错误 eps = 1e-7 y_pred = tf.clip_by_value(y_pred,eps,1.0-eps) bce = - y_true*tf.math.log(y_pred) - (1-y_true)*tf.math.log(1-y_pred) return tf.reduce_mean(bce) # 评估指标(准确率) @tf.function(input_signature=[tf.TensorSpec(shape = [None,1], dtype = tf.float32), tf.TensorSpec(shape = [None,1], dtype = tf.float32)]) def metric_func(self,y_true,y_pred): y_pred = tf.where(y_pred>0.5,tf.ones_like(y_pred,dtype = tf.float32), tf.zeros_like(y_pred,dtype = tf.float32)) acc = tf.reduce_mean(1-tf.abs(y_true-y_pred)) return acc batch_size = 10 (features,labels) = next(data_iter(X,Y,batch_size)) # 模型实例化 model = DNNModel() predictions = model(features) loss = model.loss_func(labels,predictions) metric = model.metric_func(labels,predictions) tf.print("初始损失:",loss) tf.print("初始化准确率",metric) 复制代码
根据loss_func对所有可训练参数,也就是我们自己定义的那些参数进行训练,更新方式依旧是w=w−α∗∂∂ww=w-\alpha *\frac{\partial}{\partial w}w=w−α∗∂w∂
# 开始训练 lr=0.005 @tf.function def train_step(model, features, labels): # 正向传播求损失 with tf.GradientTape() as tape: predictions = model(features) loss = model.loss_func(labels, predictions) # 反向传播求梯度 grads = tape.gradient(loss, model.trainable_variables) # 执行梯度下降 for p, dloss_dp in zip(model.trainable_variables,grads): p.assign(p - lr*dloss_dp) # 计算评估指标 metric = model.metric_func(labels,predictions) return loss, metric def train_model(model,epochs): for epoch in tf.range(1,epochs+1): for features, labels in data_iter(X,Y,150): loss,metric = train_step(model,features,labels) if epoch%100==0: tf.print(f"========================================================= {time.asctime(time.localtime(time.time()))}") tf.print("epoch =",epoch,"loss = ",loss, "accuracy = ", metric) train_model(model,epochs = 600) fig, (ax1,ax2) = plt.subplots(nrows=1,ncols=2,figsize = (12,5)) ax1.scatter(Xp[:,0],Xp[:,1],c = "r") ax1.scatter(Xn[:,0],Xn[:,1],c = "g") ax1.legend(["positive","negative"]); ax1.set_title("y_true"); Xp_pred = tf.boolean_mask(X,tf.squeeze(model(X)>=0.5),axis = 0) Xn_pred = tf.boolean_mask(X,tf.squeeze(model(X)<0.5),axis = 0) ax2.scatter(Xp_pred[:,0],Xp_pred[:,1],c = "r") ax2.scatter(Xn_pred[:,0],Xn_pred[:,1],c = "g") ax2.legend(["positive","negative"]); ax2.set_title("y_pred") plt.show() 复制代码
结束
今天算是把如何搭建一个模型重新复习了一遍,其中有一些重要的细节比如input_shape的大小还有Dense中的值的含义,这些常用的API会在后续细细的讲。这么多种模型搭建方式,复习一遍之后对调参技巧更加熟练,加深了对模型实现原理的理解。进度比我预期的还是慢了一点,之后应该还要加快点。