Building a GPU Job Scheduler

What We're Building

A complete job scheduling system that queues GPU tasks, automatically provisions Clore servers, runs workloads, and manages resources efficiently. It's well suited to batch processing, ML training pipelines, and automated workflows.

Prerequisites

  • Clore.ai API key

  • Python 3.10+

  • Redis (for job queue) or SQLite (simpler)

Step 1: Job Definition

# scheduler/models.py
"""Job and queue data models."""

import uuid
from datetime import datetime
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List

class JobStatus(Enum):
    PENDING = "pending"
    QUEUED = "queued"
    PROVISIONING = "provisioning"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class JobPriority(Enum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    URGENT = 3

@dataclass
class GPURequirements:
    """GPU requirements for a job."""
    gpu_type: str = "RTX"  # Partial match: "RTX", "RTX 4090", "A100"
    min_vram_gb: int = 8
    min_gpu_count: int = 1
    max_price_usd: float = 1.0
    prefer_spot: bool = True

@dataclass
class Job:
    """A scheduled GPU job."""
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    name: str = ""
    script: str = ""  # Bash script to run
    image: str = "nvidia/cuda:12.8.0-base-ubuntu22.04"
    
    # Requirements
    gpu_requirements: GPURequirements = field(default_factory=GPURequirements)
    timeout_minutes: int = 60
    max_retries: int = 2
    
    # Priority and scheduling
    priority: JobPriority = JobPriority.NORMAL
    scheduled_at: Optional[datetime] = None  # Run at specific time
    depends_on: List[str] = field(default_factory=list)  # Job IDs
    
    # Runtime state
    status: JobStatus = JobStatus.PENDING
    server_id: Optional[int] = None
    order_id: Optional[int] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    retries: int = 0
    
    # Results
    output: str = ""
    error: str = ""
    cost_usd: float = 0.0
    
    # Metadata
    created_at: datetime = field(default_factory=datetime.utcnow)
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict:
        """Convert to dictionary."""
        return {
            "id": self.id,
            "name": self.name,
            "status": self.status.value,
            "priority": self.priority.value,
            "gpu_type": self.gpu_requirements.gpu_type,
            "created_at": self.created_at.isoformat(),
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "cost_usd": self.cost_usd,
            "retries": self.retries
        }

Step 2: Job Queue
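
The queue's job is to hold pending work persistently and hand back the highest-priority job first. Below is a minimal sketch of a SQLite-backed version (the Features list mentions a persistent SQLite queue); the `JobQueue` class and its `enqueue`/`dequeue` method names are illustrative assumptions, not the guide's exact API.

```python
# scheduler/queue.py — illustrative sketch, not the guide's exact code.
import json
import sqlite3
import time

class JobQueue:
    """SQLite-backed priority queue: survives restarts, pops by priority."""

    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS jobs (
                id         TEXT PRIMARY KEY,
                priority   INTEGER NOT NULL,
                status     TEXT NOT NULL,
                payload    TEXT NOT NULL,
                created_at REAL NOT NULL
            )""")

    def enqueue(self, job_id: str, priority: int, payload: dict) -> None:
        """Store a job in the 'queued' state."""
        self.db.execute(
            "INSERT INTO jobs VALUES (?, ?, 'queued', ?, ?)",
            (job_id, priority, json.dumps(payload), time.time()))
        self.db.commit()

    def dequeue(self):
        """Pop the highest-priority (then oldest) queued job, or None."""
        row = self.db.execute(
            "SELECT id, payload FROM jobs WHERE status = 'queued' "
            "ORDER BY priority DESC, created_at ASC LIMIT 1").fetchone()
        if row is None:
            return None
        self.db.execute(
            "UPDATE jobs SET status = 'running' WHERE id = ?", (row[0],))
        self.db.commit()
        return row[0], json.loads(row[1])
```

The integer priorities map onto the `JobPriority` values from Step 1 (0–3), so `URGENT` jobs come back before `LOW` ones, and ties break by submission time.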

Step 3: Job Executor
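
The executor provisions a server matching the job's GPU requirements, runs the job's script, and retries on failure up to `max_retries`. Here is a sketch of that control flow with the Clore-specific pieces abstracted behind callables; `execute_job` and the three callables are illustrative stand-ins for the real provisioning and remote-execution code.

```python
# scheduler/executor.py — illustrative sketch; provision/run_script/release
# stand in for real Clore API and remote-execution calls.
from typing import Any, Callable, Tuple

def execute_job(
    job: dict,
    provision: Callable[[dict], Any],       # requirements -> server handle
    run_script: Callable[[Any, str], str],  # (server, script) -> output
    release: Callable[[Any], None],         # frees the rented server
    max_retries: int = 2,
) -> Tuple[str, str]:
    """Run a job with retry-on-failure; returns (status, output_or_error)."""
    attempt = 0
    while True:
        server = provision(job["gpu_requirements"])
        try:
            output = run_script(server, job["script"])
            return "completed", output
        except Exception as exc:
            attempt += 1
            if attempt > max_retries:
                return "failed", str(exc)
        finally:
            # Release the server even on failure, so a crashed script
            # never leaves a rented GPU running up costs.
            release(server)
```

Releasing in `finally` ties into the cost-tracking feature: every rental ends when the attempt ends, whether or not the script succeeded.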

Step 4: The Scheduler
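
The scheduler's core decision is whether a pending job may be dispatched: all jobs in `depends_on` have completed, any `scheduled_at` time has passed, and a slot is free under the concurrency limit. A sketch of that check (the `is_runnable` name and dict-based job shape are illustrative; it uses naive UTC datetimes to match the Step 1 model):

```python
# scheduler/core.py — illustrative dispatch check.
from datetime import datetime
from typing import Optional, Set

def is_runnable(
    job: dict,
    completed_ids: Set[str],
    running_count: int,
    max_concurrent: int = 4,
    now: Optional[datetime] = None,
) -> bool:
    """True if dependencies, schedule, and a free slot all allow the job."""
    # Concurrency limit: never exceed max_concurrent running jobs.
    if running_count >= max_concurrent:
        return False
    # Dependencies: every job listed in depends_on must have completed.
    if any(dep not in completed_ids for dep in job.get("depends_on", [])):
        return False
    # Scheduling: a future scheduled_at keeps the job waiting.
    scheduled_at = job.get("scheduled_at")
    if scheduled_at is not None and (now or datetime.utcnow()) < scheduled_at:
        return False
    return True
```

The main loop would call this for each pending job in priority order, dispatching those that pass to the executor from Step 3.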

Step 5: REST API (Optional)
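
A production deployment would likely use FastAPI or Flask here; the stdlib-only sketch below shows the two endpoints a minimal interface needs — submit a job and poll its status. The paths, field names, and in-memory `JOBS` store are illustrative assumptions standing in for the real queue.

```python
# scheduler/api.py — minimal stdlib sketch of the REST interface.
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

JOBS = {}  # in-memory store standing in for the persistent queue

class JobAPI(BaseHTTPRequestHandler):
    def _send(self, code: int, body: dict) -> None:
        data = json.dumps(body).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def do_POST(self):
        # POST /jobs — submit a job definition as JSON, returns its id.
        if self.path == "/jobs":
            length = int(self.headers.get("Content-Length", 0))
            job = json.loads(self.rfile.read(length))
            job_id = str(len(JOBS) + 1)
            JOBS[job_id] = {**job, "status": "queued"}
            self._send(201, {"id": job_id})
        else:
            self._send(404, {"error": "not found"})

    def do_GET(self):
        # GET /jobs/<id> — return the job's current state.
        job = JOBS.get(self.path.rsplit("/", 1)[-1])
        if self.path.startswith("/jobs/") and job is not None:
            self._send(200, job)
        else:
            self._send(404, {"error": "not found"})

    def log_message(self, *args):  # silence per-request logging
        pass
```

Run it with `HTTPServer(("0.0.0.0", 8080), JobAPI).serve_forever()`; a framework version would add the remaining operations (list, cancel) the same way.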

Usage Examples

Features

  • ✅ Priority-based scheduling (low → urgent)

  • ✅ Job dependencies (wait for other jobs)

  • ✅ Scheduled execution (run at specific time)

  • ✅ Automatic retries on failure

  • ✅ Concurrent job execution (configurable limit)

  • ✅ Persistent queue (SQLite)

  • ✅ Cost tracking

  • ✅ REST API interface

Next Steps
