    import numpy as np
    from abc import ABC, abstractmethod
    
    from .module import Module, Sequential
    #from .sequential import Sequential
    
    from .weights import Weights
    from typing import Protocol
    
    
    class LearningLayer(Protocol):
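        """
        Structural type: any layer that provides a params() method satisfies this protocol.
        """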
        def params(self) -> list:
            ...
    
    
    class Optimizer(ABC):
        """
        this is the base class for all optimizers
        it takes the parameters (weights/biases) and gradients from layers and optimizes them
        """
        __slots__ = ['name', 'learningRate', 'layers', 'scheduler', 'learning']
    
        def __init__(self, layers: list | Module, learningRate: float) -> None:
            self.name = self.__class__.__name__
            self.learningRate = learningRate
    
            # assume that a plain list of layers should be executed sequentially and wrap it accordingly
            if isinstance(layers, list):
                self.layers = Sequential(layers)
            elif isinstance(layers, Module):
                self.layers = layers
            else:
                raise TypeError('The layers argument must be either a list or an instance of the Module class.')
    
        @abstractmethod
        def update(self, params: list[Weights]) -> None:
            """
            implemented according to algorithm for every optimizer
            """
            pass
    
        def step(self, gradient: np.ndarray) -> None:
            """
            stepping through the layers in reverse order, calls gradient method for each layer
            """
            _ = self.layers.backward(gradient)
            for layer in reversed(self.layers):
                try:
                    params = layer.params()
                except AttributeError:
                    # 'params' method not found in the layer, skip updating
                    continue
    
                self.update(params)
    
            self.postStep()
    
        def postStep(self) -> None:
            """
            an optional method used for optimizers to perform stuff after updating layer weights
            """
            pass
    
        def __str__(self) -> str:
            printString = self.name
            printString += '       learningRate: ' + str(self.learningRate)
    
            #printString += '    learning layers: ' + str(len(self.learning))
    
            return printString
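
    # Usage sketch (illustrative, not part of the module): assuming the layer classes in this
    # package expose a params() method and Sequential provides a forward pass, an optimizer is
    # wired up roughly like this (Linear/Relu and the loss object are hypothetical names):
    #
    #   model = Sequential([Linear(4, 8), Relu(), Linear(8, 1)])
    #   optim = SGD(model, learningRate=0.01)
    #   prediction = model.forward(batch)
    #   gradient = loss.backward(prediction, target)   # upstream gradient of the loss
    #   optim.step(gradient)                           # backward pass + parameter update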
    
    
    class SGD(Optimizer):
        """
        Stochastic gradient descent
        """
        __slots__ = []
    
        def __init__(self, layers: list, learningRate: float) -> None:
            super().__init__(layers, learningRate)
    
        def update(self, params: list[Weights]) -> None:
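            # vanilla gradient descent: w <- w - learningRate * dL/dw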
            for param in params:
                param.values -= self.learningRate * param.deltas
    
    
    class SGDMomentum(Optimizer):
        """
        Stochastic gradient descent with momentum on mini-batches.
        """
        __slots__ = ['momentum']
    
        def __init__(self, layers: list, learningRate: float, momentum: float) -> None:
            super().__init__(layers, learningRate)
            self.momentum = momentum
    
        def update(self, params: list[Weights]) -> None:
            for param in params:
                if param.prevValues is None:
                    param.prevValues = np.zeros(param.values.shape)
                # classic momentum: v <- momentum * v + learningRate * dw, then w <- w - v
                delta = self.learningRate * param.deltas + self.momentum * param.prevValues
                param.values -= delta
                param.prevValues = delta
    
    
    class NesterovMomentum(Optimizer):
        """
        Stochastic gradient descent with Nesterov momentum.
        """
        __slots__ = ['momentum']
    
        def __init__(self, layers: list, learningRate: float, momentum: float = .9) -> None:
            super().__init__(layers, learningRate)
            self.momentum = momentum
    
        def update(self, params: list[Weights]) -> None:
            for param in params:
                if param.prevValues is None:
                    param.prevValues = np.zeros(param.values.shape)
    
                # Nesterov momentum: update the velocity first, then take a lookahead step
                # w <- w + momentum * v_new - learningRate * dw (common approximation of NAG)
                param.prevValues = self.momentum * param.prevValues - self.learningRate * param.deltas
                param.values += self.momentum * param.prevValues - self.learningRate * param.deltas
    
    
    class AdaGrad(Optimizer):
        """
        AdaGrad adaptive optimization algorithm
        """
        __slots__ = ['epsilon']
    
        def __init__(self, layers: list, learningRate: float, epsilon: float = 1e-6) -> None:
            super().__init__(layers, learningRate)
            self.epsilon = epsilon
    
        def update(self, params: list[Weights]) -> None:
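            # accumulate squared gradients in `cache`; dividing by sqrt(cache) shrinks the step
            # for parameters that have seen consistently large gradients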
            for param in params:
                if param.cache is None:
                    param.cache = np.zeros(param.values.shape)
                param.cache += param.deltas ** 2
                param.values += -self.learningRate * param.deltas / (np.sqrt(param.cache) + self.epsilon)
    
    
    class AdaDelta(Optimizer):
        """
        AdaDelta optimization algorithm
        """
        __slots__ = ['rho', 'epsilon']
    
        def __init__(self, layers: list, learningRate: float, rho: float = 0.9, epsilon: float = 1e-6) -> None:
            super().__init__(layers, learningRate)
            self.rho = rho
            self.epsilon = epsilon
    
        def update(self, params: list[Weights]) -> None:
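            # keep running averages of squared gradients ('cache') and squared updates ('delta');
            # the ratio of their roots scales each step, with rho controlling the decay of both averages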
            for param in params:
                if param.cache is None:
                    param.cache = {'cache': np.zeros(param.values.shape), 'delta': np.zeros(param.values.shape)}
                param.cache['cache'] = self.rho * param.cache['cache'] + (1 - self.rho) * param.deltas ** 2
                update = param.deltas * np.sqrt(param.cache['delta'] + self.epsilon) / np.sqrt(param.cache['cache'] + self.epsilon)
                param.values -= self.learningRate * update
                param.cache['delta'] = self.rho * param.cache['delta'] + (1 - self.rho) * update ** 2
    
    
    class RMSprop(Optimizer):
        """
        RMSprop adaptive optimization algorithm
        """
        __slots__ = ['decayRate', 'epsilon']
    
        def __init__(self, layers: list, learningRate: float, decayRate: float = 0.9, epsilon: float = 1e-6) -> None:
            super().__init__(layers, learningRate)
            self.decayRate = decayRate
            self.epsilon = epsilon
    
        def update(self, params: list[Weights]) -> None:
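            # exponentially decaying average of squared gradients; dividing by its square root
            # lets the effective learning rate adapt per parameter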
            for param in params:
                if param.cache is None:
                    param.cache = np.zeros(param.values.shape)
                param.cache = self.decayRate * param.cache + (1 - self.decayRate) * param.deltas ** 2
                param.values += - self.learningRate * param.deltas / (np.sqrt(param.cache) + self.epsilon)
    
    
    class Adam(Optimizer):
        """
        Adam optimizer with bias-corrected first and second moment estimates.
        """
        __slots__ = ['t', 'beta1', 'beta2', 'epsilon']
    
        def __init__(self, layers: list, learningRate: float, beta1: float = 0.9, beta2: float = 0.999, epsilon: float = 1e-8) -> None:
            super().__init__(layers, learningRate)
            self.t = 1
            self.beta1 = beta1
            self.beta2 = beta2
            self.epsilon = epsilon
    
        def update(self, params: list[Weights]) -> None:
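            # first moment 'm' (mean of gradients) and second moment 'v' (mean of squared gradients)
            # are bias-corrected with the step counter t before forming the update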
            for param in params:
                if param.cache is None:
                    param.cache = {'m': np.zeros(param.values.shape), 'v': np.zeros(param.values.shape)}
                param.cache['m'] = self.beta1 * param.cache['m'] + (1 - self.beta1) * param.deltas
                param.cache['v'] = self.beta2 * param.cache['v'] + (1 - self.beta2) * param.deltas ** 2
                mCorrected = param.cache['m'] / (1 - self.beta1 ** self.t)
                vCorrected = param.cache['v'] / (1 - self.beta2 ** self.t)
                param.values += -self.learningRate * mCorrected / (np.sqrt(vCorrected) + self.epsilon)
    
        def postStep(self) -> None:
            self.t += 1