Commit b74a0ebd authored by Maja Jabłońska

Add COMBO Encoder

parent 26044b45
1 merge request: !46 Merge COMBO 3.0 into master
"""
Adapted parts from AllenNLP
and COMBO (Author: Mateusz Klimaszewski)
"""
from typing import Optional, Tuple, List
import torch
import torch.nn.utils.rnn as rnn
from overrides import overrides
from torch.nn.utils.rnn import PackedSequence, pack_padded_sequence, pad_packed_sequence
from combo.modules import input_variational_dropout
from combo.modules.augmented_lstm import AugmentedLstm
from combo.modules.input_variational_dropout import InputVariationalDropout
from combo.utils import ConfigurationError
TensorPair = Tuple[torch.Tensor, torch.Tensor]
class StackedBidirectionalLstm(torch.nn.Module):
"""
A standard stacked bidirectional LSTM in which the forward and backward outputs
of each layer are concatenated and fed to the next layer. The only difference between
this and a regular bidirectional LSTM is the application of
variational dropout to the hidden states and outputs of each layer apart
from the last layer of the LSTM. Note that this will be slower, as it
doesn't use CUDNN.
[0]: https://arxiv.org/abs/1512.05287
# Parameters
input_size : `int`, required
The dimension of the inputs to the LSTM.
hidden_size : `int`, required
The dimension of the outputs of the LSTM.
num_layers : `int`, required
The number of stacked Bidirectional LSTMs to use.
recurrent_dropout_probability : `float`, optional (default = `0.0`)
The recurrent dropout probability to be used in a dropout scheme as
stated in [A Theoretically Grounded Application of Dropout in Recurrent
Neural Networks][0].
layer_dropout_probability : `float`, optional (default = `0.0`)
The layer wise dropout probability to be used in a dropout scheme as
stated in [A Theoretically Grounded Application of Dropout in Recurrent
Neural Networks][0].
use_highway : `bool`, optional (default = `True`)
Whether or not to use highway connections between layers. This effectively involves
reparameterising the normal output of an LSTM as::
gate = sigmoid(W_x1 * x_t + W_h * h_t)
output = gate * h_t + (1 - gate) * (W_x2 * x_t)
"""
def __init__(
self,
input_size: int,
hidden_size: int,
num_layers: int,
recurrent_dropout_probability: float = 0.0,
layer_dropout_probability: float = 0.0,
use_highway: bool = True,
) -> None:
super().__init__()
# Required to be wrapped with a `PytorchSeq2SeqWrapper`.
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.bidirectional = True
layers = []
lstm_input_size = input_size
for layer_index in range(num_layers):
forward_layer = AugmentedLstm(
lstm_input_size,
hidden_size,
go_forward=True,
recurrent_dropout_probability=recurrent_dropout_probability,
use_highway=use_highway,
use_input_projection_bias=False,
)
backward_layer = AugmentedLstm(
lstm_input_size,
hidden_size,
go_forward=False,
recurrent_dropout_probability=recurrent_dropout_probability,
use_highway=use_highway,
use_input_projection_bias=False,
)
lstm_input_size = hidden_size * 2
self.add_module("forward_layer_{}".format(layer_index), forward_layer)
self.add_module("backward_layer_{}".format(layer_index), backward_layer)
layers.append([forward_layer, backward_layer])
self.lstm_layers = layers
self.layer_dropout = InputVariationalDropout(layer_dropout_probability)
def forward(
self, inputs: PackedSequence, initial_state: Optional[TensorPair] = None
) -> Tuple[PackedSequence, TensorPair]:
"""
# Parameters
inputs : `PackedSequence`, required.
A batch first `PackedSequence` to run the stacked LSTM over.
initial_state : `Tuple[torch.Tensor, torch.Tensor]`, optional, (default = `None`)
A tuple (state, memory) representing the initial hidden state and memory
of the LSTM. Each tensor has shape (num_layers, batch_size, hidden_size).
# Returns
output_sequence : `PackedSequence`
The encoded sequence of shape (batch_size, sequence_length, hidden_size * 2)
final_states : `TensorPair`
The per-layer final (state, memory) states of the LSTM, each tensor with shape
(num_layers * 2, batch_size, hidden_size).
"""
if initial_state is None:
hidden_states: List[Optional[TensorPair]] = [None] * len(self.lstm_layers)
elif initial_state[0].size()[0] != len(self.lstm_layers):
raise ConfigurationError(
"Initial states were passed to forward() but the number of "
"initial states does not match the number of layers."
)
else:
hidden_states = list(zip(initial_state[0].split(1, 0), initial_state[1].split(1, 0)))
output_sequence = inputs
final_h = []
final_c = []
for i, state in enumerate(hidden_states):
forward_layer = getattr(self, "forward_layer_{}".format(i))
backward_layer = getattr(self, "backward_layer_{}".format(i))
# The state is duplicated to mirror the Pytorch API for LSTMs.
forward_output, final_forward_state = forward_layer(output_sequence, state)
backward_output, final_backward_state = backward_layer(output_sequence, state)
forward_output, lengths = pad_packed_sequence(forward_output, batch_first=True)
backward_output, _ = pad_packed_sequence(backward_output, batch_first=True)
output_sequence = torch.cat([forward_output, backward_output], -1)
# Apply layer-wise dropout to each layer's output except the last layer's.
if i < (self.num_layers - 1):
output_sequence = self.layer_dropout(output_sequence)
output_sequence = pack_padded_sequence(output_sequence, lengths, batch_first=True)
final_h.extend([final_forward_state[0], final_backward_state[0]])
final_c.extend([final_forward_state[1], final_backward_state[1]])
final_h = torch.cat(final_h, dim=0)
final_c = torch.cat(final_c, dim=0)
final_state_tuple = (final_h, final_c)
return output_sequence, final_state_tuple
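
A usage sketch (illustration only, not part of the commit; sizes, lengths and dropout values are arbitrary): the stacked BiLSTM consumes and returns PackedSequences, and the final states stack the per-layer, per-direction states.

def _example_stacked_bilstm():
    import torch
    from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

    encoder = StackedBidirectionalLstm(input_size=50, hidden_size=100, num_layers=2,
                                       recurrent_dropout_probability=0.1,
                                       layer_dropout_probability=0.1)
    batch = torch.randn(4, 7, 50)                 # (batch, time, input_size)
    lengths = torch.tensor([7, 5, 3, 2])          # sorted in decreasing order
    packed = pack_padded_sequence(batch, lengths, batch_first=True)
    packed_output, (h, c) = encoder(packed)
    output, _ = pad_packed_sequence(packed_output, batch_first=True)
    assert output.shape == (4, 7, 200)            # hidden_size * 2
    assert h.shape == (4, 4, 100)                 # (num_layers * 2, batch, hidden_size)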
# TODO: merge into one
class ComboStackedBidirectionalLSTM(StackedBidirectionalLstm):
def __init__(self, input_size: int, hidden_size: int, num_layers: int, recurrent_dropout_probability: float,
layer_dropout_probability: float, use_highway: bool = False):
super().__init__(input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
recurrent_dropout_probability=recurrent_dropout_probability,
layer_dropout_probability=layer_dropout_probability,
use_highway=use_highway)
@overrides
def forward(self,
inputs: rnn.PackedSequence,
initial_state: Optional[Tuple[torch.Tensor, torch.Tensor]] = None
) -> Tuple[rnn.PackedSequence, Tuple[torch.Tensor, torch.Tensor]]:
"""Changes when compared to stacked_bidirectional_lstm.StackedBidirectionalLstm
* dropout also on last layer
* accepts BxTxD tensor
* state from n-1 layer used as n layer initial state
:param inputs:
:param initial_state:
:return:
"""
output_sequence = inputs
state_fwd = None
state_bwd = None
for i in range(self.num_layers):
forward_layer = getattr(self, f"forward_layer_{i}")
backward_layer = getattr(self, f"backward_layer_{i}")
forward_output, state_fwd = forward_layer(output_sequence, state_fwd)
backward_output, state_bwd = backward_layer(output_sequence, state_bwd)
forward_output, lengths = rnn.pad_packed_sequence(forward_output, batch_first=True)
backward_output, _ = rnn.pad_packed_sequence(backward_output, batch_first=True)
output_sequence = torch.cat([forward_output, backward_output], -1)
output_sequence = self.layer_dropout(output_sequence)
output_sequence = rnn.pack_padded_sequence(output_sequence, lengths, batch_first=True)
return output_sequence, (state_fwd, state_bwd)
# NOTE: the super().__init__/super().forward calls below assume an AllenNLP-style
# PytorchSeq2SeqWrapper base class (as in the original COMBO); that wrapper is not
# part of this snippet.
class ComboEncoder:
"""COMBO encoder (https://www.aclweb.org/anthology/K18-2004.pdf).
This implementation uses Variational Dropout on the input and on the outputs of each BiLSTM layer
(instead of the Gaussian Dropout and Gaussian Noise used in the original COMBO encoder).
"""
def __init__(self,
stacked_bilstm: ComboStackedBidirectionalLSTM,
layer_dropout_probability: float):
super().__init__(stacked_bilstm, stateful=False)
self.layer_dropout = input_variational_dropout.InputVariationalDropout(p=layer_dropout_probability)
@overrides
def forward(self,
inputs: torch.Tensor,
mask: torch.BoolTensor,
hidden_state: Optional[torch.Tensor] = None) -> torch.Tensor:
x = self.layer_dropout(inputs)
x = super().forward(x, mask)
return self.layer_dropout(x)
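
Since the wrapper base class is not included here, the following sketch (illustration only; names and sizes are arbitrary) shows the equivalent manual wiring of the COMBO encoder pipeline: variational dropout on the input, the stacked BiLSTM on the packed batch, then variational dropout on the output.

def _example_combo_encoder_pipeline():
    import torch
    from torch.nn.utils import rnn as rnn_utils

    bilstm = ComboStackedBidirectionalLSTM(input_size=50, hidden_size=100, num_layers=2,
                                           recurrent_dropout_probability=0.33,
                                           layer_dropout_probability=0.33,
                                           use_highway=False)
    layer_dropout = InputVariationalDropout(p=0.33)

    inputs = torch.randn(4, 7, 50)                # (batch, time, dim)
    lengths = torch.tensor([7, 6, 4, 2])
    x = layer_dropout(inputs)                     # dropout on the inputs
    packed = rnn_utils.pack_padded_sequence(x, lengths, batch_first=True)
    packed_output, _ = bilstm(packed)
    output, _ = rnn_utils.pad_packed_sequence(packed_output, batch_first=True)
    return layer_dropout(output)                  # dropout on the outputs, shape (4, 7, 200)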
"""
Adapted from AllenNLP
https://github.com/allenai/allennlp/blob/main/allennlp/modules/augmented_lstm.py
"""
from typing import Optional, Tuple
import torch
from torch.nn.utils.rnn import PackedSequence, pack_padded_sequence, pad_packed_sequence
from combo.nn.util import get_dropout_mask
from combo.nn.initializers import block_orthogonal
from combo.utils import ConfigurationError
class AugmentedLSTMCell(torch.nn.Module):
"""
`AugmentedLSTMCell` implements an AugmentedLSTM cell.
# Parameters
embed_dim : `int`
The number of expected features in the input.
lstm_dim : `int`
Number of features in the hidden state of the LSTM.
use_highway : `bool`, optional (default = `True`)
If `True` we append a highway network to the outputs of the LSTM.
use_bias : `bool`, optional (default = `True`)
If `True` we use a bias in our LSTM calculations, otherwise we don't.
# Attributes
input_linearity : `nn.Module`
Fused weight matrix which computes a linear function over the input.
state_linearity : `nn.Module`
Fused weight matrix which computes a linear function over the states.
"""
def __init__(
self, embed_dim: int, lstm_dim: int, use_highway: bool = True, use_bias: bool = True
):
super().__init__()
self.embed_dim = embed_dim
self.lstm_dim = lstm_dim
self.use_highway = use_highway
self.use_bias = use_bias
if use_highway:
self._highway_inp_proj_start = 5 * self.lstm_dim
self._highway_inp_proj_end = 6 * self.lstm_dim
# fused linearity of input to input_gate,
# forget_gate, memory_init, output_gate, highway_gate,
# and the actual highway value
self.input_linearity = torch.nn.Linear(
self.embed_dim, self._highway_inp_proj_end, bias=self.use_bias
)
# fused linearity of input to input_gate,
# forget_gate, memory_init, output_gate, highway_gate
self.state_linearity = torch.nn.Linear(
self.lstm_dim, self._highway_inp_proj_start, bias=True
)
else:
# If there's no highway layer then we have a standard
# LSTM. The 4 comes from fusing input, forget, memory, output
# gates/inputs.
self.input_linearity = torch.nn.Linear(
self.embed_dim, 4 * self.lstm_dim, bias=self.use_bias
)
self.state_linearity = torch.nn.Linear(self.lstm_dim, 4 * self.lstm_dim, bias=True)
self.reset_parameters()
def reset_parameters(self):
# Use sensible default initializations for parameters.
block_orthogonal(self.input_linearity.weight.data, [self.lstm_dim, self.embed_dim])
block_orthogonal(self.state_linearity.weight.data, [self.lstm_dim, self.lstm_dim])
self.state_linearity.bias.data.fill_(0.0)
# Initialize forget gate biases to 1.0 as per An Empirical
# Exploration of Recurrent Network Architectures, (Jozefowicz, 2015).
self.state_linearity.bias.data[self.lstm_dim : 2 * self.lstm_dim].fill_(1.0)
def forward(
self,
x: torch.Tensor,
states: Tuple[torch.Tensor, torch.Tensor],
variational_dropout_mask: Optional[torch.BoolTensor] = None,
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
!!! Warning
DO NOT USE THIS LAYER DIRECTLY, instead use the AugmentedLSTM class
# Parameters
x : `torch.Tensor`
Input tensor of shape (bsize x input_dim).
states : `Tuple[torch.Tensor, torch.Tensor]`
Tuple of tensors containing
the hidden state and the cell state of each element in
the batch. Each of these tensors has shape
(bsize x nhid).
# Returns
`Tuple[torch.Tensor, torch.Tensor]`
Returned states. Shape of each state is (bsize x nhid).
"""
hidden_state, memory_state = states
# In PyText this was done as the last step of the cell,
# but in the original AllenNLP AugmentedLSTM it is done before the gate computations.
if variational_dropout_mask is not None and self.training:
hidden_state = hidden_state * variational_dropout_mask
projected_input = self.input_linearity(x)
projected_state = self.state_linearity(hidden_state)
input_gate = forget_gate = memory_init = output_gate = highway_gate = None
if self.use_highway:
fused_op = projected_input[:, : 5 * self.lstm_dim] + projected_state
fused_chunked = torch.chunk(fused_op, 5, 1)
(input_gate, forget_gate, memory_init, output_gate, highway_gate) = fused_chunked
highway_gate = torch.sigmoid(highway_gate)
else:
fused_op = projected_input + projected_state
input_gate, forget_gate, memory_init, output_gate = torch.chunk(fused_op, 4, 1)
input_gate = torch.sigmoid(input_gate)
forget_gate = torch.sigmoid(forget_gate)
memory_init = torch.tanh(memory_init)
output_gate = torch.sigmoid(output_gate)
memory = input_gate * memory_init + forget_gate * memory_state
timestep_output: torch.Tensor = output_gate * torch.tanh(memory)
if self.use_highway:
highway_input_projection = projected_input[
:, self._highway_inp_proj_start : self._highway_inp_proj_end
]
timestep_output = (
highway_gate * timestep_output
+ (1 - highway_gate) * highway_input_projection # type: ignore
)
return timestep_output, memory
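
A single-step sketch of the cell (illustration only; sizes are arbitrary) showing how one update maps a batch of inputs and a (hidden, memory) pair to the next pair:

def _example_cell_step():
    import torch

    cell = AugmentedLSTMCell(embed_dim=50, lstm_dim=100, use_highway=True)
    x = torch.randn(4, 50)                        # (batch, embed_dim)
    h0 = torch.zeros(4, 100)
    c0 = torch.zeros(4, 100)
    h1, c1 = cell(x, (h0, c0))                    # no recurrent dropout mask passed
    assert h1.shape == (4, 100) and c1.shape == (4, 100)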
class AugmentedLstm(torch.nn.Module):
"""
`AugmentedLstm` implements a one-layer, single-direction
AugmentedLSTM. An AugmentedLSTM is an LSTM which optionally
appends a highway network to the output layer; the recurrent dropout
probability controls the amount of variational dropout applied.
# Parameters
input_size : `int`
The number of expected features in the input.
hidden_size : `int`
Number of features in the hidden state of the LSTM.
go_forward : `bool`
Whether to compute features left to right (forward)
or right to left (backward).
recurrent_dropout_probability : `float`
Variational dropout probability to use. Defaults to 0.0.
use_highway : `bool`
If `True` we append a highway network to the outputs of the LSTM.
use_input_projection_bias : `bool`
If `True` we use a bias in our LSTM calculations, otherwise we don't.
# Attributes
cell : `AugmentedLSTMCell`
`AugmentedLSTMCell` that is applied at every timestep.
"""
def __init__(
self,
input_size: int,
hidden_size: int,
go_forward: bool = True,
recurrent_dropout_probability: float = 0.0,
use_highway: bool = True,
use_input_projection_bias: bool = True,
):
super().__init__()
self.embed_dim = input_size
self.lstm_dim = hidden_size
self.go_forward = go_forward
self.use_highway = use_highway
self.recurrent_dropout_probability = recurrent_dropout_probability
self.cell = AugmentedLSTMCell(
self.embed_dim, self.lstm_dim, self.use_highway, use_input_projection_bias
)
def forward(
self, inputs: PackedSequence, states: Optional[Tuple[torch.Tensor, torch.Tensor]] = None
) -> Tuple[PackedSequence, Tuple[torch.Tensor, torch.Tensor]]:
"""
Warning: Would be better to use the BiAugmentedLstm class in a regular model
Given an input batch of sequential data such as word embeddings, produces a single layer unidirectional
AugmentedLSTM representation of the sequential input and new state tensors.
# Parameters
inputs : `PackedSequence`
`bsize` sequences of shape `(len, input_dim)` each, in PackedSequence format
states : `Tuple[torch.Tensor, torch.Tensor]`
Tuple of tensors containing the initial hidden state and
the cell state of each element in the batch. Each of these tensors have a dimension of
(1 x bsize x nhid). Defaults to `None`.
# Returns
`Tuple[PackedSequence, Tuple[torch.Tensor, torch.Tensor]]`
AugmentedLSTM representation of input and the state of the LSTM `t = seq_len`.
Shape of representation is (bsize x seq_len x representation_dim).
Shape of each state is (1 x bsize x nhid).
"""
if not isinstance(inputs, PackedSequence):
raise ConfigurationError("inputs must be PackedSequence but got %s" % (type(inputs)))
sequence_tensor, batch_lengths = pad_packed_sequence(inputs, batch_first=True)
batch_size = sequence_tensor.size()[0]
total_timesteps = sequence_tensor.size()[1]
output_accumulator = sequence_tensor.new_zeros(batch_size, total_timesteps, self.lstm_dim)
if states is None:
full_batch_previous_memory = sequence_tensor.new_zeros(batch_size, self.lstm_dim)
full_batch_previous_state = sequence_tensor.new_zeros(batch_size, self.lstm_dim)
else:
full_batch_previous_state = states[0].squeeze(0)
full_batch_previous_memory = states[1].squeeze(0)
current_length_index = batch_size - 1 if self.go_forward else 0
if self.recurrent_dropout_probability > 0.0:
dropout_mask = get_dropout_mask(
self.recurrent_dropout_probability, full_batch_previous_memory
)
else:
dropout_mask = None
for timestep in range(total_timesteps):
index = timestep if self.go_forward else total_timesteps - timestep - 1
if self.go_forward:
while batch_lengths[current_length_index] <= index:
current_length_index -= 1
# If we're going backwards, we are _picking up_ more indices.
else:
# First conditional: Are we already at the maximum
# number of elements in the batch?
# Second conditional: Does the next shortest
# sequence beyond the current batch
# index require computation at this timestep?
while (
current_length_index < (len(batch_lengths) - 1)
and batch_lengths[current_length_index + 1] > index
):
current_length_index += 1
previous_memory = full_batch_previous_memory[0 : current_length_index + 1].clone()
previous_state = full_batch_previous_state[0 : current_length_index + 1].clone()
timestep_input = sequence_tensor[0 : current_length_index + 1, index]
timestep_output, memory = self.cell(
timestep_input,
(previous_state, previous_memory),
dropout_mask[0 : current_length_index + 1] if dropout_mask is not None else None,
)
full_batch_previous_memory = full_batch_previous_memory.data.clone()
full_batch_previous_state = full_batch_previous_state.data.clone()
full_batch_previous_memory[0 : current_length_index + 1] = memory
full_batch_previous_state[0 : current_length_index + 1] = timestep_output
output_accumulator[0 : current_length_index + 1, index, :] = timestep_output
output_accumulator = pack_padded_sequence(
output_accumulator, batch_lengths, batch_first=True
)
# Mimic the pytorch API by returning state in the following shape:
# (num_layers * num_directions, batch_size, lstm_dim). As this
# LSTM cannot be stacked, the first dimension here is just 1.
final_state = (
full_batch_previous_state.unsqueeze(0),
full_batch_previous_memory.unsqueeze(0),
)
return output_accumulator, final_state
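
A usage sketch for the unidirectional layer (illustration only; sizes are arbitrary): a backward-direction AugmentedLstm over a packed batch returns a packed output plus a (1, batch, hidden) state pair that mimics the torch.nn.LSTM API.

def _example_augmented_lstm():
    import torch
    from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

    lstm = AugmentedLstm(input_size=50, hidden_size=100, go_forward=False,
                         recurrent_dropout_probability=0.1)
    batch = torch.randn(4, 7, 50)
    lengths = torch.tensor([7, 5, 3, 2])
    packed = pack_padded_sequence(batch, lengths, batch_first=True)
    packed_output, (h, c) = lstm(packed)
    output, _ = pad_packed_sequence(packed_output, batch_first=True)
    assert output.shape == (4, 7, 100) and h.shape == (1, 4, 100)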
"""
Adapted from AllenNLP
https://github.com/allenai/allennlp/blob/main/allennlp/modules/input_variational_dropout.py
"""
import torch
class InputVariationalDropout(torch.nn.Dropout):
"""
Apply the dropout technique in Gal and Ghahramani, [Dropout as a Bayesian Approximation:
Representing Model Uncertainty in Deep Learning](https://arxiv.org/abs/1506.02142) to a
3D tensor.
This module accepts a 3D tensor of shape `(batch_size, num_timesteps, embedding_dim)`
and samples a single dropout mask of shape `(batch_size, embedding_dim)` and applies
it to every time step.
"""
def forward(self, input_tensor):
"""
Apply dropout to input tensor.
# Parameters
input_tensor : `torch.FloatTensor`
A tensor of shape `(batch_size, num_timesteps, embedding_dim)`
# Returns
output : `torch.FloatTensor`
A tensor of shape `(batch_size, num_timesteps, embedding_dim)` with dropout applied.
"""
ones = input_tensor.data.new_ones(input_tensor.shape[0], input_tensor.shape[-1])
dropout_mask = torch.nn.functional.dropout(ones, self.p, self.training, inplace=False)
if self.inplace:
input_tensor *= dropout_mask.unsqueeze(1)
return None
else:
return dropout_mask.unsqueeze(1) * input_tensor
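
A quick sketch of the variational behaviour (illustration only): because a single (batch, dim) mask is broadcast over time, a feature channel that is dropped at one timestep is dropped at every timestep.

def _example_variational_dropout():
    import torch

    dropout = InputVariationalDropout(p=0.5)
    dropout.train()                               # masks are only sampled in training mode
    x = torch.ones(2, 5, 8)                       # (batch, time, dim)
    y = dropout(x)
    # The zeroed channels at the first and last timesteps are identical.
    assert torch.equal(y[:, 0, :] == 0, y[:, 4, :] == 0)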
"""
Adapted from AllenNLP
https://github.com/allenai/allennlp/blob/main/allennlp/nn/initializers.py
"""
import itertools
import torch
from typing import List
from combo.utils import ConfigurationError
def block_orthogonal(tensor: torch.Tensor, split_sizes: List[int], gain: float = 1.0) -> None:
"""
An initializer which allows initializing model parameters in "blocks". This is helpful
in the case of recurrent models which use multiple gates applied to linear projections,
which can be computed efficiently if they are concatenated together. However, they are
separate parameters which should be initialized independently.
# Parameters
tensor : `torch.Tensor`, required.
A tensor to initialize.
split_sizes : `List[int]`, required.
A list of length `tensor.dim()` specifying the size of the
blocks along each dimension. E.g. `[10, 20]` would
result in the tensor being split into chunks of size 10 along the
first dimension and 20 along the second.
gain : `float`, optional (default = `1.0`)
The gain (scaling) applied to the orthogonal initialization.
"""
data = tensor.data
sizes = list(tensor.size())
if any(a % b != 0 for a, b in zip(sizes, split_sizes)):
raise ConfigurationError(
"tensor dimensions must be divisible by their respective "
"split_sizes. Found size: {} and split_sizes: {}".format(sizes, split_sizes)
)
indexes = [list(range(0, max_size, split)) for max_size, split in zip(sizes, split_sizes)]
# Iterate over all possible blocks within the tensor.
for block_start_indices in itertools.product(*indexes):
# A list of tuples containing the index to start at for this block
# and the appropriate step size (i.e split_size[i] for dimension i).
index_and_step_tuples = zip(block_start_indices, split_sizes)
# This is a tuple of slices corresponding to:
# tensor[index: index + step_size, ...]. This is
# required because we could have an arbitrary number
# of dimensions. The actual slices we need are the
# start_index: start_index + step for each dimension in the tensor.
block_slice = tuple(
slice(start_index, start_index + step) for start_index, step in index_and_step_tuples
)
data[block_slice] = torch.nn.init.orthogonal_(tensor[block_slice].contiguous(), gain=gain)
\ No newline at end of file
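
A sketch of block-orthogonal initialization on a fused gate weight (illustration only; sizes are arbitrary): each (hidden, input) block of the stacked gate matrix becomes an independent (semi-)orthogonal matrix.

def _example_block_orthogonal():
    import torch

    hidden_dim, input_dim = 100, 50
    fused = torch.nn.Linear(input_dim, 4 * hidden_dim)   # 4 gates stacked on the output dim
    block_orthogonal(fused.weight.data, [hidden_dim, input_dim])
    first_block = fused.weight.data[:hidden_dim, :input_dim]
    # Columns of each block are orthonormal, so W^T W is (approximately) the identity.
    assert torch.allclose(first_block.t() @ first_block, torch.eye(input_dim), atol=1e-4)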
@@ -2,7 +2,7 @@
Adapted from AllenNLP
https://github.com/allenai/allennlp/blob/80fb6061e568cb9d6ab5d45b661e86eb61b92c82/allennlp/nn/util.py
"""
from typing import Union
from typing import Union, Dict
import torch
@@ -159,3 +159,27 @@ def get_text_field_mask(
return (character_tensor != padding_id).any(dim=-1)
else:
raise ValueError("Expected a tensor with dimension 2 or 3, found {}".format(smallest_dim))
def get_dropout_mask(dropout_probability: float, tensor_for_masking: torch.Tensor):
"""
Computes and returns an element-wise dropout mask for a given tensor, where
each element in the mask is dropped out with probability dropout_probability.
Note that the mask is NOT applied to the tensor - the tensor is passed to retain
the correct CUDA tensor type for the mask.
# Parameters
dropout_probability : `float`, required.
Probability of dropping a dimension of the input.
tensor_for_masking : `torch.Tensor`, required.
The tensor whose shape and device the returned mask should match.
# Returns
`torch.FloatTensor`
A torch.FloatTensor consisting of the binary mask scaled by 1/ (1 - dropout_probability).
This scaling ensures expected values and variances of the output of applying this mask
and the original tensor are the same.
"""
binary_mask = (torch.rand(tensor_for_masking.size()) > dropout_probability).to(
tensor_for_masking.device
)
# Scale mask by 1/keep_prob to preserve output statistics.
dropout_mask = binary_mask.float().div(1.0 - dropout_probability)
return dropout_mask
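
A sketch of how the mask is meant to be used (illustration only; sizes are arbitrary): sample it once per sequence and multiply it into the hidden state at every timestep, as AugmentedLstm does above.

def _example_recurrent_dropout():
    import torch

    hidden_state = torch.randn(4, 100)
    mask = get_dropout_mask(0.25, hidden_state)   # same shape and device as hidden_state
    assert mask.shape == hidden_state.shape
    dropped = hidden_state * mask                 # reuse this exact mask at each timestep
    return dropped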