Skip to content

abstractions.py

Abstract base classes for TensorImgPipeline.

This package provides the core abstractions used throughout the pipeline framework.

Copyright (C) 2025 Matti Kaupenjohann

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program. If not, see https://www.gnu.org/licenses/.

AbstractConfig dataclass

Bases: ABC

Abstract base class for configuration objects.

Provides common functionality for: - Path string to Path object conversion - Parameter validation - Configuration validation

Subclasses must implement the validate() method to define their specific validation logic.

Source code in tipi/abstractions/config.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@dataclass
class AbstractConfig(ABC):
    """Abstract base class for configuration objects.

    Provides common functionality for:
    - Path string to Path object conversion
    - Parameter validation
    - Configuration validation

    Subclasses must implement the validate() method to define
    their specific validation logic.
    """

    def __post_init__(self) -> None:
        """Post-initialization hook.

        Applies path conversions and runs validation.
        """
        self._apply_path()
        self.validate()

    @abstractmethod
    def validate(self) -> None:
        """Validate the configuration.

        Subclasses should implement this method to check that all
        configuration values are valid and meet requirements.

        Raises:
            InvalidConfigError: If configuration is invalid.
        """
        ...

    def validate_params(self, params: dict[str, Any], cls: type) -> None:
        """Validate that parameters match a class constructor signature.

        Args:
            params: Dictionary of parameter names to values.
            cls: The class whose constructor signature to validate against.

        Raises:
            InvalidConfigError: If params contain unexpected keys.
        """
        signature = inspect.signature(cls)  # Get constructor signature

        # Get expected parameters (excluding 'self')
        expected_params = list(signature.parameters.keys())
        if "self" in expected_params:
            expected_params.remove("self")

        # Check if all required parameters are provided
        if not set(params.keys()).issubset(expected_params):
            raise InvalidConfigError(context="params-not-valid", value=str(cls))

    def _apply_path(self) -> None:
        """Convert string fields to Path objects where appropriate.

        Examines all dataclass fields and converts string values to Path
        objects if the field's type hint includes Path in a Union.
        """
        hints = get_type_hints(self.__class__)
        for _field in fields(self):
            field_name = _field.name
            field_type = hints[field_name]
            value = getattr(self, field_name)

            # Check if the field type is a union
            origin = get_origin(field_type)
            if origin is Union:
                args = get_args(field_type)
                # If the field accepts Path and also a string type
                if Path in args and isinstance(value, str):
                    setattr(self, field_name, Path(value))

__post_init__()

Post-initialization hook.

Applies path conversions and runs validation.

Source code in tipi/abstractions/config.py
43
44
45
46
47
48
49
def __post_init__(self) -> None:
    """Post-initialization hook.

    Applies path conversions and runs validation.
    """
    self._apply_path()
    self.validate()

validate() abstractmethod

Validate the configuration.

Subclasses should implement this method to check that all configuration values are valid and meet requirements.

Raises:

Type Description
InvalidConfigError

If configuration is invalid.

Source code in tipi/abstractions/config.py
51
52
53
54
55
56
57
58
59
60
61
@abstractmethod
def validate(self) -> None:
    """Validate the configuration.

    Subclasses should implement this method to check that all
    configuration values are valid and meet requirements.

    Raises:
        InvalidConfigError: If configuration is invalid.
    """
    ...

validate_params(params, cls)

Validate that parameters match a class constructor signature.

Parameters:

Name Type Description Default
params dict[str, Any]

Dictionary of parameter names to values.

required
cls type

The class whose constructor signature to validate against.

required

Raises:

Type Description
InvalidConfigError

If params contain unexpected keys.

Source code in tipi/abstractions/config.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def validate_params(self, params: dict[str, Any], cls: type) -> None:
    """Validate that parameters match a class constructor signature.

    Args:
        params: Dictionary of parameter names to values.
        cls: The class whose constructor signature to validate against.

    Raises:
        InvalidConfigError: If params contain unexpected keys.
    """
    signature = inspect.signature(cls)  # Get constructor signature

    # Get expected parameters (excluding 'self')
    expected_params = list(signature.parameters.keys())
    if "self" in expected_params:
        expected_params.remove("self")

    # Check if all required parameters are provided
    if not set(params.keys()).issubset(expected_params):
        raise InvalidConfigError(context="params-not-valid", value=str(cls))

