#!/usr/bin/env python3
"""
Prometheus Exporter for Clore.ai Metrics
Exposes metrics about GPU marketplace, orders, and costs.
"""
import re
import threading
import time

import requests
from flask import Flask, Response
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    CollectorRegistry,
    Counter,
    Gauge,
    Histogram,
    Info,
    generate_latest,
)
# Dedicated registry: only the metrics declared below are exported.
registry = CollectorRegistry()

# --- Marketplace metrics (per GPU type and marketplace-wide) ---
gpu_available = Gauge(
    'clore_gpu_available_total',
    'Number of available GPUs by type',
    labelnames=['gpu_type'],
    registry=registry,
)
gpu_price_spot = Gauge(
    'clore_gpu_price_spot_usd',
    'Minimum spot price per GPU type in USD',
    labelnames=['gpu_type'],
    registry=registry,
)
gpu_price_ondemand = Gauge(
    'clore_gpu_price_ondemand_usd',
    'Minimum on-demand price per GPU type in USD',
    labelnames=['gpu_type'],
    registry=registry,
)
marketplace_total_servers = Gauge(
    'clore_marketplace_servers_total',
    'Total number of servers in marketplace',
    registry=registry,
)
marketplace_available_servers = Gauge(
    'clore_marketplace_servers_available',
    'Number of available servers',
    registry=registry,
)

# --- Order metrics (aggregate counts plus per-order series) ---
active_orders = Gauge(
    'clore_orders_active_total',
    'Number of active orders',
    registry=registry,
)
orders_by_status = Gauge(
    'clore_orders_by_status',
    'Orders by status',
    labelnames=['status'],
    registry=registry,
)
order_hourly_cost = Gauge(
    'clore_order_hourly_cost_usd',
    'Hourly cost of order in USD',
    labelnames=['order_id', 'gpu_type'],
    registry=registry,
)
order_runtime_seconds = Gauge(
    'clore_order_runtime_seconds',
    'Runtime of order in seconds',
    labelnames=['order_id'],
    registry=registry,
)

# --- Cost metrics ---
total_daily_cost = Gauge(
    'clore_daily_cost_usd',
    'Estimated daily cost of all active orders',
    registry=registry,
)

# --- Wallet metrics ---
wallet_balance = Gauge(
    'clore_wallet_balance',
    'Wallet balance by currency',
    labelnames=['currency'],
    registry=registry,
)

