# Building a GPU Job Scheduler

## What We're Building

A complete job scheduling system that queues GPU tasks, automatically provisions Clore servers, runs workloads, and manages resources efficiently — perfect for batch processing, ML training pipelines, and automated workflows.

## Prerequisites

* Clore.ai API key
* Python 3.10+
* Redis (for the job queue) or SQLite (simpler — this guide uses SQLite)

## Step 1: Job Definition

```python
# scheduler/models.py
"""Job and queue data models."""

import uuid
from datetime import datetime
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List

class JobStatus(Enum):
    """Lifecycle states of a job; the string value is what gets stored in SQLite."""
    PENDING = "pending"            # submitted, waiting to be picked up
    QUEUED = "queued"              # selected by the scheduler loop for dispatch
    PROVISIONING = "provisioning"  # renting/booting a GPU server
    RUNNING = "running"            # script executing on the server
    COMPLETED = "completed"        # script exited with code 0
    FAILED = "failed"              # error or non-zero exit (may be retried)
    CANCELLED = "cancelled"        # cancelled before it started running

class JobPriority(Enum):
    """Scheduling priority; higher value runs first (queue orders by priority DESC)."""
    LOW = 0
    NORMAL = 1
    HIGH = 2
    URGENT = 3

@dataclass
class GPURequirements:
    """GPU requirements for a job, serialized to JSON in the queue database."""
    gpu_type: str = "RTX"  # Partial match: "RTX", "RTX 4090", "A100"
    min_vram_gb: int = 8  # NOTE(review): not checked by the executor's server filter — confirm intent
    min_gpu_count: int = 1  # Minimum number of GPUs on the server
    max_price_usd: float = 1.0  # Maximum hourly price in USD
    prefer_spot: bool = True  # Bid a spot (interruptible) order instead of on-demand

@dataclass
class Job:
    """A scheduled GPU job.

    Persisted by JobQueue; the runtime-state fields are mutated as the job
    moves through its lifecycle and written back with JobQueue.update().
    """
    # NOTE(review): truncating a UUID to 8 hex chars makes id collisions
    # possible at large job counts — confirm this is acceptable.
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    name: str = ""
    script: str = ""  # Bash script to run
    image: str = "nvidia/cuda:12.8.0-base-ubuntu22.04"  # Docker image for the rental
    
    # Requirements
    gpu_requirements: GPURequirements = field(default_factory=GPURequirements)
    timeout_minutes: int = 60  # SSH exec timeout for the script
    max_retries: int = 2  # how many times a FAILED job is re-queued
    
    # Priority and scheduling
    priority: JobPriority = JobPriority.NORMAL
    scheduled_at: Optional[datetime] = None  # Run at specific time
    depends_on: List[str] = field(default_factory=list)  # Job IDs that must complete first
    
    # Runtime state
    status: JobStatus = JobStatus.PENDING
    server_id: Optional[int] = None  # marketplace server chosen for this job
    order_id: Optional[int] = None  # Clore rental order backing this job
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    retries: int = 0  # retry attempts consumed so far
    
    # Results
    output: str = ""  # captured stdout of the script
    error: str = ""  # captured stderr or exception text
    cost_usd: float = 0.0  # estimated rental cost (duration * hourly price)
    
    # Metadata
    created_at: datetime = field(default_factory=datetime.utcnow)
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict:
        """Return a JSON-serializable summary of the job.

        Deliberately omits bulky fields (script, output, error, metadata).
        """
        return {
            "id": self.id,
            "name": self.name,
            "status": self.status.value,
            "priority": self.priority.value,
            "gpu_type": self.gpu_requirements.gpu_type,
            "created_at": self.created_at.isoformat(),
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "cost_usd": self.cost_usd,
            "retries": self.retries
        }
```

## Step 2: Job Queue

