import requests import time from typing import Dict, Any, List, Optional from dataclasses import dataclass
@dataclass class GPURental: """GPU rental information.""" order_id: int server_id: int ssh_host: str ssh_port: int http_endpoint: str cost_per_hour: float gpus: List[str]
## Step 1: Set Up the Clore Client
> π¦ **Using the standard Clore API client.** See [Clore API Client Reference](../reference/clore-client.md) for the full implementation and setup instructions. Save it as `clore_client.py` in your project.
```python
from clore_client import CloreClient
client = CloreClient(api_key="your-api-key")
# video_processor.py
"""
GPU-accelerated video processing with YOLO object detection.
Supports batch and real-time modes.
"""
import cv2
import time
import queue
import threading
import numpy as np
from pathlib import Path
from typing import Optional, Callable, Dict, List, Any
from dataclasses import dataclass, field
from ultralytics import YOLO
@dataclass
class Detection:
"""Object detection result."""
class_id: int
class_name: str
confidence: float
bbox: tuple # (x1, y1, x2, y2)
track_id: Optional[int] = None
@dataclass
class ProcessedFrame:
"""Processed video frame with detections."""
frame: np.ndarray
frame_id: int
timestamp: float
detections: List[Detection] = field(default_factory=list)
processing_time_ms: float = 0.0
@dataclass
class ProcessingConfig:
"""Video processing configuration."""
model_name: str = "yolov8n.pt" # YOLOv8 nano for speed
confidence_threshold: float = 0.5
iou_threshold: float = 0.5
device: str = "cuda" # cuda or cpu
target_fps: Optional[int] = None
resize_width: Optional[int] = None
resize_height: Optional[int] = None
classes: Optional[List[int]] = None # Filter specific classes
enable_tracking: bool = True
draw_detections: bool = True
draw_tracks: bool = True
track_history_length: int = 30
class VideoProcessor:
"""GPU-accelerated video processor with YOLO."""
def __init__(self, config: ProcessingConfig):
self.config = config
self.model = None
self.track_history: Dict[int, List[tuple]] = {}
self.stats = {
"frames_processed": 0,
"total_detections": 0,
"avg_fps": 0.0,
"avg_latency_ms": 0.0
}
self._load_model()
def _load_model(self):
"""Load YOLO model."""
print(f"Loading model: {self.config.model_name}")
self.model = YOLO(self.config.model_name)
# Warm up model with dummy inference
if self.config.device == "cuda":
dummy = np.zeros((640, 640, 3), dtype=np.uint8)
self.model(dummy, verbose=False)
print(f"Model loaded on {self.config.device}")
def process_frame(self, frame: np.ndarray, frame_id: int = 0) -> ProcessedFrame:
"""Process a single frame."""
start_time = time.time()
# Resize if needed
original_size = frame.shape[:2]
if self.config.resize_width and self.config.resize_height:
frame = cv2.resize(frame, (self.config.resize_width, self.config.resize_height))
# Run YOLO
if self.config.enable_tracking:
results = self.model.track(
frame,
conf=self.config.confidence_threshold,
iou=self.config.iou_threshold,
device=self.config.device,
classes=self.config.classes,
persist=True,
verbose=False
)
else:
results = self.model(
frame,
conf=self.config.confidence_threshold,
iou=self.config.iou_threshold,
device=self.config.device,
classes=self.config.classes,
verbose=False
)
# Parse detections
detections = []
result = results[0]
if result.boxes is not None:
boxes = result.boxes.xyxy.cpu().numpy()
confidences = result.boxes.conf.cpu().numpy()
class_ids = result.boxes.cls.cpu().numpy().astype(int)
track_ids = None
if hasattr(result.boxes, 'id') and result.boxes.id is not None:
track_ids = result.boxes.id.cpu().numpy().astype(int)
for i in range(len(boxes)):
detection = Detection(
class_id=class_ids[i],
class_name=self.model.names[class_ids[i]],
confidence=float(confidences[i]),
bbox=tuple(boxes[i].astype(int)),
track_id=track_ids[i] if track_ids is not None else None
)
detections.append(detection)
# Update track history
if detection.track_id is not None:
center = (
int((detection.bbox[0] + detection.bbox[2]) / 2),
int((detection.bbox[1] + detection.bbox[3]) / 2)
)
if detection.track_id not in self.track_history:
self.track_history[detection.track_id] = []
self.track_history[detection.track_id].append(center)
# Limit history length
if len(self.track_history[detection.track_id]) > self.config.track_history_length:
self.track_history[detection.track_id].pop(0)
# Draw annotations
annotated_frame = frame.copy()
if self.config.draw_detections:
annotated_frame = self._draw_detections(annotated_frame, detections)
if self.config.draw_tracks:
annotated_frame = self._draw_tracks(annotated_frame)
# Resize back if needed
if self.config.resize_width and original_size != annotated_frame.shape[:2]:
annotated_frame = cv2.resize(annotated_frame, (original_size[1], original_size[0]))
processing_time = (time.time() - start_time) * 1000
# Update stats
self.stats["frames_processed"] += 1
self.stats["total_detections"] += len(detections)
self.stats["avg_latency_ms"] = (
self.stats["avg_latency_ms"] * 0.9 + processing_time * 0.1
)
return ProcessedFrame(
frame=annotated_frame,
frame_id=frame_id,
timestamp=time.time(),
detections=detections,
processing_time_ms=processing_time
)
def _draw_detections(self, frame: np.ndarray, detections: List[Detection]) -> np.ndarray:
"""Draw detection boxes and labels."""
for det in detections:
x1, y1, x2, y2 = det.bbox
# Color based on class
color = self._get_class_color(det.class_id)
# Draw box
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
# Draw label
label = f"{det.class_name} {det.confidence:.2f}"
if det.track_id is not None:
label = f"ID:{det.track_id} {label}"
(w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
cv2.rectangle(frame, (x1, y1 - 20), (x1 + w, y1), color, -1)
cv2.putText(frame, label, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
return frame
def _draw_tracks(self, frame: np.ndarray) -> np.ndarray:
"""Draw tracking paths."""
for track_id, points in self.track_history.items():
if len(points) < 2:
continue
color = self._get_class_color(track_id % 80)
for i in range(1, len(points)):
thickness = int(2 * (i / len(points)) + 1)
cv2.line(frame, points[i-1], points[i], color, thickness)
return frame
def _get_class_color(self, class_id: int) -> tuple:
"""Get consistent color for a class."""
np.random.seed(class_id)
return tuple(int(c) for c in np.random.randint(0, 255, 3))
def get_stats(self) -> Dict:
"""Get processing statistics."""
return self.stats.copy()
class BatchVideoProcessor:
"""Process video files in batch mode."""
def __init__(self, processor: VideoProcessor):
self.processor = processor
def process_video(self, input_path: str, output_path: str,
progress_callback: Optional[Callable] = None) -> Dict:
"""Process a video file and save results."""
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
raise Exception(f"Cannot open video: {input_path}")
# Get video properties
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Create output writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
frame_id = 0
start_time = time.time()
all_detections = []
while True:
ret, frame = cap.read()
if not ret:
break
# Process frame
result = self.processor.process_frame(frame, frame_id)
# Write output
out.write(result.frame)
# Collect detections
for det in result.detections:
all_detections.append({
"frame_id": frame_id,
"class": det.class_name,
"confidence": det.confidence,
"bbox": det.bbox,
"track_id": det.track_id
})
frame_id += 1
if progress_callback and frame_id % 30 == 0:
progress = frame_id / total_frames * 100
progress_callback(progress, frame_id, total_frames)
cap.release()
out.release()
elapsed = time.time() - start_time
return {
"input": input_path,
"output": output_path,
"frames_processed": frame_id,
"total_detections": len(all_detections),
"processing_time_s": elapsed,
"fps": frame_id / elapsed,
"detections": all_detections
}
class RealtimeVideoProcessor:
"""Process video streams in real-time."""
def __init__(self, processor: VideoProcessor):
self.processor = processor
self.frame_queue: queue.Queue = queue.Queue(maxsize=10)
self.result_queue: queue.Queue = queue.Queue(maxsize=10)
self.running = False
self.capture_thread: Optional[threading.Thread] = None
self.process_thread: Optional[threading.Thread] = None
def start(self, source: str):
"""Start real-time processing."""
self.running = True
self.capture_thread = threading.Thread(target=self._capture_loop, args=(source,))
self.process_thread = threading.Thread(target=self._process_loop)
self.capture_thread.start()
self.process_thread.start()
def stop(self):
"""Stop real-time processing."""
self.running = False
if self.capture_thread:
self.capture_thread.join(timeout=2)
if self.process_thread:
self.process_thread.join(timeout=2)
def get_frame(self, timeout: float = 1.0) -> Optional[ProcessedFrame]:
"""Get the latest processed frame."""
try:
return self.result_queue.get(timeout=timeout)
except queue.Empty:
return None
def _capture_loop(self, source: str):
"""Capture frames from source."""
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Cannot open source: {source}")
return
frame_id = 0
while self.running:
ret, frame = cap.read()
if not ret:
# Reconnect for streams
cap.release()
cap = cv2.VideoCapture(source)
continue
# Drop old frames if queue is full
if self.frame_queue.full():
try:
self.frame_queue.get_nowait()
except queue.Empty:
pass
self.frame_queue.put((frame, frame_id))
frame_id += 1
cap.release()
def _process_loop(self):
"""Process frames from queue."""
while self.running:
try:
frame, frame_id = self.frame_queue.get(timeout=1.0)
except queue.Empty:
continue
result = self.processor.process_frame(frame, frame_id)
# Drop old results if queue is full
if self.result_queue.full():
try:
self.result_queue.get_nowait()
except queue.Empty:
pass
self.result_queue.put(result)
# video_service.py
"""
Video processing service with REST API and WebSocket streaming.
"""
import os
import json
import asyncio
import cv2
import base64
import time
from pathlib import Path
from typing import Optional, List
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, UploadFile, File
from fastapi.responses import StreamingResponse, FileResponse
from pydantic import BaseModel
import aiofiles
from video_processor import (
VideoProcessor, BatchVideoProcessor, RealtimeVideoProcessor,
ProcessingConfig, ProcessedFrame
)
# Configuration
MODEL_NAME = os.environ.get("MODEL_NAME", "yolov8n.pt")
UPLOAD_DIR = Path("/tmp/uploads")
OUTPUT_DIR = Path("/tmp/outputs")
UPLOAD_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
# Global state
processor: Optional[VideoProcessor] = None
realtime_processor: Optional[RealtimeVideoProcessor] = None
class ProcessRequest(BaseModel):
"""Video processing request."""
source: str # File path, RTSP URL, or HTTP URL
output_format: str = "mp4"
confidence: float = 0.5
enable_tracking: bool = True
classes: Optional[List[int]] = None # COCO class IDs to detect
class StreamConfig(BaseModel):
"""Real-time stream configuration."""
source: str
confidence: float = 0.5
enable_tracking: bool = True
target_fps: int = 30
class DetectionResponse(BaseModel):
"""Detection result."""
frame_id: int
timestamp: float
detections: List[dict]
processing_time_ms: float
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan."""
global processor
config = ProcessingConfig(
model_name=MODEL_NAME,
device="cuda",
enable_tracking=True
)
processor = VideoProcessor(config)
yield
# Cleanup
if realtime_processor:
realtime_processor.stop()
app = FastAPI(
title="Video Processing API",
description="GPU-accelerated video analysis with YOLO on Clore.ai",
version="1.0.0",
lifespan=lifespan
)
@app.post("/process/video")
async def process_video(file: UploadFile = File(...)):
"""Process uploaded video file."""
# Save uploaded file
input_path = UPLOAD_DIR / file.filename
output_path = OUTPUT_DIR / f"processed_{file.filename}"
async with aiofiles.open(input_path, 'wb') as f:
content = await file.read()
await f.write(content)
# Process in background
batch = BatchVideoProcessor(processor)
def progress(pct, current, total):
print(f"Processing: {pct:.1f}% ({current}/{total})")
result = batch.process_video(str(input_path), str(output_path), progress)
return {
"status": "completed",
"output_file": str(output_path),
"download_url": f"/download/{output_path.name}",
"stats": {
"frames": result["frames_processed"],
"detections": result["total_detections"],
"processing_time": result["processing_time_s"],
"fps": result["fps"]
}
}
@app.post("/process/url")
async def process_url(request: ProcessRequest):
"""Process video from URL."""
# Update config
config = ProcessingConfig(
model_name=MODEL_NAME,
confidence_threshold=request.confidence,
enable_tracking=request.enable_tracking,
classes=request.classes,
device="cuda"
)
proc = VideoProcessor(config)
batch = BatchVideoProcessor(proc)
output_name = f"processed_{int(time.time())}.{request.output_format}"
output_path = OUTPUT_DIR / output_name
result = batch.process_video(request.source, str(output_path))
return {
"status": "completed",
"download_url": f"/download/{output_name}",
"stats": {
"frames": result["frames_processed"],
"detections": result["total_detections"],
"fps": result["fps"]
}
}
@app.get("/download/{filename}")
async def download_file(filename: str):
"""Download processed video."""
file_path = OUTPUT_DIR / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(
file_path,
media_type="video/mp4",
filename=filename
)
@app.post("/stream/start")
async def start_stream(config: StreamConfig):
"""Start real-time video stream processing."""
global realtime_processor
if realtime_processor and realtime_processor.running:
realtime_processor.stop()
proc_config = ProcessingConfig(
model_name=MODEL_NAME,
confidence_threshold=config.confidence,
enable_tracking=config.enable_tracking,
target_fps=config.target_fps,
device="cuda"
)
proc = VideoProcessor(proc_config)
realtime_processor = RealtimeVideoProcessor(proc)
realtime_processor.start(config.source)
return {"status": "streaming", "source": config.source}
@app.post("/stream/stop")
async def stop_stream():
"""Stop real-time stream processing."""
global realtime_processor
if realtime_processor:
realtime_processor.stop()
realtime_processor = None
return {"status": "stopped"}
@app.websocket("/ws/stream")
async def websocket_stream(websocket: WebSocket):
"""WebSocket endpoint for streaming processed frames."""
await websocket.accept()
try:
while True:
if realtime_processor and realtime_processor.running:
frame_result = realtime_processor.get_frame(timeout=0.5)
if frame_result:
# Encode frame as JPEG
_, buffer = cv2.imencode('.jpg', frame_result.frame,
[cv2.IMWRITE_JPEG_QUALITY, 80])
frame_b64 = base64.b64encode(buffer).decode('utf-8')
# Send frame with detections
await websocket.send_json({
"frame": frame_b64,
"frame_id": frame_result.frame_id,
"timestamp": frame_result.timestamp,
"detections": [
{
"class": d.class_name,
"confidence": d.confidence,
"bbox": d.bbox,
"track_id": d.track_id
}
for d in frame_result.detections
],
"processing_time_ms": frame_result.processing_time_ms
})
else:
await asyncio.sleep(0.01)
else:
await asyncio.sleep(0.1)
except WebSocketDisconnect:
print("WebSocket disconnected")
@app.get("/ws/mjpeg")
async def mjpeg_stream():
"""MJPEG stream endpoint for browser viewing."""
async def generate():
while True:
if realtime_processor and realtime_processor.running:
frame_result = realtime_processor.get_frame(timeout=0.5)
if frame_result:
_, buffer = cv2.imencode('.jpg', frame_result.frame)
frame_bytes = buffer.tobytes()
yield (
b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' +
frame_bytes +
b'\r\n'
)
else:
await asyncio.sleep(0.01)
else:
await asyncio.sleep(0.1)
return StreamingResponse(
generate(),
media_type='multipart/x-mixed-replace; boundary=frame'
)
@app.post("/detect/image")
async def detect_image(file: UploadFile = File(...)):
"""Detect objects in a single image."""
content = await file.read()
nparr = np.frombuffer(content, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
result = processor.process_frame(image)
# Encode result
_, buffer = cv2.imencode('.jpg', result.frame)
result_b64 = base64.b64encode(buffer).decode('utf-8')
return {
"image": result_b64,
"detections": [
{
"class": d.class_name,
"confidence": d.confidence,
"bbox": d.bbox,
"track_id": d.track_id
}
for d in result.detections
],
"processing_time_ms": result.processing_time_ms
}
@app.get("/health")
async def health():
"""Health check."""
stats = processor.get_stats() if processor else {}
return {
"status": "healthy",
"model": MODEL_NAME,
"device": "cuda" if cv2.cuda.getCudaEnabledDeviceCount() > 0 else "cpu",
"streaming": realtime_processor.running if realtime_processor else False,
"stats": stats
}
@app.get("/models")
async def list_models():
"""List available YOLO models."""
return {
"available": [
{"name": "yolov8n.pt", "description": "Nano - fastest, least accurate"},
{"name": "yolov8s.pt", "description": "Small - balanced speed/accuracy"},
{"name": "yolov8m.pt", "description": "Medium - good accuracy"},
{"name": "yolov8l.pt", "description": "Large - high accuracy"},
{"name": "yolov8x.pt", "description": "XLarge - highest accuracy, slowest"},
],
"current": MODEL_NAME
}
# Need numpy import for detect_image
import numpy as np
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
#!/usr/bin/env python3
"""
Deploy Video Processing Pipeline on Clore.ai
Usage:
export CLORE_API_KEY=your_api_key
python deploy_video_pipeline.py
"""
import os
import sys
import time
import requests
from clore_client import CloreClient
# Configuration
DOCKER_IMAGE = "nvidia/cuda:12.8.0-runtime-ubuntu22.04"
SSH_PASSWORD = "VideoClore123!"
# Setup script to run on the server
SETUP_SCRIPT = """#!/bin/bash
set -e
echo "Installing dependencies..."
apt-get update
apt-get install -y python3 python3-pip ffmpeg libsm6 libxext6
echo "Installing Python packages..."
pip3 install opencv-python-headless ultralytics fastapi uvicorn websockets aiofiles
echo "Downloading YOLO models..."
python3 -c "from ultralytics import YOLO; YOLO('yolov8n.pt'); YOLO('yolov8s.pt')"
echo "Setup complete!"
"""
def deploy():
api_key = os.environ.get("CLORE_API_KEY")
if not api_key:
print("β Set CLORE_API_KEY environment variable")
sys.exit(1)
client = CloreClient(api_key)
# Check balance
print("π° Checking balance...")
balance = client.get_balance()
print(f" CLORE: {balance:.2f}")
if balance < 20:
print("β Need at least 20 CLORE")
sys.exit(1)
# Find GPU
print("\nπ Finding GPU for video processing...")
gpu = client.find_video_gpu(max_price=0.60)
if not gpu:
print("β No suitable GPU found")
sys.exit(1)
print(f" Server: {gpu['id']}")
print(f" GPU: {', '.join(gpu['gpus'])}")
print(f" Price: ${gpu['price_usd']:.2f}/hr")
print(f" Reliability: {gpu['reliability']}%")
# Confirm
confirm = input("\nπ Deploy? (y/n): ").strip().lower()
if confirm != 'y':
print("Cancelled")
return
# Rent
print("\nπ¦ Creating rental order...")
rental = client.rent_for_video(
server_id=gpu["id"],
ssh_password=SSH_PASSWORD,
docker_image=DOCKER_IMAGE
)
print(f" Order ID: {rental.order_id}")
print(f" SSH: ssh root@{rental.ssh_host} -p {rental.ssh_port}")
# Print setup instructions
print(f"""
{'='*60}
π VIDEO PROCESSING SERVER DEPLOYED!
{'='*60}
π Connection Details:
SSH: ssh root@{rental.ssh_host} -p {rental.ssh_port}
Password: {SSH_PASSWORD}
πΎ Setup Instructions:
1. SSH into the server:
ssh root@{rental.ssh_host} -p {rental.ssh_port}
2. Run setup script:
apt-get update && apt-get install -y python3 python3-pip ffmpeg libsm6 libxext6
pip3 install opencv-python-headless ultralytics fastapi uvicorn websockets aiofiles
python3 -c "from ultralytics import YOLO; YOLO('yolov8n.pt')"
3. Upload video_processor.py and video_service.py
4. Start the service:
python3 video_service.py
π API Endpoints (after setup):
POST /process/video - Upload and process video
POST /stream/start - Start real-time stream
GET /ws/mjpeg - View stream in browser
WS /ws/stream - WebSocket for detections
π° Cost:
${rental.cost_per_hour:.2f}/hour
~${rental.cost_per_hour * 24:.2f}/day
Press Enter to cancel the order...
""")
input()
print("π§Ή Cleaning up...")
client.cancel_order(rental.order_id)
print("β Done!")
if __name__ == "__main__":
deploy()
# client_examples.py
"""
Example clients for the video processing service.
"""
import cv2
import json
import asyncio
import websockets
import requests
import base64
import numpy as np
def process_local_video(server_url: str, video_path: str):
"""Upload and process a local video file."""
print(f"Uploading {video_path}...")
with open(video_path, 'rb') as f:
files = {'file': f}
response = requests.post(f"{server_url}/process/video", files=files)
if response.status_code == 200:
result = response.json()
print(f"Processing complete!")
print(f" Frames: {result['stats']['frames']}")
print(f" Detections: {result['stats']['detections']}")
print(f" FPS: {result['stats']['fps']:.1f}")
print(f" Download: {server_url}{result['download_url']}")
return result
else:
print(f"Error: {response.status_code} - {response.text}")
return None
def process_rtsp_stream(server_url: str, rtsp_url: str):
"""Start processing an RTSP stream."""
print(f"Starting stream processing: {rtsp_url}")
config = {
"source": rtsp_url,
"confidence": 0.5,
"enable_tracking": True,
"target_fps": 30
}
response = requests.post(f"{server_url}/stream/start", json=config)
if response.status_code == 200:
print("Stream started!")
print(f"View in browser: {server_url}/ws/mjpeg")
return True
else:
print(f"Error: {response.status_code}")
return False
async def websocket_client(server_url: str, callback):
"""Connect to WebSocket and receive detections."""
ws_url = server_url.replace("http", "ws") + "/ws/stream"
print(f"Connecting to {ws_url}...")
async with websockets.connect(ws_url) as websocket:
print("Connected!")
while True:
try:
message = await websocket.recv()
data = json.loads(message)
# Decode frame
frame_bytes = base64.b64decode(data['frame'])
nparr = np.frombuffer(frame_bytes, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
# Call user callback
callback(frame, data['detections'], data['processing_time_ms'])
except websockets.ConnectionClosed:
print("Connection closed")
break
def view_mjpeg_stream(server_url: str):
"""View MJPEG stream with OpenCV."""
stream_url = f"{server_url}/ws/mjpeg"
cap = cv2.VideoCapture(stream_url)
if not cap.isOpened():
print("Cannot open stream")
return
print("Viewing stream... Press 'q' to quit")
while True:
ret, frame = cap.read()
if not ret:
continue
cv2.imshow("Video Stream", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
# Detection callback for WebSocket
def on_detection(frame, detections, latency_ms):
"""Handle incoming frame and detections."""
print(f"Frame received - {len(detections)} detections, {latency_ms:.1f}ms")
for det in detections:
print(f" - {det['class']}: {det['confidence']:.2f}")
# Display frame
cv2.imshow("Detections", frame)
cv2.waitKey(1)
# Example usage
if __name__ == "__main__":
import sys
SERVER = "http://localhost:8000" # Replace with your server URL
if len(sys.argv) < 2:
print("Usage:")
print(" python client_examples.py upload video.mp4")
print(" python client_examples.py stream rtsp://camera/stream")
print(" python client_examples.py view")
print(" python client_examples.py websocket")
sys.exit(1)
command = sys.argv[1]
if command == "upload" and len(sys.argv) > 2:
process_local_video(SERVER, sys.argv[2])
elif command == "stream" and len(sys.argv) > 2:
process_rtsp_stream(SERVER, sys.argv[2])
elif command == "view":
view_mjpeg_stream(SERVER)
elif command == "websocket":
asyncio.run(websocket_client(SERVER, on_detection))
else:
print("Unknown command")
#!/usr/bin/env python3
"""
Complete Video Processing Pipeline Deployment
This script:
1. Rents a Clore.ai GPU
2. Sets up the environment
3. Deploys the video service
4. Provides usage examples
Usage:
export CLORE_API_KEY=your_api_key
python full_deploy.py
"""
import os
import sys
import time
import subprocess
import requests
from pathlib import Path
# Import our modules
from clore_client import CloreClient
def main():
api_key = os.environ.get("CLORE_API_KEY")
if not api_key:
print("β Set CLORE_API_KEY")
sys.exit(1)
client = CloreClient(api_key)
# Check balance
print("π° Balance check...")
balance = client.get_balance()
print(f" CLORE: {balance:.2f}")
if balance < 25:
print("β Need 25+ CLORE")
sys.exit(1)
# Find GPU
print("\nπ Finding GPU...")
gpu = client.find_video_gpu(max_price=0.50)
if not gpu:
# Try higher price
gpu = client.find_video_gpu(max_price=0.80)
if not gpu:
print("β No GPU available")
sys.exit(1)
print(f" Found: {gpu['gpus'][0]} at ${gpu['price_usd']:.2f}/hr")
# Deploy
print("\nπ Deploying...")
ssh_pass = "VideoProc123!"
rental = client.rent_for_video(
server_id=gpu["id"],
ssh_password=ssh_pass
)
print(f" Order: {rental.order_id}")
print(f" SSH: root@{rental.ssh_host}:{rental.ssh_port}")
# Wait for service
print("\nβ³ Waiting for server...")
time.sleep(30) # Initial wait
# Print summary
print(f"""
{'='*60}
π¬ VIDEO PROCESSING PIPELINE READY
{'='*60}
π‘ Server Info:
SSH: ssh root@{rental.ssh_host} -p {rental.ssh_port}
Password: {ssh_pass}
π Quick Setup (run on server):
# Install dependencies
apt-get update && apt-get install -y python3 python3-pip ffmpeg libsm6 libxext6
pip3 install opencv-python-headless ultralytics fastapi uvicorn websockets aiofiles
# Download YOLO model
python3 -c "from ultralytics import YOLO; YOLO('yolov8n.pt')"
# Start service (after uploading video_service.py)
python3 video_service.py
π API Endpoints:
POST /process/video - Process uploaded video
POST /process/url - Process video from URL
POST /stream/start - Start RTSP stream processing
GET /ws/mjpeg - View live stream
WS /ws/stream - WebSocket detections
π° Cost: ${rental.cost_per_hour:.2f}/hr (~${rental.cost_per_hour * 24:.2f}/day)
π Example - Process RTSP stream:
curl -X POST http://{rental.ssh_host}:8000/stream/start \\
-H "Content-Type: application/json" \\
-d '{{"source": "rtsp://camera/stream"}}'
π Example - View in browser:
http://{rental.ssh_host}:8000/ws/mjpeg
Press Enter to cancel order and cleanup...
""")
input()
print("\nπ§Ή Cleaning up...")
client.cancel_order(rental.order_id)
print("β Done!")
if __name__ == "__main__":
main()