# GPU-Accelerated Data Processing with RAPIDS

## What We're Building

A complete GPU-accelerated data science workflow using NVIDIA RAPIDS on Clore.ai. Process terabyte-scale data, train machine learning models, and run complex analytics, often 10-100x faster than equivalent CPU-based tools, all through familiar pandas- and scikit-learn-style APIs.
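cuDF deliberately mirrors the pandas API, so most workloads port by swapping a single import. A minimal sketch of that idea, using pandas itself as the CPU stand-in (on a RAPIDS server the same pattern runs under `import cudf as pd`; the data here is made up for illustration):

```python
import pandas as pd  # on a RAPIDS GPU server: import cudf as pd

# A typical filter + groupby, written the same way the pipeline
# below runs it on the GPU.
df = pd.DataFrame({
    "store": ["A", "A", "B", "B", "B"],
    "sales": [100, 150, 200, 50, 300],
})

busy = df.query("sales >= 100")            # row filter
totals = busy.groupby("store")["sales"].sum()

print(int(totals["A"]), int(totals["B"]))  # 250 500
```

The point is not this toy frame but that the downstream GPU scripts in this guide (`query`, `groupby().agg()`, `merge`) read exactly like their pandas equivalents.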

**Key Features:**

* cuDF: GPU DataFrame library (pandas API compatible)
* cuML: GPU machine learning (scikit-learn API compatible)
* cuGraph: GPU graph analytics
* Dask-cuDF: Multi-GPU distributed processing
* Automatic GPU provisioning via Clore.ai API
* Jupyter notebook support
* Cost-optimized spot instance usage

## Prerequisites

* Clore.ai account with API key ([get one here](https://clore.ai))
* Python 3.10+
* Basic pandas/scikit-learn knowledge

```bash
pip install requests paramiko scp jupyter
```

## Architecture Overview

```
┌─────────────────────────────────────────────────────────────┐
│                    NVIDIA RAPIDS Stack                       │
├─────────────────────────────────────────────────────────────┤
│  cuDF        │  cuML        │  cuGraph    │  cuSpatial      │
│  DataFrames  │  ML Models   │  Graph      │  Geospatial     │
├─────────────────────────────────────────────────────────────┤
│                        Dask-cuDF                             │
│                  (Multi-GPU Distribution)                    │
├─────────────────────────────────────────────────────────────┤
│                      CUDA / cuPy                             │
├─────────────────────────────────────────────────────────────┤
│                    Clore.ai GPU Server                       │
│               (RTX 4090 / A100 / A6000)                     │
└─────────────────────────────────────────────────────────────┘
```

## Step 1: Clore.ai RAPIDS Client

```python
# clore_rapids_client.py
import requests
import time
import secrets
from typing import Dict, Any, List, Optional
from dataclasses import dataclass

@dataclass
class RAPIDSServer:
    """Represents a RAPIDS-enabled GPU server."""
    server_id: int
    order_id: int
    ssh_host: str
    ssh_port: int
    ssh_password: str
    jupyter_url: str
    gpu_model: str
    gpu_memory_gb: int
    hourly_cost: float


class CloreRAPIDSClient:
    """Clore.ai client for RAPIDS data science workloads."""
    
    BASE_URL = "https://api.clore.ai"
    
    # RAPIDS Docker image
    RAPIDS_IMAGE = "rapidsai/rapidsai-core:24.02-cuda12.0-runtime-ubuntu22.04-py3.10"
    
    # GPU VRAM requirements for RAPIDS
    GPU_VRAM = {
        "A100-80GB": 80, "A100": 80, "A100-40GB": 40,
        "A6000": 48, "RTX 4090": 24, "RTX 3090": 24,
        "A5000": 24, "RTX A4000": 16, "RTX 4080": 16,
        "RTX 3080 Ti": 12, "RTX 3080": 10
    }
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
    
    def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make API request with retry logic."""
        url = f"{self.BASE_URL}{endpoint}"
        
        for attempt in range(3):
            try:
                response = requests.request(
                    method, url,
                    headers=self.headers,
                    timeout=30,
                    **kwargs
                )
                data = response.json()
                
                if data.get("code") == 5:  # retriable error: back off and retry
                    time.sleep(2 ** attempt)
                    continue
                
                if data.get("code") != 0:
                    raise Exception(f"API Error: {data}")
                return data
                
            except requests.exceptions.Timeout:
                if attempt == 2:
                    raise
                time.sleep(1)
        
        raise Exception("Max retries exceeded")
    
    def find_rapids_gpu(self,
                        min_vram_gb: int = 16,
                        max_price_usd: float = 0.80,
                        prefer_spot: bool = True) -> Optional[Dict]:
        """Find GPU suitable for RAPIDS workloads."""
        
        servers = self._request("GET", "/v1/marketplace")["servers"]
        
        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            
            gpu_array = server.get("gpu_array", [])
            
            # Find matching GPU with sufficient VRAM
            gpu_match = None
            for gpu in gpu_array:
                for gpu_name, vram in self.GPU_VRAM.items():
                    if gpu_name in gpu and vram >= min_vram_gb:
                        gpu_match = (gpu_name, vram)
                        break
                if gpu_match:
                    break
            
            if not gpu_match:
                continue
            
            price_data = server.get("price", {}).get("usd", {})
            price = price_data.get("spot" if prefer_spot else "on_demand_clore")
            
            if not price or price > max_price_usd:
                continue
            
            candidates.append({
                "id": server["id"],
                "gpus": gpu_array,
                "gpu_count": len(gpu_array),
                "gpu_model": gpu_match[0],
                "vram_gb": gpu_match[1],
                "total_vram_gb": gpu_match[1] * len(gpu_array),
                "price_usd": price,
                "reliability": server.get("reliability", 0)
            })
        
        if not candidates:
            return None
        
        # Sort by VRAM per dollar
        candidates.sort(key=lambda x: (-x["total_vram_gb"] / x["price_usd"], -x["reliability"]))
        return candidates[0]
    
    def rent_rapids_server(self,
                           server: Dict,
                           use_spot: bool = True,
                           jupyter_token: Optional[str] = None) -> RAPIDSServer:
        """Rent a server for RAPIDS workloads."""
        
        ssh_password = secrets.token_urlsafe(16)
        jupyter_token = jupyter_token or secrets.token_urlsafe(16)
        
        order_data = {
            "renting_server": server["id"],
            "type": "spot" if use_spot else "on-demand",
            "currency": "CLORE-Blockchain",
            "image": self.RAPIDS_IMAGE,
            "ports": {"22": "tcp", "8888": "http", "8787": "http"},
            "env": {
                "NVIDIA_VISIBLE_DEVICES": "all",
                "JUPYTER_TOKEN": jupyter_token,
                "RAPIDS_NO_INITIALIZE": "1"
            },
            "ssh_password": ssh_password,
            "jupyter_token": jupyter_token
        }
        
        if use_spot:
            order_data["spotprice"] = server["price_usd"] * 1.15  # bid 15% above current spot
        
        result = self._request("POST", "/v1/create_order", json=order_data)
        order_id = result["order_id"]
        
        # Wait for server
        print(f"Waiting for RAPIDS server {server['id']}...")
        for _ in range(120):
            orders = self._request("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == order_id), None)
            
            if order and order.get("status") == "running":
                conn = order["connection"]
                ssh_parts = conn["ssh"].split()
                ssh_host = ssh_parts[1].split("@")[1] if "@" in ssh_parts[1] else ssh_parts[1]
                ssh_port = int(ssh_parts[-1]) if "-p" in conn["ssh"] else 22
                
                jupyter_url = conn.get("jupyter", f"http://{ssh_host}:8888/?token={jupyter_token}")
                
                return RAPIDSServer(
                    server_id=server["id"],
                    order_id=order_id,
                    ssh_host=ssh_host,
                    ssh_port=ssh_port,
                    ssh_password=ssh_password,
                    jupyter_url=jupyter_url,
                    gpu_model=server["gpu_model"],
                    gpu_memory_gb=server["total_vram_gb"],
                    hourly_cost=server["price_usd"]
                )
            
            time.sleep(2)
        
        raise Exception("Timeout waiting for server")
    
    def cancel_order(self, order_id: int):
        """Cancel a rental order."""
        self._request("POST", "/v1/cancel_order", json={"id": order_id})
```
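`find_rapids_gpu` ranks open offers by total VRAM per dollar, tie-broken by reliability. That ranking step is pure Python, so it can be sanity-checked locally against a toy listing (the candidate dicts below are made-up examples in the shape the client builds, not real Clore.ai offers):

```python
# Hypothetical candidates in the shape produced by find_rapids_gpu.
candidates = [
    {"id": 1, "total_vram_gb": 24, "price_usd": 0.30, "reliability": 0.95},
    {"id": 2, "total_vram_gb": 48, "price_usd": 0.50, "reliability": 0.90},
    {"id": 3, "total_vram_gb": 24, "price_usd": 0.30, "reliability": 0.99},
]

# Same sort key as the client: highest VRAM-per-dollar first,
# then highest reliability among ties.
candidates.sort(key=lambda x: (-x["total_vram_gb"] / x["price_usd"],
                               -x["reliability"]))

print([c["id"] for c in candidates])  # [2, 3, 1]
```

Offer 2 wins on VRAM-per-dollar (96 GB/$ vs 80 GB/$); offers 1 and 3 tie on price efficiency, so reliability decides between them.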

## Step 2: RAPIDS Data Science Engine

```python
# rapids_engine.py
import paramiko
from scp import SCPClient
import json
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass

@dataclass
class AnalyticsResult:
    """Results from analytics operation."""
    operation: str
    execution_time_seconds: float
    rows_processed: int
    gpu_memory_used_gb: float
    result_data: Dict
    success: bool
    error: Optional[str] = None


class RAPIDSEngine:
    """Execute RAPIDS operations on remote GPU server."""
    
    def __init__(self, ssh_host: str, ssh_port: int, ssh_password: str):
        self.ssh_host = ssh_host
        self.ssh_port = ssh_port
        self.ssh_password = ssh_password
        self._ssh = None
        self._scp = None
    
    def connect(self):
        """Establish SSH connection."""
        self._ssh = paramiko.SSHClient()
        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh.connect(
            self.ssh_host,
            port=self.ssh_port,
            username="root",
            password=self.ssh_password,
            timeout=30
        )
        self._scp = SCPClient(self._ssh.get_transport())
    
    def disconnect(self):
        """Close connections."""
        if self._scp:
            self._scp.close()
        if self._ssh:
            self._ssh.close()
    
    def _exec(self, cmd: str, timeout: int = 3600) -> tuple[str, str, int]:
        """Execute command on server."""
        stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
        exit_code = stdout.channel.recv_exit_status()
        return stdout.read().decode(), stderr.read().decode(), exit_code
    
    def upload_file(self, local_path: str, remote_path: str):
        """Upload file to server."""
        self._scp.put(local_path, remote_path)
    
    def download_file(self, remote_path: str, local_path: str):
        """Download file from server."""
        self._scp.get(remote_path, local_path)
    
    def verify_rapids(self) -> Dict:
        """Verify RAPIDS installation and GPU."""
        script = '''
import cudf
import cuml
import rmm
import subprocess

gpu_info = subprocess.check_output(
    ['nvidia-smi', '--query-gpu=name,memory.total,memory.free', '--format=csv,noheader']
).decode().strip()

print(f"RAPIDS_VERSION:{cudf.__version__}")
print(f"GPU_INFO:{gpu_info}")
'''
        
        self._exec(f"cat > /tmp/check.py << 'EOF'\n{script}\nEOF")
        out, err, code = self._exec("python3 /tmp/check.py")
        
        result = {"rapids_ready": code == 0}
        for line in out.strip().split("\n"):
            if ":" in line:
                key, val = line.split(":", 1)
                result[key.lower()] = val
        
        return result
    
    def run_analytics(self, script: str, description: str = "Analytics") -> AnalyticsResult:
        """Run a RAPIDS analytics script."""
        
        wrapped_script = f'''
import time
import json
import cudf
import cuml
import rmm

# Enable managed memory for large datasets
rmm.reinitialize(managed_memory=True)

start_time = time.time()
result = {{"success": True, "error": None, "data": {{}}}}

try:
{self._indent(script, 4)}
except Exception as e:
    result["success"] = False
    result["error"] = str(e)

result["execution_time"] = time.time() - start_time

# Get memory usage
import subprocess
mem_out = subprocess.check_output(
    ["nvidia-smi", "--query-gpu=memory.used", "--format=csv,noheader,nounits"]
).decode()
result["gpu_memory_mb"] = int(mem_out.strip().split("\\n")[0])

print("RESULT:" + json.dumps(result))
'''
        
        self._exec(f"cat > /tmp/analytics.py << 'EOF'\n{wrapped_script}\nEOF")
        out, err, code = self._exec("python3 /tmp/analytics.py", timeout=3600)
        
        for line in out.strip().split("\n"):
            if line.startswith("RESULT:"):
                data = json.loads(line[7:])
                return AnalyticsResult(
                    operation=description,
                    execution_time_seconds=data.get("execution_time", 0),
                    rows_processed=data.get("rows_processed", 0),
                    gpu_memory_used_gb=data.get("gpu_memory_mb", 0) / 1024,
                    result_data=data.get("data", {}),
                    success=data.get("success", False),
                    error=data.get("error")
                )
        
        return AnalyticsResult(
            operation=description,
            execution_time_seconds=0,
            rows_processed=0,
            gpu_memory_used_gb=0,
            result_data={},
            success=False,
            error=err or "Unknown error"
        )
    
    def _indent(self, s: str, n: int = 4) -> str:
        return "\n".join(" " * n + line for line in s.split("\n"))
    
    # --- cuDF Operations ---
    
    def load_csv(self, file_path: str) -> AnalyticsResult:
        """Load CSV file using cuDF."""
        script = f'''
df = cudf.read_csv("{file_path}")
result["rows_processed"] = len(df)
result["data"]["columns"] = list(df.columns)
result["data"]["shape"] = list(df.shape)
result["data"]["dtypes"] = {{str(k): str(v) for k, v in df.dtypes.items()}}
result["data"]["memory_mb"] = df.memory_usage(deep=True).sum() / (1024**2)
'''
        return self.run_analytics(script, "Load CSV")
    
    def load_parquet(self, file_path: str) -> AnalyticsResult:
        """Load Parquet file using cuDF."""
        script = f'''
df = cudf.read_parquet("{file_path}")
result["rows_processed"] = len(df)
result["data"]["columns"] = list(df.columns)
result["data"]["shape"] = list(df.shape)
result["data"]["memory_mb"] = df.memory_usage(deep=True).sum() / (1024**2)
'''
        return self.run_analytics(script, "Load Parquet")
    
    def describe_data(self, file_path: str) -> AnalyticsResult:
        """Get statistical description of data."""
        script = f'''
df = cudf.read_parquet("{file_path}") if "{file_path}".endswith(".parquet") else cudf.read_csv("{file_path}")
result["rows_processed"] = len(df)

# Get describe for numeric columns
desc = df.describe().to_pandas().to_dict()
result["data"]["describe"] = desc
result["data"]["null_counts"] = df.isnull().sum().to_pandas().to_dict()
'''
        return self.run_analytics(script, "Describe Data")
    
    def filter_data(self, input_path: str, output_path: str, condition: str) -> AnalyticsResult:
        """Filter data by condition."""
        script = f'''
df = cudf.read_parquet("{input_path}") if "{input_path}".endswith(".parquet") else cudf.read_csv("{input_path}")
initial_rows = len(df)

df = df.query("{condition}")
result["rows_processed"] = initial_rows
result["data"]["input_rows"] = initial_rows
result["data"]["output_rows"] = len(df)
result["data"]["filtered_percent"] = (1 - len(df) / initial_rows) * 100 if initial_rows else 0.0

df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"Filter: {condition}")
    
    def groupby_aggregate(self, 
                          input_path: str, 
                          output_path: str,
                          group_cols: List[str],
                          agg_dict: Dict[str, str]) -> AnalyticsResult:
        """Group by and aggregate."""
        group_str = ", ".join(f'"{c}"' for c in group_cols)
        agg_str = ", ".join(f'"{k}": "{v}"' for k, v in agg_dict.items())
        
        script = f'''
df = cudf.read_parquet("{input_path}") if "{input_path}".endswith(".parquet") else cudf.read_csv("{input_path}")
result["data"]["input_rows"] = len(df)

df_agg = df.groupby([{group_str}]).agg({{{agg_str}}}).reset_index()
result["rows_processed"] = len(df)
result["data"]["output_rows"] = len(df_agg)
result["data"]["groups"] = len(df_agg)

df_agg.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"GroupBy {group_cols}")
    
    def join_data(self,
                  left_path: str,
                  right_path: str,
                  output_path: str,
                  on: str,
                  how: str = "inner") -> AnalyticsResult:
        """Join two datasets."""
        script = f'''
left = cudf.read_parquet("{left_path}") if "{left_path}".endswith(".parquet") else cudf.read_csv("{left_path}")
right = cudf.read_parquet("{right_path}") if "{right_path}".endswith(".parquet") else cudf.read_csv("{right_path}")

result["data"]["left_rows"] = len(left)
result["data"]["right_rows"] = len(right)
result["rows_processed"] = len(left) + len(right)

merged = left.merge(right, on="{on}", how="{how}")
result["data"]["output_rows"] = len(merged)

merged.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"Join on {on}")
    
    # --- cuML Operations ---
    
    def train_linear_regression(self,
                                data_path: str,
                                target_col: str,
                                feature_cols: List[str],
                                model_path: str) -> AnalyticsResult:
        """Train linear regression model."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        
        script = f'''
from cuml.linear_model import LinearRegression
import pickle

df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["rows_processed"] = len(df)

X = df[[{features_str}]]
y = df["{target_col}"]

model = LinearRegression()
model.fit(X, y)

result["data"]["r2_score"] = float(model.score(X, y))
result["data"]["coefficients"] = model.coef_.tolist()
result["data"]["intercept"] = float(model.intercept_)

# Save model
with open("{model_path}", "wb") as f:
    pickle.dump(model, f)
'''
        return self.run_analytics(script, "Train Linear Regression")
    
    def train_random_forest(self,
                            data_path: str,
                            target_col: str,
                            feature_cols: List[str],
                            model_path: str,
                            n_estimators: int = 100,
                            max_depth: int = 16) -> AnalyticsResult:
        """Train a random forest classifier."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        
        script = f'''
from cuml.ensemble import RandomForestClassifier
import pickle
import numpy as np

df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["rows_processed"] = len(df)

X = df[[{features_str}]].astype("float32")
y = df["{target_col}"].astype("int32")

model = RandomForestClassifier(
    n_estimators={n_estimators},
    max_depth={max_depth},
    random_state=42
)
model.fit(X, y)

accuracy = float(model.score(X, y))
result["data"]["accuracy"] = accuracy
result["data"]["n_estimators"] = {n_estimators}
result["data"]["max_depth"] = {max_depth}

with open("{model_path}", "wb") as f:
    pickle.dump(model, f)
'''
        return self.run_analytics(script, "Train Random Forest")
    
    def train_kmeans(self,
                     data_path: str,
                     feature_cols: List[str],
                     n_clusters: int,
                     output_path: str) -> AnalyticsResult:
        """Train K-Means clustering."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        
        script = f'''
from cuml.cluster import KMeans

df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["rows_processed"] = len(df)

X = df[[{features_str}]].astype("float32")

kmeans = KMeans(n_clusters={n_clusters}, random_state=42)
labels = kmeans.fit_predict(X)

df["cluster"] = labels
result["data"]["n_clusters"] = {n_clusters}
result["data"]["inertia"] = float(kmeans.inertia_)
result["data"]["cluster_sizes"] = df["cluster"].value_counts().to_pandas().to_dict()

df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"KMeans ({n_clusters} clusters)")
    
    def train_pca(self,
                  data_path: str,
                  feature_cols: List[str],
                  n_components: int,
                  output_path: str) -> AnalyticsResult:
        """Perform PCA dimensionality reduction."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        
        script = f'''
from cuml.decomposition import PCA

df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["rows_processed"] = len(df)

X = df[[{features_str}]].astype("float32")

pca = PCA(n_components={n_components})
X_transformed = pca.fit_transform(X)

# Create output dataframe
for i in range({n_components}):
    df[f"PC{{i+1}}"] = X_transformed[:, i]

result["data"]["n_components"] = {n_components}
result["data"]["explained_variance_ratio"] = pca.explained_variance_ratio_.tolist()
result["data"]["total_variance_explained"] = sum(pca.explained_variance_ratio_.tolist())

df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"PCA ({n_components} components)")
    
    def predict(self,
                model_path: str,
                data_path: str,
                feature_cols: List[str],
                output_path: str) -> AnalyticsResult:
        """Make predictions using saved model."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        
        script = f'''
import pickle

df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["rows_processed"] = len(df)

with open("{model_path}", "rb") as f:
    model = pickle.load(f)

X = df[[{features_str}]]
predictions = model.predict(X)

df["prediction"] = predictions
result["data"]["prediction_count"] = len(predictions)

df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, "Predict")
```
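`run_analytics` and the wrapper script it generates communicate through a single `RESULT:`-prefixed JSON line on stdout; any other output (progress prints, warnings) is ignored. The parsing side of that contract can be exercised locally with a fabricated stdout string:

```python
import json

def parse_result(stdout: str) -> dict:
    """Extract the RESULT: JSON payload from remote stdout,
    mirroring the scan in RAPIDSEngine.run_analytics."""
    for line in stdout.strip().split("\n"):
        if line.startswith("RESULT:"):
            return json.loads(line[len("RESULT:"):])
    return {}

# Fabricated remote output: noise lines plus one RESULT line.
fake_stdout = "loading data...\nRESULT:" + json.dumps(
    {"success": True, "execution_time": 1.5, "rows_processed": 1000}
)
data = parse_result(fake_stdout)
print(data["success"], data["rows_processed"])  # True 1000
```

Keeping the protocol to one machine-readable line is what lets the remote scripts print freely without breaking result collection.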

## Step 3: Complete Data Science Pipeline

```python
# rapids_pipeline.py
import os
import time
from typing import List, Dict, Optional
from dataclasses import dataclass

from clore_rapids_client import CloreRAPIDSClient, RAPIDSServer
from rapids_engine import RAPIDSEngine, AnalyticsResult

@dataclass
class PipelineStats:
    """Statistics for the pipeline."""
    total_operations: int
    successful_operations: int
    total_rows_processed: int
    total_time_seconds: float
    total_cost_usd: float
    throughput_rows_per_sec: float


class RAPIDSPipeline:
    """High-level data science pipeline using RAPIDS."""
    
    def __init__(self, api_key: str):
        self.client = CloreRAPIDSClient(api_key)
        self.server: Optional[RAPIDSServer] = None
        self.engine: Optional[RAPIDSEngine] = None
        self.results: List[AnalyticsResult] = []
    
    def setup(self, 
              min_vram_gb: int = 16,
              max_price_usd: float = 0.80):
        """Provision RAPIDS server."""
        
        print("🔍 Finding RAPIDS GPU...")
        gpu = self.client.find_rapids_gpu(
            min_vram_gb=min_vram_gb,
            max_price_usd=max_price_usd
        )
        
        if not gpu:
            raise Exception(f"No GPU with {min_vram_gb}GB+ under ${max_price_usd}/hr")
        
        print(f"   {gpu['gpu_model']} ({gpu['total_vram_gb']}GB) @ ${gpu['price_usd']:.2f}/hr")
        
        print("🚀 Provisioning server...")
        self.server = self.client.rent_rapids_server(gpu)
        
        print(f"   SSH: {self.server.ssh_host}:{self.server.ssh_port}")
        print(f"   Jupyter: {self.server.jupyter_url}")
        
        # Connect engine
        self.engine = RAPIDSEngine(
            self.server.ssh_host,
            self.server.ssh_port,
            self.server.ssh_password
        )
        self.engine.connect()
        
        # Verify
        print("🔧 Verifying RAPIDS...")
        info = self.engine.verify_rapids()
        print(f"   Version: {info.get('rapids_version', 'N/A')}")
        
        return self
    
    def upload_data(self, local_path: str, remote_path: Optional[str] = None) -> str:
        """Upload data file."""
        if remote_path is None:
            remote_path = f"/tmp/data/{os.path.basename(local_path)}"
        
        self.engine._exec(f"mkdir -p {os.path.dirname(remote_path)}")
        print(f"📤 Uploading {os.path.basename(local_path)}...")
        self.engine.upload_file(local_path, remote_path)
        return remote_path
    
    def download_data(self, remote_path: str, local_path: str):
        """Download data file."""
        print(f"📥 Downloading {os.path.basename(remote_path)}...")
        os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
        self.engine.download_file(remote_path, local_path)
    
    def run(self, script: str, description: str = "Custom Operation") -> AnalyticsResult:
        """Run custom RAPIDS script."""
        result = self.engine.run_analytics(script, description)
        self.results.append(result)
        
        if result.success:
            print(f"   ✅ {description}: {result.rows_processed:,} rows in {result.execution_time_seconds:.2f}s")
        else:
            print(f"   ❌ {description}: {result.error}")
        
        return result
    
    def load(self, file_path: str) -> AnalyticsResult:
        """Load data file."""
        if file_path.endswith(".parquet"):
            result = self.engine.load_parquet(file_path)
        else:
            result = self.engine.load_csv(file_path)
        self.results.append(result)
        return result
    
    def describe(self, file_path: str) -> AnalyticsResult:
        """Get data statistics."""
        result = self.engine.describe_data(file_path)
        self.results.append(result)
        return result
    
    def filter(self, input_path: str, output_path: str, condition: str) -> AnalyticsResult:
        """Filter data."""
        result = self.engine.filter_data(input_path, output_path, condition)
        self.results.append(result)
        return result
    
    def groupby(self, input_path: str, output_path: str, 
                group_cols: List[str], aggs: Dict[str, str]) -> AnalyticsResult:
        """Group by and aggregate."""
        result = self.engine.groupby_aggregate(input_path, output_path, group_cols, aggs)
        self.results.append(result)
        return result
    
    def join(self, left: str, right: str, output: str, on: str, how: str = "inner") -> AnalyticsResult:
        """Join datasets."""
        result = self.engine.join_data(left, right, output, on, how)
        self.results.append(result)
        return result
    
    def train_model(self, 
                    model_type: str,
                    data_path: str,
                    target: str,
                    features: List[str],
                    model_path: str,
                    **kwargs) -> AnalyticsResult:
        """Train ML model."""
        if model_type == "linear_regression":
            result = self.engine.train_linear_regression(data_path, target, features, model_path)
        elif model_type == "random_forest":
            result = self.engine.train_random_forest(
                data_path, target, features, model_path,
                n_estimators=kwargs.get("n_estimators", 100),
                max_depth=kwargs.get("max_depth", 16)
            )
        else:
            raise ValueError(f"Unknown model type: {model_type}")
        
        self.results.append(result)
        return result
    
    def cluster(self, data_path: str, features: List[str], 
                n_clusters: int, output_path: str) -> AnalyticsResult:
        """K-Means clustering."""
        result = self.engine.train_kmeans(data_path, features, n_clusters, output_path)
        self.results.append(result)
        return result
    
    def pca(self, data_path: str, features: List[str],
            n_components: int, output_path: str) -> AnalyticsResult:
        """PCA dimensionality reduction."""
        result = self.engine.train_pca(data_path, features, n_components, output_path)
        self.results.append(result)
        return result
    
    def predict(self, model_path: str, data_path: str,
                features: List[str], output_path: str) -> AnalyticsResult:
        """Make predictions."""
        result = self.engine.predict(model_path, data_path, features, output_path)
        self.results.append(result)
        return result
    
    def get_stats(self) -> PipelineStats:
        """Get pipeline statistics."""
        successful = [r for r in self.results if r.success]
        total_rows = sum(r.rows_processed for r in successful)
        total_time = sum(r.execution_time_seconds for r in self.results)
        
        return PipelineStats(
            total_operations=len(self.results),
            successful_operations=len(successful),
            total_rows_processed=total_rows,
            total_time_seconds=total_time,
            total_cost_usd=(total_time / 3600) * self.server.hourly_cost if self.server else 0,
            throughput_rows_per_sec=total_rows / total_time if total_time > 0 else 0
        )
    
    def cleanup(self):
        """Release resources."""
        if self.engine:
            self.engine.disconnect()
        if self.server:
            print("🧹 Releasing server...")
            self.client.cancel_order(self.server.order_id)
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        self.cleanup()
```
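`get_stats` counts rows only from successful operations but bills wall-clock time for every operation, successful or not. A quick local check of that arithmetic, using a trimmed stand-in for `AnalyticsResult` and a hypothetical hourly rate:

```python
from dataclasses import dataclass

# Standalone stand-in for rapids_engine.AnalyticsResult, trimmed to
# the fields get_stats actually reads.
@dataclass
class Result:
    rows_processed: int
    execution_time_seconds: float
    success: bool

results = [
    Result(1_000_000, 2.0, True),
    Result(500_000, 1.0, True),
    Result(0, 3.0, False),       # failed op: costs time, adds no rows
]

# Same aggregation as RAPIDSPipeline.get_stats.
successful = [r for r in results if r.success]
total_rows = sum(r.rows_processed for r in successful)
total_time = sum(r.execution_time_seconds for r in results)
hourly_cost = 0.40               # hypothetical $/hr

cost = (total_time / 3600) * hourly_cost
throughput = total_rows / total_time if total_time > 0 else 0

print(total_rows, total_time, round(throughput))  # 1500000 6.0 250000
```

Note the failed operation still contributes 3 seconds to cost and drags throughput down, which is exactly what the pipeline reports.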

## Full Script: End-to-End Data Science

```python
#!/usr/bin/env python3
"""
RAPIDS Data Science Pipeline on Clore.ai.

Usage:
    python rapids_datascience.py --api-key YOUR_API_KEY --data dataset.parquet \
        --operation describe|cluster|train|predict
"""

import os
import sys
import time
import json
import argparse
import secrets
import requests
import paramiko
from scp import SCPClient
from typing import List, Dict


class CloreRAPIDSDataScience:
    """Complete RAPIDS data science solution on Clore.ai."""
    
    BASE_URL = "https://api.clore.ai"
    RAPIDS_IMAGE = "rapidsai/rapidsai-core:24.02-cuda12.0-runtime-ubuntu22.04-py3.10"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.order_id = None
        self.ssh_host = None
        self.ssh_port = None
        self.ssh_password = None
        self.hourly_cost = 0.0
        self._ssh = None
        self._scp = None
    
    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        url = f"{self.BASE_URL}{endpoint}"
        for attempt in range(3):
            response = requests.request(method, url, headers=self.headers,
                                        timeout=30, **kwargs)
            data = response.json()
            if data.get("code") == 5:
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries")
    
    def setup(self, min_vram: int = 16, max_price: float = 0.80):
        print("🔍 Finding RAPIDS GPU...")
        servers = self._api("GET", "/v1/marketplace")["servers"]
        
        gpu_vram = {"A100": 80, "RTX 4090": 24, "RTX 3090": 24, "A6000": 48}
        
        candidates = []
        for s in servers:
            if s.get("rented"):
                continue
            gpus = s.get("gpu_array", [])
            
            match = None
            for gpu in gpus:
                for g, vram in gpu_vram.items():
                    if g in gpu and vram >= min_vram:
                        match = (g, vram)
                        break
                if match:
                    break
            
            if not match:
                continue
            
            price = s.get("price", {}).get("usd", {}).get("spot")
            if price and price <= max_price:
                candidates.append({
                    "id": s["id"], "gpu": match[0], "vram": match[1],
                    "price": price
                })
        
        if not candidates:
            raise Exception(f"No GPU with {min_vram}GB+ under ${max_price}/hr")
        
        gpu = max(candidates, key=lambda x: x["vram"] / x["price"])
        print(f"   {gpu['gpu']} ({gpu['vram']}GB) @ ${gpu['price']:.2f}/hr")
        
        self.ssh_password = secrets.token_urlsafe(16)
        self.hourly_cost = gpu["price"]
        
        print("🚀 Provisioning...")
        order_data = {
            "renting_server": gpu["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": self.RAPIDS_IMAGE,
            "ports": {"22": "tcp", "8888": "http"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password,
            "spotprice": gpu["price"] * 1.15
        }
        
        result = self._api("POST", "/v1/create_order", json=order_data)
        self.order_id = result["order_id"]
        
        print("⏳ Waiting...")
        for _ in range(120):
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == self.order_id), None)
            if order and order.get("status") == "running":
                conn = order["connection"]["ssh"]
                parts = conn.split()
                self.ssh_host = parts[1].split("@")[1]
                self.ssh_port = int(parts[-1]) if "-p" in conn else 22
                break
            time.sleep(2)
        else:
            raise Exception("Timed out waiting for the server to start")
        
        self._ssh = paramiko.SSHClient()
        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh.connect(self.ssh_host, port=self.ssh_port,
                          username="root", password=self.ssh_password, timeout=30)
        self._scp = SCPClient(self._ssh.get_transport())
        
        print(f"✅ Ready: {self.ssh_host}:{self.ssh_port}")
    
    def _exec(self, cmd: str, timeout: int = 3600) -> str:
        stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
        exit_code = stdout.channel.recv_exit_status()
        output = stdout.read().decode()
        if exit_code != 0:
            raise Exception(f"Remote command failed ({exit_code}): {stderr.read().decode()}")
        return output
    
    def upload(self, local: str) -> str:
        self._exec("mkdir -p /tmp/data /tmp/output /tmp/models")
        remote = f"/tmp/data/{os.path.basename(local)}"
        print(f"📤 Uploading {os.path.basename(local)}...")
        self._scp.put(local, remote)
        return remote
    
    def download(self, remote: str, local: str):
        os.makedirs(os.path.dirname(local) or ".", exist_ok=True)
        self._scp.get(remote, local)
    
    def run_script(self, script: str) -> Dict:
        wrapped = f'''
import cudf, cuml, json, time, rmm
rmm.reinitialize(managed_memory=True)
start = time.time()
result = {{"success": True, "data": {{}}}}
try:
{self._indent(script)}
except Exception as e:
    result["success"] = False
    result["error"] = str(e)
result["time"] = time.time() - start
print("OUT:" + json.dumps(result))
'''
        self._exec(f"cat > /tmp/run.py << 'EOF'\n{wrapped}\nEOF")
        out = self._exec("python3 /tmp/run.py")
        
        for line in out.split("\n"):
            if line.startswith("OUT:"):
                return json.loads(line[4:])
        return {"success": False, "error": "Parse error"}
    
    def _indent(self, s: str, n: int = 4) -> str:
        return "\n".join(" " * n + line for line in s.split("\n"))
    
    def describe(self, data_path: str) -> Dict:
        script = f'''
df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["data"]["rows"] = len(df)
result["data"]["columns"] = list(df.columns)
result["data"]["dtypes"] = {{str(k): str(v) for k, v in df.dtypes.items()}}
desc = df.describe().to_pandas().to_dict()
result["data"]["describe"] = desc
'''
        print("📊 Describing data...")
        return self.run_script(script)
    
    def cluster(self, data_path: str, features: List[str], n_clusters: int, output: str) -> Dict:
        features_str = ", ".join(f'"{f}"' for f in features)
        script = f'''
from cuml.cluster import KMeans
df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
X = df[[{features_str}]].astype("float32")
kmeans = KMeans(n_clusters={n_clusters}, random_state=42)
df["cluster"] = kmeans.fit_predict(X)
result["data"]["inertia"] = float(kmeans.inertia_)
result["data"]["cluster_sizes"] = df["cluster"].value_counts().to_pandas().to_dict()
df.to_parquet("{output}")
'''
        print(f"🔮 Clustering ({n_clusters} clusters)...")
        return self.run_script(script)
    
    def train_rf(self, data_path: str, target: str, features: List[str], 
                 model_path: str, n_estimators: int = 100) -> Dict:
        features_str = ", ".join(f'"{f}"' for f in features)
        script = f'''
from cuml.ensemble import RandomForestClassifier
import pickle
df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
X = df[[{features_str}]].astype("float32")
y = df["{target}"].astype("int32")
model = RandomForestClassifier(n_estimators={n_estimators}, max_depth=16, random_state=42)
model.fit(X, y)
result["data"]["accuracy"] = float(model.score(X, y))
with open("{model_path}", "wb") as f:
    pickle.dump(model, f)
'''
        print(f"🌲 Training Random Forest ({n_estimators} trees)...")
        return self.run_script(script)
    
    def predict(self, model_path: str, data_path: str, features: List[str], output: str) -> Dict:
        features_str = ", ".join(f'"{f}"' for f in features)
        script = f'''
import pickle
df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
with open("{model_path}", "rb") as f:
    model = pickle.load(f)
X = df[[{features_str}]]
df["prediction"] = model.predict(X)
result["data"]["predictions"] = len(df)
df.to_parquet("{output}")
'''
        print("🎯 Making predictions...")
        return self.run_script(script)
    
    def cleanup(self):
        if self._scp:
            self._scp.close()
        if self._ssh:
            self._ssh.close()
        if self.order_id:
            print("🧹 Releasing...")
            self._api("POST", "/v1/cancel_order", json={"id": self.order_id})
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        self.cleanup()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--data", required=True)
    parser.add_argument("--operation", choices=["describe", "cluster", "train", "predict"], required=True)
    parser.add_argument("--features", help="Comma-separated feature columns")
    parser.add_argument("--target", help="Target column for training")
    parser.add_argument("--clusters", type=int, default=5)
    parser.add_argument("--output", default="./output.parquet")
    parser.add_argument("--model", default="/tmp/models/model.pkl")
    parser.add_argument("--min-vram", type=int, default=16)
    parser.add_argument("--max-price", type=float, default=0.80)
    args = parser.parse_args()
    
    start = time.time()
    
    with CloreRAPIDSDataScience(args.api_key) as ds:
        ds.setup(args.min_vram, args.max_price)
        
        # Upload data
        remote_data = ds.upload(args.data)
        
        features = [f.strip() for f in args.features.split(",")] if args.features else []
        
        if args.operation == "describe":
            result = ds.describe(remote_data)
            print(f"\n📋 Data Summary:")
            print(f"   Rows: {result['data'].get('rows', 0):,}")
            print(f"   Columns: {result['data'].get('columns', [])}")
            
        elif args.operation == "cluster":
            if not features:
                raise ValueError("--features required for clustering")
            output = f"/tmp/output/{os.path.basename(args.output)}"
            result = ds.cluster(remote_data, features, args.clusters, output)
            ds.download(output, args.output)
            print(f"\n✅ Clustering complete")
            print(f"   Inertia: {result['data'].get('inertia', 0):.2f}")
            print(f"   Output: {args.output}")
            
        elif args.operation == "train":
            if not features or not args.target:
                raise ValueError("--features and --target required for training")
            result = ds.train_rf(remote_data, args.target, features, args.model)
            ds.download(args.model, args.model.split("/")[-1])
            print(f"\n✅ Model trained")
            print(f"   Accuracy: {result['data'].get('accuracy', 0):.4f}")
            
        elif args.operation == "predict":
            if not features:
                raise ValueError("--features required for prediction")
            # Upload the local model file, then point predict() at its remote path
            remote_model = ds.upload(args.model.split("/")[-1])
            output = f"/tmp/output/{os.path.basename(args.output)}"
            result = ds.predict(remote_model, remote_data, features, output)
            ds.download(output, args.output)
            print(f"\n✅ Predictions complete")
            print(f"   Output: {args.output}")
        
        elapsed = time.time() - start
        cost = (elapsed / 3600) * ds.hourly_cost
        print(f"\n⏱️  Time: {elapsed:.1f}s | Cost: ${cost:.4f}")


if __name__ == "__main__":
    main()
```
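Assuming the script above is saved as `clore_rapids.py` (a placeholder name), typical invocations look like this:

```bash
# Profile a dataset on a freshly provisioned spot GPU
python3 clore_rapids.py \
  --api-key "$CLORE_API_KEY" \
  --data ./transactions.parquet \
  --operation describe

# Train a random forest on selected feature columns
python3 clore_rapids.py \
  --api-key "$CLORE_API_KEY" \
  --data ./transactions.parquet \
  --operation train \
  --features amount,frequency,recency \
  --target churned
```

Each run provisions a server, executes the operation, downloads the results, and releases the server, so you pay only for the minutes actually used.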

## Performance Comparison

| Operation            | Rows | scikit-learn | cuML | Speedup |
| -------------------- | ---- | ------------ | ---- | ------- |
| K-Means (8 clusters) | 100M | 180s         | 3s   | **60x** |
| Random Forest        | 100M | 300s         | 8s   | **37x** |
| PCA (50 components)  | 100M | 90s          | 2s   | **45x** |
| Linear Regression    | 100M | 25s          | 0.5s | **50x** |
| DBSCAN               | 10M  | 600s         | 15s  | **40x** |

## Cost Comparison

| Workload           | Local CPU (runtime) | AWS SageMaker (cost) | Clore.ai RAPIDS (cost) |
| ------------------ | ------------------- | -------------------- | ---------------------- |
| 100M row analysis  | 30 min              | $2.00                | **$0.25**              |
| Model training     | 2 hours             | $8.00                | **$0.80**              |
| Daily ETL pipeline | 4 hours             | $15.00               | **$1.50**              |
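The Clore.ai column follows directly from metered billing, the same arithmetic the client prints after each run. A minimal sketch (the $0.50/hr spot rate is illustrative):

```python
def job_cost(elapsed_seconds: float, hourly_rate_usd: float) -> float:
    """Cost of a metered GPU job: (elapsed / 3600) * hourly rate."""
    return (elapsed_seconds / 3600) * hourly_rate_usd

# A 30-minute analysis on a $0.50/hr spot GPU
print(f"${job_cost(30 * 60, 0.50):.2f}")  # → $0.25
```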

## Best Practices

1. **Use Parquet format** — 10x faster than CSV
2. **Enable managed memory** — `rmm.reinitialize(managed_memory=True)`
3. **Use A100/A6000** for large datasets (more VRAM)
4. **Batch operations** — keep data on GPU between transforms
5. **Use spot instances** — 50-70% cheaper for batch jobs
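Practices 1 and 4 lean on RAPIDS' pandas API compatibility: a pipeline written against the shared API keeps every intermediate in GPU memory when cuDF is available. A minimal sketch (column names are illustrative; it falls back to pandas on a CPU-only machine):

```python
# Each transform below stays inside the DataFrame library's memory space:
# GPU VRAM under cuDF, host RAM under the pandas fallback.
try:
    import cudf as xdf  # GPU path: intermediates never leave VRAM
except ImportError:
    import pandas as xdf  # CPU fallback with the same API

df = xdf.DataFrame({"price": [10.0, 12.5, 9.0, 15.0], "qty": [3, 1, 4, 2]})
df["revenue"] = df["price"] * df["qty"]           # stays on device
top = df.sort_values("revenue", ascending=False)  # still on device
total = float(top["revenue"].sum())               # one scalar crosses to host
print(total)  # → 108.5
```

Pulling a single scalar back to the host at the end, rather than converting intermediate frames, is what keeps the 10-100x speedups from being eaten by transfer overhead.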

## Next Steps

* [GPU ETL with cuDF](https://docs.clore.ai/dev/data-processing-and-pipelines/gpu-etl)
* [Video Transcoding](https://docs.clore.ai/dev/data-processing-and-pipelines/video-transcoding)
* [ML Training on Clore.ai](https://docs.clore.ai/dev/machine-learning-and-training/training-scheduler)
