# CI/CD Pipeline with GPU Testing

## What We're Building

A complete CI/CD pipeline that validates GPU-dependent code, runs performance benchmarks, and checks CUDA compatibility for your ML/AI projects. It integrates with popular CI systems (GitHub Actions, GitLab CI, Jenkins).

**Key Features:**

* Automated GPU testing on every commit
* CUDA compatibility validation
* Performance regression detection
* Model accuracy verification
* Multi-GPU testing support
* Detailed test reports

## Prerequisites

* Clore.ai account with API key
* CI/CD system (GitHub Actions, GitLab CI, or Jenkins)
* Python project with GPU dependencies
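
Since the API key is a secret, it is usually safer to read it from the environment than to hard-code it. The following is a minimal sketch, assuming the key is exposed as a `CLORE_API_KEY` environment variable (the name used in the CI configuration later in this guide); `load_api_key` is a hypothetical helper, not part of any Clore.ai SDK.

```python
import os


def load_api_key(env=os.environ, var_name="CLORE_API_KEY"):
    """Return the API key from the environment, or raise a clear error."""
    key = env.get(var_name, "").strip()
    if not key:
        # Fail before any server is rented if the secret is missing
        raise RuntimeError(f"{var_name} is not set; add it as a masked CI secret")
    return key
```

Failing early like this keeps a misconfigured pipeline from getting as far as provisioning.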

## Architecture Overview

```
┌─────────────────────────────────────────────────────────────────┐
│                    CI/CD GPU Testing Pipeline                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌────────┐   ┌────────┐   ┌────────┐   ┌────────┐            │
│   │ Lint   │──▶│ Unit   │──▶│ Build  │──▶│ GPU    │            │
│   │ Check  │   │ Tests  │   │ Image  │   │ Tests  │            │
│   └────────┘   └────────┘   └────────┘   └────────┘            │
│                                               │                  │
│                                               ▼                  │
│                              ┌──────────────────────────────┐   │
│                              │     Clore.ai GPU Server      │   │
│                              │  ┌──────────────────────┐    │   │
│                              │  │ • CUDA validation    │    │   │
│                              │  │ • Model inference    │    │   │
│                              │  │ • Performance bench  │    │   │
│                              │  │ • Accuracy tests     │    │   │
│                              │  └──────────────────────┘    │   │
│                              └──────────────────────────────┘   │
│                                               │                  │
│                                               ▼                  │
│                              ┌────────┐   ┌────────┐            │
│                              │ Report │──▶│ Deploy │            │
│                              └────────┘   └────────┘            │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
```
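
The flow in the diagram can be sketched as an ordered list of stages where a failure stops later stages, except for steps such as reporting that should always run. This is only an illustration of the control flow, not how any CI system is implemented; `Stage` and `run_pipeline` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class Stage:
    name: str
    run: Callable[[], bool]   # returns True on success
    always: bool = False      # run even after an earlier failure (e.g. reporting)


def run_pipeline(stages: List[Stage]) -> List[Tuple[str, str]]:
    """Run stages in order; once one fails, skip the rest unless marked `always`."""
    results = []
    failed = False
    for stage in stages:
        if failed and not stage.always:
            results.append((stage.name, "skipped"))
            continue
        ok = stage.run()
        results.append((stage.name, "passed" if ok else "failed"))
        failed = failed or not ok
    return results
```

With a failing GPU stage, the report stage still runs when marked `always=True`, while deploy is skipped.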

## Step 1: Test Framework

```python
#!/usr/bin/env python3
"""
GPU Test Framework for CI/CD

A framework for running GPU tests in CI/CD pipelines with Clore.ai.
"""

import argparse
import json
import os
import sys
import time
import secrets
import requests
import paramiko
from scp import SCPClient
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
import xml.etree.ElementTree as ET


@dataclass
class TestResult:
    """Result of a single test."""
    name: str
    status: str  # passed, failed, skipped, error
    duration_seconds: float
    message: str = ""
    stdout: str = ""
    stderr: str = ""


@dataclass
class TestSuite:
    """Collection of test results."""
    name: str
    tests: List[TestResult] = field(default_factory=list)
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None
    
    @property
    def total(self) -> int:
        return len(self.tests)
    
    @property
    def passed(self) -> int:
        return len([t for t in self.tests if t.status == "passed"])
    
    @property
    def failed(self) -> int:
        return len([t for t in self.tests if t.status == "failed"])
    
    @property
    def duration(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return sum(t.duration_seconds for t in self.tests)
    
    def to_junit_xml(self) -> str:
        """Generate JUnit XML report."""
        testsuite = ET.Element("testsuite", {
            "name": self.name,
            "tests": str(self.total),
            "failures": str(self.failed),
            "time": str(self.duration)
        })
        
        for test in self.tests:
            testcase = ET.SubElement(testsuite, "testcase", {
                "name": test.name,
                "time": str(test.duration_seconds)
            })
            
            if test.status == "failed":
                failure = ET.SubElement(testcase, "failure", {
                    "message": test.message
                })
                failure.text = test.stdout + "\n" + test.stderr
            elif test.status == "error":
                error = ET.SubElement(testcase, "error", {
                    "message": test.message
                })
                error.text = test.stdout + "\n" + test.stderr
            elif test.status == "skipped":
                ET.SubElement(testcase, "skipped")
        
        return ET.tostring(testsuite, encoding="unicode")


class GPUTestRunner:
    """Run GPU tests on Clore.ai infrastructure."""
    
    BASE_URL = "https://api.clore.ai"
    
    def __init__(self, api_key: str, project_dir: str = "."):
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.project_dir = Path(project_dir)
        
        self.order_id = None
        self.ssh_host = None
        self.ssh_port = None
        self.ssh_password = None
        self.ssh_client = None
        self.scp_client = None
        
        self.hourly_cost = 0.0
        self.start_time = None
    
    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Make an API request, retrying with backoff when the API returns code 5."""
        url = f"{self.BASE_URL}{endpoint}"
        for attempt in range(3):
            # Forward kwargs (e.g. json=...) so request bodies are actually sent
            response = requests.request(
                method, url, headers=self.headers, timeout=30, **kwargs
            )
            data = response.json()
            if data.get("code") == 5:
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries exceeded")
    
    def provision(self, gpu_type: str = "RTX 3080", max_price: float = 0.30,
                  image: str = "nvidia/cuda:12.8.0-base-ubuntu22.04") -> bool:
        """Provision GPU server for testing."""
        print(f"\n🔍 Looking for {gpu_type} under ${max_price}/hr...")
        
        servers = self._api("GET", "/v1/marketplace")["servers"]
        
        server = None
        for s in servers:
            if s.get("rented"):
                continue
            gpus = s.get("gpu_array", [])
            if not any(gpu_type.lower() in g.lower() for g in gpus):
                continue
            price = s.get("price", {}).get("usd", {}).get("spot")
            if price and price <= max_price:
                server = {"id": s["id"], "gpus": gpus, "price": price}
                break
        
        if not server:
            print(f"❌ No {gpu_type} available under ${max_price}/hr")
            return False
        
        print(f"✅ Found: {server['gpus']} @ ${server['price']}/hr")
        
        self.ssh_password = secrets.token_urlsafe(16)
        self.hourly_cost = server["price"]
        
        order_data = {
            "renting_server": server["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": image,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password,
            "spotprice": server["price"] * 1.1
        }
        
        print("🚀 Creating order...")
        result = self._api("POST", "/v1/create_order", json=order_data)
        self.order_id = result["order_id"]
        
        print("⏳ Waiting for server...")
        for _ in range(90):
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == self.order_id), None)
            
            if order and order.get("status") == "running":
                conn = order["connection"]["ssh"]
                parts = conn.split()
                self.ssh_host = parts[1].split("@")[1]
                self.ssh_port = int(parts[-1]) if "-p" in conn else 22
                self.start_time = time.time()
                
                print(f"✅ Server ready: {self.ssh_host}:{self.ssh_port}")
                return self._connect()
            
            time.sleep(2)
        
        print("❌ Timeout")
        self.cleanup()
        return False
    
    def _connect(self) -> bool:
        """Establish SSH connection."""
        try:
            self.ssh_client = paramiko.SSHClient()
            self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_client.connect(
                self.ssh_host,
                port=self.ssh_port,
                username="root",
                password=self.ssh_password,
                timeout=30
            )
            self.scp_client = SCPClient(self.ssh_client.get_transport())
            return True
        except Exception as e:
            print(f"❌ SSH connection failed: {e}")
            return False
    
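    def _exec(self, cmd: str, timeout: int = 600) -> Tuple[int, str, str]:
        """Execute command on remote server."""
        stdin, stdout, stderr = self.ssh_client.exec_command(cmd, timeout=timeout)
        # Drain the output before waiting on the exit status; waiting first can
        # hang when the command writes more than the channel window holds
        out = stdout.read().decode()
        err = stderr.read().decode()
        exit_code = stdout.channel.recv_exit_status()
        return exit_code, out, err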
    
    def setup_environment(self, requirements_file: str = "requirements.txt"):
        """Set up test environment on GPU server."""
        print("\n📦 Setting up environment...")
        
        # Create workspace
        self._exec("mkdir -p /workspace")
        
        # Upload project
        print("   Uploading project files...")
        skip_dirs = {"venv", "env", "node_modules", "__pycache__"}
        for item in self.project_dir.iterdir():
            # Skip hidden entries (.git, .venv, ...) and local environments
            if item.name.startswith(".") or item.name in skip_dirs:
                continue
            try:
                if item.is_file():
                    self.scp_client.put(str(item), f"/workspace/{item.name}")
                elif item.is_dir():
                    # Copy into /workspace/ so the directory lands at
                    # /workspace/<name> rather than /workspace/<name>/<name>
                    self.scp_client.put(str(item), "/workspace/", recursive=True)
            except Exception as e:
                print(f"   Warning: could not upload {item.name}: {e}")
        
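        # Install dependencies (assumes the image provides Python and pip;
        # the stock nvidia/cuda base images do not)
        if (self.project_dir / requirements_file).exists():
            print("   Installing dependencies...")
            exit_code, _, stderr = self._exec(f"pip install -q -r /workspace/{requirements_file}")
            if exit_code != 0:
                print(f"   Warning: pip install failed: {stderr[:200]}")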
        
        # Verify GPU
        print("   Verifying GPU...")
        exit_code, stdout, _ = self._exec("nvidia-smi --query-gpu=name,memory.total --format=csv,noheader")
        print(f"   GPU: {stdout.strip()}")
    
    def run_test(self, name: str, command: str, timeout: int = 300) -> TestResult:
        """Run a single test."""
        print(f"\n🧪 Running: {name}")
        
        start = time.time()
        exit_code, stdout, stderr = self._exec(f"cd /workspace && {command}", timeout=timeout)
        duration = time.time() - start
        
        if exit_code == 0:
            status = "passed"
            message = "Test passed"
            print(f"   ✅ PASSED ({duration:.1f}s)")
        else:
            status = "failed"
            message = f"Exit code: {exit_code}"
            print(f"   ❌ FAILED ({duration:.1f}s)")
            if stderr:
                print(f"   Error: {stderr[:200]}")
        
        return TestResult(
            name=name,
            status=status,
            duration_seconds=duration,
            message=message,
            stdout=stdout,
            stderr=stderr
        )
    
    def run_test_suite(self, tests: List[Dict]) -> TestSuite:
        """Run a collection of tests."""
        suite = TestSuite(name="GPU Tests")
        
        for test in tests:
            result = self.run_test(
                name=test["name"],
                command=test["command"],
                timeout=test.get("timeout", 300)
            )
            suite.tests.append(result)
        
        suite.end_time = datetime.now()
        return suite
    
    def run_pytest(self, test_path: str = "tests/", markers: str = "gpu") -> TestSuite:
        """Run pytest with GPU marker."""
        print(f"\n🧪 Running pytest: {test_path}")
        
        # Install pytest
        self._exec("pip install -q pytest pytest-json-report")
        
        # Run tests
        cmd = f"cd /workspace && python -m pytest {test_path} -m {markers} " \
              f"--json-report --json-report-file=/tmp/report.json -v 2>&1"
        
        exit_code, stdout, stderr = self._exec(cmd, timeout=1800)
        
        # Parse results
        suite = TestSuite(name="pytest")

        # Map pytest outcomes to report statuses; anything unlisted counts as failed
        status_map = {"passed": "passed", "skipped": "skipped", "xfailed": "skipped"}

        try:
            _, report_json, _ = self._exec("cat /tmp/report.json")
            report = json.loads(report_json)

            for test in report.get("tests", []):
                # Sum the per-stage durations recorded for setup, call, and teardown
                duration = sum(
                    test.get(stage, {}).get("duration", 0)
                    for stage in ("setup", "call", "teardown")
                )
                suite.tests.append(TestResult(
                    name=test["nodeid"],
                    status=status_map.get(test["outcome"], "failed"),
                    duration_seconds=duration,
                    message=test.get("call", {}).get("longrepr", ""),
                    stdout=stdout
                ))
        except Exception as e:
            print(f"   Warning: Could not parse pytest report: {e}")
            suite.tests.append(TestResult(
                name="pytest",
                status="passed" if exit_code == 0 else "failed",
                duration_seconds=0,
                message=stdout,
                stdout=stdout,
                stderr=stderr
            ))
        
        suite.end_time = datetime.now()
        return suite
    
    def cleanup(self):
        """Cancel order and clean up."""
        if self.scp_client:
            self.scp_client.close()
        if self.ssh_client:
            self.ssh_client.close()
        
        if self.order_id:
            print(f"\n🧹 Cleaning up (order {self.order_id})...")
            try:
                self._api("POST", "/v1/cancel_order", json={"id": self.order_id})
                # Forget the order so a repeated cleanup() call does not cancel twice
                self.order_id = None
                
                # Calculate cost
                if self.start_time:
                    duration = (time.time() - self.start_time) / 3600
                    cost = duration * self.hourly_cost
                    print(f"   Duration: {duration*60:.1f} min")
                    print(f"   Cost: ${cost:.4f}")
                
                print("✅ Cleaned up")
            except Exception as e:
                print(f"⚠️ Cleanup error: {e}")
    
    def generate_report(self, suite: TestSuite, output_dir: str = "test-results"):
        """Generate test reports."""
        os.makedirs(output_dir, exist_ok=True)
        
        # JUnit XML
        with open(f"{output_dir}/junit.xml", "w") as f:
            f.write(suite.to_junit_xml())
        
        # JSON report
        with open(f"{output_dir}/report.json", "w") as f:
            json.dump({
                "suite": suite.name,
                "total": suite.total,
                "passed": suite.passed,
                "failed": suite.failed,
                "duration": suite.duration,
                "tests": [asdict(t) for t in suite.tests]
            }, f, indent=2)
        
        # Summary
        summary = f"""
# GPU Test Results

**Suite:** {suite.name}
**Status:** {'✅ PASSED' if suite.failed == 0 else '❌ FAILED'}
**Tests:** {suite.passed}/{suite.total} passed
**Duration:** {suite.duration:.1f}s

## Results

| Test | Status | Duration |
|------|--------|----------|
"""
        for test in suite.tests:
            emoji = "✅" if test.status == "passed" else "❌"
            summary += f"| {test.name} | {emoji} {test.status} | {test.duration_seconds:.1f}s |\n"
        
        with open(f"{output_dir}/summary.md", "w") as f:
            f.write(summary)
        
        print(f"\n📊 Reports saved to {output_dir}/")


def main():
    parser = argparse.ArgumentParser(description="GPU Test Runner for CI/CD")
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--gpu", default="RTX 3080")
    parser.add_argument("--max-price", type=float, default=0.30)
    parser.add_argument("--image", default="nvidia/cuda:12.8.0-base-ubuntu22.04")
    parser.add_argument("--project", default=".")
    parser.add_argument("--requirements", default="requirements.txt")
    parser.add_argument("--pytest", action="store_true", help="Run pytest")
    parser.add_argument("--test-path", default="tests/")
    parser.add_argument("--markers", default="gpu")
    parser.add_argument("--output", default="test-results")
    args = parser.parse_args()
    
    runner = GPUTestRunner(args.api_key, args.project)
    
    try:
        if not runner.provision(args.gpu, args.max_price, args.image):
            sys.exit(1)
        
        runner.setup_environment(args.requirements)
        
        if args.pytest:
            suite = runner.run_pytest(args.test_path, args.markers)
        else:
            # Default test suite
            suite = runner.run_test_suite([
                {"name": "CUDA Check", "command": "python -c 'import torch; assert torch.cuda.is_available()'"},
                {"name": "GPU Memory", "command": "nvidia-smi"},
            ])
        
        runner.generate_report(suite, args.output)
        
        # Exit with appropriate code
        if suite.failed > 0:
            print(f"\n❌ {suite.failed} test(s) failed")
            sys.exit(1)
        else:
            print(f"\n✅ All {suite.total} tests passed!")
            sys.exit(0)
        
    finally:
        runner.cleanup()


if __name__ == "__main__":
    main()
```
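
The runner extracts the SSH host and port by splitting the connection string on whitespace, which depends on the exact token order. A slightly more tolerant parser might look like the sketch below. It assumes the string looks like `ssh root@host -p 2222`, which is inferred from the parsing code above rather than from a documented format; `parse_ssh_command` is a hypothetical helper.

```python
import shlex
from typing import Tuple


def parse_ssh_command(conn: str, default_port: int = 22) -> Tuple[str, str, int]:
    """Split a string like 'ssh root@host -p 2222' into (user, host, port)."""
    tokens = shlex.split(conn)
    user, host, port = "root", None, default_port
    i = 0
    while i < len(tokens):
        tok = tokens[i]
        if tok == "-p" and i + 1 < len(tokens):
            # The token after -p is the port, wherever the flag appears
            port = int(tokens[i + 1])
            i += 2
            continue
        if "@" in tok:
            user, host = tok.split("@", 1)
        i += 1
    if host is None:
        raise ValueError(f"no user@host found in {conn!r}")
    return user, host, port
```

Unlike indexing into fixed positions, this accepts `-p` before or after the `user@host` token and raises a clear error when no host is present.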

## Step 2: GitLab CI Configuration

```yaml
# .gitlab-ci.yml
stages:
  - lint
  - test
  - gpu-test
  - deploy

# CLORE_API_KEY is expected as a masked CI/CD variable (Settings > CI/CD > Variables)
variables:
  GPU_TYPE: "RTX 3080"
  MAX_PRICE: "0.30"

lint:
  stage: lint
  image: python:3.11
  script:
    - pip install ruff black
    - ruff check .
    - black --check .

unit-tests:
  stage: test
  image: python:3.11
  script:
    - pip install -r requirements.txt
    - pip install pytest
    - pytest tests/unit/ -v

gpu-tests:
  stage: gpu-test
  image: python:3.11
  needs: [unit-tests]
  script:
    - pip install requests paramiko scp
    - python ci/gpu_test_runner.py
      --api-key "$CLORE_API_KEY"
      --gpu "$GPU_TYPE"
      --max-price "$MAX_PRICE"
      --pytest
      --test-path "tests/gpu/"
      --output test-results
  artifacts:
    when: always
    paths:
      - test-results/
    reports:
      junit: test-results/junit.xml
  rules:
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"
    - if: $CI_COMMIT_BRANCH == "main"

deploy:
  stage: deploy
  needs: [gpu-tests]
  script:
    - echo "Deploying..."
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
```
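
The `junit.xml` artifact can also be inspected outside GitLab, for example to post a short summary elsewhere. The sketch below counts outcomes in a JUnit document of the shape produced by `to_junit_xml()` in Step 1; `summarize_junit` is a hypothetical helper and uses only the standard library.

```python
import xml.etree.ElementTree as ET


def summarize_junit(xml_text: str) -> dict:
    """Count testcase outcomes in a JUnit XML document."""
    root = ET.fromstring(xml_text)
    counts = {"total": 0, "failed": 0, "errors": 0, "skipped": 0}
    for case in root.iter("testcase"):
        counts["total"] += 1
        # A testcase with no child element is treated as passed
        if case.find("failure") is not None:
            counts["failed"] += 1
        elif case.find("error") is not None:
            counts["errors"] += 1
        elif case.find("skipped") is not None:
            counts["skipped"] += 1
    counts["passed"] = (
        counts["total"] - counts["failed"] - counts["errors"] - counts["skipped"]
    )
    return counts
```

A typical use would be `summarize_junit(open("test-results/junit.xml").read())` after the GPU job has finished.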

## Step 3: Example GPU Tests (pytest)

```python
# tests/gpu/test_cuda.py
import pytest
import torch


@pytest.mark.gpu
def test_cuda_available():
    """Test that CUDA is available."""
    assert torch.cuda.is_available(), "CUDA is not available"


@pytest.mark.gpu
def test_cuda_device_count():
    """Test that at least one GPU is available."""
    assert torch.cuda.device_count() >= 1


@pytest.mark.gpu
def test_cuda_memory():
    """Test GPU memory operations."""
    device = torch.device("cuda:0")
    
    # Allocate tensor
    x = torch.randn(1000, 1000, device=device)
    assert x.device.type == "cuda"
    
    # Perform operation
    y = torch.matmul(x, x.T)
    assert y.shape == (1000, 1000)


@pytest.mark.gpu
def test_cuda_computation():
    """Test CUDA computation correctness."""
    device = torch.device("cuda:0")
    
    # Create tensors
    a = torch.tensor([[1.0, 2.0], [3.0, 4.0]], device=device)
    b = torch.tensor([[5.0, 6.0], [7.0, 8.0]], device=device)
    
    # Matrix multiplication
    c = torch.matmul(a, b)
    expected = torch.tensor([[19.0, 22.0], [43.0, 50.0]], device=device)
    
    assert torch.allclose(c, expected)


# tests/gpu/test_model.py
import pytest
import torch


@pytest.mark.gpu
def test_model_inference():
    """Test model inference on GPU."""
    from transformers import AutoModel, AutoTokenizer
    
    model_name = "bert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name).cuda()
    
    inputs = tokenizer("Hello world", return_tensors="pt")
    inputs = {k: v.cuda() for k, v in inputs.items()}
    
    with torch.no_grad():
        outputs = model(**inputs)
    
    assert outputs.last_hidden_state.shape[0] == 1


@pytest.mark.gpu
@pytest.mark.benchmark
def test_inference_performance():
    """Benchmark forward-pass throughput of a single linear layer."""
    import time
    
    model = torch.nn.Linear(1024, 1024).cuda()
    x = torch.randn(32, 1024).cuda()
    
    # Warmup
    with torch.no_grad():
        for _ in range(10):
            _ = model(x)
    torch.cuda.synchronize()
    
    # Benchmark (perf_counter is monotonic and higher resolution than time.time)
    start = time.perf_counter()
    with torch.no_grad():
        for _ in range(100):
            _ = model(x)
    torch.cuda.synchronize()
    elapsed = time.perf_counter() - start
    
    throughput = 100 / elapsed  # forward passes per second
    assert throughput > 1000, f"Throughput too low: {throughput:.0f} iterations/s"
```
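
The benchmark above compares throughput against a fixed threshold, which varies with the GPU model that happens to be rented. One way to approach performance regression detection is to compare against a stored baseline with a tolerance instead. The sketch below assumes higher values are better and that baselines are kept per benchmark name; `find_regressions` and the benchmark names in the usage note are hypothetical.

```python
from typing import Dict, List


def find_regressions(current: Dict[str, float], baseline: Dict[str, float],
                     tolerance: float = 0.10) -> List[str]:
    """Return names of benchmarks whose throughput fell more than
    `tolerance` (as a fraction) below the recorded baseline."""
    regressed = []
    for name, base in baseline.items():
        if name not in current:
            continue  # benchmark not run this time; nothing to compare
        if current[name] < base * (1.0 - tolerance):
            regressed.append(name)
    return sorted(regressed)
```

For example, with a baseline of `{"linear_fwd": 1000.0}` and a current value of `800.0`, the default 10% tolerance flags `linear_fwd`. Baselines would need to be recorded separately for each GPU type used in CI.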

## Cost Comparison

| CI Provider          | GPU Type | Cost/Hour     | Notes        |
| -------------------- | -------- | ------------- | ------------ |
| Clore.ai             | RTX 3080 | $0.20         | On-demand    |
| Clore.ai             | RTX 4090 | $0.40         | On-demand    |
| GitHub (self-hosted) | Varies   | Hardware cost | Need own GPU |
| GitLab GPU runners   | A100     | $3.50+        | Expensive    |
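
To turn the hourly prices in the table into a rough monthly figure, multiply total GPU hours by the hourly rate, the same duration-times-price calculation the runner prints during cleanup. The sketch below is a back-of-the-envelope estimate only: it ignores provisioning time, minimum billing increments, and price fluctuations, and the run counts in the usage note are made-up inputs.

```python
def estimate_monthly_cost(price_per_hour: float, minutes_per_run: float,
                          runs_per_day: int, days: int = 30) -> float:
    """Estimate monthly GPU spend in USD for a CI workload."""
    # Total GPU hours consumed over the period
    hours = minutes_per_run * runs_per_day * days / 60
    return round(hours * price_per_hour, 2)
```

With 20 runs per day at 5 minutes each, that is 50 GPU hours per month: about $10 at $0.20/hr, or about $175 at $3.50/hr.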

## Best Practices

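1. **Run GPU tests only when needed** - Use pytest markers (`-m gpu`) and CI `rules` so GPU jobs run only for relevant pipelines
2. **Keep tests fast** - Aim for under 5 minutes total, since rental cost scales with run time
3. **Use cheaper GPUs for CI** - An RTX 3080 is usually sufficient for correctness checks
4. **Cache dependencies** - Use a pre-built Docker image with dependencies installed instead of running `pip install` on every run
5. **Fail fast** - Stop on the first failure in CI (for example, `pytest -x`)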
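
As an illustration of the first practice, a pipeline step could decide whether to rent a GPU at all based on which files changed. This is a minimal sketch: the path patterns are hypothetical and should be adapted to your repository layout, and obtaining the list of changed files (for example from `git diff --name-only`) is left to the caller.

```python
from fnmatch import fnmatchcase

# Hypothetical patterns for files whose changes should trigger GPU tests
GPU_PATTERNS = ["src/models/*", "tests/gpu/*", "requirements.txt", "*.cu"]


def should_run_gpu_tests(changed_files, patterns=GPU_PATTERNS) -> bool:
    """Return True if any changed path matches a GPU-relevant pattern.

    Note: in fnmatch patterns, `*` also matches `/`, so "src/models/*"
    covers nested paths such as "src/models/bert/layers.py".
    """
    return any(
        fnmatchcase(path, pattern)
        for path in changed_files
        for pattern in patterns
    )
```

A documentation-only change such as `["README.md"]` would skip the GPU job, while a change under `src/models/` would trigger it.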

## Next Steps

* [GitHub Actions Integration](https://docs.clore.ai/dev/devops-and-automation/github-actions)
* [Prometheus Monitoring](https://docs.clore.ai/dev/devops-and-automation/prometheus-monitoring)
* [Cost Optimization](https://docs.clore.ai/dev/devops-and-automation/cost-optimization)
