# rapids_engine.py
import paramiko
from scp import SCPClient
import json
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class AnalyticsResult:
    """Results from analytics operation.

    Produced by RAPIDSEngine.run_analytics from the JSON payload the remote
    script prints on its "RESULT:" line.
    """
    operation: str  # human-readable label, e.g. "Load CSV" or "Predict"
    execution_time_seconds: float  # wall-clock time measured on the server
    rows_processed: int  # input row count reported by the remote script (0 if unset)
    gpu_memory_used_gb: float  # nvidia-smi memory.used, converted from MiB by the caller
    result_data: Dict  # operation-specific payload (the remote "data" dict)
    success: bool  # False when the remote script raised or produced no RESULT line
    error: Optional[str] = None  # str(exception) or stderr text; None on success
class RAPIDSEngine:
    """Execute RAPIDS (cuDF / cuML) operations on a remote GPU server.

    Workflow: a Python script is generated locally, written to /tmp on the
    server via a shell heredoc, executed with the server's python3, and its
    results are read back from a JSON payload printed on a stdout line
    prefixed with "RESULT:".
    """

    def __init__(self, ssh_host: str, ssh_port: int, ssh_password: str,
                 ssh_user: str = "root"):
        """Store connection parameters; no connection is made until connect().

        Args:
            ssh_host: Server hostname or IP address.
            ssh_port: SSH port number.
            ssh_password: Password for the SSH account.
            ssh_user: SSH account name. Defaults to "root" (the previously
                hard-coded value) for backward compatibility.
        """
        self.ssh_host = ssh_host
        self.ssh_port = ssh_port
        self.ssh_password = ssh_password
        self.ssh_user = ssh_user
        self._ssh = None   # paramiko.SSHClient once connect() succeeds
        self._scp = None   # SCPClient bound to the SSH transport

    def connect(self):
        """Establish the SSH connection and an SCP channel on top of it."""
        self._ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification (MITM risk) -- confirm this is acceptable outside of
        # ephemeral/dev GPU boxes.
        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh.connect(
            self.ssh_host,
            port=self.ssh_port,
            username=self.ssh_user,
            password=self.ssh_password,
            timeout=30,
        )
        self._scp = SCPClient(self._ssh.get_transport())

    def disconnect(self):
        """Close SCP and SSH channels; safe to call repeatedly."""
        if self._scp:
            self._scp.close()
            self._scp = None  # drop the closed channel so it cannot be reused
        if self._ssh:
            self._ssh.close()
            self._ssh = None

    def _exec(self, cmd: str, timeout: int = 3600) -> tuple:
        """Execute *cmd* on the server; return (stdout, stderr, exit_code).

        Output is drained *before* waiting for the exit status: blocking on
        recv_exit_status() first can deadlock when the command emits more
        output than the channel window buffers.
        """
        stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
        out = stdout.read().decode()
        err = stderr.read().decode()
        exit_code = stdout.channel.recv_exit_status()
        return out, err, exit_code

    def upload_file(self, local_path: str, remote_path: str):
        """Upload a local file to the server via SCP."""
        self._scp.put(local_path, remote_path)

    def download_file(self, remote_path: str, local_path: str):
        """Download a file from the server via SCP."""
        self._scp.get(remote_path, local_path)

    @staticmethod
    def _read_expr(path: str) -> str:
        """Return a cuDF load expression for *path*.

        Parquet when the filename ends in ".parquet", CSV otherwise. The
        choice is made locally because *path* is a literal at
        script-generation time, so this is equivalent to deciding remotely.
        """
        if path.endswith(".parquet"):
            return f'cudf.read_parquet("{path}")'
        return f'cudf.read_csv("{path}")'

    def verify_rapids(self) -> Dict:
        """Verify RAPIDS installation and GPU.

        Returns a dict with "rapids_ready" (exit code == 0) plus lowercased
        key/value pairs parsed from the probe's "KEY:value" output lines
        (rapids_version, gpu_info).
        """
        script = '''
import cudf
import cuml
import rmm
import subprocess
gpu_info = subprocess.check_output(
    ['nvidia-smi', '--query-gpu=name,memory.total,memory.free', '--format=csv,noheader']
).decode().strip()
print(f"RAPIDS_VERSION:{cudf.__version__}")
print(f"GPU_INFO:{gpu_info}")
'''
        self._exec(f"cat > /tmp/check.py << 'EOF'\n{script}\nEOF")
        out, err, code = self._exec("python3 /tmp/check.py")
        result = {"rapids_ready": code == 0}
        for line in out.strip().split("\n"):
            if ":" in line:
                # split(":", 1) keeps colons inside the value intact
                key, val = line.split(":", 1)
                result[key.lower()] = val
        return result

    def run_analytics(self, script: str, description: str = "Analytics") -> AnalyticsResult:
        """Run a RAPIDS analytics script on the server.

        The user script is indented into a try block with cudf/cuml/rmm
        imported and a mutable ``result`` dict in scope; the script may set
        result["rows_processed"] and fill result["data"].

        Args:
            script: Python source (column-0 lines) to execute remotely.
            description: Label stored in the returned AnalyticsResult.

        Returns:
            AnalyticsResult parsed from the "RESULT:" stdout line, or a
            failure result carrying stderr when no such line was produced
            (e.g. the wrapper itself crashed before printing).
        """
        wrapped_script = f'''
import time
import json
import cudf
import cuml
import rmm
# Enable managed memory for large datasets
rmm.reinitialize(managed_memory=True)
start_time = time.time()
result = {{"success": True, "error": None, "data": {{}}}}
try:
{self._indent(script, 4)}
except Exception as e:
    result["success"] = False
    result["error"] = str(e)
result["execution_time"] = time.time() - start_time
# Get memory usage
import subprocess
mem_out = subprocess.check_output(
    ["nvidia-smi", "--query-gpu=memory.used", "--format=csv,noheader,nounits"]
).decode()
result["gpu_memory_mb"] = int(mem_out.strip().split("\\n")[0])
print("RESULT:" + json.dumps(result))
'''
        # NOTE(review): the quoted heredoc would be truncated by a script
        # containing a line that is exactly "EOF" -- none of the generated
        # scripts do, but confirm before accepting arbitrary user scripts.
        self._exec(f"cat > /tmp/analytics.py << 'EOF'\n{wrapped_script}\nEOF")
        out, err, code = self._exec("python3 /tmp/analytics.py", timeout=3600)
        for line in out.strip().split("\n"):
            if line.startswith("RESULT:"):
                data = json.loads(line[7:])
                return AnalyticsResult(
                    operation=description,
                    execution_time_seconds=data.get("execution_time", 0),
                    rows_processed=data.get("rows_processed", 0),
                    # remote reports MiB; expose GiB
                    gpu_memory_used_gb=data.get("gpu_memory_mb", 0) / 1024,
                    result_data=data.get("data", {}),
                    success=data.get("success", False),
                    error=data.get("error"),
                )
        # No RESULT marker: the wrapper failed outright (import error, OOM, ...)
        return AnalyticsResult(
            operation=description,
            execution_time_seconds=0,
            rows_processed=0,
            gpu_memory_used_gb=0,
            result_data={},
            success=False,
            error=err or "Unknown error",
        )

    def _indent(self, s: str, n: int = 4) -> str:
        """Indent every line of *s* (including blank lines) by *n* spaces."""
        pad = " " * n
        return "\n".join(pad + line for line in s.split("\n"))

    # --- cuDF Operations ---

    def load_csv(self, file_path: str) -> AnalyticsResult:
        """Load a CSV file with cuDF and report shape/columns/dtypes/memory."""
        script = f'''
df = cudf.read_csv("{file_path}")
result["rows_processed"] = len(df)
result["data"]["columns"] = list(df.columns)
result["data"]["shape"] = list(df.shape)
result["data"]["dtypes"] = {{str(k): str(v) for k, v in df.dtypes.items()}}
result["data"]["memory_mb"] = df.memory_usage(deep=True).sum() / (1024**2)
'''
        return self.run_analytics(script, "Load CSV")

    def load_parquet(self, file_path: str) -> AnalyticsResult:
        """Load a Parquet file with cuDF and report shape/columns/memory."""
        script = f'''
df = cudf.read_parquet("{file_path}")
result["rows_processed"] = len(df)
result["data"]["columns"] = list(df.columns)
result["data"]["shape"] = list(df.shape)
result["data"]["memory_mb"] = df.memory_usage(deep=True).sum() / (1024**2)
'''
        return self.run_analytics(script, "Load Parquet")

    def describe_data(self, file_path: str) -> AnalyticsResult:
        """Get a statistical description (describe + null counts) of a dataset."""
        script = f'''
df = {self._read_expr(file_path)}
result["rows_processed"] = len(df)
# Get describe for numeric columns
desc = df.describe().to_pandas().to_dict()
result["data"]["describe"] = desc
result["data"]["null_counts"] = df.isnull().sum().to_pandas().to_dict()
'''
        return self.run_analytics(script, "Describe Data")

    def filter_data(self, input_path: str, output_path: str, condition: str) -> AnalyticsResult:
        """Filter rows with df.query(*condition*) and write Parquet output.

        NOTE(review): *condition* is interpolated verbatim into the remote
        script -- it must come from a trusted caller.
        """
        script = f'''
df = {self._read_expr(input_path)}
initial_rows = len(df)
df = df.query("{condition}")
result["rows_processed"] = initial_rows
result["data"]["input_rows"] = initial_rows
result["data"]["output_rows"] = len(df)
result["data"]["filtered_percent"] = (1 - len(df) / initial_rows) * 100 if initial_rows else 0.0
df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"Filter: {condition}")

    def groupby_aggregate(self,
                          input_path: str,
                          output_path: str,
                          group_cols: List[str],
                          agg_dict: Dict[str, str]) -> AnalyticsResult:
        """Group by *group_cols*, aggregate per *agg_dict* (col -> agg name),
        and write the aggregated frame to *output_path* as Parquet."""
        group_str = ", ".join(f'"{c}"' for c in group_cols)
        agg_str = ", ".join(f'"{k}": "{v}"' for k, v in agg_dict.items())
        script = f'''
