#!/usr/bin/env python3
"""
Reinforcement Learning Training on Clore.ai GPUs.
Usage:
python train_rl.py --api-key YOUR_API_KEY --env CartPole-v1 --algo ppo --timesteps 100000
"""
import argparse
import os
import time
import json
import secrets
import requests
import paramiko
from scp import SCPClient
from dataclasses import dataclass
from typing import Optional, Dict
@dataclass
class TrainingResult:
    """Summary of one remote RL training run.

    Returned by CloreRLTrainer.train(). Note that model_path points at a
    file on the REMOTE server; use CloreRLTrainer.download_model() to
    copy it locally.
    """
    algorithm: str       # algorithm name as passed to train() (e.g. "ppo")
    env_id: str          # Gymnasium environment id (e.g. "CartPole-v1")
    timesteps: int       # total training timesteps requested
    time_seconds: float  # wall-clock duration of the remote run
    final_reward: float  # mean evaluation reward (10 episodes); 0 on failure
    model_path: str      # remote path of the saved model archive
    success: bool        # True only if the remote script reported a result
    cost_usd: float      # estimated rental cost = hours * hourly spot price
class CloreRLTrainer:
    """End-to-end RL training on a rented Clore.ai GPU server.

    Lifecycle:
        setup()          -> rent the cheapest matching GPU, connect SSH,
                            install the RL stack remotely
        train()          -> run a stable-baselines3 job on the server
        download_model() -> fetch the trained model archive
        cleanup()        -> close connections and release the server

    Use as a context manager so the (paid, per-hour) server is always
    released, even when training raises.
    """

    BASE_URL = "https://api.clore.ai"
    # Remote Docker image; must ship CUDA + PyTorch for SB3's device="cuda".
    IMAGE = "pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime"

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}  # Clore's auth header scheme
        self.order_id = None        # marketplace order id once provisioned
        self.ssh_host = None
        self.ssh_port = None
        self.ssh_password = None
        self.hourly_cost = 0.0      # USD/hr of the rented server
        self._ssh = None            # paramiko.SSHClient once connected
        self._scp = None            # SCPClient over the SSH transport

    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Call the Clore REST API, retrying on rate limits.

        Retries up to 3 times with exponential backoff when the API
        replies with code 5 (rate limited).

        Raises:
            Exception: on any non-zero API code, or when retries run out.
        """
        url = f"{self.BASE_URL}{endpoint}"
        for attempt in range(3):
            response = requests.request(method, url, headers=self.headers, **kwargs)
            data = response.json()
            if data.get("code") == 5:  # rate limited -- back off and retry
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception(f"Max retries exceeded for {method} {endpoint}")

    def setup(self, max_price: float = 0.40):
        """Rent the cheapest acceptable GPU, wait for boot, open SSH, and
        install the RL packages on the remote machine.

        Args:
            max_price: highest acceptable spot price in USD/hr.

        Raises:
            Exception: if no matching unrented GPU is at or under
                max_price, or the server is not "running" within ~4 min.
        """
        print("🔍 Finding GPU...")
        servers = self._api("GET", "/v1/marketplace")["servers"]
        gpus = ["RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", "A100"]
        candidates = []
        for s in servers:
            if s.get("rented"):
                continue
            gpu_array = s.get("gpu_array", [])
            # Keep servers whose GPU list mentions any acceptable model.
            if not any(any(g in gpu for g in gpus) for gpu in gpu_array):
                continue
            price = s.get("price", {}).get("usd", {}).get("spot")
            if price and price <= max_price:
                candidates.append({"id": s["id"], "gpus": gpu_array, "price": price})
        if not candidates:
            raise Exception(f"No GPU under ${max_price}/hr")
        gpu = min(candidates, key=lambda x: x["price"])
        print(f" {gpu['gpus']} @ ${gpu['price']:.2f}/hr")
        self.ssh_password = secrets.token_urlsafe(16)
        self.hourly_cost = gpu["price"]
        print("🚀 Provisioning server...")
        order_data = {
            "renting_server": gpu["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": self.IMAGE,
            "ports": {"22": "tcp", "6006": "http"},  # SSH + TensorBoard
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password,
            # Bid slightly above current spot to reduce the chance of
            # being outbid mid-training.
            "spotprice": gpu["price"] * 1.15
        }
        result = self._api("POST", "/v1/create_order", json=order_data)
        self.order_id = result["order_id"]
        print("⏳ Waiting for server...")
        for _ in range(120):  # poll every 2 s => ~4 minute budget
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == self.order_id), None)
            if order and order.get("status") == "running":
                # Connection string looks like "ssh root@HOST -p PORT"
                # -- TODO confirm against the Clore API response format.
                conn = order["connection"]["ssh"]
                parts = conn.split()
                self.ssh_host = parts[1].split("@")[1]
                self.ssh_port = int(parts[-1]) if "-p" in conn else 22
                break
            time.sleep(2)
        else:
            raise Exception("Timeout")
        # Connect SSH
        self._ssh = paramiko.SSHClient()
        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh.connect(self.ssh_host, port=self.ssh_port,
                          username="root", password=self.ssh_password, timeout=30)
        self._scp = SCPClient(self._ssh.get_transport())
        print(f"✅ Server ready: {self.ssh_host}:{self.ssh_port}")
        # Install packages
        print("📦 Installing RL packages...")
        self._exec("pip install -q stable-baselines3[extra] gymnasium tensorboard", timeout=300)

    def _exec(self, cmd: str, timeout: int = 7200) -> str:
        """Run a command over SSH and return its stdout (stderr is
        discarded unless the command redirects it with 2>&1)."""
        stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
        # Block until the remote command finishes before reading output.
        stdout.channel.recv_exit_status()
        return stdout.read().decode()

    def train(self, env_id: str, algorithm: str, timesteps: int,
              learning_rate: float = 3e-4, n_envs: int = 4) -> "TrainingResult":
        """Train an SB3 agent on the remote GPU and return a summary.

        Args:
            env_id: Gymnasium environment id (e.g. "CartPole-v1").
            algorithm: SB3 class name, case-insensitive (e.g. "ppo").
            timesteps: total training timesteps.
            learning_rate: optimizer learning rate.
            n_envs: number of parallel training environments.

        Returns:
            TrainingResult; success is False when the remote script never
            printed its RESULT line (crash, OOM, bad env id, ...).
        """
        algo_class = algorithm.upper()
        # Remote training script; the RESULT: line is the machine-readable
        # channel back to this process.
        script = f'''
import gymnasium as gym
import time
import json
from stable_baselines3 import {algo_class}
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
env = make_vec_env("{env_id}", n_envs={n_envs})
eval_env = make_vec_env("{env_id}", n_envs=1)
model = {algo_class}("MlpPolicy", env, learning_rate={learning_rate}, verbose=1, device="cuda")
start = time.time()
model.learn(total_timesteps={timesteps}, progress_bar=True)
train_time = time.time() - start
model.save("/tmp/model")
mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=10)
result = {{"success": True, "time": train_time, "reward": float(mean_reward)}}
print("RESULT:" + json.dumps(result))
'''
        self._exec(f"cat > /tmp/train.py << 'EOF'\n{script}\nEOF")
        print(f"🎮 Training {algorithm.upper()} on {env_id}...")
        print(f" Timesteps: {timesteps:,}")
        start = time.time()
        output = self._exec("python3 /tmp/train.py 2>&1", timeout=7200)
        elapsed = time.time() - start
        # Parse result
        result_data = {"success": False, "reward": 0, "time": elapsed}
        for line in output.split("\n"):
            if line.startswith("RESULT:"):
                result_data = json.loads(line[7:])
                break
        # Billing approximation: local wall-clock time at the spot price.
        cost = (elapsed / 3600) * self.hourly_cost
        return TrainingResult(
            algorithm=algorithm,
            env_id=env_id,
            timesteps=timesteps,
            time_seconds=elapsed,
            final_reward=result_data.get("reward", 0),
            model_path="/tmp/model.zip",
            success=result_data.get("success", False),
            cost_usd=cost
        )

    def download_model(self, local_path: str):
        """Copy the trained model archive from the server to local_path."""
        self._scp.get("/tmp/model.zip", local_path)

    def cleanup(self):
        """Close connections and release the rented server.

        Best-effort by design: every step is attempted and failures are
        printed rather than raised, so that (a) a failed SCP close cannot
        prevent the order from being cancelled (which would keep billing),
        and (b) cleanup in __exit__ never masks the exception that caused
        the with-body to unwind.
        """
        if self._scp:
            try:
                self._scp.close()
            except Exception as e:
                print(f"⚠️ SCP close failed: {e}")
            self._scp = None
        if self._ssh:
            try:
                self._ssh.close()
            except Exception as e:
                print(f"⚠️ SSH close failed: {e}")
            self._ssh = None
        if self.order_id:
            print("🧹 Releasing server...")
            try:
                self._api("POST", "/v1/cancel_order", json={"id": self.order_id})
            except Exception as e:
                # Keep order_id so a later manual cleanup() can retry.
                print(f"⚠️ Failed to cancel order {self.order_id}: {e}")
            else:
                self.order_id = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.cleanup()
def main():
    """CLI entry point: rent a GPU, train remotely, report, fetch model."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--env", default="CartPole-v1", help="Gym environment")
    parser.add_argument("--algo", default="ppo", choices=["ppo", "sac", "dqn", "a2c", "td3"])
    parser.add_argument("--timesteps", type=int, default=100000)
    parser.add_argument("--lr", type=float, default=3e-4)
    parser.add_argument("--output", default="./model.zip")
    parser.add_argument("--max-price", type=float, default=0.40)
    opts = parser.parse_args()

    # The context manager guarantees the paid server is released on exit.
    with CloreRLTrainer(opts.api_key) as trainer:
        trainer.setup(opts.max_price)
        outcome = trainer.train(
            env_id=opts.env,
            algorithm=opts.algo,
            timesteps=opts.timesteps,
            learning_rate=opts.lr,
        )
        seconds = outcome.time_seconds
        print("\n" + "=" * 60)
        print("📊 TRAINING COMPLETE")
        print(f" Algorithm: {outcome.algorithm.upper()}")
        print(f" Environment: {outcome.env_id}")
        print(f" Timesteps: {outcome.timesteps:,}")
        print(f" Time: {seconds:.1f}s ({seconds/60:.1f} min)")
        print(f" Final Reward: {outcome.final_reward:.2f}")
        print(f" Cost: ${outcome.cost_usd:.4f}")
        if outcome.success:
            # Only pull the model when the remote run actually finished.
            trainer.download_model(opts.output)
            print(f" Model: {opts.output}")


if __name__ == "__main__":
    main()