import logging
import sys
from collections import OrderedDict

import numpy as np
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

from layers.gating import SimplifiedLstm, BidirectionSLstm, VanillaLstm, BidirectionLstm, VanillaRNN
from layers.layers import LinearLayer, SigmoidLayer
class DeepRecurrentNetwork(object):
    """
    Assemble a stack of hidden layers followed by a linear output layer.

    The hidden layers may be feedforward (TANH, SIGMOID), recurrent (RNN, LSTM,
    SLSTM) or bidirectional recurrent (BLSTM, BSLSTM), and the types may be mixed
    freely within one network. **Hybrid** means a combination of feedforward and
    recurrent architecture.
    """

    # Layer types after which the next layer's input size is doubled.
    # NOTE(review): presumably because these layers concatenate a forward and a
    # backward output -- confirm against layers.gating.
    _BIDIRECTIONAL_TYPES = ('BSLSTM', 'BLSTM')

    # Output layer types that __init__ knows how to build.
    _OUTPUT_TYPES = ('LINEAR',)

    def __init__(self, n_in, hidden_layer_size, n_out, L1_reg, L2_reg, hidden_layer_type, output_type='LINEAR'):
        """ This function initialises a neural network

        All arguments are validated before any Theano graph is built, so a
        configuration error is reported immediately. If ``hidden_layer_size`` is
        empty the output layer is connected directly to the input.

        :param n_in: Dimensionality of input features
        :type n_in: Integer
        :param hidden_layer_size: The layer size for each hidden layer
        :type hidden_layer_size: A list of integers
        :param n_out: Dimensionality of output features
        :type n_out: Integer
        :param L1_reg: the L1 regularisation weight (stored on the instance; not used in the cost built here)
        :param L2_reg: the L2 regularisation weight (stored on the instance; not used in the cost built here)
        :param hidden_layer_type: the type of each hidden layer, one of SLSTM, LSTM, BSLSTM, BLSTM, RNN, TANH, SIGMOID
        :type hidden_layer_type: A list of strings, same length as hidden_layer_size
        :param output_type: the activation type of the output layer, by default is 'LINEAR', linear regression.
        :raises AssertionError: if hidden_layer_size and hidden_layer_type differ in length
        :raises SystemExit: (exit status 1) if a hidden layer type or the output type is not supported
        """
        logger = logging.getLogger("DNN initialization")

        self.n_in = int(n_in)
        self.n_out = int(n_out)
        self.n_layers = len(hidden_layer_size)

        assert len(hidden_layer_size) == len(hidden_layer_type), \
            "hidden_layer_size and hidden_layer_type must have the same length"

        # Fail fast: reject unsupported types before any layer is constructed.
        builders = self._hidden_layer_builders()
        for layer_type in hidden_layer_type:
            if layer_type not in builders:
                logger.critical("This hidden layer type: %s is not supported right now! \n Please use one of the following: %s\n",
                                layer_type, ", ".join(sorted(builders)))
                sys.exit(1)
        if output_type not in self._OUTPUT_TYPES:
            logger.critical("This output layer type: %s is not supported right now! \n Please use one of the following: %s\n",
                            output_type, ", ".join(self._OUTPUT_TYPES))
            sys.exit(1)

        # Symbolic input and target matrices shared by every compiled function.
        self.x = T.matrix('x')
        self.y = T.matrix('y')

        self.L1_reg = L1_reg
        self.L2_reg = L2_reg

        self.rnn_layers = []
        self.params = []
        # Kept for interface compatibility; never populated in this class.
        self.delta_params = []

        # Fixed seed so that weight initialisation is reproducible.
        rng = np.random.RandomState(123)

        # Thread the symbolic output and its width from one layer to the next.
        layer_input = self.x
        input_size = self.n_in
        for layer_type, layer_size in zip(hidden_layer_type, hidden_layer_size):
            hidden_layer = builders[layer_type](rng, layer_input, input_size, layer_size)
            self.rnn_layers.append(hidden_layer)
            self.params.extend(hidden_layer.params)

            layer_input = hidden_layer.output
            if layer_type in self._BIDIRECTIONAL_TYPES:
                input_size = layer_size * 2
            else:
                input_size = layer_size

        # 'LINEAR' is the only entry in _OUTPUT_TYPES, which was checked above.
        self.final_layer = LinearLayer(rng, layer_input, input_size, self.n_out)
        self.params.extend(self.final_layer.params)

        # One zero-initialised shared buffer per parameter, holding the previous
        # update so that build_finetune_functions can apply momentum.
        self.updates = {}
        for param in self.params:
            self.updates[param] = theano.shared(
                value=np.zeros(param.get_value(borrow=True).shape, dtype=theano.config.floatX),
                name='updates')

        # Squared error summed along axis 1, then averaged over axis 0.
        squared_error = T.mean(T.sum((self.final_layer.output - self.y) ** 2, axis=1))
        self.finetune_cost = squared_error
        self.errors = squared_error

    @staticmethod
    def _hidden_layer_builders():
        """ Map every supported hidden layer type to a callable that builds it

        Each callable takes ``(rng, layer_input, input_size, layer_size)`` and
        returns the constructed layer. The layer classes are only looked up when a
        callable is invoked, so the keys can be used for validation on their own.

        :returns: dictionary from layer type name to builder callable
        """
        return {
            'SLSTM': lambda rng, x, n_i, n_h: SimplifiedLstm(rng, x, n_i, n_h),
            'LSTM': lambda rng, x, n_i, n_h: VanillaLstm(rng, x, n_i, n_h),
            'BSLSTM': lambda rng, x, n_i, n_h: BidirectionSLstm(rng, x, n_i, n_h, n_h),
            'BLSTM': lambda rng, x, n_i, n_h: BidirectionLstm(rng, x, n_i, n_h, n_h),
            'RNN': lambda rng, x, n_i, n_h: VanillaRNN(rng, x, n_i, n_h),
            'TANH': lambda rng, x, n_i, n_h: SigmoidLayer(rng, x, n_i, n_h, activation=T.tanh),
            'SIGMOID': lambda rng, x, n_i, n_h: SigmoidLayer(rng, x, n_i, n_h, activation=T.nnet.sigmoid),
        }

    def build_finetune_functions(self, train_shared_xy, valid_shared_xy):
        """ This function is to build finetune functions and to update gradients

        The training function takes a learning rate and a momentum value, applies
        one momentum update to every parameter using the whole of the training
        data given here, and returns ``self.errors``. The validation function takes
        no arguments and returns ``self.errors`` on the development data.

        :param train_shared_xy: theano shared variable for input and output training data
        :type train_shared_xy: tuple of shared variable
        :param valid_shared_xy: theano shared variable for input and output development data
        :type valid_shared_xy: tuple of shared variable
        :returns: finetune functions for training and development
        """
        train_set_x, train_set_y = train_shared_xy
        valid_set_x, valid_set_y = valid_shared_xy

        lr = T.scalar('lr', dtype=theano.config.floatX)
        mom = T.scalar('mom', dtype=theano.config.floatX)  # momentum

        cost = self.finetune_cost
        gparams = T.grad(cost, self.params)

        # An ordered mapping keeps the update order deterministic. The standard
        # library class is used because the legacy theano.compat.python2x alias
        # may not exist in every Theano release.
        updates = OrderedDict()
        for param, gparam in zip(self.params, gparams):
            previous_step = self.updates[param]
            step = mom * previous_step - lr * gparam
            updates[previous_step] = step
            updates[param] = param + step

        train_model = theano.function(inputs=[lr, mom],
                                      outputs=self.errors,
                                      updates=updates,
                                      givens={self.x: train_set_x,
                                              self.y: train_set_y})

        valid_model = theano.function(inputs=[],
                                      outputs=self.errors,
                                      givens={self.x: valid_set_x,
                                              self.y: valid_set_y})

        return train_model, valid_model

    def parameter_prediction(self, test_set_x):
        """ This function is to predict

        A new Theano function is compiled on every call, with the test input
        substituted for ``self.x``.

        :param test_set_x: input features for a testing sentence
        :type test_set_x: python array variable
        :returns: predicted features
        """
        n_test_set_x = test_set_x.shape[0]

        # The slice spans the whole first axis of the input.
        test_out = theano.function([], self.final_layer.output,
                                   givens={self.x: test_set_x[0:n_test_set_x]})

        predict_parameter = test_out()
        return predict_parameter