#!/usr/bin/env python3
"""
Distributed Blender Rendering Farm using Clore.ai GPUs.
Usage:
python render_farm.py --api-key YOUR_API_KEY --blend scene.blend \
--start 1 --end 250 --nodes 5 --output ./render_output/
"""
import os
import sys
import time
import argparse
import secrets
import requests
import paramiko
from scp import SCPClient
from typing import List, Dict, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
@dataclass
class RenderNode:
    """A rented server that has been provisioned as a render worker.

    Instances are created once an order reports itself as running and are
    later given the list of frames the node is responsible for.
    """

    # Marketplace id of the rented server.
    server_id: int
    # Id of the rental order; used to cancel the rental.
    order_id: int
    # SSH endpoint and credentials used to reach the node.
    ssh_host: str
    ssh_port: int
    ssh_password: str
    # Name of the first GPU reported for the server.
    gpu_model: str
    # Number of GPUs on the server; used to weight frame distribution.
    gpu_count: int
    # Price per hour the node was selected at; used for cost estimates.
    hourly_cost: float
    # Frames assigned to this node. ``None`` is accepted for backward
    # compatibility and normalised to an empty list in ``__post_init__``.
    frames: Optional[List[int]] = None

    def __post_init__(self):
        # A shared mutable default is not allowed on dataclasses, and leaving
        # ``None`` here makes ``len(node.frames)`` / iteration crash before
        # frames are distributed, so give every instance its own empty list.
        if self.frames is None:
            self.frames = []
