import numpy as np
from abc import abstractmethod

from .layer import Layer
from .weights import Weights
from .linear import Linear


def checkDims(input: np.ndarray) -> None:
    assert input.ndim == 3, f"Input should have 3 dimensions (batch, seq, features), got {input.ndim}"
    batchSize, seqLength, _ = input.shape
    assert batchSize > 0 and seqLength > 0, "All dimensions should be greater than 0"

class RNN(Layer):
    __slots__ = ['inputSize', 'hiddenSize', 'outputSize']

    def __init__(self, inputSize: int, hiddenSize: int, outputSize: int) -> None:
        super().__init__()
        self.inputSize = inputSize
        self.hiddenSize = hiddenSize
        self.outputSize = outputSize

    @abstractmethod
    def forward(self, input: np.ndarray, hiddenState: np.ndarray = None) -> tuple[np.ndarray, np.ndarray]:
        raise NotImplementedError('not implemented')

    @abstractmethod
    def backward(self, gradient: np.ndarray, hiddenGradient: np.ndarray = None) -> tuple[np.ndarray, np.ndarray]:
        raise NotImplementedError('not implemented')

class Recuring(RNN):
    __slots__ = ['gradientClipSize', 'input', 'batchSize', 'seqLength', 'hiddenStates',
                 'inputLayer', 'hiddenLayer', 'outputLayer']

    def __init__(self, inputSize: int, hiddenSize: int, outputSize: int, gradientClipSize: int = 5) -> None:
        super().__init__(inputSize, hiddenSize, outputSize)
        self.gradientClipSize = gradientClipSize
        # Input-to-hidden, hidden-to-hidden and hidden-to-output projections; only one bias
        # is needed inside the recurrence, so the input projection carries none.
        self.inputLayer = Linear(inputSize, hiddenSize, bias=False)
        self.hiddenLayer = Linear(hiddenSize, hiddenSize)
        self.outputLayer = Linear(hiddenSize, outputSize)

    def params(self) -> list[Weights]:
        """
        Returns the weights and biases of all sub-layers in a Python list; called by optimizers.
        """
        return [*self.inputLayer.params(), *self.hiddenLayer.params(), *self.outputLayer.params()]

    def forward(self, input: np.ndarray, hiddenState: np.ndarray = None) -> tuple[np.ndarray, np.ndarray]:
        checkDims(input)
        self.input = input
        self.batchSize, self.seqLength, _ = input.shape
        outputState = np.zeros((self.batchSize, self.seqLength, self.outputSize))

        # Initialize the hidden state
        if hiddenState is None:
            hiddenState = np.zeros((self.batchSize, self.hiddenSize))
        else:
            # Ensure that the given hiddenState has the correct shape
            assert hiddenState.shape == (self.batchSize, self.hiddenSize)

        # Accumulate hidden states over time steps; backward() needs them to
        # differentiate through the tanh at every step.
        self.hiddenStates = np.zeros((self.batchSize, self.seqLength, self.hiddenSize))

        # Loop through each time step: h_t = tanh(W_x x_t + W_h h_{t-1} + b)
        for t in range(self.seqLength):
            inputTimeStep = self.inputLayer(input[:, t, :])
            hiddenTimeStep = self.hiddenLayer(hiddenState)
            hiddenState = np.tanh(inputTimeStep + hiddenTimeStep)
            outputState[:, t, :] = self.outputLayer(hiddenState)
            self.hiddenStates[:, t, :] = hiddenState

        return outputState, self.hiddenStates

    def backward(self, gradient: np.ndarray, hiddenGradient: np.ndarray = None) -> tuple[np.ndarray, np.ndarray]:
        gradInputState = np.zeros_like(self.input)
        dhiddenNext = np.zeros((self.batchSize, self.hiddenSize))

        # If hiddenGradient is provided, add it to the last time step's hidden-state gradient
        if hiddenGradient is not None:
            dhiddenNext += hiddenGradient

        # Backpropagation through time: walk the sequence in reverse order
        for t in reversed(range(self.seqLength)):
            gradientTimeStep = gradient[:, t, :]
            # Gradient flowing into h_t from the output at step t and from step t+1
            dhidden = self.outputLayer.backward(gradientTimeStep) + dhiddenNext
            # Backprop through tanh: d/dx tanh(x) = 1 - tanh(x)^2
            dhiddenRaw = (1 - np.square(self.hiddenStates[:, t, :])) * dhidden
            dhiddenNext = self.hiddenLayer.backward(dhiddenRaw)
            dinput = self.inputLayer.backward(dhiddenRaw)
            gradInputState[:, t, :] = dinput

        # Clip gradients to mitigate exploding gradients
        for dparam in [self.inputLayer.weights.deltas, self.hiddenLayer.weights.deltas,
                       self.hiddenLayer.bias.deltas, self.outputLayer.weights.deltas,
                       self.outputLayer.bias.deltas]:
            np.clip(dparam, -self.gradientClipSize, self.gradientClipSize, out=dparam)

        return gradInputState, dhiddenNext