#!/usr/bin/env python3
"""
GPU-accelerated ETL Pipeline using RAPIDS on Clore.ai.
Usage:
    python gpu_etl.py --api-key YOUR_API_KEY --input data.csv --output result.parquet \
        --filter "amount > 100" --groupby customer_id --agg "amount:sum,count:count"
"""
import os
import time
import json
import argparse
import secrets

import requests
import paramiko
from scp import SCPClient
from typing import List, Dict, Optional
from dataclasses import dataclass


@dataclass
class ETLResult:
    operation: str
    input_rows: int
    output_rows: int
    time_seconds: float
    success: bool
    error: Optional[str] = None


class CloreRAPIDSETL:
    """Complete GPU ETL solution using RAPIDS on Clore.ai."""

    BASE_URL = "https://api.clore.ai"
    RAPIDS_IMAGE = "rapidsai/rapidsai-core:24.02-cuda12.0-runtime-ubuntu22.04-py3.10"

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.server = None
        self.order_id = None
        self.ssh_host = None
        self.ssh_port = None
        self.ssh_password = None
        self.hourly_cost = 0.0
        self._ssh = None
        self._scp = None

    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Call the Clore API, backing off and retrying on retryable errors."""
        url = f"{self.BASE_URL}{endpoint}"
        for attempt in range(3):
            response = requests.request(method, url, headers=self.headers, **kwargs)
            data = response.json()
            if data.get("code") == 5:  # retryable error code: exponential backoff
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries exceeded")

    def find_gpu(self, min_vram: int = 16, max_price: float = 0.60) -> Optional[Dict]:
        servers = self._api("GET", "/v1/marketplace")["servers"]
        # GPUs with sufficient VRAM for RAPIDS
        good_gpus = {"A100": 80, "RTX 4090": 24, "RTX 3090": 24, "A6000": 48, "A5000": 24}
        candidates = []
        for s in servers:
            if s.get("rented"):
                continue
            gpus = s.get("gpu_array", [])
            match = None
            for gpu in gpus:
                for g, vram in good_gpus.items():
                    if g in gpu and vram >= min_vram:
                        match = (g, vram)
                        break
            if not match:
                continue
            price = s.get("price", {}).get("usd", {}).get("spot")
            if price and price <= max_price:
                candidates.append({
                    "id": s["id"], "gpu": match[0], "vram": match[1],
                    "price": price, "reliability": s.get("reliability", 0)
                })
        if not candidates:
            return None
        return max(candidates, key=lambda x: (x["vram"] / x["price"], x["reliability"]))
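
    # Worked example of the ranking key: a 24 GB RTX 4090 at $0.30/hr scores
    # 24 / 0.30 = 80 GB per dollar-hour, beating an 80 GB A100 at $1.20/hr
    # (80 / 1.20 ≈ 66.7); reliability only breaks ties between equal scores.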

    def setup(self, min_vram: int = 16, max_price: float = 0.60):
        print("🔍 Finding RAPIDS GPU...")
        gpu = self.find_gpu(min_vram, max_price)
        if not gpu:
            raise Exception(f"No GPU with {min_vram}GB+ under ${max_price}/hr")
        print(f"   {gpu['gpu']} ({gpu['vram']}GB) @ ${gpu['price']:.2f}/hr")
        self.ssh_password = secrets.token_urlsafe(16)
        self.hourly_cost = gpu["price"]
        print("🚀 Provisioning server...")
        order_data = {
            "renting_server": gpu["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": self.RAPIDS_IMAGE,
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password,
            "spotprice": gpu["price"] * 1.15  # bid 15% above the current spot price
        }
        result = self._api("POST", "/v1/create_order", json=order_data)
        self.order_id = result["order_id"]
        print("⏳ Waiting for server...")
        for _ in range(120):
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == self.order_id), None)
            if order and order.get("status") == "running":
                # Assumed connection string format: "ssh root@<host> -p <port>"
                conn = order["connection"]["ssh"]
                parts = conn.split()
                self.ssh_host = parts[1].split("@")[1]
                self.ssh_port = int(parts[-1]) if "-p" in conn else 22
                break
            time.sleep(2)
        else:
            raise Exception("Timed out waiting for the server to start")
        # Connect SSH
        self._ssh = paramiko.SSHClient()
        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh.connect(self.ssh_host, port=self.ssh_port,
                          username="root", password=self.ssh_password, timeout=30)
        self._scp = SCPClient(self._ssh.get_transport())
        print(f"✅ Server ready: {self.ssh_host}:{self.ssh_port}")
        # Verify RAPIDS
        stdin, stdout, stderr = self._ssh.exec_command(
            'python3 -c "import cudf; print(cudf.__version__)"'
        )
        version = stdout.read().decode().strip()
        print(f"   cuDF version: {version}")

    def _exec(self, cmd: str, timeout: int = 3600) -> str:
        stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
        stdout.channel.recv_exit_status()  # block until the remote command finishes
        return stdout.read().decode()

    def upload(self, local: str) -> str:
        remote = f"/tmp/data/{os.path.basename(local)}"
        self._exec("mkdir -p /tmp/data /tmp/output")
        print(f"📤 Uploading {os.path.basename(local)}...")
        self._scp.put(local, remote)
        return remote

    def download(self, remote: str, local: str):
        print(f"📥 Downloading to {local}...")
        os.makedirs(os.path.dirname(local) or ".", exist_ok=True)
        self._scp.get(remote, local)

    def run_etl(self, script: str) -> ETLResult:
        """Execute a RAPIDS ETL script on the remote GPU and parse its result."""
        wrapped = f'''
import cudf
import time
import json
import rmm

# Managed (unified) memory lets cuDF spill to host RAM when data exceeds VRAM
rmm.reinitialize(managed_memory=True)

start = time.time()
result = {{"success": True, "error": None, "input_rows": 0, "output_rows": 0}}
try:
{self._indent(script)}
except Exception as e:
    result["success"] = False
    result["error"] = str(e)
result["time"] = time.time() - start
print("RESULT:" + json.dumps(result))
'''
        self._exec(f"cat > /tmp/etl.py << 'EOF'\n{wrapped}\nEOF")
        out = self._exec("python3 /tmp/etl.py")
        for line in out.split("\n"):
            if line.startswith("RESULT:"):
                data = json.loads(line[7:])
                return ETLResult(
                    operation="custom",
                    input_rows=data.get("input_rows", 0),
                    output_rows=data.get("output_rows", 0),
                    time_seconds=data.get("time", 0),
                    success=data.get("success", False),
                    error=data.get("error")
                )
        return ETLResult("custom", 0, 0, 0, False, "Parse error: no RESULT line in output")

    def _indent(self, s: str, n: int = 4) -> str:
        return "\n".join(" " * n + line for line in s.split("\n"))

    def filter_data(self, input_path: str, output_path: str, condition: str) -> ETLResult:
        script = f'''
df = cudf.read_parquet("{input_path}") if "{input_path}".endswith(".parquet") else cudf.read_csv("{input_path}")
result["input_rows"] = len(df)
df = df.query("{condition}")
result["output_rows"] = len(df)
df.to_parquet("{output_path}") if "{output_path}".endswith(".parquet") else df.to_csv("{output_path}", index=False)
'''
        print(f"🔍 Filter: {condition}")
        return self.run_etl(script)
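
    # For --filter "amount > 100" the generated snippet reduces to
    # df = df.query("amount > 100"), evaluated on the GPU by cuDF.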

    def aggregate_data(self, input_path: str, output_path: str,
                       group_by: List[str], aggs: Dict[str, str]) -> ETLResult:
        agg_str = ", ".join(f'"{k}": "{v}"' for k, v in aggs.items())
        group_str = ", ".join(f'"{g}"' for g in group_by)
        script = f'''
df = cudf.read_parquet("{input_path}") if "{input_path}".endswith(".parquet") else cudf.read_csv("{input_path}")
result["input_rows"] = len(df)
df = df.groupby([{group_str}]).agg({{{agg_str}}}).reset_index()
result["output_rows"] = len(df)
df.to_parquet("{output_path}") if "{output_path}".endswith(".parquet") else df.to_csv("{output_path}", index=False)
'''
        print(f"📊 Aggregate by {group_by}")
        return self.run_etl(script)

    def join_data(self, left: str, right: str, output: str, on: str, how: str = "inner") -> ETLResult:
        script = f'''
left = cudf.read_parquet("{left}") if "{left}".endswith(".parquet") else cudf.read_csv("{left}")
right = cudf.read_parquet("{right}") if "{right}".endswith(".parquet") else cudf.read_csv("{right}")
result["input_rows"] = len(left) + len(right)
df = left.merge(right, on="{on}", how="{how}")
result["output_rows"] = len(df)
df.to_parquet("{output}") if "{output}".endswith(".parquet") else df.to_csv("{output}", index=False)
'''
        print(f"🔗 Join on {on} ({how})")
        return self.run_etl(script)

    def cleanup(self):
        if self._scp:
            self._scp.close()
        if self._ssh:
            self._ssh.close()
        if self.order_id:
            print("🧹 Releasing server...")
            self._api("POST", "/v1/cancel_order", json={"id": self.order_id})

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.cleanup()
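
# Because the class is a context manager, cleanup() runs even when a pipeline
# step raises, so the rented server is released (and billing stops) on failure too.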


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--input", "-i", required=True, help="Input file (CSV/Parquet)")
    parser.add_argument("--output", "-o", required=True, help="Output file")
    parser.add_argument("--filter", help="Filter condition (e.g., 'amount > 100')")
    parser.add_argument("--groupby", help="Group by columns (comma-separated)")
    parser.add_argument("--agg", help="Aggregations (col:func,col:func)")
    parser.add_argument("--join", help="Right table for join")
    parser.add_argument("--join-on", help="Join column")
    parser.add_argument("--join-how", default="inner", help="Join type")
    parser.add_argument("--min-vram", type=int, default=16)
    parser.add_argument("--max-price", type=float, default=0.60)
    args = parser.parse_args()

    with CloreRAPIDSETL(args.api_key) as etl:
        etl.setup(args.min_vram, args.max_price)
        # Upload input
        remote_input = etl.upload(args.input)
        remote_output = f"/tmp/output/{os.path.basename(args.output)}"
        etl._exec("mkdir -p /tmp/output")
        results = []
        current_input = remote_input
        step = 0
        # Filter
        if args.filter:
            step += 1
            step_output = f"/tmp/output/step{step}.parquet"
            result = etl.filter_data(current_input, step_output, args.filter)
            results.append(result)
            if result.success:
                print(f"   ✅ {result.input_rows:,} → {result.output_rows:,} rows ({result.time_seconds:.2f}s)")
                current_input = step_output
        # Join
        if args.join and args.join_on:
            remote_right = etl.upload(args.join)
            step += 1
            step_output = f"/tmp/output/step{step}.parquet"
            result = etl.join_data(current_input, remote_right, step_output, args.join_on, args.join_how)
            results.append(result)
            if result.success:
                print(f"   ✅ {result.input_rows:,} → {result.output_rows:,} rows ({result.time_seconds:.2f}s)")
                current_input = step_output
        # Aggregate
        if args.groupby and args.agg:
            group_cols = [c.strip() for c in args.groupby.split(",")]
            agg_dict = {}
            for pair in args.agg.split(","):
                col, func = pair.split(":")
                agg_dict[col.strip()] = func.strip()
            step += 1
            step_output = f"/tmp/output/step{step}.parquet"
            result = etl.aggregate_data(current_input, step_output, group_cols, agg_dict)
            results.append(result)
            if result.success:
                print(f"   ✅ {result.input_rows:,} → {result.output_rows:,} rows ({result.time_seconds:.2f}s)")
                current_input = step_output
        # Copy to final output
        if current_input != remote_input:
            if args.output.endswith(".parquet"):
                etl._exec(f"cp {current_input} {remote_output}")
            else:
                etl._exec(f"python3 -c \"import cudf; cudf.read_parquet('{current_input}').to_csv('{remote_output}', index=False)\"")
        else:
            remote_output = current_input

        # Download output
        etl.download(remote_output, args.output)

        # Summary
        print("\n" + "=" * 60)
        print("📊 ETL SUMMARY")
        total_time = sum(r.time_seconds for r in results)
        cost = (total_time / 3600) * etl.hourly_cost
        print(f"   Operations: {len(results)}")
        print(f"   Total time: {total_time:.2f}s")
        print(f"   Cost: ${cost:.4f}")
        print(f"   Output: {args.output}")


if __name__ == "__main__":
    main()
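
# Example pipeline run (illustrative file and column names):
#   python gpu_etl.py --api-key YOUR_API_KEY \
#       --input transactions.csv --output summary.parquet \
#       --filter "amount > 100" \
#       --join customers.csv --join-on customer_id --join-how left \
#       --groupby customer_id --agg "amount:sum"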