TensorImgPipeline - Proposed Architecture¶
Philosophy: Progressive Enhancement - Start Simple, Scale When Needed
This architecture supports researchers from initial script to production pipeline with minimal friction at each step.
Overview Architecture Diagram¶
┌────────────────────────────────────────────────────────────────────────────┐
│ CLI LAYER │
│ (cli.py) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ • Parse command line arguments with Typer │ │
│ │ • Display formatted errors with Rich │ │
│ │ • Delegate to PipelineRunner │ │
│ │ │ │
│ │ @app.command() │ │
│ │ def build_pipeline(pipeline_name: str): │ │
│ │ runner = PipelineRunner(pipeline_name) │ │
│ │ runner.run() │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ APPLICATION LAYER │
│ (runner.py) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ class PipelineRunner: │ │
│ │ • High-level orchestration │ │
│ │ • Coordinates Builder → Controller → Executor │ │
│ │ • Handles WandB sweep integration │ │
│ │ • Provides programmatic API entry point │ │
│ │ │ │
│ │ def build() -> (controller, error) │ │
│ │ def run() -> None │ │
│ │ def _run_once(controller) -> None │ │
│ │ def _run_with_sweep(controller, wandb) -> None │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────────┬─────────────────────────────────┘
│
┌───────────────────┴───────────────────┐
▼ ▼
┌────────────────────────────────────────┐ ┌────────────────────────────────┐
│ BUILDER LAYER │ │ CONTROLLER LAYER │
│ (builder.py) │ │ (controller.py) │
│ ┌─────────────────────────────────┐ │ │ ┌───────────────────────────┐ │
│ │ class PipelineBuilder: │ │ │ │ class PipelineController │ │
│ │ • Load & validate config │ │ │ │ • Manage permanence │ │
│ │ • Register classes │ │ │ │ lifecycle │ │
│ │ • Build permanences │ │ │ │ • Provide permanence │ │
│ │ • Build process specs │ │ │ │ access │ │
│ │ │ │ │ │ • Instantiate │ │
│ │ def build() -> │ │ │ │ processes │ │
│ │ (permanences, │───┼───┼▶│ • Coordinate cleanup │ │
│ │ process_specs, │ │ │ │ │ │
│ │ error) │ │ │ │ Methods: │ │
│ └─────────────────────────────────┘ │ │ │ • initialize_ │ │
└────────────────────────────────────────┘ │ │ permanences() │ │
│ │ • validate_ │ │
│ │ permanences() │ │
│ │ • checkpoint_ │ │
│ │ permanences() │ │
│ │ • get_permanence() │ │
│ │ • iterate_processes() │ │
│ │ • cleanup() │ │
│ └───────────────────────────┘ │
└────────────┬───────────────────┘
│
▼
┌───────────────────────────────┐
│ EXECUTOR LAYER │
│ (executor.py) │
│ ┌────────────────────────┐ │
│ │ class PipelineExecutor│ │
│ │ • Execute processes │ │
│ │ • Apply progress │ │
│ │ decoration │ │
│ │ • Handle nested │ │
│ │ progress bars │ │
│ │ • Integrate WandB │ │
│ │ • Error handling │ │
│ │ │ │
│ │ Methods: │ │
│ │ • run() │ │
│ │ • _run_processes() │ │
│ │ • _run_cleanup() │ │
│ │ • _handle_error() │ │
│ └────────────────────────┘ │
└────────────┬──────────────────┘
│
┌───────────────────────────────────────┴─────────────┐
▼ ▼
┌─────────────────────────────────────────┐ ┌────────────────────────────────┐
│ PERMANENCES │ │ PROCESSES │
│ (abstractions.py) │ │ (abstractions.py) │
│ ┌──────────────────────────────────┐ │ │ ┌──────────────────────────┐ │
│ │ class Permanence(ABC): │ │ │ │ class PipelineProcess: │ │
│ │ • cleanup() │ │ │ │ • execute() │ │
│ │ • initialize() [optional] │ │ │ │ • skip() │ │
│ │ • validate() [optional] │ │ │ │ • Access controller │ │
│ │ • checkpoint() [optional] │ │ │ └──────────────────────────┘ │
│ │ • get_state() [optional] │ │ │ │
│ └──────────────────────────────────┘ │ │ Implementations: │
│ │ │ • ResultProcess │
│ Implementations: │ │ • TrainingProcess │
│ ┌──────────────────────────────────┐ │ │ • ValidationProcess │
│ │ Device │ │ │ • TestProcess │
│ │ • Manages CUDA device │ │ │ • DataLoadProcess │
│ │ • Selects best GPU │ │ │ • ...custom processes │
│ ├──────────────────────────────────┤ │ └────────────────────────────────┘
│ │ ProgressManager │ │
│ │ • Rich progress bars │ │
│ │ • Nested progress tracking │ │
│ ├──────────────────────────────────┤ │
│ │ WandBManager │ │
│ │ • Experiment logging │ │
│ │ • Sweep management │ │
│ ├──────────────────────────────────┤ │
│ │ Network │ │
│ │ • Model instance │ │
│ │ • Model state │ │
│ ├──────────────────────────────────┤ │
│ │ Data │ │
│ │ • Datasets │ │
│ │ • Data loaders │ │
│ ├──────────────────────────────────┤ │
│ │ Hyperparameters │ │
│ │ • Configuration │ │
│ │ • Sweep config │ │
│ └──────────────────────────────────┘ │
└─────────────────────────────────────────┘
Permanence Lifecycle Diagram¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ PERMANENCE LIFECYCLE │
└─────────────────────────────────────────────────────────────────────────────┘
Phase 1: CONSTRUCTION
┌─────────────────────────────────────────────────────────────┐
│ Builder instantiates permanence with config params │
│ permanence = Device() │
│ permanence = ProgressManager(console=console) │
│ permanence = WandBManager(project="...", entity="...") │
└─────────────────────────────────────────────────────────────┘
│
▼
Phase 2: INITIALIZATION (Controller)
┌─────────────────────────────────────────────────────────────┐
│ controller.initialize_permanences() │
│ → permanence.initialize() │
│ • Validate dependencies exist │
│ • Allocate resources (memory, GPU) │
│ • Setup connections (WandB, databases) │
│ • Verify configuration validity │
│ │
│ Example: Device.initialize() │
│ - Checks CUDA availability │
│ - Validates VRAM thresholds │
└─────────────────────────────────────────────────────────────┘
│
▼
Phase 3: VALIDATION (Executor - Optional)
┌─────────────────────────────────────────────────────────────┐
│ controller.validate_permanences() │
│ → permanence.validate() │
│ • Health checks │
│ • State verification │
│ • Consistency checks │
│ │
│ Example: Device.validate() │
│ - Checks VRAM usage < 95% │
└─────────────────────────────────────────────────────────────┘
│
▼
Phase 4: EXECUTION (Processes access permanences)
┌─────────────────────────────────────────────────────────────┐
│ Process execution: │
│ device = controller.get_permanence("device") │
│ progress = controller.get_permanence("progress_manager") │
│ wandb = controller.get_permanence("wandb_logger") │
│ │
│ Processes use permanences throughout execution: │
│ • Access device for GPU operations │
│ • Update progress bars │
│ • Log metrics to WandB │
│ • Load data from datasets │
│ │
│ Nested progress bars created by processes: │
│ @progress_manager.progress_task("train") │
│ def train_epoch(task_id, total, progress): │
│ for batch in dataloader: │
│ progress.advance(task_id) │
└─────────────────────────────────────────────────────────────┘
│
▼
Phase 5: CHECKPOINTING (Executor - Optional)
┌─────────────────────────────────────────────────────────────┐
│ controller.checkpoint_permanences() │
│ → permanence.checkpoint() │
│ • Save intermediate state │
│ • Create backups │
│ • Log checkpoint metrics │
│ │
│ Example: Network.checkpoint() │
│ - Saves model weights │
│ - Saves optimizer state │
└─────────────────────────────────────────────────────────────┘
│
▼
Phase 6: CLEANUP (Always executed)
┌─────────────────────────────────────────────────────────────┐
│ controller.cleanup() │
│ → permanence.cleanup() │
│ • Release memory (RAM/VRAM) │
│ • Close file handles │
│ • Close network connections │
│ • Finalize logging │
│ • Save final state │
│ │
│ Example: Device.cleanup() │
│ - Clears CUDA cache │
│ Example: WandBManager.cleanup() │
│ - Finalizes WandB run │
│ Example: ProgressManager.cleanup() │
│ - Closes progress bars │
└─────────────────────────────────────────────────────────────┘
Data Flow Diagram¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ DATA FLOW │
└─────────────────────────────────────────────────────────────────────────────┘
1. Configuration Loading
┌────────────────┐
│ TOML Config │
│ Files │
└────────┬───────┘
│
▼
┌────────────────┐ ┌──────────────────┐
│ PipelineBuilder│─────▶│ Class Registry │
│ │ │ (Permanences + │
│ • Load TOML │ │ Processes) │
│ • Validate │ └──────────────────┘
│ • Parse │
└────────┬───────┘
│
▼
┌────────────────────────────────┐
│ Permanence Instances │
│ + Process Specifications │
└────────┬───────────────────────┘
│
▼
2. Pipeline Execution Flow
┌─────────────────────────────────────────────────────────┐
│ PipelineRunner │
│ runner.run() │
└───────────────────┬─────────────────────────────────────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Builder │ │Controller│ │Executor │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
│ build() │ │
├───────────▶│ │
│ │ │
│ initialize_ │
│ permanences() │
│ ├────────────▶│
│ │ │
│ │ validate_
│ │ permanences()
│ │ │
│ │ │
│ │ run()│
│ │ ├─────┐
│ │ │ │
│ │ │ Execute Processes
│ │ │ │
│ │ │ ▼
│ │ │ ┌──────────────┐
│ │ │ │ Process 1 │
│ │◀────────────┼──│ get_perm() │
│ │ │ └──────────────┘
│ │ │ │
│ │ │ ▼
│ │ │ ┌──────────────┐
│ │ │ │ Process 2 │
│ │◀────────────┼──│ get_perm() │
│ │ │ └──────────────┘
│ │ │ │
│ │ │ ▼
│ │ │ ┌──────────────┐
│ │ │ │ Process N │
│ │◀────────────┼──│ get_perm() │
│ │ │ └──────────────┘
│ │ │ │
│ │ │ checkpoint_
│ │ │ permanences()
│ │ ◀─────┘
│ │ │
│ │ cleanup()
│ ◀─────────────│
│ │ │
│ cleanup() │
│ │ │
▼ ▼ ▼
Process Access Pattern¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ PROCESS PERMANENCE ACCESS │
└─────────────────────────────────────────────────────────────────────────────┘
┌───────────────────────────────────────────────────────────────┐
│ class TrainingProcess(PipelineProcess): │
│ │
│ def __init__(self, controller, force, epochs): │
│ super().__init__(controller, force) │
│ │
│ # Access permanences through controller │
│ self.device = controller.get_permanence("device") │
│ self.network = controller.get_permanence("network") │
│ self.progress = controller.get_permanence( │
│ "progress_manager" │
│ ) │
│ self.wandb = controller.get_permanence( │
│ "wandb_logger" │
│ ) │
│ self.data = controller.get_permanence("data") │
│ │
│ def execute(self) -> Optional[Exception]: │
│ # Use permanences in execution │
│ model = self.network.model_instance │
│ model.to(self.device.device) │
│ │
│ # Create nested progress bar │
│ @self.progress.progress_task("train") │
│ def train_loop(task_id, total, progress): │
│ for epoch in range(total): │
│ loss = self._train_epoch(model) │
│ self.wandb.log_metrics({"loss": loss}) │
│ progress.advance(task_id) │
│ │
│ train_loop(self.epochs) │
│ return None │
└───────────────────────────────────────────────────────────────┘
│
│ Access Pattern
▼
┌───────────────────────────────────────────────────────────────┐
│ PipelineController │
│ │
│ _permanences = { │
│ "device": <Device instance>, │
│ "network": <Network instance>, │
│ "progress_manager": <ProgressManager instance>, │
│ "wandb_logger": <WandBManager instance>, │
│ "data": <Data instance> │
│ } │
│ │
│ def get_permanence(self, name: str) -> Any: │
│ if name not in self._permanences: │
│ raise PermanenceKeyError(...) │
│ return self._permanences[name] │
└───────────────────────────────────────────────────────────────┘
WandB Integration Flow¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ WandB INTEGRATION │
└─────────────────────────────────────────────────────────────────────────────┘
Standard Run (No Sweep)
┌─────────────────────────────────────────────────────────┐
│ PipelineRunner.run() │
│ │ │
│ ├─ Check for WandB sweep config │
│ │ wandb = controller.get_permanence("wandb") │
│ │ if not wandb.sweep_id: │
│ │ │
│ └─▶ runner._run_once(controller) │
│ │ │
│ └─▶ PipelineExecutor.run() │
│ │ │
│ ├─ wandb.init_wandb() # Init single run │
│ │ │
│ ├─ executor._run_processes() │
│ │ └─▶ Processes log to WandB │
│ │ │
│ └─ executor._run_cleanup() │
│ └─▶ wandb.cleanup() # Finalize run │
└─────────────────────────────────────────────────────────┘
Sweep Run (Multiple Runs)
┌────────────────────────────────────────────────────────┐
│ PipelineRunner.run() │
│ │ │
│ ├─ Check for WandB sweep config │
│ │ wandb = controller.get_permanence("wandb") │
│ │ hyperparams = controller.get_permanence( │
│ │ "hyperparams" │
│ │ ) │
│ │ if wandb.sweep_id: │
│ │ │
│ └─▶ runner._run_with_sweep(controller, wandb) │
│ │ │
│ ├─ wandb.create_sweep( │
│ │ hyperparams.sweep_configuration │
│ │ ) │
│ │ │
│ └─ wandb.create_sweep_agent( │
│ function=lambda: runner._run_once() │
│ ) │
│ │ │
│ │ ┌─────── Agent spawns N runs ────────┐ │
│ │ │ │ │
│ └─▶│ Run 1: │ │
│ │ wandb.init_wandb() │ │
│ │ executor._run_processes() │ │
│ │ wandb.cleanup() │ │
│ │ │ │
│ │ Run 2: │ │
│ │ wandb.init_wandb() │ │
│ │ executor._run_processes() │ │
│ │ wandb.cleanup() │ │
│ │ │ │
│ │ ... │ │
│ │ │ │
│ │ Run N: │ │
│ │ wandb.init_wandb() │ │
│ │ executor._run_processes() │ │
│ │ wandb.cleanup() │ │
│ └────────────────────────────────────┘ │
└────────────────────────────────────────────────────────┘
Progress Bar Nesting Example¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ NESTED PROGRESS BARS │
└─────────────────────────────────────────────────────────────────────────────┘
Visual Output:
┌─────────────────────────────────────────────────────────────────────┐
│ Overall ████████░░░░░░░░░░░░░░░░░░░░ (2/5) • 00:15:30 │ ← Executor
│ Cleanup ░░░░░░░░░░░░░░░░░░░░░░░░░░░░ (0/6) • 00:00:00 │ ← Executor
│ Epoch ████████████████░░░░░░░░░░░░ (2/3) • Status: Train │ ← Process
│ Train-Val ████████████████████████████ (100/100) • 00:02:15 │ ← Process
│ Result ░░░░░░░░░░░░░░░░░░░░░░░░░░░░ (0/15) • 00:00:00 │ ← Process
└─────────────────────────────────────────────────────────────────────┘
Code Flow:
┌──────────────────────────────────────────────────────────┐
│ PipelineExecutor │
│ │
│ @progress_decorator("overall") # Top-level bar │
│ def _execute(task_id, total, progress): │
│ for idx, process in enumerate(processes): │
│ process.execute() # Process creates nested │
│ progress.advance(task_id) │
│ │
│ @progress_decorator("cleanup") # Top-level bar │
│ def _cleanup(task_id, total, progress): │
│ controller.cleanup() │
│ progress.advance(task_id) │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ TrainingProcess │
│ │
│ def execute(self): │
│ @self.progress.progress_task("epoch") │
│ def train_epochs(task_id, total, progress): │
│ for epoch in range(total): │
│ @self.progress.progress_task("train-val") │
│ def train_val(tid, tot, prog): │
│ for batch in dataloader: │
│ # Training logic │
│ prog.advance(tid) │
│ train_val(num_batches) │
│ progress.advance(task_id) │
│ │
│ train_epochs(self.epochs) │
└──────────────────────────────────────────────────────────┘
Testing Strategy¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ TESTING LAYERS │
└─────────────────────────────────────────────────────────────────────────────┘
Unit Tests:
┌────────────────────────────────────────────────────┐
│ test_permanences.py │
│ • Test each permanence in isolation │
│ • Mock dependencies │
│ • Test lifecycle methods │
│ │
│ test_builder.py │
│ • Test config loading │
│ • Test class registration │
│ • Test permanence/process building │
│ │
│ test_controller.py │
│ • Test permanence access │
│ • Test process iteration │
│ • Test lifecycle coordination │
│ │
│ test_executor.py │
│ • Test process execution │
│ • Test error handling │
│ • Mock progress manager │
└────────────────────────────────────────────────────┘
Integration Tests:
┌────────────────────────────────────────────────────┐
│ test_pipeline_integration.py │
│ • Test Builder → Controller → Executor flow │
│ • Use mock permanences and processes │
│ • Verify data flow │
│ │
│ test_runner.py │
│ • Test PipelineRunner end-to-end │
│ • Mock WandB integration │
│ • Test programmatic API │
└────────────────────────────────────────────────────┘
Programmatic Testing Example:
┌────────────────────────────────────────────────────┐
│ # No CLI needed! │
│ │
│ def test_pipeline_execution(): │
│ # Create mock permanences │
│ permanences = { │
│ "device": MockDevice(), │
│ "network": MockNetwork(), │
│ "progress_manager": None, # Optional │
│ } │
│ │
│ # Create mock process specs │
│ process_specs = [ │
│ ProcessWithParams( │
│ MockProcess, │
│ {"force": False} │
│ ) │
│ ] │
│ │
│ # Create controller │
│ controller = PipelineController( │
│ permanences, │
│ process_specs │
│ ) │
│ │
│ # Execute │
│ executor = PipelineExecutor(controller) │
│ executor.run() │
│ │
│ # Verify results │
│ assert MockProcess.executed │
└────────────────────────────────────────────────────┘
Key Benefits Summary¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ KEY BENEFITS │
└─────────────────────────────────────────────────────────────────────────────┘
✓ Clear Separation of Concerns
├─ CLI: User interaction only
├─ Runner: Orchestration
├─ Builder: Component construction
├─ Controller: Lifecycle management
└─ Executor: Execution & visualization
✓ Testability
├─ Each layer independently testable
├─ Mock permanences and processes
└─ No CLI required for testing
✓ Reusability
├─ Programmatic API via PipelineRunner
├─ Can embed in other applications
└─ Flexible executor implementations
✓ Extensibility
├─ Easy to add new permanences
├─ Easy to add new processes
├─ Optional lifecycle hooks
└─ Plugin architecture ready
✓ Maintainability
├─ Single responsibility per class
├─ Clear dependencies
├─ Well-defined interfaces
└─ Structured lifecycle
✓ Flexibility
├─ Swap executor implementations
├─ Optional visualization
├─ WandB sweep support
└─ Nested progress bars
Progressive Enhancement: Helper Layer¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ HELPER LAYER (NEW!) │
│ (helpers.py, decorators.py) │
│ │
│ Bridges the gap between scripts and pipelines │
│ Works standalone OR with pipeline context │
└─────────────────────────────────────────────────────────────────────────────┘
SCRIPT MODE (Standalone) │ PIPELINE MODE
│
┌─────────────────────────────────┐ │ ┌──────────────────────────────┐
│ # researcher_script.py │ │ │ # Within PipelineExecutor │
│ │ │ │ │
│ from helpers import ( │ │ │ # Set context │
│ progress_bar, │ │ │ set_pipeline_context({ │
│ logger, │ │ │ "progress_manager": pm, │
│ device_manager │ │ │ "wandb_logger": wb, │
│ ) │ │ │ "device": dev │
│ │ │ │ }) │
│ # Works without pipeline! │ │ │ │
│ logger.init(project="exp") │ │ │ # Helpers auto-detect │
│ │ │ │ # pipeline and use it │
│ for epoch in progress_bar(...):│ │ │ │
│ device = device_manager │ │ │ # Same code works! │
│ .get_device() │ │ │ for epoch in progress_bar():│
│ logger.log({"loss": loss}) │ │ │ device = device_manager │
│ │ │ │ .get_device() │
└─────────────────────────────────┘ │ │ logger.log({...}) │
│ └──────────────────────────────┘
Uses Rich & basic WandB │ Uses Pipeline's permanences
DECORATOR PATTERN: Function → Process
┌────────────────────────────────────────────────────────────────────────────┐
│ @pipeline_process # Marks function as pipeline-ready │
│ def train(epochs: int = 10): # Parameters become config options │
│ '''Can run standalone OR in pipeline!''' │
│ │
│ device = device_manager.get_device() # Works in both modes │
│ model = MyModel().to(device) │
│ │
│ for epoch in progress_bar(range(epochs), desc="Training"): │
│ loss = train_step(model) │
│ logger.log({"loss": loss}) │
│ │
│ # Run as script │
│ if __name__ == "__main__": │
│ train(epochs=5) # ← Normal function call │
│ │
│ # Or in config.toml for pipeline │
│ # [processes.training] │
│ # type = "train" │
│ # params = { epochs = 10 } │
└────────────────────────────────────────────────────────────────────────────┘
HELPER IMPLEMENTATIONS
┌────────────────────────────────────────────────────────────────────────────┐
│ progress_bar(iterable, desc="Processing") │
│ ├─ No pipeline context: Uses rich.track (tqdm-like) │
│ └─ With pipeline context: Uses pipeline's ProgressManager │
│ │
│ logger.init(project, entity) / logger.log(metrics) │
│ ├─ No pipeline context: Initializes WandB manually │
│ └─ With pipeline context: Uses pipeline's WandBManager │
│ │
│ device_manager.get_device() │
│ ├─ No pipeline context: Selects first available GPU │
│ └─ With pipeline context: Uses pipeline's Device permanence │
└────────────────────────────────────────────────────────────────────────────┘
Research-to-Production Journey¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ FROM SCRIPT TO PIPELINE │
└─────────────────────────────────────────────────────────────────────────────┘
Level 0: Raw Script Changes: 0
┌────────────────────────────────┐
│ for epoch in range(10): │ • No framework
│ loss = train() │ • Manual everything
│ print(loss) │ • Hard to track
└────────────────────────────────┘
│ Add 1 import, wrap loops
▼
Level 1: Progress Bars Changes: +2 lines
┌──────────────────────────────────┐
│ from helpers import │ ✓ Visual progress
│ progress_bar │ ✓ Time estimates
│ │ • Still a script
│ for epoch in progress_bar(...): │
│ loss = train() │
└──────────────────────────────────┘
│ Add logger.init() and logger.log()
▼
Level 2: Experiment Tracking Changes: +3 lines
┌──────────────────────────────────┐
│ from helpers import │ ✓ Progress bars
│ progress_bar, logger │ ✓ WandB logging
│ │ ✓ Experiment tracking
│ logger.init(project="exp") │ • Still a script
│ │
│ for epoch in progress_bar(...): │
│ loss = train() │
│ logger.log({"loss": loss}) │
└──────────────────────────────────┘
│ Add device_manager
▼
Level 3: Device Management Changes: +2 lines
┌────────────────────────────────┐
│ from helpers import │ ✓ Progress + logging
│ progress_bar, logger, │ ✓ Auto device select
│ device_manager │ ✓ Multi-GPU support
│ │ • Still a script
│ device = device_manager │
│ .get_device() │
│ model.to(device) │
└────────────────────────────────┘
│ Extract functions, add decorator
▼
Level 4: Reusable Functions Changes: Extract to functions
┌─────────────────────────────────┐
│ @pipeline_process │ ✓ All previous features
│ def train(epochs: int = 10): │ ✓ Reusable code
│ device = device_manager │ ✓ Type hints
│ .get_device() │ ✓ Ready for pipeline
│ for epoch in progress_bar( │ • Still runs as script
│ range(epochs) │
│ ): │
│ loss = train_step() │
│ logger.log(...) │
│ │
│ if __name__ == "__main__": │
│ train(epochs=5) │
└─────────────────────────────────┘
│ Create config, register in pipeline
▼
Level 5: Full Pipeline Changes: Config file + class wrapper
┌────────────────────────────────┐
│ # config.toml │ ✓ All previous features
│ [processes.training] │ ✓ Config-driven
│ type = "TrainingProcess" │ ✓ Permanence lifecycle
│ params = { epochs = 10 } │ ✓ Production ready
│ │ ✓ Team collaboration
│ # Run via CLI │ ✓ CI/CD integration
│ $ tipi run exp │
└────────────────────────────────┘
Key Insight: Copy-Paste Compatibility¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ CODE REUSE BETWEEN LEVELS │
└─────────────────────────────────────────────────────────────────────────────┘
Training Logic (Same across levels!)
┌────────────────────────────────────────────────────────────┐
│ def train_step(model, batch, device): │
│ inputs, targets = batch │
│ inputs = inputs.to(device) │
│ targets = targets.to(device) │
│ │
│ optimizer.zero_grad() │
│ outputs = model(inputs) │
│ loss = criterion(outputs, targets) │
│ loss.backward() │
│ optimizer.step() │
│ │
│ return loss.item() │
└────────────────────────────────────────────────────────────┘
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
Level 2-3 Level 4 Level 5
(Script) (Function) (Process)
Same code! Same code! Same code!
Just called Just decorated Just wrapped
directly with @pipeline in PipelineProcess
class
No rewriting needed - just progressive wrapping!
Documentation References¶
- Full Guide: See
docs/progressive_enhancement.md - Helper API: See
tipi/helpers.py - Decorators: See
tipi/decorators.py