import time
import tensorlayer as tl
import tensorflow as tf
from tensorlayer.layers import *
import gzip
import os
import numpy as np
import logging
def load_mnist_dataset_by_q(shape=(-1, 784), path="data"):
path = os.path.join(path, 'MNIST_data/')
def load_mnist_images(path, filename):
# filepath = maybe_download_and_extract(filename, path, 'http://yann.lecun.com/exdb/mnist/')
filepath = path + filename
logging.info(filepath)
# Read the inputs in Yann LeCun's binary format.
with gzip.open(filepath, 'rb') as f:
data = np.frombuffer(f.read(), np.uint8, offset=16)
# The inputs are vectors now, we reshape them to monochrome 2D images,
# following the shape convention: (examples, channels, rows, columns)
data = data.reshape(shape)
# The inputs come as bytes, we convert them to float32 in range [0,1].
# (Actually to range [0, 255/256], for compatibility to the version
# provided at http://deeplearning.net/data/mnist/mnist.pkl.gz.)
return data / np.float32(256)
def load_mnist_labels(path, filename):
# filepath = maybe_download_and_extract(filename, path, 'http://yann.lecun.com/exdb/mnist/')
filepath = path + filename
# Read the labels in Yann LeCun's binary format.
with gzip.open(filepath, 'rb') as f:
data = np.frombuffer(f.read(), np.uint8, offset=8)
# The labels are vectors of integers now, that's exactly what we want.
return data
# Download and read the training and test set images and labels.
logging.info("Load or Download MNIST > {}".format(path))
X_train = load_mnist_images(path, 'train-images-idx3-ubyte.gz')
y_train = load_mnist_labels(path, 'train-labels-idx1-ubyte.gz')
X_test = load_mnist_images(path, 't10k-images-idx3-ubyte.gz')
y_test = load_mnist_labels(path, 't10k-labels-idx1-ubyte.gz')
# We reserve the last 10000 training examples for validation.
X_train, X_val = X_train[:-10000], X_train[-10000:]
y_train, y_val = y_train[:-10000], y_train[-10000:]
# We just return all the arrays in order, as expected in main().
# (It doesn't matter how we do this as long as we can read them again.)
X_train = np.asarray(X_train, dtype=np.float32)
y_train = np.asarray(y_train, dtype=np.int32)
X_val = np.asarray(X_val, dtype=np.float32)
y_val = np.asarray(y_val, dtype=np.int32)
X_test = np.asarray(X_test, dtype=np.float32)
y_test = np.asarray(y_test, dtype=np.int32)
return X_train, y_train, X_val, y_val, X_test, y_test
X_train, y_train, X_val, y_val, X_test, y_test = load_mnist_dataset_by_q(shape=(-1,28,28,1),path='/Users/qyk/Desktop/py/tensorflow/MNIST')
print('X_train.shape', X_train.shape)
print('y_train.shape', y_train.shape)
print('X_val.shape', X_val.shape)
print('y_val.shape', y_val.shape)
print('X_test.shape', X_test.shape)
print('y_test.shape', y_test.shape)
print('X %s y %s' % (X_test.dtype, y_test.dtype))
sess = tf.InteractiveSession()
batch_size = 128
x = tf.placeholder(tf.float32,shape=[batch_size,28,28,1],name='x')
# x = tf.placeholder(tf.float32,shape=[-1,28,28,1],name='x')
y_ = tf.placeholder(tf.int64,shape=[batch_size,],name='y_')
network = InputLayer(x, name='input')
network = Conv2d(network,n_filter=32,filter_size=(5,5),strides=(1,1),act=tf.nn.relu,padding='SAME',name='cnn1')
network = MaxPool2d(network,filter_size=(2,2),strides=(2,2),padding='SAME',name='pool1')
network = Conv2d(network,n_filter=64,filter_size=(5,5),strides=(1,1),act=tf.nn.relu,padding='SAME',name='cnn2')
network = MaxPool2d(network,filter_size=(2,2),strides=(2,2),padding='SAME',name='pool2')
network = FlattenLayer(layer=network,name='flatten')
network = DropoutLayer(layer=network,keep=0.5,name='drop1')
network = DenseLayer(layer=network,n_units=256,act=tf.nn.relu,name='relu1')
network = DropoutLayer(layer=network,keep=0.5,name='drop2')
network = DenseLayer(layer=network,n_units=10,act=tf.identity,name='output')
y = network.outputs
cost = tl.cost.cross_entropy(y,y_,name='cost')
correct_prediction = tf.equal(tf.argmax(y,1),y_)
accuracy = tf.reduce_mean(tf.cast(correct_prediction,tf.float32))
n_epoch = 200
learning_rate = 0.0001
print_freq = 10
train_params = network.all_params
train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss=cost,var_list=train_params)
tl.layers.initialize_global_variables(sess)
network.print_params()
network.print_layers()
for epoch in range(n_epoch):
start_time = time.time()
for X_train_a, y_train_a in tl.iterate.minibatches(X_train, y_train, batch_size=batch_size, shuffle=True):
feed_dict = {x:X_train_a, y_:y_train_a}
feed_dict.update(network.all_drop)
sess.run(fetches=train_op,feed_dict=feed_dict)
if epoch + 1 == 1 or (epoch + 1) % print_freq == 0:
print('Epoch %d of %d took %fs'%(epoch + 1, n_epoch, time.time() - start_time))
train_loss, train_acc, n_batch = 0, 0, 0
for X_train_a, y_train_a in tl.iterate.minibatches(X_train, y_train, batch_size=batch_size, shuffle=True):
dp_dict = tl.utils.dict_to_one(network.all_drop)
feed_dict = {x: X_train_a, y_: y_train_a}
feed_dict.update(dp_dict)
err, ac = sess.run(fetches=[cost,accuracy], feed_dict=feed_dict)
train_loss += err; train_acc += ac; n_batch += 1
print(" train loss: %f" % (train_loss / n_batch))
print(" train acc: %f" % (train_acc / n_batch))
val_loss, val_acc, n_batch = 0, 0, 0
for X_val_a, y_val_a in tl.iterate.minibatches(X_val, y_val, batch_size, shuffle=True):
dp_dict = tl.utils.dict_to_one(network.all_drop) # disable noise layers
feed_dict = {x: X_val_a, y_: y_val_a}
feed_dict.update(dp_dict)
err, ac = sess.run([cost, accuracy], feed_dict=feed_dict)
val_loss += err
val_acc += ac
n_batch += 1
print(" val loss: %f" % (val_loss / n_batch))
print(" val acc: %f" % (val_acc / n_batch))
test_loss, test_acc, n_batch = 0, 0, 0
for X_test_a, y_test_a in tl.iterate.minibatches(X_test, y_test, batch_size, shuffle=True):
dp_dict = tl.utils.dict_to_one(network.all_drop) # disable noise layers
feed_dict = {x: X_test_a, y_: y_test_a}
feed_dict.update(dp_dict)
err, ac = sess.run([cost, accuracy], feed_dict=feed_dict)
test_loss += err
test_acc += ac
n_batch += 1
print(" test loss: %f" % (test_loss / n_batch))
print(" test acc: %f" % (test_acc / n_batch))