#!/usr/bin/env python3
"""
Clore.ai Cost Optimizer

Analyzes marketplace data and provides recommendations for cost optimization.

Usage:
    python cost_optimizer.py --api-key YOUR_API_KEY analyze
    python cost_optimizer.py --api-key YOUR_API_KEY recommend --workload training --budget 10
    python cost_optimizer.py --api-key YOUR_API_KEY best-time --gpu "RTX 4090"
"""
import argparse
import json
import time
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

import requests
@dataclass
class GPUPricing:
    """Aggregated spot/on-demand pricing and availability for one GPU model."""

    # Normalized GPU model name this record describes.
    gpu_type: str
    # Lowest, mean and highest spot price seen for this model.
    spot_min: float
    spot_avg: float
    spot_max: float
    # Lowest on-demand price seen, or None when no listing carried one.
    ondemand_min: Optional[float]
    # Number of GPUs currently not rented, and number of GPUs listed overall.
    available_count: int
    total_count: int

    @property
    def spot_savings_percent(self) -> float:
        """Return how much cheaper the best spot price is than the best on-demand price.

        The result is a percentage of the on-demand price. It is 0 when no
        positive on-demand price is known, since no comparison is possible.
        """
        baseline = self.ondemand_min
        if not (baseline and baseline > 0):
            return 0
        return (1 - self.spot_min / baseline) * 100
@dataclass
class CostRecommendation:
    """One suggested GPU rental together with its estimated cost and rationale."""

    # GPU model being recommended.
    gpu_type: str
    # Pricing model the estimate is based on: "spot" or "on-demand".
    pricing_type: str
    # Estimated price for one hour and for 24 hours of use.
    estimated_hourly: float
    estimated_daily: float
    # Human-readable explanation of why this GPU was suggested.
    reason: str
    # Confidence in the recommendation, in the range 0-1.
    confidence: float
