#!/usr/bin/env python3
"""
Spot Instance Auto-Rebid Manager for Clore.ai
Automatically monitors spot instances, handles preemptions, and rebids
to maintain continuous GPU availability.
Usage:
python spot_manager.py --api-key YOUR_API_KEY start
python spot_manager.py --api-key YOUR_API_KEY status
python spot_manager.py --api-key YOUR_API_KEY add-workload --name my-job --gpu "RTX 4090" --max-price 0.50
"""
import argparse
import json
import os
import secrets
import threading
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional

import requests
class WorkloadStatus(Enum):
    """Lifecycle state of a managed workload.

    The manager persists the member *value* (e.g. ``"paused"``) in its JSON
    state file and rebuilds the member from that value on load, so values must
    stay stable.
    """
    PENDING = "pending"  # Newly added; the monitor loop will try to provision it
    RUNNING = "running"  # Has an order whose status was last seen as "running"
    PAUSED = "paused" # Outbid, waiting for rebid
    FAILED = "failed"  # NOTE(review): never assigned by the visible code
    COMPLETED = "completed"  # Order expired or workload cancelled; skipped by the monitor
@dataclass
class SpotWorkload:
    """A workload that needs spot GPU resources.

    Instances are serialized with ``asdict`` into the manager's JSON state file
    (``status`` and the datetime fields are converted separately) and rebuilt
    with ``SpotWorkload(**data)`` on load, so field names are part of the
    on-disk format.
    """
    id: str  # "w_" + 8 hex chars when created through SpotManager.add_workload
    name: str
    gpu_type: str  # matched case-insensitively as a substring of a server's gpu_array entries
    max_price: float  # bid ceiling; compared against the marketplace "usd" spot price (presumably USD/hr -- confirm)
    image: str  # container image used for the order
    command: str  # optional startup command; an empty string means none is sent
    env: Dict[str, str] = field(default_factory=dict)  # extra container environment variables
    # State
    status: WorkloadStatus = WorkloadStatus.PENDING
    order_id: Optional[int] = None  # Clore order currently backing this workload, if any
    server_id: Optional[int] = None  # server rented by the last successful provisioning
    current_price: Optional[float] = None  # hourly price used for cost accounting
    ssh_connection: Optional[str] = None  # display string; includes the generated SSH password
    # Tracking
    total_runtime_seconds: float = 0
    total_cost: float = 0  # accumulated (runtime / 3600) * current_price
    preemption_count: int = 0
    created_at: datetime = field(default_factory=datetime.now)
    last_started: Optional[datetime] = None  # start of the current run segment
    # Checkpoint
    # NOTE(review): these fields are stored and persisted, but nothing in this
    # file runs the checkpoint/restore commands.
    checkpoint_path: Optional[str] = None
    checkpoint_command: Optional[str] = None
    restore_command: Optional[str] = None
@dataclass
class SpotMarketData:
    """Spot market data for a server.

    Built by ``SpotManager._get_spot_market`` from the ``/v1/spot_marketplace``
    response.
    """
    server_id: int
    current_bid: float  # highest "bid" among offers flagged active (0 when there are none)
    min_bid: float  # server min_pricing for "CLORE-Blockchain"; unit not visible here -- confirm
    my_bid: Optional[float]  # "bid" of the offer flagged "my", if any
    is_winning: bool  # True when my_bid is set and >= current_bid