df = {self._read_expr(input_path)}
result["data"]["input_rows"] = len(df)
df_agg = df.groupby([{group_str}]).agg({{{agg_str}}}).reset_index()
result["rows_processed"] = len(df)
result["data"]["output_rows"] = len(df_agg)
result["data"]["groups"] = len(df_agg)
df_agg.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"GroupBy {group_cols}")

    def join_data(self,
                  left_path: str,
                  right_path: str,
                  output_path: str,
                  on: str,
                  how: str = "inner") -> AnalyticsResult:
        """Join two datasets on column *on* (merge strategy *how*) and write
        the merged frame to *output_path* as Parquet."""
        script = f'''
left = {self._read_expr(left_path)}
right = {self._read_expr(right_path)}
result["data"]["left_rows"] = len(left)
result["data"]["right_rows"] = len(right)
result["rows_processed"] = len(left) + len(right)
merged = left.merge(right, on="{on}", how="{how}")
result["data"]["output_rows"] = len(merged)
merged.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"Join on {on}")

    # --- cuML Operations ---

    def train_linear_regression(self,
                                data_path: str,
                                target_col: str,
                                feature_cols: List[str],
                                model_path: str) -> AnalyticsResult:
        """Train a cuML LinearRegression on the full dataset and pickle the
        fitted model to *model_path* on the server.

        Reported r2_score is in-sample (train == eval set).
        """
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        script = f'''
from cuml.linear_model import LinearRegression
import pickle
df = {self._read_expr(data_path)}
result["rows_processed"] = len(df)
X = df[[{features_str}]]
y = df["{target_col}"]
model = LinearRegression()
model.fit(X, y)
result["data"]["r2_score"] = float(model.score(X, y))
result["data"]["coefficients"] = model.coef_.tolist()
result["data"]["intercept"] = float(model.intercept_)
# Save model
with open("{model_path}", "wb") as f:
    pickle.dump(model, f)
