A production-grade auto-scaling pipeline that dynamically provisions and releases Clore GPUs based on your job queue depth. Submit training jobs, and the pipeline handles everything: finding servers, renting them, distributing work, managing checkpoints, and releasing resources when idle — all while respecting your hourly budget.
Key Features:
File-based job queue (no Redis required, optional Redis backend)
The queue is a simple JSON-lines file. Each line is one job. Works without any external services.
Step 2: Set Up the Clore Client
Using the standard Clore API client. See the Clore API Client Reference for the full implementation and setup instructions. Save it as clore_client.py in your project.
Step 3: Worker — Runs One Job on a Rented GPU
Step 4: Auto-Scaling Scheduler
This is the brain. It watches queue depth, compares it against active workers, and decides when to scale up or down.
Step 5: Checkpoint Management
Checkpoints let jobs resume if a worker is evicted or fails mid-run.
Step 6: Terminal Dashboard
A live dashboard using rich so you can watch the pipeline in real time.
Step 7: Putting It All Together
Scaling Behaviour Reference
Queue Depth
Active Workers
Action
≥ 2 pending
0 workers
Scale up to 1
≥ 2 pending
1 worker
Scale up to 2
≥ 2 pending
4 workers (max)
Hold
< 1 pending
2 workers idle
Scale down
0 pending, 0 running
any
Pipeline exits
Cost Estimation
Workers
GPU
Price/hr
6-job run (est.)
1
RTX 4090
$0.35
~$2.80 (8hr)
2
RTX 4090
$0.70
~$2.10 (3hr)
4
RTX 4090
$1.40
~$1.75 (1.25hr)
With max_hourly_budget=8.0 and RTX 4090s at roughly $0.35–0.45/hr, the budget guard comfortably supports 10 parallel workers before the hourly cap kicks in.
Budget Guard Rails
The scaler checks _current_spend() before every scale-up event. If adding another worker would exceed max_hourly_budget, it logs a warning and waits for the next poll cycle. This prevents runaway spend even if a bug submits hundreds of jobs.
Best Practices
Set max_hourly_budget first. It's your safety net — set it before anything else.
Use min_workers=0 for batch workloads; only pay when jobs are actually running.
Set idle_timeout_sec=60–120 to avoid paying for idle GPUs between jobs.
Use priority field to push urgent jobs to the front of the queue without rewriting code.
Save checkpoints every N epochs so a failed worker doesn't waste all previous compute.
Keep max_price_per_gpu honest β too low means the scaler won't find servers; too high wastes money.
Monitoring & Alerts
The dashboard gives you real-time visibility in the terminal. For production, add a simple notifier:
# job_queue.py
"""File-based (and optional Redis) job queue for ML training jobs."""
import json
import os
import time
import uuid
import fcntl
import threading
from enum import Enum
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field, asdict
from datetime import datetime
class JobStatus(str, Enum):
    """Lifecycle states of a TrainingJob.

    str-valued so instances serialize naturally to/from the JSON-lines
    queue file.
    """
    PENDING = "pending"      # submitted, waiting to be claimed by a worker
    RUNNING = "running"      # claimed; executing on a rented GPU
    DONE = "done"            # command finished with exit code 0
    FAILED = "failed"        # non-zero exit code or an exception during the run
    CANCELLED = "cancelled"  # cancelled externally (never set by this file's code)
@dataclass
class TrainingJob:
    """A single training job in the queue.

    Serialized as one JSON object per line in the queue file; `to_dict`
    and `from_dict` form the (de)serialization boundary.
    """
    job_id: str
    name: str
    image: str                       # Docker image run on the rented server
    command: str                     # shell command executed on the server
    gpu_type: str
    gpu_count: int = 1
    max_price: float = 0.50          # USD/hr per GPU
    priority: int = 5                # 1 (high) -> 10 (low); lower sorts first
    max_runtime_hr: float = 4.0
    env: Dict[str, str] = field(default_factory=dict)
    checkpoint_dir: str = ""         # exported as CHECKPOINT_DIR if non-empty
    status: JobStatus = JobStatus.PENDING
    worker_id: str = ""              # id of the worker that claimed the job
    order_id: int = 0                # Clore order id, set once provisioned
    submitted_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    started_at: str = ""
    finished_at: str = ""
    cost_usd: float = 0.0            # actual spend, filled in on completion
    error: str = ""                  # failure reason, filled in on fail()

    def to_dict(self) -> Dict:
        """Return a JSON-serializable dict with the status enum flattened."""
        d = asdict(self)
        d["status"] = self.status.value
        return d

    @classmethod
    def from_dict(cls, d: Dict) -> "TrainingJob":
        """Rebuild a job from a dict, ignoring unknown keys.

        Dropping unrecognized keys (instead of letting `cls(**d)` raise
        TypeError) keeps old code able to read queue files written by a
        newer version that added fields.
        """
        known = set(cls.__dataclass_fields__)
        d = {k: v for k, v in d.items() if k in known}
        d["status"] = JobStatus(d.get("status", "pending"))
        return cls(**d)
