# Real-Time Video Processing Pipeline on Clore.ai

### What We're Building

A complete GPU-accelerated video processing pipeline that:

* Performs **object detection** on video streams using YOLO
* Handles **real-time processing** with OpenCV + CUDA
* Offers a **batch mode** for processing video files
* Provides **streaming output** via WebSocket or RTSP
* Runs on rented **Clore.ai GPUs** for cost efficiency

**Use cases:**

* Security camera analysis
* Sports video analytics
* Traffic monitoring
* Content moderation
* Manufacturing quality inspection

### Prerequisites

* Clore.ai account with **30+ CLORE** balance
* Python 3.10+
* Basic understanding of computer vision

```bash
pip install requests opencv-python-headless ultralytics fastapi uvicorn websockets aiofiles
```

### Architecture

```
┌────────────────────────────────────────────────────────────────────┐
│                        Clore.ai GPU Server                         │
│                                                                    │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────────────┐  │
│  │ Video Input  │───▶│ GPU Pipeline │───▶│   Output Streams     │  │
│  │              │    │              │    │                      │  │
│  │ • RTSP       │    │ • Decode     │    │ • WebSocket          │  │
│  │ • Files      │    │ • YOLO       │    │ • RTSP               │  │
│  │ • HTTP       │    │ • Track      │    │ • Files              │  │
│  │ • Webcam     │    │ • Annotate   │    │ • Webhook            │  │
│  └──────────────┘    └──────────────┘    └──────────────────────┘  │
│                             │                                      │
│                             ▼                                      │
│                      ┌──────────────┐                              │
│                      │   CUDA GPU   │                              │
│                      │   RTX 4090   │                              │
│                      │  24GB VRAM   │                              │
│                      └──────────────┘                              │
│                                                                    │
│  Ports: 22 (SSH), 8000 (API), 8765 (WebSocket), 554 (RTSP)         │
└────────────────────────────────────────────────────────────────────┘
```

### Step 1: Set Up the Clore Client

> 📦 **Using the standard Clore API client.** See [Clore API Client Reference](../reference/clore-client.md) for the full implementation and setup instructions. Save it as `clore_client.py` in your project.

```python
from clore_client import CloreClient

client = CloreClient(api_key="your-api-key")
```

For reference, the `GPURental` dataclass the client returns:

```python
# clore_client.py (excerpt)
import requests
import time
from typing import Dict, Any, List, Optional
from dataclasses import dataclass

@dataclass
class GPURental:
    """GPU rental information."""
    order_id: int
    server_id: int
    ssh_host: str
    ssh_port: int
    http_endpoint: str
    cost_per_hour: float
    gpus: List[str]
```

### Step 2: Video Processor Core

```python
# video_processor.py
"""
GPU-accelerated video processing with YOLO object detection.
Supports batch and real-time modes.
"""

import cv2
import time
import queue
import threading
import numpy as np
from pathlib import Path
from typing import Optional, Callable, Dict, List, Any
from dataclasses import dataclass, field
from ultralytics import YOLO

@dataclass
class Detection:
    """Object detection result."""
    class_id: int
    class_name: str
    confidence: float
    bbox: tuple  # (x1, y1, x2, y2)
    track_id: Optional[int] = None

@dataclass
class ProcessedFrame:
    """Processed video frame with detections."""
    frame: np.ndarray
    frame_id: int
    timestamp: float
    detections: List[Detection] = field(default_factory=list)
    processing_time_ms: float = 0.0

@dataclass
class ProcessingConfig:
    """Video processing configuration."""
    model_name: str = "yolov8n.pt"  # YOLOv8 nano for speed
    confidence_threshold: float = 0.5
    iou_threshold: float = 0.5
    device: str = "cuda"  # cuda or cpu
    target_fps: Optional[int] = None
    resize_width: Optional[int] = None
    resize_height: Optional[int] = None
    classes: Optional[List[int]] = None  # Filter specific classes
    enable_tracking: bool = True
    draw_detections: bool = True
    draw_tracks: bool = True
    track_history_length: int = 30

class VideoProcessor:
    """GPU-accelerated video processor with YOLO."""
    
    def __init__(self, config: ProcessingConfig):
        self.config = config
        self.model = None
        self.track_history: Dict[int, List[tuple]] = {}
        self.stats = {
            "frames_processed": 0,
            "total_detections": 0,
            "avg_fps": 0.0,
            "avg_latency_ms": 0.0
        }
        self._load_model()
    
    def _load_model(self):
        """Load YOLO model."""
        print(f"Loading model: {self.config.model_name}")
        self.model = YOLO(self.config.model_name)
        
        # Warm up model with dummy inference
        if self.config.device == "cuda":
            dummy = np.zeros((640, 640, 3), dtype=np.uint8)
            self.model(dummy, verbose=False)
        
        print(f"Model loaded on {self.config.device}")
    
    def process_frame(self, frame: np.ndarray, frame_id: int = 0) -> ProcessedFrame:
        """Process a single frame."""
        start_time = time.time()
        
        # Resize if needed
        original_size = frame.shape[:2]
        if self.config.resize_width and self.config.resize_height:
            frame = cv2.resize(frame, (self.config.resize_width, self.config.resize_height))
        
        # Run YOLO
        if self.config.enable_tracking:
            results = self.model.track(
                frame,
                conf=self.config.confidence_threshold,
                iou=self.config.iou_threshold,
                device=self.config.device,
                classes=self.config.classes,
                persist=True,
                verbose=False
            )
        else:
            results = self.model(
                frame,
                conf=self.config.confidence_threshold,
                iou=self.config.iou_threshold,
                device=self.config.device,
                classes=self.config.classes,
                verbose=False
            )
        
        # Parse detections
        detections = []
        result = results[0]
        
        if result.boxes is not None:
            boxes = result.boxes.xyxy.cpu().numpy()
            confidences = result.boxes.conf.cpu().numpy()
            class_ids = result.boxes.cls.cpu().numpy().astype(int)
            
            track_ids = None
            if hasattr(result.boxes, 'id') and result.boxes.id is not None:
                track_ids = result.boxes.id.cpu().numpy().astype(int)
            
            for i in range(len(boxes)):
                detection = Detection(
                    class_id=class_ids[i],
                    class_name=self.model.names[class_ids[i]],
                    confidence=float(confidences[i]),
                    bbox=tuple(boxes[i].astype(int)),
                    track_id=track_ids[i] if track_ids is not None else None
                )
                detections.append(detection)
                
                # Update track history
                if detection.track_id is not None:
                    center = (
                        int((detection.bbox[0] + detection.bbox[2]) / 2),
                        int((detection.bbox[1] + detection.bbox[3]) / 2)
                    )
                    if detection.track_id not in self.track_history:
                        self.track_history[detection.track_id] = []
                    self.track_history[detection.track_id].append(center)
                    
                    # Limit history length
                    if len(self.track_history[detection.track_id]) > self.config.track_history_length:
                        self.track_history[detection.track_id].pop(0)
        
        # Draw annotations
        annotated_frame = frame.copy()
        if self.config.draw_detections:
            annotated_frame = self._draw_detections(annotated_frame, detections)
        
        if self.config.draw_tracks:
            annotated_frame = self._draw_tracks(annotated_frame)
        
        # Resize back if needed
        if self.config.resize_width and original_size != annotated_frame.shape[:2]:
            annotated_frame = cv2.resize(annotated_frame, (original_size[1], original_size[0]))
        
        processing_time = (time.time() - start_time) * 1000
        
        # Update stats
        self.stats["frames_processed"] += 1
        self.stats["total_detections"] += len(detections)
        if self.stats["frames_processed"] == 1:
            self.stats["avg_latency_ms"] = processing_time  # seed the moving average
        else:
            # Exponential moving average over recent frames
            self.stats["avg_latency_ms"] = (
                self.stats["avg_latency_ms"] * 0.9 + processing_time * 0.1
            )
        if self.stats["avg_latency_ms"] > 0:
            self.stats["avg_fps"] = 1000.0 / self.stats["avg_latency_ms"]
        
        return ProcessedFrame(
            frame=annotated_frame,
            frame_id=frame_id,
            timestamp=time.time(),
            detections=detections,
            processing_time_ms=processing_time
        )
    
    def _draw_detections(self, frame: np.ndarray, detections: List[Detection]) -> np.ndarray:
        """Draw detection boxes and labels."""
        for det in detections:
            x1, y1, x2, y2 = det.bbox
            
            # Color based on class
            color = self._get_class_color(det.class_id)
            
            # Draw box
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            
            # Draw label
            label = f"{det.class_name} {det.confidence:.2f}"
            if det.track_id is not None:
                label = f"ID:{det.track_id} {label}"
            
            (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            cv2.rectangle(frame, (x1, y1 - 20), (x1 + w, y1), color, -1)
            cv2.putText(frame, label, (x1, y1 - 5), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        
        return frame
    
    def _draw_tracks(self, frame: np.ndarray) -> np.ndarray:
        """Draw tracking paths."""
        for track_id, points in self.track_history.items():
            if len(points) < 2:
                continue
            
            color = self._get_class_color(track_id % 80)
            
            for i in range(1, len(points)):
                thickness = int(2 * (i / len(points)) + 1)
                cv2.line(frame, points[i-1], points[i], color, thickness)
        
        return frame
    
    def _get_class_color(self, class_id: int) -> tuple:
        """Get a consistent color for a class."""
        rng = np.random.default_rng(class_id)  # local RNG: avoids reseeding global state
        return tuple(int(c) for c in rng.integers(0, 256, 3))
    
    def get_stats(self) -> Dict:
        """Get processing statistics."""
        return self.stats.copy()

class BatchVideoProcessor:
    """Process video files in batch mode."""
    
    def __init__(self, processor: VideoProcessor):
        self.processor = processor
    
    def process_video(self, input_path: str, output_path: str,
                      progress_callback: Optional[Callable] = None) -> Dict:
        """Process a video file and save results."""
        cap = cv2.VideoCapture(input_path)
        
        if not cap.isOpened():
            raise IOError(f"Cannot open video: {input_path}")
        
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0  # some containers report 0; assume 30
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        # Create output writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        
        frame_id = 0
        start_time = time.time()
        all_detections = []
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            # Process frame
            result = self.processor.process_frame(frame, frame_id)
            
            # Write output
            out.write(result.frame)
            
            # Collect detections
            for det in result.detections:
                all_detections.append({
                    "frame_id": frame_id,
                    "class": det.class_name,
                    "confidence": det.confidence,
                    "bbox": det.bbox,
                    "track_id": det.track_id
                })
            
            frame_id += 1
            
            if progress_callback and frame_id % 30 == 0:
                progress = frame_id / total_frames * 100
                progress_callback(progress, frame_id, total_frames)
        
        cap.release()
        out.release()
        
        elapsed = time.time() - start_time
        
        return {
            "input": input_path,
            "output": output_path,
            "frames_processed": frame_id,
            "total_detections": len(all_detections),
            "processing_time_s": elapsed,
            "fps": frame_id / elapsed if elapsed > 0 else 0.0,
            "detections": all_detections
        }

class RealtimeVideoProcessor:
    """Process video streams in real-time."""
    
    def __init__(self, processor: VideoProcessor):
        self.processor = processor
        self.frame_queue: queue.Queue = queue.Queue(maxsize=10)
        self.result_queue: queue.Queue = queue.Queue(maxsize=10)
        self.running = False
        self.capture_thread: Optional[threading.Thread] = None
        self.process_thread: Optional[threading.Thread] = None
    
    def start(self, source: str):
        """Start real-time processing."""
        self.running = True
        
        self.capture_thread = threading.Thread(target=self._capture_loop, args=(source,))
        self.process_thread = threading.Thread(target=self._process_loop)
        
        self.capture_thread.start()
        self.process_thread.start()
    
    def stop(self):
        """Stop real-time processing."""
        self.running = False
        
        if self.capture_thread:
            self.capture_thread.join(timeout=2)
        if self.process_thread:
            self.process_thread.join(timeout=2)
    
    def get_frame(self, timeout: float = 1.0) -> Optional[ProcessedFrame]:
        """Get the latest processed frame."""
        try:
            return self.result_queue.get(timeout=timeout)
        except queue.Empty:
            return None
    
    def _capture_loop(self, source: str):
        """Capture frames from source."""
        cap = cv2.VideoCapture(source)
        
        if not cap.isOpened():
            print(f"Cannot open source: {source}")
            return
        
        frame_id = 0
        
        while self.running:
            ret, frame = cap.read()
            if not ret:
                # Stream dropped (or file ended): back off briefly, then reconnect
                cap.release()
                time.sleep(0.5)
                cap = cv2.VideoCapture(source)
                continue
            
            # Drop old frames if queue is full
            if self.frame_queue.full():
                try:
                    self.frame_queue.get_nowait()
                except queue.Empty:
                    pass
            
            self.frame_queue.put((frame, frame_id))
            frame_id += 1
        
        cap.release()
    
    def _process_loop(self):
        """Process frames from queue."""
        while self.running:
            try:
                frame, frame_id = self.frame_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            
            result = self.processor.process_frame(frame, frame_id)
            
            # Drop old results if queue is full
            if self.result_queue.full():
                try:
                    self.result_queue.get_nowait()
                except queue.Empty:
                    pass
            
            self.result_queue.put(result)
```
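Both the capture and process loops above rely on a drop-oldest queue policy: when a consumer falls behind, the stalest frame is discarded so latency stays bounded instead of growing without limit. The pattern in isolation (standard library only, no GPU required):

```python
import queue

def put_latest(q: queue.Queue, item) -> None:
    """Enqueue item; if the queue is full, discard the oldest entry first."""
    if q.full():
        try:
            q.get_nowait()  # drop the stalest frame, as _capture_loop does
        except queue.Empty:
            pass  # another thread drained it between full() and get_nowait()
    q.put(item)

q = queue.Queue(maxsize=3)
for frame_id in range(6):
    put_latest(q, frame_id)
print(list(q.queue))  # the three most recent items: [3, 4, 5]
```

A `maxsize=1` queue gives pure latest-frame semantics at the cost of more dropped frames; the `maxsize=10` used above trades a little extra latency for smoother output.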

### Step 3: FastAPI Video Service

```python
# video_service.py
"""
Video processing service with REST API and WebSocket streaming.
"""

import os
import json
import asyncio
import cv2
import base64
import time
from pathlib import Path
from typing import Optional, List
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, UploadFile, File
from fastapi.responses import StreamingResponse, FileResponse
from pydantic import BaseModel
import aiofiles

from video_processor import (
    VideoProcessor, BatchVideoProcessor, RealtimeVideoProcessor,
    ProcessingConfig, ProcessedFrame
)

# Configuration
MODEL_NAME = os.environ.get("MODEL_NAME", "yolov8n.pt")
UPLOAD_DIR = Path("/tmp/uploads")
OUTPUT_DIR = Path("/tmp/outputs")
UPLOAD_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)

# Global state
processor: Optional[VideoProcessor] = None
realtime_processor: Optional[RealtimeVideoProcessor] = None

class ProcessRequest(BaseModel):
    """Video processing request."""
    source: str  # File path, RTSP URL, or HTTP URL
    output_format: str = "mp4"
    confidence: float = 0.5
    enable_tracking: bool = True
    classes: Optional[List[int]] = None  # COCO class IDs to detect

class StreamConfig(BaseModel):
    """Real-time stream configuration."""
    source: str
    confidence: float = 0.5
    enable_tracking: bool = True
    target_fps: int = 30

class DetectionResponse(BaseModel):
    """Detection result."""
    frame_id: int
    timestamp: float
    detections: List[dict]
    processing_time_ms: float

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan."""
    global processor
    
    config = ProcessingConfig(
        model_name=MODEL_NAME,
        device="cuda",
        enable_tracking=True
    )
    
    processor = VideoProcessor(config)
    
    yield
    
    # Cleanup
    if realtime_processor:
        realtime_processor.stop()

app = FastAPI(
    title="Video Processing API",
    description="GPU-accelerated video analysis with YOLO on Clore.ai",
    version="1.0.0",
    lifespan=lifespan
)

@app.post("/process/video")
async def process_video(file: UploadFile = File(...)):
    """Process uploaded video file."""
    
    # Save uploaded file
    input_path = UPLOAD_DIR / file.filename
    output_path = OUTPUT_DIR / f"processed_{file.filename}"
    
    async with aiofiles.open(input_path, 'wb') as f:
        content = await file.read()
        await f.write(content)
    
    # Process synchronously; this request blocks until the whole file is done
    batch = BatchVideoProcessor(processor)
    
    def progress(pct, current, total):
        print(f"Processing: {pct:.1f}% ({current}/{total})")
    
    result = batch.process_video(str(input_path), str(output_path), progress)
    
    return {
        "status": "completed",
        "output_file": str(output_path),
        "download_url": f"/download/{output_path.name}",
        "stats": {
            "frames": result["frames_processed"],
            "detections": result["total_detections"],
            "processing_time": result["processing_time_s"],
            "fps": result["fps"]
        }
    }

@app.post("/process/url")
async def process_url(request: ProcessRequest):
    """Process video from URL."""
    
    # Update config
    config = ProcessingConfig(
        model_name=MODEL_NAME,
        confidence_threshold=request.confidence,
        enable_tracking=request.enable_tracking,
        classes=request.classes,
        device="cuda"
    )
    
    proc = VideoProcessor(config)
    batch = BatchVideoProcessor(proc)
    
    output_name = f"processed_{int(time.time())}.{request.output_format}"
    output_path = OUTPUT_DIR / output_name
    
    result = batch.process_video(request.source, str(output_path))
    
    return {
        "status": "completed",
        "download_url": f"/download/{output_name}",
        "stats": {
            "frames": result["frames_processed"],
            "detections": result["total_detections"],
            "fps": result["fps"]
        }
    }

@app.get("/download/{filename}")
async def download_file(filename: str):
    """Download processed video."""
    file_path = OUTPUT_DIR / filename
    
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File not found")
    
    return FileResponse(
        file_path,
        media_type="video/mp4",
        filename=filename
    )

@app.post("/stream/start")
async def start_stream(config: StreamConfig):
    """Start real-time video stream processing."""
    global realtime_processor
    
    if realtime_processor and realtime_processor.running:
        realtime_processor.stop()
    
    proc_config = ProcessingConfig(
        model_name=MODEL_NAME,
        confidence_threshold=config.confidence,
        enable_tracking=config.enable_tracking,
        target_fps=config.target_fps,
        device="cuda"
    )
    
    proc = VideoProcessor(proc_config)
    realtime_processor = RealtimeVideoProcessor(proc)
    realtime_processor.start(config.source)
    
    return {"status": "streaming", "source": config.source}

@app.post("/stream/stop")
async def stop_stream():
    """Stop real-time stream processing."""
    global realtime_processor
    
    if realtime_processor:
        realtime_processor.stop()
        realtime_processor = None
    
    return {"status": "stopped"}

@app.websocket("/ws/stream")
async def websocket_stream(websocket: WebSocket):
    """WebSocket endpoint for streaming processed frames."""
    await websocket.accept()
    
    try:
        while True:
            if realtime_processor and realtime_processor.running:
                frame_result = realtime_processor.get_frame(timeout=0.5)
                
                if frame_result:
                    # Encode frame as JPEG
                    _, buffer = cv2.imencode('.jpg', frame_result.frame, 
                                            [cv2.IMWRITE_JPEG_QUALITY, 80])
                    frame_b64 = base64.b64encode(buffer).decode('utf-8')
                    
                    # Send frame with detections
                    await websocket.send_json({
                        "frame": frame_b64,
                        "frame_id": frame_result.frame_id,
                        "timestamp": frame_result.timestamp,
                        "detections": [
                            {
                                "class": d.class_name,
                                "confidence": d.confidence,
                                "bbox": d.bbox,
                                "track_id": d.track_id
                            }
                            for d in frame_result.detections
                        ],
                        "processing_time_ms": frame_result.processing_time_ms
                    })
                else:
                    await asyncio.sleep(0.01)
            else:
                await asyncio.sleep(0.1)
    
    except WebSocketDisconnect:
        print("WebSocket disconnected")

@app.get("/ws/mjpeg")
async def mjpeg_stream():
    """MJPEG stream endpoint for browser viewing."""
    
    async def generate():
        while True:
            if realtime_processor and realtime_processor.running:
                frame_result = realtime_processor.get_frame(timeout=0.5)
                
                if frame_result:
                    _, buffer = cv2.imencode('.jpg', frame_result.frame)
                    frame_bytes = buffer.tobytes()
                    
                    yield (
                        b'--frame\r\n'
                        b'Content-Type: image/jpeg\r\n\r\n' +
                        frame_bytes +
                        b'\r\n'
                    )
                else:
                    await asyncio.sleep(0.01)
            else:
                await asyncio.sleep(0.1)
    
    return StreamingResponse(
        generate(),
        media_type='multipart/x-mixed-replace; boundary=frame'
    )

@app.post("/detect/image")
async def detect_image(file: UploadFile = File(...)):
    """Detect objects in a single image."""
    
    content = await file.read()
    nparr = np.frombuffer(content, np.uint8)
    image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    
    result = processor.process_frame(image)
    
    # Encode result
    _, buffer = cv2.imencode('.jpg', result.frame)
    result_b64 = base64.b64encode(buffer).decode('utf-8')
    
    return {
        "image": result_b64,
        "detections": [
            {
                "class": d.class_name,
                "confidence": d.confidence,
                "bbox": d.bbox,
                "track_id": d.track_id
            }
            for d in result.detections
        ],
        "processing_time_ms": result.processing_time_ms
    }

@app.get("/health")
async def health():
    """Health check."""
    import torch  # ships with ultralytics; pip OpenCV wheels have no CUDA, so check torch instead
    
    stats = processor.get_stats() if processor else {}
    
    return {
        "status": "healthy",
        "model": MODEL_NAME,
        "device": "cuda" if torch.cuda.is_available() else "cpu",
        "streaming": realtime_processor.running if realtime_processor else False,
        "stats": stats
    }

@app.get("/models")
async def list_models():
    """List available YOLO models."""
    return {
        "available": [
            {"name": "yolov8n.pt", "description": "Nano - fastest, least accurate"},
            {"name": "yolov8s.pt", "description": "Small - balanced speed/accuracy"},
            {"name": "yolov8m.pt", "description": "Medium - good accuracy"},
            {"name": "yolov8l.pt", "description": "Large - high accuracy"},
            {"name": "yolov8x.pt", "description": "XLarge - highest accuracy, slowest"},
        ],
        "current": MODEL_NAME
    }

# numpy is needed by detect_image (module-level import, runs before the server starts)
import numpy as np

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
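Each message the `/ws/stream` endpoint sends is plain JSON with a base64-encoded JPEG, so a client can unpack it with the standard library alone. A round-trip sketch of that payload shape (the JPEG bytes here are a placeholder, not a valid image):

```python
import base64
import json

# Build a message shaped like the /ws/stream payload (placeholder JPEG bytes)
message = json.dumps({
    "frame": base64.b64encode(b"\xff\xd8 placeholder jpeg \xff\xd9").decode("utf-8"),
    "frame_id": 42,
    "timestamp": 1700000000.0,
    "detections": [
        {"class": "person", "confidence": 0.91, "bbox": [10, 20, 110, 220], "track_id": 3}
    ],
    "processing_time_ms": 14.2,
})

# Client side: decode the JSON envelope, then the JPEG
data = json.loads(message)
jpeg_bytes = base64.b64decode(data["frame"])  # pass to cv2.imdecode for display
labels = [d["class"] for d in data["detections"]]
print(data["frame_id"], labels)  # 42 ['person']
```

Base64 inflates the JPEG by about a third; for bandwidth-sensitive deployments, sending the JPEG as a binary WebSocket frame and the metadata as a separate JSON frame avoids that overhead.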

### Step 4: Deployment Script

```python
#!/usr/bin/env python3
"""
Deploy Video Processing Pipeline on Clore.ai

Usage:
    export CLORE_API_KEY=your_api_key
    python deploy_video_pipeline.py
"""

import os
import sys
import time
import requests
from clore_client import CloreClient

# Configuration
DOCKER_IMAGE = "nvidia/cuda:12.8.0-runtime-ubuntu22.04"
SSH_PASSWORD = "VideoClore123!"  # change this before deploying

# Setup script to run on the server after SSH-ing in (mirrors the printed instructions)
SETUP_SCRIPT = """#!/bin/bash
set -e

echo "Installing dependencies..."
apt-get update
apt-get install -y python3 python3-pip ffmpeg libsm6 libxext6

echo "Installing Python packages..."
pip3 install opencv-python-headless ultralytics fastapi uvicorn websockets aiofiles

echo "Downloading YOLO models..."
python3 -c "from ultralytics import YOLO; YOLO('yolov8n.pt'); YOLO('yolov8s.pt')"

echo "Setup complete!"
"""

def deploy():
    api_key = os.environ.get("CLORE_API_KEY")
    if not api_key:
        print("❌ Set CLORE_API_KEY environment variable")
        sys.exit(1)
    
    client = CloreClient(api_key)
    
    # Check balance
    print("💰 Checking balance...")
    balance = client.get_balance()
    print(f"   CLORE: {balance:.2f}")
    
    if balance < 20:
        print("❌ Need at least 20 CLORE")
        sys.exit(1)
    
    # Find GPU
    print("\n🔍 Finding GPU for video processing...")
    gpu = client.find_video_gpu(max_price=0.60)
    
    if not gpu:
        print("❌ No suitable GPU found")
        sys.exit(1)
    
    print(f"   Server: {gpu['id']}")
    print(f"   GPU: {', '.join(gpu['gpus'])}")
    print(f"   Price: ${gpu['price_usd']:.2f}/hr")
    print(f"   Reliability: {gpu['reliability']}%")
    
    # Confirm
    confirm = input("\n🚀 Deploy? (y/n): ").strip().lower()
    if confirm != 'y':
        print("Cancelled")
        return
    
    # Rent
    print("\n📦 Creating rental order...")
    rental = client.rent_for_video(
        server_id=gpu["id"],
        ssh_password=SSH_PASSWORD,
        docker_image=DOCKER_IMAGE
    )
    
    print(f"   Order ID: {rental.order_id}")
    print(f"   SSH: ssh root@{rental.ssh_host} -p {rental.ssh_port}")
    
    # Print setup instructions
    print(f"""
{'='*60}
🎉 VIDEO PROCESSING SERVER DEPLOYED!
{'='*60}

📋 Connection Details:
   SSH: ssh root@{rental.ssh_host} -p {rental.ssh_port}
   Password: {SSH_PASSWORD}
   
💾 Setup Instructions:

1. SSH into the server:
   ssh root@{rental.ssh_host} -p {rental.ssh_port}

2. Run setup script:
   apt-get update && apt-get install -y python3 python3-pip ffmpeg libsm6 libxext6
   pip3 install opencv-python-headless ultralytics fastapi uvicorn websockets aiofiles
   python3 -c "from ultralytics import YOLO; YOLO('yolov8n.pt')"

3. Upload video_processor.py and video_service.py

4. Start the service:
   python3 video_service.py

🌐 API Endpoints (after setup):
   POST /process/video - Upload and process video
   POST /stream/start  - Start real-time stream
   GET  /ws/mjpeg      - View stream in browser
   WS   /ws/stream     - WebSocket for detections
   
💰 Cost:
   ${rental.cost_per_hour:.2f}/hour
   ~${rental.cost_per_hour * 24:.2f}/day

Press Enter to cancel the order...
""")
    
    input()
    
    print("🧹 Cleaning up...")
    client.cancel_order(rental.order_id)
    print("✅ Done!")

if __name__ == "__main__":
    deploy()
```
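Before renting, it helps to estimate what a batch job will cost. A back-of-envelope sketch; the throughput and price figures are illustrative assumptions, not benchmarks:

```python
def estimate_batch_cost(video_seconds: float, source_fps: float,
                        processing_fps: float, usd_per_hour: float) -> float:
    """Rough GPU rental cost to process one video in batch mode."""
    total_frames = video_seconds * source_fps
    hours = total_frames / processing_fps / 3600.0
    return hours * usd_per_hour

# One hour of 30 fps footage, assuming ~90 fps YOLOv8n throughput on a $0.50/hr GPU:
print(f"${estimate_batch_cost(3600, 30, 90, 0.50):.2f}")  # $0.17
```

Billing is per hour of rental, so in practice round the estimate up to the next whole hour and add a margin for setup and model download time.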

### Step 5: Client Examples

```python
# client_examples.py
"""
Example clients for the video processing service.
"""

import cv2
import json
import asyncio
import websockets
import requests
import base64
import numpy as np

def process_local_video(server_url: str, video_path: str):
    """Upload and process a local video file."""
    
    print(f"Uploading {video_path}...")
    
    with open(video_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(f"{server_url}/process/video", files=files)
    
    if response.status_code == 200:
        result = response.json()
        print(f"Processing complete!")
        print(f"  Frames: {result['stats']['frames']}")
        print(f"  Detections: {result['stats']['detections']}")
        print(f"  FPS: {result['stats']['fps']:.1f}")
        print(f"  Download: {server_url}{result['download_url']}")
        
        return result
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

def process_rtsp_stream(server_url: str, rtsp_url: str):
    """Start processing an RTSP stream."""
    
    print(f"Starting stream processing: {rtsp_url}")
    
    config = {
        "source": rtsp_url,
        "confidence": 0.5,
        "enable_tracking": True,
        "target_fps": 30
    }
    
    response = requests.post(f"{server_url}/stream/start", json=config)
    
    if response.status_code == 200:
        print("Stream started!")
        print(f"View in browser: {server_url}/ws/mjpeg")
        return True
    else:
        print(f"Error: {response.status_code}")
        return False

async def websocket_client(server_url: str, callback):
    """Connect to WebSocket and receive detections."""
    
    ws_url = server_url.replace("http", "ws") + "/ws/stream"
    
    print(f"Connecting to {ws_url}...")
    
    async with websockets.connect(ws_url) as websocket:
        print("Connected!")
        
        while True:
            try:
                message = await websocket.recv()
                data = json.loads(message)
                
                # Decode frame
                frame_bytes = base64.b64decode(data['frame'])
                nparr = np.frombuffer(frame_bytes, np.uint8)
                frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                
                # Call user callback
                callback(frame, data['detections'], data['processing_time_ms'])
                
            except websockets.ConnectionClosed:
                print("Connection closed")
                break

def view_mjpeg_stream(server_url: str):
    """View MJPEG stream with OpenCV."""
    
    stream_url = f"{server_url}/ws/mjpeg"
    
    cap = cv2.VideoCapture(stream_url)
    
    if not cap.isOpened():
        print("Cannot open stream")
        return
    
    print("Viewing stream... Press 'q' to quit")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            continue
        
        cv2.imshow("Video Stream", frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

# Detection callback for WebSocket
def on_detection(frame, detections, latency_ms):
    """Handle incoming frame and detections."""
    
    print(f"Frame received - {len(detections)} detections, {latency_ms:.1f}ms")
    
    for det in detections:
        print(f"  - {det['class']}: {det['confidence']:.2f}")
    
    # Display frame
    cv2.imshow("Detections", frame)
    cv2.waitKey(1)

# Example usage
if __name__ == "__main__":
    import sys
    
    SERVER = "http://localhost:8000"  # Replace with your server URL
    
    if len(sys.argv) < 2:
        print("Usage:")
        print("  python client_examples.py upload video.mp4")
        print("  python client_examples.py stream rtsp://camera/stream")
        print("  python client_examples.py view")
        print("  python client_examples.py websocket")
        sys.exit(1)
    
    command = sys.argv[1]
    
    if command == "upload" and len(sys.argv) > 2:
        process_local_video(SERVER, sys.argv[2])
    
    elif command == "stream" and len(sys.argv) > 2:
        process_rtsp_stream(SERVER, sys.argv[2])
    
    elif command == "view":
        view_mjpeg_stream(SERVER)
    
    elif command == "websocket":
        asyncio.run(websocket_client(SERVER, on_detection))
    
    else:
        print(f"Unknown command: {command}")
```
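
The `websocket_client` above exits as soon as the connection drops. For long-running monitoring you usually want automatic reconnection; here is a minimal sketch of a retry wrapper with exponential backoff (the `with_reconnect` helper and its parameters are illustrative, not part of the service API):

```python
import asyncio

async def with_reconnect(connect, max_retries=5, base_delay=1.0):
    """Run an async connection coroutine, retrying with exponential backoff.

    `connect` is a zero-argument coroutine factory; each failed attempt
    doubles the delay, up to `max_retries` attempts before giving up.
    """
    for attempt in range(max_retries):
        try:
            await connect()
            return  # clean exit: the server closed the stream intentionally
        except (ConnectionError, OSError) as exc:
            delay = base_delay * (2 ** attempt)
            print(f"Connection lost ({exc}); retrying in {delay:.0f}s...")
            await asyncio.sleep(delay)
    print("Giving up after repeated failures")

# Example: wrap the WebSocket client defined above
# asyncio.run(with_reconnect(lambda: websocket_client(SERVER, on_detection)))
```

Note that `websockets.ConnectionClosed` is already handled inside `websocket_client`; this wrapper only covers failures to establish the connection in the first place.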

### Full Deployment Script

```python
#!/usr/bin/env python3
"""
Complete Video Processing Pipeline Deployment

This script:
1. Rents a Clore.ai GPU
2. Sets up the environment
3. Deploys the video service
4. Provides usage examples

Usage:
    export CLORE_API_KEY=your_api_key
    python full_deploy.py
"""

import os
import sys
import time
import subprocess
import requests
from pathlib import Path

# Import our modules
from clore_client import CloreClient

def main():
    api_key = os.environ.get("CLORE_API_KEY")
    if not api_key:
        print("❌ Set CLORE_API_KEY")
        sys.exit(1)
    
    client = CloreClient(api_key)
    
    # Check balance
    print("💰 Balance check...")
    balance = client.get_balance()
    print(f"   CLORE: {balance:.2f}")
    
    if balance < 30:
        print("❌ Need 30+ CLORE (see Prerequisites)")
        sys.exit(1)
    
    # Find GPU
    print("\n🔍 Finding GPU...")
    gpu = client.find_video_gpu(max_price=0.50)
    
    if not gpu:
        # Try higher price
        gpu = client.find_video_gpu(max_price=0.80)
    
    if not gpu:
        print("❌ No GPU available")
        sys.exit(1)
    
    print(f"   Found: {gpu['gpus'][0]} at ${gpu['price_usd']:.2f}/hr")
    
    # Deploy
    print("\n🚀 Deploying...")
    ssh_pass = "VideoProc123!"  # change this before deploying
    
    rental = client.rent_for_video(
        server_id=gpu["id"],
        ssh_password=ssh_pass
    )
    
    print(f"   Order: {rental.order_id}")
    print(f"   SSH: root@{rental.ssh_host}:{rental.ssh_port}")
    
    # Wait for service
    print("\n⏳ Waiting for server...")
    time.sleep(30)  # crude fixed wait; the instance may take longer to boot
    
    # Print summary
    print(f"""
{'='*60}
🎬 VIDEO PROCESSING PIPELINE READY
{'='*60}

📡 Server Info:
   SSH: ssh root@{rental.ssh_host} -p {rental.ssh_port}
   Password: {ssh_pass}

📋 Quick Setup (run on server):

# Install dependencies
apt-get update && apt-get install -y python3 python3-pip ffmpeg libsm6 libxext6
pip3 install opencv-python-headless ultralytics fastapi uvicorn websockets aiofiles

# Download YOLO model
python3 -c "from ultralytics import YOLO; YOLO('yolov8n.pt')"

# Start service (after uploading video_service.py)
python3 video_service.py

🌐 API Endpoints:
   POST /process/video  - Process uploaded video
   POST /process/url    - Process video from URL
   POST /stream/start   - Start RTSP stream processing
   GET  /ws/mjpeg       - View live stream
   WS   /ws/stream      - WebSocket detections

💰 Cost: ${rental.cost_per_hour:.2f}/hr (~${rental.cost_per_hour * 24:.2f}/day)

📝 Example - Process RTSP stream:
   curl -X POST http://{rental.ssh_host}:8000/stream/start \\
     -H "Content-Type: application/json" \\
     -d '{{"source": "rtsp://camera/stream"}}'

📝 Example - View in browser:
   http://{rental.ssh_host}:8000/ws/mjpeg

Press Enter to cancel order and cleanup...
""")
    
    input()
    
    print("\n🧹 Cleaning up...")
    client.cancel_order(rental.order_id)
    print("✅ Done!")

if __name__ == "__main__":
    main()
```
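
The fixed `time.sleep(30)` in `main()` is only a guess at boot time. A more robust approach is to poll the API port until it accepts connections before printing the summary. A sketch (the helper name, timeouts, and poll interval are all arbitrary choices, not part of the Clore API):

```python
import socket
import time

def wait_for_port(host: str, port: int, timeout: float = 300.0) -> bool:
    """Poll a TCP port until it accepts connections or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            # A successful TCP connect means something is listening
            with socket.create_connection((host, port), timeout=5):
                return True
        except OSError:
            time.sleep(5)  # not up yet; try again shortly
    return False

# Example: wait for the video service instead of a fixed sleep
# if wait_for_port(rental.ssh_host, 8000):
#     print("Service is up")
```

Keep in mind the service itself still has to be installed and started over SSH (the "Quick Setup" commands above), so this check is only useful once `video_service.py` is running.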

### Cost Comparison

| Task              | GPU         | Clore.ai   | AWS g4dn | Google Cloud | Savings |
| ----------------- | ----------- | ---------- | -------- | ------------ | ------- |
| 1080p @ 30fps     | RTX 3090    | \~$0.25/hr | $0.71/hr | $0.73/hr     | 65%     |
| 4K @ 30fps        | RTX 4090    | \~$0.40/hr | N/A      | N/A          | N/A     |
| Multi-stream (4x) | 2x RTX 4090 | \~$0.80/hr | $2.84/hr | $2.92/hr     | 72%     |
| Batch processing  | A100        | \~$1.50/hr | $4.10/hr | $3.67/hr     | 60%     |

**Example: 24/7 security monitoring**

* Clore.ai (RTX 3090): \~$180/month
* AWS (g4dn.xlarge): \~$512/month
* **Savings: $332/month (65%)**
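
The monthly figures above follow directly from the hourly rates, assuming a 30-day (720-hour) month; the summary rounds to the nearest dollar:

```python
def monthly_cost(hourly_usd: float, hours: int = 720) -> float:
    """Project a 24/7 monthly cost from an hourly rate (30-day month)."""
    return hourly_usd * hours

clore = monthly_cost(0.25)   # RTX 3090 on Clore.ai
aws = monthly_cost(0.71)     # AWS g4dn.xlarge on-demand
print(f"Clore: ${clore:.0f}, AWS: ${aws:.0f}, savings: ${aws - clore:.0f}")
# → Clore: $180, AWS: $511, savings: $331
```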

### Performance Benchmarks

| Model   | GPU      | Resolution | FPS | Latency |
| ------- | -------- | ---------- | --- | ------- |
| YOLOv8n | RTX 3090 | 1080p      | 120 | 8ms     |
| YOLOv8s | RTX 3090 | 1080p      | 90  | 11ms    |
| YOLOv8m | RTX 4090 | 1080p      | 85  | 12ms    |
| YOLOv8l | RTX 4090 | 1080p      | 55  | 18ms    |
| YOLOv8x | A100     | 1080p      | 70  | 14ms    |
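
Numbers like these can be reproduced with a simple timing loop. A sketch using a generic callable (`measure_fps` is an illustrative helper, not part of ultralytics; substitute a real `model(frame)` call for the lambda):

```python
import time

def measure_fps(infer, n_frames: int = 100, warmup: int = 10) -> tuple:
    """Time `infer()` over n_frames after a warmup, returning (fps, latency_ms)."""
    for _ in range(warmup):  # warmup: first calls include model load / JIT setup
        infer()
    start = time.perf_counter()
    for _ in range(n_frames):
        infer()
    elapsed = time.perf_counter() - start
    fps = n_frames / elapsed
    return fps, 1000.0 * elapsed / n_frames

# Example with a YOLO model (assumes `model` and `frame` are already loaded):
# fps, latency = measure_fps(lambda: model(frame, verbose=False))
# print(f"{fps:.0f} FPS, {latency:.1f} ms/frame")
```

Actual throughput depends on batch size, input resolution, and whether decoding happens on the GPU, so treat the table as a rough guide rather than a guarantee.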

### Next Steps

* [Multi-Model Inference Router](https://docs.clore.ai/dev/inference-and-deployment/model-router) — Route between detection models
* [Deploy Model as REST API](https://docs.clore.ai/dev/inference-and-deployment/rest-api-deployment) — General model deployment
* [Spot Bidding Strategies](https://github.com/defiocean/dev/blob/main/cost-optimization/spot-bidding.md) — Reduce costs further
