CIFAR-10 Case Study: Convolutional Neural Network Practice
Following the course case study, complete the coding exercise by hand.
You may try different hyperparameters such as the learning rate, batch size, or number of training epochs, or change the model structure (for example, add more convolutional, pooling, or fully connected layers) to bring the model's accuracy up to 70%; one possible structural change is sketched after these notes.
Submission requirements:
1. The source-code file (.ipynb format) of the run you consider best, with its outputs included.
2. Upload it as an attachment.
Grading criteria:
1. Build and apply a convolutional neural network for the CIFAR-10 image-recognition problem (a CNN is required), with complete code that runs and reaches at least 60% accuracy: 6 points; deduct 1 point for each percentage point below that.
2. Accuracy of at least 65%: 2 more points, otherwise 0.
3. Accuracy of at least 70%: 2 more points, otherwise 0.
4. The 70% threshold is used as the full-score standard not because higher accuracy is unreachable, but out of consideration for students' compute and time: with a GPU training is fast, while on a CPU alone it still takes a while.
Note: I stopped the run here because 80 training epochs took two hours on my computer; the machine is 7-8 years old and far too slow, and the CPU simply cannot keep up.
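For reference, below is a minimal sketch of one structural change of the kind suggested above: a hypothetical third convolution + pooling block that could be inserted into the define() method further down, between pool_2 and the fully connected layer. It only assumes the weight, bias, conv2d and max_pool_2x2 helpers from the Operation class in this notebook; the output channel count (128) and the resulting 4*4*128 = 2048-dimensional flattened vector are illustrative choices, not values from the submitted run.

        # Hypothetical 3rd convolutional layer (not in the model actually submitted):
        # input channels: 64, output channels: 128; "SAME" convolution keeps the 8*8 size
        with tf.name_scope("conv_3"):
            W_c3 = self.weight([3, 3, 64, 128])
            b_c3 = self.bias([128])
            conv_3 = tf.nn.relu(self.conv2d(pool_2, W_c3) + b_c3)
        # Hypothetical 3rd pooling layer: shrinks the 8*8 feature maps to 4*4
        with tf.name_scope("pool_3"):
            pool_3 = self.max_pool_2x2(conv_3)
        # The fully connected layer would then flatten pool_3 instead of pool_2:
        #     flat = tf.reshape(pool_3, [-1, 4 * 4 * 128])
        #     W3 = self.weight([4 * 4 * 128, 128])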
%matplotlib inline
import os
import pickle as p
from time import time

import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import OneHotEncoder
import tensorflow.compat.v1 as tf

tf.disable_eager_execution()
# Ignore TensorFlow messages of level 2 and below (level 1 = info, level 2 = warnings)
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
class Operation:
    # Shared helper functions
    # Weights are initialized from a truncated normal distribution
    def weight(self, shape):  # self plays the role of C++'s this pointer
        return tf.Variable(tf.truncated_normal(shape, stddev=0.1), name="W")

    # Biases are initialized to 0.1
    def bias(self, shape):
        return tf.Variable(tf.constant(0.1, shape=shape), name="b")

    # Convolution: stride 1, padding "SAME"
    def conv2d(self, x, W):
        return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding="SAME")

    # Max pooling: 2x2 window, stride 2
    def max_pool_2x2(self, x):
        return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")
class ImageType(Operation):
    def __init__(self, data_dir):
        self.data_dir = data_dir
        # Label dictionary: the class name represented by each numeric label
        self.label_dict = {0: "airplane", 1: "automobile", 2: "bird", 3: "cat", 4: "deer",
                           5: "dog", 6: "frog", 7: "horse", 8: "ship", 9: "truck"}
        self.Xtrain, self.Ytrain, self.Xtest, self.Ytest = self._load_CIFAR_data(self.data_dir)
    # Internal method: load a single batch file
    def _load_CIFAR_batch(self, filename):
        with open(filename, "rb") as f:
            data_dict = p.load(f, encoding="bytes")
            images = data_dict[b'data']
            labels = data_dict[b'labels']
            # Reshape the raw data into BCWH order
            images = images.reshape(10000, 3, 32, 32)
            # TensorFlow expects image data in BWHC order,
            # so move the channel dimension C to the end
            images = images.transpose(0, 2, 3, 1)
            labels = np.array(labels)
            return images, labels
    # Internal method: load the whole dataset
    def _load_CIFAR_data(self, data_dir):
        images_train = []
        labels_train = []
        for i in range(5):
            f = os.path.join(data_dir, "data_batch_%d" % (i + 1))
            print("loading ", f)
            # Call _load_CIFAR_batch() to get one batch of images and their labels
            image_batch, label_batch = self._load_CIFAR_batch(f)
            images_train.append(image_batch)
            labels_train.append(label_batch)
        Xtrain = np.concatenate(images_train)
        Ytrain = np.concatenate(labels_train)
        del image_batch, label_batch
        Xtest, Ytest = self._load_CIFAR_batch(os.path.join(data_dir, "test_batch"))
        print("finished loading CIFAR_10 data")
        # Return the training images/labels and the test images/labels
        return Xtrain, Ytrain, Xtest, Ytest
    # Display image data with the corresponding labels (and predictions, if given)
    def plot_images_labels_prediction(self, images, labels, prediction, idx, num=10):
        fig = plt.gcf()
        fig.set_size_inches(12, 6)
        if num > 10:
            num = 10
        for i in range(num):
            ax = plt.subplot(2, 5, i + 1)
            ax.imshow(images[idx], cmap="binary")
            title = str(i) + "," + self.label_dict[labels[idx]]
            if len(prediction) > 0:
                title += "=>" + self.label_dict[prediction[idx]]
            ax.set_title(title, fontsize=10)
            idx += 1
        plt.show()
    # Normalize the image pixel values
    def normalize(self):
        self.Xtrain_normalize = self.Xtrain.astype("float32") / 255.0
        self.Xtest_normalize = self.Xtest.astype("float32") / 255.0

    # Label preprocessing: one-hot encoding
    def Encode(self):
        encoder = OneHotEncoder(sparse=False)
        yy = [[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]]
        encoder.fit(yy)
        self.Ytrain_reshape = self.Ytrain.reshape(-1, 1)
        self.Ytrain_onehot = encoder.transform(self.Ytrain_reshape)
        self.Ytest_reshape = self.Ytest.reshape(-1, 1)
        self.Ytest_onehot = encoder.transform(self.Ytest_reshape)
    # Define the network structure
    def define(self):
        # Input layer: 32*32 images with 3 channels (RGB)
        with tf.name_scope("input_layer"):
            self.x = tf.placeholder("float", shape=[None, 32, 32, 3], name="X")
        # 1st convolutional layer
        # Input channels: 3, output channels: 32; "SAME" convolution keeps the image at 32*32
        with tf.name_scope("conv_1"):
            W1 = self.weight([3, 3, 3, 32])
            b1 = self.bias([32])
            conv_1 = self.conv2d(self.x, W1) + b1
            conv_1 = tf.nn.relu(conv_1)
        # 1st pooling layer
        # Shrinks the 32*32 image to 16*16; pooling keeps the channel count at 32
        with tf.name_scope("pool_1"):
            pool_1 = self.max_pool_2x2(conv_1)
        # 2nd convolutional layer
        # Input channels: 32, output channels: 64; the image size stays 16*16
        with tf.name_scope("conv_2"):
            W2 = self.weight([3, 3, 32, 64])
            b2 = self.bias([64])
            conv_2 = self.conv2d(pool_1, W2) + b2
            conv_2 = tf.nn.relu(conv_2)
        # 2nd pooling layer
        # Shrinks the 16*16 image to 8*8; the channel count stays 64
        with tf.name_scope("pool_2"):
            pool_2 = self.max_pool_2x2(conv_2)
        # Fully connected layer
        # Flatten the 64 feature maps of size 8*8 from the 2nd pooling layer
        # into a vector of length 64*8*8 = 4096; 128 neurons
        with tf.name_scope("fc"):
            W3 = self.weight([4096, 128])
            b3 = self.bias([128])
            flat = tf.reshape(pool_2, [-1, 4096])
            h = tf.nn.relu(tf.matmul(flat, W3) + b3)
            h_dropout = tf.nn.dropout(h, keep_prob=0.8)
        # Output layer: 10 neurons, one for each of the classes 0-9
        with tf.name_scope("output_layer"):
            W4 = self.weight([128, 10])
            b4 = self.bias([10])
            self.pred = tf.nn.softmax(tf.matmul(h_dropout, W4) + b4)
    # Build the model
    def Model(self):
        with tf.name_scope("optimizer"):
            # Placeholder for the labels
            self.y = tf.placeholder("float", shape=[None, 10], name="label")
            # Loss function: softmax cross-entropy
            self.loss_function = tf.reduce_mean(
                tf.nn.softmax_cross_entropy_with_logits(logits=self.pred, labels=self.y))
            # Choose the optimizer
            self.optimizer = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(self.loss_function)
        # Define the accuracy
        with tf.name_scope("evaluation"):
            correct_prediction = tf.equal(tf.argmax(self.pred, 1), tf.argmax(self.y, 1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
    # Start the session
    def Sess(self):
        # Training hyperparameters; train_epochs and total_batch are used by Iteration()
        # (the note above mentions an 80-epoch run)
        self.train_epochs = 80
        self.batch_size = 40
        self.total_batch = int(len(self.Xtrain) / self.batch_size)
        self.epoch_list = []
        self.accuracy_list = []
        self.loss_list = []
        self.epoch = tf.Variable(0, name="epoch", trainable=False)
        self.startTime = time()
        self.sess = tf.Session()
        self.init = tf.global_variables_initializer()
        self.sess.run(self.init)
    # Checkpointed (resumable) training setup
    def PointTrain(self):
        # Directory for storing checkpoints
        self.ckpt_dir = "D:\\log"
        if not os.path.exists(self.ckpt_dir):
            os.makedirs(self.ckpt_dir)  # create the directory
        # Create the saver
        self.saver = tf.train.Saver(max_to_keep=1)
        # If a checkpoint file exists, read the latest one and restore the variables
        ckpt = tf.train.latest_checkpoint(self.ckpt_dir)
        if ckpt != None:
            self.saver.restore(self.sess, ckpt)  # load all parameters
        else:
            print("Training from scratch")
        # Get the epoch from which to resume training
        self.start = self.sess.run(self.epoch)
        print("Training starts from {} epoch".format(self.start + 1))

    def get_train_batch(self, number, batch_size):
        return self.Xtrain_normalize[number * batch_size:(number + 1) * batch_size], \
               self.Ytrain_onehot[number * batch_size:(number + 1) * batch_size]
    # Training loop
    def Iteration(self):
        for ep in range(self.start, self.train_epochs):
            for i in range(self.total_batch):
                batch_x, batch_y = self.get_train_batch(i, self.batch_size)
                self.sess.run(self.optimizer, feed_dict={self.x: batch_x, self.y: batch_y})
                if i % 100 == 0:
                    print("Step{:3d} finished".format(i))
            loss, acc = self.sess.run([self.loss_function, self.accuracy],
                                      feed_dict={self.x: batch_x, self.y: batch_y})
            self.epoch_list.append(ep + 1)
            self.loss_list.append(loss)
            self.accuracy_list.append(acc)
            print("Train epoch:{:<4d}loss={:<10.6f}Accuracy={}".format(ep + 1, loss, acc))
            # Save a checkpoint
            self.saver.save(self.sess, os.path.join(self.ckpt_dir, "CIFAR10_cnn_model.ckpt"),
                            global_step=ep + 1)
            self.sess.run(self.epoch.assign(ep + 1))
        duration = time() - self.startTime
        print("Train finished takes:", duration)
    # Plot the accuracy curve
    def Plot(self):
        plt.plot(self.epoch_list, self.accuracy_list, label="accuracy")
        fig = plt.gcf()
        fig.set_size_inches(4, 2)
        plt.ylim(0.1, 1)
        plt.ylabel("accuracy")
        plt.xlabel("epoch")
        plt.legend()
        plt.show()
    # Compute accuracy on the test set
    def Calculate(self):
        test_total_batch = int(len(self.Xtest_normalize) / self.batch_size)
        test_acc_sum = 0.0
        for i in range(test_total_batch):
            test_image_batch = self.Xtest_normalize[i * self.batch_size:(i + 1) * self.batch_size]
            test_label_batch = self.Ytest_onehot[i * self.batch_size:(i + 1) * self.batch_size]
            test_batch_acc = self.sess.run(self.accuracy,
                                           feed_dict={self.x: test_image_batch, self.y: test_label_batch})
            test_acc_sum += test_batch_acc
        test_acc = float(test_acc_sum / test_total_batch)
        print("\n")
        print("Test accuracy:{:.6f}".format(test_acc))
    # Use the model to make predictions
    def Prediction(self):
        test_pred = self.sess.run(self.pred, feed_dict={self.x: self.Xtest_normalize[:10]})
        prediction_result = self.sess.run(tf.argmax(test_pred, 1))
        # Visualize the prediction results
        self.plot_images_labels_prediction(self.Xtest, self.Ytest, prediction_result, 0, 10)
if __name__ == "__main__":
    data_dir = r"D:\编程代码\python程序\cifar-10-batches-py"
    I = ImageType(data_dir)
    # Show the image data and the corresponding labels
    I.plot_images_labels_prediction(I.Xtest, I.Ytest, [], 1, 10)
    # Normalize the image data
    I.normalize()
    # Label preprocessing: one-hot encoding
    I.Encode()
    # Define the network structure
    I.define()
    # Build the model
    I.Model()
    # Start the session
    I.Sess()
    # Checkpointed (resumable) training setup
    I.PointTrain()
    # Training loop
    I.Iteration()
    # Plot the accuracy curve
    I.Plot()
    # Compute accuracy on the test set
    I.Calculate()
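    # Optional extra step (not part of the original run): the Prediction() method defined
    # above can be called here to visualize the model's predictions on the first ten test images.
    I.Prediction()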