```python
# scheduler/queue.py
"""Job queue with SQLite backend."""

import sqlite3
import json
import threading
from datetime import datetime
from typing import List, Optional
from contextlib import contextmanager
from .models import Job, JobStatus, JobPriority, GPURequirements

class JobQueue:
    """Persistent job queue backed by SQLite.

    Each job is one row; nested structures (GPU requirements, dependency
    list, metadata) are serialized to JSON text columns.  Writes are
    serialized with a process-local lock.
    """
    
    def __init__(self, db_path: str = "jobs.db"):
        """Open (creating if necessary) the queue database at *db_path*."""
        self.db_path = db_path
        self.lock = threading.Lock()  # serializes writers within this process
        self._init_db()
    
    @contextmanager
    def _get_conn(self):
        """Yield a connection that commits on clean exit and always closes."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # enables access to columns by name
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()
    
    def _init_db(self):
        """Initialize database schema (idempotent)."""
        with self._get_conn() as conn:
            # Column order must match _job_to_row() and the INSERT in add().
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    script TEXT,
                    image TEXT,
                    gpu_requirements TEXT,
                    timeout_minutes INTEGER,
                    max_retries INTEGER,
                    priority INTEGER,
                    scheduled_at TEXT,
                    depends_on TEXT,
                    status TEXT,
                    server_id INTEGER,
                    order_id INTEGER,
                    started_at TEXT,
                    completed_at TEXT,
                    retries INTEGER,
                    output TEXT,
                    error TEXT,
                    cost_usd REAL,
                    created_at TEXT,
                    metadata TEXT
                )
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_status ON jobs(status)
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_priority ON jobs(priority DESC)
            """)
    
    def _job_to_row(self, job: Job) -> tuple:
        """Convert a Job to a row tuple in schema column order."""
        return (
            job.id, job.name, job.script, job.image,
            json.dumps(job.gpu_requirements.__dict__),
            job.timeout_minutes, job.max_retries,
            job.priority.value,
            job.scheduled_at.isoformat() if job.scheduled_at else None,
            json.dumps(job.depends_on),
            job.status.value,
            job.server_id, job.order_id,
            job.started_at.isoformat() if job.started_at else None,
            job.completed_at.isoformat() if job.completed_at else None,
            job.retries, job.output, job.error, job.cost_usd,
            job.created_at.isoformat(),
            json.dumps(job.metadata)
        )
    
    def _row_to_job(self, row: sqlite3.Row) -> Job:
        """Reconstruct a Job from a database row."""
        gpu_req = GPURequirements(**json.loads(row["gpu_requirements"]))
        
        return Job(
            id=row["id"],
            name=row["name"],
            script=row["script"],
            image=row["image"],
            gpu_requirements=gpu_req,
            timeout_minutes=row["timeout_minutes"],
            max_retries=row["max_retries"],
            priority=JobPriority(row["priority"]),
            scheduled_at=datetime.fromisoformat(row["scheduled_at"]) if row["scheduled_at"] else None,
            depends_on=json.loads(row["depends_on"]),
            status=JobStatus(row["status"]),
            server_id=row["server_id"],
            order_id=row["order_id"],
            started_at=datetime.fromisoformat(row["started_at"]) if row["started_at"] else None,
            completed_at=datetime.fromisoformat(row["completed_at"]) if row["completed_at"] else None,
            retries=row["retries"],
            output=row["output"] or "",
            error=row["error"] or "",
            cost_usd=row["cost_usd"] or 0,
            created_at=datetime.fromisoformat(row["created_at"]),
            metadata=json.loads(row["metadata"])
        )
    
    def add(self, job: Job) -> str:
        """Insert a new job and return its id."""
        with self.lock:
            with self._get_conn() as conn:
                conn.execute("""
                    INSERT INTO jobs VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
                """, self._job_to_row(job))
        return job.id
    
    def get(self, job_id: str) -> Optional[Job]:
        """Return the job with *job_id*, or None if it does not exist."""
        with self._get_conn() as conn:
            row = conn.execute(
                "SELECT * FROM jobs WHERE id = ?", (job_id,)
            ).fetchone()
            return self._row_to_job(row) if row else None
    
    def update(self, job: Job):
        """Persist the mutable runtime fields of *job*."""
        with self.lock:
            with self._get_conn() as conn:
                conn.execute("""
                    UPDATE jobs SET
                        status = ?, server_id = ?, order_id = ?,
                        started_at = ?, completed_at = ?, retries = ?,
                        output = ?, error = ?, cost_usd = ?
                    WHERE id = ?
                """, (
                    job.status.value, job.server_id, job.order_id,
                    job.started_at.isoformat() if job.started_at else None,
                    job.completed_at.isoformat() if job.completed_at else None,
                    job.retries, job.output, job.error, job.cost_usd,
                    job.id
                ))
    
    def _deps_satisfied(self, job: Job) -> bool:
        """True when every dependency of *job* has completed.

        A dependency id with no matching row is treated as satisfied
        (same as the original behavior); a FAILED dependency keeps the
        job blocked unless the dependency later completes via retry.
        """
        for dep_id in job.depends_on:
            dep = self.get(dep_id)
            if dep is not None and dep.status != JobStatus.COMPLETED:
                return False
        return True
    
    def get_next(self) -> Optional[Job]:
        """Get next runnable job (highest priority, oldest first).

        Jobs with unmet dependencies are skipped rather than blocking the
        whole queue — previously a dependency-blocked job at the head of
        the queue prevented every other job from being dispatched.
        """
        with self.lock:
            with self._get_conn() as conn:
                # Only consider jobs whose scheduled time (if any) has arrived.
                now = datetime.utcnow().isoformat()
                
                rows = conn.execute("""
                    SELECT * FROM jobs 
                    WHERE status IN ('pending', 'queued')
                    AND (scheduled_at IS NULL OR scheduled_at <= ?)
                    ORDER BY priority DESC, created_at ASC
                """, (now,)).fetchall()
                
                for row in rows:
                    job = self._row_to_job(row)
                    if self._deps_satisfied(job):
                        return job
                
                return None
    
    def list_jobs(self, status: Optional[JobStatus] = None, 
                  limit: int = 100) -> List[Job]:
        """List jobs, optionally filtered by status, highest priority/newest first."""
        with self._get_conn() as conn:
            if status:
                rows = conn.execute("""
                    SELECT * FROM jobs WHERE status = ?
                    ORDER BY priority DESC, created_at DESC
                    LIMIT ?
                """, (status.value, limit)).fetchall()
            else:
                rows = conn.execute("""
                    SELECT * FROM jobs
                    ORDER BY priority DESC, created_at DESC
                    LIMIT ?
                """, (limit,)).fetchall()
            
            return [self._row_to_job(row) for row in rows]
    
    def get_stats(self) -> dict:
        """Return per-status job counts plus total cost of completed jobs.

        Annotated with builtin ``dict``: this module never imported
        ``typing.Dict``, so the original ``-> Dict`` raised NameError
        when the module was imported.
        """
        with self._get_conn() as conn:
            stats = {}
            for status in JobStatus:
                count = conn.execute(
                    "SELECT COUNT(*) FROM jobs WHERE status = ?",
                    (status.value,)
                ).fetchone()[0]
                stats[status.value] = count
            
            total_cost = conn.execute(
                "SELECT SUM(cost_usd) FROM jobs WHERE status = 'completed'"
            ).fetchone()[0] or 0
            
            stats["total_cost_usd"] = total_cost
            
            return stats
```