class CloreRenderFarm:
    """Complete distributed rendering solution using Clore.ai GPUs.

    Typical use is as a context manager: rent nodes with ``setup_farm``,
    render with ``render``, gather results with ``collect_frames`` and
    optionally encode them with ``make_video``. Leaving the ``with`` block
    cancels every order tracked in ``self.nodes``.
    """

    BASE_URL = "https://api.clore.ai"
    BLENDER_IMAGE = "linuxserver/blender:latest"
    # Substrings identifying GPU models accepted by ``find_gpus``.
    RENDER_GPUS = ("RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", "A100", "A6000")
    # Seconds to wait for a single HTTP request unless the caller overrides it.
    API_TIMEOUT = 30
    # Number of attempts made when the API answers with the retryable code.
    API_ATTEMPTS = 3

    def __init__(self, api_key: str):
        """Store the API key and start with an empty farm."""
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.nodes: List["RenderNode"] = []

    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Call the Clore.ai API and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            endpoint: Path appended to ``BASE_URL``.
            **kwargs: Passed through to ``requests.request``. A ``timeout``
                of ``API_TIMEOUT`` seconds is applied unless one is given.

        Raises:
            Exception: If the response is not JSON, carries a non-zero
                ``code``, or is still answering ``code == 5`` after all
                attempts.
        """
        url = f"{self.BASE_URL}{endpoint}"
        # Without a timeout a stalled connection would hang a worker forever.
        kwargs.setdefault("timeout", self.API_TIMEOUT)
        for attempt in range(self.API_ATTEMPTS):
            response = requests.request(method, url, headers=self.headers, **kwargs)
            try:
                data = response.json()
            except ValueError as exc:
                raise Exception(
                    f"API Error: non-JSON response (HTTP {response.status_code})"
                ) from exc
            code = data.get("code")
            if code == 5:
                # Code 5 is treated as retryable (presumably rate limiting -
                # confirm against the API docs). Back off exponentially, but
                # do not sleep after the final attempt.
                if attempt < self.API_ATTEMPTS - 1:
                    time.sleep(2 ** attempt)
                continue
            if code != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries")

    def _cancel_order(self, order_id: int) -> bool:
        """Best-effort cancellation of an order; returns True on success."""
        try:
            self._api("POST", "/v1/cancel_order", json={"id": order_id})
        except Exception as exc:
            # Cancellation must never abort cleanup of the remaining nodes,
            # but the failure is reported so a leaked rental is visible.
            print(f" ⚠️ Could not cancel order {order_id}: {exc}")
            return False
        return True

    @staticmethod
    def _parse_ssh_command(conn: str) -> tuple:
        """Extract ``(host, port)`` from an ``ssh user@host -p PORT`` string.

        The ``user@host`` token and the ``-p`` option may appear in any
        order; the port defaults to 22 when no ``-p`` option is present.

        Raises:
            Exception: If no ``user@host`` token is present or ``-p`` has no
                value.
            ValueError: If the port is not an integer.
        """
        tokens = conn.split()
        target = next((tok for tok in tokens if "@" in tok), None)
        if target is None:
            raise Exception(f"Unrecognised SSH connection string: {conn!r}")
        host = target.rsplit("@", 1)[1]
        port = 22
        for position, tok in enumerate(tokens):
            if tok == "-p":
                if position + 1 >= len(tokens):
                    raise Exception(f"Unrecognised SSH connection string: {conn!r}")
                port = int(tokens[position + 1])
                break
            if tok.startswith("-p") and tok[2:].isdigit():
                # Compact form, e.g. "-p2222".
                port = int(tok[2:])
                break
        return host, port

    @staticmethod
    def _run_remote(ssh, command: str, timeout: Optional[float] = None) -> int:
        """Run ``command`` over ``ssh`` and return its exit status.

        Standard output is drained before waiting for the exit status;
        waiting first can block forever once the remote process has written
        more than the channel buffers hold (Blender logs are verbose).
        """
        _stdin, stdout, _stderr = ssh.exec_command(command, timeout=timeout)
        stdout.read()
        return stdout.channel.recv_exit_status()

    @staticmethod
    def _first_frame_number(frames_dir: str) -> Optional[int]:
        """Return the lowest N among ``frame_N.png`` files, or None if none."""
        import re

        pattern = re.compile(r"frame_(\d+)\.png")
        numbers = [
            int(match.group(1))
            for match in map(pattern.fullmatch, os.listdir(frames_dir))
            if match
        ]
        return min(numbers) if numbers else None

    def find_gpus(self, count: int, max_price: float) -> List[Dict]:
        """Return up to ``count`` unrented servers suitable for rendering.

        A server qualifies when at least one entry of its ``gpu_array``
        contains one of ``RENDER_GPUS`` and its USD spot price is set and
        not above ``max_price``. Results are ordered by price per GPU, then
        by descending reliability.

        Returns:
            Dicts with keys ``id``, ``gpus``, ``gpu_count``, ``price`` and
            ``reliability``.
        """
        servers = self._api("GET", "/v1/marketplace")["servers"]
        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            gpus = server.get("gpu_array", [])
            if not any(model in gpu for gpu in gpus for model in self.RENDER_GPUS):
                continue
            price = server.get("price", {}).get("usd", {}).get("spot")
            if price and price <= max_price:
                candidates.append({
                    "id": server["id"],
                    "gpus": gpus,
                    # Non-zero here: the model filter above needs >= 1 GPU.
                    "gpu_count": len(gpus),
                    "price": price,
                    "reliability": server.get("reliability", 0),
                })
        candidates.sort(key=lambda c: (c["price"] / c["gpu_count"], -c["reliability"]))
        # A negative count would otherwise slice "all but the last N".
        return candidates[:max(count, 0)]

    def provision_node(self, gpu: Dict) -> "RenderNode":
        """Rent one server and wait until it is reachable over SSH.

        Creates a spot order bidding 15% above the listed price, then polls
        the order list every 2 seconds (up to 120 times) until the order is
        ``running``.

        Args:
            gpu: A candidate dict as returned by ``find_gpus``.

        Raises:
            Exception: On API errors or if the order is not running in time.
                In both cases the freshly created order is cancelled first,
                because it is not yet tracked in ``self.nodes`` and would
                otherwise keep billing.
        """
        password = secrets.token_urlsafe(16)
        order_data = {
            "renting_server": gpu["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": self.BLENDER_IMAGE,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": password,
            "spotprice": gpu["price"] * 1.15,
        }
        result = self._api("POST", "/v1/create_order", json=order_data)
        order_id = result["order_id"]
        try:
            for _ in range(120):
                orders = self._api("GET", "/v1/my_orders")["orders"]
                order = next((o for o in orders if o["order_id"] == order_id), None)
                if order and order.get("status") == "running":
                    host, port = self._parse_ssh_command(order["connection"]["ssh"])
                    return RenderNode(
                        server_id=gpu["id"], order_id=order_id,
                        ssh_host=host, ssh_port=port, ssh_password=password,
                        gpu_model=gpu["gpus"][0], gpu_count=gpu["gpu_count"],
                        hourly_cost=gpu["price"],
                    )
                time.sleep(2)
            raise Exception("Timeout")
        except Exception:
            self._cancel_order(order_id)
            raise

    def setup_farm(self, node_count: int, max_price: float) -> List["RenderNode"]:
        """Find and provision up to ``node_count`` nodes in parallel.

        Nodes that fail to provision are reported and skipped.

        Returns:
            ``self.nodes``, extended with every node that came up.

        Raises:
            Exception: If no suitable GPU is offered or no node at all could
                be provisioned.
        """
        print(f"🔍 Finding {node_count} render GPUs...")
        gpus = self.find_gpus(node_count, max_price)
        if not gpus:
            raise Exception(f"No GPUs found under ${max_price}/hr")
        print(f"🚀 Provisioning {len(gpus)} nodes...")
        with ThreadPoolExecutor(max_workers=len(gpus)) as executor:
            futures = [executor.submit(self.provision_node, g) for g in gpus]
            for future in as_completed(futures):
                try:
                    node = future.result()
                except Exception as e:
                    print(f" ❌ Failed: {e}")
                else:
                    self.nodes.append(node)
                    print(f" ✅ {node.gpu_model} x{node.gpu_count} @ ${node.hourly_cost:.2f}/hr")
        if not self.nodes:
            # An empty farm cannot render anything; fail here with a clear
            # message instead of a ZeroDivisionError further down the line.
            raise Exception("All node provisioning attempts failed")
        return self.nodes

    def distribute_frames(self, frames: List[int]):
        """Split ``frames`` across ``self.nodes`` proportionally to GPU count.

        Every frame is assigned to exactly one node, in order. Shares are
        computed with integer arithmetic; frames left over after rounding
        down go one each to the nodes with the largest fractional share
        (ties resolved by node order) rather than all to the first node.

        Raises:
            Exception: If the farm has no nodes.
        """
        if not self.nodes:
            raise Exception("No render nodes available to distribute frames to")
        weights = [max(node.gpu_count, 0) for node in self.nodes]
        total_weight = sum(weights)
        if total_weight == 0:
            # No GPU counts to go by: fall back to an even split.
            weights = [1] * len(self.nodes)
            total_weight = len(self.nodes)
        frame_total = len(frames)
        counts = [frame_total * w // total_weight for w in weights]
        leftover = frame_total - sum(counts)
        by_remainder = sorted(
            range(len(weights)),
            key=lambda i: (-(frame_total * weights[i] % total_weight), i),
        )
        for i in by_remainder[:leftover]:
            counts[i] += 1
        cursor = 0
        for node, count in zip(self.nodes, counts):
            # Slicing copies, so nodes never alias the caller's list.
            node.frames = list(frames[cursor:cursor + count])
            cursor += count

    def render_on_node(self, node: "RenderNode", blend_file: str, output_dir: str) -> Dict:
        """Upload the scene to ``node``, render its frames and fetch them.

        Frames are rendered one at a time with Blender in background mode
        and downloaded into ``<output_dir>/node_<order_id>``.

        Returns:
            ``{"node": order_id, "completed": [...], "failed": [...]}`` where
            ``completed`` lists frames that were rendered *and* downloaded.

        Raises:
            Exception: Connection, setup or upload failures propagate; the
                SSH session is always closed.
        """
        import shlex

        blend_name = os.path.basename(blend_file)
        # Quote the remote path so file names with spaces or shell
        # metacharacters cannot break (or inject into) the command line.
        remote_blend = shlex.quote(f"/tmp/blend/{blend_name}")
        rendered = []
        failed = []
        ssh = paramiko.SSHClient()
        # Freshly rented hosts have no known host key, so unknown keys are
        # accepted; this gives no protection against a man-in-the-middle.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        scp = None
        try:
            ssh.connect(node.ssh_host, port=node.ssh_port,
                        username="root", password=node.ssh_password, timeout=30)
            scp = SCPClient(ssh.get_transport())
            # Setup: wait for mkdir to finish instead of sleeping and hoping.
            self._run_remote(ssh, "mkdir -p /tmp/blend /tmp/output")
            # Upload blend file
            print(f" [{node.order_id}] Uploading {blend_name}...")
            scp.put(blend_file, "/tmp/blend/")
            for frame in node.frames or []:
                print(f" [{node.order_id}] Rendering frame {frame}...")
                cmd = (f"blender -b {remote_blend} -E CYCLES "
                       f"-o /tmp/output/frame_#### -F PNG -x 1 -f {frame} "
                       f"-- --cycles-device GPU 2>&1")
                try:
                    self._run_remote(ssh, cmd, timeout=3600)
                    # Success means a non-empty output file exists.
                    status = self._run_remote(
                        ssh, f"test -s /tmp/output/frame_{frame:04d}.png")
                except (OSError, EOFError, paramiko.SSHException) as exc:
                    # One stalled or broken frame must not discard the
                    # results already gathered on this node.
                    print(f" [{node.order_id}] Frame {frame} failed: {exc}")
                    failed.append(frame)
                    continue
                if status == 0:
                    rendered.append(frame)
                else:
                    failed.append(frame)
            # Download frames
            node_dir = os.path.join(output_dir, f"node_{node.order_id}")
            os.makedirs(node_dir, exist_ok=True)
            completed = []
            for frame in rendered:
                try:
                    scp.get(f"/tmp/output/frame_{frame:04d}.png", node_dir)
                except Exception as exc:
                    # A frame that never reached local disk is not done.
                    print(f" [{node.order_id}] Download of frame {frame} failed: {exc}")
                    failed.append(frame)
                else:
                    completed.append(frame)
            return {"node": node.order_id, "completed": completed, "failed": failed}
        finally:
            if scp is not None:
                scp.close()
            ssh.close()

    def render(self, blend_file: str, start: int, end: int, output_dir: str) -> Dict:
        """Render frames ``start``..``end`` (inclusive) across the farm.

        Returns:
            Dict with ``total_frames``, ``completed``, ``failed``,
            ``time_seconds`` and ``cost_usd`` (elapsed hours times the summed
            hourly cost of all nodes).

        Raises:
            Exception: If the farm has no nodes.
        """
        if not self.nodes:
            raise Exception("No render nodes available; run setup_farm() first")
        frames = list(range(start, end + 1))
        self.distribute_frames(frames)
        print("\n📦 Distribution:")
        for node in self.nodes:
            print(f" Node {node.order_id}: {len(node.frames)} frames")
        os.makedirs(output_dir, exist_ok=True)
        print(f"\n🎬 Rendering {len(frames)} frames...")
        start_time = time.time()
        results = []
        with ThreadPoolExecutor(max_workers=len(self.nodes)) as executor:
            futures = {
                executor.submit(self.render_on_node, node, blend_file, output_dir): node
                for node in self.nodes
            }
            for future in as_completed(futures):
                try:
                    result = future.result()
                except Exception as e:
                    print(f" Error: {e}")
                else:
                    results.append(result)
                    print(f" Node {result['node']}: {len(result['completed'])} done, {len(result['failed'])} failed")
        elapsed = time.time() - start_time
        completed = sum(len(r["completed"]) for r in results)
        cost = (elapsed / 3600) * sum(n.hourly_cost for n in self.nodes)
        return {
            "total_frames": len(frames),
            "completed": completed,
            # Frames on nodes that errored out entirely count as failed too.
            "failed": len(frames) - completed,
            "time_seconds": elapsed,
            "cost_usd": cost,
        }

    def collect_frames(self, output_dir: str) -> str:
        """Move every node's downloaded files into ``<output_dir>/final``.

        Files whose name already exists in the destination are left where
        they are.

        Returns:
            Path of the ``final`` directory.
        """
        final_dir = os.path.join(output_dir, "final")
        os.makedirs(final_dir, exist_ok=True)
        for node in self.nodes:
            node_dir = os.path.join(output_dir, f"node_{node.order_id}")
            if not os.path.exists(node_dir):
                continue
            for name in os.listdir(node_dir):
                src = os.path.join(node_dir, name)
                dst = os.path.join(final_dir, name)
                if not os.path.exists(dst):
                    os.rename(src, dst)
        return final_dir

    def make_video(self, frames_dir: str, output: str, fps: int = 24):
        """Encode ``frame_NNNN.png`` files in ``frames_dir`` to H.264.

        The first frame number is detected from the directory and passed to
        ffmpeg, whose image-sequence input otherwise only looks for a
        sequence starting at a very low index and so fails for renders that
        begin at a later frame.

        Raises:
            subprocess.CalledProcessError: If ffmpeg exits with an error.
        """
        import subprocess

        cmd = ["ffmpeg", "-y", "-framerate", str(fps)]
        first_frame = self._first_frame_number(frames_dir)
        if first_frame is not None:
            # Input option: must come before "-i".
            cmd += ["-start_number", str(first_frame)]
        # NOTE(review): ffmpeg reads a contiguous numbered sequence, so gaps
        # left by failed frames will presumably truncate the video - confirm.
        cmd += ["-i", os.path.join(frames_dir, "frame_%04d.png"),
                "-c:v", "libx264", "-preset", "slow", "-crf", "18",
                "-pix_fmt", "yuv420p", output]
        subprocess.run(cmd, check=True)

    def cleanup(self):
        """Cancel the order of every node in the farm (best effort)."""
        print("🧹 Releasing farm...")
        for node in self.nodes:
            self._cancel_order(node.order_id)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Returns None, so exceptions raised inside the block still propagate.
        self.cleanup()
def main(argv: Optional[List[str]] = None):
    """Command-line entry point: rent a farm, render, collect, encode.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.

    Exits with status 2 on invalid arguments (before any GPU is rented) and
    with status 1 if no render node could be provisioned.
    """
    parser = argparse.ArgumentParser(
        description="Distributed Blender rendering on Clore.ai GPUs")
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--blend", required=True, help="Blender file path")
    parser.add_argument("--start", type=int, default=1)
    parser.add_argument("--end", type=int, required=True)
    parser.add_argument("--output", default="./render_output")
    parser.add_argument("--nodes", type=int, default=3)
    parser.add_argument("--max-price", type=float, default=0.50)
    parser.add_argument("--video", action="store_true", help="Create video from frames")
    parser.add_argument("--fps", type=int, default=24)
    args = parser.parse_args(argv)

    # Validate everything that can be checked locally first: once the farm
    # is provisioned, every mistake costs rental time.
    if args.end < args.start:
        parser.error(f"--end ({args.end}) must not be smaller than --start ({args.start})")
    if args.nodes < 1:
        parser.error("--nodes must be at least 1")
    if not os.path.isfile(args.blend):
        parser.error(f"blend file not found: {args.blend}")

    with CloreRenderFarm(args.api_key) as farm:
        farm.setup_farm(args.nodes, args.max_price)
        if not farm.nodes:
            # Rendering with an empty farm would crash while splitting frames.
            print(" ❌ No render nodes could be provisioned")
            sys.exit(1)
        print(f"\n🖥️ Farm ready: {len(farm.nodes)} nodes, "
              f"{sum(n.gpu_count for n in farm.nodes)} GPUs, "
              f"${sum(n.hourly_cost for n in farm.nodes):.2f}/hr")
        stats = farm.render(args.blend, args.start, args.end, args.output)
        print(f"\n{'='*60}")
        print("📊 RENDER COMPLETE")
        print(f" Frames: {stats['completed']}/{stats['total_frames']}")
        print(f" Time: {stats['time_seconds']:.1f}s ({stats['time_seconds']/60:.1f} min)")
        print(f" Cost: ${stats['cost_usd']:.4f}")
        if stats['completed'] > 0:
            print(f" Avg per frame: {stats['time_seconds']/stats['completed']:.2f}s")
        # Collect frames
        final_dir = farm.collect_frames(args.output)
        print(f" Output: {final_dir}")
        # Create video if requested
        if args.video:
            if stats['completed'] > 0:
                video_path = os.path.join(args.output, "render.mp4")
                print("\n🎬 Creating video...")
                farm.make_video(final_dir, video_path, args.fps)
                print(f" Video: {video_path}")
            else:
                # ffmpeg would only fail on an empty frame directory.
                print("\n⚠️ No frames were rendered; skipping video")
# Run the command-line interface only when executed as a script, so the
# module can be imported without renting anything.
if __name__ == "__main__":
    main()