Building a GPU Job Scheduler
What We're Building
Prerequisites
Step 1: Job Definition
# scheduler/models.py
"""Job and queue data models."""
import uuid
from datetime import datetime
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
class JobStatus(Enum):
    """Status values a job can carry, stored as lowercase strings.

    ``Job.status`` holds one of these members; ``Job.to_dict`` emits the
    member's ``.value``.
    """

    PENDING = "pending"
    QUEUED = "queued"
    PROVISIONING = "provisioning"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class JobPriority(Enum):
    """Priority levels for a job, stored as integers from 0 to 3.

    The more urgent names carry the larger numbers. Members of a plain
    ``Enum`` do not support ``<`` / ``>``; compare or sort on ``.value``.
    """

    LOW = 0
    NORMAL = 1
    HIGH = 2
    URGENT = 3
@dataclass
class GPURequirements:
    """GPU requirements for a job.

    Every field has a default, so ``GPURequirements()`` is a valid
    request on its own.
    """

    # Partial match against the GPU name: "RTX", "RTX 4090", "A100".
    gpu_type: str = "RTX"
    # Minimum VRAM, in GB.
    min_vram_gb: int = 8
    # Minimum number of GPUs.
    min_gpu_count: int = 1
    # Price ceiling in USD.
    # NOTE(review): the billing period (presumably per hour) is not
    # stated here - confirm against the code that consumes this field.
    max_price_usd: float = 1.0
    # NOTE(review): presumably prefers spot/interruptible capacity when
    # True - confirm against the code that consumes this field.
    prefer_spot: bool = True
@dataclass
class Job:
    """A scheduled GPU job.

    Holds what to run (``script``, ``image``), the requirements and
    limits for running it, scheduling hints, mutable runtime state and
    results. Every field has a default, so ``Job()`` yields a pending
    job with a fresh short id.
    """

    # First 8 characters of a random UUID4.
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    name: str = ""
    script: str = ""  # Bash script to run
    image: str = "nvidia/cuda:12.8.0-base-ubuntu22.04"

    # Requirements
    gpu_requirements: GPURequirements = field(default_factory=GPURequirements)
    timeout_minutes: int = 60
    max_retries: int = 2

    # Priority and scheduling
    priority: JobPriority = JobPriority.NORMAL
    scheduled_at: Optional[datetime] = None  # Run at specific time
    depends_on: List[str] = field(default_factory=list)  # Job IDs

    # Runtime state
    status: JobStatus = JobStatus.PENDING
    server_id: Optional[int] = None
    order_id: Optional[int] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    retries: int = 0

    # Results
    output: str = ""
    error: str = ""
    cost_usd: float = 0.0

    # Metadata
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12. It is kept because other code may
    # compare this value with naive datetimes - confirm before switching
    # to a timezone-aware clock.
    created_at: datetime = field(default_factory=datetime.utcnow)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary.

        Returns:
            A summary of the job: id, name, status and priority as their
            enum values, the requested GPU type, the three timestamps as
            ISO-8601 strings (``None`` for ``started_at`` and
            ``completed_at`` while unset), the cost and the retry count.
            Other fields are not included.
        """
        return {
            "id": self.id,
            "name": self.name,
            "status": self.status.value,
            "priority": self.priority.value,
            "gpu_type": self.gpu_requirements.gpu_type,
            "created_at": self.created_at.isoformat(),
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "cost_usd": self.cost_usd,
            "retries": self.retries,
        }


# Step 2: Job Queue
Step 3: Job Executor
Step 4: The Scheduler
Step 5: REST API (Optional)
Usage Examples
Features
Next Steps
Last updated
Was this helpful?