Skip to content

controller.py

This module defines an PipelineController responsible for managing a pipeline of processes and handling potential errors that occur during their execution.

Classes:

Name Description
PipelineController

Manages a pipeline of processes and handles errors that occur during.

Copyright (C) 2025 Matti Kaupenjohann

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program. If not, see https://www.gnu.org/licenses/.

PipelineController

Coordinates pipeline permanences and processes.

Responsibilities: - Manage permanence lifecycle - Provide permanence access to processes - Instantiate processes with their parameters - Yield processes for execution

Source code in tipi/core/controller.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class PipelineController:
    """Coordinates pipeline permanences and processes.

    Responsibilities:
    - Manage permanence lifecycle
    - Provide permanence access to processes
    - Instantiate processes with their parameters
    - Yield processes for execution
    """

    def __init__(self, permanences: dict[str, Permanence], process_specs: list[ProcessWithParams]) -> None:
        self._permanences = permanences
        self._process_specs = process_specs

    def get_permanence(self, name: str, default: Any = _MISSING) -> Any:
        """Get a permanence by name.

        Args:
            name: Name of the permanence to retrieve
            default: Value to return if permanence not found. If not provided,
                    raises PermanenceKeyError instead. Can be None.

        Returns:
            The permanence instance or default value

        Raises:
            PermanenceKeyError: If permanence not found and no default provided
        """
        if default is not _MISSING:
            return self._permanences.get(name, default)
        if name not in self._permanences:
            raise PermanenceKeyError(ErrorCode.PERMA_KEY, key=name)
        return self._permanences[name]

    def iterate_processes(self) -> Iterator[tuple[int, PipelineProcess]]:
        """Yield (index, process_instance) for execution."""
        for idx, spec in enumerate(self._process_specs):
            process_instance = spec.get_instance(self)
            yield idx, process_instance

    def get_process_count(self) -> int:
        """Get total number of processes."""
        return len(self._process_specs)

    def iterate_permanences(self) -> Iterator[Permanence]:
        """Yield permanence instances for cleanup or inspection."""
        yield from self._permanences.values()

    def get_permanence_count(self) -> int:
        """Get total number of permanences."""
        return len(self._permanences)

    def cleanup(self) -> None:
        """Cleanup all permanences."""
        for permanence in self._permanences.values():
            permanence.cleanup()

cleanup()

Cleanup all permanences.

Source code in tipi/core/controller.py
93
94
95
96
def cleanup(self) -> None:
    """Cleanup all permanences."""
    for permanence in self._permanences.values():
        permanence.cleanup()

get_permanence(name, default=_MISSING)

Get a permanence by name.

Parameters:

Name Type Description Default
name str

Name of the permanence to retrieve

required
default Any

Value to return if permanence not found. If not provided, raises PermanenceKeyError instead. Can be None.

_MISSING

Returns:

Type Description
Any

The permanence instance or default value

Raises:

Type Description
PermanenceKeyError

If permanence not found and no default provided

Source code in tipi/core/controller.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def get_permanence(self, name: str, default: Any = _MISSING) -> Any:
    """Get a permanence by name.

    Args:
        name: Name of the permanence to retrieve
        default: Value to return if permanence not found. If not provided,
                raises PermanenceKeyError instead. Can be None.

    Returns:
        The permanence instance or default value

    Raises:
        PermanenceKeyError: If permanence not found and no default provided
    """
    if default is not _MISSING:
        return self._permanences.get(name, default)
    if name not in self._permanences:
        raise PermanenceKeyError(ErrorCode.PERMA_KEY, key=name)
    return self._permanences[name]

get_permanence_count()

Get total number of permanences.

Source code in tipi/core/controller.py
89
90
91
def get_permanence_count(self) -> int:
    """Get total number of permanences."""
    return len(self._permanences)

get_process_count()

Get total number of processes.

Source code in tipi/core/controller.py
81
82
83
def get_process_count(self) -> int:
    """Get total number of processes."""
    return len(self._process_specs)

iterate_permanences()

Yield permanence instances for cleanup or inspection.

Source code in tipi/core/controller.py
85
86
87
def iterate_permanences(self) -> Iterator[Permanence]:
    """Yield permanence instances for cleanup or inspection."""
    yield from self._permanences.values()

iterate_processes()

Yield (index, process_instance) for execution.

Source code in tipi/core/controller.py
75
76
77
78
79
def iterate_processes(self) -> Iterator[tuple[int, PipelineProcess]]:
    """Yield (index, process_instance) for execution."""
    for idx, spec in enumerate(self._process_specs):
        process_instance = spec.get_instance(self)
        yield idx, process_instance