#!/usr/bin/env python3
"""
Training Scheduler - Auto-rent GPUs when the spot price drops.

Usage:
    python training_scheduler.py --api-key YOUR_API_KEY add --name "My Training" \
        --gpu "RTX 4090" --max-price 0.35 --command "python train.py"
    python training_scheduler.py --api-key YOUR_API_KEY start
    python training_scheduler.py --api-key YOUR_API_KEY status

Other actions: prices, cancel --job-id ID, remove --job-id ID.
"""
import argparse
import json
import os
import secrets
import threading
import time
import uuid
from dataclasses import dataclass, fields
from datetime import datetime
from typing import Dict, List, Optional

import requests
@dataclass
class Job:
    """A training job waiting for, or running on, a rented spot GPU.

    Jobs are serialized with :meth:`to_dict` and rebuilt with ``Job(**data)``.
    Every constructor field without a default must therefore be written out
    by :meth:`to_dict`, or reloading the saved jobs fails with TypeError.
    """

    id: str
    name: str
    gpu: str
    # Price ceiling compared against the marketplace's USD spot price.
    max_price: float
    # Optional command passed to the rented container ("" means none).
    command: str
    # Default lets job files written before "image" was persisted still load.
    image: str = "pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime"
    # e.g. "pending", "running", "completed", "cancelled".
    status: str = "pending"
    order_id: Optional[int] = None
    # Connection string that embeds the generated SSH password; it is
    # deliberately NOT persisted by to_dict so the password never hits disk.
    ssh: Optional[str] = None
    actual_price: Optional[float] = None
    # time.time() when the order became running; needed to compute cost later.
    started_at: Optional[float] = None

    def to_dict(self) -> Dict[str, object]:
        """Return a JSON-serializable dict from which ``Job(**d)`` rebuilds the job.

        Includes every field except ``ssh`` (see the field comment above).
        """
        return {
            "id": self.id,
            "name": self.name,
            "gpu": self.gpu,
            "max_price": self.max_price,
            "command": self.command,
            "image": self.image,
            "status": self.status,
            "order_id": self.order_id,
            "actual_price": self.actual_price,
            "started_at": self.started_at,
        }
