Skip to content
Snippets Groups Projects
pooling2d.py 3.35 KiB
Newer Older
  • Learn to ignore specific revisions
  • johannes bilk's avatar
    johannes bilk committed
    import numpy as np
    from .convolution2D import getWindows, checkDims, assignParameter
    from .layer import Layer
    from abc import ABC, abstractmethod
    
    
    class Pooling2D(Layer, ABC):
        """
        pooling layer to be used after convolution
        it can do min/max pooling, avg/mean pooling hasn't been implemented, because backwards pass is a bit more complicated
        """
        def __init__(self, kernelSize: tuple = (2,2), stride: tuple = (2,2)) -> None:
            super().__init__()
            self.kernelSize = assignParameter(kernelSize)
            self.xKern, self.yKern = kernelSize[0], kernelSize[1]
            self.stride = assignParameter(stride)
            self.xStride, self.yStride = stride[0], stride[1]
            self.xSize, self.ySize = None, None
            self.channels = None
            self.xOut, self.yOut = None, None
    
        @abstractmethod
        def _function(self) -> np.ndarray:
            raise NotImplementedError('this needs to be implemented')
    
        @abstractmethod
        def _derivative(self) -> np.ndarray:
            raise NotImplementedError('this needs to be implemented')
    
        def forward(self, input: np.ndarray) -> np.ndarray:
            """
            The forward pass of pooling
            """
            self.input = input
            checkDims(input)
            self.batchSize = input.shape[0]
    
            # setting output sizes
            if self.xOut is None or self.yOut is None:
                _, self.channels, self.xSize, self.ySize = input.shape
                self.xOut = int((self.xSize - self.xKern) / self.xStride) + 1
                self.yOut = int((self.ySize - self.yKern) / self.yStride) + 1
    
            # setting output size for backward pass
            self.outputShape = (self.batchSize, self.channels, self.xOut, self.yOut)
    
            self.output = getWindows(input, self.kernelSize, self.outputShape, stride=self.stride)
            self.output = self.output.reshape(self.batchSize, self.channels, -1, self.xKern * self.yKern)
            return self._function().reshape(self.batchSize, self.channels, self.xOut, self.yOut)
    
        def backward(self, gradient: np.ndarray) -> np.ndarray:
            """
            The backward pass of pooling
            """
            gradient = gradient.reshape(self.batchSize, self.channels, -1, 1)
            return gradient * self._derivative()
    
    
    class MaxPooling2D(Pooling2D):
        def __init__(self, kernelSize: tuple = (2,2), stride: tuple = (2,2)) -> None:
            super().__init__(kernelSize, stride)
    
        def _function(self, keepdims: bool = False) -> np.ndarray:
            return np.max(self.output, axis=3, keepdims=keepdims)
    
        def _derivative(self) -> np.ndarray:
            return (self.output == self._function(keepdims=True)).astype(int)
    
    
    class MinPooling2D(Pooling2D):
        def __init__(self, kernelSize: tuple = (2,2), stride: tuple = (2,2)) -> None:
            super().__init__(kernelSize, stride)
    
        def _function(self, keepdims: bool = False) -> np.ndarray:
            return np.min(self.output, axis=3, keepdims=keepdims)
    
        def _derivative(self) -> np.ndarray:
            return (self.output == self._function(keepdims=True)).astype(int)
    
    
    class AvgPooling2D(Pooling2D):
        def __init__(self, kernelSize: tuple = (2,2), stride: tuple = (2,2)) -> None:
            super().__init__(kernelSize, stride)
    
        def _function(self, keepdims: bool = False) -> np.ndarray:
            return np.mean(self.output, axis=3, keepdims=keepdims)
    
        def _derivative(self) -> np.ndarray:
            return np.ones_like(self.output) / (self.xKern * self.yKern)