Distributed Training Across Multiple Servers
What We're Building
Prerequisites
Step 1: Multi-Node Cluster Manager
# cluster_manager.py
"""Manage a cluster of Clore GPU nodes for distributed training."""
import time
import threading
from typing import List, Dict, Optional
from dataclasses import dataclass
import requests
@dataclass
class NodeConfig:
    """Configuration for a single node in the training cluster.

    Filled in incrementally: the marketplace fields are known at order
    time; host/port are set once the Clore order reaches "running".
    """
    server_id: int        # Clore marketplace server id
    gpus: List[str]       # GPU model strings reported by the marketplace
    price_usd: float      # on-demand price for this server, USD per hour
    host: str = ""        # SSH host, set once the order is running
    port: int = 22        # SSH port, set once the order is running
    rank: int = 0         # node rank (node 0 acts as the NCCL master)
class CloreCluster:
    """Manage a distributed training cluster on Clore.

    Lifecycle: provision_cluster() rents the nodes and waits for them to
    boot, get_cluster_info() exposes the topology for torch.distributed,
    and shutdown() cancels every order this instance created.
    """

    BASE_URL = "https://api.clore.ai"

    def __init__(self, api_key: str, ssh_password: str = "DistTrain123!"):
        """
        Args:
            api_key: Clore API key, sent as the ``auth`` header.
            ssh_password: Password configured on every rented container.
                NOTE(review): the default is public and weak — always pass
                your own secret for real workloads.
        """
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.ssh_password = ssh_password
        self.nodes: List[NodeConfig] = []   # populated by provision_cluster()
        self.orders: List[int] = []         # order ids, cancelled by shutdown()
        self.master_addr: str = ""          # host of the rank-0 node
        self.master_port: int = 29500       # NCCL rendezvous port

    def _request(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Perform one API call and return the decoded JSON payload.

        Raises:
            requests.HTTPError: on an HTTP-level failure.
            Exception: when the API responds with a non-zero ``code``.
        """
        url = f"{self.BASE_URL}{endpoint}"
        response = requests.request(method, url, headers=self.headers, **kwargs)
        # Fail fast with a clear HTTP error instead of trying to JSON-decode
        # an error page.
        response.raise_for_status()
        data = response.json()
        if data.get("code") != 0:
            raise Exception(f"API Error: {data}")
        return data

    def find_servers(self, gpu_type: str, count: int,
                     max_price: float = 1.0) -> List[Dict]:
        """Find ``count`` available servers whose GPUs match ``gpu_type``.

        Args:
            gpu_type: Substring matched against each GPU name (e.g. "RTX 4090").
            count: Number of servers required.
            max_price: Maximum on-demand USD price per hour per server.

        Returns:
            The best ``count`` candidates, most reliable (then cheapest) first.

        Raises:
            Exception: if fewer than ``count`` servers qualify.
        """
        servers = self._request("GET", "/v1/marketplace")["servers"]
        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            gpus = server.get("gpu_array", [])
            if not any(gpu_type in g for g in gpus):
                continue
            # Missing price info defaults to the 999 sentinel, which the
            # max_price filter then rejects.
            price = server.get("price", {}).get("usd", {}).get("on_demand_clore", 999)
            if price > max_price:
                continue
            candidates.append({
                "id": server["id"],
                "gpus": gpus,
                "gpu_count": len(gpus),
                "price": price,
                "reliability": server.get("reliability", 0)
            })
        # Sort by reliability (descending), then price (ascending)
        candidates.sort(key=lambda x: (-x["reliability"], x["price"]))
        if len(candidates) < count:
            raise Exception(f"Only found {len(candidates)} servers, need {count}")
        return candidates[:count]

    def provision_cluster(self, gpu_type: str, num_nodes: int,
                          max_price: float = 1.0,
                          image: str = "pytorch/pytorch:2.7.1-cuda12.8-cudnn9-devel"
                          ) -> List["NodeConfig"]:
        """Provision a multi-node cluster and wait until every node runs.

        Args:
            gpu_type: GPU name substring to match (e.g. "RTX 4090").
            num_nodes: Number of nodes to rent.
            max_price: Maximum USD/hr per node.
            image: Docker image deployed on every node.

        Returns:
            One NodeConfig per node, in rank order (node 0 is the master).

        Raises:
            Exception: if not enough servers exist, any order fails to be
                created, or a node does not reach "running" in time.
        """
        print(f"🔍 Finding {num_nodes} {gpu_type} servers...")
        servers = self.find_servers(gpu_type, num_nodes, max_price)
        total_gpus = sum(s["gpu_count"] for s in servers)
        total_cost = sum(s["price"] for s in servers)
        print(f" Found {len(servers)} servers with {total_gpus} GPUs @ ${total_cost:.2f}/hr total")

        # Provision all nodes in parallel.
        print(f"\n📦 Provisioning {num_nodes} nodes...")
        results: List[Optional[tuple]] = [None] * num_nodes
        errors: List[Optional[Exception]] = [None] * num_nodes

        def provision_node(idx: int, server: Dict) -> None:
            # Runs in a worker thread. Record the failure instead of letting
            # the thread die silently — a swallowed exception would leave
            # results[idx] as None and crash the unpacking loop below with a
            # cryptic TypeError.
            try:
                order = self._request("POST", "/v1/create_order", json={
                    "renting_server": server["id"],
                    "type": "on-demand",
                    "currency": "CLORE-Blockchain",
                    "image": image,
                    "ports": {
                        "22": "tcp",
                        "29500": "tcp",  # NCCL master port
                        "29501": "tcp",  # Additional NCCL port
                    },
                    "env": {
                        "NVIDIA_VISIBLE_DEVICES": "all",
                        "NCCL_DEBUG": "INFO"
                    },
                    "ssh_password": self.ssh_password
                })
                results[idx] = (server, order)
            except Exception as exc:
                errors[idx] = exc

        threads = []
        for idx, server in enumerate(servers):
            t = threading.Thread(target=provision_node, args=(idx, server))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()

        # Register every successfully created order *before* raising, so
        # shutdown() can cancel a partially provisioned (and billing!)
        # cluster.
        for entry in results:
            if entry is not None:
                self.orders.append(entry[1]["order_id"])
        failed = {idx: err for idx, err in enumerate(errors) if err is not None}
        if failed:
            raise Exception(f"Failed to provision node(s) {sorted(failed)}: {failed}")

        # Wait for all nodes to be ready and collect their SSH endpoints.
        print("⏳ Waiting for nodes to start...")
        for idx, (server, order) in enumerate(results):
            order_id = order["order_id"]
            for _ in range(120):  # poll every 2 s, up to ~4 min per node
                orders = self._request("GET", "/v1/my_orders")["orders"]
                current = next((o for o in orders if o["order_id"] == order_id), None)
                if current and current.get("status") == "running":
                    # The connection string is assumed to look like
                    # "ssh user@HOST -p PORT" — TODO confirm with Clore docs.
                    ssh_info = current["connection"]["ssh"]
                    parts = ssh_info.split()
                    host = parts[1].split("@")[1]
                    port = int(parts[3]) if len(parts) > 3 else 22
                    node = NodeConfig(
                        server_id=server["id"],
                        gpus=server["gpus"],
                        price_usd=server["price"],
                        host=host,
                        port=port,
                        rank=idx
                    )
                    self.nodes.append(node)
                    print(f" ✅ Node {idx}: {host}:{port} ({len(server['gpus'])} GPUs)")
                    break
                time.sleep(2)
            else:
                raise Exception(f"Timeout waiting for node {idx}")

        # The first node (rank 0) hosts the NCCL rendezvous.
        self.master_addr = self.nodes[0].host
        return self.nodes

    def get_world_size(self) -> int:
        """Get total number of GPUs across all nodes."""
        return sum(len(node.gpus) for node in self.nodes)

    def get_cluster_info(self) -> Dict:
        """Return the cluster topology needed to launch torch.distributed.

        Includes per-node rank/host/port/GPU count plus the master address,
        master port, world size, node count, and total hourly cost.
        """
        return {
            "nodes": [
                {
                    "rank": node.rank,
                    "host": node.host,
                    "port": node.port,
                    "gpus": len(node.gpus)
                }
                for node in self.nodes
            ],
            "master_addr": self.master_addr,
            "master_port": self.master_port,
            "world_size": self.get_world_size(),
            "num_nodes": len(self.nodes),
            "total_cost_per_hour": sum(n.price_usd for n in self.nodes)
        }

    def shutdown(self):
        """Cancel every tracked order and clear the node list.

        Best-effort: a failure to cancel one order is reported but does not
        stop the remaining cancellations.
        """
        print("\n🧹 Shutting down cluster...")
        for order_id in self.orders:
            try:
                self._request("POST", "/v1/cancel_order", json={"id": order_id})
                print(f" ✅ Cancelled order {order_id}")
            except Exception as e:
                print(f" ⚠️ Failed to cancel {order_id}: {e}")
        self.nodes = []
        self.orders = []
if __name__ == "__main__":
import sys
api_key = sys.argv[1] if len(sys.argv) > 1 else "YOUR_API_KEY"
cluster = CloreCluster(api_key)
try:
# Provision 2-node cluster
nodes = cluster.provision_cluster(
gpu_type="RTX 4090",
num_nodes=2,
max_price=0.50
)
info = cluster.get_cluster_info()
print(f"\n📊 Cluster Ready:")
print(f" Nodes: {info['num_nodes']}")
print(f" Total GPUs: {info['world_size']}")
print(f" Cost: ${info['total_cost_per_hour']:.2f}/hr")
print(f" Master: {info['master_addr']}:{info['master_port']}")
input("\nPress Enter to shutdown cluster...")
finally:
cluster.shutdown()Step 2: Distributed Training Script
Step 3: Cluster Training Orchestrator
Scaling Guide
Nodes
GPUs
Effective Batch
Est. Speedup
Cost/hr
Best Practices
Cost Comparison: 8-GPU Training
Provider
Config
Cost/hr
10hr Training
Next Steps
Last updated
Was this helpful?