'''
        return self.run_analytics(script, "Train Linear Regression")

    def train_random_forest(self,
                            data_path: str,
                            target_col: str,
                            feature_cols: List[str],
                            model_path: str,
                            n_estimators: int = 100,
                            max_depth: int = 16) -> AnalyticsResult:
        """Train a cuML RandomForestClassifier and pickle it to *model_path*.

        The target column is cast to int32, so this is classification only.
        Reported accuracy is in-sample.
        """
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        script = f'''
from cuml.ensemble import RandomForestClassifier
import pickle
import numpy as np
df = {self._read_expr(data_path)}
result["rows_processed"] = len(df)
X = df[[{features_str}]].astype("float32")
y = df["{target_col}"].astype("int32")
model = RandomForestClassifier(
    n_estimators={n_estimators},
    max_depth={max_depth},
    random_state=42
)
model.fit(X, y)
accuracy = float(model.score(X, y))
result["data"]["accuracy"] = accuracy
result["data"]["n_estimators"] = {n_estimators}
result["data"]["max_depth"] = {max_depth}
with open("{model_path}", "wb") as f:
    pickle.dump(model, f)
'''
        return self.run_analytics(script, "Train Random Forest")

    def train_kmeans(self,
                     data_path: str,
                     feature_cols: List[str],
                     n_clusters: int,
                     output_path: str) -> AnalyticsResult:
        """Run cuML K-Means; write the input frame plus a "cluster" label
        column to *output_path* as Parquet."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        script = f'''
