#!/usr/bin/env python3
"""
Production-ready video transcoding service using Clore.ai GPUs.
Supports batch processing, multiple codecs, and cost optimization.
Usage:
python video_transcode.py --api-key YOUR_API_KEY --input video1.mp4 video2.mov --output ./transcoded/
"""
import argparse
import json
import os
import secrets
import shlex
import subprocess
import sys
import time
from dataclasses import asdict, dataclass
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
class VideoCodec(Enum):
    """Video encoders selectable for transcoding.

    Each member's value is the string passed to FFmpeg's ``-c:v`` option.
    """

    H264 = "h264_nvenc"
    H265 = "hevc_nvenc"
    AV1 = "av1_nvenc"
class VideoPreset(Enum):
    """Encoder speed/quality presets.

    Each member's value is the string passed to FFmpeg's ``-preset`` option.
    Members are listed from fastest to best quality.
    """

    FASTEST = "p1"
    FAST = "p4"
    BALANCED = "p5"
    QUALITY = "p6"
    BEST = "p7"
@dataclass
class TranscodeResult:
    """Outcome of transcoding one input file."""

    # Local path of the source video.
    input_file: str
    # Local path the transcoded video was (or would have been) written to.
    output_file: str
    # "success" or "failed".
    status: str
    # FFmpeg encoder name that was requested (a VideoCodec value).
    codec: str
    # Wall-clock seconds spent in the remote FFmpeg run; 0 for failed jobs.
    duration_seconds: float
    # Source and result sizes in MiB (bytes / 1024 / 1024); 0 for failed jobs.
    input_size_mb: float
    output_size_mb: float
    # input_size_mb / output_size_mb, or 0 when the output size is 0.
    compression_ratio: float
    # duration_seconds converted to hours times the server's hourly price.
    cost_usd: float
    # Error message for failed jobs; None on success.
    error: Optional[str] = None
