SDK Automation Recipes
Recipe 1: GPU Auto-Scaler
"""
GPU Auto-Scaler
Watches a Redis queue and scales GPU workers up/down.
"""
import time
import logging
from dataclasses import dataclass, field
from typing import List
import redis
from clore_ai import CloreAI
from clore_ai.exceptions import CloreAPIError, InvalidInputError
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("autoscaler")
@dataclass
class ScalerConfig:
    """Auto-scaler configuration.

    Thresholds are compared against the Redis queue length on each check
    cycle; a scaling action fires only if `cooldown_seconds` have elapsed
    since the previous action.
    """
    queue_name: str = "gpu_tasks"      # Redis list polled (LLEN) for pending work
    gpu_model: str = "RTX 4090"        # GPU filter passed to client.marketplace()
    max_price_usd: float = 0.60        # Price ceiling passed to client.marketplace()
    image: str = "cloreai/pytorch"     # Container image used when creating an order
    currency: str = "bitcoin"          # Currency used when creating an order
    min_workers: int = 0               # Never scale below this many active workers
    max_workers: int = 5               # Never scale above this many active workers
    scale_up_threshold: int = 10  # Queue length to trigger scale-up
    scale_down_threshold: int = 2  # Queue length to trigger scale-down
    cooldown_seconds: int = 120  # Min time between scaling actions
    check_interval: int = 30  # Seconds between checks
@dataclass
class WorkerState:
    """Tracks active GPU workers."""
    # Order IDs of workers created by this scaler, in creation order;
    # scale-down removes the most recently appended entry.
    order_ids: List[int] = field(default_factory=list)
    # time.time() of the last scale up/down action, used for the cooldown check.
    last_scale_action: float = 0.0
def _scale_up(client: CloreAI, config: ScalerConfig, state: WorkerState, now: float) -> None:
    """Rent the cheapest matching marketplace server and track the new order."""
    servers = client.marketplace(
        gpu=config.gpu_model,
        max_price_usd=config.max_price_usd,
    )
    if not servers:
        log.warning("No servers available matching criteria")
        return
    # Pick the cheapest offer; servers without a USD price sort last.
    best = min(servers, key=lambda s: s.price_usd or float("inf"))
    try:
        order = client.create_order(
            server_id=best.id,
            image=config.image,
            type="on-demand",
            currency=config.currency,
            ports={"22": "tcp"},
        )
    except InvalidInputError as e:
        log.warning("Cannot rent server %s: %s", best.id, e)
    else:
        state.order_ids.append(order.id)
        state.last_scale_action = now
        log.info("⬆️ Scaled UP: order %s on server %s", order.id, best.id)


def _scale_down(client: CloreAI, config: ScalerConfig, state: WorkerState, now: float) -> None:
    """Cancel the most recently created worker.

    The order id is removed from state only AFTER the cancel succeeds, so a
    failed cancel does not orphan a still-running (and still-billed) worker.
    """
    victim = state.order_ids[-1]
    try:
        client.cancel_order(victim, issue="Autoscaler: low demand")
    except CloreAPIError as e:
        log.warning("Failed to cancel order %s: %s", victim, e)
    else:
        state.order_ids.pop()
        state.last_scale_action = now
        log.info("⬇️ Scaled DOWN: cancelled order %s", victim)


def run_autoscaler(config: ScalerConfig) -> None:
    """Run the auto-scaling loop forever.

    Every `config.check_interval` seconds, compares the Redis queue length
    against the scale-up/scale-down thresholds and rents or cancels one GPU
    worker per cycle, respecting min/max worker bounds and the cooldown.

    Never returns; intended to run as a long-lived process.
    """
    client = CloreAI()
    r = redis.Redis()
    state = WorkerState()
    log.info("Auto-scaler started: queue=%s, gpu=%s", config.queue_name, config.gpu_model)
    while True:
        try:
            queue_len = r.llen(config.queue_name)
            active = len(state.order_ids)
            now = time.time()
            cooldown_ok = (now - state.last_scale_action) > config.cooldown_seconds
            log.info("Queue: %s | Workers: %s/%s", queue_len, active, config.max_workers)
            if queue_len >= config.scale_up_threshold and active < config.max_workers and cooldown_ok:
                _scale_up(client, config, state, now)
            elif queue_len <= config.scale_down_threshold and active > config.min_workers and cooldown_ok:
                _scale_down(client, config, state, now)
        except Exception:
            # Top-level boundary: log with traceback and keep the loop alive.
            log.exception("Autoscaler error")
        time.sleep(config.check_interval)
if __name__ == "__main__":
run_autoscaler(ScalerConfig(
queue_name="inference_tasks",
gpu_model="RTX 4090",
max_workers=3,
))Recipe 2: Multi-GPU Orchestrator (Async)
Recipe 3: Spot Bidding Bot
Recipe 4: Server Health Checker with Auto-Recovery
Recipe 5: Budget-Aware Cost Tracker
Recipe 6: End-to-End Training Pipeline
Tips for All Recipes
1. Always use try/finally for cleanup
2. Respect the rate limiter
3. Use async for concurrent operations
4. Handle transient errors gracefully
See Also
Last updated
Was this helpful?