from cuml.cluster import KMeans
df = {self._read_expr(data_path)}
result["rows_processed"] = len(df)
X = df[[{features_str}]].astype("float32")
kmeans = KMeans(n_clusters={n_clusters}, random_state=42)
labels = kmeans.fit_predict(X)
df["cluster"] = labels
result["data"]["n_clusters"] = {n_clusters}
result["data"]["inertia"] = float(kmeans.inertia_)
result["data"]["cluster_sizes"] = df["cluster"].value_counts().to_pandas().to_dict()
df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"KMeans ({n_clusters} clusters)")

    def train_pca(self,
                  data_path: str,
                  feature_cols: List[str],
                  n_components: int,
                  output_path: str) -> AnalyticsResult:
        """Run cuML PCA; append PC1..PCn columns to the frame and write it to
        *output_path* as Parquet, reporting explained variance ratios."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        script = f'''
from cuml.decomposition import PCA
df = {self._read_expr(data_path)}
result["rows_processed"] = len(df)
X = df[[{features_str}]].astype("float32")
pca = PCA(n_components={n_components})
X_transformed = pca.fit_transform(X)
# Create output dataframe
for i in range({n_components}):
    df[f"PC{{i+1}}"] = X_transformed[:, i]
result["data"]["n_components"] = {n_components}
result["data"]["explained_variance_ratio"] = pca.explained_variance_ratio_.tolist()
result["data"]["total_variance_explained"] = sum(pca.explained_variance_ratio_.tolist())
df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"PCA ({n_components} components)")

    def predict(self,
                model_path: str,
                data_path: str,
                feature_cols: List[str],
                output_path: str) -> AnalyticsResult:
        """Score a dataset with a previously pickled model; write the frame
        plus a "prediction" column to *output_path* as Parquet.

        NOTE(review): pickle.load executes arbitrary code from *model_path* --
        only load model files this engine itself produced.
        """
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        script = f'''
import pickle
df = {self._read_expr(data_path)}
result["rows_processed"] = len(df)
with open("{model_path}", "rb") as f:
    model = pickle.load(f)
X = df[[{features_str}]]
predictions = model.predict(X)
df["prediction"] = predictions
result["data"]["prediction_count"] = len(predictions)
df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, "Predict")