Distributed Training Across Multiple Servers

What We're Building

A distributed PyTorch training system that orchestrates multiple Clore GPUs, handles inter-node communication, and provides linear scaling for large model training — at a fraction of traditional cloud costs.

Prerequisites

  • Clore.ai API key

  • Python 3.10+

  • Understanding of PyTorch DDP (DistributedDataParallel)

Step 1: Multi-Node Cluster Manager

# cluster_manager.py
"""Manage a cluster of Clore GPU nodes for distributed training."""

import time
import threading
from typing import List, Dict, Optional
from dataclasses import dataclass
import requests

@dataclass
class NodeConfig:
    """Connection and billing details for a single GPU node in the cluster."""
    server_id: int      # Clore marketplace server id this node was rented from
    gpus: List[str]     # GPU model strings reported by the marketplace listing
    price_usd: float    # on-demand price for this node, USD per hour
    host: str = ""      # SSH host, filled in once the order reports "running"
    port: int = 22      # SSH port, filled in once the order reports "running"
    rank: int = 0       # node index within the cluster (node 0 hosts the master address)

class CloreCluster:
    """Provision and manage a multi-node distributed training cluster on Clore.

    Typical lifecycle::

        cluster = CloreCluster(api_key)
        cluster.provision_cluster("RTX 4090", num_nodes=2)
        info = cluster.get_cluster_info()   # feed to the DDP launcher
        ...
        cluster.shutdown()
    """

    BASE_URL = "https://api.clore.ai"

    # Node-startup polling: 120 attempts x 2 s = at most ~4 minutes per node.
    STARTUP_ATTEMPTS = 120
    POLL_INTERVAL_S = 2

    def __init__(self, api_key: str, ssh_password: str = "DistTrain123!"):
        """Create a cluster manager.

        Args:
            api_key: Clore.ai API key, sent as the ``auth`` header.
            ssh_password: Password set on every provisioned container.
                NOTE(review): override the default in any real deployment —
                a hard-coded shared password is a security risk.
        """
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.ssh_password = ssh_password
        self.nodes: List["NodeConfig"] = []
        self.orders: List[int] = []       # order ids to cancel on shutdown
        self.master_addr: str = ""        # host of node 0, set by provision_cluster
        self.master_port: int = 29500     # rendezvous port (matches the opened port)

    def _request(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Issue an authenticated API call and return the decoded JSON payload.

        Raises:
            requests.HTTPError: on HTTP-level failures (avoids an opaque
                JSON decode error when the server returns an error page).
            Exception: when the API responds with a non-zero ``code``.
        """
        url = f"{self.BASE_URL}{endpoint}"
        response = requests.request(method, url, headers=self.headers, **kwargs)
        response.raise_for_status()
        data = response.json()
        if data.get("code") != 0:
            raise Exception(f"API Error: {data}")
        return data

    def find_servers(self, gpu_type: str, count: int,
                     max_price: float = 1.0) -> List[Dict]:
        """Find ``count`` available servers whose GPUs match ``gpu_type``.

        Listings are filtered to un-rented servers at or under ``max_price``
        (USD/hr) and returned best-first (reliability desc, then price asc).

        Raises:
            Exception: if fewer than ``count`` matching servers are available.
        """
        servers = self._request("GET", "/v1/marketplace")["servers"]

        candidates = []
        for server in servers:
            if server.get("rented"):
                continue

            gpus = server.get("gpu_array", [])
            # Substring match so e.g. "RTX 4090" also matches "NVIDIA RTX 4090".
            if not any(gpu_type in g for g in gpus):
                continue

            # Missing price info defaults to 999 so the listing is filtered out.
            price = server.get("price", {}).get("usd", {}).get("on_demand_clore", 999)
            if price > max_price:
                continue

            candidates.append({
                "id": server["id"],
                "gpus": gpus,
                "gpu_count": len(gpus),
                "price": price,
                "reliability": server.get("reliability", 0)
            })

        # Sort by reliability (descending), then price (ascending).
        candidates.sort(key=lambda x: (-x["reliability"], x["price"]))

        if len(candidates) < count:
            raise Exception(f"Only found {len(candidates)} servers, need {count}")

        return candidates[:count]

    def provision_cluster(self, gpu_type: str, num_nodes: int,
                          max_price: float = 1.0,
                          image: str = "pytorch/pytorch:2.7.1-cuda12.8-cudnn9-devel"
                          ) -> List["NodeConfig"]:
        """Provision a multi-node cluster and wait until every node is running.

        Orders are created in parallel (one thread per node), then each order
        is polled until it reports "running". Node 0 becomes the master; its
        host is stored in ``self.master_addr``.

        Raises:
            Exception: if any order fails to create or a node never starts.
                Every order that *was* created is still recorded in
                ``self.orders`` so ``shutdown()`` can cancel it.
        """
        print(f"🔍 Finding {num_nodes} {gpu_type} servers...")
        servers = self.find_servers(gpu_type, num_nodes, max_price)

        total_gpus = sum(s["gpu_count"] for s in servers)
        total_cost = sum(s["price"] for s in servers)
        print(f"   Found {len(servers)} servers with {total_gpus} GPUs @ ${total_cost:.2f}/hr total")

        # Create all orders in parallel.
        print(f"\n📦 Provisioning {num_nodes} nodes...")
        results: List[Optional[Dict]] = [None] * num_nodes
        errors: List[Optional[Exception]] = [None] * num_nodes

        def provision_node(idx: int, server: Dict) -> None:
            # Runs in a worker thread: record failures instead of raising —
            # an exception raised here would be silently lost and the main
            # thread would later crash unpacking a None result.
            try:
                results[idx] = self._request("POST", "/v1/create_order", json={
                    "renting_server": server["id"],
                    "type": "on-demand",
                    "currency": "CLORE-Blockchain",
                    "image": image,
                    "ports": {
                        "22": "tcp",
                        "29500": "tcp",  # NCCL master port
                        "29501": "tcp",  # Additional NCCL port
                    },
                    "env": {
                        "NVIDIA_VISIBLE_DEVICES": "all",
                        "NCCL_DEBUG": "INFO"
                    },
                    "ssh_password": self.ssh_password
                })
            except Exception as e:
                errors[idx] = e

        threads = [threading.Thread(target=provision_node, args=(idx, server))
                   for idx, server in enumerate(servers)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Record every order that was actually created (so shutdown() can
        # cancel it even on partial failure), then surface creation errors.
        for order in results:
            if order is not None:
                self.orders.append(order["order_id"])
        failed = [f"node {i}: {e}" for i, e in enumerate(errors) if e is not None]
        if failed:
            raise Exception("Order creation failed — " + "; ".join(failed))

        # Wait for every node to come up and collect its SSH endpoint.
        print(f"⏳ Waiting for nodes to start...")
        for idx, (server, order) in enumerate(zip(servers, results)):
            node = self._wait_for_node(idx, server, order["order_id"])
            self.nodes.append(node)
            print(f"   ✅ Node {idx}: {node.host}:{node.port} ({len(server['gpus'])} GPUs)")

        # The first node acts as the rendezvous master for distributed training.
        self.master_addr = self.nodes[0].host

        return self.nodes

    def _wait_for_node(self, idx: int, server: Dict, order_id: int) -> "NodeConfig":
        """Poll ``my_orders`` until ``order_id`` is running; return its NodeConfig.

        Raises:
            Exception: if the node has not started after STARTUP_ATTEMPTS polls.
        """
        for _ in range(self.STARTUP_ATTEMPTS):
            orders = self._request("GET", "/v1/my_orders")["orders"]
            current = next((o for o in orders if o["order_id"] == order_id), None)

            if current and current.get("status") == "running":
                host, port = self._parse_ssh(current["connection"]["ssh"])
                return NodeConfig(
                    server_id=server["id"],
                    gpus=server["gpus"],
                    price_usd=server["price"],
                    host=host,
                    port=port,
                    rank=idx
                )

            time.sleep(self.POLL_INTERVAL_S)

        raise Exception(f"Timeout waiting for node {idx}")

    @staticmethod
    def _parse_ssh(ssh_info: str) -> "Tuple[str, int]":
        """Parse an ``ssh user@host [-p port]`` command line into (host, port).

        The port defaults to 22 when the command carries no port argument.
        """
        parts = ssh_info.split()
        host = parts[1].split("@")[1]
        port = int(parts[3]) if len(parts) > 3 else 22
        return host, port

    def get_world_size(self) -> int:
        """Get total number of GPUs across all nodes."""
        return sum(len(node.gpus) for node in self.nodes)

    def get_cluster_info(self) -> Dict:
        """Get cluster information (ranks, endpoints, master, totals) for
        configuring distributed training."""
        return {
            "nodes": [
                {
                    "rank": node.rank,
                    "host": node.host,
                    "port": node.port,
                    "gpus": len(node.gpus)
                }
                for node in self.nodes
            ],
            "master_addr": self.master_addr,
            "master_port": self.master_port,
            "world_size": self.get_world_size(),
            "num_nodes": len(self.nodes),
            "total_cost_per_hour": sum(n.price_usd for n in self.nodes)
        }

    def shutdown(self):
        """Cancel every recorded order and clear cluster state.

        Best-effort: a failed cancellation is reported but does not stop
        the remaining orders from being cancelled.
        """
        print("\n🧹 Shutting down cluster...")
        for order_id in self.orders:
            try:
                self._request("POST", "/v1/cancel_order", json={"id": order_id})
                print(f"   ✅ Cancelled order {order_id}")
            except Exception as e:
                print(f"   ⚠️  Failed to cancel {order_id}: {e}")

        self.nodes = []
        self.orders = []


if __name__ == "__main__":
    import sys

    # API key comes from the first CLI argument, with a placeholder fallback.
    key = sys.argv[1] if len(sys.argv) > 1 else "YOUR_API_KEY"
    cluster = CloreCluster(key)

    try:
        # Spin up a small two-node demo cluster of RTX 4090 machines.
        cluster.provision_cluster(
            gpu_type="RTX 4090",
            num_nodes=2,
            max_price=0.50
        )

        summary = cluster.get_cluster_info()
        print(f"\n📊 Cluster Ready:")
        print(f"   Nodes: {summary['num_nodes']}")
        print(f"   Total GPUs: {summary['world_size']}")
        print(f"   Cost: ${summary['total_cost_per_hour']:.2f}/hr")
        print(f"   Master: {summary['master_addr']}:{summary['master_port']}")

        input("\nPress Enter to shutdown cluster...")

    finally:
        # Always cancel the orders, even if provisioning failed part-way.
        cluster.shutdown()

Step 2: Distributed Training Script

Step 3: Cluster Training Orchestrator

Scaling Guide

| Nodes | GPUs | Effective Batch | Est. Speedup | Cost/hr |
|-------|------|-----------------|--------------|---------|
| 1     | 1    | 64              | 1.0x         | ~$0.35  |
| 1     | 2    | 128             | 1.9x         | ~$0.70  |
| 2     | 4    | 256             | 3.6x         | ~$1.40  |
| 4     | 8    | 512             | 6.8x         | ~$2.80  |
| 8     | 16   | 1024            | 12.0x        | ~$5.60  |

Speedup factors assume near-linear scaling with proper batch size adjustment

Best Practices

  1. Scale batch size with GPUs: effective_batch = per_gpu_batch * world_size

  2. Scale learning rate: scaled_lr = base_lr * world_size (linear scaling rule)

  3. Use gradient accumulation for very large effective batches

  4. Tune NCCL settings: set NCCL_DEBUG=INFO to diagnose inter-node communication issues (disable it for production runs, since verbose logging adds overhead)

Cost Comparison: 8-GPU Training

| Provider | Config        | Cost/hr | 10hr Training |
|----------|---------------|---------|---------------|
| Clore.ai | 2x4-GPU nodes | ~$2.80  | ~$28          |
| AWS      | p4d.24xlarge  | ~$32.77 | ~$328         |
| GCP      | a2-highgpu-8g | ~$29.39 | ~$294         |

Savings: ~90% compared to major cloud providers

Next Steps

Last updated

Was this helpful?