标签:
在搞清楚卷积神经网络(CNN)的原理之后,在本篇博文中,我们将讨论基于Theano的算法实现技术。我们还将以MNIST手写数字识别为例,创建卷积神经网络(CNN),训练该网络,使识别误差达到1%以内。
我们首先需要读入MNIST手写数字识别的训练样本集,为此我们定义了一个工具类:
from __future__ import print_function __docformat__ = ‘restructedtext en‘ import six.moves.cPickle as pickle import gzip import os import sys import timeit import numpy import theano import theano.tensor as T class MnistLoader(object): def load_data(self, dataset): data_dir, data_file = os.path.split(dataset) if data_dir == "" and not os.path.isfile(dataset): new_path = os.path.join( os.path.split(__file__)[0], "..", "data", dataset ) if os.path.isfile(new_path) or data_file == ‘mnist.pkl.gz‘: dataset = new_path if (not os.path.isfile(dataset)) and data_file == ‘mnist.pkl.gz‘: from six.moves import urllib origin = ( ‘http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz‘ ) print(‘Downloading data from %s‘ % origin) urllib.request.urlretrieve(origin, dataset) print(‘... loading data‘) # Load the dataset with gzip.open(dataset, ‘rb‘) as f: try: train_set, valid_set, test_set = pickle.load(f, encoding=‘latin1‘) except: train_set, valid_set, test_set = pickle.load(f) def shared_dataset(data_xy, borrow=True): data_x, data_y = data_xy shared_x = theano.shared(numpy.asarray(data_x, dtype=theano.config.floatX), borrow=borrow) shared_y = theano.shared(numpy.asarray(data_y, dtype=theano.config.floatX), borrow=borrow) return shared_x, T.cast(shared_y, ‘int32‘) test_set_x, test_set_y = shared_dataset(test_set) valid_set_x, valid_set_y = shared_dataset(valid_set) train_set_x, train_set_y = shared_dataset(train_set) rval = [(train_set_x, train_set_y), (valid_set_x, valid_set_y), (test_set_x, test_set_y)] return rval这个类在之前我们已经用过,在这里就不详细讲解了。之所以单独定义这个类,是因为如果我们将问题换为其他类型时,我们只需要修改这一个类,就可以实现训练数据的载入了,这样简化了程序修改工作量。
我们所采用的方法是将图像先接入卷积神经网络,之后再接入BP网络的隐藏层,然后再接入逻辑回归的输出层,因此我们需要先定义多层前向网络的隐藏层和逻辑回归输出层。隐藏层的定义如下所示:
from __future__ import print_function __docformat__ = ‘restructedtext en‘ import os import sys import timeit import numpy import theano import theano.tensor as T from logistic_regression import LogisticRegression # start-snippet-1 class HiddenLayer(object): def __init__(self, rng, input, n_in, n_out, W=None, b=None, activation=T.tanh): self.input = input if W is None: W_values = numpy.asarray( rng.uniform( low=-numpy.sqrt(6. / (n_in + n_out)), high=numpy.sqrt(6. / (n_in + n_out)), size=(n_in, n_out) ), dtype=theano.config.floatX ) if activation == theano.tensor.nnet.sigmoid: W_values *= 4 W = theano.shared(value=W_values, name=‘W‘, borrow=True) if b is None: b_values = numpy.zeros((n_out,), dtype=theano.config.floatX) b = theano.shared(value=b_values, name=‘b‘, borrow=True) self.W = W self.b = b lin_output = T.dot(input, self.W) + self.b self.output = ( lin_output if activation is None else activation(lin_output) ) # parameters of the model self.params = [self.W, self.b]接下来我们定义逻辑回归算法类:
from __future__ import print_function __docformat__ = ‘restructedtext en‘ import six.moves.cPickle as pickle import gzip import os import sys import timeit import numpy import theano import theano.tensor as T class LogisticRegression(object): def __init__(self, input, n_in, n_out): self.W = theano.shared( value=numpy.zeros( (n_in, n_out), dtype=theano.config.floatX ), name=‘W‘, borrow=True ) self.b = theano.shared( value=numpy.zeros( (n_out,), dtype=theano.config.floatX ), name=‘b‘, borrow=True ) self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b) self.y_pred = T.argmax(self.p_y_given_x, axis=1) self.params = [self.W, self.b] self.input = input print("Yantao: ***********************************") def negative_log_likelihood(self, y): return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y]) def errors(self, y): if y.ndim != self.y_pred.ndim: raise TypeError( ‘y should have the same shape as self.y_pred‘, (‘y‘, y.type, ‘y_pred‘, self.y_pred.type) ) if y.dtype.startswith(‘int‘): return T.mean(T.neq(self.y_pred, y)) else: raise NotImplementedError()
这段代码在逻辑回归博文中已经详细讨论过了,这里就不再重复了,有兴趣的读者可以查看这篇博文(逻辑回归算法实现)。
做完上述准备工作之后,我们就可以开始卷积神经网络(CNN)实现了。我们先来定义基于简化版Lenet5的卷积神经网络(CNN)的定义,代码如下所示:
from __future__ import print_function import os import sys import timeit import numpy import theano import theano.tensor as T from theano.tensor.signal import pool from theano.tensor.nnet import conv2d class LeNetConvPoolLayer(object): def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2)): assert image_shape[1] == filter_shape[1] self.input = input fan_in = numpy.prod(filter_shape[1:]) fan_out = (filter_shape[0] * numpy.prod(filter_shape[2:]) // numpy.prod(poolsize)) W_bound = numpy.sqrt(6. / (fan_in + fan_out)) self.W = theano.shared( numpy.asarray( rng.uniform(low=-W_bound, high=W_bound, size=filter_shape), dtype=theano.config.floatX ), borrow=True ) b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX) self.b = theano.shared(value=b_values, borrow=True) conv_out = conv2d( input=input, filters=self.W, filter_shape=filter_shape, input_shape=image_shape ) pooled_out = pool.pool_2d( input=conv_out, ds=poolsize, ignore_border=True ) self.output = T.tanh(pooled_out + self.b.dimshuffle(‘x‘, 0, ‘x‘, ‘x‘)) self.params = [self.W, self.b] self.input = input上面代码实现了对输入信号的卷积操作,并对结果进行最大化池化。
下面我们来看怎样初始化Lenet层,怎样将Lenet层输出信号转为MLP网络隐藏层的输入信号,具体代码如下所示:
layer0 = LeNetConvPoolLayer( rng, input=layer0_input, image_shape=(batch_size, 1, 28, 28), filter_shape=(nkerns[0], 1, 5, 5), poolsize=(2, 2) )如上所示,我们的输入信号是28*28的黑白图像,而且我们采用的批量学习,因此输入图像就定义为(batch_size, 1, 28, 28),我们对图像进行5*5卷积操作,根据卷积操作定义,最终得到的卷积输出层为(28-5+1,28-5+1)=(24,24)的“图像”,我们采用2*2的最大池化操作,即取2*2区域像素的最大值作为新的像素点的值,则最终输出层得到12*12的输出信号。
接下来,我们将输出信号继续输入一个Lenet卷积池化层,代码如下所示:
layer1 = LeNetConvPoolLayer( rng, input=layer0.output, image_shape=(batch_size, nkerns[0], 12, 12), filter_shape=(nkerns[1], nkerns[0], 5, 5), poolsize=(2, 2) )如上所示,这时输入信号变化为12*12的图像,我们还使用5*5的卷积核,可以得到(12-5+1, 12-5+1)=(8,8)的图像,采用2*2最大池化操作后,得到(4,4)图像。可以通过调用layer1.output.flatten(2)将其变为一维信号,从而输入MLP的隐藏层。
下面我们定义Lenet引擎来实现装入数据,定义网络模型,训练网络工作,代码如下所示:
from __future__ import print_function import os import sys import timeit import numpy import theano import theano.tensor as T from theano.tensor.signal import pool from theano.tensor.nnet import conv2d from mnist_loader import MnistLoader from logistic_regression import LogisticRegression from hidden_layer import HiddenLayer from lenet_conv_pool_layer import LeNetConvPoolLayer class LenetMnistEngine(object): def __init__(self): print("create LenetMnistEngine") def train_model(self): learning_rate = 0.1 n_epochs = 200 dataset = ‘mnist.pkl.gz‘ nkerns = [20, 50] batch_size = 500 (n_train_batches, n_test_batches, n_valid_batches, train_model, test_model, validate_model) = self.build_model(learning_rate, n_epochs, dataset, nkerns, batch_size) self.train(n_epochs, n_train_batches, n_test_batches, n_valid_batches, train_model, test_model, validate_model) def run(self): print("run the model") classifier = pickle.load(open(‘best_model.pkl‘, ‘rb‘)) predict_model = theano.function( inputs=[classifier.input], outputs=classifier.logRegressionLayer.y_pred ) dataset=‘mnist.pkl.gz‘ loader = MnistLoader() datasets = loader.load_data(dataset) test_set_x, test_set_y = datasets[2] test_set_x = test_set_x.get_value() predicted_values = predict_model(test_set_x[:10]) print("Predicted values for the first 10 examples in test set:") print(predicted_values) def build_model(self, learning_rate=0.1, n_epochs=200, dataset=‘mnist.pkl.gz‘, nkerns=[20, 50], batch_size=500): rng = numpy.random.RandomState(23455) loader = MnistLoader() datasets = loader.load_data(dataset) train_set_x, train_set_y = datasets[0] valid_set_x, valid_set_y = datasets[1] test_set_x, test_set_y = datasets[2] n_train_batches = train_set_x.get_value(borrow=True).shape[0] n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] n_test_batches = test_set_x.get_value(borrow=True).shape[0] n_train_batches //= batch_size n_valid_batches //= batch_size n_test_batches //= batch_size index = T.lscalar() x = T.matrix(‘x‘) y = T.ivector(‘y‘) print(‘... building the model‘) layer0_input = x.reshape((batch_size, 1, 28, 28)) layer0 = LeNetConvPoolLayer( rng, input=layer0_input, image_shape=(batch_size, 1, 28, 28), filter_shape=(nkerns[0], 1, 5, 5), poolsize=(2, 2) ) layer1 = LeNetConvPoolLayer( rng, input=layer0.output, image_shape=(batch_size, nkerns[0], 12, 12), filter_shape=(nkerns[1], nkerns[0], 5, 5), poolsize=(2, 2) ) layer2_input = layer1.output.flatten(2) layer2 = HiddenLayer( rng, input=layer2_input, n_in=nkerns[1] * 4 * 4, n_out=500, activation=T.tanh ) layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10) cost = layer3.negative_log_likelihood(y) test_model = theano.function( [index], layer3.errors(y), givens={ x: test_set_x[index * batch_size: (index + 1) * batch_size], y: test_set_y[index * batch_size: (index + 1) * batch_size] } ) validate_model = theano.function( [index], layer3.errors(y), givens={ x: valid_set_x[index * batch_size: (index + 1) * batch_size], y: valid_set_y[index * batch_size: (index + 1) * batch_size] } ) params = layer3.params + layer2.params + layer1.params + layer0.params grads = T.grad(cost, params) updates = [ (param_i, param_i - learning_rate * grad_i) for param_i, grad_i in zip(params, grads) ] train_model = theano.function( [index], cost, updates=updates, givens={ x: train_set_x[index * batch_size: (index + 1) * batch_size], y: train_set_y[index * batch_size: (index + 1) * batch_size] } ) return (n_train_batches, n_test_batches, n_valid_batches, train_model, test_model, validate_model) def train(self, n_epochs, n_train_batches, n_test_batches, n_valid_batches, train_model, test_model, validate_model): print(‘... training‘) patience = 10000 patience_increase = 2 improvement_threshold = 0.995 validation_frequency = min(n_train_batches, patience // 2) best_validation_loss = numpy.inf best_iter = 0 test_score = 0. start_time = timeit.default_timer() epoch = 0 done_looping = False while (epoch < n_epochs) and (not done_looping): epoch = epoch + 1 for minibatch_index in range(n_train_batches): iter = (epoch - 1) * n_train_batches + minibatch_index if iter % 100 == 0: print(‘training @ iter = ‘, iter) cost_ij = train_model(minibatch_index) if (iter + 1) % validation_frequency == 0: validation_losses = [validate_model(i) for i in range(n_valid_batches)] this_validation_loss = numpy.mean(validation_losses) print(‘epoch %i, minibatch %i/%i, validation error %f %%‘ % (epoch, minibatch_index + 1, n_train_batches, this_validation_loss * 100.)) if this_validation_loss < best_validation_loss: if this_validation_loss < best_validation_loss * improvement_threshold: patience = max(patience, iter * patience_increase) best_validation_loss = this_validation_loss best_iter = iter test_losses = [ test_model(i) for i in range(n_test_batches) ] test_score = numpy.mean(test_losses) with open(‘best_model.pkl‘, ‘wb‘) as f: pickle.dump(classifier, f) print((‘ epoch %i, minibatch %i/%i, test error of ‘ ‘best model %f %%‘) % (epoch, minibatch_index + 1, n_train_batches, test_score * 100.)) if patience <= iter: done_looping = True break end_time = timeit.default_timer() print(‘Optimization complete.‘) print(‘Best validation score of %f %% obtained at iteration %i, ‘ ‘with test performance %f %%‘ % (best_validation_loss * 100., best_iter + 1, test_score * 100.)) print((‘The code for file ‘ + os.path.split(__file__)[1] + ‘ ran for %.2fm‘ % ((end_time - start_time) / 60.)), file=sys.stderr)上述代码与之前的MLP的训练代码类似,这里就不再讨论了。在我的Mac笔记本上,运行大约6个小时,会得到错误率小于1%的结果。
标签:
原文地址:http://blog.csdn.net/yt7589/article/details/52366966