# Source code for models.deep_rnn


import logging
import sys
from collections import OrderedDict

import numpy as np

import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

from layers.gating import SimplifiedLstm, BidirectionSLstm, VanillaLstm, BidirectionLstm, VanillaRNN
from layers.layers import LinearLayer, SigmoidLayer

class DeepRecurrentNetwork(object):
    """Assemble feedforward, recurrent and hybrid neural network architectures.

    Hidden layers are stacked in the order given and topped with a single
    output layer. **Hybrid** means a combination of feedforward (TANH,
    SIGMOID) and recurrent (RNN, LSTM, SLSTM, BLSTM, BSLSTM) hidden layers
    in the same stack.
    """

    # Layer types whose output is treated as twice the nominal layer size
    # when sizing the input of the layer that follows them.
    _BIDIRECTIONAL_TYPES = ('BSLSTM', 'BLSTM')

    def __init__(self, n_in, hidden_layer_size, n_out, L1_reg, L2_reg, hidden_layer_type, output_type='LINEAR'):
        """Initialise the network and build its symbolic graph.

        :param n_in: Dimensionality of input features
        :type n_in: Integer
        :param hidden_layer_size: The layer size for each hidden layer
        :type hidden_layer_size: A list of integers
        :param n_out: Dimensionality of output features
        :type n_out: Integer
        :param L1_reg: the L1 regularisation weight (stored, not used in the cost)
        :param L2_reg: the L2 regularisation weight (stored, not used in the cost)
        :param hidden_layer_type: the type of each hidden layer, one of
            SLSTM, LSTM, BSLSTM, BLSTM, RNN, TANH, SIGMOID; must have the
            same length as ``hidden_layer_size``
        :param output_type: the type of the output layer; only 'LINEAR'
            (linear regression) is supported
        """
        logger = logging.getLogger("DNN initialization")

        self.n_in = int(n_in)
        self.n_out = int(n_out)
        self.n_layers = len(hidden_layer_size)

        assert len(hidden_layer_size) == len(hidden_layer_type), \
            "hidden_layer_size and hidden_layer_type must have the same length"

        # Symbolic input and target matrices.
        self.x = T.matrix('x')
        self.y = T.matrix('y')

        self.L1_reg = L1_reg
        self.L2_reg = L2_reg

        self.rnn_layers = []
        self.params = []
        self.delta_params = []

        # Fixed seed so that weight initialisation is reproducible.
        rng = np.random.RandomState(123)

        # Thread the symbolic output and its width from one layer to the next.
        layer_input = self.x
        input_size = self.n_in
        for layer_size, layer_type in zip(hidden_layer_size, hidden_layer_type):
            hidden_layer = self._build_hidden_layer(rng, layer_type, layer_input, input_size, layer_size)
            self.rnn_layers.append(hidden_layer)
            self.params.extend(hidden_layer.params)

            layer_input = hidden_layer.output
            input_size = self._output_width(layer_type, layer_size)

        # After the loop, layer_input/input_size describe the last hidden
        # layer (or the raw input when there are no hidden layers).
        if output_type == 'LINEAR':
            self.final_layer = LinearLayer(rng, layer_input, input_size, self.n_out)
        else:
            logger.critical("This output layer type: %s is not supported right now! \n Please use one of the following: LINEAR\n" % (output_type))
            sys.exit(1)

        self.params.extend(self.final_layer.params)

        # One zero-initialised momentum buffer per parameter, same shape.
        self.updates = {}
        for param in self.params:
            self.updates[param] = theano.shared(
                value=np.zeros(param.get_value(borrow=True).shape, dtype=theano.config.floatX),
                name='updates')

        # Sum of squared errors over the feature axis, averaged over rows.
        squared_error = T.sum((self.final_layer.output - self.y) ** 2, axis=1)
        self.finetune_cost = T.mean(squared_error)
        self.errors = T.mean(squared_error)

    @classmethod
    def _output_width(cls, layer_type, layer_size):
        """Return the input size to use for the layer following this one."""
        if layer_type in cls._BIDIRECTIONAL_TYPES:
            return layer_size * 2
        return layer_size

    @staticmethod
    def _build_hidden_layer(rng, layer_type, layer_input, input_size, layer_size):
        """Construct one hidden layer of the requested type.

        Logs a critical message and exits the process with status 1 when
        ``layer_type`` is not one of the supported names.
        """
        if layer_type == 'SLSTM':
            return SimplifiedLstm(rng, layer_input, input_size, layer_size)
        if layer_type == 'LSTM':
            return VanillaLstm(rng, layer_input, input_size, layer_size)
        if layer_type == 'BSLSTM':
            return BidirectionSLstm(rng, layer_input, input_size, layer_size, layer_size)
        if layer_type == 'BLSTM':
            return BidirectionLstm(rng, layer_input, input_size, layer_size, layer_size)
        if layer_type == 'RNN':
            return VanillaRNN(rng, layer_input, input_size, layer_size)
        if layer_type == 'TANH':
            return SigmoidLayer(rng, layer_input, input_size, layer_size, activation=T.tanh)
        if layer_type == 'SIGMOID':
            return SigmoidLayer(rng, layer_input, input_size, layer_size, activation=T.nnet.sigmoid)

        logger = logging.getLogger("DNN initialization")
        logger.critical("This hidden layer type: %s is not supported right now! \n Please use one of the following: SLSTM, LSTM, BSLSTM, BLSTM, RNN, TANH, SIGMOID\n" % (layer_type))
        sys.exit(1)

    def build_finetune_functions(self, train_shared_xy, valid_shared_xy):
        """Build the compiled training and validation functions.

        The training function applies one momentum-SGD step per call and
        takes the learning rate and momentum as arguments; the validation
        function takes no arguments. Both return ``self.errors``.

        :param train_shared_xy: theano shared variable for input and output training data
        :type train_shared_xy: tuple of shared variable
        :param valid_shared_xy: theano shared variable for input and output development data
        :type valid_shared_xy: tuple of shared variable
        :returns: finetune functions for training and development
        """
        (train_set_x, train_set_y) = train_shared_xy
        (valid_set_x, valid_set_y) = valid_shared_xy

        lr = T.scalar('lr', dtype=theano.config.floatX)
        mom = T.scalar('mom', dtype=theano.config.floatX)  # momentum

        cost = self.finetune_cost
        gparams = T.grad(cost, self.params)

        # Ordered mapping keeps the update list deterministic across runs.
        updates = OrderedDict()
        for param, gparam in zip(self.params, gparams):
            weight_update = self.updates[param]
            upd = mom * weight_update - lr * gparam
            updates[weight_update] = upd   # remember the step for next time
            updates[param] = param + upd   # apply the step to the parameter

        train_model = theano.function(inputs=[lr, mom],
                                      outputs=self.errors,
                                      updates=updates,
                                      givens={self.x: train_set_x,
                                              self.y: train_set_y})

        valid_model = theano.function(inputs=[],
                                      outputs=self.errors,
                                      givens={self.x: valid_set_x,
                                              self.y: valid_set_y})

        return train_model, valid_model

    def parameter_prediction(self, test_set_x):
        """Run the network forward on one set of input features.

        A new Theano function is compiled on every call with ``test_set_x``
        substituted for the network input.

        :param test_set_x: input features for a testing sentence
        :type test_set_x: python array variable
        :returns: predicted features
        """
        # The original sliced test_set_x[0:shape[0]], i.e. the whole array,
        # so the input is passed through unchanged.
        test_out = theano.function([], self.final_layer.output,
                                   givens={self.x: test_set_x})

        return test_out()