rnn.py
    import numpy as np
    from .layer import Layer
    from .weights import Weights
    from .linear import Linear
    from abc import abstractmethod
    
    
    def checkDims(input: np.ndarray) -> None:
        assert input.ndim == 3, f"Input should have 3 dimensions, got {input.ndim}"
        batchSize, seqLength, _ = input.shape
        assert batchSize > 0 and seqLength > 0, "All dimensions should be greater than 0"
    
    
    class RNN(Layer):
        __slots__ = ['inputSize', 'hiddenSize', 'outputSize']
    
        def __init__(self, inputSize: int, hiddenSize: int, outputSize: int) -> None:
            super().__init__()
            self.inputSize = inputSize
            self.hiddenSize = hiddenSize
            self.outputSize = outputSize
    
        @abstractmethod
        def forward(self, input: np.ndarray, hiddenState: np.ndarray | None = None) -> tuple[np.ndarray, np.ndarray]:
            raise NotImplementedError('not implemented')
    
        @abstractmethod
        def backward(self, gradient: np.ndarray, hiddenGradient: np.ndarray | None = None) -> tuple[np.ndarray, np.ndarray]:
            raise NotImplementedError('not implemented')
    
    
    class Recuring(RNN):
        __slots__ = ['gradientClipSize', 'input', 'batchSize', 'seqLength', 'hiddenStates', 'inputLayer', 'hiddenLayer', 'outputLayer']
    
        def __init__(self, inputSize: int, hiddenSize: int, outputSize: int, gradientClipSize: int = 5) -> None:
            super().__init__(inputSize, hiddenSize, outputSize)
            self.gradientClipSize = gradientClipSize
    
            self.inputLayer = Linear(inputSize, hiddenSize, bias=False)
            self.hiddenLayer = Linear(hiddenSize, hiddenSize)
            self.outputLayer = Linear(hiddenSize, outputSize)
    
        def params(self) -> list[Weights]:
            """
            Returns the weights and biases of all sub-layers in a Python list; called by optimizers.
            """
            return [*self.inputLayer.params(), *self.hiddenLayer.params(), *self.outputLayer.params()]
    
        def forward(self, input: np.ndarray, hiddenState: np.ndarray | None = None) -> tuple[np.ndarray, np.ndarray]:
            checkDims(input)
    
            self.input = input
            self.batchSize, self.seqLength, _ = input.shape
            outputState = np.zeros((self.batchSize, self.seqLength, self.outputSize))
    
            # Initialize the hidden state
            if hiddenState is None:
                hiddenState = np.zeros((self.batchSize, self.hiddenSize))
            else:
                # Ensure that the hiddenState has the correct shape
                assert hiddenState.shape == (self.batchSize, self.hiddenSize)
    
            # Accumulate hidden states over time steps (kept on self for the backward pass)
            self.hiddenStates = np.zeros((self.batchSize, self.seqLength, self.hiddenSize))
    
            # Loop through each time step
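            # (standard Elman-style recurrence; the Linear layers are assumed to apply W·x + b as used above)
            #   h_t = tanh(W_xh · x_t + W_hh · h_{t-1} + b_h)
            #   y_t = W_hy · h_t + b_y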
            for t in range(self.seqLength):
                inputTimeStep = self.inputLayer(input[:, t, :])
                hiddenTimeStep = self.hiddenLayer(hiddenState)
                hiddenState = np.tanh(inputTimeStep + hiddenTimeStep)
                outputState[:, t, :] = self.outputLayer(hiddenState)
                self.hiddenStates[:, t, :] = hiddenState
    
            return outputState, self.hiddenStates
    
        def backward(self, gradient: np.ndarray, hiddenGradient: np.ndarray | None = None) -> tuple[np.ndarray, np.ndarray]:
            gradInputState = np.zeros_like(self.input)
            dhiddenNext = np.zeros((self.batchSize, self.hiddenSize))
    
            # If hiddenGradient is provided, it should be added to the last timestep's hidden state gradient
            if hiddenGradient is not None:
                dhiddenNext += hiddenGradient
    
            for t in reversed(range(self.seqLength)):
                gradientTimeStep = gradient[:, t, :]
                dhidden = self.outputLayer.backward(gradientTimeStep) + dhiddenNext
                # Backprop through tanh: d/dx tanh(x) = 1 - tanh(x)^2
                dhiddenRaw = (1 - np.square(self.hiddenStates[:, t, :])) * dhidden
                dhiddenNext = self.hiddenLayer.backward(dhiddenRaw)
    
                dinput = self.inputLayer.backward(dhiddenRaw)
                gradInputState[:, t, :] = dinput
    
            # Clip parameter gradients in place to mitigate exploding gradients
            for dparam in [self.inputLayer.weights.deltas,
                           self.hiddenLayer.weights.deltas, self.hiddenLayer.bias.deltas,
                           self.outputLayer.weights.deltas, self.outputLayer.bias.deltas]:
                np.clip(dparam, -self.gradientClipSize, self.gradientClipSize, out=dparam)
    
            return gradInputState, dhiddenNext
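    
    
    # Usage sketch: a minimal, illustrative example of driving this layer end to end.
    # It assumes the Linear and Weights classes behave as they are used above; the
    # shapes and values are arbitrary. Because of the relative imports, this only runs
    # in package context (e.g. `python -m <package>.rnn`).
    if __name__ == '__main__':
        rng = np.random.default_rng(0)
        batchSize, seqLength, inputSize, hiddenSize, outputSize = 2, 5, 4, 8, 3
    
        rnn = Recuring(inputSize, hiddenSize, outputSize)
        x = rng.standard_normal((batchSize, seqLength, inputSize))
    
        # Forward pass returns per-step outputs and the accumulated hidden states
        outputs, hiddenStates = rnn.forward(x)          # (2, 5, 3) and (2, 5, 8)
    
        # Backward pass with a dummy upstream gradient (e.g. from a loss)
        upstream = rng.standard_normal(outputs.shape)
        gradInput, gradHidden = rnn.backward(upstream)  # (2, 5, 4) and (2, 8)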