03_logreg_placeholder.py
代码有详细的注释:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date   : 2018-06-05 17:00:43
# @Author : quincy
# @Email  : yanqiang@cyou-inc.com
"""Logistic regression on MNIST with TF 1.x placeholders.

Trains a single-layer softmax classifier with Adam, then reports
test-set accuracy. Images are flattened to 784-d vectors; labels are
one-hot 10-d vectors.
"""
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # silence TF C++ INFO/WARN logs

import time

import numpy as np
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

import utils

# Model hyper-parameters
learning_rate = 0.01
batch_size = 128
n_epochs = 30

# Step 1: read in data.
# batch_size: number of samples per SGD step — one iteration trains on
# batch_size examples and updates parameters from their mean loss.
mnist = input_data.read_data_sets('data/mnist', one_hot=True)
# mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)  # same effect, lets TF download
X_batch, Y_batch = mnist.train.next_batch(batch_size)

# Step 2: create placeholders for features and labels.
# Each image is 28*28=784 pixels, so one image is a 1x784 tensor.
# Each image belongs to one of 10 classes (digits 0-9); each label is a
# one-hot vector, e.g. digit "1" -> [0,1,0,0,0,0,0,0,0,0].
X = tf.placeholder(tf.float32, [batch_size, 784], name='image')
Y = tf.placeholder(tf.float32, [batch_size, 10], name='label')

# Step 3: create weights and bias.
# w is drawn from a normal distribution with mean 0 and stddev 0.01;
# b is initialized to zeros.
# Shapes follow Y = Xw + b: [1,10] = [1,784][784,10] + [1,10].
# NOTE: the original source had a stray `Y = tf.matmul(X, w)` statement
# here, which referenced `w` before its definition and overwrote the
# label placeholder — removed.
w = tf.get_variable(name='weights', shape=(784, 10),
                    initializer=tf.random_normal_initializer(mean=0, stddev=0.01))
b = tf.get_variable(name='bias', shape=(1, 10),
                    initializer=tf.zeros_initializer())

# Step 4: build the model (raw, unnormalized scores).
logits = tf.matmul(X, w) + b

# Step 5: define the loss function — mean softmax cross-entropy over the batch.
entropy = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=Y, name='loss')
loss = tf.reduce_mean(entropy)
# loss = tf.reduce_mean(-tf.reduce_sum(tf.nn.softmax(logits) * tf.log(Y), reduction_indices=[1]))

# Step 6: define the training optimizer.
optimizer = tf.train.AdamOptimizer(learning_rate).minimize(loss)

# Step 7: ops for test-set accuracy — count of correct predictions per batch.
preds = tf.nn.softmax(logits)
correct_preds = tf.equal(tf.argmax(preds, 1), tf.argmax(Y, 1))
accuracy = tf.reduce_sum(tf.cast(correct_preds, tf.float32))

writer = tf.summary.FileWriter('./graphs/logreg_placeholder', tf.get_default_graph())

with tf.Session() as sess:
    start_time = time.time()
    sess.run(tf.global_variables_initializer())
    n_batches = int(mnist.train.num_examples / batch_size)
    print(n_batches)

    # Train the model for n_epochs passes over the training set.
    for i in range(n_epochs):
        total_loss = 0
        for j in range(n_batches):
            X_batch, Y_batch = mnist.train.next_batch(batch_size)
            _, loss_batch = sess.run([optimizer, loss], {X: X_batch, Y: Y_batch})
            total_loss += loss_batch
        # typo fix: "opoch" -> "epoch"
        print("Average loss epoch {0}:{1}".format(i, total_loss / n_batches))
    print("Total time:{0}seconds".format(time.time() - start_time))

    # Evaluate on the test set.
    n_batches = int(mnist.test.num_examples / batch_size)
    total_correct_preds = 0
    for i in range(n_batches):
        X_batch, Y_batch = mnist.test.next_batch(batch_size)
        accuracy_batch = sess.run(accuracy, {X: X_batch, Y: Y_batch})
        total_correct_preds += accuracy_batch
    print('Accuracy {0}'.format(total_correct_preds / mnist.test.num_examples))

writer.close()
实验结果
utils.py
"""Helper utilities: MNIST download/parsing, birth-life data, and misc TF helpers."""
import os
import gzip
import shutil
import struct
# BUG FIX: was `import urllib`; the code below calls
# urllib.request.urlretrieve, and in Python 3 the submodule must be
# imported explicitly.
import urllib.request

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # silence TF C++ INFO/WARN logs

from matplotlib import pyplot as plt
import numpy as np
import tensorflow as tf


def huber_loss(labels, predictions, delta=14.0):
    """Return the Huber loss between `labels` and `predictions`.

    Quadratic (0.5 * r^2) for residuals below `delta`, linear beyond.
    NOTE(review): tf.cond takes a scalar predicate, so this assumes
    scalar labels/predictions — confirm against callers.
    """
    residual = tf.abs(labels - predictions)

    def f1():
        return 0.5 * tf.square(residual)

    def f2():
        return delta * residual - 0.5 * tf.square(delta)

    return tf.cond(residual < delta, f1, f2)


def safe_mkdir(path):
    """Create a directory if there isn't one already."""
    try:
        os.mkdir(path)
    except OSError:
        # directory already exists (or parent missing) — best effort
        pass


