# Auto-Scaling ML Training Pipeline

## What We're Building

A production-grade auto-scaling pipeline that dynamically provisions and releases Clore GPUs based on your job queue depth. Submit training jobs, and the pipeline handles everything: finding servers, renting them, distributing work, managing checkpoints, and releasing resources when idle — all while respecting your hourly budget.

**Key Features:**

* File-based job queue (no external services required; a Redis backend can be swapped in)
* Automatic GPU scaling from 1 to N workers
* Cost-aware scheduling with hourly budget caps
* Checkpoint saving and resumption across workers
* Real-time terminal dashboard
* Graceful shutdown and cleanup

## Prerequisites

* Clore.ai account with API key ([get one here](https://clore.ai))
* Python 3.10+

```bash
pip install requests rich paramiko
# Optional: pip install redis  (if you swap in a Redis queue backend)
```

## Architecture Overview

```
┌─────────────────────────────────────────────────────────────────┐
│                      Auto-Scaling Pipeline                      │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │   Job Queue     │  │    Scheduler    │  │  Worker Pool    │  │
│  │  (file/Redis)   │──│  (scale logic)  │──│  (Clore GPUs)   │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
│          │                    │                    │            │
│          ▼                    ▼                    ▼            │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                       Clore.ai API                        │  │
│  │   /v1/marketplace   /v1/create_order   /v1/my_orders      │  │
│  │   /v1/cancel_order                                        │  │
│  └───────────────────────────────────────────────────────────┘  │
│          │                                         │            │
│          ▼                                         ▼            │
│  ┌─────────────────┐                  ┌─────────────────────┐   │
│  │   Checkpoints   │                  │  Terminal Dashboard │   │
│  │   (local/NFS)   │                  │  (rich live view)   │   │
│  └─────────────────┘                  └─────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
```

## Step 1: Job Queue

The queue is a simple JSON-lines file. Each line is one job. Works without any external services.
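For example, a single pending job serializes to one compact line like this (field values are illustrative):

```json
{"job_id": "3f2a9c1e", "name": "train-lr0.001", "image": "pytorch/pytorch:2.1.0-cuda12.1-cudnn8-devel", "command": "python train.py", "gpu_type": "RTX 4090", "status": "pending", "priority": 5}
```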

```python
# job_queue.py
"""File-based (and optional Redis) job queue for ML training jobs."""

import json
import os
import threading
from enum import Enum
from typing import Optional, List, Dict
from dataclasses import dataclass, field, asdict
from datetime import datetime


class JobStatus(str, Enum):
    PENDING   = "pending"
    RUNNING   = "running"
    DONE      = "done"
    FAILED    = "failed"
    CANCELLED = "cancelled"


@dataclass
class TrainingJob:
    """A single training job in the queue."""
    job_id:         str
    name:           str
    image:          str
    command:        str
    gpu_type:       str
    gpu_count:      int       = 1
    max_price:      float     = 0.50          # USD/hr per GPU
    priority:       int       = 5             # 1 (high) – 10 (low)
    max_runtime_hr: float     = 4.0
    env:            Dict[str, str] = field(default_factory=dict)
    checkpoint_dir: str       = ""
    status:         JobStatus = JobStatus.PENDING
    worker_id:      str       = ""
    order_id:       int       = 0
    submitted_at:   str       = field(default_factory=lambda: datetime.utcnow().isoformat())
    started_at:     str       = ""
    finished_at:    str       = ""
    cost_usd:       float     = 0.0
    error:          str       = ""

    def to_dict(self) -> Dict:
        d = asdict(self)
        d["status"] = self.status.value
        return d

    @classmethod
    def from_dict(cls, d: Dict) -> "TrainingJob":
        d = dict(d)
        d["status"] = JobStatus(d.get("status", "pending"))
        return cls(**d)


class FileJobQueue:
    """Thread-safe file-backed job queue."""

    def __init__(self, queue_path: str = "jobs.jsonl"):
        self.queue_path = queue_path
        self._lock = threading.Lock()
        if not os.path.exists(queue_path):
            open(queue_path, "w").close()

    # ── internal helpers ──────────────────────────────────────────

    def _read_all(self) -> List[TrainingJob]:
        jobs = []
        with open(self.queue_path, "r") as f:
            for line in f:
                line = line.strip()
                if line:
                    jobs.append(TrainingJob.from_dict(json.loads(line)))
        return jobs

    def _write_all(self, jobs: List[TrainingJob]):
        with open(self.queue_path, "w") as f:
            for job in jobs:
                f.write(json.dumps(job.to_dict()) + "\n")

    # ── public API ────────────────────────────────────────────────

    def submit(self, job: TrainingJob) -> str:
        """Add a job to the queue."""
        with self._lock:
            jobs = self._read_all()
            jobs.append(job)
            self._write_all(jobs)
        return job.job_id

    def next_pending(self) -> Optional[TrainingJob]:
        """Return the highest-priority pending job (without claiming it)."""
        with self._lock:
            jobs = self._read_all()
            pending = [j for j in jobs if j.status == JobStatus.PENDING]
            if not pending:
                return None
            pending.sort(key=lambda j: j.priority)
            return pending[0]

    def claim(self, job_id: str, worker_id: str) -> bool:
        """Atomically claim a pending job for a worker."""
        with self._lock:
            jobs = self._read_all()
            for job in jobs:
                if job.job_id == job_id and job.status == JobStatus.PENDING:
                    job.status   = JobStatus.RUNNING
                    job.worker_id = worker_id
                    job.started_at = datetime.utcnow().isoformat()
                    self._write_all(jobs)
                    return True
            return False

    def update(self, job_id: str, **kwargs):
        """Update fields on a job."""
        with self._lock:
            jobs = self._read_all()
            for job in jobs:
                if job.job_id == job_id:
                    for k, v in kwargs.items():
                        if hasattr(job, k):
                            setattr(job, k, v)
                    break
            self._write_all(jobs)

    def complete(self, job_id: str, cost_usd: float = 0.0):
        """Mark a job as done."""
        self.update(
            job_id,
            status=JobStatus.DONE,
            finished_at=datetime.utcnow().isoformat(),
            cost_usd=cost_usd,
        )

    def fail(self, job_id: str, error: str = ""):
        """Mark a job as failed."""
        self.update(
            job_id,
            status=JobStatus.FAILED,
            finished_at=datetime.utcnow().isoformat(),
            error=error,
        )

    def pending_count(self) -> int:
        jobs = self._read_all()
        return sum(1 for j in jobs if j.status == JobStatus.PENDING)

    def running_count(self) -> int:
        jobs = self._read_all()
        return sum(1 for j in jobs if j.status == JobStatus.RUNNING)

    def all_jobs(self) -> List[TrainingJob]:
        return self._read_all()

    def stats(self) -> Dict[str, int]:
        jobs = self._read_all()
        counts: Dict[str, int] = {s.value: 0 for s in JobStatus}
        for job in jobs:
            counts[job.status.value] += 1
        return counts
```
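The JSONL mechanics the queue relies on can be sketched standalone: one JSON object per line, whole-file rewrites for updates, and the lowest `priority` number served first (the temp path below is just for the demo):

```python
import json
import os
import tempfile

# One job per line, rewritten wholesale on every update — simple, and
# safe enough for a single scheduler process holding a threading.Lock.
path = os.path.join(tempfile.mkdtemp(), "jobs.jsonl")

jobs = [
    {"job_id": "a1", "priority": 5, "status": "pending"},
    {"job_id": "b2", "priority": 1, "status": "pending"},
]
with open(path, "w") as f:
    for j in jobs:
        f.write(json.dumps(j) + "\n")

with open(path) as f:
    loaded = [json.loads(line) for line in f if line.strip()]

# next_pending() sorts ascending, so priority 1 beats priority 5
pending = sorted(loaded, key=lambda j: j["priority"])
print(pending[0]["job_id"])  # → b2
```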

## Step 2: Set Up the Clore Client

> 📦 **Using the standard Clore API client.** See [Clore API Client Reference](https://docs.clore.ai/dev/reference/clore-client) for the full implementation and setup instructions. Save it as `clore_client.py` in your project.

```python
from clore_client import CloreClient

client = CloreClient(api_key="your-api-key")
```

## Step 3: Worker — Runs One Job on a Rented GPU

```python
# worker.py
"""Worker that executes a single TrainingJob on a Clore GPU."""

import time
import uuid
import logging
import paramiko
from typing import Optional
from dataclasses import dataclass

from clore_client import CloreClient
from job_queue import FileJobQueue, TrainingJob

logger = logging.getLogger(__name__)


@dataclass
class WorkerState:
    worker_id: str
    server_id: int
    order_id:  int
    job_id:    str
    ssh_host:  str
    ssh_port:  int
    started_at: float
    price_per_hr: float


class Worker:
    """Manages the lifecycle of one rented GPU for one training job."""

    def __init__(
        self,
        client:       CloreClient,
        queue:        FileJobQueue,
        ssh_password: str = "AutoScale123!",
    ):
        self.client       = client
        self.queue        = queue
        self.ssh_password = ssh_password
        self.worker_id    = str(uuid.uuid4())[:8]
        self.state: Optional[WorkerState] = None

    # ── provision ─────────────────────────────────────────────────

    def provision(self, job: TrainingJob) -> WorkerState:
        """Find a server, rent it, wait until running."""
        servers = self.client.find_servers(job.gpu_type, job.max_price)
        if not servers:
            raise RuntimeError(
                f"No {job.gpu_type} servers below ${job.max_price}/hr"
            )

        server = servers[0]
        logger.info(f"[{self.worker_id}] Renting server {server['id']} "
                    f"@ ${server['price']:.3f}/hr")

        env = dict(job.env)
        if job.checkpoint_dir:
            env["CHECKPOINT_DIR"] = job.checkpoint_dir

        order_id = self.client.create_order(
            server_id=server["id"],
            image=job.image,
            command=job.command,
            env=env,
            ssh_password=self.ssh_password,
        )

        logger.info(f"[{self.worker_id}] Order {order_id} created, waiting for start...")
        order = self.client.wait_for_order(order_id, timeout=300)

        # Connection string looks like: "ssh root@<host> -p <port>"
        ssh_info = order.get("connection", {}).get("ssh", "")
        parts    = ssh_info.split()
        host     = parts[1].split("@")[1] if len(parts) > 1 else "127.0.0.1"
        port     = int(parts[3]) if len(parts) > 3 else 22

        self.state = WorkerState(
            worker_id=self.worker_id,
            server_id=server["id"],
            order_id=order_id,
            job_id=job.job_id,
            ssh_host=host,
            ssh_port=port,
            started_at=time.time(),
            price_per_hr=server["price"],
        )
        return self.state

    # ── execution ─────────────────────────────────────────────────

    def _ssh_exec(self, command: str, timeout: int = 7200) -> int:
        """Run command over SSH and return exit code."""
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        for attempt in range(5):
            try:
                client.connect(
                    self.state.ssh_host,
                    port=self.state.ssh_port,
                    username="root",
                    password=self.ssh_password,
                    timeout=30,
                )
                break
            except Exception as e:
                if attempt == 4:
                    raise
                logger.warning(f"SSH connect attempt {attempt+1} failed: {e}")
                time.sleep(10)

        _, stdout, stderr = client.exec_command(command, timeout=timeout)

        # Stream output
        for line in iter(stdout.readline, ""):
            logger.info(f"[{self.worker_id}] {line.rstrip()}")

        exit_code = stdout.channel.recv_exit_status()
        client.close()
        return exit_code

    def run_job(self, job: TrainingJob) -> bool:
        """
        Claim the job from the queue, run it, mark done/failed.
        Requires provision() to have been called first.
        Returns True on success.
        """
        if self.state is None:
            raise RuntimeError("run_job() called before provision()")

        if not self.queue.claim(job.job_id, self.worker_id):
            logger.warning(f"[{self.worker_id}] Could not claim job {job.job_id}")
            return False

        self.queue.update(job.job_id, order_id=self.state.order_id)

        try:
            logger.info(f"[{self.worker_id}] Running job '{job.name}'")
            exit_code = self._ssh_exec(job.command)

            elapsed_hr = (time.time() - self.state.started_at) / 3600
            cost       = elapsed_hr * self.state.price_per_hr

            if exit_code == 0:
                self.queue.complete(job.job_id, cost_usd=cost)
                logger.info(f"[{self.worker_id}] Job '{job.name}' done "
                            f"(${cost:.4f})")
                return True
            else:
                self.queue.fail(job.job_id,
                                error=f"Exit code {exit_code}")
                logger.error(f"[{self.worker_id}] Job '{job.name}' failed "
                             f"(exit {exit_code})")
                return False

        except Exception as e:
            self.queue.fail(job.job_id, error=str(e))
            logger.exception(f"[{self.worker_id}] Exception running job: {e}")
            return False

    # ── cleanup ───────────────────────────────────────────────────

    def release(self):
        """Cancel the Clore order."""
        if self.state:
            self.client.cancel_order(self.state.order_id)
            logger.info(f"[{self.worker_id}] Released order {self.state.order_id}")
            self.state = None
```
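The worker extracts the host and port from the order's SSH connection string. Assuming the string has the `ssh root@<host> -p <port>` shape the code above expects, the parsing step can be isolated and tested on its own:

```python
def parse_ssh_connection(ssh_info: str) -> tuple[str, int]:
    """Extract (host, port) from a string like 'ssh root@1.2.3.4 -p 10022'.

    Falls back to localhost defaults when fields are missing, mirroring
    the provision() logic above.
    """
    parts = ssh_info.split()
    host = parts[1].split("@")[1] if len(parts) > 1 else "127.0.0.1"
    port = int(parts[3]) if len(parts) > 3 else 22
    return host, port


print(parse_ssh_connection("ssh root@203.0.113.7 -p 10022"))  # → ('203.0.113.7', 10022)
```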

## Step 4: Auto-Scaling Scheduler

This is the brain. It watches queue depth, compares it against active workers, and decides when to scale up or down.

```python
# auto_scaler.py
"""Auto-scaling scheduler: provisions and releases Clore GPU workers."""

import time
import threading
import logging
from typing import Dict, Optional
from dataclasses import dataclass

from clore_client import CloreClient
from job_queue import FileJobQueue, TrainingJob
from worker import Worker

logger = logging.getLogger(__name__)


@dataclass
class ScalerConfig:
    """Tunable knobs for the scaler."""
    min_workers:          int   = 0
    max_workers:          int   = 5
    scale_up_threshold:   int   = 3     # pending jobs → add a worker
    scale_down_threshold: int   = 1     # pending jobs → remove idle worker
    max_hourly_budget:    float = 10.0  # USD/hr across all workers
    poll_interval_sec:    int   = 30
    idle_timeout_sec:     int   = 120   # release worker if idle this long
    gpu_type:             str   = "RTX 4090"
    max_price_per_gpu:    float = 0.50
    docker_image:         str   = "pytorch/pytorch:2.1.0-cuda12.1-cudnn8-devel"
    ssh_password:         str   = "AutoScale123!"


class WorkerHandle:
    """Tracks a running worker thread."""
    def __init__(self, worker: Worker):
        self.worker    = worker
        self.thread:   Optional[threading.Thread] = None
        self.idle_since: Optional[float] = None
        self.busy      = False


class AutoScalingPipeline:
    """
    Submit jobs, run pipeline.start(), and let the scaler handle everything.

    Usage:
        pipeline = AutoScalingPipeline(api_key="...", config=ScalerConfig(...))
        pipeline.submit(job1)
        pipeline.submit(job2)
        pipeline.start()          # blocks until all jobs are done
    """

    def __init__(
        self,
        api_key:    str,
        config:     Optional[ScalerConfig] = None,
        queue_path: str = "jobs.jsonl",
    ):
        self.client  = CloreClient(api_key)
        self.queue   = FileJobQueue(queue_path)
        self.config  = config or ScalerConfig()
        self._workers: Dict[str, WorkerHandle] = {}  # worker_id → handle
        self._lock   = threading.Lock()
        self._stop   = threading.Event()

    # ── public API ────────────────────────────────────────────────

    def submit(self, job: TrainingJob) -> str:
        """Add a job to the queue."""
        job_id = self.queue.submit(job)
        logger.info(f"Submitted job '{job.name}' → {job_id}")
        return job_id

    def start(self, block: bool = True):
        """Start the scaling loop. If block=True, waits until all jobs done."""
        logger.info("🚀 AutoScalingPipeline started")
        self._stop.clear()

        if block:
            self._run_loop()
        else:
            t = threading.Thread(target=self._run_loop, daemon=True)
            t.start()

    def stop(self):
        """Signal the pipeline to stop after current jobs finish."""
        self._stop.set()

    # ── scaling logic ─────────────────────────────────────────────

    def _active_worker_count(self) -> int:
        with self._lock:
            return len(self._workers)

    def _current_spend(self) -> float:
        """Sum hourly cost of all active workers."""
        total = 0.0
        with self._lock:
            for h in self._workers.values():
                if h.worker.state:
                    total += h.worker.state.price_per_hr
        return total

    def _should_scale_up(self, pending: int, active: int) -> bool:
        if active >= self.config.max_workers:
            return False
        if pending == 0:
            return False
        # With no workers at all, any pending job justifies one worker
        # (otherwise small queues below the threshold would never run);
        # beyond that, wait until the queue is scale_up_threshold deep.
        if active > 0 and pending < self.config.scale_up_threshold:
            return False
        if self._current_spend() >= self.config.max_hourly_budget:
            logger.warning(
                f"Budget cap: ${self._current_spend():.2f}/hr ≥ "
                f"${self.config.max_hourly_budget:.2f}/hr — not scaling up"
            )
            return False
        return True

    def _should_scale_down(self, pending: int) -> bool:
        if self._active_worker_count() <= self.config.min_workers:
            return False
        return pending < self.config.scale_down_threshold

    def _scale_up(self):
        """Provision one new worker and start it in a thread."""
        worker = Worker(
            self.client,
            self.queue,
            ssh_password=self.config.ssh_password,
        )
        handle = WorkerHandle(worker)

        def worker_loop():
            """Keep taking jobs until the idle timeout expires or we're stopped."""
            while not self._stop.is_set():
                job = self.queue.next_pending()
                if job is None:
                    handle.idle_since = handle.idle_since or time.time()
                    idle_sec = time.time() - handle.idle_since
                    if idle_sec > self.config.idle_timeout_sec:
                        logger.info(
                            f"[{worker.worker_id}] Idle {idle_sec:.0f}s — releasing"
                        )
                        break
                    time.sleep(5)
                    continue

                handle.idle_since = None
                handle.busy = True

                # Provision a server on first use, sized to this job's
                # GPU type and price cap (config values as fallback)
                if worker.state is None:
                    probe_job = TrainingJob(
                        job_id="probe",
                        name="probe",
                        image=self.config.docker_image,
                        command="echo ok",
                        gpu_type=job.gpu_type or self.config.gpu_type,
                        max_price=job.max_price or self.config.max_price_per_gpu,
                    )
                    try:
                        worker.provision(probe_job)
                    except Exception as e:
                        logger.error(f"Provision failed: {e}")
                        break

                worker.run_job(job)
                handle.busy = False

            worker.release()
            with self._lock:
                self._workers.pop(worker.worker_id, None)
            logger.info(f"[{worker.worker_id}] Worker exited")

        handle.thread = threading.Thread(target=worker_loop, daemon=True)

        with self._lock:
            self._workers[worker.worker_id] = handle

        handle.thread.start()
        logger.info(f"Scaled UP → {len(self._workers)} workers")

    def _scale_down_idle(self):
        """Force the first idle worker to hit its idle timeout and release."""
        with self._lock:
            idle_handles = [
                h for h in self._workers.values()
                if not h.busy
            ]
        if idle_handles:
            target = idle_handles[0]
            # Backdate idle_since so the worker loop's timeout check fires
            # on its next pass and the worker releases itself.
            target.idle_since = time.time() - self.config.idle_timeout_sec - 1
            logger.info("Scaled DOWN — signalled idle worker to release")

    # ── main loop ─────────────────────────────────────────────────

    def _run_loop(self):
        try:
            while not self._stop.is_set():
                pending = self.queue.pending_count()
                active  = self._active_worker_count()
                running = self.queue.running_count()

                logger.debug(
                    f"Queue: pending={pending} running={running} "
                    f"workers={active} spend=${self._current_spend():.2f}/hr"
                )

                # Scale up?
                if self._should_scale_up(pending, active):
                    self._scale_up()

                # Scale down?
                elif self._should_scale_down(pending):
                    self._scale_down_idle()

                # All done?
                if pending == 0 and running == 0 and active == 0:
                    logger.info("✅ All jobs complete — pipeline done")
                    break

                time.sleep(self.config.poll_interval_sec)
        finally:
            self._shutdown_all_workers()

    def _shutdown_all_workers(self):
        """Cancel all orders on exit."""
        logger.info("🧹 Shutting down all workers...")
        with self._lock:
            handles = list(self._workers.values())
        for h in handles:
            h.worker.release()
        logger.info("✅ All workers released")
```
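At its core, the scale-up/scale-down choice is a pure function of queue depth, worker count, and spend. A simplified sketch of threshold-based scaling (the function name is ours, and the full scaler adds budget logging and bootstrap details):

```python
def scale_decision(pending: int, active: int, spend: float, *,
                   min_workers: int = 0, max_workers: int = 5,
                   scale_up_threshold: int = 3,
                   scale_down_threshold: int = 1,
                   max_hourly_budget: float = 10.0) -> str:
    """Return 'up', 'down', or 'hold' using threshold-style scaling rules."""
    if (pending >= scale_up_threshold
            and active < max_workers
            and spend < max_hourly_budget):
        return "up"
    if pending < scale_down_threshold and active > min_workers:
        return "down"
    return "hold"


print(scale_decision(4, 1, 0.35))  # → up
print(scale_decision(0, 2, 0.70))  # → down
print(scale_decision(4, 5, 1.75))  # → hold (already at max_workers)
```

Keeping the decision logic pure like this makes it easy to unit-test scaling behaviour without renting any GPUs.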

## Step 5: Checkpoint Management

Checkpoints let jobs resume if a worker is evicted or fails mid-run.

```python
# checkpoint_manager.py
"""Simple checkpoint manager for distributed auto-scaling jobs."""

import json
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime


class CheckpointManager:
    """
    Save and load training checkpoints tied to a job ID.

    Checkpoints are stored under:
        <base_dir>/<job_id>/checkpoint_<epoch>.pt
        <base_dir>/<job_id>/meta.json
    """

    def __init__(self, base_dir: str = "./checkpoints"):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def checkpoint_dir(self, job_id: str) -> Path:
        d = self.base_dir / job_id
        d.mkdir(parents=True, exist_ok=True)
        return d

    def save_meta(self, job_id: str, meta: Dict[str, Any]):
        path = self.checkpoint_dir(job_id) / "meta.json"
        with open(path, "w") as f:
            json.dump({**meta, "updated_at": datetime.utcnow().isoformat()}, f, indent=2)

    def load_meta(self, job_id: str) -> Optional[Dict[str, Any]]:
        path = self.checkpoint_dir(job_id) / "meta.json"
        if not path.exists():
            return None
        with open(path) as f:
            return json.load(f)

    def latest_checkpoint(self, job_id: str) -> Optional[Path]:
        """Return the path to the most recent checkpoint file, if any."""
        d = self.checkpoint_dir(job_id)
        checkpoints = sorted(d.glob("checkpoint_*.pt"))
        return checkpoints[-1] if checkpoints else None

    def prune_old(self, job_id: str, keep: int = 3):
        """Keep only the N most recent checkpoints."""
        d = self.checkpoint_dir(job_id)
        checkpoints = sorted(d.glob("checkpoint_*.pt"))
        for old in checkpoints[:-keep]:
            old.unlink()


# Example: wrap a PyTorch training loop with checkpoint support
TRAIN_WITH_RESUME = """
import torch
import torch.nn as nn
import os
from pathlib import Path

def train_with_resume(model, optimizer, loader, job_id, total_epochs=10):
    ckpt_dir = Path(os.environ.get("CHECKPOINT_DIR", "./checkpoints")) / job_id
    ckpt_dir.mkdir(parents=True, exist_ok=True)

    # Resume?
    start_epoch = 0
    checkpoints = sorted(ckpt_dir.glob("checkpoint_*.pt"))
    if checkpoints:
        ckpt = torch.load(checkpoints[-1])
        model.load_state_dict(ckpt["model"])
        optimizer.load_state_dict(ckpt["optimizer"])
        start_epoch = ckpt["epoch"] + 1
        print(f"Resumed from epoch {start_epoch}")

    criterion = nn.CrossEntropyLoss()

    for epoch in range(start_epoch, total_epochs):
        model.train()
        for batch in loader:
            x, y = batch
            optimizer.zero_grad()
            loss = criterion(model(x), y)
            loss.backward()
            optimizer.step()

        # Save checkpoint
        torch.save(
            {"epoch": epoch, "model": model.state_dict(),
             "optimizer": optimizer.state_dict()},
            ckpt_dir / f"checkpoint_{epoch:04d}.pt"
        )
        print(f"Epoch {epoch} done")

    return model
"""
```
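The zero-padded filenames (`checkpoint_0007.pt`) are what make `latest_checkpoint` and `prune_old` correct: lexicographic sort order equals epoch order. A quick standalone check of the pruning behaviour:

```python
import tempfile
from pathlib import Path

d = Path(tempfile.mkdtemp())
for epoch in range(6):
    (d / f"checkpoint_{epoch:04d}.pt").touch()

# Same rule as CheckpointManager.prune_old: keep the 3 most recent
keep = 3
ckpts = sorted(d.glob("checkpoint_*.pt"))
for old in ckpts[:-keep]:
    old.unlink()

print(sorted(p.name for p in d.glob("checkpoint_*.pt")))
# → ['checkpoint_0003.pt', 'checkpoint_0004.pt', 'checkpoint_0005.pt']
```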

## Step 6: Terminal Dashboard

A live dashboard using `rich` so you can watch the pipeline in real time.

```python
# dashboard.py
"""Live terminal dashboard for the auto-scaling pipeline."""

import time
import threading
from datetime import datetime
from typing import List

from rich.console import Console
from rich.live import Live
from rich.table import Table
from rich.panel import Panel
from rich.layout import Layout
from rich.text import Text

from job_queue import FileJobQueue, JobStatus, TrainingJob


class PipelineDashboard:
    """Render a live terminal view of queue state and worker costs."""

    def __init__(self, queue: FileJobQueue, refresh_rate: float = 2.0):
        self.queue        = queue
        self.refresh_rate = refresh_rate
        self._console     = Console()
        self._running     = False

    def _build_table(self, jobs: List[TrainingJob]) -> Table:
        table = Table(
            title="📋 Job Queue",
            show_header=True,
            header_style="bold cyan",
        )
        table.add_column("ID",       style="dim",    width=10)
        table.add_column("Name",     style="white",  width=24)
        table.add_column("GPU",      style="yellow", width=12)
        table.add_column("Status",   width=12)
        table.add_column("Worker",   width=10)
        table.add_column("Cost",     justify="right", width=8)

        status_colors = {
            JobStatus.PENDING:   "yellow",
            JobStatus.RUNNING:   "green",
            JobStatus.DONE:      "dim green",
            JobStatus.FAILED:    "red",
            JobStatus.CANCELLED: "dim",
        }

        for job in jobs:
            color  = status_colors.get(job.status, "white")
            status = Text(job.status.value.upper(), style=color)
            cost   = f"${job.cost_usd:.3f}" if job.cost_usd else "—"
            table.add_row(
                job.job_id[:8],
                job.name[:24],
                job.gpu_type,
                status,
                job.worker_id[:8] or "—",
                cost,
            )
        return table

    def _build_summary(self, jobs: List[TrainingJob]) -> Panel:
        stats    = {s.value: 0 for s in JobStatus}
        total_cost = 0.0
        for job in jobs:
            stats[job.status.value] += 1
            total_cost += job.cost_usd

        lines = [
            f"[cyan]Pending:[/]   {stats['pending']}",
            f"[green]Running:[/]   {stats['running']}",
            f"[dim green]Done:[/]      {stats['done']}",
            f"[red]Failed:[/]    {stats['failed']}",
            f"",
            f"[bold]Total Cost:[/] ${total_cost:.4f}",
            f"[dim]Updated:    {datetime.now().strftime('%H:%M:%S')}[/]",
        ]
        return Panel("\n".join(lines), title="📊 Summary", width=30)

    def _render(self) -> Layout:
        jobs   = self.queue.all_jobs()
        layout = Layout()
        layout.split_row(
            Layout(self._build_table(jobs), name="table"),
            Layout(self._build_summary(jobs), name="summary", size=32),
        )
        return layout

    def run(self):
        """Start the live dashboard (blocking)."""
        self._running = True
        with Live(self._render(), refresh_per_second=1 / self.refresh_rate,
                  screen=False) as live:
            while self._running:
                live.update(self._render())
                time.sleep(self.refresh_rate)

    def run_background(self):
        """Start dashboard in a background thread."""
        t = threading.Thread(target=self.run, daemon=True)
        t.start()

    def stop(self):
        self._running = False
```

## Step 7: Putting It All Together

```python
# run_pipeline.py
"""Example: submit 6 training jobs and let the auto-scaler handle them."""

import logging
import uuid
from auto_scaler import AutoScalingPipeline, ScalerConfig
from job_queue import TrainingJob
from dashboard import PipelineDashboard

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

API_KEY = "YOUR_CLORE_API_KEY"

# ── 1. Configure the scaler ───────────────────────────────────────
config = ScalerConfig(
    min_workers=0,
    max_workers=4,
    scale_up_threshold=2,     # spawn a new worker when ≥2 jobs waiting
    scale_down_threshold=1,   # release idle workers when <1 job waiting
    max_hourly_budget=8.0,    # never exceed $8/hr total
    poll_interval_sec=20,
    idle_timeout_sec=90,
    gpu_type="RTX 4090",
    max_price_per_gpu=0.45,
    docker_image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-devel",
)

# ── 2. Create pipeline ────────────────────────────────────────────
pipeline = AutoScalingPipeline(
    api_key=API_KEY,
    config=config,
    queue_path="./my_jobs.jsonl",
)

# ── 3. Submit jobs ────────────────────────────────────────────────
training_configs = [
    {"lr": 1e-3, "batch": 32,  "epochs": 5},
    {"lr": 5e-4, "batch": 64,  "epochs": 5},
    {"lr": 1e-4, "batch": 32,  "epochs": 10},
    {"lr": 5e-3, "batch": 128, "epochs": 3},
    {"lr": 2e-4, "batch": 64,  "epochs": 8},
    {"lr": 1e-3, "batch": 128, "epochs": 5},
]

for cfg in training_configs:
    job = TrainingJob(
        job_id=str(uuid.uuid4()),
        name=f"train-lr{cfg['lr']}-bs{cfg['batch']}",
        image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-devel",
        command=(
            f"python /workspace/train.py "
            f"--lr {cfg['lr']} "
            f"--batch-size {cfg['batch']} "
            f"--epochs {cfg['epochs']}"
        ),
        gpu_type="RTX 4090",
        max_price=0.45,
        priority=5,
        max_runtime_hr=2.0,
        env={"WANDB_PROJECT": "autoscale-demo"},
        checkpoint_dir=f"/checkpoints/run-lr{cfg['lr']}-bs{cfg['batch']}",
    )
    pipeline.submit(job)

# ── 4. Start dashboard ────────────────────────────────────────────
dashboard = PipelineDashboard(pipeline.queue, refresh_rate=3.0)
dashboard.run_background()

# ── 5. Run (blocks until all jobs complete) ───────────────────────
pipeline.start(block=True)

print("\n✅ All jobs done!")
```

## Scaling Behaviour Reference

| Queue Depth          | Active Workers  | Action         |
| -------------------- | --------------- | -------------- |
| ≥ 2 pending          | 0 workers       | Scale up to 1  |
| ≥ 2 pending          | 1 worker        | Scale up to 2  |
| ≥ 2 pending          | 4 workers (max) | Hold           |
| < 1 pending          | 2 workers idle  | Scale down     |
| 0 pending, 0 running | any             | Pipeline exits |

## Cost Estimation

The 6-job run above totals roughly 8 GPU-hours, so adding workers mostly buys wall-clock time rather than savings; total cost stays close to constant, plus a little idle-time overhead per worker.

| Workers | GPU      | Total $/hr | Wall Time (est.) | 6-job run (est.) |
| ------- | -------- | ---------- | ---------------- | ---------------- |
| 1       | RTX 4090 | $0.35      | \~8 hr           | \~$2.80          |
| 2       | RTX 4090 | $0.70      | \~4 hr           | \~$2.80          |
| 4       | RTX 4090 | $1.40      | \~2 hr           | \~$2.80          |

*With `max_hourly_budget=8.0` and a `max_price_per_gpu` of $0.50, the budget check alone would admit up to 16 concurrent workers; in the example config, `max_workers=4` is the binding limit.*

## Budget Guard Rails

The scaler checks `_current_spend()` before every scale-up event. If adding another worker would exceed `max_hourly_budget`, it logs a warning and waits for the next poll cycle. This prevents runaway spend even if a bug submits hundreds of jobs.

```python
# Hard cap: never exceed $5/hr
config = ScalerConfig(
    max_hourly_budget=5.0,
    max_workers=10,          # theoretical max; budget will kick in first
    max_price_per_gpu=0.50,
)
```
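Note that the scaler only checks whether spend has *already* reached the cap, so a single scale-up can land slightly above budget. A stricter variant (a sketch, not part of the pipeline above) counts the candidate worker's price before committing:

```python
def can_scale_up(current_spend: float, candidate_price: float,
                 budget: float, strict: bool = True) -> bool:
    """Budget guard sketch: strict mode refuses if the new worker would
    push total spend past the cap; loose mode only checks current spend."""
    if strict:
        return current_spend + candidate_price <= budget
    return current_spend < budget


print(can_scale_up(4.50, 0.50, 5.0))  # → True  (lands exactly on the cap)
print(can_scale_up(4.60, 0.50, 5.0))  # → False (would hit $5.10/hr)
```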

## Best Practices

1. **Set `max_hourly_budget` first.** It's your safety net — set it before anything else.
2. **Use `min_workers=0`** for batch workloads; only pay when jobs are actually running.
3. **Set `idle_timeout_sec=60–120`** to avoid paying for idle GPUs between jobs.
4. **Use `priority` field** to push urgent jobs to the front of the queue without rewriting code.
5. **Save checkpoints every N epochs** so a failed worker doesn't waste all previous compute.
6. **Keep `max_price_per_gpu` honest** — too low means the scaler won't find servers; too high wastes money.

## Monitoring & Alerts

The dashboard gives you real-time visibility in the terminal. For production, add a simple notifier:

```python
import requests

def notify_slack(webhook_url: str, message: str):
    requests.post(webhook_url, json={"text": message})

# In your worker loop, after a job completes:
notify_slack(
    SLACK_WEBHOOK,
    f"✅ Job '{job.name}' done — cost ${cost:.4f}"
)
```

## Next Steps

* [Hyperparameter Sweeps with Optuna](https://docs.clore.ai/dev/machine-learning-and-training/hyperparameter-sweeps)
* [Distributed Training Across Multiple Servers](https://docs.clore.ai/dev/machine-learning-and-training/distributed-training)
* [Training Scheduler: Auto-Rent When Price Drops](https://docs.clore.ai/dev/machine-learning-and-training/training-scheduler)
