import numpy as np
from abc import ABC, abstractmethod
from .module import Module, Sequential
#from .sequential import Sequential
from .layer import Weights
from typing import Protocol


class LearningLayer(Protocol):
    def params(self) -> list:
        ...


class Optimizer(ABC):
    """
    this is the base class for all optimizers
    it takes the parameters (weights/biases) and gradients from layers and optimizes them
    """
    __slots__ = ['name', 'learningRate', 'layers', 'scheduler', 'learning']

    def __init__(self, layers: list | Module, learningRate: float) -> None:
        self.name = self.__class__.__name__
        self.learningRate = learningRate

        # assume the user wants the layers executed sequentially when a plain list is passed
        if isinstance(layers, list):
            self.layers = Sequential(layers)
        elif isinstance(layers, Module):
            self.layers = layers
        else:
            raise TypeError('The layers argument must be either a list or an instance of the Module class.')

    @abstractmethod
    def update(self, params: list[Weights]) -> None:
        """
        implemented according to algorithm for every optimizer
        """
        pass

    def step(self, gradient: np.ndarray) -> None:
        """
        stepping through the layers in reverse order, calls gradient method for each layer
        """
        _ = self.layers.backward(gradient)
        for layer in reversed(self.layers):
            try:
                params = layer.params()
            except AttributeError:
                # 'params' method not found in the layer, skip updating
                continue

            self.update(params)

        self.postStep()

    def postStep(self) -> None:
        """
        an optional method used for optimizers to perform stuff after updating layer weights
        """
        pass

    def __str__(self) -> str:
        printString = self.name
        printString += '       learningRate: ' + str(self.learningRate)
        #printString += '    learning layers: ' + str(len(self.learning))
        return printString
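
# Usage sketch (illustrative only, not a tested entry point): the optimizer wraps the
# model's layers, and a single step() call runs the backward pass and applies the update.
# `model` and `lossGradient` below are hypothetical stand-ins for a Module instance and
# the gradient returned by a loss function.
#
#     optimizer = SGD(model, learningRate=0.01)
#     lossGradient = loss.backward()    # assumed to yield dLoss/dOutput
#     optimizer.step(lossGradient)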


class SGD(Optimizer):
    """
    Stochastic gradient descent
    """
    __slots__ = []

    def __init__(self, layers: list, learningRate: float) -> None:
        super().__init__(layers, learningRate)

    def update(self, params: list[Weights]) -> None:
        for param in params:
            param.values -= self.learningRate * param.deltas
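
# Update rule sketch (vanilla SGD): w <- w - learningRate * dL/dw, applied independently
# to every parameter tensor returned by a layer's params() method.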


class SGDMomentum(Optimizer):
    """
    Stochastic gradient descent with momentum on mini-batches.
    """
    __slots__ = ['momentum']

    def __init__(self, layers: list, learningRate: float, momentum: float) -> None:
        super().__init__(layers, learningRate)
        self.momentum = momentum

    def update(self, params: list[Weights]) -> None:
        for param in params:
            if param.prevValues is None:
                param.prevValues = np.zeros(param.values.shape)
            # classic momentum: accumulate a velocity and subtract it from the weights
            delta = self.learningRate * param.deltas + self.momentum * param.prevValues
            param.values -= delta
            param.prevValues = delta
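
# Update rule sketch (momentum, matching update() above):
#     v <- learningRate * dL/dw + momentum * v
#     w <- w - v
# with the velocity v stored per parameter in prevValues.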


class NesterovMomentum(Optimizer):
    """
    Stochastic Gradient Descent with Nesterov Momentum specialization
    """
    __slots__ = ['momentum']

    def __init__(self, layers: list, learningRate: float, momentum: float = .9) -> None:
        super().__init__(layers, learningRate)
        self.momentum = momentum

    def update(self, params: list[Weights]) -> None:
        for param in params:
            if param.prevValues is None:
                param.prevValues = np.zeros(param.values.shape)

            # velocity update: keep a fraction of the previous velocity and take a gradient step
            momentumTerm = self.momentum * param.prevValues
            param.prevValues = momentumTerm - self.learningRate * param.deltas
            param.values += param.prevValues
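
# Update rule sketch (velocity form used above):
#     v <- momentum * v - learningRate * dL/dw
#     w <- w + v
# with v stored per parameter in prevValues.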


class AdaGrad(Optimizer):
    """
    AdaGrad adaptive optimization algorithm
    """
    __slots__ = ['epsilon']

    def __init__(self, layers: list, learningRate: float, epsilon: float = 1e-6) -> None:
        super().__init__(layers, learningRate)
        self.epsilon = epsilon

    def update(self, params: list[Weights]) -> None:
        for param in params:
            if param.cache is None:
                param.cache = np.zeros(param.values.shape)
            param.cache += param.deltas ** 2
            param.values -= self.learningRate * param.deltas / (np.sqrt(param.cache) + self.epsilon)
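
# Update rule sketch (AdaGrad): the squared-gradient cache only ever grows,
#     cache <- cache + (dL/dw)^2
#     w <- w - learningRate * dL/dw / (sqrt(cache) + epsilon)
# so frequently updated parameters see a steadily shrinking effective learning rate.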


class AdaDelta(Optimizer):
    """
    AdaDelta optimization algorithm
    """
    __slots__ = ['rho', 'epsilon']

    def __init__(self, layers: list, learningRate: float, rho: float = 0.9, epsilon: float = 1e-6) -> None:
        super().__init__(layers, learningRate)
        self.rho = rho
        self.epsilon = epsilon

    def update(self, params: list[Weights]) -> None:
        for param in params:
            if param.cache is None:
                param.cache = {'cache': np.zeros(param.values.shape), 'delta': np.zeros(param.values.shape)}
            param.cache['cache'] = self.rho * param.cache['cache'] + (1 - self.rho) * param.deltas ** 2
            update = param.deltas * np.sqrt(param.cache['delta'] + self.epsilon) / np.sqrt(param.cache['cache'] + self.epsilon)
            param.values -= self.learningRate * update
            param.cache['delta'] = self.rho * param.cache['delta'] + (1 - self.rho) * update ** 2
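
# Update rule sketch (AdaDelta as implemented above, with an extra learningRate factor
# that the original AdaDelta formulation omits):
#     E[g^2] <- rho * E[g^2] + (1 - rho) * g^2
#     u       = g * sqrt(E[u^2] + epsilon) / sqrt(E[g^2] + epsilon)
#     w      <- w - learningRate * u
#     E[u^2] <- rho * E[u^2] + (1 - rho) * u^2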


class RMSprop(Optimizer):
    """
    RMSprop adaptive optimization algorithm
    """
    __slots__ = ['decayRate', 'epsilon']

    def __init__(self, layers: list, learningRate: float, decayRate: float = 0.9, epsilon: float = 1e-6) -> None:
        super().__init__(layers, learningRate)
        self.decayRate = decayRate
        self.epsilon = epsilon

    def update(self, params: list[Weights]) -> None:
        for param in params:
            if param.cache is None:
                param.cache = np.zeros(param.values.shape)
            param.cache = self.decayRate * param.cache + (1 - self.decayRate) * param.deltas ** 2
            param.values -= self.learningRate * param.deltas / (np.sqrt(param.cache) + self.epsilon)
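
# Update rule sketch (RMSprop): an exponential moving average of squared gradients
# replaces AdaGrad's ever-growing cache,
#     cache <- decayRate * cache + (1 - decayRate) * g^2
#     w     <- w - learningRate * g / (sqrt(cache) + epsilon)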


class Adam(Optimizer):
    """
    Adam optimizer, bias correction is implemented.
    """
    __slots__ = ['t', 'beta1', 'beta2', 'epsilon']

    def __init__(self, layers: list, learningRate: float, beta1: float = 0.9, beta2: float = 0.999, epsilon: float = 1e-8) -> None:
        super().__init__(layers, learningRate)
        self.t = 1
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon

    def update(self, params: list[Weights]) -> None:
        for param in params:
            if param.cache is None:
                param.cache = {'m': np.zeros(param.values.shape), 'v': np.zeros(param.values.shape)}
            param.cache['m'] = self.beta1 * param.cache['m'] + (1 - self.beta1) * param.deltas
            param.cache['v'] = self.beta2 * param.cache['v'] + (1 - self.beta2) * param.deltas ** 2
            mCorrected = param.cache['m'] / (1 - self.beta1 ** self.t)
            vCorrected = param.cache['v'] / (1 - self.beta2 ** self.t)
            param.values -= self.learningRate * mCorrected / (np.sqrt(vCorrected) + self.epsilon)

    def postStep(self) -> None:
        self.t += 1
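
# Update rule sketch (Adam with bias correction, matching update() and postStep() above):
#     m <- beta1 * m + (1 - beta1) * g
#     v <- beta2 * v + (1 - beta2) * g^2
#     mHat = m / (1 - beta1^t),  vHat = v / (1 - beta2^t)
#     w <- w - learningRate * mHat / (sqrt(vHat) + epsilon)
# where the time step t is advanced once per optimizer step via postStep().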