#!/usr/bin/env python3
"""
GPU Test Framework for CI/CD
A framework for running GPU tests in CI/CD pipelines with Clore.ai.
"""
import argparse
import json
import os
import sys
import time
import secrets
import requests
import paramiko
import subprocess
from scp import SCPClient
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
import xml.etree.ElementTree as ET
@dataclass
class TestResult:
    """Outcome of a single remote test execution.

    ``status`` takes one of four values: "passed", "failed",
    "skipped", or "error".
    """
    name: str                 # display name / pytest nodeid
    status: str               # see class docstring for allowed values
    duration_seconds: float   # wall-clock runtime of the test command
    message: str = ""         # short summary (exit code, failure reason)
    stdout: str = ""          # captured remote stdout
    stderr: str = ""          # captured remote stderr
@dataclass
class TestSuite:
    """Collection of test results with JUnit-XML reporting.

    Attributes:
        name: Human-readable suite name.
        tests: Accumulated per-test results.
        start_time: When the suite started (defaults to construction time).
        end_time: When the suite finished; ``None`` while still running.
    """
    name: str
    tests: List[TestResult] = field(default_factory=list)
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None

    @property
    def total(self) -> int:
        """Number of tests recorded."""
        return len(self.tests)

    @property
    def passed(self) -> int:
        """Number of tests with status "passed"."""
        return sum(1 for t in self.tests if t.status == "passed")

    @property
    def failed(self) -> int:
        """Number of tests with status "failed" (assertion failures)."""
        return sum(1 for t in self.tests if t.status == "failed")

    @property
    def errors(self) -> int:
        """Number of tests with status "error" (infrastructure problems)."""
        return sum(1 for t in self.tests if t.status == "error")

    @property
    def skipped(self) -> int:
        """Number of tests with status "skipped"."""
        return sum(1 for t in self.tests if t.status == "skipped")

    @property
    def duration(self) -> float:
        """Wall-clock duration if finished, else the sum of test durations."""
        if self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return sum(t.duration_seconds for t in self.tests)

    def to_junit_xml(self) -> str:
        """Generate a JUnit XML report.

        Fix: also emit the standard ``errors`` and ``skipped`` suite-level
        counts — the original emitted <error>/<skipped> testcase children
        but omitted the counts, so CI consumers under-reported totals.
        """
        testsuite = ET.Element("testsuite", {
            "name": self.name,
            "tests": str(self.total),
            "failures": str(self.failed),
            "errors": str(self.errors),
            "skipped": str(self.skipped),
            "time": str(self.duration),
        })
        for test in self.tests:
            testcase = ET.SubElement(testsuite, "testcase", {
                "name": test.name,
                "time": str(test.duration_seconds),
            })
            if test.status == "failed":
                failure = ET.SubElement(testcase, "failure", {
                    "message": test.message
                })
                failure.text = test.stdout + "\n" + test.stderr
            elif test.status == "error":
                error = ET.SubElement(testcase, "error", {
                    "message": test.message
                })
                error.text = test.stdout + "\n" + test.stderr
            elif test.status == "skipped":
                ET.SubElement(testcase, "skipped")
        return ET.tostring(testsuite, encoding="unicode")
