# run_finetune.py
"""Orchestrate Hugging Face fine-tuning on Clore GPUs."""
import json
import os
import sys
import time
from typing import Dict, Optional

import paramiko
import requests
from scp import SCPClient
class HFFineTuner:
    """Run a Hugging Face fine-tuning job end-to-end on a rented Clore GPU.

    Workflow: search the marketplace for a suitable GPU, create an on-demand
    order, wait until the container is running, SSH in, install the training
    stack, execute the training script, download the resulting model, and
    finally cancel the order so billing stops.
    """

    BASE_URL = "https://api.clore.ai"

    def __init__(self, api_key: str, hf_token: Optional[str] = None,
                 wandb_key: Optional[str] = None):
        """Store credentials; no network activity happens here.

        api_key   -- Clore API key, sent in the "auth" header on every call.
        hf_token  -- optional Hugging Face token (gated models/datasets).
        wandb_key -- optional Weights & Biases API key for run tracking.
        """
        self.api_key = api_key
        self.hf_token = hf_token
        self.wandb_key = wandb_key
        self.headers = {"auth": api_key}
        # paramiko.SSHClient once connect_ssh() succeeds; None before/after.
        self.ssh_client = None
        # Active order record ({"order_id": ...} at minimum), kept so that
        # cleanup() can always cancel it.
        self.current_order = None

    def _request(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Issue an authenticated request to the Clore API and decode JSON.

        Raises requests.HTTPError on a non-2xx response instead of failing
        later with a confusing KeyError on the decoded body.
        """
        url = f"{self.BASE_URL}{endpoint}"
        response = requests.request(method, url, headers=self.headers, **kwargs)
        response.raise_for_status()  # fail fast on HTTP-level errors
        return response.json()

    @staticmethod
    def _rank_candidates(servers, gpu_type: str, max_price: float) -> list:
        """Filter marketplace servers and sort them best-first.

        Keeps unrented servers whose GPU list mentions gpu_type and whose
        on-demand USD price is within max_price, sorted by reliability
        (highest first). Pure function: unit-testable without the API.
        """
        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            gpus = server.get("gpu_array", [])
            if not any(gpu_type in g for g in gpus):
                continue
            # Missing price info is treated as prohibitively expensive.
            price = server.get("price", {}).get("usd", {}).get("on_demand_clore", 999)
            if price > max_price:
                continue
            candidates.append({
                "id": server["id"],
                "gpus": gpus,
                "price": price,
                "reliability": server.get("reliability", 0),
            })
        candidates.sort(key=lambda c: -c["reliability"])
        return candidates

    def find_gpu(self, gpu_type: str, max_price: float, min_vram_gb: int = 24) -> Dict:
        """Return the most reliable unrented server offering gpu_type.

        min_vram_gb is accepted for interface stability but not yet applied:
        the marketplace schema does not (visibly) expose a VRAM field.
        TODO(review): filter on VRAM once the field name is confirmed.

        Raises RuntimeError when no server matches.
        """
        servers = self._request("GET", "/v1/marketplace")["servers"]
        candidates = self._rank_candidates(servers, gpu_type, max_price)
        if not candidates:
            raise RuntimeError(f"No {gpu_type} found under ${max_price}/hr")
        return candidates[0]

    def provision(self, server_id: int, ssh_password: str) -> Dict:
        """Rent server_id and wait (up to ~4 minutes) until it is running.

        The order id is recorded on self.current_order immediately after
        creation so that cleanup() can cancel the rental even if the
        readiness wait times out — otherwise the order would keep billing.

        Returns the order record from /v1/my_orders once its status is
        "running"; raises RuntimeError on timeout.
        """
        order = self._request("POST", "/v1/create_order", json={
            "renting_server": server_id,
            "type": "on-demand",
            "currency": "CLORE-Blockchain",
            "image": "pytorch/pytorch:2.7.1-cuda12.8-cudnn9-devel",
            "ports": {"22": "tcp"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": ssh_password
        })
        order_id = order["order_id"]
        # Track the order as soon as it exists, not only once it is running.
        self.current_order = {"order_id": order_id}
        print(f"📦 Order created: {order_id}")
        for _ in range(120):  # 120 polls x 2 s sleep ≈ 4 minute budget
            orders = self._request("GET", "/v1/my_orders")["orders"]
            current = next((o for o in orders if o["order_id"] == order_id), None)
            if current and current.get("status") == "running":
                self.current_order = current
                return current
            time.sleep(2)
        raise RuntimeError(f"Timeout waiting for order {order_id} to reach 'running'")

    def connect_ssh(self, host: str, port: int, password: str):
        """Open an SSH session as root, retrying up to 5 times (10 s apart).

        Servers often start accepting connections only some seconds after
        the order turns "running", hence the retry loop. Raises
        RuntimeError (chained to the last SSH error) if all attempts fail.
        """
        self.ssh_client = paramiko.SSHClient()
        # Rented hosts have unknown, ephemeral host keys; auto-accept them.
        self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        last_error = None
        for _ in range(5):
            try:
                self.ssh_client.connect(host, port=port, username="root",
                                        password=password, timeout=30)
                print(f"✅ Connected to {host}:{port}")
                return
            except Exception as exc:  # paramiko raises several exception types
                last_error = exc
                time.sleep(10)
        raise RuntimeError("SSH connection failed") from last_error

    def run_command(self, cmd: str, stream: bool = True) -> str:
        """Execute cmd on the rented server and return its combined output.

        With stream=True the output is echoed line by line as it arrives
        (useful for long training runs); the PTY merges stderr into stdout.
        NOTE(review): the remote exit status is not checked, so a failing
        command does not raise — confirm that is intended.
        """
        stdin, stdout, stderr = self.ssh_client.exec_command(cmd, get_pty=True)
        output = ""
        if stream:
            for line in iter(stdout.readline, ""):
                print(line, end="")
                output += line
        else:
            output = stdout.read().decode()
        return output

    def setup_environment(self):
        """Install the fine-tuning stack; log in to HF / W&B if keys given.

        NOTE(review): tokens are interpolated into the remote command line,
        where they are visible in the server's process list and shell
        history — acceptable only if the rented host is trusted.
        """
        print("\n🔧 Setting up environment...")
        commands = [
            "pip install --upgrade pip",
            "pip install transformers datasets accelerate peft bitsandbytes trl wandb",
            "mkdir -p /workspace",
        ]
        if self.hf_token:
            commands.append(f"huggingface-cli login --token {self.hf_token}")
        if self.wandb_key:
            commands.append(f"wandb login {self.wandb_key}")
        for cmd in commands:
            self.run_command(cmd, stream=False)
        print("✅ Environment ready")

    def upload_files(self, files: Dict[str, str]):
        """Upload files to the server; keys are local paths, values remote."""
        with SCPClient(self.ssh_client.get_transport()) as scp:
            for local, remote in files.items():
                scp.put(local, remote)
                print(f"📤 {local} → {remote}")

    def download_model(self, local_dir: str):
        """Recursively copy the trained model directory to local_dir."""
        os.makedirs(local_dir, exist_ok=True)
        with SCPClient(self.ssh_client.get_transport()) as scp:
            scp.get("/workspace/fine-tuned-model", local_dir, recursive=True)
        print(f"📥 Model downloaded to {local_dir}")

    def run_finetuning(
        self,
        model_name: str,
        dataset_name: str,
        output_name: str = "fine-tuned-model",
        gpu_type: str = "RTX 4090",
        max_price: float = 0.50,
        num_epochs: int = 3,
        batch_size: int = 4,
        lora_r: int = 64,
        max_seq_length: int = 2048,
        ssh_password: str = "HFFine123!"
    ):
        """Run the complete job: rent a GPU, train, download, cancel.

        model_name / dataset_name -- Hugging Face hub identifiers.
        output_name               -- local directory name for the result.
        Remaining parameters bound the rental (gpu_type, max_price) and
        configure LoRA training. The server is always cancelled in the
        finally block, even on failure.
        """
        start_time = time.time()
        try:
            # Find GPU
            print(f"🔍 Finding {gpu_type}...")
            gpu = self.find_gpu(gpu_type, max_price)
            print(f" Found: Server {gpu['id']} @ ${gpu['price']:.2f}/hr")
            # Provision
            print(f"\n📦 Provisioning...")
            order = self.provision(gpu["id"], ssh_password)
            # Parse connection info; assumes a string shaped like
            # "ssh root@<host> -p <port>" — TODO(review): confirm format.
            ssh_info = order["connection"]["ssh"]
            parts = ssh_info.split()
            host = parts[1].split("@")[1]
            port = int(parts[3]) if len(parts) > 3 else 22
            self.connect_ssh(host, port, ssh_password)
            # Setup
            self.setup_environment()
            # Training configuration consumed by the remote finetune.py.
            config = {
                "model": {
                    "model_name": model_name,
                    "load_in_4bit": True,
                    "torch_dtype": "bfloat16"
                },
                "lora": {
                    "r": lora_r,
                    "lora_alpha": 16,
                    "lora_dropout": 0.1
                },
                "training": {
                    "dataset_name": dataset_name,
                    "max_seq_length": max_seq_length,
                    "num_epochs": num_epochs,
                    "per_device_batch_size": batch_size,
                    "output_dir": "/workspace/fine-tuned-model"
                }
            }
            # Write config via a quoted heredoc (no shell interpolation),
            # then upload the training script.
            config_json = json.dumps(config, indent=2)
            self.run_command(f"cat > /workspace/config.json << 'EOF'\n{config_json}\nEOF")
            self.upload_files({"finetune.py": "/workspace/finetune.py"})
            # Run fine-tuning
            print("\n🚀 Starting fine-tuning...")
            self.run_command("cd /workspace && python finetune.py config.json")
            # Download results
            self.download_model(f"./{output_name}")
            # Summary
            duration = (time.time() - start_time) / 3600
            cost = duration * gpu["price"]
            print("\n" + "="*50)
            print("✅ Fine-tuning complete!")
            print(f"⏱️ Duration: {duration:.2f} hours")
            print(f"💰 Cost: ${cost:.2f}")
        finally:
            self.cleanup()

    def cleanup(self):
        """Close SSH and cancel the active order; idempotent, best-effort.

        Runs inside a finally block, so failures here are reported but never
        raised — raising would mask the original exception from the job.
        """
        if self.ssh_client:
            try:
                self.ssh_client.close()
            except Exception:
                pass  # closing a dead transport is not worth failing over
            self.ssh_client = None
        if self.current_order:
            order_id = self.current_order["order_id"]
            try:
                self._request("POST", "/v1/cancel_order", json={"id": order_id})
                print("✅ Order cancelled")
            except Exception as exc:
                # Surface but do not raise: the user must cancel manually.
                print(f"⚠️ Failed to cancel order {order_id}: {exc}")
            self.current_order = None
def main():
    """CLI entry point: read credentials and launch a fine-tuning job.

    CLORE_API_KEY may alternatively be passed as the first CLI argument;
    HF_TOKEN and WANDB_API_KEY are optional environment variables.
    Exits with a usage message (instead of an IndexError) when no API key
    is available.
    """
    api_key = os.environ.get("CLORE_API_KEY")
    if not api_key:
        if len(sys.argv) > 1:
            api_key = sys.argv[1]
        else:
            sys.exit("Usage: set CLORE_API_KEY or pass the API key as the first argument")
    hf_token = os.environ.get("HF_TOKEN")
    wandb_key = os.environ.get("WANDB_API_KEY")
    finetuner = HFFineTuner(api_key, hf_token, wandb_key)
    finetuner.run_finetuning(
        model_name="meta-llama/Llama-2-7b-hf",
        dataset_name="databricks/databricks-dolly-15k",
        output_name="llama2-7b-dolly",
        gpu_type="RTX 4090",
        max_price=0.50,
        num_epochs=1,
        batch_size=4,
        lora_r=64
    )


if __name__ == "__main__":
    main()