class CloreVideoTranscoder:
    """Complete video transcoding solution using Clore.ai GPUs.

    Typical use::

        with CloreVideoTranscoder(api_key) as transcoder:
            transcoder.setup()
            transcoder.batch_transcode(files, "./out")

    ``setup()`` rents a server and records its SSH details, ``transcode()`` /
    ``batch_transcode()`` run FFmpeg on it, and ``cleanup()`` (called
    automatically when the ``with`` block exits) cancels the order.

    The ``sshpass``, ``ssh`` and ``scp`` executables must be available on the
    local machine.
    """

    BASE_URL = "https://api.clore.ai"
    FFMPEG_IMAGE = "jrottenberg/ffmpeg:4.4-nvidia"
    # Default per-request timeout in seconds; requests would otherwise wait
    # indefinitely on a stalled connection.
    API_TIMEOUT = 30

    def __init__(self, api_key: str):
        """Store the API key; no network activity happens until ``setup()``."""
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.server = None
        self.order_id = None
        self.ssh_host = None
        self.ssh_port = None
        self.ssh_password = None
        self.hourly_cost = 0.0

    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Call the Clore.ai API and return the decoded JSON body.

        Args:
            method: HTTP method, e.g. ``"GET"``.
            endpoint: Path appended to ``BASE_URL``.
            **kwargs: Forwarded to ``requests.request``; a ``timeout`` of
                ``API_TIMEOUT`` seconds is applied unless the caller sets one.

        Raises:
            RuntimeError: The API answered with a non-zero ``code``, or every
                attempt returned the retryable code.
        """
        url = f"{self.BASE_URL}{endpoint}"
        kwargs.setdefault("timeout", self.API_TIMEOUT)
        for attempt in range(3):
            response = requests.request(method, url, headers=self.headers, **kwargs)
            data = response.json()
            # Code 5 is retried with exponential backoff (1s, 2s, 4s).
            # NOTE(review): presumably a rate-limit response - confirm
            # against the API documentation.
            if data.get("code") == 5:
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise RuntimeError(f"API Error: {data}")
            return data
        raise RuntimeError("Max retries exceeded")

    def _remote_env(self) -> Dict[str, str]:
        """Return the child-process environment with the password in SSHPASS."""
        return {**os.environ, "SSHPASS": self.ssh_password or ""}

    def _remote_argv(self, tool: str, port_flag: str) -> List[str]:
        """Return the argv prefix shared by the ssh (``-p``) and scp (``-P``) calls."""
        # "sshpass -e" reads the password from $SSHPASS, which keeps it off
        # the command line (and therefore out of the process list).
        return [
            "sshpass", "-e", tool,
            "-o", "StrictHostKeyChecking=no",
            port_flag, str(self.ssh_port),
        ]

    def _ssh(self, cmd: str, check: bool = False) -> str:
        """Run ``cmd`` on the rented server and return its stdout.

        Args:
            cmd: Command line interpreted by the remote shell; the caller is
                responsible for quoting any paths inside it.
            check: When true, a non-zero exit status raises instead of being
                ignored.

        Raises:
            RuntimeError: ``check`` is true and the command failed; the
                message carries the last lines of its output.
        """
        argv = self._remote_argv("ssh", "-p") + [f"root@{self.ssh_host}", cmd]
        result = subprocess.run(
            argv,
            env=self._remote_env(),
            capture_output=True,
            text=True,
            # Tool output is not guaranteed to be valid UTF-8.
            errors="replace",
        )
        if check and result.returncode != 0:
            output = (result.stdout + result.stderr).strip().splitlines()
            tail = "\n".join(output[-15:])
            raise RuntimeError(
                f"Remote command failed (exit {result.returncode}):\n{tail}"
            )
        return result.stdout

    def _upload(self, local: str, remote: str):
        """Copy ``local`` to ``remote`` on the server via scp.

        Raises:
            subprocess.CalledProcessError: scp exited with a non-zero status.
        """
        argv = self._remote_argv("scp", "-P") + [local, f"root@{self.ssh_host}:{remote}"]
        subprocess.run(argv, env=self._remote_env(), check=True)

    def _download(self, remote: str, local: str):
        """Copy ``remote`` from the server to ``local`` via scp.

        Raises:
            subprocess.CalledProcessError: scp exited with a non-zero status.
        """
        argv = self._remote_argv("scp", "-P") + [f"root@{self.ssh_host}:{remote}", local]
        subprocess.run(argv, env=self._remote_env(), check=True)

    def find_gpu(self, max_price: float = 0.50, prefer_spot: bool = True) -> Optional[Dict]:
        """Find the cheapest available server with a supported GPU.

        Args:
            max_price: Upper bound on the hourly USD price.
            prefer_spot: Compare spot prices when true, on-demand prices
                otherwise.

        Returns:
            A dict with ``id``, ``gpus``, ``price`` and ``reliability`` for
            the cheapest match (ties go to the higher reliability), or
            ``None`` when nothing qualifies.
        """
        servers = self._api("GET", "/v1/marketplace")["servers"]
        nvenc_gpus = ("RTX 4090", "RTX 4080", "RTX 3090", "RTX 3080", "A100", "A6000")
        price_key = "spot" if prefer_spot else "on_demand_clore"

        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            gpus = server.get("gpu_array", [])
            if not any(model in gpu for gpu in gpus for model in nvenc_gpus):
                continue
            cost = server.get("price", {}).get("usd", {}).get(price_key)
            # Servers with no (or a zero) price for this rental type are skipped.
            if cost and cost <= max_price:
                candidates.append({
                    "id": server["id"],
                    "gpus": gpus,
                    "price": cost,
                    "reliability": server.get("reliability", 0),
                })
        return min(
            candidates,
            key=lambda c: (c["price"], -c["reliability"]),
            default=None,
        )

    @staticmethod
    def _parse_ssh_connection(conn: str) -> Tuple[str, int]:
        """Extract ``(host, port)`` from an ssh command line.

        Assumes the API reports something like ``"ssh root@1.2.3.4 -p 2222"``
        (TODO confirm against the API documentation).  The port defaults to
        22 when no ``-p`` option is present.

        Raises:
            RuntimeError: No ``user@host`` token was found.
        """
        tokens = conn.split()
        target = next((tok for tok in tokens if "@" in tok), None)
        if target is None:
            raise RuntimeError(f"Cannot parse SSH connection string: {conn!r}")
        host = target.split("@", 1)[1]
        port = 22
        if "-p" in tokens:
            idx = tokens.index("-p")
            if idx + 1 < len(tokens):
                port = int(tokens[idx + 1])
        return host, port

    def setup(self, max_price: float = 0.50, use_spot: bool = True):
        """Rent a GPU server and wait until it is reachable.

        On success ``ssh_host``, ``ssh_port``, ``ssh_password``, ``order_id``
        and ``hourly_cost`` are populated.

        Args:
            max_price: Upper bound on the hourly USD price.
            use_spot: Place a spot order when true, an on-demand order
                otherwise.

        Raises:
            RuntimeError: No server matched, an API call failed, or the order
                did not reach the "running" state within about 3 minutes.
        """
        print("🔍 Finding GPU...")
        gpu = self.find_gpu(max_price, use_spot)
        if not gpu:
            raise RuntimeError(f"No GPU found under ${max_price}/hr")
        print(f" Server {gpu['id']}: {gpu['gpus']} @ ${gpu['price']:.2f}/hr")

        self.ssh_password = secrets.token_urlsafe(16)

        print("🚀 Renting server...")
        order_data = {
            "renting_server": gpu["id"],
            "type": "spot" if use_spot else "on-demand",
            "currency": "CLORE-Blockchain",
            "image": self.FFMPEG_IMAGE,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password,
        }
        if use_spot:
            # Bid 10% above the listed spot price.
            order_data["spotprice"] = gpu["price"] * 1.1
        result = self._api("POST", "/v1/create_order", json=order_data)
        self.order_id = result["order_id"]

        print("⏳ Waiting for server...")
        # Poll every 2 seconds, 90 times at most.
        for _ in range(90):
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == self.order_id), None)
            if order and order.get("status") == "running":
                self.ssh_host, self.ssh_port = self._parse_ssh_connection(
                    order["connection"]["ssh"]
                )
                self.hourly_cost = gpu["price"]
                print(f"✅ Server ready: {self.ssh_host}:{self.ssh_port}")
                return
            time.sleep(2)
        raise RuntimeError("Timeout")

    @staticmethod
    def _remote_paths(filename: str) -> Tuple[str, str]:
        """Return shell-safe ``(input, output)`` temp paths on the server.

        Any character other than letters, digits, ``.``, ``_`` and ``-`` is
        replaced by ``_`` so the paths survive scp and the remote shell
        unquoted.  Names that are already safe map to ``/tmp/<name>`` and
        ``/tmp/out_<name>.mp4``.
        """
        safe = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in filename)
        return f"/tmp/{safe}", f"/tmp/out_{safe}.mp4"

    @staticmethod
    def _build_ffmpeg_cmd(remote_in: str, remote_out: str, codec: VideoCodec,
                          preset: VideoPreset, crf: int,
                          resolution: Optional[str]) -> str:
        """Return the remote FFmpeg command line with every argument shell-quoted."""
        parts = [
            "ffmpeg", "-y",
            "-hwaccel", "cuda", "-hwaccel_output_format", "cuda",
            "-i", remote_in,
        ]
        if resolution:
            parts += ["-vf", f"scale_cuda={resolution}"]
        parts += [
            "-c:v", codec.value, "-preset", preset.value,
            # The quality value is passed as -cq with -b:v 0.
            "-cq", str(crf), "-b:v", "0",
            "-c:a", "aac", "-b:a", "128k",
            remote_out,
        ]
        # Merge stderr into stdout remotely so one stream carries the log.
        return " ".join(shlex.quote(part) for part in parts) + " 2>&1"

    def transcode(self, input_path: str, output_path: str,
                  codec: VideoCodec = VideoCodec.H265,
                  preset: VideoPreset = VideoPreset.BALANCED,
                  crf: int = 23, resolution: Optional[str] = None) -> TranscodeResult:
        """Transcode a single file on the rented server.

        The file is uploaded, encoded with FFmpeg, downloaded to
        ``output_path`` and the remote temp files are removed.

        Args:
            input_path: Local source video.
            output_path: Local destination for the encoded video.
            codec: Video encoder to use.
            preset: Encoder preset.
            crf: Quality value, passed to FFmpeg as ``-cq``.
            resolution: Optional argument for the ``scale_cuda`` filter
                (e.g. ``"1920:1080"``); no scaling when omitted.

        Returns:
            A successful ``TranscodeResult``.  ``duration_seconds`` and
            ``cost_usd`` cover the FFmpeg run only, not the transfers.

        Raises:
            OSError: ``input_path`` cannot be read.
            subprocess.CalledProcessError: Upload or download failed.
            RuntimeError: FFmpeg exited with a non-zero status.
        """
        filename = os.path.basename(input_path)
        # Measured up front so a missing input fails before any transfer and
        # the figure is unaffected if output_path is the same file.
        input_size = os.path.getsize(input_path) / (1024 * 1024)
        remote_in, remote_out = self._remote_paths(filename)

        try:
            print(f"📤 Uploading {filename}...")
            self._upload(input_path, remote_in)

            cmd = self._build_ffmpeg_cmd(remote_in, remote_out, codec, preset, crf, resolution)
            print("🎬 Transcoding...")
            start = time.monotonic()
            self._ssh(cmd, check=True)
            duration = time.monotonic() - start

            print("📥 Downloading...")
            self._download(remote_out, output_path)
        finally:
            # Best effort: free the server's disk for the next file without
            # masking whatever error is already propagating.
            try:
                self._ssh(f"rm -f {shlex.quote(remote_in)} {shlex.quote(remote_out)}")
            except OSError:
                pass

        output_size = os.path.getsize(output_path) / (1024 * 1024)
        return TranscodeResult(
            input_file=input_path,
            output_file=output_path,
            status="success",
            codec=codec.value,
            duration_seconds=duration,
            input_size_mb=input_size,
            output_size_mb=output_size,
            compression_ratio=input_size / output_size if output_size > 0 else 0,
            cost_usd=(duration / 3600) * self.hourly_cost,
        )

    def batch_transcode(self, input_files: List[str], output_dir: str,
                        codec: VideoCodec = VideoCodec.H265,
                        preset: VideoPreset = VideoPreset.BALANCED,
                        crf: int = 23,
                        resolution: Optional[str] = None) -> List[TranscodeResult]:
        """Transcode several files one after another.

        A failure on one file is recorded as a ``"failed"`` result and does
        not stop the batch.

        Args:
            input_files: Local source videos.
            output_dir: Directory for the results (created if missing); each
                output is named ``<input stem>.mp4``, with ``_1``, ``_2``, ...
                appended when two inputs share a stem.
            codec: Video encoder to use.
            preset: Encoder preset.
            crf: Quality value, passed to FFmpeg as ``-cq``.
            resolution: Optional ``scale_cuda`` argument applied to every file.

        Returns:
            One ``TranscodeResult`` per input, in input order.
        """
        os.makedirs(output_dir, exist_ok=True)
        results = []
        used_names = set()
        total = len(input_files)
        for index, inp in enumerate(input_files, start=1):
            print(f"\n[{index}/{total}] {os.path.basename(inp)}")

            # "a.mov" and "a.mkv" would otherwise both be written to "a.mp4".
            stem = Path(inp).stem
            name, suffix = stem, 1
            while name in used_names:
                name = f"{stem}_{suffix}"
                suffix += 1
            used_names.add(name)
            out = os.path.join(output_dir, f"{name}.mp4")

            try:
                result = self.transcode(inp, out, codec, preset, crf, resolution)
            except Exception as e:
                print(f" ❌ {e}")
                result = TranscodeResult(
                    input_file=inp, output_file=out, status="failed",
                    codec=codec.value, duration_seconds=0, input_size_mb=0,
                    output_size_mb=0, compression_ratio=0, cost_usd=0, error=str(e),
                )
            else:
                print(
                    f" ✅ {result.input_size_mb:.1f}MB → {result.output_size_mb:.1f}MB "
                    f"({result.compression_ratio:.1f}x) ${result.cost_usd:.4f}"
                )
            results.append(result)
        return results

    def cleanup(self):
        """Cancel the active order, if any.

        Safe to call more than once: the order id is cleared after a
        successful cancellation.
        """
        if not self.order_id:
            return
        print("🧹 Releasing server...")
        self._api("POST", "/v1/cancel_order", json={"id": self.order_id})
        self.order_id = None

    def __enter__(self):
        """Return the transcoder itself for use in a ``with`` block."""
        return self

    def __exit__(self, *args):
        """Release the server; exceptions from the ``with`` body still propagate."""
        self.cleanup()
def main():
    """Command-line entry point: rent a GPU, transcode the inputs, print a summary.

    Returns:
        0 when every input was transcoded, 1 when at least one failed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--input", "-i", nargs="+", required=True)
    parser.add_argument("--output", "-o", required=True)
    parser.add_argument("--codec", choices=["h264", "h265", "av1"], default="h265")
    parser.add_argument("--preset", choices=["fastest", "fast", "balanced", "quality", "best"], default="balanced")
    parser.add_argument("--crf", type=int, default=23)
    parser.add_argument("--max-price", type=float, default=0.50)
    parser.add_argument("--on-demand", action="store_true")
    args = parser.parse_args()

    codec_map = {"h264": VideoCodec.H264, "h265": VideoCodec.H265, "av1": VideoCodec.AV1}
    preset_map = {
        "fastest": VideoPreset.FASTEST,
        "fast": VideoPreset.FAST,
        "balanced": VideoPreset.BALANCED,
        "quality": VideoPreset.QUALITY,
        "best": VideoPreset.BEST,
    }

    # The context manager cancels the order even when setup or a job fails.
    with CloreVideoTranscoder(args.api_key) as transcoder:
        transcoder.setup(args.max_price, not args.on_demand)
        results = transcoder.batch_transcode(
            args.input, args.output,
            codec_map[args.codec], preset_map[args.preset], args.crf
        )

    # Summary (totals cover successful jobs only)
    print("\n" + "=" * 60)
    success = [r for r in results if r.status == "success"]
    total_in = sum(r.input_size_mb for r in success)
    total_out = sum(r.output_size_mb for r in success)
    total_time = sum(r.duration_seconds for r in success)
    total_cost = sum(r.cost_usd for r in success)
    print(f"📊 SUMMARY: {len(success)}/{len(results)} succeeded")
    print(f" Total: {total_in:.1f}MB → {total_out:.1f}MB")
    print(f" Time: {total_time:.1f}s | Cost: ${total_cost:.4f}")
    if total_time > 0:
        print(f" Speed: {total_in/total_time:.1f} MB/s")

    # Let calling scripts detect partial or total failure.
    return 0 if len(success) == len(results) else 1
# Run the CLI only when executed as a script, and hand main()'s return value
# to the shell as the exit status (a None return still exits with 0).
if __name__ == "__main__":
    sys.exit(main())