class FileJobQueue:
    """Thread-safe file-backed job queue (one JSON object per line).

    All public methods take `self._lock`, including the read-only ones,
    so a reader never observes a half-written file from a concurrent
    writer in the same process.

    NOTE(review): the lock is in-process only; the module imports `fcntl`
    but never uses it, so concurrent access from multiple *processes* is
    not protected — confirm whether multi-process use is intended.
    """

    def __init__(self, queue_path: str = "jobs.jsonl"):
        self.queue_path = queue_path
        self._lock = threading.Lock()  # guards every read/modify/write cycle
        if not os.path.exists(queue_path):
            # Ensure the file exists so the first read doesn't raise.
            open(queue_path, "w").close()

    # -- internal helpers (no locking; callers must hold self._lock) --
    def _read_all(self) -> List[TrainingJob]:
        """Parse every non-blank line of the queue file into a job."""
        jobs = []
        with open(self.queue_path, "r") as f:
            for line in f:
                line = line.strip()
                if line:
                    jobs.append(TrainingJob.from_dict(json.loads(line)))
        return jobs

    def _write_all(self, jobs: List[TrainingJob]):
        """Rewrite the whole queue file from the given job list."""
        with open(self.queue_path, "w") as f:
            for job in jobs:
                f.write(json.dumps(job.to_dict()) + "\n")

    # -- public API ---------------------------------------------------
    def submit(self, job: TrainingJob) -> str:
        """Add a job to the queue and return its id."""
        with self._lock:
            jobs = self._read_all()
            jobs.append(job)
            self._write_all(jobs)
        return job.job_id

    def next_pending(self) -> Optional[TrainingJob]:
        """Return the highest-priority pending job (without claiming it)."""
        with self._lock:
            jobs = self._read_all()
        pending = [j for j in jobs if j.status == JobStatus.PENDING]
        if not pending:
            return None
        # Lower priority number is more urgent; sort is stable, so ties
        # keep submission order.
        pending.sort(key=lambda j: j.priority)
        return pending[0]

    def claim(self, job_id: str, worker_id: str) -> bool:
        """Atomically claim a pending job for a worker.

        Returns True on success; False if the job is missing or no
        longer pending (e.g. another worker claimed it first).
        """
        with self._lock:
            jobs = self._read_all()
            for job in jobs:
                if job.job_id == job_id and job.status == JobStatus.PENDING:
                    job.status = JobStatus.RUNNING
                    job.worker_id = worker_id
                    job.started_at = datetime.utcnow().isoformat()
                    self._write_all(jobs)
                    return True
            return False

    def update(self, job_id: str, **kwargs):
        """Update fields on a job; unknown field names are ignored."""
        with self._lock:
            jobs = self._read_all()
            for job in jobs:
                if job.job_id == job_id:
                    for k, v in kwargs.items():
                        if hasattr(job, k):
                            setattr(job, k, v)
                    break
            self._write_all(jobs)

    def complete(self, job_id: str, cost_usd: float = 0.0):
        """Mark a job as done and record its final cost."""
        self.update(
            job_id,
            status=JobStatus.DONE,
            finished_at=datetime.utcnow().isoformat(),
            cost_usd=cost_usd,
        )

    def fail(self, job_id: str, error: str = ""):
        """Mark a job as failed and record the error message."""
        self.update(
            job_id,
            status=JobStatus.FAILED,
            finished_at=datetime.utcnow().isoformat(),
            error=error,
        )

    def pending_count(self) -> int:
        """Number of jobs still waiting to be claimed."""
        with self._lock:
            jobs = self._read_all()
        return sum(1 for j in jobs if j.status == JobStatus.PENDING)

    def running_count(self) -> int:
        """Number of jobs currently claimed and running."""
        with self._lock:
            jobs = self._read_all()
        return sum(1 for j in jobs if j.status == JobStatus.RUNNING)

    def all_jobs(self) -> List[TrainingJob]:
        """Snapshot of every job in the queue file."""
        with self._lock:
            return self._read_all()

    def stats(self) -> Dict[str, int]:
        """Per-status job counts (every status present, possibly zero)."""
        with self._lock:
            jobs = self._read_all()
        counts: Dict[str, int] = {s.value: 0 for s in JobStatus}
        for job in jobs:
            counts[job.status.value] += 1
        return counts