AbstractController

Bases: ABC

Abstract base class for pipeline controllers.

Controllers manage the execution flow of pipeline processes, including progress reporting and process lifecycle management.

Source code in tipi/abstractions/controller.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class AbstractController(ABC):
    """Abstract base class for pipeline controllers.

    Controllers manage the execution flow of pipeline processes,
    including progress reporting and process lifecycle management.
    """

    @abstractmethod
    def add_process(self, process: "PipelineProcess") -> None:
        """Add a process to the controller's execution queue.

        Args:
            process: The pipeline process to add.
        """
        ...

    @abstractmethod
    def _get_progress_decorator(self) -> Callable:
        """Get a decorator for progress reporting.

        Returns:
            A decorator function that wraps process execution with
            progress reporting capabilities.
        """
        ...

add_process(process) abstractmethod

Add a process to the controller's execution queue.

Parameters:

Name Type Description Default
process PipelineProcess

The pipeline process to add.

required
Source code in tipi/abstractions/controller.py
37
38
39
40
41
42
43
44
@abstractmethod
def add_process(self, process: "PipelineProcess") -> None:
    """Add a process to the controller's execution queue.

    Args:
        process: The pipeline process to add.
    """
    ...

Permanence

Bases: ABC

Base class for objects that persist through the entire pipeline lifecycle.

Permanences are stateful resources that: - Store structured data needed throughout pipeline execution - Are accessed by processes via controller.get_permanence(name) - Have managed lifecycles with hooks - Are extensible through abstraction

Example
class MyDataPermanence(Permanence):
    def __init__(self, path: Path):
        self.data = self._load_data(path)

    def initialize(self) -> None:
        # Setup phase - called before any process runs
        self._validate_data()

    def checkpoint(self) -> None:
        # Save intermediate state
        self._save_checkpoint()

    def cleanup(self) -> None:
        # Release resources
        del self.data
Source code in tipi/abstractions/permanence.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class Permanence(ABC):
    """Base class for objects that persist through the entire pipeline lifecycle.

    Permanences are stateful resources that:
    - Store structured data needed throughout pipeline execution
    - Are accessed by processes via controller.get_permanence(name)
    - Have managed lifecycles with hooks
    - Are extensible through abstraction

    Example:
        ```python
        class MyDataPermanence(Permanence):
            def __init__(self, path: Path):
                self.data = self._load_data(path)

            def initialize(self) -> None:
                # Setup phase - called before any process runs
                self._validate_data()

            def checkpoint(self) -> None:
                # Save intermediate state
                self._save_checkpoint()

            def cleanup(self) -> None:
                # Release resources
                del self.data
        ```
    """

    @abstractmethod
    def cleanup(self) -> None:
        """Cleans up data from RAM or VRAM.

        Called after all processes complete or on error.
        Should release any held resources (memory, file handles, connections).

        Raises:
            Exception: If cleanup fails
        """
        ...

    def initialize(self) -> None:
        """Initialize the permanence before pipeline execution.

        Called once after all permanences are constructed but before
        any process runs. Use for validation, resource allocation, or
        setup that depends on other permanences.

        Raises:
            Exception: If initialization fails
        """
        return

    def checkpoint(self) -> None:
        """Save intermediate state during pipeline execution.

        Called at configurable checkpoints during execution.
        Use for saving progress, creating backups, or logging state.

        Raises:
            Exception: If checkpointing fails
        """
        return

    def validate(self) -> None:
        """Validate the permanence state.

        Called to verify permanence is in valid state.
        Use for health checks, data validation, or consistency checks.

        Raises:
            Exception: If validation fails
        """
        return

    def get_state(self) -> dict[str, Any]:
        """Get serializable state for inspection or debugging.

        Returns a dictionary representation of the permanence state.
        Useful for logging, debugging, or state inspection.

        Returns:
            dict[str, Any]: Dictionary containing permanence state.
        """
        return {
            "type": self.__class__.__name__,
            "initialized": True,
        }