## Step 3: Job Executor

```python
# scheduler/executor.py
"""Execute jobs on Clore GPUs."""

import time
import threading
import paramiko
from datetime import datetime
from typing import Dict, Optional
import requests

from .models import Job, JobStatus, GPURequirements
from .queue import JobQueue

class CloreExecutor:
    """Execute jobs on Clore GPUs: find a server, rent it, run the script
    over SSH, then release the rental."""
    
    BASE_URL = "https://api.clore.ai"
    
    def __init__(self, api_key: str, ssh_password: str = "JobSched123!"):
        """Store API credentials and the root SSH password set on rented hosts."""
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.ssh_password = ssh_password
    
    def _request(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Call the Clore API and return the decoded JSON payload.

        Raises on a non-zero API status code.  A default request timeout
        is applied so a dead endpoint cannot hang the worker thread
        forever; callers may override it via kwargs.
        """
        url = f"{self.BASE_URL}{endpoint}"
        kwargs.setdefault("timeout", 30)
        response = requests.request(method, url, headers=self.headers, **kwargs)
        data = response.json()
        if data.get("code") != 0:
            raise Exception(f"API Error: {data}")
        return data
    
    def find_server(self, requirements: GPURequirements) -> Optional[Dict]:
        """Find the best unrented server matching *requirements*.

        Filters on GPU-name substring, GPU count, and hourly price, then
        picks the most reliable (cheapest on ties).  Returns None when
        nothing matches.
        """
        servers = self._request("GET", "/v1/marketplace")["servers"]
        
        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            
            # Substring match, e.g. "RTX" matches "RTX 4090".
            gpus = server.get("gpu_array", [])
            if not any(requirements.gpu_type in g for g in gpus):
                continue
            
            if len(gpus) < requirements.min_gpu_count:
                continue
            
            # Missing price data defaults high so the server is excluded.
            price = server.get("price", {}).get("usd", {}).get("on_demand_clore", 999)
            if price > requirements.max_price_usd:
                continue
            
            candidates.append({
                "id": server["id"],
                "gpus": gpus,
                "price": price,
                "reliability": server.get("reliability", 0)
            })
        
        if not candidates:
            return None
        
        # Sort by reliability (desc), then price (asc).
        candidates.sort(key=lambda x: (-x["reliability"], x["price"]))
        return candidates[0]
    
    def provision_server(self, server_id: int, job: Job) -> int:
        """Create a rental order for *server_id* and return the order id."""
        
        order_type = "spot" if job.gpu_requirements.prefer_spot else "on-demand"
        
        order_data = {
            "renting_server": server_id,
            "type": order_type,
            "currency": "CLORE-Blockchain",
            "image": job.image,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password
        }
        
        if order_type == "spot":
            # Bid 80% of the job's price ceiling as the spot price.
            order_data["spotprice"] = job.gpu_requirements.max_price_usd * 0.8
        
        order = self._request("POST", "/v1/create_order", json=order_data)
        return order["order_id"]
    
    def wait_for_server(self, order_id: int, timeout: int = 180) -> Dict:
        """Poll until the order reports status "running"; raise on timeout."""
        start = time.time()
        while time.time() - start < timeout:
            orders = self._request("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == order_id), None)
            
            if order and order.get("status") == "running":
                return order
            
            time.sleep(5)
        
        raise Exception("Timeout waiting for server")
    
    def run_script(self, host: str, port: int, script: str, 
                   timeout_minutes: int) -> tuple:
        """Run *script* on the server over SSH.

        Returns (stdout, stderr, success) where success means exit code 0.
        NOTE(review): with get_pty=True stderr is typically merged into
        stdout, so `stderr` may come back empty — confirm if separate
        streams are needed.
        """
        
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        
        # Retry the connection: freshly provisioned hosts can take a while
        # to accept SSH.  Keep the last error so the failure is diagnosable
        # (the original bare `except:` also swallowed KeyboardInterrupt).
        last_error: Optional[Exception] = None
        for _ in range(5):
            try:
                client.connect(host, port=port, username="root",
                              password=self.ssh_password, timeout=30)
                break
            except Exception as e:
                last_error = e
                time.sleep(10)
        else:
            raise Exception("SSH connection failed") from last_error
        
        try:
            # Feed the script through a quoted heredoc so multi-line
            # scripts run verbatim without shell interpolation.
            stdin, stdout, stderr = client.exec_command(
                f"bash << 'SCRIPT'\n{script}\nSCRIPT",
                timeout=timeout_minutes * 60,
                get_pty=True
            )
            
            # Drain both streams before asking for the exit status to
            # avoid blocking on full channel buffers.
            output = stdout.read().decode()
            error = stderr.read().decode()
            exit_code = stdout.channel.recv_exit_status()
            
            return output, error, exit_code == 0
            
        finally:
            client.close()
    
    def release_server(self, order_id: int):
        """Cancel the rental order; best-effort (failure is only logged)."""
        try:
            self._request("POST", "/v1/cancel_order", json={"id": order_id})
        except Exception as e:
            print(f"Warning: Failed to cancel order {order_id}: {e}")
    
    def execute(self, job: Job) -> Job:
        """Run *job* end to end and return it with status/output/cost filled in.

        Never raises: any failure marks the job FAILED with the error text.
        The rented server is always released, even on failure.
        """
        
        job.status = JobStatus.PROVISIONING
        job.started_at = datetime.utcnow()
        
        try:
            # Find server
            server = self.find_server(job.gpu_requirements)
            if not server:
                raise Exception(f"No server available for {job.gpu_requirements.gpu_type}")
            
            job.server_id = server["id"]
            
            # Provision
            job.order_id = self.provision_server(server["id"], job)
            
            # Wait for ready
            order = self.wait_for_server(job.order_id)
            
            # Parse SSH info.  NOTE(review): assumes the connection string
            # looks like "ssh root@<host> -p <port>" — confirm against the
            # actual API response format.
            ssh_info = order["connection"]["ssh"]
            parts = ssh_info.split()
            host = parts[1].split("@")[1]
            port = int(parts[3]) if len(parts) > 3 else 22
            
            job.status = JobStatus.RUNNING
            
            # Run script
            output, error, success = self.run_script(
                host, port, job.script, job.timeout_minutes
            )
            
            job.output = output
            job.error = error
            
            if success:
                job.status = JobStatus.COMPLETED
            else:
                job.status = JobStatus.FAILED
            
            # Cost estimate: wall-clock time (including provisioning, which
            # is billed) times the hourly price.
            duration_hours = (datetime.utcnow() - job.started_at).total_seconds() / 3600
            job.cost_usd = duration_hours * server["price"]
            
        except Exception as e:
            job.status = JobStatus.FAILED
            job.error = str(e)
        
        finally:
            job.completed_at = datetime.utcnow()
            
            # Always release server
            if job.order_id:
                self.release_server(job.order_id)
        
        return job
```

