This guide uses raw requests for learning purposes. For production, use the official SDK: `pip install clore-ai` — built-in rate limiter, async support, type hints.
A robust Python automation framework for Clore.ai that handles server discovery, order management, automatic retries, and cost tracking — the foundation for all your GPU automation needs.
Prerequisites
Clore.ai API key
Python 3.10+
requests, tenacity libraries
pip install requests tenacity
Step 1: Set Up the Clore Client
Note: This guide uses the standard Clore API client. See the Clore API Client Reference for the full implementation and setup instructions. Save it as clore_client.py in your project.
from clore_client import CloreClient

client = CloreClient(api_key="your-api-key")
clore_automation/client.py
"""Production-ready Clore.ai API client with retry logic."""
import requests
import time
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
def matches(self, server: dict) -> bool:
    """Check whether a server listing satisfies every filter criterion.

    Args:
        server: Raw server dict from the Clore marketplace API.

    Returns:
        True only if the server is not rented and passes the GPU-type,
        GPU-count, price, reliability, and currency checks.
    """
    # Already-rented servers can never be ordered.
    if server.get("rented"):
        return False

    # Look up the GPU list once; it is needed for both the type and
    # count checks below.
    gpu_array = server.get("gpu_array", [])

    # GPU type: at least one GPU name must contain one of the wanted
    # type substrings (e.g. "RTX 4090" in "1x RTX 4090").
    if self.gpu_types:
        if not any(
            any(gpu_type in gpu for gpu_type in self.gpu_types)
            for gpu in gpu_array
        ):
            return False

    # GPU count.
    if len(gpu_array) < self.min_gpu_count:
        return False

    # Price: use `is not None` rather than truthiness so a legitimate
    # $0.00 listing is still compared instead of silently skipped.
    # A server with no listed price is not filtered out here.
    price = server.get("price", {}).get("usd", {}).get("on_demand_clore")
    if price is not None and price > self.max_price_usd:
        return False

    # Reliability score (missing value treated as 0, i.e. worst).
    if server.get("reliability", 0) < self.min_reliability:
        return False

    # Payment currency: server must accept at least one allowed coin.
    if self.currencies:
        allowed = server.get("allowed_coins", [])
        if not any(c in allowed for c in self.currencies):
            return False

    return True
# === Usage Example ===
# Check the account balance before searching for servers.
balance = client.get_balance()
print(f"CLORE Balance: {balance}")

# Find RTX 4090 servers under $0.50/hr with decent reliability.
# Named `server_filter` (not `filter`) so the builtin is not shadowed.
server_filter = ServerFilter(
    gpu_types=["RTX 4090"],
    max_price_usd=0.50,
    min_reliability=0.80
)

servers = client.find_servers(server_filter)
print(f"Found {len(servers)} matching servers")

if servers:
    cheapest = servers[0]
    print(f"Cheapest: Server {cheapest['id']} at ${cheapest['price']['usd']['on_demand_clore']:.2f}/hr")