checkpoint()

Save intermediate state during pipeline execution.

Called at configurable checkpoints during execution. Use for saving progress, creating backups, or logging state.

Raises:

Type Description
Exception

If checkpointing fails

Source code in tipi/abstractions/permanence.py
58
59
60
61
62
63
64
65
66
67
def checkpoint(self) -> None:
    """Save intermediate state during pipeline execution.

    Called at configurable checkpoints during execution.
    Use for saving progress, creating backups, or logging state.

    Raises:
        Exception: If checkpointing fails
    """
    return

cleanup() abstractmethod

Cleans up data from RAM or VRAM.

Called after all processes complete or on error. Should release any held resources (memory, file handles, connections).

Raises:

Type Description
Exception

If cleanup fails

Source code in tipi/abstractions/permanence.py
34
35
36
37
38
39
40
41
42
43
44
@abstractmethod
def cleanup(self) -> None:
    """Cleans up data from RAM or VRAM.

    Called after all processes complete or on error.
    Should release any held resources (memory, file handles, connections).

    Raises:
        Exception: If cleanup fails
    """
    ...

get_state()

Get serializable state for inspection or debugging.

Returns a dictionary representation of the permanence state. Useful for logging, debugging, or state inspection.

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Dictionary containing permanence state.

Source code in tipi/abstractions/permanence.py
80
81
82
83
84
85
86
87
88
89
90
91
92
def get_state(self) -> dict[str, Any]:
    """Get serializable state for inspection or debugging.

    Returns a dictionary representation of the permanence state.
    Useful for logging, debugging, or state inspection.

    Returns:
        dict[str, Any]: Dictionary containing permanence state.
    """
    return {
        "type": self.__class__.__name__,
        "initialized": True,
    }

initialize()

Initialize the permanence before pipeline execution.

Called once after all permanences are constructed but before any process runs. Use for validation, resource allocation, or setup that depends on other permanences.

Raises:

Type Description
Exception

If initialization fails

Source code in tipi/abstractions/permanence.py
46
47
48
49
50
51
52
53
54
55
56
def initialize(self) -> None:
    """Initialize the permanence before pipeline execution.

    Called once after all permanences are constructed but before
    any process runs. Use for validation, resource allocation, or
    setup that depends on other permanences.

    Raises:
        Exception: If initialization fails
    """
    return

validate()

Validate the permanence state.

Called to verify permanence is in valid state. Use for health checks, data validation, or consistency checks.

Raises:

Type Description
Exception

If validation fails

Source code in tipi/abstractions/permanence.py
69
70
71
72
73
74
75
76
77
78
def validate(self) -> None:
    """Validate the permanence state.

    Called to verify permanence is in valid state.
    Use for health checks, data validation, or consistency checks.

    Raises:
        Exception: If validation fails
    """
    return

PipelineProcess

Bases: ABC

Abstract base class for pipeline processes.

A process represents a unit of work within the pipeline that: - Can access permanences via a controller/manager - Can be skipped based on conditions - Executes its main logic via execute() - Can be forced to run via the force parameter

Example
class MyProcess(PipelineProcess):
    def __init__(self, controller, force: bool):
        super().__init__(controller, force)
        self.data = controller.get_permanence("data")

    def skip(self) -> bool:
        return not self.force and self.data.is_cached()

    def execute(self) -> None:
        # Process logic here
        self.data.process()