## Step 4: The Scheduler

```python
# scheduler/scheduler.py
"""Main scheduler that orchestrates job execution."""

import time
import threading
import logging
from datetime import datetime
from typing import Optional

from .models import Job, JobStatus, JobPriority, GPURequirements
from .queue import JobQueue
from .executor import CloreExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GPUScheduler:
    """GPU job scheduler.

    A background loop polls the queue and dispatches runnable jobs onto
    worker threads (at most *max_concurrent* at once); each worker rents
    a Clore server, runs the job, and writes results back to the queue.
    """
    
    def __init__(
        self,
        api_key: str,
        db_path: str = "jobs.db",
        max_concurrent: int = 3,
        poll_interval: int = 10
    ):
        """Create a scheduler with its own queue and executor.

        Args:
            api_key: Clore API key.
            db_path: SQLite file for the persistent queue.
            max_concurrent: maximum jobs running at once.
            poll_interval: seconds between queue polls.
        """
        self.queue = JobQueue(db_path)
        self.executor = CloreExecutor(api_key)
        self.max_concurrent = max_concurrent
        self.poll_interval = poll_interval
        
        self.running_jobs = {}  # job_id -> worker Thread
        self.lock = threading.Lock()  # guards running_jobs
        self._stop = threading.Event()
    
    def submit(
        self,
        name: str,
        script: str,
        gpu_type: str = "RTX",
        max_price: float = 0.50,
        timeout_minutes: int = 60,
        priority: JobPriority = JobPriority.NORMAL,
        image: str = "nvidia/cuda:12.8.0-base-ubuntu22.04",
        scheduled_at: Optional[datetime] = None,
        depends_on: Optional[list] = None,
        metadata: Optional[dict] = None
    ) -> str:
        """Submit a new job and return its id.

        *depends_on* lists job ids that must complete first; *scheduled_at*
        delays the job until that time.
        """
        
        job = Job(
            name=name,
            script=script,
            image=image,
            gpu_requirements=GPURequirements(
                gpu_type=gpu_type,
                max_price_usd=max_price
            ),
            timeout_minutes=timeout_minutes,
            priority=priority,
            scheduled_at=scheduled_at,
            depends_on=depends_on or [],
            metadata=metadata or {}
        )
        
        job_id = self.queue.add(job)
        logger.info(f"Submitted job {job_id}: {name}")
        
        return job_id
    
    def cancel(self, job_id: str) -> bool:
        """Cancel a job that has not been dispatched yet.

        Returns False for unknown ids or jobs already past the queue.
        """
        job = self.queue.get(job_id)
        if not job:
            return False
        
        if job.status in [JobStatus.PENDING, JobStatus.QUEUED]:
            job.status = JobStatus.CANCELLED
            self.queue.update(job)
            logger.info(f"Cancelled job {job_id}")
            return True
        
        return False
    
    def get_status(self, job_id: str) -> Optional[dict]:
        """Return the job's summary dict, or None if unknown."""
        job = self.queue.get(job_id)
        return job.to_dict() if job else None
    
    def list_jobs(self, status: Optional[str] = None) -> list:
        """List job summaries, optionally filtered by status string."""
        status_enum = JobStatus(status) if status else None
        jobs = self.queue.list_jobs(status_enum)
        return [j.to_dict() for j in jobs]
    
    def get_stats(self) -> dict:
        """Return queue statistics plus scheduler thread counters."""
        stats = self.queue.get_stats()
        stats["running_threads"] = len(self.running_jobs)
        stats["max_concurrent"] = self.max_concurrent
        return stats
    
    def _run_job(self, job: Job):
        """Run a single job to completion (executes on a worker thread)."""
        try:
            logger.info(f"Starting job {job.id}: {job.name}")
            
            result = self.executor.execute(job)
            self.queue.update(result)
            
            if result.status == JobStatus.COMPLETED:
                logger.info(f"Job {job.id} completed (${result.cost_usd:.4f})")
            else:
                logger.warning(f"Job {job.id} failed: {result.error[:100]}")
                
                # Requeue for retry if attempts remain.
                if result.retries < result.max_retries:
                    result.retries += 1
                    result.status = JobStatus.PENDING
                    self.queue.update(result)
                    logger.info(f"Job {job.id} queued for retry ({result.retries}/{result.max_retries})")
        
        except Exception:
            # executor.execute() traps its own errors, so this only fires
            # on queue-update failures — log them so the worker thread
            # never dies silently.
            logger.exception(f"Unexpected error handling job {job.id}")
        
        finally:
            with self.lock:
                self.running_jobs.pop(job.id, None)
    
    def _scheduler_loop(self):
        """Main scheduler loop: dispatch runnable jobs while slots are free."""
        logger.info("Scheduler started")
        
        while not self._stop.is_set():
            try:
                # Check if we can start more jobs
                with self.lock:
                    available_slots = self.max_concurrent - len(self.running_jobs)
                
                if available_slots > 0:
                    # Get next job
                    job = self.queue.get_next()
                    
                    if job:
                        # Persist a non-queueable status BEFORE starting the
                        # thread.  get_next() selects 'pending'/'queued'
                        # rows, and the worker only writes its status back
                        # after the job finishes — marking the job QUEUED
                        # here (as the original did) let subsequent polls
                        # dispatch the same job again while it was running.
                        job.status = JobStatus.PROVISIONING
                        self.queue.update(job)
                        
                        # Start job thread
                        thread = threading.Thread(
                            target=self._run_job,
                            args=(job,),
                            daemon=True
                        )
                        
                        with self.lock:
                            self.running_jobs[job.id] = thread
                        
                        thread.start()
                
            except Exception as e:
                logger.error(f"Scheduler error: {e}")
            
            self._stop.wait(self.poll_interval)
        
        logger.info("Scheduler stopped")
    
    def start(self):
        """Start the scheduler loop on a daemon thread."""
        self._stop.clear()
        self._thread = threading.Thread(target=self._scheduler_loop, daemon=True)
        self._thread.start()
    
    def stop(self):
        """Signal the loop to stop and wait briefly for it to exit."""
        self._stop.set()
        if hasattr(self, '_thread'):
            self._thread.join(timeout=5)
    
    def run_forever(self):
        """Run scheduler until interrupted (Ctrl-C stops it cleanly)."""
        self.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Shutting down...")
            self.stop()


# CLI interface
if __name__ == "__main__":
    import argparse
    
    parser = argparse.ArgumentParser(description="Clore GPU Job Scheduler")
    subparsers = parser.add_subparsers(dest="command")
    
    # Start scheduler
    start_parser = subparsers.add_parser("start", help="Start scheduler")
    start_parser.add_argument("--api-key", required=True)
    start_parser.add_argument("--max-concurrent", type=int, default=3)
    
    # Submit job
    submit_parser = subparsers.add_parser("submit", help="Submit job")
    submit_parser.add_argument("--api-key", required=True)
    submit_parser.add_argument("--name", required=True)
    submit_parser.add_argument("--script", required=True)
    submit_parser.add_argument("--gpu-type", default="RTX")
    submit_parser.add_argument("--max-price", type=float, default=0.50)
    submit_parser.add_argument("--timeout", type=int, default=60)
    submit_parser.add_argument("--priority", choices=["low", "normal", "high", "urgent"], default="normal")
    
    # List jobs
    list_parser = subparsers.add_parser("list", help="List jobs")
    list_parser.add_argument("--api-key", required=True)
    list_parser.add_argument("--status", choices=["pending", "running", "completed", "failed"])
    
    # Get stats
    stats_parser = subparsers.add_parser("stats", help="Get statistics")
    stats_parser.add_argument("--api-key", required=True)
    
    args = parser.parse_args()
    
    if args.command == "start":
        scheduler = GPUScheduler(args.api_key, max_concurrent=args.max_concurrent)
        scheduler.run_forever()
    
    elif args.command == "submit":
        scheduler = GPUScheduler(args.api_key)
        
        # The --script argument is a path; read its contents.
        with open(args.script) as f:
            script = f.read()
        
        priority_map = {
            "low": JobPriority.LOW,
            "normal": JobPriority.NORMAL,
            "high": JobPriority.HIGH,
            "urgent": JobPriority.URGENT
        }
        
        job_id = scheduler.submit(
            name=args.name,
            script=script,
            gpu_type=args.gpu_type,
            max_price=args.max_price,
            timeout_minutes=args.timeout,
            priority=priority_map[args.priority]
        )
        print(f"Job submitted: {job_id}")
    
    elif args.command == "list":
        scheduler = GPUScheduler(args.api_key)
        jobs = scheduler.list_jobs(args.status)
        
        print(f"{'ID':<10} {'Name':<20} {'Status':<12} {'Cost':<10}")
        print("-" * 55)
        for job in jobs:
            print(f"{job['id']:<10} {job['name']:<20} {job['status']:<12} ${job['cost_usd']:.4f}")
    
    elif args.command == "stats":
        scheduler = GPUScheduler(args.api_key)
        stats = scheduler.get_stats()
        
        print("Queue Statistics:")
        for key, value in stats.items():
            print(f"  {key}: {value}")
    
    else:
        # No subcommand given: show usage instead of exiting silently.
        # (Also dropped the unused `import sys`.)
        parser.print_help()
```