class GPUTestRunner:
    """Run GPU tests on Clore.ai infrastructure.

    Typical lifecycle (see ``main()``)::

        provision() -> setup_environment()
                    -> run_test_suite() / run_pytest()
                    -> generate_report() -> cleanup()

    ``cleanup()`` must always run so a rented server does not keep
    billing; the CLI wraps the run in try/finally for that reason.
    """

    BASE_URL = "https://api.clore.ai"

    def __init__(self, api_key: str, project_dir: str = "."):
        """Store credentials and state placeholders; performs no I/O.

        Args:
            api_key: Clore.ai API key (sent as the ``auth`` header).
            project_dir: Local project root to upload to the server.
        """
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.project_dir = Path(project_dir)
        self.order_id = None        # Clore order id once provisioned
        self.ssh_host = None
        self.ssh_port = None
        self.ssh_password = None    # generated fresh for every run
        self.ssh_client = None
        self.scp_client = None
        self.hourly_cost = 0.0      # USD/hr of the rented server
        self.start_time = None      # epoch seconds when the server came up

    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Make an API request, retrying (up to 3 times) on rate limiting.

        Fix: forward ``**kwargs`` to ``requests.request``. The original
        dropped them, so the JSON bodies for create_order / cancel_order
        were never actually sent.

        Returns:
            Decoded JSON response (API ``code`` == 0).

        Raises:
            Exception: on a non-zero API code or when retries are exhausted.
        """
        url = f"{self.BASE_URL}{endpoint}"
        for attempt in range(3):
            response = requests.request(
                method, url, headers=self.headers, timeout=30, **kwargs
            )
            data = response.json()
            if data.get("code") == 5:  # rate limited: exponential backoff
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries exceeded")

    def provision(self, gpu_type: str = "RTX 3080", max_price: float = 0.30,
                  image: str = "nvidia/cuda:12.8.0-base-ubuntu22.04") -> bool:
        """Rent the first matching spot server and connect over SSH.

        Args:
            gpu_type: Substring matched case-insensitively against GPU names.
            max_price: Maximum acceptable spot price in USD/hr.
            image: Docker image to boot on the rented server.

        Returns:
            True once the server is running and SSH is connected.
        """
        print(f"\n🔍 Looking for {gpu_type} under ${max_price}/hr...")
        servers = self._api("GET", "/v1/marketplace")["servers"]
        server = None
        for s in servers:
            if s.get("rented"):
                continue
            gpus = s.get("gpu_array", [])
            if not any(gpu_type.lower() in g.lower() for g in gpus):
                continue
            price = s.get("price", {}).get("usd", {}).get("spot")
            if price and price <= max_price:
                server = {"id": s["id"], "gpus": gpus, "price": price}
                break
        if not server:
            print(f"❌ No {gpu_type} available under ${max_price}/hr")
            return False
        print(f"✅ Found: {server['gpus']} @ ${server['price']}/hr")
        self.ssh_password = secrets.token_urlsafe(16)
        self.hourly_cost = server["price"]
        order_data = {
            "renting_server": server["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": image,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password,
            # Bid 10% over current spot so the order is not instantly outbid.
            "spotprice": server["price"] * 1.1
        }
        print("🚀 Creating order...")
        result = self._api("POST", "/v1/create_order", json=order_data)
        self.order_id = result["order_id"]
        print("⏳ Waiting for server...")
        for _ in range(90):  # ~3 minutes at 2 s per poll
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == self.order_id), None)
            if order and order.get("status") == "running":
                # Connection string looks like: "ssh root@HOST -p PORT"
                conn = order["connection"]["ssh"]
                parts = conn.split()
                self.ssh_host = parts[1].split("@")[1]
                self.ssh_port = int(parts[-1]) if "-p" in conn else 22
                self.start_time = time.time()
                print(f"✅ Server ready: {self.ssh_host}:{self.ssh_port}")
                return self._connect()
            time.sleep(2)
        print("❌ Timeout")
        self.cleanup()  # don't leave an unreachable order billing
        return False

    def _connect(self) -> bool:
        """Open the SSH session and an SCP channel on top of it."""
        try:
            self.ssh_client = paramiko.SSHClient()
            # Freshly-provisioned hosts can't have a known host key;
            # auto-accept it (acceptable for throwaway CI servers).
            self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_client.connect(
                self.ssh_host,
                port=self.ssh_port,
                username="root",
                password=self.ssh_password,
                timeout=30
            )
            self.scp_client = SCPClient(self.ssh_client.get_transport())
            return True
        except Exception as e:
            print(f"❌ SSH connection failed: {e}")
            return False

    def _exec(self, cmd: str, timeout: int = 600) -> Tuple[int, str, str]:
        """Run a command on the server; return (exit_code, stdout, stderr)."""
        stdin, stdout, stderr = self.ssh_client.exec_command(cmd, timeout=timeout)
        exit_code = stdout.channel.recv_exit_status()  # blocks until done
        return exit_code, stdout.read().decode(), stderr.read().decode()

    def setup_environment(self, requirements_file: str = "requirements.txt"):
        """Upload the project to /workspace and install its dependencies.

        Hidden entries, ``__pycache__``, virtualenvs, ``.git`` and
        ``node_modules`` are skipped. Directory uploads stay best-effort,
        but failures are now reported instead of silently swallowed.
        """
        print("\n📦 Setting up environment...")
        self._exec("mkdir -p /workspace")
        print(" Uploading project files...")
        skip_dirs = ["venv", "env", ".git", "node_modules"]
        for item in self.project_dir.iterdir():
            if item.name.startswith(".") or item.name == "__pycache__":
                continue
            if item.is_file():
                self.scp_client.put(str(item), f"/workspace/{item.name}")
            elif item.is_dir() and item.name not in skip_dirs:
                self._exec(f"mkdir -p /workspace/{item.name}")
                try:
                    self.scp_client.put(str(item), f"/workspace/{item.name}", recursive=True)
                except Exception as e:
                    # Fix: bare "except: pass" hid real upload failures and
                    # swallowed KeyboardInterrupt. Remain best-effort, but
                    # report what went wrong.
                    print(f" ⚠️ Skipped {item.name}: {e}")
        if (self.project_dir / requirements_file).exists():
            print(" Installing dependencies...")
            self._exec(f"pip install -q -r /workspace/{requirements_file}")
        print(" Verifying GPU...")
        exit_code, stdout, _ = self._exec("nvidia-smi --query-gpu=name,memory.total --format=csv,noheader")
        print(f" GPU: {stdout.strip()}")

    def run_test(self, name: str, command: str, timeout: int = 300) -> TestResult:
        """Execute one shell command in /workspace and classify the result.

        Args:
            name: Display name for the test.
            command: Shell command; exit code 0 means "passed".
            timeout: Per-test timeout in seconds.
        """
        print(f"\n🧪 Running: {name}")
        start = time.time()
        exit_code, stdout, stderr = self._exec(f"cd /workspace && {command}", timeout=timeout)
        duration = time.time() - start
        if exit_code == 0:
            status = "passed"
            message = "Test passed"
            print(f" ✅ PASSED ({duration:.1f}s)")
        else:
            status = "failed"
            message = f"Exit code: {exit_code}"
            print(f" ❌ FAILED ({duration:.1f}s)")
            if stderr:
                print(f" Error: {stderr[:200]}")
        return TestResult(
            name=name,
            status=status,
            duration_seconds=duration,
            message=message,
            stdout=stdout,
            stderr=stderr
        )

    def run_test_suite(self, tests: List[Dict]) -> TestSuite:
        """Run a list of ``{"name", "command", optional "timeout"}`` specs."""
        suite = TestSuite(name="GPU Tests")
        for test in tests:
            suite.tests.append(self.run_test(
                name=test["name"],
                command=test["command"],
                timeout=test.get("timeout", 300)
            ))
        suite.end_time = datetime.now()
        return suite

    def run_pytest(self, test_path: str = "tests/", markers: str = "gpu") -> TestSuite:
        """Run pytest remotely with the given marker and parse its JSON report.

        Falls back to a single pass/fail result derived from the exit code
        when the JSON report cannot be retrieved or parsed.
        """
        print(f"\n🧪 Running pytest: {test_path}")
        self._exec("pip install -q pytest pytest-json-report")
        cmd = f"cd /workspace && python -m pytest {test_path} -m {markers} " \
              f"--json-report --json-report-file=/tmp/report.json -v 2>&1"
        exit_code, stdout, stderr = self._exec(cmd, timeout=1800)
        suite = TestSuite(name="pytest")
        try:
            # Fetch the report through _exec (the original also made a
            # pointless /tmp/report_copy.json copy — removed).
            _, report_text, _ = self._exec("cat /tmp/report.json")
            report = json.loads(report_text)
            known_statuses = {"passed", "failed", "skipped", "error"}
            for test in report.get("tests", []):
                outcome = test.get("outcome", "failed")
                suite.tests.append(TestResult(
                    name=test["nodeid"],
                    # Fix: preserve "skipped"/"error" outcomes instead of
                    # collapsing everything non-passed into "failed".
                    status=outcome if outcome in known_statuses else "failed",
                    duration_seconds=test.get("duration", 0),
                    message=test.get("call", {}).get("longrepr", ""),
                    stdout=stdout
                ))
        except Exception as e:
            print(f" Warning: Could not parse pytest report: {e}")
            suite.tests.append(TestResult(
                name="pytest",
                status="passed" if exit_code == 0 else "failed",
                duration_seconds=0,
                message=stdout,
                stdout=stdout,
                stderr=stderr
            ))
        suite.end_time = datetime.now()
        return suite

    def cleanup(self):
        """Close SSH/SCP and cancel the Clore order, printing the run cost.

        Idempotent: handles are cleared after closing and the order id is
        cleared after a successful cancel, so a second call (e.g. provision
        timeout followed by the CLI's try/finally) is a no-op.
        """
        if self.scp_client:
            self.scp_client.close()
            self.scp_client = None
        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
        if self.order_id:
            print(f"\n🧹 Cleaning up (order {self.order_id})...")
            try:
                self._api("POST", "/v1/cancel_order", json={"id": self.order_id})
                self.order_id = None  # don't cancel twice
                # Report approximate spend for this run.
                if self.start_time:
                    duration = (time.time() - self.start_time) / 3600
                    cost = duration * self.hourly_cost
                    print(f" Duration: {duration*60:.1f} min")
                    print(f" Cost: ${cost:.4f}")
                print("✅ Cleaned up")
            except Exception as e:
                print(f"⚠️ Cleanup error: {e}")

    def generate_report(self, suite: TestSuite, output_dir: str = "test-results"):
        """Write JUnit-XML, JSON and Markdown reports into ``output_dir``."""
        os.makedirs(output_dir, exist_ok=True)
        # JUnit XML (consumed by most CI systems).
        with open(f"{output_dir}/junit.xml", "w") as f:
            f.write(suite.to_junit_xml())
        # Machine-readable JSON report.
        with open(f"{output_dir}/report.json", "w") as f:
            json.dump({
                "suite": suite.name,
                "total": suite.total,
                "passed": suite.passed,
                "failed": suite.failed,
                "duration": suite.duration,
                "tests": [asdict(t) for t in suite.tests]
            }, f, indent=2)
        # Human-readable Markdown summary (e.g. for a PR comment).
        lines = [
            "",
            "# GPU Test Results",
            f"**Suite:** {suite.name}",
            f"**Status:** {'✅ PASSED' if suite.failed == 0 else '❌ FAILED'}",
            f"**Tests:** {suite.passed}/{suite.total} passed",
            f"**Duration:** {suite.duration:.1f}s",
            "## Results",
            "| Test | Status | Duration |",
            "|------|--------|----------|",
        ]
        for test in suite.tests:
            emoji = "✅" if test.status == "passed" else "❌"
            lines.append(f"| {test.name} | {emoji} {test.status} | {test.duration_seconds:.1f}s |")
        with open(f"{output_dir}/summary.md", "w") as f:
            f.write("\n".join(lines) + "\n")
        print(f"\n📊 Reports saved to {output_dir}/")
def main():
    """CLI entry point: provision a GPU server, run tests, report, clean up."""
    arg_parser = argparse.ArgumentParser(description="GPU Test Runner for CI/CD")
    arg_parser.add_argument("--api-key", required=True)
    arg_parser.add_argument("--gpu", default="RTX 3080")
    arg_parser.add_argument("--max-price", type=float, default=0.30)
    arg_parser.add_argument("--image", default="nvidia/cuda:12.8.0-base-ubuntu22.04")
    arg_parser.add_argument("--project", default=".")
    arg_parser.add_argument("--requirements", default="requirements.txt")
    arg_parser.add_argument("--pytest", action="store_true", help="Run pytest")
    arg_parser.add_argument("--test-path", default="tests/")
    arg_parser.add_argument("--markers", default="gpu")
    arg_parser.add_argument("--output", default="test-results")
    opts = arg_parser.parse_args()

    runner = GPUTestRunner(opts.api_key, opts.project)
    try:
        if not runner.provision(opts.gpu, opts.max_price, opts.image):
            sys.exit(1)
        runner.setup_environment(opts.requirements)

        if opts.pytest:
            suite = runner.run_pytest(opts.test_path, opts.markers)
        else:
            # No --pytest: run the built-in smoke tests instead.
            smoke_tests = [
                {"name": "CUDA Check", "command": "python -c 'import torch; assert torch.cuda.is_available()'"},
                {"name": "GPU Memory", "command": "nvidia-smi"},
            ]
            suite = runner.run_test_suite(smoke_tests)

        runner.generate_report(suite, opts.output)

        # Exit non-zero so CI marks the job failed when any test failed.
        if suite.failed > 0:
            print(f"\n❌ {suite.failed} test(s) failed")
            sys.exit(1)
        print(f"\n✅ All {suite.total} tests passed!")
        sys.exit(0)
    finally:
        # Always release the rented server, even on failure or exit.
        runner.cleanup()


if __name__ == "__main__":
    main()