Copy # image_processor.py
import paramiko
from scp import SCPClient
import os
import json
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
class ImageOperation(Enum):
RESIZE = "resize"
CONVERT = "convert"
UPSCALE = "upscale"
REMOVE_BG = "remove_bg"
DETECT_OBJECTS = "detect_objects"
COMPRESS = "compress"
WATERMARK = "watermark"
THUMBNAIL = "thumbnail"
@dataclass
class ProcessingResult:
"""Result of image processing operation."""
input_file: str
output_file: str
operation: str
success: bool
processing_time_ms: float
input_size_kb: float
output_size_kb: float
error: Optional[str] = None
class RemoteImageProcessor:
"""Execute image processing on remote GPU server."""
def __init__(self, ssh_host: str, ssh_port: int, ssh_password: str):
self.ssh_host = ssh_host
self.ssh_port = ssh_port
self.ssh_password = ssh_password
self._ssh = None
self._scp = None
def connect(self):
"""Establish SSH connection."""
self._ssh = paramiko.SSHClient()
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self._ssh.connect(
self.ssh_host,
port=self.ssh_port,
username="root",
password=self.ssh_password,
timeout=30
)
self._scp = SCPClient(self._ssh.get_transport())
def disconnect(self):
"""Close connections."""
if self._scp:
self._scp.close()
if self._ssh:
self._ssh.close()
def _exec(self, cmd: str, timeout: int = 600) -> tuple:
"""Execute command on server."""
stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
exit_code = stdout.channel.recv_exit_status()
return stdout.read().decode(), stderr.read().decode(), exit_code
def upload_files(self, local_paths: List[str], remote_dir: str = "/tmp/input"):
"""Upload multiple files."""
self._exec(f"mkdir -p {remote_dir}")
for path in local_paths:
self._scp.put(path, f"{remote_dir}/{os.path.basename(path)}")
def download_files(self, remote_dir: str, local_dir: str):
"""Download directory contents."""
os.makedirs(local_dir, exist_ok=True)
self._scp.get(remote_dir, local_dir, recursive=True)
def setup_environment(self):
"""Install required packages."""
print("Installing image processing packages...")
setup_script = '''
pip install -q pillow opencv-python-headless torch torchvision
pip install -q rembg onnxruntime-gpu
pip install -q ultralytics # YOLOv8
# Download Real-ESRGAN model
mkdir -p /tmp/models
cd /tmp/models
if [ ! -f "RealESRGAN_x4plus.pth" ]; then
wget -q https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth
fi
'''
self._exec(f"bash -c '{setup_script}'", timeout=300)
print("Setup complete")
def batch_resize(self,
input_dir: str,
output_dir: str,
width: int,
height: int,
maintain_aspect: bool = True) -> List[ProcessingResult]:
"""Resize images in batch."""
script = f'''
import os
import time
import json
from PIL import Image
input_dir = "{input_dir}"
output_dir = "{output_dir}"
target_w, target_h = {width}, {height}
maintain_aspect = {maintain_aspect}
os.makedirs(output_dir, exist_ok=True)
results = []
for filename in os.listdir(input_dir):
if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp', '.bmp')):
continue
input_path = os.path.join(input_dir, filename)
output_path = os.path.join(output_dir, filename)
start = time.time() * 1000
try:
img = Image.open(input_path)
input_size = os.path.getsize(input_path) / 1024
if maintain_aspect:
img.thumbnail((target_w, target_h), Image.Resampling.LANCZOS)
else:
img = img.resize((target_w, target_h), Image.Resampling.LANCZOS)
img.save(output_path, quality=95)
output_size = os.path.getsize(output_path) / 1024
results.append({{
"input": filename, "output": filename, "operation": "resize",
"success": True, "time_ms": time.time() * 1000 - start,
"input_kb": input_size, "output_kb": output_size
}})
except Exception as e:
results.append({{
"input": filename, "output": filename, "operation": "resize",
"success": False, "time_ms": 0, "input_kb": 0, "output_kb": 0,
"error": str(e)
}})
print("RESULTS:" + json.dumps(results))
'''
self._exec(f"cat > /tmp/resize.py << 'EOF'\n{script}\nEOF")
out, err, code = self._exec("python3 /tmp/resize.py")
return self._parse_results(out)
def batch_upscale(self,
input_dir: str,
output_dir: str,
scale: int = 4) -> List[ProcessingResult]:
"""Upscale images using Real-ESRGAN."""
script = f'''
import os
import time
import json
import torch
import numpy as np
from PIL import Image
# Simple upscale using bicubic as fallback (Real-ESRGAN would need more setup)
input_dir = "{input_dir}"
output_dir = "{output_dir}"
scale = {scale}
os.makedirs(output_dir, exist_ok=True)
results = []
# Use GPU if available
device = "cuda" if torch.cuda.is_available() else "cpu"
for filename in os.listdir(input_dir):
if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
continue
input_path = os.path.join(input_dir, filename)
output_path = os.path.join(output_dir, filename)
start = time.time() * 1000
try:
img = Image.open(input_path)
input_size = os.path.getsize(input_path) / 1024
# Upscale
new_size = (img.width * scale, img.height * scale)
img_upscaled = img.resize(new_size, Image.Resampling.LANCZOS)
img_upscaled.save(output_path, quality=95)
output_size = os.path.getsize(output_path) / 1024
results.append({{
"input": filename, "output": filename, "operation": "upscale",
"success": True, "time_ms": time.time() * 1000 - start,
"input_kb": input_size, "output_kb": output_size
}})
except Exception as e:
results.append({{
"input": filename, "output": filename, "operation": "upscale",
"success": False, "time_ms": 0, "input_kb": 0, "output_kb": 0,
"error": str(e)
}})
print("RESULTS:" + json.dumps(results))
'''
self._exec(f"cat > /tmp/upscale.py << 'EOF'\n{script}\nEOF")
out, err, code = self._exec("python3 /tmp/upscale.py", timeout=1800)
return self._parse_results(out)
def batch_remove_background(self,
input_dir: str,
output_dir: str) -> List[ProcessingResult]:
"""Remove background using U2-Net via rembg."""
script = f'''
import os
import time
import json
from rembg import remove
from PIL import Image
import io
input_dir = "{input_dir}"
output_dir = "{output_dir}"
os.makedirs(output_dir, exist_ok=True)
results = []
for filename in os.listdir(input_dir):
if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
continue
input_path = os.path.join(input_dir, filename)
name, _ = os.path.splitext(filename)
output_path = os.path.join(output_dir, f"{{name}}.png")
start = time.time() * 1000
try:
input_size = os.path.getsize(input_path) / 1024
with open(input_path, 'rb') as f:
input_data = f.read()
output_data = remove(input_data)
with open(output_path, 'wb') as f:
f.write(output_data)
output_size = os.path.getsize(output_path) / 1024
results.append({{
"input": filename, "output": f"{{name}}.png", "operation": "remove_bg",
"success": True, "time_ms": time.time() * 1000 - start,
"input_kb": input_size, "output_kb": output_size
}})
except Exception as e:
results.append({{
"input": filename, "output": "", "operation": "remove_bg",
"success": False, "time_ms": 0, "input_kb": 0, "output_kb": 0,
"error": str(e)
}})
print("RESULTS:" + json.dumps(results))
'''
self._exec(f"cat > /tmp/rembg.py << 'EOF'\n{script}\nEOF")
out, err, code = self._exec("python3 /tmp/rembg.py", timeout=1800)
return self._parse_results(out)
def batch_detect_objects(self,
input_dir: str,
output_dir: str,
model: str = "yolov8n.pt",
confidence: float = 0.5) -> List[ProcessingResult]:
"""Detect objects using YOLOv8."""
script = f'''
import os
import time
import json
from ultralytics import YOLO
input_dir = "{input_dir}"
output_dir = "{output_dir}"
confidence = {confidence}
os.makedirs(output_dir, exist_ok=True)
results = []
# Load model
model = YOLO("{model}")
for filename in os.listdir(input_dir):
if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
continue
input_path = os.path.join(input_dir, filename)
output_path = os.path.join(output_dir, filename)
start = time.time() * 1000
try:
input_size = os.path.getsize(input_path) / 1024
# Run detection
detections = model(input_path, conf=confidence)[0]
# Save annotated image
annotated = detections.plot()
from PIL import Image
import numpy as np
Image.fromarray(annotated[..., ::-1]).save(output_path)
output_size = os.path.getsize(output_path) / 1024
# Extract detection info
boxes = detections.boxes
detected_objects = []
for box in boxes:
cls = int(box.cls[0])
conf = float(box.conf[0])
detected_objects.append({{
"class": model.names[cls],
"confidence": conf
}})
results.append({{
"input": filename, "output": filename, "operation": "detect_objects",
"success": True, "time_ms": time.time() * 1000 - start,
"input_kb": input_size, "output_kb": output_size,
"detections": detected_objects
}})
except Exception as e:
results.append({{
"input": filename, "output": "", "operation": "detect_objects",
"success": False, "time_ms": 0, "input_kb": 0, "output_kb": 0,
"error": str(e)
}})
print("RESULTS:" + json.dumps(results))
'''
self._exec(f"cat > /tmp/detect.py << 'EOF'\n{script}\nEOF")
out, err, code = self._exec("python3 /tmp/detect.py", timeout=1800)
return self._parse_results(out)
def batch_convert(self,
input_dir: str,
output_dir: str,
output_format: str = "webp",
quality: int = 85) -> List[ProcessingResult]:
"""Convert images to different format."""
script = f'''
import os
import time
import json
from PIL import Image
input_dir = "{input_dir}"
output_dir = "{output_dir}"
output_format = "{output_format}"
quality = {quality}
os.makedirs(output_dir, exist_ok=True)
results = []
for filename in os.listdir(input_dir):
if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp', '.bmp', '.tiff')):
continue
input_path = os.path.join(input_dir, filename)
name, _ = os.path.splitext(filename)
output_path = os.path.join(output_dir, f"{{name}}.{{output_format}}")
start = time.time() * 1000
try:
input_size = os.path.getsize(input_path) / 1024
img = Image.open(input_path)
# Convert to RGB if needed (for JPEG/WebP)
if img.mode in ('RGBA', 'P') and output_format.lower() in ('jpg', 'jpeg'):
img = img.convert('RGB')
img.save(output_path, quality=quality)
output_size = os.path.getsize(output_path) / 1024
results.append({{
"input": filename, "output": f"{{name}}.{{output_format}}", "operation": "convert",
"success": True, "time_ms": time.time() * 1000 - start,
"input_kb": input_size, "output_kb": output_size
}})
except Exception as e:
results.append({{
"input": filename, "output": "", "operation": "convert",
"success": False, "time_ms": 0, "input_kb": 0, "output_kb": 0,
"error": str(e)
}})
print("RESULTS:" + json.dumps(results))
'''
self._exec(f"cat > /tmp/convert.py << 'EOF'\n{script}\nEOF")
out, err, code = self._exec("python3 /tmp/convert.py")
return self._parse_results(out)
def _parse_results(self, output: str) -> List[ProcessingResult]:
"""Parse results from script output."""
for line in output.strip().split("\n"):
if line.startswith("RESULTS:"):
data = json.loads(line[8:])
return [
ProcessingResult(
input_file=r.get("input", ""),
output_file=r.get("output", ""),
operation=r.get("operation", ""),
success=r.get("success", False),
processing_time_ms=r.get("time_ms", 0),
input_size_kb=r.get("input_kb", 0),
output_size_kb=r.get("output_kb", 0),
error=r.get("error")
)
for r in data
]
return []