class SpotManager:
    """Manages spot instances with automatic rebidding.

    Workloads and their accounting are persisted to ``STATE_FILE`` (relative to
    the current working directory) so separate CLI invocations share one view.
    A background thread started by :meth:`start` polls the Clore.ai API every
    ``check_interval`` seconds, provisions pending/paused workloads, and
    reprovisions workloads whose spot order disappeared or was outbid.
    """

    BASE_URL = "https://api.clore.ai"
    STATE_FILE = "spot_manager_state.json"

    def __init__(self, api_key: str):
        """Create a manager authenticated with ``api_key`` and load persisted workloads."""
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.workloads: Dict[str, SpotWorkload] = {}
        self._running = False
        # Serializes whole check cycles; a cycle may block for minutes while
        # provisioning, and must act on an order list fetched inside the lock.
        self._lock = threading.Lock()
        # Config
        self.check_interval = 60  # seconds
        self.rebid_margin = 1.05  # 5% above current winning bid
        # NOTE(review): not read anywhere in this class.
        self.max_rebid_attempts = 3
        # Optional callable(message: str) invoked for every notification.
        self.notification_callback = None
        self._load_state()

    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Call ``BASE_URL + endpoint`` and return the decoded JSON body.

        Up to 3 attempts. Rate-limit responses (``code == 5``) back off 1 s,
        2 s, 4 s; timeouts and connection errors wait 1 s and retry.

        Raises:
            Exception: the API returned a non-zero ``code``, or every attempt
                was rate limited.
            requests.exceptions.Timeout / ConnectionError: on the final attempt.
        """
        url = f"{self.BASE_URL}{endpoint}"
        attempts = 3
        for attempt in range(attempts):
            try:
                response = requests.request(
                    method, url,
                    headers=self.headers,
                    timeout=30,
                    **kwargs
                )
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
                # NOTE(review): retrying a timed-out POST such as create_order
                # can duplicate a request the server already accepted.
                if attempt == attempts - 1:
                    raise
                time.sleep(1)
                continue
            data = response.json()
            if data.get("code") == 5:
                # Rate limited: exponential backoff.
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries exceeded")

    def _notify(self, message: str):
        """Print a timestamped notification and forward it to the callback, if set."""
        print(f"📢 [{datetime.now().strftime('%H:%M:%S')}] {message}")
        if self.notification_callback:
            self.notification_callback(message)

    def _save_state(self):
        """Persist all workloads to ``STATE_FILE``.

        The JSON is written to a temporary sibling file and then renamed over
        the target, so a crash mid-write cannot leave a truncated state file
        that ``_load_state`` would fail to parse.
        """
        state = {
            "workloads": {
                wid: {
                    **asdict(w),
                    # Enum and datetime values are not JSON-serializable as-is.
                    "status": w.status.value,
                    "created_at": w.created_at.isoformat() if w.created_at else None,
                    "last_started": w.last_started.isoformat() if w.last_started else None,
                }
                for wid, w in self.workloads.items()
            }
        }
        tmp_path = f"{self.STATE_FILE}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, self.STATE_FILE)

    def _load_state(self):
        """Load workloads from ``STATE_FILE``; a missing file means no workloads."""
        try:
            with open(self.STATE_FILE, "r", encoding="utf-8") as f:
                state = json.load(f)
        except FileNotFoundError:
            return
        for wid, data in state.get("workloads", {}).items():
            data["status"] = WorkloadStatus(data.get("status", "pending"))
            data["created_at"] = (
                datetime.fromisoformat(data["created_at"])
                if data.get("created_at") else datetime.now()
            )
            data["last_started"] = (
                datetime.fromisoformat(data["last_started"])
                if data.get("last_started") else None
            )
            self.workloads[wid] = SpotWorkload(**data)

    def _cancel_order(self, order_id: int) -> bool:
        """Best-effort cancel of ``order_id``.

        Returns:
            True if the API accepted the cancel; False otherwise (the error is
            printed so a possibly-leaked order can be cleaned up manually).
        """
        try:
            self._api("POST", "/v1/cancel_order", json={"id": order_id})
            return True
        except Exception as e:
            print(f"Failed to cancel order {order_id}: {e}")
            return False

    def _accrue_runtime(self, workload: SpotWorkload) -> None:
        """Fold the current run segment into the workload's runtime and cost.

        Clears ``last_started`` so the same segment can never be counted twice.
        """
        if workload.last_started is None:
            return
        runtime = (datetime.now() - workload.last_started).total_seconds()
        workload.total_runtime_seconds += runtime
        workload.total_cost += (runtime / 3600) * (workload.current_price or 0)
        workload.last_started = None

    def add_workload(self,
                     name: str,
                     gpu_type: str,
                     max_price: float,
                     image: str = "nvidia/cuda:12.8.0-base-ubuntu22.04",
                     command: str = "",
                     checkpoint_command: Optional[str] = None,
                     restore_command: Optional[str] = None) -> SpotWorkload:
        """Register a new PENDING workload, persist it and return it.

        The monitor loop provisions it on its next check.
        """
        workload_id = f"w_{secrets.token_hex(4)}"
        workload = SpotWorkload(
            id=workload_id,
            name=name,
            gpu_type=gpu_type,
            max_price=max_price,
            image=image,
            command=command,
            checkpoint_command=checkpoint_command,
            restore_command=restore_command
        )
        self.workloads[workload_id] = workload
        self._save_state()
        self._notify(f"Workload added: {name} (GPU: {gpu_type}, max: ${max_price}/hr)")
        return workload

    def remove_workload(self, workload_id: str):
        """Forget a workload, cancelling its order first if it is running.

        Unknown IDs are ignored.
        """
        workload = self.workloads.get(workload_id)
        if workload is None:
            return
        if workload.order_id and workload.status == WorkloadStatus.RUNNING:
            self._cancel_order(workload.order_id)
        del self.workloads[workload_id]
        self._save_state()
        self._notify(f"Workload removed: {workload.name}")

    def _find_server(self, gpu_type: str, max_price: float) -> Optional[Dict]:
        """Return the cheapest unrented marketplace server matching the criteria.

        A server matches when one of its ``gpu_array`` entries contains
        ``gpu_type`` (case-insensitive) and its USD spot price is set and at
        most ``max_price``. Ties on price prefer higher reliability.

        Returns:
            ``{"id", "gpus", "price", "reliability"}`` or None if nothing matches.
        """
        data = self._api("GET", "/v1/marketplace")
        wanted = gpu_type.lower()
        candidates = []
        for server in data.get("servers", []):
            if server.get("rented"):
                continue
            gpu_array = server.get("gpu_array") or []
            if not any(wanted in g.lower() for g in gpu_array):
                continue
            price = ((server.get("price") or {}).get("usd") or {}).get("spot")
            if not price or price > max_price:
                continue
            candidates.append({
                "id": server["id"],
                "gpus": gpu_array,
                "price": price,
                "reliability": server.get("reliability", 0)
            })
        if not candidates:
            return None
        # Cheapest first, then most reliable.
        return min(candidates, key=lambda x: (x["price"], -x["reliability"]))

    def _get_spot_market(self, server_id: int) -> Optional[SpotMarketData]:
        """Get spot market data for a server; None on error or empty market.

        NOTE(review): not called anywhere in this class.
        """
        try:
            data = self._api("GET", "/v1/spot_marketplace", params={"market": server_id})
            market = data.get("market", {})
            if not market:
                return None
            offers = market.get("offers", [])
            min_pricing = market.get("server", {}).get("min_pricing", {})
            current_bid = 0
            my_bid = None
            for offer in offers:
                if offer.get("active"):
                    current_bid = max(current_bid, offer.get("bid", 0))
                if offer.get("my"):
                    my_bid = offer.get("bid")
            return SpotMarketData(
                server_id=server_id,
                current_bid=current_bid,
                min_bid=min_pricing.get("CLORE-Blockchain", 0),
                my_bid=my_bid,
                is_winning=my_bid is not None and my_bid >= current_bid
            )
        except Exception as e:
            print(f"Error getting spot market: {e}")
            return None

    def _wait_for_running(self, order_id: int, attempts: int = 90,
                          delay: float = 2) -> Optional[Dict]:
        """Poll ``/v1/my_orders`` until ``order_id`` is running; None on timeout."""
        for _ in range(attempts):
            orders = self._api("GET", "/v1/my_orders").get("orders", [])
            order = next((o for o in orders if o.get("order_id") == order_id), None)
            if order and order.get("status") == "running":
                return order
            time.sleep(delay)
        return None

    def _provision_workload(self, workload: SpotWorkload) -> bool:
        """Rent the cheapest matching server as a spot order for ``workload``.

        Waits up to about 3 minutes for the order to reach ``running``. On
        timeout, or on any error after the order was created, the order is
        cancelled (best effort) so it is not left running untracked.

        Returns:
            True if the workload is now RUNNING, False otherwise.
        """
        server = self._find_server(workload.gpu_type, workload.max_price)
        if not server:
            return False
        ssh_password = secrets.token_urlsafe(16)
        spot_price = min(server["price"] * self.rebid_margin, workload.max_price)
        # Create spot order
        order_data = {
            "renting_server": server["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": workload.image,
            "ports": {"22": "tcp"},
            "env": {**workload.env, "NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": ssh_password,
            # NOTE(review): spot_price derives from the marketplace *USD* price
            # while the order currency is CLORE-Blockchain -- confirm the unit
            # the API expects for "spotprice".
            "spotprice": spot_price
        }
        if workload.command:
            order_data["command"] = workload.command

        order_id = None
        try:
            result = self._api("POST", "/v1/create_order", json=order_data)
            order_id = result["order_id"]
            order = self._wait_for_running(order_id)
        except Exception as e:
            print(f"Error provisioning: {e}")
            if order_id is not None:
                self._cancel_order(order_id)
            return False

        if order is None:
            # Timed out waiting for the order to start.
            self._cancel_order(order_id)
            return False

        ssh = (order.get("connection") or {}).get("ssh", "unknown")
        workload.order_id = order_id
        workload.server_id = server["id"]
        # Record the bid actually placed (as _rebid does); it drives cost accounting.
        workload.current_price = spot_price
        workload.status = WorkloadStatus.RUNNING
        workload.last_started = datetime.now()
        workload.ssh_connection = f"{ssh} (pw: {ssh_password})"
        self._save_state()
        self._notify(f"Workload started: {workload.name} @ ${spot_price:.3f}/hr")
        return True

    def _handle_preemption(self, workload: SpotWorkload):
        """Record that a workload lost its order, then try to reprovision it."""
        workload.preemption_count += 1
        workload.status = WorkloadStatus.PAUSED
        self._accrue_runtime(workload)
        # The previous order and its SSH endpoint no longer belong to this workload.
        workload.order_id = None
        workload.ssh_connection = None
        self._save_state()
        self._notify(f"⚠️ Workload preempted: {workload.name} (count: {workload.preemption_count})")
        # Try to reprovision
        self._provision_workload(workload)

    def _rebid(self, workload: SpotWorkload, new_price: float) -> bool:
        """Set the spot bid of the workload's current order to ``new_price``.

        NOTE(review): not called anywhere in this class.

        Returns:
            True on success (``current_price`` updated and persisted), else False.
        """
        if not workload.order_id:
            return False
        try:
            result = self._api("POST", "/v1/set_spot_price", json={
                "order_id": workload.order_id,
                "desired_price": new_price
            })
            if result.get("error"):
                return False
            workload.current_price = new_price
            self._save_state()
            self._notify(f"Rebid successful: {workload.name} @ ${new_price:.3f}/hr")
            return True
        except Exception as e:
            print(f"Rebid failed: {e}")
            return False

    def _check_workloads(self):
        """Run one monitoring cycle over all workloads.

        For RUNNING workloads:
        - a missing order counts as a preemption;
        - a "paused" (outbid) order is cancelled, then treated as a preemption;
        - an "expired" order marks the workload COMPLETED.

        PENDING and PAUSED workloads are (re)provisioned. The order list is
        fetched while holding the lock, so a cycle never acts on orders that
        changed during a previous, still-running cycle.
        """
        with self._lock:
            try:
                orders_data = self._api("GET", "/v1/my_orders")
            except Exception as e:
                print(f"Error getting orders: {e}")
                return
            orders = {
                o["order_id"]: o
                for o in orders_data.get("orders", [])
                if "order_id" in o
            }

            for workload in list(self.workloads.values()):
                if workload.status == WorkloadStatus.COMPLETED:
                    continue
                if workload.status in (WorkloadStatus.PENDING, WorkloadStatus.PAUSED):
                    self._provision_workload(workload)
                    continue
                if workload.status != WorkloadStatus.RUNNING:
                    continue

                order = orders.get(workload.order_id)
                if not order:
                    # Order gone - preempted or cancelled
                    self._handle_preemption(workload)
                elif order.get("status") == "paused":
                    # Outbid: the order still exists. Release it before renting
                    # elsewhere so it cannot resume alongside the replacement.
                    self._cancel_order(workload.order_id)
                    self._handle_preemption(workload)
                elif order.get("status") == "expired":
                    # Completed
                    self._accrue_runtime(workload)
                    workload.status = WorkloadStatus.COMPLETED
                    self._save_state()
                    self._notify(f"✅ Workload completed: {workload.name}")

    def _monitor_loop(self):
        """Run check cycles every ``check_interval`` seconds until stopped."""
        while self._running:
            try:
                self._check_workloads()
            except Exception as e:
                print(f"Monitor error: {e}")
            time.sleep(self.check_interval)

    def start(self):
        """Start the daemon monitor thread and return it.

        The thread performs its first check immediately, so no extra check is
        run on the caller's thread.
        """
        self._running = True
        thread = threading.Thread(target=self._monitor_loop, daemon=True)
        thread.start()
        self._notify("Spot Manager started")
        return thread

    def stop(self):
        """Ask the monitor loop to exit after its current sleep."""
        self._running = False
        self._notify("Spot Manager stopped")

    def get_status(self) -> Dict:
        """Return a JSON-friendly summary of this manager and all workloads."""
        return {
            "running": self._running,
            "workloads": [
                {
                    "id": w.id,
                    "name": w.name,
                    "status": w.status.value,
                    "gpu": w.gpu_type,
                    "price": w.current_price,
                    "runtime_hours": w.total_runtime_seconds / 3600,
                    "cost": w.total_cost,
                    "preemptions": w.preemption_count,
                    "ssh": w.ssh_connection
                }
                for w in self.workloads.values()
            ]
        }

    def cancel_workload(self, workload_id: str):
        """Cancel a workload's running order and mark it COMPLETED.

        The final run segment is added to runtime and cost. Unknown IDs are
        ignored.
        """
        workload = self.workloads.get(workload_id)
        if not workload:
            return
        if workload.status == WorkloadStatus.RUNNING:
            if workload.order_id:
                self._cancel_order(workload.order_id)
            self._accrue_runtime(workload)
        workload.status = WorkloadStatus.COMPLETED
        self._save_state()
        self._notify(f"Workload cancelled: {workload.name}")
def main():
    """Command-line entry point: parse arguments and run one action.

    Actions:
        start         Run the monitor loop until Ctrl+C.
        status        Print every persisted workload.
        add-workload  Register a new workload (requires --name).
        remove        Cancel a running workload's order, then forget it (requires --workload-id).
        cancel        Cancel a running workload's order, then mark it completed (requires --workload-id).
    """
    parser = argparse.ArgumentParser(description="Spot Instance Auto-Rebid Manager")
    parser.add_argument("--api-key", required=True)
    parser.add_argument("action", choices=["start", "status", "add-workload", "remove", "cancel"])
    parser.add_argument("--name", help="Workload name")
    parser.add_argument("--gpu", default="RTX 4090")
    parser.add_argument("--max-price", type=float, default=0.50)
    parser.add_argument("--image", default="nvidia/cuda:12.8.0-base-ubuntu22.04")
    parser.add_argument("--command", default="")
    parser.add_argument("--workload-id", help="Workload ID for remove/cancel")
    parser.add_argument("--interval", type=int, default=60, help="Check interval in seconds")
    args = parser.parse_args()

    manager = SpotManager(args.api_key)
    manager.check_interval = args.interval

    if args.action == "add-workload":
        if not args.name:
            print("--name required")
            return
        manager.add_workload(
            name=args.name,
            gpu_type=args.gpu,
            max_price=args.max_price,
            image=args.image,
            command=args.command,
        )
    elif args.action == "start":
        try:
            manager.start()
            print("Press Ctrl+C to stop...")
            # The monitor runs in a daemon thread; keep the main thread alive
            # so Ctrl+C arrives here as KeyboardInterrupt.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            manager.stop()
    elif args.action == "status":
        status = manager.get_status()
        print("\n" + "=" * 70)
        print("📊 SPOT MANAGER STATUS")
        print("=" * 70)
        # NOTE: this reflects only the current process; a `start` running in
        # another process is not detected, so this prints False here.
        print(f"\n🔄 Running: {status['running']}")
        print(f"\n📋 Workloads ({len(status['workloads'])}):")
        for w in status["workloads"]:
            emoji = {
                "running": "🟢",
                "paused": "🟡",
                "pending": "⏳",
                "completed": "✅",
                "failed": "❌",
            }.get(w["status"], "❓")
            print(f"\n {emoji} {w['id']} | {w['name']}")
            print(f" GPU: {w['gpu']} | Status: {w['status']}")
            print(f" Price: ${w['price']:.3f}/hr" if w["price"] else " Price: N/A")
            print(f" Runtime: {w['runtime_hours']:.2f}h | Cost: ${w['cost']:.4f}")
            print(f" Preemptions: {w['preemptions']}")
            if w["ssh"]:
                print(f" SSH: {w['ssh']}")
        print("\n" + "=" * 70)
    elif args.action in ("remove", "cancel"):
        if not args.workload_id:
            print("--workload-id required")
            return
        if args.workload_id not in manager.workloads:
            # The manager methods silently ignore unknown IDs; tell the user.
            print(f"Unknown workload: {args.workload_id}")
            return
        if args.action == "remove":
            manager.remove_workload(args.workload_id)
        else:
            manager.cancel_workload(args.workload_id)
if __name__ == "__main__":
    # Only run the CLI when executed as a script, not when imported.
    main()