# Building a Distributed Rendering Farm

## What We're Building

A production-ready distributed rendering farm that automatically provisions Clore.ai GPUs and distributes Blender rendering jobs across multiple nodes, parallelizing complex 3D scenes and animations frame by frame.

**Key Features:**

* Automatic multi-GPU provisioning via Clore.ai API
* Frame-based job distribution across render nodes
* Support for Cycles GPU rendering (CUDA/OptiX)
* Real-time progress monitoring
* Automatic frame collection and video assembly
* Cost-optimized spot instance usage
* Fault tolerance with automatic re-rendering of failed frames

## Prerequisites

* Clore.ai account with API key ([get one here](https://clore.ai))
* Python 3.10+
* Blender project files (.blend)
* Basic understanding of 3D rendering

```bash
pip install requests paramiko scp tqdm
```

## Architecture Overview

```
                    ┌─────────────────┐
                    │   Coordinator   │
                    │   (Your PC)     │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│  Render Node 1  │ │  Render Node 2  │ │  Render Node N  │
│  RTX 4090       │ │  RTX 3090       │ │  A100           │
│  Frames 1-100   │ │  Frames 101-200 │ │  Frames 201-300 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
         │                   │                   │
         └───────────────────┼───────────────────┘
                             │
                    ┌────────▼────────┐
                    │  Output Storage │
                    │  (Frames/Video) │
                    └─────────────────┘
```
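Frames are split across nodes in proportion to GPU count, so an 8-GPU box receives more work than a single-GPU one. A standalone sketch of that weighting (the `split_frames` helper is illustrative; the coordinator in Step 3 implements the same idea against `RenderNode` objects):

```python
from typing import Dict, List

def split_frames(frames: List[int], gpu_counts: Dict[str, int]) -> Dict[str, List[int]]:
    """Assign contiguous frame slices proportional to each node's GPU count."""
    total = sum(gpu_counts.values())
    names = list(gpu_counts)
    out, idx = {}, 0
    for name in names:
        count = max(1, int(len(frames) * gpu_counts[name] / total))
        out[name] = frames[idx:idx + count]
        idx += count
    # Leftover frames from integer truncation go to the first node.
    if idx < len(frames):
        out[names[0]].extend(frames[idx:])
    return out

print(split_frames(list(range(1, 11)), {"a": 2, "b": 1, "c": 1}))
# → {'a': [1, 2, 3, 4, 5, 10], 'b': [6, 7], 'c': [8, 9]}
```

Contiguous slices matter here: they let each node render its chunk as a single Blender animation pass instead of one process per frame.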

## Step 1: Clore.ai Render Farm Client

```python
# render_farm_client.py
import requests
import time
import secrets
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed

@dataclass
class RenderNode:
    """Represents a single render node in the farm."""
    server_id: int
    order_id: int
    ssh_host: str
    ssh_port: int
    ssh_password: str
    gpu_model: str
    gpu_count: int
    hourly_cost: float
    status: str = "initializing"
    frames_assigned: Optional[List[int]] = None
    frames_completed: Optional[List[int]] = None
    
    def __post_init__(self):
        self.frames_assigned = self.frames_assigned or []
        self.frames_completed = self.frames_completed or []


class CloreRenderFarmClient:
    """Clore.ai client for distributed rendering operations."""
    
    BASE_URL = "https://api.clore.ai"
    
    # Docker image with Blender + CUDA support
    BLENDER_IMAGE = "linuxserver/blender:latest"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.nodes: List[RenderNode] = []
    
    def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make API request with retry logic."""
        url = f"{self.BASE_URL}{endpoint}"
        
        for attempt in range(3):
            try:
                response = requests.request(
                    method, url,
                    headers=self.headers,
                    timeout=30,
                    **kwargs
                )
                data = response.json()
                
                if data.get("code") == 5:  # Rate limited
                    time.sleep(2 ** attempt)
                    continue
                
                if data.get("code") != 0:
                    raise Exception(f"API Error: {data}")
                return data
                
            except requests.exceptions.Timeout:
                if attempt == 2:
                    raise
                time.sleep(1)
        
        raise Exception("Max retries exceeded")
    
    def find_render_gpus(self, 
                         count: int = 3,
                         min_vram_gb: int = 8,
                         max_price_usd: float = 0.50,
                         prefer_spot: bool = True) -> List[Dict]:
        """Find multiple GPUs suitable for rendering."""
        
        servers = self._request("GET", "/v1/marketplace")["servers"]
        
        # GPUs with good CUDA/OptiX support for Blender
        render_gpus = [
            "RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", "RTX 3070",
            "A100", "A6000", "A5000", "A4000", "RTX A4000"
        ]
        
        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            
            gpu_array = server.get("gpu_array", [])
            if not any(any(g in gpu for g in render_gpus) for gpu in gpu_array):
                continue
            
            price_data = server.get("price", {}).get("usd", {})
            price = price_data.get("spot" if prefer_spot else "on_demand_clore")
            
            if not price or price > max_price_usd:
                continue
            
            candidates.append({
                "id": server["id"],
                "gpus": gpu_array,
                "gpu_count": len(gpu_array),
                "price_usd": price,
                "reliability": server.get("reliability", 0),
                "specs": server.get("specs", {})
            })
        
        # Sort by value: price per GPU, then reliability
        candidates.sort(key=lambda x: (x["price_usd"] / x["gpu_count"], -x["reliability"]))
        
        return candidates[:count]
    
    def provision_render_node(self, 
                              server: Dict,
                              use_spot: bool = True) -> RenderNode:
        """Provision a single render node."""
        
        ssh_password = secrets.token_urlsafe(16)
        
        order_data = {
            "renting_server": server["id"],
            "type": "spot" if use_spot else "on-demand",
            "currency": "CLORE-Blockchain",
            "image": self.BLENDER_IMAGE,
            "ports": {"22": "tcp"},
            "env": {
                "NVIDIA_VISIBLE_DEVICES": "all",
                "NVIDIA_DRIVER_CAPABILITIES": "all"
            },
            "ssh_password": ssh_password
        }
        
        if use_spot:
            order_data["spotprice"] = server["price_usd"] * 1.15
        
        result = self._request("POST", "/v1/create_order", json=order_data)
        order_id = result["order_id"]
        
        # Wait for ready
        for _ in range(120):
            orders = self._request("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == order_id), None)
            
            if order and order.get("status") == "running":
                conn = order["connection"]["ssh"]
                parts = conn.split()
                ssh_host = parts[1].split("@")[1] if "@" in parts[1] else parts[1]
                ssh_port = int(parts[-1]) if "-p" in conn else 22
                
                node = RenderNode(
                    server_id=server["id"],
                    order_id=order_id,
                    ssh_host=ssh_host,
                    ssh_port=ssh_port,
                    ssh_password=ssh_password,
                    gpu_model=server["gpus"][0] if server["gpus"] else "Unknown",
                    gpu_count=server["gpu_count"],
                    hourly_cost=server["price_usd"],
                    status="ready"
                )
                self.nodes.append(node)
                return node
            
            time.sleep(2)
        
        raise Exception(f"Timeout waiting for server {server['id']}")
    
    def provision_farm(self,
                       node_count: int = 3,
                       max_price_per_node: float = 0.50,
                       use_spot: bool = True) -> List[RenderNode]:
        """Provision multiple render nodes in parallel."""
        
        print(f"🔍 Finding {node_count} render GPUs...")
        gpus = self.find_render_gpus(
            count=node_count,
            max_price_usd=max_price_per_node,
            prefer_spot=use_spot
        )
        
        if not gpus:
            raise Exception(f"No suitable GPUs found under ${max_price_per_node}/hr")
        if len(gpus) < node_count:
            print(f"⚠️  Only found {len(gpus)} suitable GPUs")
        
        print(f"🚀 Provisioning {len(gpus)} render nodes...")
        
        # Provision in parallel
        with ThreadPoolExecutor(max_workers=len(gpus)) as executor:
            futures = {
                executor.submit(self.provision_render_node, gpu, use_spot): gpu
                for gpu in gpus
            }
            
            for future in as_completed(futures):
                gpu = futures[future]
                try:
                    node = future.result()
                    print(f"   ✅ Node ready: {node.gpu_model} x{node.gpu_count} @ ${node.hourly_cost:.2f}/hr")
                except Exception as e:
                    print(f"   ❌ Failed to provision {gpu['id']}: {e}")
        
        return self.nodes
    
    def release_farm(self):
        """Release all render nodes."""
        print("🧹 Releasing render farm...")
        
        for node in self.nodes:
            try:
                self._request("POST", "/v1/cancel_order", json={"id": node.order_id})
                print(f"   Released node {node.order_id}")
            except Exception as e:
                print(f"   Failed to release {node.order_id}: {e}")
        
        self.nodes = []
    
    def get_total_gpus(self) -> int:
        """Get total GPU count across all nodes."""
        return sum(node.gpu_count for node in self.nodes)
    
    def get_hourly_cost(self) -> float:
        """Get total hourly cost of the farm."""
        return sum(node.hourly_cost for node in self.nodes)
```
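The sort key in `find_render_gpus` is worth pausing on: offers are ranked by price per GPU, with reliability (negated, so higher wins) as the tiebreaker. With mock marketplace entries:

```python
# Hypothetical marketplace candidates, shaped like find_render_gpus builds them.
candidates = [
    {"id": 1, "gpu_count": 1, "price_usd": 0.30, "reliability": 0.95},
    {"id": 2, "gpu_count": 2, "price_usd": 0.50, "reliability": 0.90},
    {"id": 3, "gpu_count": 1, "price_usd": 0.25, "reliability": 0.99},
]
# Same key as the client: cheapest per GPU first, then most reliable.
candidates.sort(key=lambda x: (x["price_usd"] / x["gpu_count"], -x["reliability"]))
print([c["id"] for c in candidates])  # → [3, 2, 1]
```

Servers 2 and 3 tie at $0.25/GPU, so reliability decides between them; server 1 is the most expensive per GPU despite the lowest sticker price.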

## Step 2: Blender Render Engine

```python
# blender_render.py
import os
import time
import paramiko
from scp import SCPClient
from typing import List, Dict, Tuple
from dataclasses import dataclass

@dataclass
class RenderJob:
    """Represents a rendering job."""
    blend_file: str
    output_dir: str
    start_frame: int
    end_frame: int
    output_format: str = "PNG"
    resolution_x: int = 1920
    resolution_y: int = 1080
    samples: int = 128
    render_engine: str = "CYCLES"
    device: str = "GPU"

class BlenderRenderEngine:
    """Manages Blender rendering on a remote GPU node."""
    
    def __init__(self, ssh_host: str, ssh_port: int, ssh_password: str):
        self.ssh_host = ssh_host
        self.ssh_port = ssh_port
        self.ssh_password = ssh_password
        self._ssh_client = None
        self._scp_client = None
    
    def connect(self):
        """Establish SSH connection."""
        self._ssh_client = paramiko.SSHClient()
        self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh_client.connect(
            self.ssh_host,
            port=self.ssh_port,
            username="root",
            password=self.ssh_password,
            timeout=30
        )
        self._scp_client = SCPClient(self._ssh_client.get_transport())
    
    def disconnect(self):
        """Close SSH connection."""
        if self._scp_client:
            self._scp_client.close()
        if self._ssh_client:
            self._ssh_client.close()
    
    def _exec(self, cmd: str) -> Tuple[str, str]:
        """Execute command on remote node."""
        stdin, stdout, stderr = self._ssh_client.exec_command(cmd, timeout=3600)
        return stdout.read().decode(), stderr.read().decode()
    
    def upload_file(self, local_path: str, remote_path: str):
        """Upload file to render node."""
        self._scp_client.put(local_path, remote_path)
    
    def download_file(self, remote_path: str, local_path: str):
        """Download file from render node."""
        self._scp_client.get(remote_path, local_path)
    
    def download_directory(self, remote_path: str, local_path: str):
        """Download directory from render node."""
        self._scp_client.get(remote_path, local_path, recursive=True)
    
    def setup_blender(self):
        """Ensure Blender is properly installed and configured."""
        # Check Blender version
        out, err = self._exec("blender --version")
        print(f"Blender: {out.strip().splitlines()[0]}")
        
        # Check GPU
        out, err = self._exec("nvidia-smi --query-gpu=name --format=csv,noheader")
        print(f"GPUs: {out.strip()}")
        
        # Create working directories
        self._exec("mkdir -p /tmp/blend_input /tmp/blend_output")
    
    def build_render_command(self, job: RenderJob, frames: List[int]) -> str:
        """Build Blender command line for rendering."""
        
        # Build frame list or range
        if len(frames) == 1:
            frame_arg = f"-f {frames[0]}"
        elif frames == list(range(frames[0], frames[-1] + 1)):
            frame_arg = f"-s {frames[0]} -e {frames[-1]} -a"
        else:
            # Non-contiguous frames: repeat -f, which renders each frame in turn
            frame_arg = " ".join(f"-f {f}" for f in frames)
        
        cmd = f"""blender -b /tmp/blend_input/{os.path.basename(job.blend_file)} \
            -E {job.render_engine} \
            -o /tmp/blend_output/frame_#### \
            -F {job.output_format} \
            -x 1 \
            {frame_arg} \
            -- --cycles-device {job.device}"""
        
        return cmd
    
    def render_frames(self, job: RenderJob, frames: List[int]) -> Dict:
        """Render specified frames."""
        
        results = {
            "frames_requested": frames,
            "frames_completed": [],
            "frames_failed": [],
            "render_times": {}
        }
        
        # Upload blend file
        remote_blend = f"/tmp/blend_input/{os.path.basename(job.blend_file)}"
        print(f"   Uploading {os.path.basename(job.blend_file)}...")
        self.upload_file(job.blend_file, remote_blend)
        
        # Render each frame (or range for contiguous)
        for frame in frames:
            print(f"   Rendering frame {frame}...")
            
            cmd = f"""blender -b {remote_blend} \
                -E {job.render_engine} \
                -o /tmp/blend_output/frame_#### \
                -F {job.output_format} \
                -x 1 \
                -f {frame} \
                -- --cycles-device {job.device}"""
            
            import time
            start = time.time()
            out, err = self._exec(cmd)
            elapsed = time.time() - start
            
            # Verify the frame was written (extension assumes PNG; adjust for other formats)
            frame_file = f"/tmp/blend_output/frame_{frame:04d}.png"
            check_out, _ = self._exec(f"ls {frame_file} 2>/dev/null")
            
            if check_out.strip():
                results["frames_completed"].append(frame)
                results["render_times"][frame] = elapsed
                print(f"      ✅ Frame {frame} completed in {elapsed:.1f}s")
            else:
                results["frames_failed"].append(frame)
                print(f"      ❌ Frame {frame} failed")
        
        return results
    
    def get_rendered_frames(self, local_output_dir: str) -> List[str]:
        """Download all rendered frames."""
        os.makedirs(local_output_dir, exist_ok=True)
        
        # List rendered files
        out, _ = self._exec("ls /tmp/blend_output/")
        files = [f.strip() for f in out.strip().split("\n") if f.strip()]
        
        downloaded = []
        for f in files:
            remote = f"/tmp/blend_output/{f}"
            local = os.path.join(local_output_dir, f)
            self.download_file(remote, local)
            downloaded.append(local)
        
        return downloaded
```
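One subtlety in the command assembly above: for a contiguous range, a single `-s start -e end -a` animation pass pays Blender's startup and scene-load cost once instead of once per frame. A standalone sketch (the `blender_cmd` helper is illustrative; paths and flags mirror the engine class above):

```python
def blender_cmd(blend: str, start: int, end: int, engine: str = "CYCLES") -> str:
    """Build a background-render command; a range becomes one animation pass."""
    if start == end:
        frame_arg = f"-f {start}"              # single frame
    else:
        frame_arg = f"-s {start} -e {end} -a"  # contiguous range, one Blender process
    return (f"blender -b {blend} -E {engine} "
            f"-o /tmp/blend_output/frame_#### -F PNG -x 1 {frame_arg} "
            f"-- --cycles-device GPU")

print(blender_cmd("/tmp/blend_input/scene.blend", 1, 100))
```

Flag order matters to Blender: output settings (`-o`, `-F`, `-x`) must appear before the frame flags, because arguments are processed left to right and rendering starts as soon as `-f` or `-a` is parsed.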

## Step 3: Distributed Job Coordinator

```python
# render_coordinator.py
import os
import time
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass

from render_farm_client import CloreRenderFarmClient, RenderNode
from blender_render import BlenderRenderEngine, RenderJob

@dataclass
class FarmStats:
    """Statistics for the render farm."""
    total_frames: int
    completed_frames: int
    failed_frames: int
    total_time_seconds: float
    total_cost_usd: float
    avg_frame_time: float
    nodes_used: int
    
    @property
    def progress_percent(self) -> float:
        return (self.completed_frames / self.total_frames * 100) if self.total_frames > 0 else 0


class RenderCoordinator:
    """Coordinates distributed rendering across multiple GPU nodes."""
    
    def __init__(self, api_key: str):
        self.client = CloreRenderFarmClient(api_key)
        self.job: RenderJob = None
        self.stats: FarmStats = None
    
    def distribute_frames(self, 
                          total_frames: List[int], 
                          nodes: List[RenderNode]) -> Dict[int, List[int]]:
        """Distribute frames across nodes based on GPU power."""
        
        # Weight nodes by GPU count (more GPUs = more frames)
        total_weight = sum(node.gpu_count for node in nodes)
        
        distribution = {}
        frame_index = 0
        
        for node in nodes:
            # Calculate frames for this node
            weight = node.gpu_count / total_weight
            node_frame_count = int(len(total_frames) * weight)
            
            # Ensure at least 1 frame per node
            if node_frame_count == 0 and frame_index < len(total_frames):
                node_frame_count = 1
            
            # Assign frames
            end_index = min(frame_index + node_frame_count, len(total_frames))
            distribution[node.order_id] = total_frames[frame_index:end_index]
            node.frames_assigned = total_frames[frame_index:end_index]
            frame_index = end_index
        
        # Handle remaining frames
        if frame_index < len(total_frames):
            distribution[nodes[0].order_id].extend(total_frames[frame_index:])
            nodes[0].frames_assigned.extend(total_frames[frame_index:])
        
        return distribution
    
    def render_on_node(self, 
                       node: RenderNode, 
                       job: RenderJob, 
                       frames: List[int]) -> Dict:
        """Execute rendering on a single node."""
        
        engine = BlenderRenderEngine(
            node.ssh_host,
            node.ssh_port,
            node.ssh_password
        )
        
        try:
            engine.connect()
            engine.setup_blender()
            
            results = engine.render_frames(job, frames)
            
            # Download completed frames
            node_output = os.path.join(job.output_dir, f"node_{node.order_id}")
            downloaded = engine.get_rendered_frames(node_output)
            
            results["downloaded_files"] = downloaded
            results["node_id"] = node.order_id
            
            node.frames_completed = results["frames_completed"]
            node.status = "completed"
            
            return results
            
        except Exception as e:
            node.status = "failed"
            return {
                "node_id": node.order_id,
                "frames_requested": frames,
                "frames_completed": [],
                "frames_failed": frames,
                "error": str(e)
            }
        finally:
            engine.disconnect()
    
    def render_distributed(self,
                           job: RenderJob,
                           node_count: int = 3,
                           max_price_per_node: float = 0.50,
                           use_spot: bool = True) -> FarmStats:
        """Execute distributed rendering across multiple nodes."""
        
        self.job = job
        start_time = time.time()
        
        # Calculate total frames
        all_frames = list(range(job.start_frame, job.end_frame + 1))
        print(f"📊 Total frames to render: {len(all_frames)}")
        print(f"   Frames: {job.start_frame} - {job.end_frame}")
        
        # Provision render farm
        try:
            nodes = self.client.provision_farm(
                node_count=node_count,
                max_price_per_node=max_price_per_node,
                use_spot=use_spot
            )
        except Exception as e:
            raise Exception(f"Failed to provision render farm: {e}")
        
        if not nodes:
            raise Exception("No render nodes available")
        
        print(f"\n🖥️  Render farm ready:")
        print(f"   Nodes: {len(nodes)}")
        print(f"   Total GPUs: {self.client.get_total_gpus()}")
        print(f"   Hourly cost: ${self.client.get_hourly_cost():.2f}")
        
        # Distribute frames
        distribution = self.distribute_frames(all_frames, nodes)
        
        print(f"\n📦 Frame distribution:")
        for node in nodes:
            print(f"   Node {node.order_id}: {len(node.frames_assigned)} frames ({node.gpu_model} x{node.gpu_count})")
        
        # Create output directory
        os.makedirs(job.output_dir, exist_ok=True)
        
        # Render in parallel across all nodes
        print(f"\n🎬 Starting distributed render...")
        all_results = []
        
        with ThreadPoolExecutor(max_workers=len(nodes)) as executor:
            futures = {
                executor.submit(
                    self.render_on_node,
                    node,
                    job,
                    distribution[node.order_id]
                ): node
                for node in nodes
            }
            
            for future in as_completed(futures):
                node = futures[future]
                try:
                    result = future.result()
                    all_results.append(result)
                    completed = len(result.get("frames_completed", []))
                    failed = len(result.get("frames_failed", []))
                    print(f"   Node {node.order_id}: {completed} completed, {failed} failed")
                except Exception as e:
                    print(f"   Node {node.order_id} error: {e}")
                    all_results.append({
                        "node_id": node.order_id,
                        "frames_completed": [],
                        "frames_failed": distribution[node.order_id],
                        "error": str(e)
                    })
        
        # Collect statistics
        total_time = time.time() - start_time
        completed_frames = sum(len(r.get("frames_completed", [])) for r in all_results)
        failed_frames = sum(len(r.get("frames_failed", [])) for r in all_results)
        
        self.stats = FarmStats(
            total_frames=len(all_frames),
            completed_frames=completed_frames,
            failed_frames=failed_frames,
            total_time_seconds=total_time,
            total_cost_usd=(total_time / 3600) * self.client.get_hourly_cost(),
            avg_frame_time=total_time / completed_frames if completed_frames > 0 else 0,
            nodes_used=len(nodes)
        )
        
        return self.stats
    
    def collect_frames(self) -> List[str]:
        """Collect all rendered frames into single directory."""
        if not self.job:
            raise Exception("No job configured")
        
        final_dir = os.path.join(self.job.output_dir, "final")
        os.makedirs(final_dir, exist_ok=True)
        
        # Move all frames from node directories
        collected = []
        for node in self.client.nodes:
            node_dir = os.path.join(self.job.output_dir, f"node_{node.order_id}")
            if os.path.exists(node_dir):
                for f in os.listdir(node_dir):
                    src = os.path.join(node_dir, f)
                    dst = os.path.join(final_dir, f)
                    os.rename(src, dst)
                    collected.append(dst)
        
        collected.sort()
        return collected
    
    def assemble_video(self, 
                       frames_dir: str,
                       output_file: str,
                       fps: int = 24) -> str:
        """Assemble rendered frames into video using FFmpeg."""
        import subprocess
        
        # Find frame pattern
        frames = sorted(os.listdir(frames_dir))
        if not frames:
            raise Exception("No frames found")
        
        # Build FFmpeg command (%04d expects a gap-free sequence; re-render failures first)
        cmd = [
            "ffmpeg", "-y",
            "-framerate", str(fps),
            "-i", os.path.join(frames_dir, "frame_%04d.png"),
            "-c:v", "libx264",
            "-preset", "slow",
            "-crf", "18",
            "-pix_fmt", "yuv420p",
            output_file
        ]
        
        subprocess.run(cmd, check=True)
        return output_file
    
    def cleanup(self):
        """Release all resources."""
        self.client.release_farm()
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        self.cleanup()
```
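The numbers in `FarmStats` are plain arithmetic; for instance, a farm billing a combined $0.90/hr that finishes 240 of 250 frames in 30 minutes of wall-clock time (figures chosen for illustration):

```python
total_time = 1800.0   # wall-clock seconds for the whole run
hourly_cost = 0.90    # sum of per-node rates, USD/hr
completed, total = 240, 250

cost = (total_time / 3600) * hourly_cost   # 0.45 USD for the run
avg_frame = total_time / completed         # 7.5 s of wall clock per finished frame
progress = completed / total * 100         # 96.0 %
print(f"${cost:.2f}  {avg_frame:.1f}s/frame  {progress:.0f}%")  # → $0.45  7.5s/frame  96%
```

Note that `avg_frame_time` is aggregate wall clock divided by frame count: because nodes render in parallel, the per-GPU render time for each frame is higher than this figure.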

## Full Script: Production Render Farm

```python
#!/usr/bin/env python3
"""
Distributed Blender Rendering Farm using Clore.ai GPUs.

Usage:
    python render_farm.py --api-key YOUR_API_KEY --blend scene.blend \
        --start 1 --end 250 --nodes 5 --output ./render_output/
"""

import os
import sys
import time
import argparse
import secrets
import requests
import paramiko
from scp import SCPClient
from typing import List, Dict, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed


@dataclass
class RenderNode:
    server_id: int
    order_id: int
    ssh_host: str
    ssh_port: int
    ssh_password: str
    gpu_model: str
    gpu_count: int
    hourly_cost: float
    frames: Optional[List[int]] = None


class CloreRenderFarm:
    """Complete distributed rendering solution using Clore.ai GPUs."""
    
    BASE_URL = "https://api.clore.ai"
    BLENDER_IMAGE = "linuxserver/blender:latest"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.nodes: List[RenderNode] = []
    
    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        url = f"{self.BASE_URL}{endpoint}"
        for attempt in range(3):
            response = requests.request(method, url, headers=self.headers, timeout=30, **kwargs)
            data = response.json()
            if data.get("code") == 5:
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries")
    
    def find_gpus(self, count: int, max_price: float) -> List[Dict]:
        servers = self._api("GET", "/v1/marketplace")["servers"]
        good_gpus = ["RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", "A100", "A6000"]
        
        candidates = []
        for s in servers:
            if s.get("rented"):
                continue
            gpus = s.get("gpu_array", [])
            if not any(any(g in gpu for g in good_gpus) for gpu in gpus):
                continue
            price = s.get("price", {}).get("usd", {}).get("spot")
            if price and price <= max_price:
                candidates.append({
                    "id": s["id"], "gpus": gpus, "gpu_count": len(gpus),
                    "price": price, "reliability": s.get("reliability", 0)
                })
        
        candidates.sort(key=lambda x: (x["price"] / x["gpu_count"], -x["reliability"]))
        return candidates[:count]
    
    def provision_node(self, gpu: Dict) -> RenderNode:
        password = secrets.token_urlsafe(16)
        
        order_data = {
            "renting_server": gpu["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": self.BLENDER_IMAGE,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": password,
            "spotprice": gpu["price"] * 1.15
        }
        
        result = self._api("POST", "/v1/create_order", json=order_data)
        order_id = result["order_id"]
        
        for _ in range(120):
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == order_id), None)
            if order and order.get("status") == "running":
                conn = order["connection"]["ssh"]
                parts = conn.split()
                host = parts[1].split("@")[1] if "@" in parts[1] else parts[1]
                port = int(parts[-1]) if "-p" in conn else 22
                
                return RenderNode(
                    server_id=gpu["id"], order_id=order_id,
                    ssh_host=host, ssh_port=port, ssh_password=password,
                    gpu_model=gpu["gpus"][0], gpu_count=gpu["gpu_count"],
                    hourly_cost=gpu["price"]
                )
            time.sleep(2)
        raise Exception("Timeout")
    
    def setup_farm(self, node_count: int, max_price: float) -> List[RenderNode]:
        print(f"🔍 Finding {node_count} render GPUs...")
        gpus = self.find_gpus(node_count, max_price)
        
        if not gpus:
            raise Exception(f"No GPUs found under ${max_price}/hr")
        
        print(f"🚀 Provisioning {len(gpus)} nodes...")
        
        with ThreadPoolExecutor(max_workers=len(gpus)) as executor:
            futures = {executor.submit(self.provision_node, g): g for g in gpus}
            for future in as_completed(futures):
                try:
                    node = future.result()
                    self.nodes.append(node)
                    print(f"   ✅ {node.gpu_model} x{node.gpu_count} @ ${node.hourly_cost:.2f}/hr")
                except Exception as e:
                    print(f"   ❌ Failed: {e}")
        
        return self.nodes
    
    def distribute_frames(self, frames: List[int]):
        total_gpus = sum(n.gpu_count for n in self.nodes)
        idx = 0
        for node in self.nodes:
            weight = node.gpu_count / total_gpus
            count = max(1, int(len(frames) * weight))
            node.frames = frames[idx:idx + count]
            idx += count
        if idx < len(frames):
            self.nodes[0].frames.extend(frames[idx:])
    
    def render_on_node(self, node: RenderNode, blend_file: str, output_dir: str) -> Dict:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(node.ssh_host, port=node.ssh_port, 
                    username="root", password=node.ssh_password, timeout=30)
        scp = SCPClient(ssh.get_transport())
        
        try:
            # Create working directories and wait for the command to finish
            _, stdout, _ = ssh.exec_command("mkdir -p /tmp/blend /tmp/output")
            stdout.channel.recv_exit_status()
            
            # Upload blend file
            print(f"   [{node.order_id}] Uploading {os.path.basename(blend_file)}...")
            scp.put(blend_file, "/tmp/blend/")
            blend_name = os.path.basename(blend_file)
            
            completed = []
            failed = []
            
            for frame in node.frames:
                print(f"   [{node.order_id}] Rendering frame {frame}...")
                cmd = f"blender -b /tmp/blend/{blend_name} -E CYCLES " \
                      f"-o /tmp/output/frame_#### -F PNG -x 1 -f {frame} " \
                      f"-- --cycles-device GPU 2>&1"
                
                stdin, stdout, stderr = ssh.exec_command(cmd, timeout=3600)
                stdout.channel.recv_exit_status()
                
                # Check result
                stdin, stdout, stderr = ssh.exec_command(f"ls /tmp/output/frame_{frame:04d}.png 2>/dev/null")
                if stdout.read().strip():
                    completed.append(frame)
                else:
                    failed.append(frame)
            
            # Download frames
            node_dir = os.path.join(output_dir, f"node_{node.order_id}")
            os.makedirs(node_dir, exist_ok=True)
            
            for frame in completed:
                try:
                    scp.get(f"/tmp/output/frame_{frame:04d}.png", node_dir)
                except Exception as e:
                    print(f"   [{node.order_id}] Failed to download frame {frame}: {e}")
            
            return {"node": node.order_id, "completed": completed, "failed": failed}
            
        finally:
            scp.close()
            ssh.close()
    
    def render(self, blend_file: str, start: int, end: int, output_dir: str) -> Dict:
        frames = list(range(start, end + 1))
        self.distribute_frames(frames)
        
        print(f"\n📦 Distribution:")
        for node in self.nodes:
            print(f"   Node {node.order_id}: {len(node.frames)} frames")
        
        os.makedirs(output_dir, exist_ok=True)
        
        print(f"\n🎬 Rendering {len(frames)} frames...")
        start_time = time.time()
        results = []
        
        with ThreadPoolExecutor(max_workers=len(self.nodes)) as executor:
            futures = {
                executor.submit(self.render_on_node, node, blend_file, output_dir): node
                for node in self.nodes
            }
            for future in as_completed(futures):
                try:
                    result = future.result()
                    results.append(result)
                    print(f"   Node {result['node']}: {len(result['completed'])} done, {len(result['failed'])} failed")
                except Exception as e:
                    print(f"   Error: {e}")
        
        elapsed = time.time() - start_time
        completed = sum(len(r["completed"]) for r in results)
        cost = (elapsed / 3600) * sum(n.hourly_cost for n in self.nodes)
        
        return {
            "total_frames": len(frames),
            "completed": completed,
            "failed": len(frames) - completed,
            "time_seconds": elapsed,
            "cost_usd": cost
        }
    
    def collect_frames(self, output_dir: str) -> str:
        final_dir = os.path.join(output_dir, "final")
        os.makedirs(final_dir, exist_ok=True)
        
        for node in self.nodes:
            node_dir = os.path.join(output_dir, f"node_{node.order_id}")
            if os.path.exists(node_dir):
                for f in os.listdir(node_dir):
                    src = os.path.join(node_dir, f)
                    dst = os.path.join(final_dir, f)
                    if not os.path.exists(dst):
                        os.rename(src, dst)
        
        return final_dir
    
    def make_video(self, frames_dir: str, output: str, fps: int = 24):
        import subprocess
        cmd = ["ffmpeg", "-y", "-framerate", str(fps),
               "-i", os.path.join(frames_dir, "frame_%04d.png"),
               "-c:v", "libx264", "-preset", "slow", "-crf", "18",
               "-pix_fmt", "yuv420p", output]
        subprocess.run(cmd, check=True)
    
    def cleanup(self):
        print("🧹 Releasing farm...")
        for node in self.nodes:
            try:
                self._api("POST", "/v1/cancel_order", json={"id": node.order_id})
            except Exception as e:
                print(f"   Failed to cancel order {node.order_id}: {e}")
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        self.cleanup()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--blend", required=True, help="Blender file path")
    parser.add_argument("--start", type=int, default=1)
    parser.add_argument("--end", type=int, required=True)
    parser.add_argument("--output", default="./render_output")
    parser.add_argument("--nodes", type=int, default=3)
    parser.add_argument("--max-price", type=float, default=0.50)
    parser.add_argument("--video", action="store_true", help="Create video from frames")
    parser.add_argument("--fps", type=int, default=24)
    args = parser.parse_args()
    
    with CloreRenderFarm(args.api_key) as farm:
        farm.setup_farm(args.nodes, args.max_price)
        
        print(f"\n🖥️  Farm ready: {len(farm.nodes)} nodes, "
              f"{sum(n.gpu_count for n in farm.nodes)} GPUs, "
              f"${sum(n.hourly_cost for n in farm.nodes):.2f}/hr")
        
        stats = farm.render(args.blend, args.start, args.end, args.output)
        
        print(f"\n{'='*60}")
        print(f"📊 RENDER COMPLETE")
        print(f"   Frames: {stats['completed']}/{stats['total_frames']}")
        print(f"   Time: {stats['time_seconds']:.1f}s ({stats['time_seconds']/60:.1f} min)")
        print(f"   Cost: ${stats['cost_usd']:.4f}")
        if stats['completed'] > 0:
            print(f"   Avg per frame: {stats['time_seconds']/stats['completed']:.2f}s")
        
        # Collect frames
        final_dir = farm.collect_frames(args.output)
        print(f"   Output: {final_dir}")
        
        # Create video if requested
        if args.video:
            video_path = os.path.join(args.output, "render.mp4")
            print(f"\n🎬 Creating video...")
            farm.make_video(final_dir, video_path, args.fps)
            print(f"   Video: {video_path}")


if __name__ == "__main__":
    main()
```
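The weighted split performed by `distribute_frames` is easiest to verify in isolation. Here is a minimal standalone sketch of the same logic as a pure function (the `split_frames` name is illustrative, not part of the client above):

```python
from typing import List

def split_frames(frames: List[int], gpu_counts: List[int]) -> List[List[int]]:
    """Split frames into one bucket per node, proportional to GPU count."""
    total = sum(gpu_counts)
    buckets: List[List[int]] = []
    idx = 0
    for gpus in gpu_counts:
        count = max(1, int(len(frames) * gpus / total))
        buckets.append(frames[idx:idx + count])
        idx += count
    if idx < len(frames):          # rounding remainder goes to the first node
        buckets[0].extend(frames[idx:])
    return buckets

# 250 frames across a 2-GPU node and two 1-GPU nodes
buckets = split_frames(list(range(1, 251)), [2, 1, 1])
print([len(b) for b in buckets])  # → [126, 62, 62]
```

Every frame lands in exactly one bucket, and the 2-GPU node receives roughly twice as many frames as each single-GPU node.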

## Cost Comparison

| Setup                      | 250 Frames (1080p) | Time    | Cost           |
| -------------------------- | ------------------ | ------- | -------------- |
| **Clore.ai (3x RTX 4090)** | 250 frames         | ~20 min | **$0.50**      |
| **Clore.ai (5x RTX 3090)** | 250 frames         | ~25 min | **$0.40**      |
| Single RTX 4090 (local)    | 250 frames         | ~60 min | ~$0.15 (power) |
| AWS p4d.24xlarge (8x A100) | 250 frames         | ~15 min | $8.00          |
| Render farm (commercial)   | 250 frames         | ~30 min | $10-50         |
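The on-demand rows above follow directly from fleet hourly rate × wall-clock time, the same formula the client uses for its `cost_usd` stat. A quick estimator (the per-node rates here are illustrative assumptions, not quoted prices):

```python
from typing import List

def render_cost(hourly_rates_usd: List[float], minutes: float) -> float:
    """Estimate render cost: sum of node hourly rates x wall-clock hours."""
    return sum(hourly_rates_usd) * minutes / 60

# e.g. three nodes at an assumed $0.50/hr each, rendering for 20 minutes
print(round(render_cost([0.50, 0.50, 0.50], 20), 2))  # → 0.5
```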

## Performance Tips

1. **Use OptiX** for RTX cards (faster than CUDA for ray tracing)
2. **Match GPU to scene complexity** — simple scenes don't need A100
3. **Distribute evenly** by GPU power, not node count
4. **Use spot instances** for batch rendering (50%+ savings)
5. **Pre-bake** lighting and physics before distributed render
6. **Use persistent data** to avoid re-uploading textures
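Tip 1 can be wired into the render command by swapping the Cycles device flag. A hedged sketch, using a hypothetical `cycles_device` helper that prefers OptiX on RTX-class GPUs and falls back to CUDA elsewhere:

```python
def cycles_device(gpu_name: str) -> str:
    """Pick OPTIX for RTX-class NVIDIA GPUs, CUDA otherwise."""
    return "OPTIX" if "RTX" in gpu_name.upper() else "CUDA"

def render_command(blend_name: str, frame: int, gpu_name: str) -> str:
    """Build a Blender CLI command with the device chosen per GPU model."""
    device = cycles_device(gpu_name)
    return (f"blender -b /tmp/blend/{blend_name} -E CYCLES "
            f"-o /tmp/output/frame_#### -F PNG -x 1 -f {frame} "
            f"-- --cycles-device {device}")

print(cycles_device("NVIDIA GeForce RTX 4090"))  # → OPTIX
print(cycles_device("Tesla V100"))               # → CUDA
```

Dropping this into `render_on_node` in place of the hard-coded `--cycles-device GPU` would let mixed fleets (as in the architecture diagram) use each card's fastest backend.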

## Next Steps

* [GPU ETL with RAPIDS](https://docs.clore.ai/dev/data-processing-and-pipelines/gpu-etl)
* [Batch Image Processing](https://docs.clore.ai/dev/data-processing-and-pipelines/image-processing)
* [Video Transcoding](https://docs.clore.ai/dev/data-processing-and-pipelines/video-transcoding)