## Step 5: REST API (Optional)

```python
# scheduler/api.py
"""REST API for the scheduler."""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime

from .scheduler import GPUScheduler
from .models import JobPriority

app = FastAPI(title="Clore GPU Scheduler API")
# Module-level singleton; created by the startup event handler.
scheduler: Optional[GPUScheduler] = None

class JobSubmission(BaseModel):
    """Request body for POST /jobs."""
    name: str
    script: str  # bash script contents (not a file path)
    gpu_type: str = "RTX"  # partial GPU-name match
    max_price: float = 0.50  # max hourly price in USD
    timeout_minutes: int = 60
    priority: str = "normal"  # one of: low, normal, high, urgent
    depends_on: List[str] = []  # job ids that must complete first (pydantic copies the default per instance)

@app.on_event("startup")
async def startup():
    """Build the scheduler from the CLORE_API_KEY env var and start its loop."""
    global scheduler
    import os
    key = os.getenv("CLORE_API_KEY")
    if not key:
        raise ValueError("CLORE_API_KEY required")
    scheduler = GPUScheduler(key)
    scheduler.start()

@app.on_event("shutdown")
async def shutdown():
    """Stop the scheduler loop when the application shuts down."""
    if scheduler is None:
        return
    scheduler.stop()

@app.post("/jobs")
async def submit_job(job: JobSubmission):
    """Submit a new job; unknown priority strings fall back to NORMAL."""
    # Build the lowercase-name -> member map from the enum itself.
    priorities = {member.name.lower(): member for member in JobPriority}
    
    new_id = scheduler.submit(
        name=job.name,
        script=job.script,
        gpu_type=job.gpu_type,
        max_price=job.max_price,
        timeout_minutes=job.timeout_minutes,
        priority=priorities.get(job.priority, JobPriority.NORMAL),
        depends_on=job.depends_on
    )
    
    return {"job_id": new_id}

@app.get("/jobs")
async def list_jobs(status: Optional[str] = None):
    """List all jobs, optionally filtered by status string."""
    jobs = scheduler.list_jobs(status)
    return jobs

@app.get("/jobs/{job_id}")
async def get_job(job_id: str):
    """Get job details; 404 when the id is unknown."""
    found = scheduler.get_status(job_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Job not found")

@app.delete("/jobs/{job_id}")
async def cancel_job(job_id: str):
    """Cancel a job; 400 when it is not in a cancellable state."""
    cancelled = scheduler.cancel(job_id)
    if not cancelled:
        raise HTTPException(status_code=400, detail="Cannot cancel job")
    return {"status": "cancelled"}

@app.get("/stats")
async def get_stats():
    """Return queue and scheduler statistics."""
    stats = scheduler.get_stats()
    return stats
```