def read_birth_life_data(filename):
    """Read in birth_life_2010.txt and return:
    data: NumPy array of (birth_rate, life_expectancy) float32 pairs
    n_samples: number of samples
    """
    text = open(filename, 'r').readlines()[1:]  # skip header row
    data = [line[:-1].split('\t') for line in text]
    births = [float(line[1]) for line in data]
    lifes = [float(line[2]) for line in data]
    data = list(zip(births, lifes))
    n_samples = len(data)
    data = np.asarray(data, dtype=np.float32)
    return data, n_samples


def download_one_file(download_url,
                      local_dest,
                      expected_byte=None,
                      unzip_and_remove=False):
    """Download the file from download_url into local_dest
    if the file doesn't already exist.
    If expected_byte is provided, check if the downloaded file has the
    same number of bytes.
    If unzip_and_remove is True, unzip the file and remove the zip file.
    """
    if os.path.exists(local_dest) or os.path.exists(local_dest[:-3]):
        print('%s already exists' % local_dest)
    else:
        print('Downloading %s' % download_url)
        local_file, _ = urllib.request.urlretrieve(download_url, local_dest)
        file_stat = os.stat(local_dest)
        if expected_byte:
            if file_stat.st_size == expected_byte:
                print('Successfully downloaded %s' % local_dest)
                if unzip_and_remove:
                    with gzip.open(local_dest, 'rb') as f_in, \
                            open(local_dest[:-3], 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                    os.remove(local_dest)
            else:
                print('The downloaded file has unexpected number of bytes')


def download_mnist(path):
    """Download and unzip the MNIST dataset if it's not already downloaded.
    Download from http://yann.lecun.com/exdb/mnist
    """
    safe_mkdir(path)
    url = 'http://yann.lecun.com/exdb/mnist/'
    filenames = ['train-images-idx3-ubyte.gz',
                 'train-labels-idx1-ubyte.gz',
                 't10k-images-idx3-ubyte.gz',
                 't10k-labels-idx1-ubyte.gz']
    expected_bytes = [9912422, 28881, 1648877, 4542]
    for filename, byte in zip(filenames, expected_bytes):
        # BUG FIX: was os.path.join(url, filename) — on Windows that
        # inserts backslashes into the URL. URLs always use '/'.
        download_url = url + filename
        local_dest = os.path.join(path, filename)
        download_one_file(download_url, local_dest, byte, True)


def parse_data(path, dataset, flatten):
    """Parse the unzipped IDX files for `dataset` ('train' or 't10k').

    Returns (imgs, new_labels):
      imgs       — float32 array in [0, 1], shape (num, rows, cols) or
                   (num, rows*cols) when `flatten` is True
      new_labels — one-hot float array of shape (num, 10)
    Raises NameError for any other dataset name.
    """
    if dataset != 'train' and dataset != 't10k':
        raise NameError('dataset must be train or t10k')

    label_file = os.path.join(path, dataset + '-labels-idx1-ubyte')
    with open(label_file, 'rb') as file:
        # IDX header: magic number, item count (big-endian uint32s)
        _, num = struct.unpack(">II", file.read(8))
        labels = np.fromfile(file, dtype=np.int8)  # int8
        new_labels = np.zeros((num, 10))
        new_labels[np.arange(num), labels] = 1  # one-hot encode

    img_file = os.path.join(path, dataset + '-images-idx3-ubyte')
    with open(img_file, 'rb') as file:
        _, num, rows, cols = struct.unpack(">IIII", file.read(16))
        imgs = np.fromfile(file, dtype=np.uint8).reshape(num, rows, cols)  # uint8
        imgs = imgs.astype(np.float32) / 255.0  # scale pixels to [0, 1]
        if flatten:
            imgs = imgs.reshape([num, -1])

    return imgs, new_labels


def read_mnist(path, flatten=True, num_train=55000):
    """Read in the MNIST dataset stored in `path`.

    Returns three tuples of numpy arrays:
    (train_imgs, train_labels), (val_imgs, val_labels), (test_imgs, test_labels)
    """
    imgs, labels = parse_data(path, 'train', flatten)
    # random train/validation split
    indices = np.random.permutation(labels.shape[0])
    train_idx, val_idx = indices[:num_train], indices[num_train:]
    train_img, train_labels = imgs[train_idx, :], labels[train_idx, :]
    val_img, val_labels = imgs[val_idx, :], labels[val_idx, :]
    test = parse_data(path, 't10k', flatten)
    return (train_img, train_labels), (val_img, val_labels), test


def get_mnist_dataset(batch_size):
    """Build tf.data train/test datasets of MNIST batches of `batch_size`."""
    # Step 1: read in data
    mnist_folder = 'data/mnist'
    download_mnist(mnist_folder)
    train, val, test = read_mnist(mnist_folder, flatten=False)

    # Step 2: create datasets and iterator
    train_data = tf.data.Dataset.from_tensor_slices(train)
    train_data = train_data.shuffle(10000)  # if you want to shuffle your data
    train_data = train_data.batch(batch_size)

    test_data = tf.data.Dataset.from_tensor_slices(test)
    test_data = test_data.batch(batch_size)

    return train_data, test_data


def show(image):
    """Render a given numpy.uint8 2D array of pixel data."""
    plt.imshow(image, cmap='gray')
    plt.show()