Source code in tipi/abstractions/process.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class PipelineProcess(ABC):
    """Abstract base class for pipeline processes.

    A process represents a unit of work within the pipeline that:
    - Can access permanences via a controller/manager
    - Can be skipped based on conditions
    - Executes its main logic via execute()
    - Can be forced to run via the force parameter

    Example:
        ```python
        class MyProcess(PipelineProcess):
            def __init__(self, controller, force: bool):
                super().__init__(controller, force)
                self.data = controller.get_permanence("data")

            def skip(self) -> bool:
                return not self.force and self.data.is_cached()

            def execute(self) -> None:
                # Process logic here
                self.data.process()
        ```
    """

    def __init__(self, controller: Any, force: bool) -> None:
        """Initialize the process.

        When overriding this method, make sure to call super().__init__(controller, force).

        Args:
            controller: The controller/manager providing access to permanences.
                       Should have a get_permanence(name: str) method.
            force: If True, process should run even if outputs exist.
        """
        self.controller = controller
        self.force = force

    @abstractmethod
    def execute(self) -> None:
        """Execute the process logic.

        This method should contain the main work of the process.
        It should handle any errors internally or let them propagate.

        Raises:
            Exception: Any exceptions during execution.
        """
        ...

    @abstractmethod
    def skip(self) -> bool:
        """Determine if the process should be skipped.

        Returns:
            True if the process should be skipped, False otherwise.
            Common reasons to skip:
            - Outputs already exist and force=False
            - Required inputs are missing
            - Conditional execution based on config
        """
        ...

__init__(controller, force)

Initialize the process.

When overriding this method, make sure to call super().init(controller, force).

Parameters:

Name Type Description Default
controller Any

The controller/manager providing access to permanences. Should have a get_permanence(name: str) method.

required
force bool

If True, process should run even if outputs exist.

required
Source code in tipi/abstractions/process.py
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(self, controller: Any, force: bool) -> None:
    """Initialize the process.

    When overriding this method, make sure to call super().__init__(controller, force).

    Args:
        controller: The controller/manager providing access to permanences.
                   Should have a get_permanence(name: str) method.
        force: If True, process should run even if outputs exist.
    """
    self.controller = controller
    self.force = force

execute() abstractmethod

Execute the process logic.

This method should contain the main work of the process. It should handle any errors internally or let them propagate.

Raises:

Type Description
Exception

Any exceptions during execution.

Source code in tipi/abstractions/process.py
69
70
71
72
73
74
75
76
77
78
79
@abstractmethod
def execute(self) -> None:
    """Execute the process logic.

    This method should contain the main work of the process.
    It should handle any errors internally or let them propagate.

    Raises:
        Exception: Any exceptions during execution.
    """
    ...

skip() abstractmethod

Determine if the process should be skipped.

Returns:

Type Description
bool

True if the process should be skipped, False otherwise.

bool

Common reasons to skip:

bool
  • Outputs already exist and force=False
bool
  • Required inputs are missing
bool
  • Conditional execution based on config
Source code in tipi/abstractions/process.py
81
82
83
84
85
86
87
88
89
90
91
92
@abstractmethod
def skip(self) -> bool:
    """Determine if the process should be skipped.

    Returns:
        True if the process should be skipped, False otherwise.
        Common reasons to skip:
        - Outputs already exist and force=False
        - Required inputs are missing
        - Conditional execution based on config
    """
    ...

ProcessConfig dataclass

Bases: AbstractConfig

Base configuration for pipeline processes.

Attributes:

Name Type Description
force bool

If True, forces execution even if outputs exist.

Source code in tipi/abstractions/config.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@dataclass
class ProcessConfig(AbstractConfig):
    """Base configuration for pipeline processes.

    Attributes:
        force: If True, forces execution even if outputs exist.
    """

    force: bool = False

    def validate(self) -> None:
        """Validate the process configuration.

        Raises:
            InvalidConfigError: If force is not a boolean.
        """
        if not isinstance(self.force, bool):
            raise InvalidConfigError(context="invalid-force-type", value=f"{self.force=}")

validate()

Validate the process configuration.

Raises:

Type Description
InvalidConfigError

If force is not a boolean.

Source code in tipi/abstractions/config.py
115
116
117
118
119
120
121
122
def validate(self) -> None:
    """Validate the process configuration.

    Raises:
        InvalidConfigError: If force is not a boolean.
    """
    if not isinstance(self.force, bool):
        raise InvalidConfigError(context="invalid-force-type", value=f"{self.force=}")