# Video Transcoding Pipeline

## What We're Building

A production-grade video transcoding pipeline that automatically rents Clore.ai GPUs, processes video files with NVIDIA NVENC hardware acceleration, and achieves 10-50x faster encoding compared to CPU-only transcoding.

**Key Features:**

* Automatic GPU server provisioning via Clore.ai API
* FFmpeg with NVENC H.264/H.265/AV1 hardware encoding
* Batch processing multiple videos in parallel
* Automatic quality/bitrate optimization
* Cost-effective spot instance usage
* Progress tracking and webhook notifications

## Prerequisites

* Clore.ai account with API key ([get one here](https://clore.ai))
* Python 3.10+
* Basic understanding of video codecs
* Videos to transcode (local or S3/cloud storage)

```bash
pip install requests boto3 tqdm
```

## Architecture Overview

```
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│ Video Queue │────▶│ Clore.ai GPU │────▶│   Output    │
│  (S3/Local) │     │  + FFmpeg    │     │   Storage   │
└─────────────┘     └──────────────┘     └─────────────┘
       │                   │                    │
       └───────────────────┴────────────────────┘
                    Orchestrator
```

## Step 1: Clore.ai Client for Transcoding

```python
# clore_client.py
import requests
import time
from typing import Optional, Dict, Any, List
from dataclasses import dataclass

@dataclass
class TranscodeServer:
    """Represents a rented GPU server for transcoding."""
    server_id: int
    order_id: int
    ssh_host: str
    ssh_port: int
    ssh_password: str
    gpu_model: str
    hourly_cost: float

class CloreTranscodeClient:
    """Clore.ai client optimized for video transcoding workloads."""
    
    BASE_URL = "https://api.clore.ai"
    
    # Docker image with FFmpeg + NVENC support
    FFMPEG_IMAGE = "jrottenberg/ffmpeg:4.4-nvidia"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
    
    def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make API request with error handling and retry."""
        url = f"{self.BASE_URL}{endpoint}"
        
        for attempt in range(3):
            try:
                response = requests.request(
                    method, url, 
                    headers=self.headers, 
                    timeout=30,
                    **kwargs
                )
                data = response.json()
                
                if data.get("code") == 5:  # Rate limited
                    time.sleep(2 ** attempt)
                    continue
                    
                if data.get("code") != 0:
                    raise Exception(f"API Error: {data}")
                return data
                
            except requests.exceptions.Timeout:
                if attempt == 2:
                    raise
                time.sleep(1)
        
        raise Exception("Max retries exceeded")
    
    def find_transcode_gpu(self, 
                           min_vram_gb: int = 8,
                           max_price_usd: float = 0.50,
                           prefer_spot: bool = True) -> Optional[Dict]:
        """Find optimal GPU for video transcoding."""
        
        servers = self._request("GET", "/v1/marketplace")["servers"]
        
        # GPUs with good NVENC support
        nvenc_gpus = ["RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", 
                      "RTX 3070", "A100", "A6000", "A5000", "RTX A4000"]
        
        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            
            gpu_array = server.get("gpu_array", [])
            if not any(any(g in gpu for g in nvenc_gpus) for gpu in gpu_array):
                continue
            
            # Check price
            price_data = server.get("price", {}).get("usd", {})
            if prefer_spot:
                price = price_data.get("spot", price_data.get("on_demand_clore"))
            else:
                price = price_data.get("on_demand_clore")
            
            if not price or price > max_price_usd:
                continue
            
            candidates.append({
                "id": server["id"],
                "gpus": gpu_array,
                "price_usd": price,
                "reliability": server.get("reliability", 0),
                "specs": server.get("specs", {}),
                "is_spot": prefer_spot
            })
        
        if not candidates:
            return None
        
        # Sort: price first, then reliability
        candidates.sort(key=lambda x: (x["price_usd"], -x["reliability"]))
        return candidates[0]
    
    def rent_transcode_server(self, 
                              server_id: int, 
                              ssh_password: str,
                              use_spot: bool = True,
                              spot_price: float = 0.3) -> TranscodeServer:
        """Rent a server for video transcoding."""
        
        order_data = {
            "renting_server": server_id,
            "type": "spot" if use_spot else "on-demand",
            "currency": "CLORE-Blockchain",
            "image": self.FFMPEG_IMAGE,
            "ports": {"22": "tcp"},
            "env": {
                "NVIDIA_VISIBLE_DEVICES": "all",
                "NVIDIA_DRIVER_CAPABILITIES": "compute,video,utility"
            },
            "ssh_password": ssh_password
        }
        
        if use_spot:
            order_data["spotprice"] = spot_price
        
        result = self._request("POST", "/v1/create_order", json=order_data)
        order_id = result["order_id"]
        
        # Wait for server to be ready
        print(f"Waiting for server {server_id} to start...")
        for _ in range(90):
            orders = self._request("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == order_id), None)
            
            if order and order.get("status") == "running":
                conn = order["connection"]
                # Parse SSH connection string
                ssh_parts = conn["ssh"].split()
                ssh_host = ssh_parts[1].split("@")[1] if "@" in ssh_parts[1] else ssh_parts[1]
                ssh_port = int(ssh_parts[-1]) if "-p" in conn["ssh"] else 22
                
                return TranscodeServer(
                    server_id=server_id,
                    order_id=order_id,
                    ssh_host=ssh_host,
                    ssh_port=ssh_port,
                    ssh_password=ssh_password,
                    gpu_model=order.get("gpu_array", ["Unknown"])[0] if "gpu_array" in order else "GPU",
                    hourly_cost=order.get("price", 0) * 60
                )
            
            time.sleep(2)
        
        raise Exception("Timeout waiting for server to start")
    
    def cancel_order(self, order_id: int):
        """Cancel a rental order."""
        self._request("POST", "/v1/cancel_order", json={"id": order_id})
```

## Step 2: FFmpeg Transcoding Engine

```python
# transcode_engine.py
import subprocess
import os
import json
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum

class VideoCodec(Enum):
    H264 = "h264_nvenc"
    H265 = "hevc_nvenc"
    AV1 = "av1_nvenc"  # RTX 40 series only

class VideoPreset(Enum):
    FASTEST = "p1"
    FAST = "p4"
    BALANCED = "p5"
    QUALITY = "p6"
    BEST = "p7"

@dataclass
class TranscodeJob:
    input_path: str
    output_path: str
    codec: VideoCodec = VideoCodec.H265
    preset: VideoPreset = VideoPreset.BALANCED
    crf: int = 23
    resolution: Optional[str] = None  # e.g., "1920x1080"
    fps: Optional[int] = None
    audio_bitrate: str = "128k"
    
class FFmpegTranscoder:
    """FFmpeg wrapper for GPU-accelerated transcoding."""
    
    def __init__(self, ssh_host: str, ssh_port: int, ssh_password: str):
        self.ssh_host = ssh_host
        self.ssh_port = ssh_port
        self.ssh_password = ssh_password
    
    def _ssh_command(self, cmd: str) -> str:
        """Execute command via SSH."""
        ssh_cmd = f"sshpass -p '{self.ssh_password}' ssh -o StrictHostKeyChecking=no " \
                  f"-p {self.ssh_port} root@{self.ssh_host} '{cmd}'"
        result = subprocess.run(ssh_cmd, shell=True, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"SSH command failed: {result.stderr}")
        return result.stdout
    
    def _scp_upload(self, local_path: str, remote_path: str):
        """Upload file via SCP."""
        cmd = f"sshpass -p '{self.ssh_password}' scp -o StrictHostKeyChecking=no " \
              f"-P {self.ssh_port} '{local_path}' root@{self.ssh_host}:'{remote_path}'"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"Upload failed: {result.stderr}")
    
    def _scp_download(self, remote_path: str, local_path: str):
        """Download file via SCP."""
        cmd = f"sshpass -p '{self.ssh_password}' scp -o StrictHostKeyChecking=no " \
              f"-P {self.ssh_port} root@{self.ssh_host}:'{remote_path}' '{local_path}'"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"Download failed: {result.stderr}")
    
    def check_gpu(self) -> dict:
        """Verify GPU and NVENC availability."""
        # Check nvidia-smi
        nvidia_info = self._ssh_command("nvidia-smi --query-gpu=name,memory.total --format=csv,noheader")
        
        # Check NVENC support
        nvenc_check = self._ssh_command("ffmpeg -hide_banner -encoders 2>/dev/null | grep nvenc || true")
        
        return {
            "gpu": nvidia_info.strip(),
            "nvenc_available": "nvenc" in nvenc_check.lower(),
            "encoders": nvenc_check.strip().split("\n") if nvenc_check.strip() else []
        }
    
    def get_video_info(self, remote_path: str) -> dict:
        """Get video file information."""
        cmd = f"ffprobe -v quiet -print_format json -show_format -show_streams '{remote_path}'"
        output = self._ssh_command(cmd)
        return json.loads(output)
    
    def build_ffmpeg_command(self, job: TranscodeJob) -> str:
        """Build FFmpeg command for transcoding job."""
        
        cmd_parts = [
            "ffmpeg -y",
            "-hwaccel cuda",
            "-hwaccel_output_format cuda",
            f"-i '{job.input_path}'"
        ]
        
        # Video filters
        filters = []
        if job.resolution:
            w, h = job.resolution.split("x")
            filters.append(f"scale_cuda={w}:{h}")
        if job.fps:
            filters.append(f"fps={job.fps}")
        
        if filters:
            cmd_parts.append(f"-vf '{','.join(filters)}'")
        
        # Video encoding
        cmd_parts.extend([
            f"-c:v {job.codec.value}",
            f"-preset {job.preset.value}",
            f"-cq {job.crf}",
            "-b:v 0",  # Use CRF mode
        ])
        
        # Audio encoding
        cmd_parts.extend([
            "-c:a aac",
            f"-b:a {job.audio_bitrate}",
        ])
        
        # Output
        cmd_parts.append(f"'{job.output_path}'")
        
        return " ".join(cmd_parts)
    
    def transcode(self, job: TranscodeJob, progress_callback=None) -> dict:
        """Execute transcoding job."""
        
        cmd = self.build_ffmpeg_command(job)
        print(f"Running: {cmd}")
        
        # Execute with progress
        full_cmd = f"{cmd} -progress pipe:1 2>&1"
        output = self._ssh_command(full_cmd)
        
        return {
            "input": job.input_path,
            "output": job.output_path,
            "codec": job.codec.value,
            "status": "completed"
        }
    
    def batch_transcode(self, jobs: List[TranscodeJob]) -> List[dict]:
        """Process multiple transcoding jobs."""
        results = []
        for i, job in enumerate(jobs):
            print(f"Processing job {i+1}/{len(jobs)}: {job.input_path}")
            result = self.transcode(job)
            results.append(result)
        return results
```

## Step 3: Complete Transcoding Pipeline

```python
# transcode_pipeline.py
import os
import time
import argparse
from pathlib import Path
from typing import List
from clore_client import CloreTranscodeClient, TranscodeServer
from transcode_engine import FFmpegTranscoder, TranscodeJob, VideoCodec, VideoPreset

class TranscodePipeline:
    """End-to-end video transcoding pipeline using Clore.ai GPUs."""
    
    def __init__(self, api_key: str):
        self.client = CloreTranscodeClient(api_key)
        self.server: TranscodeServer = None
        self.transcoder: FFmpegTranscoder = None
    
    def setup(self, max_price_usd: float = 0.40, use_spot: bool = True):
        """Find and rent a GPU server."""
        
        print("🔍 Finding optimal GPU for transcoding...")
        gpu = self.client.find_transcode_gpu(
            max_price_usd=max_price_usd,
            prefer_spot=use_spot
        )
        
        if not gpu:
            raise Exception(f"No suitable GPU found under ${max_price_usd}/hr")
        
        print(f"   Found: Server {gpu['id']} with {gpu['gpus']}")
        print(f"   Price: ${gpu['price_usd']:.2f}/hr")
        
        # Generate secure password
        import secrets
        ssh_password = secrets.token_urlsafe(16)
        
        print("\n🚀 Renting server...")
        self.server = self.client.rent_transcode_server(
            server_id=gpu["id"],
            ssh_password=ssh_password,
            use_spot=use_spot,
            spot_price=gpu["price_usd"] * 1.1  # Bid 10% above current
        )
        
        print(f"   Server ready: {self.server.ssh_host}:{self.server.ssh_port}")
        
        # Initialize transcoder
        self.transcoder = FFmpegTranscoder(
            self.server.ssh_host,
            self.server.ssh_port,
            self.server.ssh_password
        )
        
        # Verify GPU
        print("\n🔧 Verifying GPU setup...")
        gpu_info = self.transcoder.check_gpu()
        print(f"   GPU: {gpu_info['gpu']}")
        print(f"   NVENC: {'✅ Available' if gpu_info['nvenc_available'] else '❌ Not available'}")
        
        return self
    
    def transcode_file(self, 
                       input_path: str, 
                       output_path: str,
                       codec: VideoCodec = VideoCodec.H265,
                       preset: VideoPreset = VideoPreset.BALANCED,
                       crf: int = 23,
                       resolution: str = None) -> dict:
        """Transcode a single video file."""
        
        if not self.transcoder:
            raise Exception("Pipeline not set up. Call setup() first.")
        
        # Upload input file
        remote_input = f"/tmp/{os.path.basename(input_path)}"
        remote_output = f"/tmp/output_{os.path.basename(output_path)}"
        
        print(f"\n📤 Uploading {input_path}...")
        self.transcoder._scp_upload(input_path, remote_input)
        
        # Create job
        job = TranscodeJob(
            input_path=remote_input,
            output_path=remote_output,
            codec=codec,
            preset=preset,
            crf=crf,
            resolution=resolution
        )
        
        # Transcode
        print(f"🎬 Transcoding with {codec.value}...")
        start_time = time.time()
        result = self.transcoder.transcode(job)
        duration = time.time() - start_time
        
        # Download output
        print(f"📥 Downloading result...")
        self.transcoder._scp_download(remote_output, output_path)
        
        # Get file sizes
        input_size = os.path.getsize(input_path)
        output_size = os.path.getsize(output_path)
        
        result.update({
            "duration_seconds": duration,
            "input_size_mb": input_size / (1024 * 1024),
            "output_size_mb": output_size / (1024 * 1024),
            "compression_ratio": input_size / output_size if output_size > 0 else 0,
            "cost_usd": (duration / 3600) * self.server.hourly_cost
        })
        
        print(f"✅ Completed in {duration:.1f}s")
        print(f"   Size: {result['input_size_mb']:.1f}MB → {result['output_size_mb']:.1f}MB")
        print(f"   Compression: {result['compression_ratio']:.1f}x")
        print(f"   Cost: ${result['cost_usd']:.4f}")
        
        return result
    
    def transcode_batch(self, 
                        input_files: List[str],
                        output_dir: str,
                        codec: VideoCodec = VideoCodec.H265,
                        preset: VideoPreset = VideoPreset.BALANCED,
                        crf: int = 23) -> List[dict]:
        """Transcode multiple video files."""
        
        os.makedirs(output_dir, exist_ok=True)
        results = []
        
        for i, input_path in enumerate(input_files):
            print(f"\n{'='*60}")
            print(f"Processing file {i+1}/{len(input_files)}")
            
            filename = os.path.basename(input_path)
            name, _ = os.path.splitext(filename)
            output_path = os.path.join(output_dir, f"{name}.mp4")
            
            try:
                result = self.transcode_file(
                    input_path, output_path,
                    codec=codec, preset=preset, crf=crf
                )
                results.append(result)
            except Exception as e:
                print(f"❌ Failed: {e}")
                results.append({"input": input_path, "status": "failed", "error": str(e)})
        
        return results
    
    def cleanup(self):
        """Release GPU server."""
        if self.server:
            print(f"\n🧹 Cancelling order {self.server.order_id}...")
            self.client.cancel_order(self.server.order_id)
            print("   Done!")
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cleanup()


def main():
    parser = argparse.ArgumentParser(description="GPU-accelerated video transcoding")
    parser.add_argument("--api-key", required=True, help="Clore.ai API key")
    parser.add_argument("--input", "-i", required=True, nargs="+", help="Input video files")
    parser.add_argument("--output", "-o", required=True, help="Output directory")
    parser.add_argument("--codec", choices=["h264", "h265", "av1"], default="h265")
    parser.add_argument("--preset", choices=["fastest", "fast", "balanced", "quality", "best"], default="balanced")
    parser.add_argument("--crf", type=int, default=23, help="Quality (lower = better, 18-28)")
    parser.add_argument("--max-price", type=float, default=0.40, help="Max hourly price in USD")
    parser.add_argument("--on-demand", action="store_true", help="Use on-demand instead of spot")
    
    args = parser.parse_args()
    
    # Map codec
    codec_map = {
        "h264": VideoCodec.H264,
        "h265": VideoCodec.H265,
        "av1": VideoCodec.AV1
    }
    
    preset_map = {
        "fastest": VideoPreset.FASTEST,
        "fast": VideoPreset.FAST,
        "balanced": VideoPreset.BALANCED,
        "quality": VideoPreset.QUALITY,
        "best": VideoPreset.BEST
    }
    
    with TranscodePipeline(args.api_key) as pipeline:
        pipeline.setup(
            max_price_usd=args.max_price,
            use_spot=not args.on_demand
        )
        
        results = pipeline.transcode_batch(
            args.input,
            args.output,
            codec=codec_map[args.codec],
            preset=preset_map[args.preset],
            crf=args.crf
        )
        
        # Summary
        print("\n" + "="*60)
        print("📊 SUMMARY")
        print("="*60)
        
        total_input = sum(r.get("input_size_mb", 0) for r in results)
        total_output = sum(r.get("output_size_mb", 0) for r in results)
        total_time = sum(r.get("duration_seconds", 0) for r in results)
        total_cost = sum(r.get("cost_usd", 0) for r in results)
        
        print(f"Files processed: {len(results)}")
        print(f"Total input: {total_input:.1f} MB")
        print(f"Total output: {total_output:.1f} MB")
        print(f"Total time: {total_time:.1f}s")
        print(f"Total cost: ${total_cost:.4f}")
        print(f"Average speed: {total_input / total_time:.1f} MB/s" if total_time > 0 else "")


if __name__ == "__main__":
    main()
```

## Step 4: Streaming Transcoding for Large Files

```python
# stream_transcode.py
"""
Streaming transcoding for very large files using chunked processing.
Useful when you can't upload the entire file at once.
"""

import subprocess
import threading
import queue
from typing import Generator

class StreamingTranscoder:
    """Stream video data directly to GPU for transcoding."""
    
    def __init__(self, ssh_host: str, ssh_port: int, ssh_password: str):
        self.ssh_host = ssh_host
        self.ssh_port = ssh_port  
        self.ssh_password = ssh_password
    
    def stream_transcode(self, 
                         input_stream: Generator[bytes, None, None],
                         output_path: str,
                         codec: str = "hevc_nvenc") -> dict:
        """
        Stream video data directly to remote FFmpeg.
        
        Args:
            input_stream: Generator yielding video data chunks
            output_path: Local path for output file
            codec: NVENC codec to use
        """
        
        # SSH command to receive piped data and transcode
        ssh_cmd = [
            "sshpass", "-p", self.ssh_password,
            "ssh", "-o", "StrictHostKeyChecking=no",
            "-p", str(self.ssh_port),
            f"root@{self.ssh_host}",
            f"ffmpeg -y -hwaccel cuda -i pipe:0 -c:v {codec} -preset p5 -cq 23 -c:a aac -f mp4 pipe:1"
        ]
        
        # Start SSH process
        proc = subprocess.Popen(
            ssh_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        
        # Write output to file
        output_data = []
        
        def read_output():
            while True:
                chunk = proc.stdout.read(65536)
                if not chunk:
                    break
                output_data.append(chunk)
        
        output_thread = threading.Thread(target=read_output)
        output_thread.start()
        
        # Stream input data
        bytes_sent = 0
        for chunk in input_stream:
            proc.stdin.write(chunk)
            bytes_sent += len(chunk)
        
        proc.stdin.close()
        output_thread.join()
        proc.wait()
        
        # Write output file
        with open(output_path, "wb") as f:
            for chunk in output_data:
                f.write(chunk)
        
        return {
            "bytes_sent": bytes_sent,
            "bytes_received": sum(len(c) for c in output_data),
            "output_path": output_path
        }


def file_chunk_generator(file_path: str, chunk_size: int = 1024 * 1024):
    """Generate chunks from a file."""
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk
```

## Full Script: Production Transcoding Service

```python
#!/usr/bin/env python3
"""
Production-ready video transcoding service using Clore.ai GPUs.
Supports batch processing, multiple codecs, and cost optimization.

Usage:
    python video_transcode.py --api-key YOUR_API_KEY --input video1.mp4 video2.mov --output ./transcoded/
"""

import os
import sys
import time
import json
import secrets
import argparse
import requests
from pathlib import Path
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import subprocess


class VideoCodec(Enum):
    H264 = "h264_nvenc"
    H265 = "hevc_nvenc"
    AV1 = "av1_nvenc"


class VideoPreset(Enum):
    FASTEST = "p1"
    FAST = "p4"
    BALANCED = "p5"
    QUALITY = "p6"
    BEST = "p7"


@dataclass
class TranscodeResult:
    input_file: str
    output_file: str
    status: str
    codec: str
    duration_seconds: float
    input_size_mb: float
    output_size_mb: float
    compression_ratio: float
    cost_usd: float
    error: Optional[str] = None


class CloreVideoTranscoder:
    """Complete video transcoding solution using Clore.ai GPUs."""
    
    BASE_URL = "https://api.clore.ai"
    FFMPEG_IMAGE = "jrottenberg/ffmpeg:4.4-nvidia"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.server = None
        self.order_id = None
        self.ssh_host = None
        self.ssh_port = None
        self.ssh_password = None
        self.hourly_cost = 0.0
    
    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Make API request."""
        url = f"{self.BASE_URL}{endpoint}"
        for attempt in range(3):
            response = requests.request(method, url, headers=self.headers, **kwargs)
            data = response.json()
            if data.get("code") == 5:
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries exceeded")
    
    def _ssh(self, cmd: str) -> str:
        """Execute SSH command."""
        ssh_cmd = f"sshpass -p '{self.ssh_password}' ssh -o StrictHostKeyChecking=no " \
                  f"-p {self.ssh_port} root@{self.ssh_host} '{cmd}'"
        result = subprocess.run(ssh_cmd, shell=True, capture_output=True, text=True)
        return result.stdout
    
    def _upload(self, local: str, remote: str):
        """Upload file via SCP."""
        cmd = f"sshpass -p '{self.ssh_password}' scp -o StrictHostKeyChecking=no " \
              f"-P {self.ssh_port} '{local}' root@{self.ssh_host}:'{remote}'"
        subprocess.run(cmd, shell=True, check=True)
    
    def _download(self, remote: str, local: str):
        """Download file via SCP."""
        cmd = f"sshpass -p '{self.ssh_password}' scp -o StrictHostKeyChecking=no " \
              f"-P {self.ssh_port} root@{self.ssh_host}:'{remote}' '{local}'"
        subprocess.run(cmd, shell=True, check=True)
    
    def find_gpu(self, max_price: float = 0.50, prefer_spot: bool = True) -> Optional[Dict]:
        """Find optimal transcoding GPU."""
        servers = self._api("GET", "/v1/marketplace")["servers"]
        
        nvenc_gpus = ["RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", "A100", "A6000"]
        candidates = []
        
        for s in servers:
            if s.get("rented"):
                continue
            gpus = s.get("gpu_array", [])
            if not any(any(g in gpu for g in nvenc_gpus) for gpu in gpus):
                continue
            
            price = s.get("price", {}).get("usd", {})
            cost = price.get("spot" if prefer_spot else "on_demand_clore")
            if cost and cost <= max_price:
                candidates.append({
                    "id": s["id"], "gpus": gpus, "price": cost,
                    "reliability": s.get("reliability", 0)
                })
        
        if not candidates:
            return None
        return min(candidates, key=lambda x: (x["price"], -x["reliability"]))
    
    def setup(self, max_price: float = 0.50, use_spot: bool = True):
        """Rent and setup GPU server."""
        print("🔍 Finding GPU...")
        gpu = self.find_gpu(max_price, use_spot)
        if not gpu:
            raise Exception(f"No GPU found under ${max_price}/hr")
        
        print(f"   Server {gpu['id']}: {gpu['gpus']} @ ${gpu['price']:.2f}/hr")
        
        self.ssh_password = secrets.token_urlsafe(16)
        
        print("🚀 Renting server...")
        order_data = {
            "renting_server": gpu["id"],
            "type": "spot" if use_spot else "on-demand",
            "currency": "CLORE-Blockchain",
            "image": self.FFMPEG_IMAGE,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password
        }
        if use_spot:
            order_data["spotprice"] = gpu["price"] * 1.1
        
        result = self._api("POST", "/v1/create_order", json=order_data)
        self.order_id = result["order_id"]
        
        print("⏳ Waiting for server...")
        for _ in range(90):
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == self.order_id), None)
            if order and order.get("status") == "running":
                conn = order["connection"]["ssh"]
                parts = conn.split()
                self.ssh_host = parts[1].split("@")[1]
                self.ssh_port = int(parts[-1]) if "-p" in conn else 22
                self.hourly_cost = gpu["price"]
                print(f"✅ Server ready: {self.ssh_host}:{self.ssh_port}")
                return
            time.sleep(2)
        raise Exception("Timeout")
    
    def transcode(self, input_path: str, output_path: str,
                  codec: VideoCodec = VideoCodec.H265,
                  preset: VideoPreset = VideoPreset.BALANCED,
                  crf: int = 23, resolution: str = None) -> TranscodeResult:
        """Transcode single file."""
        
        filename = os.path.basename(input_path)
        remote_in = f"/tmp/{filename}"
        remote_out = f"/tmp/out_{filename}.mp4"
        
        print(f"📤 Uploading {filename}...")
        self._upload(input_path, remote_in)
        
        # Build FFmpeg command
        vf = f"-vf 'scale_cuda={resolution}'" if resolution else ""
        cmd = f"ffmpeg -y -hwaccel cuda -hwaccel_output_format cuda -i '{remote_in}' " \
              f"{vf} -c:v {codec.value} -preset {preset.value} -cq {crf} -b:v 0 " \
              f"-c:a aac -b:a 128k '{remote_out}' 2>&1"
        
        print(f"🎬 Transcoding...")
        start = time.time()
        self._ssh(cmd)
        duration = time.time() - start
        
        print(f"📥 Downloading...")
        self._download(remote_out, output_path)
        
        input_size = os.path.getsize(input_path) / (1024*1024)
        output_size = os.path.getsize(output_path) / (1024*1024)
        
        return TranscodeResult(
            input_file=input_path,
            output_file=output_path,
            status="success",
            codec=codec.value,
            duration_seconds=duration,
            input_size_mb=input_size,
            output_size_mb=output_size,
            compression_ratio=input_size/output_size if output_size > 0 else 0,
            cost_usd=(duration/3600) * self.hourly_cost
        )
    
    def batch_transcode(self, input_files: List[str], output_dir: str,
                        codec: VideoCodec = VideoCodec.H265,
                        preset: VideoPreset = VideoPreset.BALANCED,
                        crf: int = 23) -> List[TranscodeResult]:
        """Transcode multiple files."""
        os.makedirs(output_dir, exist_ok=True)
        results = []
        
        for i, inp in enumerate(input_files):
            print(f"\n[{i+1}/{len(input_files)}] {os.path.basename(inp)}")
            name = Path(inp).stem
            out = os.path.join(output_dir, f"{name}.mp4")
            
            try:
                result = self.transcode(inp, out, codec, preset, crf)
                print(f"   ✅ {result.input_size_mb:.1f}MB → {result.output_size_mb:.1f}MB ({result.compression_ratio:.1f}x) ${result.cost_usd:.4f}")
                results.append(result)
            except Exception as e:
                print(f"   ❌ {e}")
                results.append(TranscodeResult(
                    input_file=inp, output_file=out, status="failed",
                    codec=codec.value, duration_seconds=0, input_size_mb=0,
                    output_size_mb=0, compression_ratio=0, cost_usd=0, error=str(e)
                ))
        
        return results
    
    def cleanup(self):
        """Cancel order and release resources."""
        if self.order_id:
            print("🧹 Releasing server...")
            self._api("POST", "/v1/cancel_order", json={"id": self.order_id})
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        self.cleanup()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--input", "-i", nargs="+", required=True)
    parser.add_argument("--output", "-o", required=True)
    parser.add_argument("--codec", choices=["h264", "h265", "av1"], default="h265")
    parser.add_argument("--preset", choices=["fastest", "fast", "balanced", "quality", "best"], default="balanced")
    parser.add_argument("--crf", type=int, default=23)
    parser.add_argument("--max-price", type=float, default=0.50)
    parser.add_argument("--on-demand", action="store_true")
    args = parser.parse_args()
    
    codec_map = {"h264": VideoCodec.H264, "h265": VideoCodec.H265, "av1": VideoCodec.AV1}
    preset_map = {"fastest": VideoPreset.FASTEST, "fast": VideoPreset.FAST,
                  "balanced": VideoPreset.BALANCED, "quality": VideoPreset.QUALITY, "best": VideoPreset.BEST}
    
    with CloreVideoTranscoder(args.api_key) as transcoder:
        transcoder.setup(args.max_price, not args.on_demand)
        results = transcoder.batch_transcode(
            args.input, args.output,
            codec_map[args.codec], preset_map[args.preset], args.crf
        )
        
        # Summary
        print("\n" + "="*60)
        success = [r for r in results if r.status == "success"]
        total_in = sum(r.input_size_mb for r in success)
        total_out = sum(r.output_size_mb for r in success)
        total_time = sum(r.duration_seconds for r in success)
        total_cost = sum(r.cost_usd for r in success)
        
        print(f"📊 SUMMARY: {len(success)}/{len(results)} succeeded")
        print(f"   Total: {total_in:.1f}MB → {total_out:.1f}MB")
        print(f"   Time: {total_time:.1f}s | Cost: ${total_cost:.4f}")
        if total_time > 0:
            print(f"   Speed: {total_in/total_time:.1f} MB/s")


if __name__ == "__main__":
    main()
```

## Cost Comparison

| Provider            | RTX 4090        | 1TB Video | Time   | Cost            |
| ------------------- | --------------- | --------- | ------ | --------------- |
| **Clore.ai (spot)** | $0.25-0.40/hr   | 1TB       | \~3hr  | **$0.75-1.20**  |
| AWS g5.xlarge       | $1.00/hr        | 1TB       | \~8hr  | $8.00           |
| Local RTX 4090      | $1,599 one-time | 1TB       | \~3hr  | \~$0.50 (power) |
| CPU Transcoding     | -               | 1TB       | \~30hr | $30+            |

**GPU transcoding is 10-50x faster than CPU** and with Clore.ai spot pricing, it's incredibly cost-effective for batch workloads.

## Performance Tips

1. **Use HEVC (H.265)** for best compression ratio
2. **CRF 23** is a good balance of quality vs size
3. **Preset p5 (balanced)** offers good speed/quality tradeoff
4. **Use spot instances** for batch jobs (up to 70% cheaper)
5. **RTX 4090** has the fastest NVENC encoder
6. **AV1 encoding** only available on RTX 40 series

## Next Steps

* [Distributed Rendering with Blender](https://docs.clore.ai/dev/data-processing-and-pipelines/rendering-farm)
* [Batch Image Processing](https://docs.clore.ai/dev/data-processing-and-pipelines/image-processing)
* [GPU ETL with RAPIDS](https://docs.clore.ai/dev/data-processing-and-pipelines/gpu-etl)