# Instantiate the shared Clore API client (implementation lives in
# clore_client.py; see Step 2 of the guide).
from clore_client import CloreClient
client = CloreClient(api_key="your-api-key")  # replace with your real key
# worker.py
"""Worker that executes a single TrainingJob on a Clore GPU."""
import time
import uuid
import logging
import paramiko
from typing import Optional
from dataclasses import dataclass
from clore_client import CloreClient
from job_queue import FileJobQueue, TrainingJob, JobStatus
logger = logging.getLogger(__name__)
@dataclass
class WorkerState:
    """Connection and billing details for one rented GPU order."""
    worker_id: str       # short uuid of the owning Worker (for logs)
    server_id: int       # Clore server that was rented
    order_id: int        # Clore order id; needed to cancel the rental
    job_id: str          # job the worker was provisioned for
    ssh_host: str        # parsed from the order's ssh connection string
    ssh_port: int
    started_at: float    # time.time() at provision; used for cost accounting
    price_per_hr: float  # USD/hr quoted by the server listing
class Worker:
    """Manages the lifecycle of one rented GPU for one training job."""

    def __init__(
        self,
        client: CloreClient,
        queue: FileJobQueue,
        ssh_password: str = "AutoScale123!",
    ):
        self.client = client
        self.queue = queue
        self.ssh_password = ssh_password
        self.worker_id = str(uuid.uuid4())[:8]  # short id used in logs/queue
        self.state: Optional[WorkerState] = None  # set by provision()

    # -- provision ----------------------------------------------------
    def provision(self, job: TrainingJob) -> WorkerState:
        """Find a server, rent it, and wait until it is running.

        Raises:
            RuntimeError: if no server matches the GPU type / price cap.
        """
        servers = self.client.find_servers(job.gpu_type, job.max_price)
        if not servers:
            raise RuntimeError(
                f"No {job.gpu_type} servers below ${job.max_price}/hr"
            )
        # Assumes find_servers returns best-match first — TODO confirm.
        server = servers[0]
        logger.info(f"[{self.worker_id}] Renting server {server['id']} "
                    f"@ ${server['price']:.3f}/hr")
        env = dict(job.env)  # copy so the job's own env dict is never mutated
        if job.checkpoint_dir:
            env["CHECKPOINT_DIR"] = job.checkpoint_dir
        order_id = self.client.create_order(
            server_id=server["id"],
            image=job.image,
            command=job.command,
            env=env,
            ssh_password=self.ssh_password,
        )
        logger.info(f"[{self.worker_id}] Order {order_id} created, waiting for start...")
        order = self.client.wait_for_order(order_id, timeout=300)
        # Expected shape: "ssh root@<host> -p <port>"; fall back to local
        # defaults if the string is missing or malformed. split("@")[-1]
        # (not [1]) so a token without "@" can't raise IndexError.
        ssh_info = order.get("connection", {}).get("ssh", "")
        parts = ssh_info.split()
        host = parts[1].split("@")[-1] if len(parts) > 1 else "127.0.0.1"
        port = int(parts[3]) if len(parts) > 3 else 22
        self.state = WorkerState(
            worker_id=self.worker_id,
            server_id=server["id"],
            order_id=order_id,
            job_id=job.job_id,
            ssh_host=host,
            ssh_port=port,
            started_at=time.time(),
            price_per_hr=server["price"],
        )
        return self.state

    # -- execution ----------------------------------------------------
    def _ssh_exec(self, command: str, timeout: int = 7200) -> int:
        """Run `command` over SSH and return its exit code.

        Retries the connection up to 5 times (fresh containers often take
        a while to accept SSH). Streams stdout live; stderr is logged
        after the command exits. The SSH client is always closed, even
        when connecting or executing raises.
        """
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            for attempt in range(5):
                try:
                    client.connect(
                        self.state.ssh_host,
                        port=self.state.ssh_port,
                        username="root",
                        password=self.ssh_password,
                        timeout=30,
                    )
                    break
                except Exception as e:
                    if attempt == 4:
                        raise
                    logger.warning(f"SSH connect attempt {attempt+1} failed: {e}")
                    time.sleep(10)
            _, stdout, stderr = client.exec_command(command, timeout=timeout)
            # Stream stdout as it arrives.
            for line in iter(stdout.readline, ""):
                logger.info(f"[{self.worker_id}] {line.rstrip()}")
            exit_code = stdout.channel.recv_exit_status()
            # Drain stderr too so failure output isn't silently dropped.
            for line in iter(stderr.readline, ""):
                logger.warning(f"[{self.worker_id}] {line.rstrip()}")
            return exit_code
        finally:
            client.close()

    def run_job(self, job: TrainingJob) -> bool:
        """
        Claim the job from the queue, run it, mark done/failed.
        Returns True on success.
        """
        if not self.queue.claim(job.job_id, self.worker_id):
            logger.warning(f"[{self.worker_id}] Could not claim job {job.job_id}")
            return False
        self.queue.update(job.job_id, order_id=self.state.order_id)
        try:
            logger.info(f"[{self.worker_id}] Running job '{job.name}'")
            exit_code = self._ssh_exec(job.command)
            # Cost is wall-clock since *provision*, so it includes time
            # spent on any earlier jobs this worker ran.
            elapsed_hr = (time.time() - self.state.started_at) / 3600
            cost = elapsed_hr * self.state.price_per_hr
            if exit_code == 0:
                self.queue.complete(job.job_id, cost_usd=cost)
                logger.info(f"[{self.worker_id}] Job '{job.name}' done "
                            f"(${cost:.4f})")
                return True
            else:
                self.queue.fail(job.job_id,
                                error=f"Exit code {exit_code}")
                logger.error(f"[{self.worker_id}] Job '{job.name}' failed "
                             f"(exit {exit_code})")
                return False
        except Exception as e:
            self.queue.fail(job.job_id, error=str(e))
            logger.exception(f"[{self.worker_id}] Exception running job: {e}")
            return False

    # -- cleanup ------------------------------------------------------
    def release(self):
        """Cancel the Clore order (best-effort) and forget the state.

        Cancellation failures are logged rather than raised so one bad
        order cannot block shutdown of the remaining workers.
        """
        if self.state:
            try:
                self.client.cancel_order(self.state.order_id)
                logger.info(f"[{self.worker_id}] Released order {self.state.order_id}")
            except Exception as e:
                logger.error(f"[{self.worker_id}] Failed to cancel order "
                             f"{self.state.order_id}: {e}")
        self.state = None
