#!/usr/bin/env python3
"""
YOLOv8 Training on Clore.ai GPUs.
Usage:
# Train with local dataset
python train_yolo.py --api-key YOUR_API_KEY --data dataset.yaml --model yolov8s.pt --epochs 100
# Train with Roboflow dataset
python train_yolo.py --api-key YOUR_API_KEY --roboflow WORKSPACE/PROJECT/VERSION --model yolov8m.pt
"""
import argparse
import os
import time
import json
import secrets
import requests
import paramiko
from scp import SCPClient
from typing import Dict, Optional
from dataclasses import dataclass
@dataclass
class TrainingResult:
    """Outcome of one remote training run, as assembled by the trainer."""

    # Remote path of the best checkpoint; empty string when training failed.
    model_path: str
    # Validation metrics reported by the remote script (0 when unavailable).
    mAP50: float
    mAP50_95: float
    precision: float
    recall: float
    # Number of epochs reported by the remote script (0 when it reported nothing).
    epochs: int
    # Wall-clock duration of the remote training command, measured locally.
    time_seconds: float
    # time_seconds converted to hours and multiplied by the hourly price.
    cost_usd: float
    # True only when the remote script reported success.
    success: bool
class CloreYOLOTrainer:
    """Complete YOLOv8 training on Clore.ai.

    Rents a GPU server through the Clore.ai HTTP API, connects to it over
    SSH/SCP, runs Ultralytics training remotely and fetches the results.
    Use it as a context manager so the rented server is released even when
    a step fails::

        with CloreYOLOTrainer(api_key) as trainer:
            trainer.setup()
            result = trainer.train("coco128.yaml")
    """

    BASE_URL = "https://api.clore.ai"
    IMAGE = "ultralytics/ultralytics:latest-python"
    # GPU name fragments accepted when scanning the marketplace.
    PREFERRED_GPUS = ("RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", "A100")
    # Seconds before a single HTTP call is abandoned (callers may override
    # by passing timeout=... to _api).
    API_TIMEOUT = 30
    # Heredoc delimiter used when writing scripts to the server; chosen to be
    # unlikely to appear as a line of its own inside a generated script.
    _HEREDOC_TAG = "CLORE_YOLO_SCRIPT_EOF"

    def __init__(self, api_key: str):
        """Store the API key; no network activity happens until setup()."""
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.order_id = None
        self.ssh_host = None
        self.ssh_port = None
        self.ssh_password = None
        self.hourly_cost = 0.0
        self._ssh = None
        self._scp = None

    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Call a Clore.ai endpoint and return the decoded JSON body.

        Responses with ``code == 5`` are retried up to three times with
        exponential backoff (presumably rate limiting -- confirm against the
        API docs). Any other non-zero code raises.

        Raises:
            Exception: on a non-zero API code or when retries are exhausted.
        """
        url = f"{self.BASE_URL}{endpoint}"
        # Without a timeout a stalled connection would block forever while
        # the rented server keeps being billed.
        kwargs.setdefault("timeout", self.API_TIMEOUT)
        for attempt in range(3):
            response = requests.request(method, url, headers=self.headers, **kwargs)
            data = response.json()
            if data.get("code") == 5:
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries")

    @classmethod
    def _select_server(cls, servers, max_price: float) -> Optional[Dict]:
        """Return the cheapest unrented server with a preferred GPU, or None.

        The result is a dict with keys ``id``, ``gpus`` and ``price`` (the
        spot price in USD taken from ``price.usd.spot``).
        """
        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            gpu_array = server.get("gpu_array", [])
            if not any(name in gpu for gpu in gpu_array for name in cls.PREFERRED_GPUS):
                continue
            price = server.get("price", {}).get("usd", {}).get("spot")
            if price and price <= max_price:
                candidates.append({"id": server["id"], "gpus": gpu_array, "price": price})
        return min(candidates, key=lambda c: c["price"], default=None)

    @staticmethod
    def _parse_ssh_connection(conn: str) -> tuple:
        """Extract ``(host, port)`` from a string like ``ssh root@host -p 2222``.

        The port defaults to 22 when no ``-p`` argument is present. ``-p`` is
        matched as a separate token so a hostname that merely contains "-p"
        is not mistaken for a port flag.

        Raises:
            ValueError: if no ``user@host`` token is found.
        """
        tokens = conn.split()
        target = next((tok for tok in tokens if "@" in tok), None)
        if target is None:
            raise ValueError(f"Unrecognized SSH connection string: {conn!r}")
        host = target.split("@", 1)[1]
        port = 22
        if "-p" in tokens[:-1]:
            port = int(tokens[tokens.index("-p") + 1])
        return host, port

    @staticmethod
    def _find_marked_line(output: str, marker: str) -> Optional[str]:
        """Return the text after ``marker`` on the first line starting with it.

        Lines are split on carriage returns as well as newlines so that a
        progress bar redrawn with ``\\r`` cannot hide the marker line.
        """
        for line in output.splitlines():
            if line.startswith(marker):
                return line[len(marker):].strip()
        return None

    def setup(self, max_price: float = 0.50):
        """Rent the cheapest suitable GPU server and open SSH/SCP sessions.

        Args:
            max_price: highest acceptable spot price in USD per hour.

        Raises:
            Exception: if no server qualifies, the API reports an error, or
                the order does not reach the "running" state in time.
        """
        print("🔍 Finding GPU...")
        servers = self._api("GET", "/v1/marketplace")["servers"]
        gpu = self._select_server(servers, max_price)
        if gpu is None:
            raise Exception(f"No GPU under ${max_price}/hr")
        print(f" {gpu['gpus']} @ ${gpu['price']:.2f}/hr")

        self.ssh_password = secrets.token_urlsafe(16)
        self.hourly_cost = gpu["price"]

        print("🚀 Provisioning server...")
        order_data = {
            "renting_server": gpu["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": self.IMAGE,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password,
            # Bid 15% above the listed spot price.
            "spotprice": gpu["price"] * 1.15,
        }
        result = self._api("POST", "/v1/create_order", json=order_data)
        self.order_id = result["order_id"]

        print("⏳ Waiting for server...")
        # Poll every 2 seconds, 120 times at most.
        for _ in range(120):
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == self.order_id), None)
            if order and order.get("status") == "running":
                self.ssh_host, self.ssh_port = self._parse_ssh_connection(
                    order["connection"]["ssh"]
                )
                break
            time.sleep(2)
        else:
            raise Exception("Timeout")

        # Connect SSH. The server is freshly rented, so there is no known
        # host key to verify against; unknown keys are accepted.
        self._ssh = paramiko.SSHClient()
        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh.connect(self.ssh_host, port=self.ssh_port,
                          username="root", password=self.ssh_password, timeout=30)
        self._scp = SCPClient(self._ssh.get_transport())
        print(f"✅ Server ready: {self.ssh_host}:{self.ssh_port}")

        # Setup YOLO
        print("📦 Setting up YOLOv8...")
        self._exec("pip install -q ultralytics", timeout=120)

    def _exec(self, cmd: str, timeout: int = 86400) -> str:
        """Run ``cmd`` on the server and return its decoded stdout.

        The exit status is waited for but not checked; callers detect
        success through marker lines in the output.
        """
        _, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
        # Drain the streams BEFORE waiting for the exit status: if the command
        # writes more than the channel window holds, waiting first can hang
        # because the remote process blocks on a full pipe.
        output = stdout.read().decode(errors="replace")
        stderr.read()
        stdout.channel.recv_exit_status()
        return output

    def _write_remote_script(self, remote_file: str, script: str) -> None:
        """Write ``script`` to ``remote_file`` using a quoted heredoc.

        The quoted delimiter stops the remote shell from expanding anything
        inside the script text.
        """
        tag = self._HEREDOC_TAG
        self._exec(f"cat > {remote_file} << '{tag}'\n{script}\n{tag}")

    def upload_dataset(self, local_path: str) -> str:
        """Upload local dataset.

        The *contents* of ``local_path`` are placed directly inside the
        remote directory, so ``<local_path>/data.yaml`` ends up at
        ``/tmp/dataset/data.yaml``.

        Returns:
            The remote dataset directory.
        """
        print(f"📤 Uploading dataset from {local_path}...")
        remote_path = "/tmp/dataset"
        self._exec(f"mkdir -p {remote_path}")
        if os.path.isdir(local_path):
            # Copying the directory itself into an existing target would nest
            # it one level deeper (/tmp/dataset/<name>/...), so send entries.
            entries = [os.path.join(local_path, name) for name in sorted(os.listdir(local_path))]
        else:
            entries = [local_path]
        if entries:
            self._scp.put(entries, remote_path, recursive=True)
        return remote_path

    def download_roboflow(self, workspace: str, project: str, version: int, api_key: str) -> str:
        """Download dataset from Roboflow.

        Runs a small script on the server that downloads the dataset in
        "yolov8" format to ``/tmp/dataset``.

        Returns:
            The remote path of the dataset YAML; falls back to
            ``/tmp/dataset/data.yaml`` when the script printed no marker.
        """
        # NOTE(review): assumes the `roboflow` package is importable on the
        # server image -- confirm, or install it during setup.
        print(f"📥 Downloading from Roboflow: {workspace}/{project}/v{version}")
        # Values are embedded with repr() so quotes or backslashes in them
        # cannot break out of the generated Python string literals.
        script = f'''from roboflow import Roboflow
rf = Roboflow(api_key={api_key!r})
project = rf.workspace({workspace!r}).project({project!r})
dataset = project.version({int(version)}).download("yolov8", location="/tmp/dataset")
print("DONE:/tmp/dataset/data.yaml")
'''
        remote_file = "/tmp/roboflow_download.py"
        self._write_remote_script(remote_file, script)
        # The script contains the API key, so remove it once it has run.
        output = self._exec(f"python3 {remote_file} 2>&1; rm -f {remote_file}", timeout=600)
        return self._find_marked_line(output, "DONE:") or "/tmp/dataset/data.yaml"

    def train(self, data_yaml: str, model: str = "yolov8n.pt", epochs: int = 100,
              batch: int = 16, imgsz: int = 640) -> "TrainingResult":
        """Train and validate a model on the server.

        Args:
            data_yaml: dataset YAML path (or dataset name) as seen remotely.
            model: base weights passed to ``YOLO(...)``.
            epochs: number of training epochs.
            batch: batch size.
            imgsz: training image size.

        Returns:
            A TrainingResult; ``success`` is False when the remote script
            reported an error or printed no parsable result line.
        """
        script = f'''import json
import time
from ultralytics import YOLO

start = time.time()
result = {{"success": False}}
try:
    model = YOLO({model!r})
    results = model.train(
        data={data_yaml!r},
        epochs={epochs},
        batch={batch},
        imgsz={imgsz},
        device=0,
        project="/tmp/runs",
        name="train",
        exist_ok=True,
        verbose=True
    )
    metrics = model.val()
    result = {{
        "success": True,
        "model_path": "/tmp/runs/train/weights/best.pt",
        "mAP50": float(metrics.box.map50) if hasattr(metrics.box, 'map50') else 0,
        "mAP50_95": float(metrics.box.map) if hasattr(metrics.box, 'map') else 0,
        "precision": float(metrics.box.mp) if hasattr(metrics.box, 'mp') else 0,
        "recall": float(metrics.box.mr) if hasattr(metrics.box, 'mr') else 0,
        "epochs": {epochs},
        "time": time.time() - start
    }}
except Exception as e:
    result = {{"success": False, "error": str(e)}}
print("RESULT:" + json.dumps(result))
'''
        self._write_remote_script("/tmp/train.py", script)

        print(f"🎯 Training {model} for {epochs} epochs...")
        start = time.time()
        output = self._exec("python3 /tmp/train.py 2>&1", timeout=86400)
        elapsed = time.time() - start

        # Parse result
        result_data = {"success": False}
        payload = self._find_marked_line(output, "RESULT:")
        if payload is not None:
            try:
                result_data = json.loads(payload)
            except ValueError:
                result_data = {"success": False, "error": f"unparsable result line: {payload!r}"}
        if not result_data.get("success"):
            # Surface the remote failure instead of dropping it silently.
            reason = result_data.get("error", "remote script reported no result")
            print(f"❌ Training failed: {reason}")

        cost = (elapsed / 3600) * self.hourly_cost
        return TrainingResult(
            model_path=result_data.get("model_path", ""),
            mAP50=result_data.get("mAP50", 0),
            mAP50_95=result_data.get("mAP50_95", 0),
            precision=result_data.get("precision", 0),
            recall=result_data.get("recall", 0),
            epochs=result_data.get("epochs", 0),
            time_seconds=elapsed,
            cost_usd=cost,
            success=result_data.get("success", False),
        )

    def export(self, model_path: str, format: str = "onnx") -> str:
        """Export model to different format.

        Returns:
            The remote path printed by the export script, or "" when the
            script printed no marker line.
        """
        script = f'''from ultralytics import YOLO
model = YOLO({model_path!r})
path = model.export(format={format!r})
print(f"EXPORTED:{{path}}")
'''
        remote_file = "/tmp/export.py"
        self._write_remote_script(remote_file, script)
        output = self._exec(f"python3 {remote_file} 2>&1", timeout=600)
        return self._find_marked_line(output, "EXPORTED:") or ""

    def download_model(self, remote_path: str, local_path: str):
        """Download model file, creating the local parent directory if needed."""
        os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
        self._scp.get(remote_path, local_path)

    def cleanup(self):
        """Close SCP/SSH and cancel the order.

        Cancellation is attempted even when closing a connection raises, so a
        broken session cannot leave the server rented. Safe to call twice:
        the order id is cleared once the cancel request succeeds.
        """
        try:
            try:
                if self._scp:
                    self._scp.close()
            finally:
                if self._ssh:
                    self._ssh.close()
        finally:
            self._scp = None
            self._ssh = None
            if self.order_id:
                print("🧹 Releasing server...")
                self._api("POST", "/v1/cancel_order", json={"id": self.order_id})
                self.order_id = None

    def __enter__(self):
        """Return the trainer itself; setup() must still be called explicitly."""
        return self

    def __exit__(self, *args):
        """Release all resources; exceptions from the with-body propagate."""
        self.cleanup()
def main():
    """Command-line entry point: rent a server, train, download the results.

    Arguments are validated before any server is rented, so a malformed
    ``--roboflow`` spec or a missing ``--data`` path fails immediately with
    a usage error instead of after a billed server has been provisioned.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True, help="Clore.ai API key")
    parser.add_argument("--data", help="Local dataset path or dataset.yaml")
    parser.add_argument("--roboflow", help="Roboflow dataset (WORKSPACE/PROJECT/VERSION)")
    parser.add_argument("--roboflow-key", help="Roboflow API key")
    parser.add_argument("--model", default="yolov8n.pt", help="Base model")
    parser.add_argument("--epochs", type=int, default=100)
    parser.add_argument("--batch", type=int, default=16)
    parser.add_argument("--imgsz", type=int, default=640)
    parser.add_argument("--output", default="./best.pt")
    parser.add_argument("--export", choices=["onnx", "torchscript", "tflite", "coreml"])
    parser.add_argument("--max-price", type=float, default=0.50)
    args = parser.parse_args()

    # Validate inputs up front; parser.error() exits with status 2.
    roboflow_spec = None
    if args.roboflow:
        parts = args.roboflow.split("/")
        if len(parts) != 3 or not all(parts) or not parts[2].isdigit():
            parser.error("--roboflow must be WORKSPACE/PROJECT/VERSION with a numeric VERSION")
        if not args.roboflow_key:
            parser.error("--roboflow requires --roboflow-key")
        roboflow_spec = (parts[0], parts[1], int(parts[2]))
    elif args.data and not os.path.exists(args.data):
        parser.error(f"--data path not found: {args.data}")

    with CloreYOLOTrainer(args.api_key) as trainer:
        trainer.setup(args.max_price)

        # Get dataset
        if roboflow_spec:
            workspace, project, version = roboflow_spec
            data_yaml = trainer.download_roboflow(workspace, project, version, args.roboflow_key)
        elif args.data:
            if os.path.isdir(args.data):
                trainer.upload_dataset(args.data)
                data_yaml = "/tmp/dataset/data.yaml"
            else:
                # Only the YAML file itself is uploaded in this branch.
                trainer._scp.put(args.data, "/tmp/data.yaml")
                data_yaml = "/tmp/data.yaml"
        else:
            # Use COCO128 for demo
            data_yaml = "coco128.yaml"

        # Train
        result = trainer.train(data_yaml, args.model, args.epochs, args.batch, args.imgsz)

        print("\n" + "="*60)
        print("📊 TRAINING COMPLETE")
        print("="*60)
        print(f" Model: {args.model}")
        print(f" Epochs: {result.epochs}")
        print(f" Time: {result.time_seconds:.1f}s ({result.time_seconds/60:.1f} min)")
        print(f" Cost: ${result.cost_usd:.4f}")
        print(f"\n📈 Metrics:")
        print(f" mAP50: {result.mAP50:.4f}")
        print(f" mAP50-95: {result.mAP50_95:.4f}")
        print(f" Precision: {result.precision:.4f}")
        print(f" Recall: {result.recall:.4f}")

        if result.success and result.model_path:
            # Download model
            trainer.download_model(result.model_path, args.output)
            print(f"\n✅ Model saved: {args.output}")

            # Export if requested
            if args.export:
                print(f"\n📦 Exporting to {args.export}...")
                exported = trainer.export(result.model_path, args.export)
                if exported:
                    # Swap only the final extension; str.replace(".pt", ...)
                    # would touch every occurrence and would leave the name
                    # unchanged (overwriting the model) without a ".pt".
                    # NOTE(review): some export formats may yield a directory
                    # rather than a single file -- confirm download_model
                    # handles that case.
                    export_local = f"{os.path.splitext(args.output)[0]}.{args.export}"
                    trainer.download_model(exported, export_local)
                    print(f" Exported: {export_local}")
        else:
            print("\n❌ Training did not produce a model; nothing was downloaded.")


if __name__ == "__main__":
    main()