# --- Self-monitoring of the scrape itself ---
scrape_duration = Histogram(
    'clore_scrape_duration_seconds',
    'Time spent scraping Clore.ai API',
    registry=registry,
)
scrape_errors = Counter(
    'clore_scrape_errors_total',
    'Number of scrape errors',
    registry=registry,
)
class CloreExporter:
    """Prometheus exporter for Clore.ai metrics.

    Each ``collect_*`` method calls one Clore.ai API endpoint and writes the
    result into the module-level metric objects; ``collect`` runs all of them
    and records the scrape duration and error count.
    """

    BASE_URL = "https://api.clore.ai"

    # (label, matcher) pairs, tried in order against the lower-cased GPU name.
    # The trailing (?!\d) lookahead keeps a model number from matching inside
    # a longer one, e.g. "a100" inside "rtx a1000".
    _GPU_PATTERNS = tuple(
        (label, re.compile(token + r"(?!\d)"))
        for label, token in (
            ("RTX_4090", "4090"),
            ("RTX_4080", "4080"),
            ("RTX_3090", "3090"),
            ("RTX_3080", "3080"),
            ("RTX_3070", "3070"),
            ("A100", "a100"),
            ("A6000", "a6000"),
            ("A5000", "a5000"),
        )
    )

    def __init__(self, api_key: str):
        """Store the API key and the ``auth`` header sent with every request."""
        self.api_key = api_key
        self.headers = {"auth": api_key}

    def _request(self, endpoint: str):
        """GET ``BASE_URL + endpoint`` and return the decoded JSON object.

        Args:
            endpoint: Path appended to ``BASE_URL`` (e.g. ``/v1/wallets``).

        Returns:
            The decoded JSON payload (a dict whose ``code`` field is 0).

        Raises:
            RuntimeError: If the payload is not a JSON object or its ``code``
                field is not 0.
            Exception: Anything raised by ``requests.get`` or
                ``response.json`` propagates unchanged.
        """
        response = requests.get(
            f"{self.BASE_URL}{endpoint}",
            headers=self.headers,
            timeout=30,
        )
        data = response.json()
        if not isinstance(data, dict) or data.get("code") != 0:
            raise RuntimeError(f"API Error: {data}")
        return data

    def _normalize_gpu(self, name: str) -> str:
        """Normalize GPU name for consistent labels.

        Returns the label of the first known model found in ``name``
        (case-insensitive); otherwise ``name`` with spaces replaced by
        underscores.
        """
        name_lower = name.lower()
        for label, pattern in self._GPU_PATTERNS:
            if pattern.search(name_lower):
                return label
        return name.replace(" ", "_")

    def collect_marketplace_metrics(self):
        """Collect marketplace metrics.

        Aggregates ``/v1/marketplace`` into server totals and, per normalized
        GPU type, the number of GPUs on non-rented servers and the minimum
        spot / on-demand USD price seen on any server.
        """
        payload = self._request("/v1/marketplace")
        servers = payload.get("servers") or []

        no_price = float('inf')
        per_gpu = {}
        available_servers = 0
        for server in servers:
            # A server without a "rented" field is treated as rented.
            is_available = not server.get("rented", True)
            if is_available:
                available_servers += 1

            # Prices are per server, so look them up once rather than per GPU.
            usd = (server.get("price") or {}).get("usd") or {}
            spot = usd.get("spot")
            ondemand = usd.get("on_demand_clore")

            for gpu in server.get("gpu_array") or []:
                stats = per_gpu.setdefault(
                    self._normalize_gpu(gpu),
                    {
                        "available": 0,
                        "spot_min": no_price,
                        "ondemand_min": no_price,
                    },
                )
                if is_available:
                    stats["available"] += 1
                # Missing or zero prices are ignored for the minimum.
                if spot:
                    stats["spot_min"] = min(stats["spot_min"], spot)
                if ondemand:
                    stats["ondemand_min"] = min(stats["ondemand_min"], ondemand)

        marketplace_total_servers.set(len(servers))
        marketplace_available_servers.set(available_servers)

        # Drop series from the previous scrape so GPU types that left the
        # marketplace do not keep exporting stale values.
        # NOTE(review): clear() followed by set() is not atomic; confirm
        # whether concurrent scrapes are possible in this deployment.
        for gauge in (gpu_available, gpu_price_spot, gpu_price_ondemand):
            gauge.clear()
        for gpu_type, stats in per_gpu.items():
            gpu_available.labels(gpu_type=gpu_type).set(stats["available"])
            if stats["spot_min"] != no_price:
                gpu_price_spot.labels(gpu_type=gpu_type).set(stats["spot_min"])
            if stats["ondemand_min"] != no_price:
                gpu_price_ondemand.labels(gpu_type=gpu_type).set(
                    stats["ondemand_min"]
                )

    def collect_order_metrics(self):
        """Collect order metrics.

        Reads ``/v1/my_orders`` and exports the order count per status, the
        hourly cost and runtime per order, the number of ``running`` orders
        and the estimated daily cost of the running orders.
        """
        payload = self._request("/v1/my_orders")
        orders = payload.get("orders") or []

        now = time.time()
        status_counts = {}
        hourly_costs = []  # (order_id, gpu_type, hourly cost)
        runtimes = []      # (order_id, seconds since start)
        total_hourly_cost = 0
        for order in orders:
            status = order.get("status", "unknown")
            status_counts[status] = status_counts.get(status, 0) + 1
            order_id = str(order.get("order_id", ""))

            # The first GPU of the order labels the whole order.
            gpu_array = order.get("gpu_array")
            gpu_type = self._normalize_gpu(gpu_array[0]) if gpu_array else "unknown"

            # "price" is treated as a per-minute rate.
            # TODO(review): confirm the unit against the Clore.ai API docs.
            hourly = (order.get("price") or 0) * 60
            if status == "running":
                total_hourly_cost += hourly
            hourly_costs.append((order_id, gpu_type, hourly))

            started = order.get("started", 0)
            if started:
                runtimes.append((order_id, now - started))

        # Reset labelled series first: finished orders and vanished statuses
        # would otherwise be exported forever with their last value.
        for gauge in (orders_by_status, order_hourly_cost, order_runtime_seconds):
            gauge.clear()
        for order_id, gpu_type, hourly in hourly_costs:
            order_hourly_cost.labels(order_id=order_id, gpu_type=gpu_type).set(hourly)
        for order_id, runtime in runtimes:
            order_runtime_seconds.labels(order_id=order_id).set(runtime)
        for status, count in status_counts.items():
            orders_by_status.labels(status=status).set(count)
        active_orders.set(status_counts.get("running", 0))
        total_daily_cost.set(total_hourly_cost * 24)

    def collect_wallet_metrics(self):
        """Collect wallet balance metrics from ``/v1/wallets``."""
        payload = self._request("/v1/wallets")
        balances = [
            (wallet.get("name", "unknown"), wallet.get("balance") or 0)
            for wallet in payload.get("wallets") or []
        ]
        # Remove currencies that are no longer reported before re-populating.
        wallet_balance.clear()
        for currency, balance in balances:
            wallet_balance.labels(currency=currency).set(balance)

    def collect(self):
        """Collect all metrics.

        Every collector runs in its own ``try`` so that one failing endpoint
        does not prevent the remaining ones from being refreshed. Each failed
        collector increments ``scrape_errors`` once; the total elapsed time is
        always observed in ``scrape_duration``.
        """
        start = time.perf_counter()
        collectors = (
            self.collect_marketplace_metrics,
            self.collect_order_metrics,
            self.collect_wallet_metrics,
        )
        for collector in collectors:
            try:
                collector()
            except Exception as e:
                # Top-level boundary: a scrape must never raise into the
                # HTTP handler, so the failure is counted and reported.
                scrape_errors.inc()
                print(f"Error collecting metrics: {e}")
        scrape_duration.observe(time.perf_counter() - start)
