from abc import ABC, abstractmethod

import numpy as np

from .module import Module, Sequential
# `Weights` (the parameter container with .values, .deltas, .prevValues and .cache
# used in the type hints below) is assumed to be provided by the layer code.
from typing import Protocol
class LearningLayer(Protocol):
    """
    Structural type for layers that expose trainable parameters via params().
    """
    def params(self) -> list:
        ...
class Optimizer(ABC):
"""
this is the base class for all optimizers
it takes the parameters (weights/biases) and gradients from layers and optimizes them
"""
__slots__ = ['name', 'learningRate', 'layers', 'scheduler', 'learning']
def __init__(self, layers: list | Module, learningRate: float) -> None:
self.name = self.__class__.__name__
self.learningRate = learningRate
        # a plain list of layers is assumed to describe a sequential model
if isinstance(layers, list):
self.layers = Sequential(layers)
elif isinstance(layers, Module):
self.layers = layers
else:
raise TypeError('The layers argument must be either a list or an instance of the Module class.')
@abstractmethod
def update(self, params: list[Weights]) -> None:
"""
implemented according to algorithm for every optimizer
"""
pass
def step(self, gradient: np.ndarray) -> None:
"""
stepping through the layers in reverse order, calls gradient method for each layer
"""
_ = self.layers.backward(gradient)
for layer in reversed(self.layers):
try:
params = layer.params()
except AttributeError:
# 'params' method not found in the layer, skip updating
continue
self.update(params)
self.postStep()
def postStep(self) -> None:
"""
an optional method used for optimizers to perform stuff after updating layer weights
"""
pass
def __str__(self) -> str:
printString = self.name
printString += ' learningRate: ' + str(self.learningRate)
#printString += ' learning layers: ' + str(len(self.learning))
return printString
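
# Hedged usage sketch: how an optimizer from this module is typically driven during
# training. `Linear`, `batches` and the loss-gradient line are illustrative
# assumptions; Sequential, params() and step() come from the code above, and
# forward() is assumed to exist on Module.
#
#   model = [Linear(2, 16), Linear(16, 1)]
#   optimizer = SGD(model, learningRate=0.01)
#   for x, target in batches:
#       prediction = optimizer.layers.forward(x)
#       lossGradient = prediction - target        # e.g. the gradient of mean squared error
#       optimizer.step(lossGradient)              # backprop, then update every layer with params()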
class SGD(Optimizer):
"""
Stochastic gradient descent
"""
__slots__ = []
def __init__(self, layers: list, learningRate: float) -> None:
super().__init__(layers, learningRate)
def update(self, params: list[Weights]) -> None:
for param in params:
param.values -= self.learningRate * param.deltas
class SGDMomentum(Optimizer):
"""
Stochastic gradient descent with momentum on mini-batches.
"""
__slots__ = ['momentum']
def __init__(self, layers: list, learningRate: float, momentum: float) -> None:
super().__init__(layers, learningRate)
self.momentum = momentum
def update(self, params: list[Weights]) -> None:
for param in params:
if param.prevValues is None:
param.prevValues = np.zeros(param.values.shape)
            # velocity accumulates the gradient: v = momentum * v + learningRate * dw
            delta = self.momentum * param.prevValues + self.learningRate * param.deltas
            param.values -= delta
            param.prevValues = delta
class NesterovMomentum(Optimizer):
"""
    Stochastic gradient descent with Nesterov momentum (lookahead) specialization.
"""
__slots__ = ['momentum']
    def __init__(self, layers: list, learningRate: float, momentum: float = 0.9) -> None:
super().__init__(layers, learningRate)
self.momentum = momentum
def update(self, params: list[Weights]) -> None:
for param in params:
if param.prevValues is None:
param.prevValues = np.zeros(param.values.shape)
            # parameter-space form of Nesterov's accelerated gradient:
            # look ahead by the momentum step before applying the gradient correction
            prevVelocity = param.prevValues
            param.prevValues = self.momentum * prevVelocity - self.learningRate * param.deltas
            param.values += -self.momentum * prevVelocity + (1 + self.momentum) * param.prevValues
class AdaGrad(Optimizer):
"""
    AdaGrad adaptive optimization algorithm
"""
__slots__ = ['epsilon']
def __init__(self, layers: list, learningRate: float, epsilon: float = 1e-6) -> None:
super().__init__(layers, learningRate)
self.epsilon = epsilon
def update(self, params: list[Weights]) -> None:
for param in params:
if param.cache is None:
param.cache = np.zeros(param.values.shape)
param.cache += param.deltas ** 2
param.values += -self.learningRate * param.deltas / (np.sqrt(param.cache) + self.epsilon)
class AdaDelta(Optimizer):
"""
AdaDelta optimization algorithm
"""
    __slots__ = ['rho', 'epsilon']
def __init__(self, layers: list, learningRate: float, rho: float = 0.9, epsilon: float = 1e-6) -> None:
super().__init__(layers, learningRate)
self.rho = rho
self.epsilon = epsilon
def update(self, params: list[Weights]) -> None:
for param in params:
if param.cache is None:
param.cache = {'cache': np.zeros(param.values.shape), 'delta': np.zeros(param.values.shape)}
param.cache['cache'] = self.rho * param.cache['cache'] + (1 - self.rho) * param.deltas ** 2
update = param.deltas * np.sqrt(param.cache['delta'] + self.epsilon) / np.sqrt(param.cache['cache'] + self.epsilon)
param.values -= self.learningRate * update
param.cache['delta'] = self.rho * param.cache['delta'] + (1 - self.rho) * update ** 2
class RMSprop(Optimizer):
"""
RMSprop adaptive optimization algorithm
"""
__slots__ = ['decayRate', 'epsilon']
def __init__(self, layers: list, learningRate: float, decayRate: float = 0.9, epsilon: float = 1e-6) -> None:
super().__init__(layers, learningRate)
self.decayRate = decayRate
self.epsilon = epsilon
def update(self, params: list[Weights]) -> None:
for param in params:
if param.cache is None:
param.cache = np.zeros(param.values.shape)
param.cache = self.decayRate * param.cache + (1 - self.decayRate) * param.deltas ** 2
            param.values -= self.learningRate * param.deltas / (np.sqrt(param.cache) + self.epsilon)
class Adam(Optimizer):
"""
    Adam optimizer; bias correction of the moment estimates is implemented.
"""
__slots__ = ['t', 'beta1', 'beta2', 'epsilon']
    def __init__(self, layers: list, learningRate: float, beta1: float = 0.9, beta2: float = 0.999, epsilon: float = 1e-8) -> None:
super().__init__(layers, learningRate)
self.t = 1
self.beta1 = beta1
self.beta2 = beta2
self.epsilon = epsilon
def update(self, params: list[Weights]) -> None:
for param in params:
if param.cache is None:
param.cache = {'m': np.zeros(param.values.shape), 'v': np.zeros(param.values.shape)}
param.cache['m'] = self.beta1 * param.cache['m'] + (1 - self.beta1) * param.deltas
param.cache['v'] = self.beta2 * param.cache['v'] + (1 - self.beta2) * param.deltas ** 2
mCorrected = param.cache['m'] / (1 - self.beta1 ** self.t)
vCorrected = param.cache['v'] / (1 - self.beta2 ** self.t)
param.values += -self.learningRate * mCorrected / (np.sqrt(vCorrected) + self.epsilon)
def postStep(self) -> None:
self.t += 1
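
# Minimal, self-contained sketch (assumption: plain NumPy only, no project classes)
# demonstrating the Adam moment estimates and bias correction implemented above;
# the constant "gradient" and the printed values are purely illustrative.
if __name__ == '__main__':
    beta1, beta2, epsilon, lr = 0.9, 0.999, 1e-8, 0.001
    gradient = np.array([0.5, -0.2])                  # pretend gradient, kept constant
    m = np.zeros_like(gradient)
    v = np.zeros_like(gradient)
    for t in range(1, 4):
        m = beta1 * m + (1 - beta1) * gradient        # first moment estimate
        v = beta2 * v + (1 - beta2) * gradient ** 2   # second moment estimate
        mCorrected = m / (1 - beta1 ** t)             # equals the gradient exactly for a constant gradient
        vCorrected = v / (1 - beta2 ** t)
        step = -lr * mCorrected / (np.sqrt(vCorrected) + epsilon)
        print(f't={t} step={step}')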