class TrainingScheduler:
    """Auto-rent GPUs when the marketplace spot price drops.

    Jobs live in ``self.jobs`` (keyed by job id) and are written to
    ``JOBS_FILE`` as JSON after every state change. Separate CLI invocations
    (``add``, ``status``, ``start`` ...) therefore share one job list.
    A job goes from "pending" to "running" when an order for it starts. It
    becomes "completed" once that order disappears or expires. ``cancel``
    sets it to "cancelled".
    """

    BASE_URL = "https://api.clore.ai"
    JOBS_FILE = "scheduler_jobs.json"
    # Image used when a job does not name one; also the fallback for saved
    # job records that lack an "image" key.
    DEFAULT_IMAGE = "pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime"
    # Seconds before an HTTP request is abandoned. requests has no default
    # timeout, so a stalled connection would otherwise hang the scheduler.
    REQUEST_TIMEOUT = 30
    # How long _launch_job waits for a new order to report "running".
    READY_POLLS = 90
    READY_POLL_DELAY = 2

    def __init__(self, api_key: str):
        """Create a scheduler that authenticates with ``api_key`` and load saved jobs."""
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.jobs: Dict[str, Job] = {}
        self._running = False
        self._load_jobs()

    # ------------------------------------------------------------------ API

    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Call the Clore API and return the decoded JSON body.

        A response with ``code == 5`` is retried, up to 3 attempts in total,
        with exponential backoff between attempts. Any other non-zero
        ``code`` raises.

        Raises:
            Exception: "API Error: ..." for a non-zero code, or "Max retries"
                when every attempt answered with code 5.
            requests.RequestException: on network failure or timeout.
            ValueError: (requests' JSON decode error) if the body is not JSON.
        """
        url = f"{self.BASE_URL}{endpoint}"
        kwargs.setdefault("timeout", self.REQUEST_TIMEOUT)
        attempts = 3
        for attempt in range(attempts):
            response = requests.request(method, url, headers=self.headers, **kwargs)
            data = response.json()
            if data.get("code") == 5:
                # NOTE(review): code 5 is treated as transient (presumably rate
                # limiting) -- confirm against the Clore API docs.
                if attempt < attempts - 1:
                    time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries")

    # -------------------------------------------------------- persistence

    def _load_jobs(self):
        """Populate ``self.jobs`` from JOBS_FILE; a missing file means no jobs yet.

        Records without an "image" key get DEFAULT_IMAGE, and keys that are not
        Job fields are ignored. A stale file therefore does not make every
        command fail with TypeError.
        """
        try:
            with open(self.JOBS_FILE, encoding="utf-8") as f:
                records = json.load(f)
        except FileNotFoundError:
            return
        known_fields = {fld.name for fld in fields(Job)}
        for record in records:
            record.setdefault("image", self.DEFAULT_IMAGE)
            job = Job(**{k: v for k, v in record.items() if k in known_fields})
            self.jobs[job.id] = job

    def _save_jobs(self):
        """Persist all jobs to JOBS_FILE.

        The JSON goes to a temporary file first and is then moved into place
        with os.replace. An interrupted write therefore cannot leave a
        truncated file that makes the next load fail.
        """
        tmp_path = f"{self.JOBS_FILE}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump([j.to_dict() for j in self.jobs.values()], f, indent=2)
        os.replace(tmp_path, self.JOBS_FILE)

    def add_job(self, name: str, gpu: str, max_price: float, command: str,
                image: str = DEFAULT_IMAGE) -> Job:
        """Register a new pending job, persist it, and return it."""
        job = Job(
            id=str(uuid.uuid4())[:8],
            name=name,
            gpu=gpu,
            max_price=max_price,
            command=command,
            image=image,
        )
        self.jobs[job.id] = job
        self._save_jobs()
        print(f"✅ Job added: {job.id} - {name}")
        return job

    # --------------------------------------------------------- marketplace

    def _available_offers(self):
        """Yield ``(server, gpu_names, spot_price)`` for each unrented marketplace offer.

        Offers with an empty GPU list or without a truthy USD spot price are skipped.
        """
        data = self._api("GET", "/v1/marketplace")
        for server in data.get("servers", []):
            if server.get("rented"):
                continue
            gpus = server.get("gpu_array", [])
            price = server.get("price", {}).get("usd", {}).get("spot")
            if gpus and price:
                yield server, gpus, price

    def get_prices(self) -> Dict[str, float]:
        """Get current min spot prices per GPU type.

        Keys are the normalized name of each offer's first GPU (see
        _normalize_gpu). Values are the cheapest USD spot price seen for it.
        """
        prices: Dict[str, float] = {}
        for _server, gpus, price in self._available_offers():
            gpu = self._normalize_gpu(gpus[0])
            if gpu not in prices or price < prices[gpu]:
                prices[gpu] = price
        return prices

    def _normalize_gpu(self, name: str) -> str:
        """Map a raw GPU name to a known model label, ignoring case and spaces.

        Returns the first label that occurs inside ``name`` (e.g.
        "NVIDIA GeForce RTX4090" -> "RTX 4090"). Unknown names are returned
        unchanged.
        """
        compact = name.lower().replace(" ", "")
        for gpu in ["RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", "A100", "A6000"]:
            if gpu.lower().replace(" ", "") in compact:
                return gpu
        return name

    def _find_server(self, gpu: str, max_price: float) -> Optional[Dict]:
        """Return the first unrented offer with a GPU matching ``gpu`` at <= ``max_price``.

        Matching is a case- and space-insensitive substring test, like
        _normalize_gpu. Offers that get_prices counts for a known model are
        therefore also found here. Returns ``None`` when nothing fits.
        """
        target = gpu.lower().replace(" ", "")
        for server, gpus, price in self._available_offers():
            if price > max_price:
                continue
            if any(target in g.lower().replace(" ", "") for g in gpus):
                return {"id": server["id"], "price": price, "gpus": gpus}
        return None

    # -------------------------------------------------------------- orders

    def _cancel_order_quietly(self, order_id) -> None:
        """Cancel an order on a cleanup path; failures are printed, not raised."""
        try:
            self._api("POST", "/v1/cancel_order", json={"id": order_id})
        except Exception as e:
            print(f"⚠️ Could not cancel order {order_id}: {e}")

    def _wait_for_running(self, order_id) -> Optional[Dict]:
        """Poll my_orders until ``order_id`` reports "running".

        Returns that order, or ``None`` after READY_POLLS polls.
        """
        for _ in range(self.READY_POLLS):
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o.get("order_id") == order_id), None)
            if order and order.get("status") == "running":
                return order
            time.sleep(self.READY_POLL_DELAY)
        return None

    def _launch_job(self, job: Job) -> bool:
        """Rent a server for ``job`` and wait until its order is running.

        Returns True, and marks the job "running", once the order starts.
        Returns False if no offer fits the price limit. Also returns False if
        the order does not start in time; the order is then cancelled. If
        anything raises while waiting (including Ctrl-C), the order is
        cancelled before the exception propagates, so no rental is left
        running untracked.
        """
        server = self._find_server(job.gpu, job.max_price)
        if not server:
            return False
        password = secrets.token_urlsafe(16)
        order_data = {
            "renting_server": server["id"],
            "type": "spot",
            # NOTE(review): the bid below is derived from the *USD* spot price
            # while the order currency is "CLORE-Blockchain", and the 10% markup
            # can exceed job.max_price -- confirm the intended units against the
            # Clore API docs.
            "currency": "CLORE-Blockchain",
            "image": job.image,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": password,
            "spotprice": server["price"] * 1.1,
        }
        if job.command:
            order_data["command"] = job.command
        order_id = self._api("POST", "/v1/create_order", json=order_data)["order_id"]

        try:
            order = self._wait_for_running(order_id)
        except BaseException:
            # The job is still "pending", so nothing would track this order.
            self._cancel_order_quietly(order_id)
            raise
        if order is None:
            self._cancel_order_quietly(order_id)
            return False

        connection = order.get("connection") or {}
        job.status = "running"
        job.order_id = order_id
        job.actual_price = server["price"]
        job.started_at = time.time()
        job.ssh = f"{connection.get('ssh', '?')} (pw: {password})"
        self._save_jobs()
        print(f"🚀 Job {job.id} started @ ${server['price']:.3f}/hr")
        print(f" SSH: {job.ssh}")
        return True

    # ------------------------------------------------------------ the loop

    def _check_pending(self):
        """Launch each pending job whose GPU's current spot price is within its limit.

        A failure while launching one job is printed and does not stop the others.
        """
        pending = [job for job in self.jobs.values() if job.status == "pending"]
        if not pending:
            return
        prices = self.get_prices()
        for job in pending:
            current_price = prices.get(self._normalize_gpu(job.gpu))
            if current_price and current_price <= job.max_price:
                print(f"🎯 Price alert! {job.gpu}: ${current_price:.3f} (target: ${job.max_price})")
                try:
                    self._launch_job(job)
                except Exception as e:
                    print(f"Error launching job {job.id}: {e}")

    def _check_running(self):
        """Mark running jobs "completed" once their order is missing or expired."""
        running = [job for job in self.jobs.values() if job.status == "running"]
        if not running:
            return
        orders = self._api("GET", "/v1/my_orders")["orders"]
        orders_by_id = {}
        for o in orders:
            orders_by_id.setdefault(o.get("order_id"), o)
        for job in running:
            order = orders_by_id.get(job.order_id)
            if order and order.get("status") != "expired":
                continue
            runtime = (time.time() - job.started_at) / 3600 if job.started_at else 0
            cost = runtime * (job.actual_price or 0)
            job.status = "completed"
            self._save_jobs()
            print(f"✅ Job {job.id} completed ({runtime:.1f}h, ${cost:.4f})")

    def run(self, interval: int = 300):
        """Check pending and running jobs every ``interval`` seconds.

        Errors in a cycle are printed and the loop continues. stop() ends the
        loop after the current sleep. KeyboardInterrupt propagates to the caller.
        """
        self._running = True
        print(f"🚀 Scheduler started (checking every {interval}s)")
        while self._running:
            try:
                self._check_pending()
                self._check_running()
            except Exception as e:
                print(f"Error: {e}")
            time.sleep(interval)

    def stop(self):
        """Ask run() to exit after its current sleep."""
        self._running = False

    # ------------------------------------------------------------ commands

    def status(self):
        """Print current spot prices and every job with its state."""
        prices = self.get_prices()
        rule = "=" * 60
        print("\n" + rule)
        print("📊 SCHEDULER STATUS")
        print(rule)
        print("\n💰 Current Prices:")
        for gpu, price in sorted(prices.items()):
            print(f" {gpu:15} ${price:.3f}/hr")
        print(f"\n📋 Jobs ({len(self.jobs)}):")
        icons = {
            "pending": "⏳",
            "running": "🟢",
            "completed": "✅",
            "failed": "❌",
            "cancelled": "🛑",
        }
        for job in self.jobs.values():
            emoji = icons.get(job.status, "❓")
            price_status = ""
            if job.status == "pending":
                current = prices.get(self._normalize_gpu(job.gpu))
                if current is None:
                    price_status = " (no offers)"
                elif current <= job.max_price:
                    price_status = " 🎯 READY!"
                else:
                    price_status = f" (current: ${current:.3f})"
            print(f" {emoji} {job.id} | {job.name:20} | {job.gpu:12} | max ${job.max_price:.2f}{price_status}")
        print(rule + "\n")

    def cancel(self, job_id: str):
        """Mark ``job_id`` cancelled, cancelling its order first if it is running."""
        job = self.jobs.get(job_id)
        if not job:
            print(f"Job not found: {job_id}")
            return
        if job.status == "running" and job.order_id:
            self._api("POST", "/v1/cancel_order", json={"id": job.order_id})
        job.status = "cancelled"
        self._save_jobs()
        print(f"🛑 Job {job_id} cancelled")

    def remove(self, job_id: str):
        """Delete ``job_id`` from the job list; any running order is NOT cancelled."""
        job = self.jobs.pop(job_id, None)
        if job is None:
            print(f"Job not found: {job_id}")
            return
        self._save_jobs()
        print(f"🗑️ Job {job_id} removed")
        if job.status == "running" and job.order_id:
            print(f"Warning: order {job.order_id} was still running and has NOT been cancelled.")
def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point: parse arguments and run the chosen action.

    Args:
        argv: Arguments to parse, without the program name. The default
            ``None`` makes argparse read ``sys.argv[1:]``, as before. Passing
            a list lets tests and other code drive the CLI.

    If an action lacks a required option, this exits with status 2 via
    ``parser.error``. It no longer prints a message and exits successfully.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True)
    parser.add_argument("action", choices=["add", "start", "status", "cancel", "remove", "prices"])
    parser.add_argument("--name", help="Job name")
    parser.add_argument("--gpu", default="RTX 4090")
    parser.add_argument("--max-price", type=float, default=0.40)
    parser.add_argument("--command", default="")
    parser.add_argument("--image", default="pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime")
    parser.add_argument("--job-id")
    parser.add_argument("--interval", type=int, default=300)
    args = parser.parse_args(argv)

    # Validate before constructing the scheduler, which reads the jobs file.
    if args.action == "add" and not args.name:
        parser.error("--name required")
    if args.action in ("cancel", "remove") and not args.job_id:
        parser.error("--job-id required")
    if args.action == "start" and args.interval <= 0:
        # Zero would poll the API in a tight loop; negative makes time.sleep raise.
        parser.error("--interval must be a positive number of seconds")

    scheduler = TrainingScheduler(args.api_key)
    if args.action == "add":
        scheduler.add_job(args.name, args.gpu, args.max_price, args.command, args.image)
    elif args.action == "start":
        try:
            scheduler.run(args.interval)
        except KeyboardInterrupt:
            print("\nStopping...")
            scheduler.stop()
    elif args.action == "status":
        scheduler.status()
    elif args.action == "cancel":
        scheduler.cancel(args.job_id)
    elif args.action == "remove":
        scheduler.remove(args.job_id)
    elif args.action == "prices":
        prices = scheduler.get_prices()
        print("\n💰 Current GPU Prices (Spot):")
        for gpu, price in sorted(prices.items()):
            print(f" {gpu:15} ${price:.3f}/hr")
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()