## Usage Examples

```bash
# Start the scheduler daemon
python -m scheduler.scheduler start --api-key YOUR_KEY --max-concurrent 5

# Submit a job
python -m scheduler.scheduler submit \
    --api-key YOUR_KEY \
    --name "training-job" \
    --script ./train.sh \
    --gpu-type "RTX 4090" \
    --max-price 0.50 \
    --priority high

# List jobs
python -m scheduler.scheduler list --api-key YOUR_KEY --status running

# Get stats
python -m scheduler.scheduler stats --api-key YOUR_KEY

# Run as REST API
export CLORE_API_KEY="your_key"
uvicorn scheduler.api:app --host 0.0.0.0 --port 8000
```

## Features

* ✅ Priority-based scheduling (low → urgent)
* ✅ Job dependencies (wait for other jobs)
* ✅ Scheduled execution (run at specific time)
* ✅ Automatic retries on failure
* ✅ Concurrent job execution (configurable limit)
* ✅ Persistent queue (SQLite)
* ✅ Cost tracking
* ✅ REST API interface

## Next Steps

* [CI/CD Pipeline with GPU Testing](https://docs.clore.ai/dev/devops-and-automation/cicd-gpu-testing)
* [Monitoring with Prometheus + Grafana](https://docs.clore.ai/dev/devops-and-automation/prometheus-monitoring)
* [Cost Optimization Strategies](https://docs.clore.ai/dev/devops-and-automation/cost-optimization)