class CostOptimizer:
    """Analyzes and optimizes Clore.ai GPU costs.

    Every pricing method fetches a fresh marketplace snapshot through
    :meth:`get_current_prices`; nothing is cached between calls.
    """

    BASE_URL = "https://api.clore.ai"

    # Performance ratings (relative to RTX 4090 = 1.0)
    GPU_PERFORMANCE = {
        "RTX 4090": 1.0,
        "RTX 4080": 0.75,
        "RTX 4070 Ti": 0.60,
        "RTX 3090": 0.70,
        "RTX 3080": 0.55,
        "RTX 3070": 0.40,
        "RTX 3060": 0.30,
        "A100": 1.2,
        "A6000": 0.65,
        "A5000": 0.50,
        "V100": 0.45,
    }

    # VRAM by GPU type (GB)
    GPU_VRAM = {
        "RTX 4090": 24,
        "RTX 4080": 16,
        "RTX 4070 Ti": 12,
        "RTX 3090": 24,
        "RTX 3080": 10,
        "RTX 3070": 8,
        "RTX 3060": 12,
        "A100": 80,
        "A6000": 48,
        "A5000": 24,
        "V100": 16,
    }

    def __init__(self, api_key: str):
        """Store the API key and prepare the request headers."""
        self.api_key = api_key
        self.headers = {"auth": api_key}
        # Reserved for price tracking over time; no method in this class fills it.
        self.price_history: Dict[str, List[Tuple[datetime, float]]] = defaultdict(list)

    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Make an API request and return the decoded JSON body.

        Args:
            method: HTTP method, e.g. ``"GET"``.
            endpoint: Path appended to ``BASE_URL``.
            **kwargs: Extra arguments forwarded to ``requests.request``
                (``params``, ``json``, ``timeout``, ...). Caller-supplied
                ``headers`` are merged over the auth header.

        Raises:
            Exception: If the response body's ``code`` field is not 0.
        """
        url = f"{self.BASE_URL}{endpoint}"
        # Merge instead of passing both, which would raise a duplicate-keyword error.
        headers = {**self.headers, **(kwargs.pop("headers", None) or {})}
        kwargs.setdefault("timeout", 30)
        response = requests.request(method, url, headers=headers, **kwargs)
        data = response.json()
        if data.get("code") != 0:
            raise Exception(f"API Error: {data}")
        return data

    def _normalize_gpu(self, name: str) -> str:
        """Map a raw GPU name to its canonical model name.

        Matching is a case-insensitive substring test, checked in the order
        listed. Names that match nothing are returned unchanged.
        """
        patterns = (
            ("RTX 4090", "4090"),
            ("RTX 4080", "4080"),
            ("RTX 4070 Ti", "4070 ti"),
            ("RTX 3090", "3090"),
            ("RTX 3080", "3080"),
            ("RTX 3070", "3070"),
            ("RTX 3060", "3060"),
            ("A100", "a100"),
            ("A6000", "a6000"),
            ("A5000", "a5000"),
            ("V100", "v100"),
        )
        name_lower = name.lower()
        for normalized, needle in patterns:
            if needle in name_lower:
                return normalized
        return name

    def get_current_prices(self) -> Dict[str, "GPUPricing"]:
        """Fetch the marketplace and aggregate prices per GPU type.

        Each server is classified by the first entry of its ``gpu_array``.
        GPU types without any spot price are omitted from the result.

        Returns:
            Mapping of normalized GPU type to its aggregated ``GPUPricing``.
        """
        payload = self._api("GET", "/v1/marketplace")
        servers = payload.get("servers") or []

        buckets = defaultdict(lambda: {
            "spot_prices": [],
            "ondemand_prices": [],
            "available": 0,
            "total": 0,
        })

        for server in servers:
            gpus = server.get("gpu_array") or []
            if not gpus:
                continue

            bucket = buckets[self._normalize_gpu(gpus[0])]
            bucket["total"] += len(gpus)
            # A server with no "rented" flag is conservatively treated as rented.
            if not server.get("rented", True):
                bucket["available"] += len(gpus)

            # Tolerate a missing or null "price"/"usd" entry.
            usd = (server.get("price") or {}).get("usd") or {}
            spot = usd.get("spot")
            ondemand = usd.get("on_demand_clore")
            # Prices are collected from every listing, rented or not;
            # missing and zero prices are skipped.
            if spot:
                bucket["spot_prices"].append(spot)
            if ondemand:
                bucket["ondemand_prices"].append(ondemand)

        result = {}
        for gpu_type, bucket in buckets.items():
            spot_prices = bucket["spot_prices"]
            if not spot_prices:
                continue
            ondemand_prices = bucket["ondemand_prices"]
            result[gpu_type] = GPUPricing(
                gpu_type=gpu_type,
                spot_min=min(spot_prices),
                spot_avg=sum(spot_prices) / len(spot_prices),
                spot_max=max(spot_prices),
                ondemand_min=min(ondemand_prices) if ondemand_prices else None,
                available_count=bucket["available"],
                total_count=bucket["total"],
            )
        return result

    def calculate_value_score(self, pricing: "GPUPricing") -> float:
        """Calculate value score (performance per dollar).

        GPU types missing from ``GPU_PERFORMANCE`` are rated 0.5. The score is
        0.0 when the minimum spot price is not positive.
        """
        perf = self.GPU_PERFORMANCE.get(pricing.gpu_type, 0.5)
        if pricing.spot_min > 0:
            return perf / pricing.spot_min
        return 0.0

    def recommend_gpu(self,
                      workload: str,
                      min_vram: int = 8,
                      max_budget_hourly: Optional[float] = None) -> List["CostRecommendation"]:
        """Recommend GPUs based on workload and budget.

        Args:
            workload: Workload label used to phrase the reason text.
            min_vram: Minimum VRAM in GB; unknown GPU types count as 8 GB.
            max_budget_hourly: Upper bound on the minimum spot price, or
                ``None`` for no limit.

        Returns:
            Up to five recommendations, best performance-per-dollar first.
        """
        prices = self.get_current_prices()
        recommendations = []

        for gpu_type, pricing in prices.items():
            # Check VRAM
            if self.GPU_VRAM.get(gpu_type, 8) < min_vram:
                continue
            # Check budget; "is not None" so that an explicit 0 still acts as a cap.
            if max_budget_hourly is not None and pricing.spot_min > max_budget_hourly:
                continue
            # Check availability
            if pricing.available_count == 0:
                continue

            # Confidence grows with the share of free GPUs, capped at 0.9.
            availability_ratio = pricing.available_count / max(pricing.total_count, 1)
            confidence = min(0.9, availability_ratio + 0.3)

            recommendations.append(CostRecommendation(
                gpu_type=gpu_type,
                pricing_type="spot",
                estimated_hourly=pricing.spot_min,
                estimated_daily=pricing.spot_min * 24,
                reason=self._get_workload_reason(gpu_type, workload, pricing),
                confidence=confidence,
            ))

        def value_of(rec) -> float:
            # Performance per dollar; guarded so a non-positive price cannot divide by zero.
            perf = self.GPU_PERFORMANCE.get(rec.gpu_type, 0.5)
            return perf / rec.estimated_hourly if rec.estimated_hourly > 0 else 0.0

        recommendations.sort(key=value_of, reverse=True)
        return recommendations[:5]  # Top 5

    def _get_workload_reason(self, gpu_type: str, workload: str, pricing: "GPUPricing") -> str:
        """Get recommendation reason based on workload."""
        vram = self.GPU_VRAM.get(gpu_type, 8)
        perf = self.GPU_PERFORMANCE.get(gpu_type, 0.5)
        value = self.calculate_value_score(pricing)

        if workload == "training":
            if vram >= 24:
                return f"Excellent for training ({vram}GB VRAM), value score: {value:.2f}"
            return f"Good for smaller models ({vram}GB VRAM), value score: {value:.2f}"
        if workload == "inference":
            if perf >= 0.6:
                return f"Fast inference (perf: {perf:.1f}x), cost-effective"
            return f"Budget inference option (perf: {perf:.1f}x)"
        if workload == "rendering":
            if "RTX" in gpu_type:
                return f"Ray tracing optimized, {vram}GB VRAM"
            return f"Good compute power, {vram}GB VRAM"
        return f"Value score: {value:.2f}, {pricing.available_count} available"

    def analyze_market(self) -> Dict:
        """Comprehensive market analysis.

        Returns:
            Dict with ``timestamp``, a ``summary`` (cheapest, best-value and
            most-available GPU type, each ``None`` when undetermined),
            per-type figures under ``by_gpu`` and human-readable
            ``recommendations``.
        """
        prices = self.get_current_prices()
        summary = {
            "total_gpu_types": len(prices),
            "cheapest_gpu": None,
            "best_value_gpu": None,
            "most_available_gpu": None,
        }
        analysis = {
            "timestamp": datetime.now().isoformat(),
            "summary": summary,
            "by_gpu": {},
            "recommendations": [],
        }

        best_value = 0
        most_available = 0
        cheapest_price = float("inf")

        for gpu_type, pricing in prices.items():
            value = self.calculate_value_score(pricing)
            analysis["by_gpu"][gpu_type] = {
                "spot_min": pricing.spot_min,
                "spot_avg": pricing.spot_avg,
                "ondemand_min": pricing.ondemand_min,
                "available": pricing.available_count,
                "total": pricing.total_count,
                "spot_savings_percent": pricing.spot_savings_percent,
                "value_score": value,
            }
            if pricing.spot_min < cheapest_price:
                cheapest_price = pricing.spot_min
                summary["cheapest_gpu"] = gpu_type
            if value > best_value:
                best_value = value
                summary["best_value_gpu"] = gpu_type
            if pricing.available_count > most_available:
                most_available = pricing.available_count
                summary["most_available_gpu"] = gpu_type

        # Only report facts that were actually determined, so an empty or
        # fully rented market never yields "None at $inf/hr".
        recommendations = []
        if summary["cheapest_gpu"] is not None:
            recommendations.append(
                f"💰 Cheapest option: {summary['cheapest_gpu']} at ${cheapest_price:.3f}/hr"
            )
        if summary["best_value_gpu"] is not None:
            recommendations.append(
                f"🏆 Best value: {summary['best_value_gpu']} (performance/cost)"
            )
        if summary["most_available_gpu"] is not None:
            recommendations.append(
                f"✅ Most available: {summary['most_available_gpu']} ({most_available} GPUs)"
            )
        if not recommendations:
            recommendations.append("No GPU pricing data available")
        analysis["recommendations"] = recommendations
        return analysis

    def estimate_cost(self,
                      gpu_type: str,
                      hours: float,
                      use_spot: bool = True) -> Dict:
        """Estimate cost for a workload.

        Args:
            gpu_type: GPU model; tried verbatim first, then normalized.
            hours: Number of rental hours.
            use_spot: Use the minimum spot price; otherwise the minimum
                on-demand price, falling back to 1.5x spot when unknown.

        Returns:
            Cost breakdown dict, or ``{"error": ...}`` if the GPU type has no
            pricing data.
        """
        prices = self.get_current_prices()
        # Price keys are normalized names, so accept raw names such as "rtx 4090" too.
        pricing = prices.get(gpu_type) or prices.get(self._normalize_gpu(gpu_type))
        if not pricing:
            return {"error": f"GPU type {gpu_type} not found"}

        if use_spot:
            hourly = pricing.spot_min
        else:
            hourly = pricing.ondemand_min or pricing.spot_min * 1.5

        return {
            "gpu_type": pricing.gpu_type,
            "pricing_type": "spot" if use_spot else "on-demand",
            "hourly_rate": hourly,
            "hours": hours,
            "total_cost": hourly * hours,
            "available_gpus": pricing.available_count,
        }

    def find_best_time(self, gpu_type: str) -> Dict:
        """Report general guidance on when renting tends to be cheaper.

        This is a simplified version: it returns fixed patterns and does not
        consult price history, so ``gpu_type`` is currently unused.

        Returns:
            Dict with ``day_of_week``, ``time_of_day`` and ``current_status``;
            the current status is evaluated in UTC to match the listed windows.
        """
        # The windows below are stated in UTC, so the clock must be UTC as well.
        now = datetime.now(timezone.utc)

        # General patterns based on market observation
        return {
            "day_of_week": {
                "best": ["Saturday", "Sunday"],
                "worst": ["Monday", "Tuesday"],
                "savings_estimate": "10-20%",
            },
            "time_of_day": {
                "best": "02:00-10:00 UTC",
                "worst": "14:00-22:00 UTC",
                "savings_estimate": "5-15%",
            },
            "current_status": {
                "day": now.strftime("%A"),
                "hour": now.hour,
                # NOTE(review): hours 23-01 count as good here although the
                # "best" window above starts at 02:00 - confirm intent.
                "is_good_time": now.weekday() >= 5 or now.hour < 10 or now.hour > 22,
            },
        }

    def budget_plan(self,
                    monthly_budget: float,
                    workload_hours_per_day: float,
                    days_per_month: int = 30) -> Dict:
        """Create a budget plan for the month.

        Args:
            monthly_budget: Total amount available for the month.
            workload_hours_per_day: Planned usage hours per day.
            days_per_month: Number of usage days in the month.

        Returns:
            Dict with the budget, total hours, the maximum affordable hourly
            rate and up to five affordable plans, cheapest first.

        Raises:
            ValueError: If the total workload hours are not positive.
        """
        total_hours = workload_hours_per_day * days_per_month
        # Validate before any network call; a zero total would divide by zero below.
        if total_hours <= 0:
            raise ValueError("total workload hours must be positive")

        prices = self.get_current_prices()
        hourly_budget = monthly_budget / total_hours

        plans = []
        for gpu_type, pricing in prices.items():
            if pricing.spot_min > hourly_budget:
                continue
            monthly_cost = pricing.spot_min * total_hours
            plans.append({
                "gpu_type": gpu_type,
                "hourly_rate": pricing.spot_min,
                "monthly_cost": monthly_cost,
                "within_budget": monthly_cost <= monthly_budget,
                "budget_utilization": (monthly_cost / monthly_budget) * 100,
                "available": pricing.available_count,
            })

        # Sort by value (cheaper first that are within budget)
        plans.sort(key=lambda p: (not p["within_budget"], p["monthly_cost"]))

        return {
            "monthly_budget": monthly_budget,
            "total_hours": total_hours,
            "max_hourly_rate": hourly_budget,
            "plans": plans[:5],
        }
def main():
    """Parse command-line arguments and run the requested optimizer action.

    Supported actions are ``analyze``, ``recommend``, ``estimate``,
    ``best-time`` and ``budget``. Each one prints a formatted report to
    stdout. ``--budget`` is an hourly cap for ``recommend`` and a monthly
    total for ``budget``.
    """
    parser = argparse.ArgumentParser(description="Clore.ai Cost Optimizer")
    parser.add_argument("--api-key", required=True)
    parser.add_argument("action", choices=["analyze", "recommend", "estimate", "best-time", "budget"])
    parser.add_argument("--workload", default="training", choices=["training", "inference", "rendering", "general"])
    parser.add_argument("--gpu", default="RTX 4090")
    parser.add_argument("--hours", type=float, default=24)
    parser.add_argument("--budget", type=float, help="Hourly or monthly budget")
    parser.add_argument("--min-vram", type=int, default=8)
    args = parser.parse_args()

    optimizer = CostOptimizer(args.api_key)
    rule = "=" * 70

    if args.action == "analyze":
        analysis = optimizer.analyze_market()
        summary = analysis["summary"]

        print("\n" + rule)
        print("📊 CLORE.AI MARKET ANALYSIS")
        print(rule)
        print(f"\nTimestamp: {analysis['timestamp']}")
        print("\n📋 Summary:")
        print(f" 💰 Cheapest GPU: {summary['cheapest_gpu']}")
        print(f" 🏆 Best Value: {summary['best_value_gpu']}")
        print(f" ✅ Most Available: {summary['most_available_gpu']}")
        print("\n💵 Current Prices (Spot):")
        print("-" * 70)
        print(f"{'GPU Type':<15} {'Min':<10} {'Avg':<10} {'Avail':<8} {'Value':<8} {'Savings'}")
        print("-" * 70)
        # Cheapest GPU types first.
        for gpu, data in sorted(analysis["by_gpu"].items(), key=lambda x: x[1]["spot_min"]):
            savings = f"{data['spot_savings_percent']:.0f}%" if data['spot_savings_percent'] > 0 else "N/A"
            print(f"{gpu:<15} ${data['spot_min']:<9.3f} ${data['spot_avg']:<9.3f} {data['available']:<8} {data['value_score']:<8.2f} {savings}")
        print("\n" + rule)
        print("💡 Recommendations:")
        for rec in analysis["recommendations"]:
            print(f" {rec}")
        print(rule + "\n")

    elif args.action == "recommend":
        recommendations = optimizer.recommend_gpu(
            workload=args.workload,
            min_vram=args.min_vram,
            max_budget_hourly=args.budget,
        )

        print("\n" + rule)
        print(f"🎯 GPU RECOMMENDATIONS for {args.workload.upper()}")
        # "is not None" so that an explicit budget of 0 is still reported.
        if args.budget is not None:
            print(f" Budget: ${args.budget}/hr | Min VRAM: {args.min_vram}GB")
        print(rule)
        for i, rec in enumerate(recommendations, 1):
            print(f"\n#{i} {rec.gpu_type}")
            print(f" 💵 ${rec.estimated_hourly:.3f}/hr (${rec.estimated_daily:.2f}/day)")
            print(f" 📝 {rec.reason}")
            print(f" 🎯 Confidence: {rec.confidence*100:.0f}%")
        print("\n" + rule)

    elif args.action == "estimate":
        estimate = optimizer.estimate_cost(args.gpu, args.hours)

        print("\n" + rule)
        print("💰 COST ESTIMATE")
        print(rule)
        if "error" in estimate:
            print(f"❌ {estimate['error']}")
        else:
            print(f" GPU: {estimate['gpu_type']}")
            print(f" Type: {estimate['pricing_type']}")
            print(f" Rate: ${estimate['hourly_rate']:.3f}/hr")
            print(f" Hours: {estimate['hours']}")
            print(f" Total: ${estimate['total_cost']:.2f}")
            print(f" Available: {estimate['available_gpus']} GPUs")
        print(rule + "\n")

    elif args.action == "best-time":
        best_time = optimizer.find_best_time(args.gpu)
        by_day = best_time['day_of_week']
        by_hour = best_time['time_of_day']
        status = best_time['current_status']

        print("\n" + rule)
        print(f"⏰ BEST TIME TO RENT: {args.gpu}")
        print(rule)
        print("\n📅 Day of Week:")
        print(f" Best: {', '.join(by_day['best'])}")
        print(f" Worst: {', '.join(by_day['worst'])}")
        print(f" Savings: {by_day['savings_estimate']}")
        print("\n🕐 Time of Day:")
        print(f" Best: {by_hour['best']}")
        print(f" Worst: {by_hour['worst']}")
        print(f" Savings: {by_hour['savings_estimate']}")
        print("\n📍 Current Status:")
        emoji = "✅" if status['is_good_time'] else "⚠️"
        print(f" {emoji} {status['day']} at {status['hour']}:00")
        print(f" {'Good time to rent!' if status['is_good_time'] else 'Prices may be higher now'}")
        print(rule + "\n")

    elif args.action == "budget":
        if args.budget is None:
            print("--budget required for budget planning")
            return

        plan = optimizer.budget_plan(
            monthly_budget=args.budget,
            workload_hours_per_day=args.hours,
        )

        print("\n" + rule)
        print("📋 MONTHLY BUDGET PLAN")
        print(rule)
        print(f"\n💰 Budget: ${plan['monthly_budget']}/month")
        # "30 days" mirrors budget_plan's default days_per_month.
        print(f"⏱️ Usage: {args.hours}h/day × 30 days = {plan['total_hours']}h/month")
        print(f"📈 Max hourly rate: ${plan['max_hourly_rate']:.3f}/hr")
        print(f"\n{'GPU Type':<15} {'$/hr':<10} {'$/month':<12} {'Budget %':<10} {'Status'}")
        print("-" * 60)
        for p in plan['plans']:
            status = "✅" if p['within_budget'] else "❌"
            print(f"{p['gpu_type']:<15} ${p['hourly_rate']:<9.3f} ${p['monthly_cost']:<11.2f} {p['budget_utilization']:<9.0f}% {status}")
        print(rule + "\n")
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()