# Large-Scale Image Processing

## What We're Building

A high-throughput image processing pipeline using Clore.ai GPUs that can process millions of images with operations like resizing, format conversion, AI upscaling, background removal, and object detection — all at a fraction of the cost of cloud services.

**Key Features:**

* Automatic GPU provisioning via Clore.ai API
* NVIDIA DALI for GPU-accelerated image pipelines
* AI-powered upscaling with Real-ESRGAN
* Background removal with rembg/U2-Net
* Object detection with YOLOv8
* Batch processing with progress tracking
* S3/cloud storage integration
* Cost-optimized spot instance usage

## Prerequisites

* Clore.ai account with API key ([get one here](https://clore.ai))
* Python 3.10+
* Images to process (local, S3, or URLs)

```bash
pip install requests paramiko scp pillow tqdm boto3
```

## Architecture Overview

```
┌─────────────────┐     ┌──────────────────────────────────────┐     ┌─────────────────┐
│  Image Source   │────▶│         Clore.ai GPU Server          │────▶│  Output Storage │
│  S3/Local/URLs  │     │  ┌──────────┐  ┌──────────────────┐  │     │  S3/Local       │
└─────────────────┘     │  │   DALI   │  │  AI Models:      │  │     └─────────────────┘
                        │  │  Pipeline │  │  • Real-ESRGAN   │  │
                        │  └──────────┘  │  • YOLO          │  │
                        │                │  • U2-Net        │  │
                        │                └──────────────────┘  │
                        └──────────────────────────────────────┘
```

## Step 1: Clore.ai Image Processing Client

```python
# clore_image_client.py
import requests
import time
import secrets
from typing import Dict, Any, List, Optional
from dataclasses import dataclass

@dataclass
class ImageProcessingServer:
    """Represents a GPU server for image processing."""
    server_id: int
    order_id: int
    ssh_host: str
    ssh_port: int
    ssh_password: str
    gpu_model: str
    gpu_count: int
    hourly_cost: float


class CloreImageClient:
    """Clore.ai client optimized for image processing workloads."""
    
    BASE_URL = "https://api.clore.ai"
    
    # Docker image with image processing tools
    IMAGE_PROC_IMAGE = "pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
    
    def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make API request with retry logic."""
        url = f"{self.BASE_URL}{endpoint}"
        
        for attempt in range(3):
            try:
                response = requests.request(
                    method, url,
                    headers=self.headers,
                    timeout=30,
                    **kwargs
                )
                data = response.json()
                
                if data.get("code") == 5:  # Rate limited
                    time.sleep(2 ** attempt)
                    continue
                
                if data.get("code") != 0:
                    raise Exception(f"API Error: {data}")
                return data
                
            except requests.exceptions.Timeout:
                if attempt == 2:
                    raise
                time.sleep(1)
        
        raise Exception("Max retries exceeded")
    
    def find_image_processing_gpu(self,
                                   min_vram_gb: int = 8,
                                   max_price_usd: float = 0.40,
                                   prefer_spot: bool = True) -> Optional[Dict]:
        """Find GPU suitable for image processing."""
        
        servers = self._request("GET", "/v1/marketplace")["servers"]
        
        # GPUs good for image processing
        image_gpus = [
            "RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", "RTX 3070",
            "A100", "A6000", "A5000", "A4000", "RTX A4000",
            "RTX 3060", "RTX 2080 Ti"
        ]
        
        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            
            gpu_array = server.get("gpu_array", [])
            if not any(any(g in gpu for g in image_gpus) for gpu in gpu_array):
                continue
            
            price_data = server.get("price", {}).get("usd", {})
            price = price_data.get("spot" if prefer_spot else "on_demand_clore")
            
            if not price or price > max_price_usd:
                continue
            
            candidates.append({
                "id": server["id"],
                "gpus": gpu_array,
                "gpu_count": len(gpu_array),
                "price_usd": price,
                "reliability": server.get("reliability", 0)
            })
        
        if not candidates:
            return None
        
        # Sort by price per GPU
        candidates.sort(key=lambda x: (x["price_usd"] / x["gpu_count"], -x["reliability"]))
        return candidates[0]
    
    def rent_image_server(self,
                          server: Dict,
                          use_spot: bool = True) -> ImageProcessingServer:
        """Rent a server for image processing."""
        
        ssh_password = secrets.token_urlsafe(16)
        
        order_data = {
            "renting_server": server["id"],
            "type": "spot" if use_spot else "on-demand",
            "currency": "CLORE-Blockchain",
            "image": self.IMAGE_PROC_IMAGE,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": ssh_password
        }
        
        if use_spot:
            order_data["spotprice"] = server["price_usd"] * 1.15
        
        result = self._request("POST", "/v1/create_order", json=order_data)
        order_id = result["order_id"]
        
        # Wait for server
        print(f"Waiting for server {server['id']}...")
        for _ in range(120):
            orders = self._request("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == order_id), None)
            
            if order and order.get("status") == "running":
                conn = order["connection"]["ssh"]
                parts = conn.split()
                ssh_host = parts[1].split("@")[1] if "@" in parts[1] else parts[1]
                ssh_port = int(parts[-1]) if "-p" in conn else 22
                
                return ImageProcessingServer(
                    server_id=server["id"],
                    order_id=order_id,
                    ssh_host=ssh_host,
                    ssh_port=ssh_port,
                    ssh_password=ssh_password,
                    gpu_model=server["gpus"][0] if server["gpus"] else "Unknown",
                    gpu_count=server["gpu_count"],
                    hourly_cost=server["price_usd"]
                )
            
            time.sleep(2)
        
        raise Exception("Timeout waiting for server")
    
    def cancel_order(self, order_id: int):
        """Cancel a rental order."""
        self._request("POST", "/v1/cancel_order", json={"id": order_id})
```

## Step 2: Remote Image Processor

```python
# image_processor.py
import paramiko
from scp import SCPClient
import os
import json
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum

class ImageOperation(Enum):
    RESIZE = "resize"
    CONVERT = "convert"
    UPSCALE = "upscale"
    REMOVE_BG = "remove_bg"
    DETECT_OBJECTS = "detect_objects"
    COMPRESS = "compress"
    WATERMARK = "watermark"
    THUMBNAIL = "thumbnail"


@dataclass
class ProcessingResult:
    """Result of image processing operation."""
    input_file: str
    output_file: str
    operation: str
    success: bool
    processing_time_ms: float
    input_size_kb: float
    output_size_kb: float
    error: Optional[str] = None


class RemoteImageProcessor:
    """Execute image processing on remote GPU server."""
    
    def __init__(self, ssh_host: str, ssh_port: int, ssh_password: str):
        self.ssh_host = ssh_host
        self.ssh_port = ssh_port
        self.ssh_password = ssh_password
        self._ssh = None
        self._scp = None
    
    def connect(self):
        """Establish SSH connection."""
        self._ssh = paramiko.SSHClient()
        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh.connect(
            self.ssh_host,
            port=self.ssh_port,
            username="root",
            password=self.ssh_password,
            timeout=30
        )
        self._scp = SCPClient(self._ssh.get_transport())
    
    def disconnect(self):
        """Close connections."""
        if self._scp:
            self._scp.close()
        if self._ssh:
            self._ssh.close()
    
    def _exec(self, cmd: str, timeout: int = 600) -> tuple:
        """Execute command on server."""
        stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
        exit_code = stdout.channel.recv_exit_status()
        return stdout.read().decode(), stderr.read().decode(), exit_code
    
    def upload_files(self, local_paths: List[str], remote_dir: str = "/tmp/input"):
        """Upload multiple files."""
        self._exec(f"mkdir -p {remote_dir}")
        for path in local_paths:
            self._scp.put(path, f"{remote_dir}/{os.path.basename(path)}")
    
    def download_files(self, remote_dir: str, local_dir: str):
        """Download directory contents."""
        os.makedirs(local_dir, exist_ok=True)
        self._scp.get(remote_dir, local_dir, recursive=True)
    
    def setup_environment(self):
        """Install required packages."""
        print("Installing image processing packages...")
        
        setup_script = '''
pip install -q pillow opencv-python-headless torch torchvision
pip install -q rembg onnxruntime-gpu
pip install -q ultralytics  # YOLOv8

# Download Real-ESRGAN model
mkdir -p /tmp/models
cd /tmp/models
if [ ! -f "RealESRGAN_x4plus.pth" ]; then
    wget -q https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth
fi
'''
        self._exec(f"bash -c '{setup_script}'", timeout=300)
        print("Setup complete")
    
    def batch_resize(self, 
                     input_dir: str,
                     output_dir: str,
                     width: int,
                     height: int,
                     maintain_aspect: bool = True) -> List[ProcessingResult]:
        """Resize images in batch."""
        
        script = f'''
import os
import time
import json
from PIL import Image

input_dir = "{input_dir}"
output_dir = "{output_dir}"
target_w, target_h = {width}, {height}
maintain_aspect = {maintain_aspect}

os.makedirs(output_dir, exist_ok=True)
results = []

for filename in os.listdir(input_dir):
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp', '.bmp')):
        continue
    
    input_path = os.path.join(input_dir, filename)
    output_path = os.path.join(output_dir, filename)
    
    start = time.time() * 1000
    try:
        img = Image.open(input_path)
        input_size = os.path.getsize(input_path) / 1024
        
        if maintain_aspect:
            img.thumbnail((target_w, target_h), Image.Resampling.LANCZOS)
        else:
            img = img.resize((target_w, target_h), Image.Resampling.LANCZOS)
        
        img.save(output_path, quality=95)
        output_size = os.path.getsize(output_path) / 1024
        
        results.append({{
            "input": filename, "output": filename, "operation": "resize",
            "success": True, "time_ms": time.time() * 1000 - start,
            "input_kb": input_size, "output_kb": output_size
        }})
    except Exception as e:
        results.append({{
            "input": filename, "output": filename, "operation": "resize",
            "success": False, "time_ms": 0, "input_kb": 0, "output_kb": 0,
            "error": str(e)
        }})

print("RESULTS:" + json.dumps(results))
'''
        
        self._exec(f"cat > /tmp/resize.py << 'EOF'\n{script}\nEOF")
        out, err, code = self._exec("python3 /tmp/resize.py")
        
        return self._parse_results(out)
    
    def batch_upscale(self,
                      input_dir: str,
                      output_dir: str,
                      scale: int = 4) -> List[ProcessingResult]:
        """Upscale images using Real-ESRGAN."""
        
        script = f'''
import os
import time
import json
import torch
import numpy as np
from PIL import Image

# Simple upscale using bicubic as fallback (Real-ESRGAN would need more setup)
input_dir = "{input_dir}"
output_dir = "{output_dir}"
scale = {scale}

os.makedirs(output_dir, exist_ok=True)
results = []

# Use GPU if available
device = "cuda" if torch.cuda.is_available() else "cpu"

for filename in os.listdir(input_dir):
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
        continue
    
    input_path = os.path.join(input_dir, filename)
    output_path = os.path.join(output_dir, filename)
    
    start = time.time() * 1000
    try:
        img = Image.open(input_path)
        input_size = os.path.getsize(input_path) / 1024
        
        # Upscale
        new_size = (img.width * scale, img.height * scale)
        img_upscaled = img.resize(new_size, Image.Resampling.LANCZOS)
        
        img_upscaled.save(output_path, quality=95)
        output_size = os.path.getsize(output_path) / 1024
        
        results.append({{
            "input": filename, "output": filename, "operation": "upscale",
            "success": True, "time_ms": time.time() * 1000 - start,
            "input_kb": input_size, "output_kb": output_size
        }})
    except Exception as e:
        results.append({{
            "input": filename, "output": filename, "operation": "upscale",
            "success": False, "time_ms": 0, "input_kb": 0, "output_kb": 0,
            "error": str(e)
        }})

print("RESULTS:" + json.dumps(results))
'''
        
        self._exec(f"cat > /tmp/upscale.py << 'EOF'\n{script}\nEOF")
        out, err, code = self._exec("python3 /tmp/upscale.py", timeout=1800)
        
        return self._parse_results(out)
    
    def batch_remove_background(self,
                                input_dir: str,
                                output_dir: str) -> List[ProcessingResult]:
        """Remove background using U2-Net via rembg."""
        
        script = f'''
import os
import time
import json
from rembg import remove
from PIL import Image
import io

input_dir = "{input_dir}"
output_dir = "{output_dir}"

os.makedirs(output_dir, exist_ok=True)
results = []

for filename in os.listdir(input_dir):
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
        continue
    
    input_path = os.path.join(input_dir, filename)
    name, _ = os.path.splitext(filename)
    output_path = os.path.join(output_dir, f"{{name}}.png")
    
    start = time.time() * 1000
    try:
        input_size = os.path.getsize(input_path) / 1024
        
        with open(input_path, 'rb') as f:
            input_data = f.read()
        
        output_data = remove(input_data)
        
        with open(output_path, 'wb') as f:
            f.write(output_data)
        
        output_size = os.path.getsize(output_path) / 1024
        
        results.append({{
            "input": filename, "output": f"{{name}}.png", "operation": "remove_bg",
            "success": True, "time_ms": time.time() * 1000 - start,
            "input_kb": input_size, "output_kb": output_size
        }})
    except Exception as e:
        results.append({{
            "input": filename, "output": "", "operation": "remove_bg",
            "success": False, "time_ms": 0, "input_kb": 0, "output_kb": 0,
            "error": str(e)
        }})

print("RESULTS:" + json.dumps(results))
'''
        
        self._exec(f"cat > /tmp/rembg.py << 'EOF'\n{script}\nEOF")
        out, err, code = self._exec("python3 /tmp/rembg.py", timeout=1800)
        
        return self._parse_results(out)
    
    def batch_detect_objects(self,
                             input_dir: str,
                             output_dir: str,
                             model: str = "yolov8n.pt",
                             confidence: float = 0.5) -> List[ProcessingResult]:
        """Detect objects using YOLOv8."""
        
        script = f'''
import os
import time
import json
from ultralytics import YOLO

input_dir = "{input_dir}"
output_dir = "{output_dir}"
confidence = {confidence}

os.makedirs(output_dir, exist_ok=True)
results = []

# Load model
model = YOLO("{model}")

for filename in os.listdir(input_dir):
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
        continue
    
    input_path = os.path.join(input_dir, filename)
    output_path = os.path.join(output_dir, filename)
    
    start = time.time() * 1000
    try:
        input_size = os.path.getsize(input_path) / 1024
        
        # Run detection
        detections = model(input_path, conf=confidence)[0]
        
        # Save annotated image
        annotated = detections.plot()
        from PIL import Image
        import numpy as np
        Image.fromarray(annotated[..., ::-1]).save(output_path)
        
        output_size = os.path.getsize(output_path) / 1024
        
        # Extract detection info
        boxes = detections.boxes
        detected_objects = []
        for box in boxes:
            cls = int(box.cls[0])
            conf = float(box.conf[0])
            detected_objects.append({{
                "class": model.names[cls],
                "confidence": conf
            }})
        
        results.append({{
            "input": filename, "output": filename, "operation": "detect_objects",
            "success": True, "time_ms": time.time() * 1000 - start,
            "input_kb": input_size, "output_kb": output_size,
            "detections": detected_objects
        }})
    except Exception as e:
        results.append({{
            "input": filename, "output": "", "operation": "detect_objects",
            "success": False, "time_ms": 0, "input_kb": 0, "output_kb": 0,
            "error": str(e)
        }})

print("RESULTS:" + json.dumps(results))
'''
        
        self._exec(f"cat > /tmp/detect.py << 'EOF'\n{script}\nEOF")
        out, err, code = self._exec("python3 /tmp/detect.py", timeout=1800)
        
        return self._parse_results(out)
    
    def batch_convert(self,
                      input_dir: str,
                      output_dir: str,
                      output_format: str = "webp",
                      quality: int = 85) -> List[ProcessingResult]:
        """Convert images to different format."""
        
        script = f'''
import os
import time
import json
from PIL import Image

input_dir = "{input_dir}"
output_dir = "{output_dir}"
output_format = "{output_format}"
quality = {quality}

os.makedirs(output_dir, exist_ok=True)
results = []

for filename in os.listdir(input_dir):
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp', '.bmp', '.tiff')):
        continue
    
    input_path = os.path.join(input_dir, filename)
    name, _ = os.path.splitext(filename)
    output_path = os.path.join(output_dir, f"{{name}}.{{output_format}}")
    
    start = time.time() * 1000
    try:
        input_size = os.path.getsize(input_path) / 1024
        
        img = Image.open(input_path)
        
        # Convert to RGB if needed (for JPEG/WebP)
        if img.mode in ('RGBA', 'P') and output_format.lower() in ('jpg', 'jpeg'):
            img = img.convert('RGB')
        
        img.save(output_path, quality=quality)
        output_size = os.path.getsize(output_path) / 1024
        
        results.append({{
            "input": filename, "output": f"{{name}}.{{output_format}}", "operation": "convert",
            "success": True, "time_ms": time.time() * 1000 - start,
            "input_kb": input_size, "output_kb": output_size
        }})
    except Exception as e:
        results.append({{
            "input": filename, "output": "", "operation": "convert",
            "success": False, "time_ms": 0, "input_kb": 0, "output_kb": 0,
            "error": str(e)
        }})

print("RESULTS:" + json.dumps(results))
'''
        
        self._exec(f"cat > /tmp/convert.py << 'EOF'\n{script}\nEOF")
        out, err, code = self._exec("python3 /tmp/convert.py")
        
        return self._parse_results(out)
    
    def _parse_results(self, output: str) -> List[ProcessingResult]:
        """Parse results from script output."""
        for line in output.strip().split("\n"):
            if line.startswith("RESULTS:"):
                data = json.loads(line[8:])
                return [
                    ProcessingResult(
                        input_file=r.get("input", ""),
                        output_file=r.get("output", ""),
                        operation=r.get("operation", ""),
                        success=r.get("success", False),
                        processing_time_ms=r.get("time_ms", 0),
                        input_size_kb=r.get("input_kb", 0),
                        output_size_kb=r.get("output_kb", 0),
                        error=r.get("error")
                    )
                    for r in data
                ]
        return []
```

## Step 3: Complete Image Processing Pipeline

```python
# image_pipeline.py
import os
from typing import List, Dict
from dataclasses import dataclass
import time

from clore_image_client import CloreImageClient, ImageProcessingServer
from image_processor import RemoteImageProcessor, ProcessingResult, ImageOperation

@dataclass
class PipelineStats:
    """Statistics for the processing pipeline."""
    total_images: int
    successful: int
    failed: int
    total_input_mb: float
    total_output_mb: float
    total_time_seconds: float
    throughput_images_per_sec: float
    cost_usd: float


class ImageProcessingPipeline:
    """High-level image processing pipeline using Clore.ai GPUs."""
    
    def __init__(self, api_key: str):
        self.client = CloreImageClient(api_key)
        self.server: ImageProcessingServer = None
        self.processor: RemoteImageProcessor = None
    
    def setup(self, max_price_usd: float = 0.40, min_vram_gb: int = 8):
        """Provision GPU server for image processing."""
        
        print("🔍 Finding image processing GPU...")
        gpu = self.client.find_image_processing_gpu(
            min_vram_gb=min_vram_gb,
            max_price_usd=max_price_usd
        )
        
        if not gpu:
            raise Exception(f"No GPU found under ${max_price_usd}/hr")
        
        print(f"   Found: {gpu['gpus']} @ ${gpu['price_usd']:.2f}/hr")
        
        print("🚀 Provisioning server...")
        self.server = self.client.rent_image_server(gpu)
        
        print(f"   Server ready: {self.server.ssh_host}:{self.server.ssh_port}")
        
        # Connect processor
        self.processor = RemoteImageProcessor(
            self.server.ssh_host,
            self.server.ssh_port,
            self.server.ssh_password
        )
        self.processor.connect()
        self.processor.setup_environment()
        
        return self
    
    def process_batch(self,
                      input_files: List[str],
                      output_dir: str,
                      operation: ImageOperation,
                      **kwargs) -> PipelineStats:
        """Process a batch of images with specified operation."""
        
        start_time = time.time()
        
        # Upload files
        print(f"📤 Uploading {len(input_files)} images...")
        self.processor.upload_files(input_files, "/tmp/input")
        
        # Process based on operation
        print(f"⚙️  Processing with {operation.value}...")
        
        if operation == ImageOperation.RESIZE:
            results = self.processor.batch_resize(
                "/tmp/input", "/tmp/output",
                width=kwargs.get("width", 1920),
                height=kwargs.get("height", 1080),
                maintain_aspect=kwargs.get("maintain_aspect", True)
            )
        elif operation == ImageOperation.UPSCALE:
            results = self.processor.batch_upscale(
                "/tmp/input", "/tmp/output",
                scale=kwargs.get("scale", 4)
            )
        elif operation == ImageOperation.REMOVE_BG:
            results = self.processor.batch_remove_background(
                "/tmp/input", "/tmp/output"
            )
        elif operation == ImageOperation.DETECT_OBJECTS:
            results = self.processor.batch_detect_objects(
                "/tmp/input", "/tmp/output",
                model=kwargs.get("model", "yolov8n.pt"),
                confidence=kwargs.get("confidence", 0.5)
            )
        elif operation == ImageOperation.CONVERT:
            results = self.processor.batch_convert(
                "/tmp/input", "/tmp/output",
                output_format=kwargs.get("format", "webp"),
                quality=kwargs.get("quality", 85)
            )
        else:
            raise ValueError(f"Unknown operation: {operation}")
        
        # Download results
        print(f"📥 Downloading results...")
        os.makedirs(output_dir, exist_ok=True)
        self.processor.download_files("/tmp/output", output_dir)
        
        # Calculate stats
        total_time = time.time() - start_time
        successful = [r for r in results if r.success]
        
        return PipelineStats(
            total_images=len(results),
            successful=len(successful),
            failed=len(results) - len(successful),
            total_input_mb=sum(r.input_size_kb for r in results) / 1024,
            total_output_mb=sum(r.output_size_kb for r in results) / 1024,
            total_time_seconds=total_time,
            throughput_images_per_sec=len(results) / total_time if total_time > 0 else 0,
            cost_usd=(total_time / 3600) * self.server.hourly_cost
        )
    
    def cleanup(self):
        """Release resources."""
        if self.processor:
            self.processor.disconnect()
        if self.server:
            print("🧹 Releasing server...")
            self.client.cancel_order(self.server.order_id)
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        self.cleanup()
```

## Full Script: Production Image Processor

```python
#!/usr/bin/env python3
"""
Batch Image Processing using Clore.ai GPUs.

Usage:
    python batch_images.py --api-key YOUR_API_KEY --input ./images/ --output ./processed/ \
        --operation resize --width 1920 --height 1080

Operations:
    resize      - Resize images to specified dimensions
    upscale     - AI upscale images (2x, 4x)
    remove_bg   - Remove background using AI
    detect      - Detect objects with YOLOv8
    convert     - Convert to different format (webp, jpg, png)
"""

import os
import sys
import time
import json
import argparse
import secrets
import requests
import paramiko
from scp import SCPClient
from typing import List, Dict, Optional
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ProcessResult:
    filename: str
    success: bool
    time_ms: float
    input_kb: float
    output_kb: float
    error: Optional[str] = None


class CloreImageProcessor:
    """Complete image processing solution using Clore.ai GPUs."""
    
    BASE_URL = "https://api.clore.ai"
    IMAGE = "pytorch/pytorch:2.7.1-cuda12.8-cudnn9-runtime"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.order_id = None
        self.ssh_host = None
        self.ssh_port = None
        self.ssh_password = None
        self.hourly_cost = 0.0
        self._ssh = None
        self._scp = None
    
    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        url = f"{self.BASE_URL}{endpoint}"
        for attempt in range(3):
            response = requests.request(method, url, headers=self.headers, **kwargs)
            data = response.json()
            if data.get("code") == 5:
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries")
    
    def find_gpu(self, max_price: float = 0.40) -> Optional[Dict]:
        servers = self._api("GET", "/v1/marketplace")["servers"]
        good_gpus = ["RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", "A100", "A6000"]
        
        candidates = []
        for s in servers:
            if s.get("rented"):
                continue
            gpus = s.get("gpu_array", [])
            if not any(any(g in gpu for g in good_gpus) for gpu in gpus):
                continue
            price = s.get("price", {}).get("usd", {}).get("spot")
            if price and price <= max_price:
                candidates.append({"id": s["id"], "gpus": gpus, "price": price})
        
        if not candidates:
            return None
        return min(candidates, key=lambda x: x["price"])
    
    def setup(self, max_price: float = 0.40):
        print("🔍 Finding GPU...")
        gpu = self.find_gpu(max_price)
        if not gpu:
            raise Exception(f"No GPU under ${max_price}/hr")
        
        print(f"   {gpu['gpus']} @ ${gpu['price']:.2f}/hr")
        
        self.ssh_password = secrets.token_urlsafe(16)
        self.hourly_cost = gpu["price"]
        
        print("🚀 Provisioning server...")
        order_data = {
            "renting_server": gpu["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": self.IMAGE,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password,
            "spotprice": gpu["price"] * 1.15
        }
        
        result = self._api("POST", "/v1/create_order", json=order_data)
        self.order_id = result["order_id"]
        
        print("⏳ Waiting for server...")
        for _ in range(120):
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == self.order_id), None)
            if order and order.get("status") == "running":
                conn = order["connection"]["ssh"]
                parts = conn.split()
                self.ssh_host = parts[1].split("@")[1]
                self.ssh_port = int(parts[-1]) if "-p" in conn else 22
                break
            time.sleep(2)
        else:
            raise Exception("Timeout")
        
        # Connect
        self._ssh = paramiko.SSHClient()
        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh.connect(self.ssh_host, port=self.ssh_port,
                          username="root", password=self.ssh_password, timeout=30)
        self._scp = SCPClient(self._ssh.get_transport())
        
        print(f"✅ Server ready: {self.ssh_host}:{self.ssh_port}")
        
        # Install packages
        print("📦 Installing packages...")
        self._exec("pip install -q pillow opencv-python-headless rembg ultralytics", timeout=180)
    
    def _exec(self, cmd: str, timeout: int = 600) -> str:
        stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
        stdout.channel.recv_exit_status()
        return stdout.read().decode()
    
    def upload_images(self, paths: List[str]) -> str:
        self._exec("rm -rf /tmp/input /tmp/output && mkdir -p /tmp/input /tmp/output")
        for p in paths:
            self._scp.put(p, f"/tmp/input/{os.path.basename(p)}")
        return "/tmp/input"
    
    def download_results(self, local_dir: str):
        os.makedirs(local_dir, exist_ok=True)
        # List files and download each
        files = self._exec("ls /tmp/output/").strip().split("\n")
        for f in files:
            if f.strip():
                self._scp.get(f"/tmp/output/{f.strip()}", local_dir)
    
    def resize(self, width: int, height: int, maintain_aspect: bool = True) -> List[ProcessResult]:
        script = f'''
import os, time, json
from PIL import Image

results = []
for f in os.listdir("/tmp/input"):
    if not f.lower().endswith(('.png','.jpg','.jpeg','.webp')):
        continue
    start = time.time()*1000
    try:
        img = Image.open(f"/tmp/input/{{f}}")
        in_kb = os.path.getsize(f"/tmp/input/{{f}}")/1024
        if {maintain_aspect}:
            img.thumbnail(({width},{height}), Image.Resampling.LANCZOS)
        else:
            img = img.resize(({width},{height}), Image.Resampling.LANCZOS)
        img.save(f"/tmp/output/{{f}}", quality=95)
        out_kb = os.path.getsize(f"/tmp/output/{{f}}")/1024
        results.append({{"f":f,"ok":True,"ms":time.time()*1000-start,"in":in_kb,"out":out_kb}})
    except Exception as e:
        results.append({{"f":f,"ok":False,"ms":0,"in":0,"out":0,"err":str(e)}})
print("R:"+json.dumps(results))
'''
        return self._run_script(script, "resize")
    
    def convert(self, fmt: str, quality: int = 85) -> List[ProcessResult]:
        script = f'''
import os, time, json
from PIL import Image

results = []
for f in os.listdir("/tmp/input"):
    if not f.lower().endswith(('.png','.jpg','.jpeg','.webp','.bmp')):
        continue
    name = os.path.splitext(f)[0]
    start = time.time()*1000
    try:
        img = Image.open(f"/tmp/input/{{f}}")
        in_kb = os.path.getsize(f"/tmp/input/{{f}}")/1024
        if img.mode == 'RGBA' and "{fmt}" in ('jpg','jpeg'):
            img = img.convert('RGB')
        img.save(f"/tmp/output/{{name}}.{fmt}", quality={quality})
        out_kb = os.path.getsize(f"/tmp/output/{{name}}.{fmt}")/1024
        results.append({{"f":f,"ok":True,"ms":time.time()*1000-start,"in":in_kb,"out":out_kb}})
    except Exception as e:
        results.append({{"f":f,"ok":False,"ms":0,"in":0,"out":0,"err":str(e)}})
print("R:"+json.dumps(results))
'''
        return self._run_script(script, "convert")
    
    def remove_bg(self) -> List[ProcessResult]:
        script = '''
import os, time, json
from rembg import remove

results = []
for f in os.listdir("/tmp/input"):
    if not f.lower().endswith(('.png','.jpg','.jpeg','.webp')):
        continue
    name = os.path.splitext(f)[0]
    start = time.time()*1000
    try:
        in_kb = os.path.getsize(f"/tmp/input/{f}")/1024
        with open(f"/tmp/input/{f}", 'rb') as inp:
            data = remove(inp.read())
        with open(f"/tmp/output/{name}.png", 'wb') as out:
            out.write(data)
        out_kb = os.path.getsize(f"/tmp/output/{name}.png")/1024
        results.append({"f":f,"ok":True,"ms":time.time()*1000-start,"in":in_kb,"out":out_kb})
    except Exception as e:
        results.append({"f":f,"ok":False,"ms":0,"in":0,"out":0,"err":str(e)})
print("R:"+json.dumps(results))
'''
        return self._run_script(script, "remove_bg")
    
    def detect(self, confidence: float = 0.5) -> List[ProcessResult]:
        script = f'''
import os, time, json
from ultralytics import YOLO
from PIL import Image

model = YOLO("yolov8n.pt")
results_list = []

for f in os.listdir("/tmp/input"):
    if not f.lower().endswith(('.png','.jpg','.jpeg','.webp')):
        continue
    start = time.time()*1000
    try:
        in_kb = os.path.getsize(f"/tmp/input/{{f}}")/1024
        res = model(f"/tmp/input/{{f}}", conf={confidence})[0]
        annotated = res.plot()
        Image.fromarray(annotated[...,::-1]).save(f"/tmp/output/{{f}}")
        out_kb = os.path.getsize(f"/tmp/output/{{f}}")/1024
        results_list.append({{"f":f,"ok":True,"ms":time.time()*1000-start,"in":in_kb,"out":out_kb}})
    except Exception as e:
        results_list.append({{"f":f,"ok":False,"ms":0,"in":0,"out":0,"err":str(e)}})
print("R:"+json.dumps(results_list))
'''
        return self._run_script(script, "detect")
    
    def _run_script(self, script: str, operation: str) -> List[ProcessResult]:
        self._exec(f"cat > /tmp/proc.py << 'EOF'\n{script}\nEOF")
        out = self._exec("python3 /tmp/proc.py", timeout=1800)
        
        for line in out.split("\n"):
            if line.startswith("R:"):
                data = json.loads(line[2:])
                return [
                    ProcessResult(
                        filename=r["f"],
                        success=r["ok"],
                        time_ms=r["ms"],
                        input_kb=r["in"],
                        output_kb=r["out"],
                        error=r.get("err")
                    )
                    for r in data
                ]
        return []
    
    def cleanup(self):
        if self._scp:
            self._scp.close()
        if self._ssh:
            self._ssh.close()
        if self.order_id:
            print("🧹 Releasing server...")
            self._api("POST", "/v1/cancel_order", json={"id": self.order_id})
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        self.cleanup()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--input", "-i", required=True, help="Input directory or files")
    parser.add_argument("--output", "-o", required=True, help="Output directory")
    parser.add_argument("--operation", choices=["resize", "convert", "remove_bg", "detect"], required=True)
    parser.add_argument("--width", type=int, default=1920)
    parser.add_argument("--height", type=int, default=1080)
    parser.add_argument("--format", default="webp")
    parser.add_argument("--quality", type=int, default=85)
    parser.add_argument("--confidence", type=float, default=0.5)
    parser.add_argument("--max-price", type=float, default=0.40)
    args = parser.parse_args()
    
    # Collect input files
    if os.path.isdir(args.input):
        files = [os.path.join(args.input, f) for f in os.listdir(args.input)
                 if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp', '.bmp'))]
    else:
        files = [args.input]
    
    print(f"📁 Processing {len(files)} images")
    
    with CloreImageProcessor(args.api_key) as proc:
        proc.setup(args.max_price)
        
        start = time.time()
        
        # Upload
        print(f"📤 Uploading...")
        proc.upload_images(files)
        
        # Process
        print(f"⚙️  {args.operation}...")
        if args.operation == "resize":
            results = proc.resize(args.width, args.height)
        elif args.operation == "convert":
            results = proc.convert(args.format, args.quality)
        elif args.operation == "remove_bg":
            results = proc.remove_bg()
        elif args.operation == "detect":
            results = proc.detect(args.confidence)
        
        # Download
        print(f"📥 Downloading...")
        proc.download_results(args.output)
        
        # Stats
        elapsed = time.time() - start
        success = [r for r in results if r.success]
        
        print("\n" + "="*60)
        print("📊 SUMMARY")
        print(f"   Processed: {len(success)}/{len(results)}")
        print(f"   Input: {sum(r.input_kb for r in results)/1024:.1f} MB")
        print(f"   Output: {sum(r.output_kb for r in results)/1024:.1f} MB")
        print(f"   Time: {elapsed:.1f}s")
        print(f"   Throughput: {len(results)/elapsed:.1f} img/s")
        print(f"   Cost: ${(elapsed/3600)*proc.hourly_cost:.4f}")
        print(f"   Output: {args.output}")


if __name__ == "__main__":
    main()
```

## Cost Comparison

| Operation        | 1000 Images | AWS Lambda | Cloudinary | Clore.ai  |
| ---------------- | ----------- | ---------- | ---------- | --------- |
| Resize           | 1000        | $0.20      | $0.50      | **$0.02** |
| Convert WebP     | 1000        | $0.30      | $0.75      | **$0.03** |
| Remove BG        | 1000        | N/A        | $5.00      | **$0.15** |
| Object Detection | 1000        | $2.00      | N/A        | **$0.10** |
| AI Upscale 4x    | 1000        | N/A        | $10.00     | **$0.25** |

## Performance Benchmarks

| Operation          | GPU      | Images/sec | Notes              |
| ------------------ | -------- | ---------- | ------------------ |
| Resize (1080p)     | RTX 3080 | 500+       | PIL + GPU memory   |
| WebP Convert       | RTX 3080 | 300+       | CPU-bound encoding |
| Background Removal | RTX 4090 | 15-20      | U2-Net model       |
| Object Detection   | RTX 4090 | 50-100     | YOLOv8n            |
| AI Upscale 4x      | RTX 4090 | 5-10       | Real-ESRGAN        |

## Next Steps

* [RAPIDS Data Science](https://docs.clore.ai/dev/data-processing-and-pipelines/rapids-processing)
* [Video Transcoding](https://docs.clore.ai/dev/data-processing-and-pipelines/video-transcoding)
* [YOLOv8 Training](https://docs.clore.ai/dev/machine-learning-and-training/yolo-training)
