# Automating GPU Rental with Python

## Automating GPU Rental with Python Scripts

> 💡 **New: Official Python SDK**
>
> This guide uses raw `requests` for learning purposes. For production, use the official SDK: `pip install clore-ai` — built-in rate limiter, async support, type hints.
>
> → [SDK Quick Start](https://docs.clore.ai/dev/getting-started/python-sdk-quickstart)

### What We're Building

A robust Python automation framework for Clore.ai that handles server discovery, order management, automatic retries, and cost tracking — the foundation for all your GPU automation needs.

### Prerequisites

* Clore.ai API key
* Python 3.10+
* `requests`, `tenacity` libraries

```bash
pip install requests tenacity
```

### Step 1: Set Up the Clore Client

> 📦 **Using the standard Clore API client.** See [Clore API Client Reference](https://docs.clore.ai/dev/reference/clore-client) for the full implementation and setup instructions. Save it as `clore_client.py` in your project.

```python
from clore_client import CloreClient

# "your-api-key" is a placeholder; substitute your own Clore.ai API key.
client = CloreClient(api_key="your-api-key")
```

## clore\_automation/client.py

"""Production-ready Clore.ai API client with retry logic."""

import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import requests
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

# Configure root logging once at import time and expose a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CloreAPIError(Exception):
    """Clore API error with code.

    Attributes:
        code: Numeric error code; ``-1`` when none was supplied.
        response: Response payload associated with the error, or an empty
            dict when none was supplied.
    """

    def __init__(self, message: str, code: int = -1, response: Optional[dict] = None):
        self.code = code
        # Normalise a missing payload to {} so callers can always use .get().
        self.response = response or {}
        super().__init__(f"[Code {code}] {message}")


class RateLimitError(CloreAPIError):
    """Rate limit exceeded."""


class InsufficientBalanceError(CloreAPIError):
    """Not enough balance."""

@dataclass
class ServerFilter:
    """Filter criteria for server search.

    A ``None`` (or empty) ``gpu_types`` / ``currencies`` list disables that
    particular check.
    """

    gpu_types: Optional[List[str]] = None  # e.g. ["RTX 4090", "A100"]
    min_gpu_count: int = 1
    max_price_usd: float = 10.0
    min_reliability: float = 0.0
    # NOTE(review): declared but not consulted by matches(); confirm which
    # server field carries VRAM before wiring this in.
    min_vram_gb: int = 0
    currencies: Optional[List[str]] = None  # e.g. ["CLORE-Blockchain", "bitcoin"]

    def matches(self, server: dict) -> bool:
        """Check if server matches filter criteria.

        Args:
            server: Server record as a dict. The keys read are ``rented``,
                ``gpu_array``, ``price.usd.on_demand_clore``, ``reliability``
                and ``allowed_coins``.

        Returns:
            True when the server is not rented and passes every enabled check.
        """
        if server.get("rented"):
            return False

        gpu_array = server.get("gpu_array") or []

        # GPU type: substring match, so "RTX" matches "NVIDIA GeForce RTX 4090".
        if self.gpu_types and not any(
            any(gpu_type in gpu for gpu_type in self.gpu_types)
            for gpu in gpu_array
        ):
            return False

        if len(gpu_array) < self.min_gpu_count:
            return False

        # A server without a price cannot be shown to satisfy the cap, and
        # code that formats or multiplies the price would fail on None,
        # so an unpriced server is rejected rather than let through.
        usd_prices = (server.get("price") or {}).get("usd") or {}
        price = usd_prices.get("on_demand_clore")
        if price is None or price > self.max_price_usd:
            return False

        if (server.get("reliability") or 0) < self.min_reliability:
            return False

        # Currencies: at least one requested coin must be accepted.
        if self.currencies:
            allowed = server.get("allowed_coins") or []
            if not any(coin in allowed for coin in self.currencies):
                return False

        return True

## === Usage Example ===

if __name__ == "__main__":
    client = CloreClient("YOUR_API_KEY")

    # Check balance
    balance = client.get_balance()
    print(f"CLORE Balance: {balance}")

    # Find RTX 4090 under $0.50/hr
    server_filter = ServerFilter(
        gpu_types=["RTX 4090"],
        max_price_usd=0.50,
        min_reliability=0.80,
    )

    servers = client.find_servers(server_filter)
    print(f"Found {len(servers)} matching servers")

    if servers:
        # NOTE(review): treats the first result as the cheapest; this assumes
        # find_servers() returns servers sorted by price. Confirm.
        cheapest = servers[0]
        print(f"Cheapest: Server {cheapest['id']} at ${cheapest['price']['usd']['on_demand_clore']:.2f}/hr")

### Step 2: Job Runner with Auto-Provisioning

```python
# clore_automation/job_runner.py
"""Run jobs on Clore with automatic GPU provisioning."""

import time
import subprocess
import tempfile
import os
from typing import Callable, Optional, Dict, Any
from dataclasses import dataclass
from client import CloreClient, ServerFilter, CloreAPIError

@dataclass
class JobResult:
    """Outcome of a single job run, including what it cost."""

    success: bool             # whether the job is considered successful
    output: str               # text captured from the job
    duration_minutes: float   # run time in minutes
    cost_usd: float           # cost of the run in USD
    server_id: int            # server the job ran on
    order_id: int             # order created for the job
    
class JobRunner:
    """Run jobs on Clore GPUs with automatic provisioning.

    A job is a bash script that is copied to a freshly rented server with
    ``scp`` and executed with ``ssh``. The order is cancelled afterwards
    whether or not the job succeeded.
    """

    def __init__(self, client: CloreClient, ssh_key_path: str = "~/.ssh/id_rsa"):
        """Store the client and load the SSH public key if it exists.

        Args:
            client: API client used to find servers and manage orders.
            ssh_key_path: Path to the SSH private key; the public key is read
                from the same path with ``.pub`` appended.
        """
        self.client = client
        self.ssh_key_path = os.path.expanduser(ssh_key_path)

        # Load SSH keys
        public_key_path = self.ssh_key_path + ".pub"
        if os.path.exists(public_key_path):
            with open(public_key_path) as f:
                self.ssh_public_key = f.read().strip()
        else:
            self.ssh_public_key = None

    def run_script(
        self,
        script: str,
        filter: ServerFilter,
        image: str = "nvidia/cuda:12.8.0-base-ubuntu22.04",
        timeout_minutes: int = 60,
        setup_commands: str = "",
    ) -> JobResult:
        """
        Run a script on a Clore GPU.

        Args:
            script: Bash script to run
            filter: Server filter criteria
            image: Docker image to use
            timeout_minutes: Maximum runtime
            setup_commands: Commands to run before main script

        Returns:
            JobResult with output and cost info

        Raises:
            CloreAPIError: If no server matches ``filter`` or the SSH
                connection string of the started order cannot be parsed.
        """
        # Find server
        server = self.client.find_cheapest(filter)
        if not server:
            raise CloreAPIError("No matching servers available")

        server_id = server["id"]
        price_usd = server["price"]["usd"]["on_demand_clore"]

        print(f"🖥️  Using server {server_id} ({server['gpu_array']}) at ${price_usd:.2f}/hr")

        # Create order.
        # NOTE(review): the fallback password is a hard-coded literal, and the
        # scp/ssh calls below never supply a password, so a run without a
        # public key probably cannot authenticate non-interactively.
        # Prefer providing an SSH key pair.
        order = self.client.create_order(
            server_id=server_id,
            image=image,
            ssh_key=self.ssh_public_key,
            ssh_password="AutoJob123!" if not self.ssh_public_key else None,
            ports={"22": "tcp"},
        )
        order_id = order["order_id"]

        try:
            # Wait for ready
            print(f"⏳ Waiting for server to start...")
            active = self.client.wait_for_order(order_id, timeout=180)

            # Parse SSH info
            ssh_info = active["connection"]["ssh"]
            host, port = self._parse_ssh_target(ssh_info)

            print(f"✅ Server ready: {ssh_info}")

            # Build full script; `set -e` aborts on the first failing command.
            full_script = f"""#!/bin/bash
set -e
{setup_commands}

# Main script
{script}
"""

            # Run script via SSH
            print(f"🚀 Running job...")
            start_time = time.time()

            result = self._run_ssh_script(host, port, full_script, timeout_minutes * 60)

            # duration is in minutes while the price is per hour.
            duration = (time.time() - start_time) / 60
            cost = price_usd * duration / 60

            return JobResult(
                success=result["success"],
                output=result["output"],
                duration_minutes=duration,
                cost_usd=cost,
                server_id=server_id,
                order_id=order_id
            )

        finally:
            # Always cleanup
            print(f"🧹 Cleaning up order {order_id}...")
            self.client.cancel_order(order_id)

    @staticmethod
    def _parse_ssh_target(ssh_info: str) -> tuple:
        """Extract ``(host, port)`` from an ``ssh user@host -p port`` string.

        The port defaults to ``"22"`` when no ``-p`` option is present.

        Raises:
            CloreAPIError: If the string contains no ``user@host`` token.
        """
        tokens = ssh_info.split()
        target = next((tok for tok in tokens if "@" in tok), None)
        if target is None:
            raise CloreAPIError(f"Cannot parse SSH connection string: {ssh_info!r}")
        host = target.split("@", 1)[1]

        port = "22"
        if "-p" in tokens:
            port_index = tokens.index("-p") + 1
            if port_index < len(tokens):
                port = tokens[port_index]
        return host, port

    def _ssh_options(self) -> list:
        """Return the options shared by the ``scp`` and ``ssh`` invocations."""
        options = ["-o", "StrictHostKeyChecking=no"]
        # Authenticate with the private key whose public half was sent with
        # the order; without -i only the default identities are tried, so a
        # non-default ssh_key_path would be ignored.
        if self.ssh_public_key and os.path.exists(self.ssh_key_path):
            options += ["-i", self.ssh_key_path]
        return options

    def _run_ssh_script(self, host: str, port: str, script: str,
                        timeout: int) -> Dict[str, Any]:
        """Run a script over SSH.

        Args:
            host: Remote host name or address.
            port: SSH port, as a string.
            script: Full script text to upload and execute.
            timeout: Maximum run time of the remote command, in seconds.

        Returns:
            Dict with ``success`` (remote exit status was 0) and ``output``
            (stdout followed by stderr, or an error description).
        """
        # Write script to temp file; delete=False because scp reads it by
        # path after the handle is closed. It is removed in `finally`.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f:
            f.write(script)
            script_path = f.name

        options = self._ssh_options()
        try:
            # Copy script
            scp_cmd = [
                "scp", *options,
                "-P", port, script_path, f"root@{host}:/tmp/job.sh"
            ]
            subprocess.run(scp_cmd, check=True, timeout=30)

            # Run script
            ssh_cmd = [
                "ssh", *options,
                "-p", port, f"root@{host}",
                "chmod +x /tmp/job.sh && /tmp/job.sh"
            ]

            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=timeout
            )

            return {
                "success": result.returncode == 0,
                "output": result.stdout + result.stderr
            }

        except subprocess.TimeoutExpired:
            return {"success": False, "output": "Timeout exceeded"}
        except (subprocess.SubprocessError, OSError) as e:
            # Failed scp (CalledProcessError) or a missing ssh/scp binary.
            return {"success": False, "output": str(e)}
        finally:
            os.unlink(script_path)

    def run_python(
        self,
        python_code: str,
        filter: ServerFilter,
        requirements: Optional[list] = None,
        **kwargs
    ) -> JobResult:
        """Run Python code on a GPU.

        Args:
            python_code: Source code fed to ``python3`` through a heredoc.
            filter: Server filter criteria.
            requirements: Optional pip requirement specifiers to install first.
            **kwargs: Forwarded to :meth:`run_script`.

        Returns:
            JobResult with output and cost info.
        """
        import shlex

        setup = ""
        if requirements:
            # Quote every specifier: an unquoted "torch>=2.0" would be parsed
            # by bash as a redirect of pip's output to a file named "=2.0".
            quoted = " ".join(shlex.quote(str(req)) for req in requirements)
            setup = f"pip install {quoted}\n"

        script = f"""
{setup}
python3 << 'PYTHON_EOF'
{python_code}
PYTHON_EOF
"""
        return self.run_script(script, filter, **kwargs)


# === Usage Example ===

if __name__ == "__main__":
    client = CloreClient("YOUR_API_KEY")
    runner = JobRunner(client)

    # Smoke test: any GPU whose name contains "RTX", capped at $0.50/hr.
    gpu_filter = ServerFilter(gpu_types=["RTX"], max_price_usd=0.50)

    result = runner.run_script(
        script="nvidia-smi && echo 'GPU test passed!'",
        filter=gpu_filter,
        timeout_minutes=5,
    )

    # Summarise the run.
    print(f"\n{'='*50}")
    print(f"✅ Success: {result.success}")
    print(f"⏱️  Duration: {result.duration_minutes:.1f} minutes")
    print(f"💰 Cost: ${result.cost_usd:.4f}")
    print(f"\n📋 Output:\n{result.output}")
````

### Step 3: Cost Tracker

```python
# clore_automation/cost_tracker.py
"""Track and analyze Clore.ai spending."""

import json
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from client import CloreClient

@dataclass
class CostEntry:
    """One recorded rental charge."""

    timestamp: str            # when the entry was recorded, as text
    order_id: int             # order the charge belongs to
    server_id: int            # server the order ran on
    duration_minutes: float   # length of the rental in minutes
    cost_usd: float           # amount charged, in USD
    gpu_type: str             # label used to group costs by GPU
    job_name: str = ""        # optional free-form job label

class CostTracker:
    """Track GPU rental costs.

    Entries are kept in memory and persisted as a JSON list in ``data_file``;
    the file is rewritten after every :meth:`add`.
    """

    def __init__(self, data_file: str = "clore_costs.json"):
        """Create a tracker backed by ``data_file``, loading it if present."""
        self.data_file = data_file
        self.entries: List[CostEntry] = []
        self._load()

    def _load(self):
        """Load entries from file; a missing file leaves the tracker empty."""
        if not os.path.exists(self.data_file):
            return
        with open(self.data_file, encoding="utf-8") as f:
            data = json.load(f)
        self.entries = [CostEntry(**e) for e in data]

    def _save(self):
        """Save entries to file.

        The data is written to a sibling temporary file and then moved into
        place, so an interrupted write cannot truncate the existing log.
        """
        tmp_path = f"{self.data_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump([asdict(e) for e in self.entries], f, indent=2)
        os.replace(tmp_path, self.data_file)

    def _entries_since(self, days: Optional[int] = None) -> list:
        """Return entries newer than ``days`` days ago, or all when ``days`` is None."""
        # `is None` rather than truthiness: days=0 must mean an empty window,
        # not "no filter".
        if days is None:
            return self.entries
        cutoff = datetime.now() - timedelta(days=days)
        return [e for e in self.entries
                if datetime.fromisoformat(e.timestamp) > cutoff]

    def add(self, order_id: int, server_id: int, duration_minutes: float,
            cost_usd: float, gpu_type: str, job_name: str = ""):
        """Add a cost entry stamped with the current local time and persist it."""
        entry = CostEntry(
            timestamp=datetime.now().isoformat(),
            order_id=order_id,
            server_id=server_id,
            duration_minutes=duration_minutes,
            cost_usd=cost_usd,
            gpu_type=gpu_type,
            job_name=job_name
        )
        self.entries.append(entry)
        self._save()

    def get_total(self, days: Optional[int] = None) -> float:
        """Get total cost, optionally limited to the last ``days`` days."""
        return sum(e.cost_usd for e in self._entries_since(days))

    def get_by_gpu(self, days: Optional[int] = None) -> Dict[str, float]:
        """Get costs grouped by GPU type, optionally limited to the last ``days`` days."""
        by_gpu = {}
        for e in self._entries_since(days):
            by_gpu[e.gpu_type] = by_gpu.get(e.gpu_type, 0) + e.cost_usd
        return by_gpu

    def get_daily_summary(self, days: int = 7) -> List[Dict]:
        """Get daily cost summary.

        Returns:
            One dict per calendar day that has at least one entry, sorted by
            date, with keys ``date``, ``cost``, ``minutes`` and ``jobs``.
        """
        daily = {}
        for e in self._entries_since(days):
            day = datetime.fromisoformat(e.timestamp).strftime("%Y-%m-%d")
            bucket = daily.setdefault(
                day, {"date": day, "cost": 0, "minutes": 0, "jobs": 0}
            )
            bucket["cost"] += e.cost_usd
            bucket["minutes"] += e.duration_minutes
            bucket["jobs"] += 1

        return sorted(daily.values(), key=lambda x: x["date"])

    def print_report(self, days: int = 30):
        """Print cost report for the last ``days`` days."""
        print(f"\n{'='*50}")
        print(f"📊 Clore.ai Cost Report (Last {days} days)")
        print(f"{'='*50}")

        total = self.get_total(days)
        by_gpu = self.get_by_gpu(days)
        daily = self.get_daily_summary(days)

        print(f"\n💰 Total Spent: ${total:.2f}")

        print(f"\n📦 By GPU Type:")
        for gpu, cost in sorted(by_gpu.items(), key=lambda x: -x[1]):
            print(f"   {gpu}: ${cost:.2f}")

        print(f"\n📅 Daily Breakdown:")
        for day in daily[-7:]:  # the 7 most recent days that have entries
            print(f"   {day['date']}: ${day['cost']:.2f} ({day['jobs']} jobs, {day['minutes']:.0f} min)")

        # Averages are taken over days that have at least one entry, not over
        # the whole reporting window.
        if daily:
            avg_daily = total / len(daily)
            print(f"\n📈 Average: ${avg_daily:.2f}/day")
            print(f"   Projected Monthly: ${avg_daily * 30:.2f}")


# === Integration with JobRunner ===

class TrackedJobRunner:
    """Wrap a JobRunner so that every job it runs is logged to a CostTracker."""

    def __init__(self, client: CloreClient, tracker: CostTracker = None):
        # NOTE(review): imported here rather than at module top, presumably
        # to avoid an import cycle with job_runner. Confirm.
        from job_runner import JobRunner
        self.runner = JobRunner(client)
        self.tracker = tracker or CostTracker()

    def run(self, job_name: str, script: str, filter, **kwargs):
        """Run ``script`` through the wrapped runner and record its cost.

        Extra keyword arguments are forwarded to ``run_script``. The job
        result is returned unchanged.
        """
        outcome = self.runner.run_script(script, filter, **kwargs)

        # The GPU label recorded is the filter's requested type list as text.
        record = {
            "order_id": outcome.order_id,
            "server_id": outcome.server_id,
            "duration_minutes": outcome.duration_minutes,
            "cost_usd": outcome.cost_usd,
            "gpu_type": str(filter.gpu_types),
            "job_name": job_name,
        }
        self.tracker.add(**record)

        return outcome


if __name__ == "__main__":
    # Run as a script: print a 30-day report from the default cost log.
    CostTracker().print_report(30)
```

### Full Example: Automated Training Job

```python
#!/usr/bin/env python3
"""
Complete example: Run a PyTorch training job with cost tracking.
"""

from clore_automation.client import CloreClient, ServerFilter
from clore_automation.job_runner import JobRunner
from clore_automation.cost_tracker import CostTracker, TrackedJobRunner

def main():
    """Rent a GPU, train a small MNIST classifier on it, and report the cost."""
    # Wire together the API client, the cost log and the tracking runner.
    api_client = CloreClient("YOUR_API_KEY")
    tracker = CostTracker("training_costs.json")
    runner = TrackedJobRunner(api_client, tracker)

    # Server requirements: one RTX 4090/3090 at no more than $0.50/hr.
    gpu_filter = ServerFilter(
        gpu_types=["RTX 4090", "RTX 3090"],
        min_gpu_count=1,
        max_price_usd=0.50,
        min_reliability=0.80,
    )

    # Bash script executed on the rented server.
    training_script = """
# Install dependencies
pip install torch torchvision wandb

# Training code
python3 << 'EOF'
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms

# Check GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
print(f"GPU: {torch.cuda.get_device_name(0)}")

# Simple model
model = nn.Sequential(
    nn.Flatten(),
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Linear(256, 10)
).to(device)

# Data
transform = transforms.Compose([transforms.ToTensor()])
train_data = datasets.MNIST('./data', train=True, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(train_data, batch_size=64, shuffle=True)

# Train
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

for epoch in range(3):
    total_loss = 0
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        
        if batch_idx % 100 == 0:
            print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")
    
    print(f"Epoch {epoch} complete. Avg loss: {total_loss/len(train_loader):.4f}")

print("Training complete!")
torch.save(model.state_dict(), "model.pt")
EOF
"""

    # Run the job
    print("🚀 Starting training job...")
    result = runner.run(
        job_name="mnist-training",
        script=training_script,
        filter=gpu_filter,
        image="pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime",
        timeout_minutes=30,
    )

    # Print results
    banner = f"{'='*50}"
    print(f"\n{banner}")
    print(f"📊 Job Results")
    print(banner)
    print(f"✅ Success: {result.success}")
    print(f"⏱️  Duration: {result.duration_minutes:.1f} minutes")
    print(f"💰 Cost: ${result.cost_usd:.4f}")

    # Print cost report
    tracker.print_report(30)

    # Compare against the same wall-clock time at a flat $32/hr rate.
    aws_cost = result.duration_minutes/60 * 32
    savings_pct = (1 - result.cost_usd/aws_cost)*100
    print(f"\n💡 Cost Comparison:")
    print(f"   Clore.ai: ${result.cost_usd:.4f}")
    print(f"   AWS p4d.24xlarge: ~${aws_cost:.2f}")
    print(f"   Savings: ~{savings_pct:.0f}%")


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
```

### Best Practices

#### 1. Always Use Retry Logic

```python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def safe_api_call():
    """Request the marketplace listing, retried by tenacity on any exception.

    Gives up after three attempts; the wait between attempts grows
    exponentially and is bounded to the 1-10 second range.
    """
    marketplace = client._request("GET", "/v1/marketplace")
    return marketplace
```

#### 2. Implement Proper Cleanup

```python
import signal
import atexit

def cleanup():
    """Cancel every order in ``active_orders``, continuing past failures."""
    # Iterate over a snapshot so the loop is unaffected if the collection
    # changes while orders are being cancelled.
    for order_id in list(active_orders):
        try:
            client.cancel_order(order_id)
        except Exception as exc:
            # Best-effort at shutdown: one failed cancel must not leave the
            # remaining orders running (and billing).
            print(f"⚠️  Failed to cancel order {order_id}: {exc}")


def _exit_on_sigterm(signum, frame):
    """Turn SIGTERM into a normal interpreter exit so atexit handlers run.

    A handler that only calls cleanup() would swallow the signal and leave
    the process running; exiting here lets the registered atexit hook cancel
    the orders exactly once.
    """
    raise SystemExit(128 + signum)


atexit.register(cleanup)
signal.signal(signal.SIGTERM, _exit_on_sigterm)
```

#### 3. Track All Costs

```python
# Always track costs, even for failed jobs
try:
    result = runner.run(...)
finally:
    # NOTE(review): `result` is unbound here if runner.run() raised, so the
    # real arguments to tracker.add() must not rely on it unconditionally.
    tracker.add(...)
```

### Next Steps

* [Understanding Spot vs On-Demand](https://docs.clore.ai/dev/getting-started/spot-vs-ondemand)
* [Building a GPU Job Scheduler](https://docs.clore.ai/dev/devops-and-automation/job-scheduler)
* [Cost Optimization Strategies](https://docs.clore.ai/dev/devops-and-automation/cost-optimization)