# Flask app serving the /metrics and /health endpoints
app = Flask(__name__)
# Module-level exporter instance; stays None until start_exporter() assigns it
exporter = None
@app.route('/metrics')
def metrics():
    """Prometheus metrics endpoint.

    Refreshes the metrics through the module-level ``exporter`` when one has
    been created, then returns ``registry`` serialized by ``generate_latest``.
    """
    if exporter is not None:
        exporter.collect()
    # CONTENT_TYPE_LATEST is a complete Content-Type header value (it already
    # carries parameters), so it is passed as ``content_type`` to be sent
    # verbatim instead of being treated as a bare mimetype.
    return Response(
        generate_latest(registry),
        content_type=CONTENT_TYPE_LATEST,
    )
@app.route('/health')
def health():
    """Health-check endpoint: always answers with the body ``OK``."""
    return 'OK'
def start_exporter(api_key: str, port: int = 9090):
    """Create the global exporter and serve the Flask app.

    Args:
        api_key: Clore.ai API key handed to ``CloreExporter``.
        port: TCP port the Flask app listens on.
    """
    global exporter
    exporter = CloreExporter(api_key)
    # Listen on every interface so the endpoint is reachable from other hosts.
    listen_on = {"host": '0.0.0.0', "port": port}
    app.run(**listen_on)
if __name__ == '__main__':
    import os
    import sys

    # Script entry point: read the API key from the environment and serve.
    api_key = os.environ.get('CLORE_API_KEY')
    if not api_key:
        # Report on stderr and exit via sys.exit: the bare ``exit`` helper is
        # injected by the ``site`` module and is not guaranteed to exist.
        print("Set CLORE_API_KEY environment variable", file=sys.stderr)
        sys.exit(1)
    start_exporter(api_key, port=9090)