# auto_scaler.py
"""Auto-scaling scheduler: provisions and releases Clore GPU workers."""
import time
import threading
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from clore_client import CloreClient
from job_queue import FileJobQueue, TrainingJob
from worker import Worker
logger = logging.getLogger(__name__)
@dataclass
class ScalerConfig:
    """Tunable knobs for the scaler."""
    min_workers: int = 0                  # floor; scaler never drops below this
    max_workers: int = 5                  # hard ceiling on concurrent workers
    scale_up_threshold: int = 3           # this many pending jobs -> add a worker
    scale_down_threshold: int = 1         # fewer pending than this -> drop an idle worker
    max_hourly_budget: float = 10.0       # USD/hr cap summed across all workers
    poll_interval_sec: int = 30           # scaler loop sleep between decisions
    idle_timeout_sec: int = 120           # release a worker idle this long
    gpu_type: str = "RTX 4090"            # fallback GPU type when a job omits one
    max_price_per_gpu: float = 0.50       # fallback price cap (USD/hr)
    docker_image: str = "pytorch/pytorch:2.1.0-cuda12.1-cudnn8-devel"
    ssh_password: str = "AutoScale123!"   # NOTE(review): hard-coded default — override in production
class WorkerHandle:
    """Tracks a running worker thread."""
    def __init__(self, worker: Worker):
        self.worker = worker
        self.thread: Optional[threading.Thread] = None  # set when the loop starts
        self.idle_since: Optional[float] = None  # time.time() when it ran out of jobs
        self.busy = False  # True while executing a job
