The CloreClient is the standard Python API client used throughout the Developer Cookbook. Every recipe that interacts with the Clore.ai API uses this class — keep one canonical copy in your project instead of duplicating it.
Installation
pip install requests
Optional extras — install these if you need SSH/SCP helpers or retry logic:
pip install paramiko scp tenacity
Quick Setup
Save the class below as clore_client.py in your project root, then import it wherever you need it:
from clore_client import CloreClient

client = CloreClient(api_key="your-api-key")
Your API key is available at clore.ai/profile. You can also load it from an environment variable:
Authentication: All endpoints require the auth: <your-api-key> HTTP header.
Rate limit: ~1 request/second. The client handles this automatically.
Base URL: https://api.clore.ai
Error Codes
| Code | Meaning |
|------|---------|
| 0 | Success |
| 1 | Database error |
| 2 | Invalid input |
| 3 | Invalid or missing API key |
| 4 | Invalid endpoint |
| 5 | Rate limit exceeded |
| 6 | See error field in response |
Examples
Check balance
Find and rent the cheapest RTX 4090
SSH into the server and run a command
Spot order
Migration to Official SDK
The official clore-ai SDK is now available. It provides:
Built-in rate limiting (1 req/sec) with exponential backoff
Automatic retries on transient and rate-limit errors
import time
import logging
import requests
from typing import Dict, Any, List, Optional
# Module-level logger named after this module; CloreClient uses it to report
# retries, rate-limit back-offs and order lifecycle events.
logger = logging.getLogger(__name__)
class CloreClient:
    """
    Standard Clore.ai API client.

    Used throughout the Developer Cookbook — save this as `clore_client.py`
    and import it in your project.

    Args:
        api_key: Your Clore.ai API key (https://clore.ai/profile)
        timeout: HTTP request timeout in seconds (default: 30)
        rate_limit: Minimum seconds between requests (default: 1.1)
        max_retries: Maximum number of attempts per request, counting the
            first one (default: 3)
    """

    BASE_URL = "https://api.clore.ai"

    # Human-readable error code table
    ERROR_CODES = {
        1: "Database error",
        2: "Invalid input",
        3: "Invalid or missing API key",
        4: "Invalid endpoint",
        5: "Rate limit exceeded",
        6: "See error field in response",
    }

    def __init__(
        self,
        api_key: str,
        timeout: int = 30,
        rate_limit: float = 1.1,
        max_retries: int = 3,
    ):
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.timeout = timeout
        self.max_retries = max_retries
        self._rate_limit_delay = rate_limit
        # Monotonic timestamp of the most recent request; -inf means
        # "no request sent yet", so the first call never sleeps.
        self._last_request_time: float = float("-inf")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _rate_limit(self) -> None:
        """Sleep if needed to stay within the API rate limit."""
        # The monotonic clock is immune to wall-clock adjustments, which could
        # otherwise produce a negative elapsed time and an oversized sleep.
        remaining = self._rate_limit_delay - (
            time.monotonic() - self._last_request_time
        )
        if remaining > 0:
            time.sleep(remaining)
        self._last_request_time = time.monotonic()

    def _request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict] = None,
        json: Optional[Dict] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Make a rate-limited API request with basic retry logic.

        Network failures, undecodable responses and API code 5 (rate limit)
        are retried with exponential back-off (``2 ** attempt`` seconds) up to
        ``max_retries`` attempts. Any other non-zero API code raises at once.

        Args:
            method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
            endpoint: Path appended to :attr:`BASE_URL`, e.g. ``"/v1/wallets"``.
            params: Optional query-string parameters.
            json: Optional JSON request body.
            **kwargs: Passed through to ``requests.request``.

        Returns:
            The decoded JSON response whose ``code`` field is ``0``.

        Raises:
            Exception: on non-zero API error codes or network failures.
        """
        url = f"{self.BASE_URL}{endpoint}"
        last_exc: Optional[Exception] = None

        for attempt in range(1, self.max_retries + 1):
            # Throttle every attempt (not only the first) so the timestamp of
            # the last request stays accurate after retries.
            self._rate_limit()
            is_last_attempt = attempt >= self.max_retries

            try:
                response = requests.request(
                    method,
                    url,
                    headers=self.headers,
                    params=params,
                    json=json,
                    timeout=self.timeout,
                    **kwargs,
                )
                data = response.json()
                if not isinstance(data, dict):
                    raise ValueError(
                        f"expected a JSON object, got {type(data).__name__}"
                    )
            except (requests.exceptions.RequestException, ValueError) as exc:
                # ValueError covers JSON decode failures on requests versions
                # whose decode error does not derive from RequestException.
                last_exc = exc
                logger.warning("Request failed (attempt %s): %s", attempt, exc)
                if not is_last_attempt:
                    time.sleep(2 ** attempt)
                continue

            code = data.get("code", -1)
            if code == 0:
                return data

            if code == 5:
                # Rate-limited — back off and retry
                wait = 2 ** attempt
                logger.warning(
                    "Rate limited, retrying in %ss (attempt %s)", wait, attempt
                )
                if not is_last_attempt:
                    time.sleep(wait)
                continue

            error_msg = data.get(
                "error", self.ERROR_CODES.get(code, f"Unknown error (code {code})")
            )
            raise Exception(f"Clore API Error {code}: {error_msg} | response={data}")

        raise Exception(
            f"Clore API request failed after {self.max_retries} retries"
            + (f": {last_exc}" if last_exc else "")
        ) from last_exc

    @staticmethod
    def _usd_price(server: Dict, price_key: str) -> float:
        """Return ``price.usd.<price_key>`` of a server, or ``inf`` if missing/falsy."""
        usd_prices = (server.get("price") or {}).get("usd") or {}
        return usd_prices.get(price_key) or float("inf")

    # ------------------------------------------------------------------
    # Wallet / balance
    # ------------------------------------------------------------------
    def get_wallets(self) -> List[Dict]:
        """
        Return all wallet balances.

        Example response item::

            {"name": "CLORE-Blockchain", "balance": 123.45, "deposit_address": "..."}
        """
        return self._request("GET", "/v1/wallets")["wallets"]

    def get_balance(self, currency: str = "CLORE-Blockchain") -> float:
        """
        Return the balance for a specific currency.

        The first wallet whose name contains ``currency`` (substring match)
        is used.

        Args:
            currency: Currency name, e.g. ``"CLORE-Blockchain"`` or ``"BTC"``.

        Returns:
            Balance as a float, or ``0.0`` if the wallet is not found.
        """
        for wallet in self.get_wallets():
            if currency in (wallet.get("name") or ""):
                # A null/missing balance is reported as 0.0 rather than crashing.
                return float(wallet.get("balance") or 0.0)
        return 0.0

    # ------------------------------------------------------------------
    # Marketplace
    # ------------------------------------------------------------------
    def get_marketplace(self, include_rented: bool = False) -> List[Dict]:
        """
        Return all servers listed on the marketplace.

        Args:
            include_rented: If ``False`` (default), filters out already-rented servers.

        Key fields per server:
            - ``id`` — server ID used when creating orders
            - ``gpu_array`` — list of GPU model strings, e.g. ``["RTX 4090"]``
            - ``price.usd.on_demand_clore`` — on-demand price in USD/hr
            - ``price.usd.spot`` — spot price in USD/hr (``None`` if no spot market)
            - ``reliability`` — uptime reliability score (0–100)
            - ``rented`` — ``True`` if currently occupied
            - ``specs`` — CPU, RAM, storage details
            - ``connection`` — SSH / HTTP connection info (available after order is active)
        """
        servers = self._request("GET", "/v1/marketplace")["servers"]
        if not include_rented:
            servers = [s for s in servers if not s.get("rented")]
        return servers

    def find_servers(
        self,
        gpu_type: Optional[str] = None,
        max_price_usd: Optional[float] = None,
        min_reliability: float = 0.0,
        use_spot: bool = False,
    ) -> List[Dict]:
        """
        Find available servers matching the given criteria, sorted by price.

        Servers without a usable price for the selected market sort last and
        are excluded whenever ``max_price_usd`` is given.

        Args:
            gpu_type: Substring to match (case-insensitively) against
                ``gpu_array`` entries, e.g. ``"RTX 4090"``, ``"A100"``, ``"RTX"``.
            max_price_usd: Maximum price per hour in USD.
            min_reliability: Minimum reliability score (0–100).
            use_spot: If ``True``, filter and sort by spot price instead
                of on-demand price.

        Returns:
            Filtered and price-sorted list of server dicts.
        """
        price_key = "spot" if use_spot else "on_demand_clore"
        servers = self.get_marketplace(include_rented=False)

        if gpu_type:
            needle = gpu_type.lower()
            servers = [
                s for s in servers
                if any(needle in g.lower() for g in (s.get("gpu_array") or []))
            ]

        if max_price_usd is not None:
            servers = [
                s for s in servers
                if self._usd_price(s, price_key) <= max_price_usd
            ]

        if min_reliability > 0:
            servers = [
                s for s in servers
                if (s.get("reliability") or 0) >= min_reliability
            ]

        servers.sort(key=lambda s: self._usd_price(s, price_key))
        return servers

    def find_cheapest(
        self,
        gpu_type: Optional[str] = None,
        max_price_usd: Optional[float] = None,
        min_reliability: float = 0.0,
        use_spot: bool = False,
    ) -> Optional[Dict]:
        """
        Return the single cheapest server matching criteria, or ``None`` if none found.

        Convenience wrapper around :meth:`find_servers`.
        """
        servers = self.find_servers(
            gpu_type=gpu_type,
            max_price_usd=max_price_usd,
            min_reliability=min_reliability,
            use_spot=use_spot,
        )
        return servers[0] if servers else None

    # ------------------------------------------------------------------
    # Orders
    # ------------------------------------------------------------------
    def get_orders(self, include_completed: bool = False) -> List[Dict]:
        """
        Return all orders (active by default).

        Args:
            include_completed: If ``True``, also return completed/cancelled orders.

        Key fields per order:
            - ``order_id`` — unique order identifier
            - ``renting_server`` — server ID
            - ``status`` — ``"creating"``, ``"running"``, ``"cancelled"``, ``"expired"``
            - ``connection.ssh`` — SSH connection string, e.g. ``"ssh root@host -p port"``
            - ``connection.http_ports`` — dict of mapped HTTP ports
            - ``price`` — current cost per hour
            - ``started`` — Unix timestamp when the order became active
        """
        params = {"return_completed": "true"} if include_completed else None
        return self._request("GET", "/v1/my_orders", params=params)["orders"]

    def get_order(self, order_id: int) -> Optional[Dict]:
        """
        Return a single order by ID, or ``None`` if not found.

        Completed orders are included in the lookup.
        """
        orders = self.get_orders(include_completed=True)
        return next((o for o in orders if o.get("order_id") == order_id), None)

    def create_order(
        self,
        server_id: int,
        image: str = "nvidia/cuda:12.1.0-base-ubuntu22.04",
        order_type: str = "on-demand",
        currency: str = "CLORE-Blockchain",
        ports: Optional[Dict[str, str]] = None,
        env: Optional[Dict[str, str]] = None,
        ssh_password: Optional[str] = None,
        ssh_key: Optional[str] = None,
        spot_price: Optional[float] = None,
        jupyter_token: Optional[str] = None,
    ) -> Dict:
        """
        Create a GPU rental order.

        Args:
            server_id: ID of the server to rent (from :meth:`get_marketplace`).
            image: Docker image to run.
            order_type: ``"on-demand"`` (default) or ``"spot"``.
            currency: Payment currency — ``"CLORE-Blockchain"`` (default) or ``"BTC"``.
            ports: Port mapping dict, e.g. ``{"22": "tcp", "8888": "http"}``.
                Defaults to ``{"22": "tcp"}``.
            env: Environment variables dict.
                Defaults to ``{"NVIDIA_VISIBLE_DEVICES": "all"}``.
            ssh_password: Password for SSH access. Mutually exclusive with ``ssh_key``.
                If neither is given, a random password is generated.
            ssh_key: Public SSH key for key-based auth. Mutually exclusive with
                ``ssh_password``; if both are given, ``ssh_key`` wins.
            spot_price: Required when ``order_type="spot"``. Your bid price in USD/hr.
            jupyter_token: Optional Jupyter notebook token.

        Returns:
            Dict with ``order_id`` and status information. When a password was
            auto-generated it is added under ``"ssh_password"`` (unless the
            response already has that key) so the caller can actually log in.

        Raises:
            ValueError: If ``order_type="spot"`` and no ``spot_price`` is given.
            Exception: If the API request fails.
        """
        import secrets as _secrets

        # Fail fast instead of sending a spot order without a bid.
        if order_type == "spot" and spot_price is None:
            raise ValueError('spot_price is required when order_type="spot"')

        data: Dict[str, Any] = {
            "renting_server": server_id,
            "type": order_type,
            "currency": currency,
            "image": image,
            "ports": ports or {"22": "tcp"},
            "env": env or {"NVIDIA_VISIBLE_DEVICES": "all"},
        }

        generated_password: Optional[str] = None
        if ssh_key:
            if ssh_password:
                logger.warning(
                    "Both ssh_key and ssh_password were given; using ssh_key"
                )
            data["ssh_key"] = ssh_key
        else:
            if not ssh_password:
                generated_password = _secrets.token_urlsafe(16)
            data["ssh_password"] = ssh_password or generated_password

        if jupyter_token:
            data["jupyter_token"] = jupyter_token
        if order_type == "spot":
            data["spotprice"] = spot_price

        result = self._request("POST", "/v1/create_order", json=data)
        if generated_password is not None:
            # Without this the random password would be lost and the server
            # unreachable over SSH. Deliberately never written to the log.
            result.setdefault("ssh_password", generated_password)

        logger.info("Created order %s on server %s", result.get("order_id"), server_id)
        return result

    def cancel_order(self, order_id: int, issue: Optional[str] = None) -> bool:
        """
        Cancel a single order.

        Args:
            order_id: ID of the order to cancel.
            issue: Optional reason string (max 2048 chars; longer text is truncated).

        Returns:
            ``True`` on success.
        """
        payload: Dict[str, Any] = {"id": order_id}
        if issue:
            payload["issue"] = issue[:2048]
        self._request("POST", "/v1/cancel_order", json=payload)
        logger.info("Cancelled order %s", order_id)
        return True

    def cancel_orders(self, order_ids: List[int]) -> Dict:
        """Cancel multiple orders in a single API call."""
        return self._request("POST", "/v1/cancel_orders", json={"order_ids": order_ids})

    def wait_for_order(
        self,
        order_id: int,
        timeout: int = 300,
        poll_interval: float = 5.0,
        target_status: str = "running",
    ) -> Dict:
        """
        Poll until the order reaches ``target_status`` (default: ``"running"``).

        The order is always checked at least once, and the sleep between
        polls never extends past the deadline.

        Args:
            order_id: Order to watch.
            timeout: Maximum seconds to wait (default: 300).
            poll_interval: Seconds between polls (default: 5).
            target_status: Status string to wait for.

        Returns:
            The order dict once it reaches the target status.

        Raises:
            TimeoutError: If the order does not reach the target status in time.
            Exception: If the order ends in ``"cancelled"`` or ``"expired"``.
        """
        deadline = time.monotonic() + timeout
        while True:
            order = self.get_order(order_id)
            if order:
                status = order.get("status", "")
                if status == target_status:
                    return order
                if status in ("cancelled", "expired"):
                    raise Exception(
                        f"Order {order_id} ended unexpectedly with status: {status}"
                    )

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(poll_interval, remaining))

        raise TimeoutError(
            f"Order {order_id} did not reach status '{target_status}' within {timeout}s"
        )

    # ------------------------------------------------------------------
    # SSH connection helpers
    # ------------------------------------------------------------------
    @staticmethod
    def parse_ssh_connection(order: Dict) -> Dict[str, Any]:
        """
        Parse SSH connection info from a running order dict.

        Reads ``order["connection"]["ssh"]``, expected to look like
        ``"ssh root@host -p port"``. The ``-p`` option may come before or
        after the target, and a target without ``user@`` is treated as a bare
        host. Anything missing or unparsable keeps its default.

        Returns a dict with keys:
            - ``host`` — SSH host (``""`` if it could not be determined)
            - ``port`` — SSH port (int, default ``22``)
            - ``user`` — login user (default ``"root"``)
            - ``raw`` — the original connection string

        Example::

            info = CloreClient.parse_ssh_connection(order)
            # ssh root@host -p 12345
            ssh_cmd = f"ssh {info['user']}@{info['host']} -p {info['port']}"
        """
        connection = order.get("connection") or {}
        ssh_string: str = connection.get("ssh") or ""
        result: Dict[str, Any] = {
            "host": "",
            "port": 22,
            "user": "root",
            "raw": ssh_string,
        }

        tokens = ssh_string.split()
        if tokens and tokens[0] == "ssh":
            tokens = tokens[1:]

        target: Optional[str] = None
        skip_next = False
        for index, token in enumerate(tokens):
            if skip_next:
                # This token was the argument of a preceding "-p".
                skip_next = False
                continue
            if token == "-p":
                skip_next = True
                try:
                    result["port"] = int(tokens[index + 1])
                except (IndexError, ValueError):
                    pass  # keep the default port
                continue
            if token.startswith("-"):
                continue  # some other option flag, not the destination
            if target is None:
                target = token

        if target is not None:
            user, at_sign, host = target.partition("@")
            if at_sign:
                result["user"] = user or result["user"]
                result["host"] = host
            else:
                result["host"] = target
        return result

    def connect_ssh(
        self,
        order: Dict,
        password: Optional[str] = None,
        key_path: Optional[str] = None,
    ):
        """
        Open a Paramiko SSH connection to a running order.

        Requires ``paramiko``::

            pip install paramiko

        Args:
            order: A running order dict (from :meth:`get_order` or
                :meth:`wait_for_order`).
            password: SSH password (if you used ``ssh_password`` in :meth:`create_order`).
            key_path: Path to private key file (if you used ``ssh_key``).
                Ignored when ``password`` is given.

        Returns:
            A connected ``paramiko.SSHClient`` instance. The caller must close it.

        Raises:
            ValueError: If the order carries no parseable SSH host.

        Example::

            ssh = client.connect_ssh(order, password="my-password")
            stdin, stdout, stderr = ssh.exec_command("nvidia-smi")
            print(stdout.read().decode())
            ssh.close()
        """
        info = self.parse_ssh_connection(order)
        if not info["host"]:
            raise ValueError(
                f"Order has no usable SSH connection info: {info['raw']!r}"
            )

        import paramiko  # type: ignore

        ssh = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification. Convenient for short-lived rentals, but it provides no
        # protection against man-in-the-middle attacks.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        connect_kwargs: Dict[str, Any] = {
            "hostname": info["host"],
            "port": info["port"],
            "username": info["user"],
            "timeout": 30,
        }
        if password:
            connect_kwargs["password"] = password
        elif key_path:
            connect_kwargs["key_filename"] = key_path

        try:
            ssh.connect(**connect_kwargs)
        except Exception:
            # Do not leak the half-open client when the connection fails.
            ssh.close()
            raise
        return ssh

    # ------------------------------------------------------------------
    # Spot market
    # ------------------------------------------------------------------
    def get_spot_market(self, server_id: int) -> Dict:
        """Return spot market data for a specific server."""
        return self._request(
            "GET", "/v1/spot_marketplace", params={"market": server_id}
        )

    def set_spot_price(self, order_id: int, new_price: float) -> bool:
        """
        Update the bid price for a running spot order.

        Args:
            order_id: ID of the active spot order.
            new_price: New bid price in USD/hr.

        Returns:
            ``True`` on success.
        """
        data = self._request(
            "POST",
            "/v1/set_spot_price",
            json={"order_id": order_id, "desired_price": new_price},
        )
        return data.get("error") is None

    # ------------------------------------------------------------------
    # Context manager support
    # ------------------------------------------------------------------
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass  # requests.Session is created per-call; nothing to close
import os
from clore_client import CloreClient
# Example: find the cheapest RTX 4090, rent it, run one command over SSH,
# then cancel the order. The API key is read from the CLORE_API_KEY
# environment variable.
client = CloreClient(api_key=os.environ["CLORE_API_KEY"])

# Used both when creating the order and when logging in, so keep it in one place.
ssh_password = "MySecurePass123!"

server = client.find_cheapest(
    gpu_type="RTX 4090",
    max_price_usd=0.80,
    min_reliability=85.0,
)

if not server:
    print("No servers available matching criteria")
else:
    print(f"Renting server {server['id']} at ${server['price']['usd']['on_demand_clore']:.2f}/hr")
    order = client.create_order(
        server_id=server["id"],
        image="nvidia/cuda:12.1.0-base-ubuntu22.04",
        ssh_password=ssh_password,
    )
    try:
        # Wait until the server is ready
        running = client.wait_for_order(order["order_id"], timeout=300)
        conn = CloreClient.parse_ssh_connection(running)
        print(f"Ready! SSH: ssh {conn['user']}@{conn['host']} -p {conn['port']}")

        ssh = client.connect_ssh(running, password=ssh_password)
        try:
            stdin, stdout, stderr = ssh.exec_command(
                "nvidia-smi --query-gpu=name --format=csv,noheader"
            )
            print(stdout.read().decode().strip())
        finally:
            ssh.close()
    finally:
        # When done, cancel the order — also on failure, so a broken run does
        # not leave the rental active.
        client.cancel_order(order["order_id"])
# Example: bid on the spot market for the same server instead of renting it
# on demand. `spot_price` is the bid passed to create_order.
order = client.create_order(
    server["id"],
    order_type="spot",
    spot_price=0.25,
    image="nvidia/cuda:12.1.0-base-ubuntu22.04",
    ssh_password="MySecurePass123!",
)
# Before: the cookbook client defined in clore_client.py.
from clore_client import CloreClient
client = CloreClient(api_key="...")
response = client.get_marketplace()
# After: the equivalent call with the official clore-ai SDK.
from clore_ai import CloreAI
client = CloreAI(api_key="...")
servers = client.marketplace() # Returns List[MarketplaceServer] with type hints