## Step 2: Job Runner with Auto-Provisioning
```python
# clore_automation/job_runner.py
"""Run jobs on Clore with automatic GPU provisioning."""
import time
import subprocess
import tempfile
import os
from typing import Callable, Optional, Dict, Any
from dataclasses import dataclass
from client import CloreClient, ServerFilter, CloreAPIError
@dataclass
class JobResult:
    """Result of a single job execution on a rented Clore GPU."""
    # True when the remote script exited with return code 0.
    success: bool
    # Combined stdout + stderr captured from the SSH run.
    output: str
    # Wall-clock runtime of the script in minutes.
    duration_minutes: float
    # Estimated cost: hourly server price * runtime in hours.
    cost_usd: float
    # Marketplace id of the server the job ran on.
    server_id: int
    # Id of the (now cancelled) order that provisioned the server.
    order_id: int
class JobRunner:
    """Run jobs on Clore GPUs with automatic provisioning.

    Finds the cheapest matching server, creates an order, waits for it
    to boot, runs the given script over SSH, and always cancels the
    order afterwards so no rental keeps billing.
    """

    def __init__(self, client: CloreClient, ssh_key_path: str = "~/.ssh/id_rsa"):
        """
        Args:
            client: Configured Clore API client.
            ssh_key_path: Path to the SSH private key; the matching
                ``.pub`` file (if present) is attached to new orders.
        """
        self.client = client
        self.ssh_key_path = os.path.expanduser(ssh_key_path)
        # Load the public key if one exists next to the private key;
        # otherwise orders fall back to password authentication.
        if os.path.exists(self.ssh_key_path + ".pub"):
            with open(self.ssh_key_path + ".pub") as f:
                self.ssh_public_key = f.read().strip()
        else:
            self.ssh_public_key = None

    def run_script(
        self,
        script: str,
        filter: ServerFilter,
        image: str = "nvidia/cuda:12.8.0-base-ubuntu22.04",
        timeout_minutes: int = 60,
        setup_commands: str = "",
    ) -> JobResult:
        """
        Run a script on a Clore GPU.

        Args:
            script: Bash script to run
            filter: Server filter criteria
            image: Docker image to use
            timeout_minutes: Maximum runtime
            setup_commands: Commands to run before main script

        Returns:
            JobResult with output and cost info

        Raises:
            CloreAPIError: If no server matches the filter criteria.
        """
        # Find the cheapest server that satisfies the filter.
        server = self.client.find_cheapest(filter)
        if not server:
            raise CloreAPIError("No matching servers available")

        server_id = server["id"]
        price_usd = server["price"]["usd"]["on_demand_clore"]
        print(f"π₯οΈ Using server {server_id} ({server['gpu_array']}) at ${price_usd:.2f}/hr")

        # Create the order; use key auth when a public key was loaded,
        # otherwise fall back to a fixed password.
        order = self.client.create_order(
            server_id=server_id,
            image=image,
            ssh_key=self.ssh_public_key,
            ssh_password="AutoJob123!" if not self.ssh_public_key else None,
            ports={"22": "tcp"},
        )
        order_id = order["order_id"]

        try:
            # Wait for the container to come up (3 minute budget).
            print("β³ Waiting for server to start...")
            active = self.client.wait_for_order(order_id, timeout=180)

            # Parse host/port out of the SSH connection string.
            # NOTE(review): assumes the format "ssh user@host -p PORT" —
            # confirm against the API's connection payload.
            ssh_info = active["connection"]["ssh"]
            parts = ssh_info.split()
            host = parts[1].split("@")[1]
            port = parts[3] if len(parts) > 3 else "22"
            print(f"β Server ready: {ssh_info}")

            # Assemble the full remote script: strict mode, optional
            # setup commands, then the user's payload.
            full_script = f"""#!/bin/bash
set -e

{setup_commands}

# Main script
{script}
"""

            # Execute over SSH and time the run for cost accounting.
            print("π Running job...")
            start_time = time.time()
            result = self._run_ssh_script(host, port, full_script, timeout_minutes * 60)
            duration = (time.time() - start_time) / 60

            # duration is in minutes; price is per hour.
            cost = price_usd * duration / 60

            return JobResult(
                success=result["success"],
                output=result["output"],
                duration_minutes=duration,
                cost_usd=cost,
                server_id=server_id,
                order_id=order_id
            )
        finally:
            # Always release the rental. A failed cancel must not mask
            # the job's own result or exception, so it is best-effort;
            # the warning tells the operator to cancel manually.
            print(f"π§Ή Cleaning up order {order_id}...")
            try:
                self.client.cancel_order(order_id)
            except Exception as cleanup_err:
                print(f"WARNING: failed to cancel order {order_id} "
                      f"({cleanup_err}); cancel it manually to stop billing")

    def _run_ssh_script(self, host: str, port: str, script: str,
                        timeout: int) -> Dict[str, Any]:
        """Copy a script to the server via scp and execute it via ssh.

        Returns:
            Dict with "success" (bool) and "output" (str). Never raises:
            timeouts and transport errors are reported as failures.
        """
        # Write the script to a local temp file so scp can upload it.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f:
            f.write(script)
            script_path = f.name

        try:
            # Upload to /tmp on the rented server.
            scp_cmd = [
                "scp", "-o", "StrictHostKeyChecking=no",
                "-P", port, script_path, f"root@{host}:/tmp/job.sh"
            ]
            subprocess.run(scp_cmd, check=True, timeout=30)

            # Execute remotely, capturing combined output.
            ssh_cmd = [
                "ssh", "-o", "StrictHostKeyChecking=no",
                "-p", port, f"root@{host}",
                "chmod +x /tmp/job.sh && /tmp/job.sh"
            ]
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=timeout
            )
            return {
                "success": result.returncode == 0,
                "output": result.stdout + result.stderr
            }
        except subprocess.TimeoutExpired:
            return {"success": False, "output": "Timeout exceeded"}
        except Exception as e:
            return {"success": False, "output": str(e)}
        finally:
            # Remove the local temp file regardless of outcome.
            os.unlink(script_path)

    def run_python(
        self,
        python_code: str,
        filter: ServerFilter,
        requirements: Optional[list] = None,
        **kwargs
    ) -> JobResult:
        """Run Python code on a GPU.

        Args:
            python_code: Python source executed via a heredoc.
            filter: Server filter criteria.
            requirements: Optional pip package names installed first.
            **kwargs: Passed through to run_script (image, timeout, ...).

        Returns:
            JobResult from the underlying run_script call.
        """
        setup = ""
        if requirements:
            setup = f"pip install {' '.join(requirements)}\n"

        script = f"""
{setup}
python3 << 'PYTHON_EOF'
{python_code}
PYTHON_EOF
"""
        return self.run_script(script, filter, **kwargs)
# === Usage Example ===
if __name__ == "__main__":
    client = CloreClient("YOUR_API_KEY")
    runner = JobRunner(client)

    # Accept any RTX card at up to $0.50/hr. Named `server_filter`
    # (not `filter`) so the builtin is not shadowed.
    server_filter = ServerFilter(
        gpu_types=["RTX"],
        max_price_usd=0.50
    )

    # Quick smoke test: confirm the GPU is visible in the container.
    result = runner.run_script(
        script="nvidia-smi && echo 'GPU test passed!'",
        filter=server_filter,
        timeout_minutes=5
    )

    print(f"\n{'='*50}")
    print(f"β Success: {result.success}")
    print(f"β±οΈ Duration: {result.duration_minutes:.1f} minutes")
    print(f"π° Cost: ${result.cost_usd:.4f}")
    print(f"\nπ Output:\n{result.output}")
# clore_automation/cost_tracker.py
"""Track and analyze Clore.ai spending."""
import json
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from client import CloreClient
@dataclass
class CostEntry:
    """Single cost entry recording one completed GPU rental."""
    # ISO-8601 timestamp of when the entry was recorded.
    timestamp: str
    # Clore order id that produced this cost.
    order_id: int
    # Marketplace id of the rented server.
    server_id: int
    # Billable runtime in minutes.
    duration_minutes: float
    # Total job cost in USD.
    cost_usd: float
    # Human-readable GPU label, e.g. "['RTX 4090']".
    gpu_type: str
    # Optional caller-supplied label for grouping jobs in reports.
    job_name: str = ""
class CostTracker:
"""Track GPU rental costs."""
def __init__(self, data_file: str = "clore_costs.json"):
self.data_file = data_file
self.entries: List[CostEntry] = []
self._load()
def _load(self):
"""Load entries from file."""
if os.path.exists(self.data_file):
with open(self.data_file) as f:
data = json.load(f)
self.entries = [CostEntry(**e) for e in data]
def _save(self):
"""Save entries to file."""
with open(self.data_file, "w") as f:
json.dump([asdict(e) for e in self.entries], f, indent=2)
def add(self, order_id: int, server_id: int, duration_minutes: float,
cost_usd: float, gpu_type: str, job_name: str = ""):
"""Add a cost entry."""
entry = CostEntry(
timestamp=datetime.now().isoformat(),
order_id=order_id,
server_id=server_id,
duration_minutes=duration_minutes,
cost_usd=cost_usd,
gpu_type=gpu_type,
job_name=job_name
)
self.entries.append(entry)
self._save()
def get_total(self, days: int = None) -> float:
"""Get total cost, optionally filtered by days."""
if days:
cutoff = datetime.now() - timedelta(days=days)
entries = [e for e in self.entries
if datetime.fromisoformat(e.timestamp) > cutoff]
else:
entries = self.entries
return sum(e.cost_usd for e in entries)
def get_by_gpu(self, days: int = None) -> Dict[str, float]:
"""Get costs grouped by GPU type."""
if days:
cutoff = datetime.now() - timedelta(days=days)
entries = [e for e in self.entries
if datetime.fromisoformat(e.timestamp) > cutoff]
else:
entries = self.entries
by_gpu = {}
for e in entries:
by_gpu[e.gpu_type] = by_gpu.get(e.gpu_type, 0) + e.cost_usd
return by_gpu
def get_daily_summary(self, days: int = 7) -> List[Dict]:
"""Get daily cost summary."""
cutoff = datetime.now() - timedelta(days=days)
daily = {}
for e in self.entries:
ts = datetime.fromisoformat(e.timestamp)
if ts > cutoff:
day = ts.strftime("%Y-%m-%d")
if day not in daily:
daily[day] = {"date": day, "cost": 0, "minutes": 0, "jobs": 0}
daily[day]["cost"] += e.cost_usd
daily[day]["minutes"] += e.duration_minutes
daily[day]["jobs"] += 1
return sorted(daily.values(), key=lambda x: x["date"])
def print_report(self, days: int = 30):
"""Print cost report."""
print(f"\n{'='*50}")
print(f"π Clore.ai Cost Report (Last {days} days)")
print(f"{'='*50}")
total = self.get_total(days)
by_gpu = self.get_by_gpu(days)
daily = self.get_daily_summary(days)
print(f"\nπ° Total Spent: ${total:.2f}")
print(f"\nπ¦ By GPU Type:")
for gpu, cost in sorted(by_gpu.items(), key=lambda x: -x[1]):
print(f" {gpu}: ${cost:.2f}")
print(f"\nπ Daily Breakdown:")
for day in daily[-7:]: # Last 7 days
print(f" {day['date']}: ${day['cost']:.2f} ({day['jobs']} jobs, {day['minutes']:.0f} min)")
# Calculate averages
if daily:
avg_daily = total / len(daily)
print(f"\nπ Average: ${avg_daily:.2f}/day")
print(f" Projected Monthly: ${avg_daily * 30:.2f}")
# === Integration with JobRunner ===
class TrackedJobRunner:
    """Job runner that records the cost of every job it executes."""

    def __init__(self, client: CloreClient, tracker: Optional[CostTracker] = None):
        """
        Args:
            client: Configured Clore API client.
            tracker: Cost tracker to record into; a default file-backed
                tracker is created when omitted.
        """
        # Imported here (not at module top) to avoid import-time
        # coupling between the cost_tracker and job_runner modules.
        from job_runner import JobRunner
        self.runner = JobRunner(client)
        self.tracker = tracker or CostTracker()

    def run(self, job_name: str, script: str, filter, **kwargs):
        """Run a job via JobRunner and persist its cost.

        Args:
            job_name: Label stored with the cost entry.
            script: Bash script to execute on the GPU.
            filter: ServerFilter describing acceptable servers.
            **kwargs: Passed through to JobRunner.run_script.

        Returns:
            The JobResult from the underlying runner.
        """
        result = self.runner.run_script(script, filter, **kwargs)

        # Record the spend; gpu_type stores the requested types since
        # the result does not carry the actual GPU model.
        self.tracker.add(
            order_id=result.order_id,
            server_id=result.server_id,
            duration_minutes=result.duration_minutes,
            cost_usd=result.cost_usd,
            gpu_type=str(filter.gpu_types),
            job_name=job_name
        )
        return result
if __name__ == "__main__":
    # Print a 30-day spending report when run directly.
    CostTracker().print_report(30)
#!/usr/bin/env python3
"""
Complete example: Run a PyTorch training job with cost tracking.
"""
from clore_automation.client import CloreClient, ServerFilter
from clore_automation.job_runner import JobRunner
from clore_automation.cost_tracker import CostTracker, TrackedJobRunner
def main():
    """Provision a GPU, run an MNIST training job, and report the cost."""
    # Initialize the client, a persistent cost log, and a tracked runner.
    client = CloreClient("YOUR_API_KEY")
    tracker = CostTracker("training_costs.json")
    runner = TrackedJobRunner(client, tracker)

    # Define server requirements. Named `server_filter` (not `filter`)
    # so the builtin is not shadowed.
    server_filter = ServerFilter(
        gpu_types=["RTX 4090", "RTX 3090"],
        min_gpu_count=1,
        max_price_usd=0.50,
        min_reliability=0.80
    )

    # Bash script executed on the rented server: installs dependencies,
    # then runs a small PyTorch MNIST training via a Python heredoc.
    training_script = """
# Install dependencies
pip install torch torchvision wandb

# Training code
python3 << 'EOF'
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms

# Check GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
print(f"GPU: {torch.cuda.get_device_name(0)}")

# Simple model
model = nn.Sequential(
    nn.Flatten(),
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Linear(256, 10)
).to(device)

# Data
transform = transforms.Compose([transforms.ToTensor()])
train_data = datasets.MNIST('./data', train=True, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(train_data, batch_size=64, shuffle=True)

# Train
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

for epoch in range(3):
    total_loss = 0
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        if batch_idx % 100 == 0:
            print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")
    print(f"Epoch {epoch} complete. Avg loss: {total_loss/len(train_loader):.4f}")

print("Training complete!")
torch.save(model.state_dict(), "model.pt")
EOF
"""

    # Run the job on a CUDA-enabled PyTorch image.
    print("π Starting training job...")
    result = runner.run(
        job_name="mnist-training",
        script=training_script,
        filter=server_filter,
        image="pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime",
        timeout_minutes=30
    )

    # Print results
    print(f"\n{'='*50}")
    print(f"π Job Results")
    print(f"{'='*50}")
    print(f"β Success: {result.success}")
    print(f"β±οΈ Duration: {result.duration_minutes:.1f} minutes")
    print(f"π° Cost: ${result.cost_usd:.4f}")

    # Print the 30-day cost report from the persistent tracker.
    tracker.print_report(30)

    # Rough comparison against an on-demand cloud GPU at ~$32/hr.
    hours = result.duration_minutes / 60
    print(f"\nπ‘ Cost Comparison:")
    print(f"  Clore.ai: ${result.cost_usd:.4f}")
    print(f"  AWS p4d.24xlarge: ~${hours * 32:.2f}")
    # Guard against ZeroDivisionError when the job finished instantly.
    if hours > 0:
        print(f"  Savings: ~{(1 - result.cost_usd/(hours * 32))*100:.0f}%")
# Entry point: run the complete training example when executed as a script.
if __name__ == "__main__":
    main()