class AutoScalingPipeline:
    """
    Submit jobs, run pipeline.start(), and let the scaler handle everything.
    Usage:
        pipeline = AutoScalingPipeline(api_key="...", config=ScalerConfig(...))
        pipeline.submit(job1)
        pipeline.submit(job2)
        pipeline.start()  # blocks until all jobs are done
    """

    def __init__(
        self,
        api_key: str,
        config: Optional[ScalerConfig] = None,
        queue_path: str = "jobs.jsonl",
    ):
        self.client = CloreClient(api_key)
        self.queue = FileJobQueue(queue_path)
        self.config = config or ScalerConfig()
        self._workers: Dict[str, WorkerHandle] = {}  # worker_id -> handle
        self._lock = threading.Lock()                # guards self._workers
        self._stop = threading.Event()

    # -- public API ---------------------------------------------------
    def submit(self, job: TrainingJob) -> str:
        """Add a job to the queue and return its id."""
        job_id = self.queue.submit(job)
        logger.info(f"Submitted job '{job.name}' -> {job_id}")
        return job_id

    def start(self, block: bool = True):
        """Start the scaling loop. If block=True, waits until all jobs done."""
        logger.info("AutoScalingPipeline started")
        self._stop.clear()
        if block:
            self._run_loop()
        else:
            t = threading.Thread(target=self._run_loop, daemon=True)
            t.start()

    def stop(self):
        """Signal the pipeline to stop after current jobs finish."""
        self._stop.set()

    # -- scaling logic ------------------------------------------------
    def _active_worker_count(self) -> int:
        with self._lock:
            return len(self._workers)

    def _current_spend(self) -> float:
        """Sum hourly cost of all workers that actually hold a rental."""
        total = 0.0
        with self._lock:
            for h in self._workers.values():
                if h.worker.state:
                    total += h.worker.state.price_per_hr
        return total

    def _should_scale_up(self, pending: int, active: int) -> bool:
        """True when queue depth justifies a new worker and budget allows it."""
        if active >= self.config.max_workers:
            return False
        if pending < self.config.scale_up_threshold:
            return False
        spend = self._current_spend()  # computed once, reused in the message
        if spend >= self.config.max_hourly_budget:
            logger.warning(
                f"Budget cap: ${spend:.2f}/hr >= "
                f"${self.config.max_hourly_budget:.2f}/hr - not scaling up"
            )
            return False
        return True

    def _should_scale_down(self, pending: int) -> bool:
        """True when the queue is shallow and we are above the worker floor."""
        if self._active_worker_count() <= self.config.min_workers:
            return False
        return pending < self.config.scale_down_threshold

    def _scale_up(self):
        """Provision one new worker and start its job loop in a thread."""
        worker = Worker(
            self.client,
            self.queue,
            ssh_password=self.config.ssh_password,
        )
        worker._stop_flag = False  # set True by _scale_down_idle to request exit
        handle = WorkerHandle(worker)

        def worker_loop():
            """Keep taking jobs until stop, scale-down signal, or idle timeout."""
            while not self._stop.is_set():
                # Honor the scale-down signal from _scale_down_idle.
                if getattr(worker, "_stop_flag", False):
                    logger.info(f"[{worker.worker_id}] Scale-down requested - exiting")
                    break
                job = self.queue.next_pending()
                if job is None:
                    handle.idle_since = handle.idle_since or time.time()
                    idle_sec = time.time() - handle.idle_since
                    if idle_sec > self.config.idle_timeout_sec:
                        logger.info(
                            f"[{worker.worker_id}] Idle {idle_sec:.0f}s - releasing"
                        )
                        break
                    time.sleep(5)
                    continue
                handle.idle_since = None
                handle.busy = True
                # Provision lazily on the first real job so an idle worker
                # thread never pays for a GPU; the probe job carries the
                # GPU type / price cap (falling back to config defaults).
                if worker.state is None:
                    probe = TrainingJob(
                        job_id="probe",
                        name="probe",
                        image=self.config.docker_image,
                        command="echo ok",
                        gpu_type=job.gpu_type or self.config.gpu_type,
                        max_price=job.max_price or self.config.max_price_per_gpu,
                    )
                    try:
                        worker.provision(probe)
                    except Exception as e:
                        logger.error(f"Provision failed: {e}")
                        break
                worker.run_job(job)
                handle.busy = False
            # Always cancel the rental and deregister, however we exited.
            worker.release()
            with self._lock:
                self._workers.pop(worker.worker_id, None)
            logger.info(f"[{worker.worker_id}] Worker exited")

        handle.thread = threading.Thread(target=worker_loop, daemon=True)
        with self._lock:
            self._workers[worker.worker_id] = handle
            count = len(self._workers)  # read under the lock for an accurate log
        handle.thread.start()
        logger.info(f"Scaled UP - {count} workers")

    def _scale_down_idle(self):
        """Ask one idle worker to exit; its loop polls `_stop_flag`."""
        with self._lock:
            idle_handles = [
                h for h in self._workers.values()
                if not h.busy
            ]
        if idle_handles:
            target = idle_handles[0]
            target.worker._stop_flag = True  # checked at the top of worker_loop
            logger.info("Scaled DOWN - signalled idle worker")

    # -- main loop ----------------------------------------------------
    def _run_loop(self):
        """Poll queue depth and scale until all work is finished."""
        try:
            while not self._stop.is_set():
                pending = self.queue.pending_count()
                active = self._active_worker_count()
                running = self.queue.running_count()
                logger.debug(
                    f"Queue: pending={pending} running={running} "
                    f"workers={active} spend=${self._current_spend():.2f}/hr"
                )
                # Scale up?
                if self._should_scale_up(pending, active):
                    self._scale_up()
                # Scale down?
                elif self._should_scale_down(pending):
                    self._scale_down_idle()
                # Exit once nothing is queued, running, or provisioned.
                if pending == 0 and running == 0 and active == 0:
                    logger.info("All jobs complete - pipeline done")
                    break
                time.sleep(self.config.poll_interval_sec)
        finally:
            self._shutdown_all_workers()

    def _shutdown_all_workers(self):
        """Cancel every outstanding order on exit so nothing keeps billing."""
        logger.info("Shutting down all workers...")
        with self._lock:
            handles = list(self._workers.values())
        for h in handles:
            h.worker.release()
        logger.info("All workers released")
# checkpoint_manager.py
"""Simple checkpoint manager for distributed auto-scaling jobs."""
import os
import json
import shutil
import hashlib
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime
class CheckpointManager:
    """
    Save and load training checkpoints tied to a job ID.
    Checkpoints are stored under:
        <base_dir>/<job_id>/checkpoint_<epoch>.pt
        <base_dir>/<job_id>/meta.json

    Checkpoint filenames must zero-pad the epoch (e.g. checkpoint_0007.pt)
    so lexicographic sorting matches chronological order.
    """

    def __init__(self, base_dir: str = "./checkpoints"):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def checkpoint_dir(self, job_id: str) -> Path:
        """Return (and create if needed) the per-job checkpoint directory."""
        d = self.base_dir / job_id
        d.mkdir(parents=True, exist_ok=True)
        return d

    def save_meta(self, job_id: str, meta: Dict[str, Any]):
        """Write job metadata plus an `updated_at` UTC timestamp."""
        path = self.checkpoint_dir(job_id) / "meta.json"
        with open(path, "w") as f:
            json.dump({**meta, "updated_at": datetime.utcnow().isoformat()}, f, indent=2)

    def load_meta(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Return saved metadata, or None if none has been written yet."""
        path = self.checkpoint_dir(job_id) / "meta.json"
        if not path.exists():
            return None
        with open(path) as f:
            return json.load(f)

    def latest_checkpoint(self, job_id: str) -> Optional[Path]:
        """Return the path to the most recent checkpoint file, if any."""
        checkpoints = sorted(self.checkpoint_dir(job_id).glob("checkpoint_*.pt"))
        return checkpoints[-1] if checkpoints else None

    def prune_old(self, job_id: str, keep: int = 3):
        """Keep only the `keep` most recent checkpoints.

        `keep <= 0` deletes all checkpoints. (The previous slice
        `checkpoints[:-keep]` silently deleted nothing for keep == 0 and
        removed the *oldest* files in reverse for negative values.)
        """
        checkpoints = sorted(self.checkpoint_dir(job_id).glob("checkpoint_*.pt"))
        stale = checkpoints if keep <= 0 else checkpoints[:-keep]
        for old in stale:
            old.unlink()
# Example: wrap a PyTorch training loop with checkpoint support
TRAIN_WITH_RESUME = """
import torch
import torch.nn as nn
import os, json
from pathlib import Path
def train_with_resume(model, optimizer, loader, job_id, total_epochs=10):
ckpt_dir = Path(os.environ.get("CHECKPOINT_DIR", "./checkpoints")) / job_id
ckpt_dir.mkdir(parents=True, exist_ok=True)
# Resume?
start_epoch = 0
checkpoints = sorted(ckpt_dir.glob("checkpoint_*.pt"))
if checkpoints:
ckpt = torch.load(checkpoints[-1])
model.load_state_dict(ckpt["model"])
optimizer.load_state_dict(ckpt["optimizer"])
start_epoch = ckpt["epoch"] + 1
print(f"Resumed from epoch {start_epoch}")
criterion = nn.CrossEntropyLoss()
for epoch in range(start_epoch, total_epochs):
model.train()
for batch in loader:
x, y = batch
optimizer.zero_grad()
loss = criterion(model(x), y)
loss.backward()
optimizer.step()
# Save checkpoint
torch.save(
{"epoch": epoch, "model": model.state_dict(),
"optimizer": optimizer.state_dict()},
ckpt_dir / f"checkpoint_{epoch:04d}.pt"
)
print(f"Epoch {epoch} done")
return model
"""
# dashboard.py
"""Live terminal dashboard for the auto-scaling pipeline."""
import time
import threading
from datetime import datetime
from typing import List
from rich.console import Console
from rich.live import Live
from rich.table import Table
from rich.panel import Panel
from rich.layout import Layout
from rich.text import Text
from job_queue import FileJobQueue, JobStatus, TrainingJob
class PipelineDashboard:
    """Render a live terminal view of queue state and worker costs.

    Display-only: reads the queue file via FileJobQueue and never
    mutates job state. (Mojibake characters in the original titles and
    placeholders were replaced with clean ASCII.)
    """

    def __init__(self, queue: FileJobQueue, refresh_rate: float = 2.0):
        self.queue = queue
        self.refresh_rate = refresh_rate  # seconds between redraws
        self._console = Console()
        self._running = False

    def _build_table(self, jobs: List[TrainingJob]) -> Table:
        """One row per job, status color-coded."""
        table = Table(
            title="Job Queue",
            show_header=True,
            header_style="bold cyan",
        )
        table.add_column("ID", style="dim", width=10)
        table.add_column("Name", style="white", width=24)
        table.add_column("GPU", style="yellow", width=12)
        table.add_column("Status", width=12)
        table.add_column("Worker", width=10)
        table.add_column("Cost", justify="right", width=8)
        status_colors = {
            JobStatus.PENDING: "yellow",
            JobStatus.RUNNING: "green",
            JobStatus.DONE: "dim green",
            JobStatus.FAILED: "red",
            JobStatus.CANCELLED: "dim",
        }
        for job in jobs:
            color = status_colors.get(job.status, "white")
            status = Text(job.status.value.upper(), style=color)
            # "-" placeholder while there is no cost / no worker yet.
            cost = f"${job.cost_usd:.3f}" if job.cost_usd else "-"
            table.add_row(
                job.job_id[:8],
                job.name[:24],
                job.gpu_type,
                status,
                job.worker_id[:8] or "-",
                cost,
            )
        return table

    def _build_summary(self, jobs: List[TrainingJob]) -> Panel:
        """Side panel: per-status counts plus total spend so far."""
        stats = {s.value: 0 for s in JobStatus}
        total_cost = 0.0
        for job in jobs:
            stats[job.status.value] += 1
            total_cost += job.cost_usd
        lines = [
            f"[cyan]Pending:[/] {stats['pending']}",
            f"[green]Running:[/] {stats['running']}",
            f"[dim green]Done:[/] {stats['done']}",
            f"[red]Failed:[/] {stats['failed']}",
            f"",
            f"[bold]Total Cost:[/] ${total_cost:.4f}",
            f"[dim]Updated: {datetime.now().strftime('%H:%M:%S')}[/]",
        ]
        return Panel("\n".join(lines), title="Summary", width=30)

    def _render(self) -> Layout:
        """Compose the table + summary into one layout from a fresh snapshot."""
        jobs = self.queue.all_jobs()
        layout = Layout()
        layout.split_row(
            Layout(self._build_table(jobs), name="table"),
            Layout(self._build_summary(jobs), name="summary", size=32),
        )
        return layout

    def run(self):
        """Start the live dashboard (blocking until stop() is called)."""
        self._running = True
        with Live(self._render(), refresh_per_second=1 / self.refresh_rate,
                  screen=False) as live:
            while self._running:
                live.update(self._render())
                time.sleep(self.refresh_rate)

    def run_background(self):
        """Start the dashboard in a daemon thread and return immediately."""
        t = threading.Thread(target=self.run, daemon=True)
        t.start()

    def stop(self):
        """Ask the render loop to exit after the current sleep."""
        self._running = False
# run_pipeline.py
"""Example: submit 6 training jobs and let the auto-scaler handle them."""
import logging
import uuid
from auto_scaler import AutoScalingPipeline, ScalerConfig
from job_queue import TrainingJob
from dashboard import PipelineDashboard
# Plain stdout logging; the rich dashboard runs alongside in its own thread.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# Replace with your real Clore API key before running.
API_KEY = "YOUR_CLORE_API_KEY"
# -- 1. Configure the scaler --------------------------------------
config = ScalerConfig(
    min_workers=0,
    max_workers=4,
    scale_up_threshold=2,    # spawn a new worker when >=2 jobs waiting
    scale_down_threshold=1,  # release idle workers when <1 job waiting
    max_hourly_budget=8.0,   # never exceed $8/hr total
    poll_interval_sec=20,
    idle_timeout_sec=90,
    gpu_type="RTX 4090",
    max_price_per_gpu=0.45,
    docker_image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-devel",
)
# -- 2. Create pipeline -------------------------------------------
pipeline = AutoScalingPipeline(
    api_key=API_KEY,
    config=config,
    queue_path="./my_jobs.jsonl",
)
# -- 3. Submit jobs -----------------------------------------------
# A small hyperparameter sweep: one job per (lr, batch, epochs) combo.
training_configs = [
    {"lr": 1e-3, "batch": 32, "epochs": 5},
    {"lr": 5e-4, "batch": 64, "epochs": 5},
    {"lr": 1e-4, "batch": 32, "epochs": 10},
    {"lr": 5e-3, "batch": 128, "epochs": 3},
    {"lr": 2e-4, "batch": 64, "epochs": 8},
    {"lr": 1e-3, "batch": 128, "epochs": 5},
]
for cfg in training_configs:
    job = TrainingJob(
        job_id=str(uuid.uuid4()),
        name=f"train-lr{cfg['lr']}-bs{cfg['batch']}",
        image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-devel",
        command=(
            f"python /workspace/train.py "
            f"--lr {cfg['lr']} "
            f"--batch-size {cfg['batch']} "
            f"--epochs {cfg['epochs']}"
        ),
        gpu_type="RTX 4090",
        max_price=0.45,
        priority=5,
        max_runtime_hr=2.0,
        env={"WANDB_PROJECT": "autoscale-demo"},
        checkpoint_dir=f"/checkpoints/run-{cfg['lr']}",
    )
    pipeline.submit(job)
# -- 4. Start dashboard -------------------------------------------
dashboard = PipelineDashboard(pipeline.queue, refresh_rate=3.0)
dashboard.run_background()
# -- 5. Run (blocks until all jobs complete) ----------------------
pipeline.start(block=True)
print("\nβ All jobs done!")
# Hard cap: never exceed $5/hr.
# NOTE(review): illustrative snippet — rebinds `config`; the scaler's
# budget guard throttles scale-ups before max_workers is ever reached.
config = ScalerConfig(
    max_hourly_budget=5.0,
    max_workers=10,  # theoretical max; budget will kick in first
    max_price_per_gpu=0.50,
)
import requests
def notify_slack(webhook_url: str, message: str):
    """Post `message` to a Slack incoming webhook (best-effort alert).

    The explicit timeout keeps a slow or unreachable webhook from
    hanging the worker loop that calls this (requests has no default
    timeout).
    """
    requests.post(webhook_url, json={"text": message}, timeout=10)
# In your worker loop, after a job completes:
# (illustrative — SLACK_WEBHOOK, `job`, and `cost` come from the
# surrounding worker code and are not defined here)
notify_slack(
    SLACK_WEBHOOK,
    f"β Job '{job.name}' done β cost ${cost:.4f}"
)