A production-ready multi-model inference routing system that:
Routes inference requests to different models across multiple Clore.ai GPUs
Supports A/B testing for model comparison
Enables canary deployments for gradual rollouts
Tracks performance metrics (latency, cost, accuracy) per model
Dynamically adjusts routing based on model performance and cost
Think of it as your own load balancer for AI models — deploy multiple versions across cheap Clore.ai GPUs and let the router decide which one handles each request.
Use cases:
Compare Llama 3.1 70B vs Llama 3.2 90B in production
Roll out a fine-tuned model to 10% of traffic first
## Step 1: Set Up the Clore Client
> 📦 **Using the standard Clore API client.** See [Clore API Client Reference](../reference/clore-client.md) for the full implementation and setup instructions. Save it as `clore_client.py` in your project.
```python
# Initialize the shared Clore API client (implementation lives in clore_client.py).
from clore_client import CloreClient
client = CloreClient(api_key="your-api-key")
# routing_strategies.py
import random
from abc import ABC, abstractmethod
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class RoutingDecision:
    """Result of routing decision."""
    model: 'DeployedModel'  # the deployed instance that should serve the request
    strategy_used: str      # name of the strategy that made the choice
    reason: str             # human-readable explanation (surfaced in response metadata)
class RoutingStrategy(ABC):
    """Base class for routing strategies."""

    @abstractmethod
    def select_model(self, models: List['DeployedModel'],
                     request_context: dict = None) -> Optional[RoutingDecision]:
        """Pick one model from `models`, or return None when nothing is usable.

        Args:
            models: candidate deployments (may include unhealthy instances;
                implementations are expected to filter them).
            request_context: optional per-request hints; unused by the
                built-in strategies.
        """
        pass
class WeightedRandomStrategy(RoutingStrategy):
    """Pick a healthy model at random, biased by per-model weight (A/B testing)."""

    def select_model(self, models: List['DeployedModel'],
                     request_context: dict = None) -> Optional[RoutingDecision]:
        """Return a weighted-random decision, or None if no model is selectable."""
        candidates = [m for m in models if m.health_status == "healthy"]
        if not candidates:
            return None

        weight_sum = sum(m.weight for m in candidates)
        if weight_sum == 0:
            # Every weight is zero: proportional selection is undefined.
            return None

        # Draw a point in [0, weight_sum) and walk the cumulative weights
        # until we cross it.
        target = random.random() * weight_sum
        running = 0
        for candidate in candidates:
            running += candidate.weight
            if target <= running:
                return RoutingDecision(
                    model=candidate,
                    strategy_used="weighted_random",
                    reason=f"Selected by weight ({candidate.weight}/{weight_sum:.2f})"
                )

        # Float rounding can leave the target just past the final cumulative
        # weight; fall back to the last candidate.
        return RoutingDecision(
            model=candidates[-1],
            strategy_used="weighted_random",
            reason="Fallback selection"
        )
class CanaryStrategy(RoutingStrategy):
    """Send a small slice of traffic to canary instances, the rest to stable."""

    def __init__(self, canary_percentage: float = 10.0):
        # Share of requests (0-100) that should land on canary instances.
        self.canary_percentage = canary_percentage

    def select_model(self, models: List['DeployedModel'],
                     request_context: dict = None) -> Optional[RoutingDecision]:
        """Split traffic between the canary and stable pools."""
        usable = [m for m in models if m.health_status == "healthy"]
        if not usable:
            return None

        canary_pool = [m for m in usable if m.is_canary]
        stable_pool = [m for m in usable if not m.is_canary]

        # Without any canary instance, defer to plain weighted routing.
        if not canary_pool:
            return WeightedRandomStrategy().select_model(stable_pool, request_context)

        # Dice roll decides which pool serves this request.
        if random.random() * 100 < self.canary_percentage:
            return RoutingDecision(
                model=random.choice(canary_pool),
                strategy_used="canary",
                reason=f"Canary traffic ({self.canary_percentage}%)"
            )

        if stable_pool:
            return RoutingDecision(
                model=random.choice(stable_pool),
                strategy_used="canary",
                reason=f"Stable traffic ({100 - self.canary_percentage}%)"
            )

        # Every healthy instance is a canary; use one anyway.
        return RoutingDecision(
            model=random.choice(canary_pool),
            strategy_used="canary",
            reason="No stable models, using canary"
        )
class LowestLatencyStrategy(RoutingStrategy):
    """Route to the healthy model with the lowest observed average latency."""

    def select_model(self, models: List['DeployedModel'],
                     request_context: dict = None) -> Optional[RoutingDecision]:
        """Pick the fastest model once enough latency data has accumulated."""
        usable = [m for m in models if m.health_status == "healthy"]
        if not usable:
            return None

        # Only trust latency averages backed by at least 10 requests.
        with_data = [m for m in usable if m.request_count >= 10]
        if not with_data:
            # Insufficient measurements: fall back to weighted-random routing.
            return WeightedRandomStrategy().select_model(usable, request_context)

        fastest = min(with_data, key=lambda m: m.avg_latency_ms)
        return RoutingDecision(
            model=fastest,
            strategy_used="lowest_latency",
            reason=f"Lowest latency: {fastest.avg_latency_ms:.0f}ms"
        )
class CostOptimizedStrategy(RoutingStrategy):
    """Route to the cheapest model that meets quality thresholds.

    Healthy models are filtered by average latency and error rate; among the
    qualified ones, the lowest cost-per-hour instance wins.  Models with fewer
    than 10 recorded requests are always considered qualified (not enough data
    to judge them yet).
    """

    def __init__(self, max_latency_ms: float = 5000, max_error_rate: float = 0.05):
        # Quality gates a model must satisfy to be eligible for selection.
        self.max_latency_ms = max_latency_ms
        self.max_error_rate = max_error_rate

    def select_model(self, models: List['DeployedModel'],
                     request_context: dict = None) -> Optional[RoutingDecision]:
        """Return the cheapest qualified model, or None if none are healthy."""
        healthy = [m for m in models if m.health_status == "healthy"]
        if not healthy:
            return None

        def error_rate(m: 'DeployedModel') -> float:
            # Guard against division by zero for models with no traffic yet.
            return m.error_count / m.request_count if m.request_count > 0 else 0.0

        # Filter by quality thresholds.
        qualified = []
        for m in healthy:
            if m.request_count < 10:
                qualified.append(m)  # Not enough data yet; include optimistically.
                continue
            if m.avg_latency_ms <= self.max_latency_ms and error_rate(m) <= self.max_error_rate:
                qualified.append(m)

        if not qualified:
            # No model meets the thresholds: degrade gracefully to the model
            # with the lowest error *rate*.  (The original used the absolute
            # error count, which penalised high-traffic models and
            # contradicted the reason string below.)
            return RoutingDecision(
                model=min(healthy, key=error_rate),
                strategy_used="cost_optimized",
                reason="No qualified models, using lowest error rate"
            )

        # Pick the cheapest qualified model.
        cheapest = min(qualified, key=lambda m: m.cost_per_hour)
        return RoutingDecision(
            model=cheapest,
            strategy_used="cost_optimized",
            reason=f"Cheapest qualified: ${cheapest.cost_per_hour:.2f}/hr"
        )
class FailoverStrategy(RoutingStrategy):
    """Primary/secondary failover: always serve from the highest-weight healthy model."""

    def select_model(self, models: List['DeployedModel'],
                     request_context: dict = None) -> Optional[RoutingDecision]:
        """Treat weight as priority and pick the top healthy model."""
        usable = [m for m in models if m.health_status == "healthy"]
        if not usable:
            return None

        # max() returns the first model with the highest weight, matching the
        # original stable-sort-descending behaviour on ties.
        primary = max(usable, key=lambda m: m.weight)
        return RoutingDecision(
            model=primary,
            strategy_used="failover",
            reason=f"Primary model (weight={primary.weight})"
        )
# model_registry.py
import asyncio
import aiohttp
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class ModelConfig:
    """Configuration for a model deployment."""
    name: str                      # logical model name, e.g. "llama-70b"
    version: str                   # version label, e.g. "1.0.0"
    docker_image: str              # container image to run on the rented GPU
    gpu_types: List[str]           # acceptable GPU models
    min_vram_gb: int = 24          # minimum VRAM required
    max_price_usd: float = 1.0     # per-hour budget cap for the GPU
    port: int = 8000               # container port serving the model
    health_endpoint: str = "/health"             # GET path probed by health checks
    inference_endpoint: str = "/v1/completions"  # POST path for inference
    weight: float = 1.0            # routing weight (A/B share / failover priority)
    is_canary: bool = False        # routed via canary percentage under CanaryStrategy
    env: Dict[str, str] = field(default_factory=dict)  # extra container env vars
class ModelRegistry:
    """Registry of deployed model instances with background health monitoring.

    Holds both static configurations (ModelConfig, keyed "name:version") and
    live deployment records (keyed "name:version:order_id").  A background
    asyncio task periodically probes each model's health endpoint and keeps a
    rolling latency average per instance.

    NOTE: forward references are quoted ('DeployedModel', 'ModelConfig')
    because unquoted signature annotations are evaluated at class-definition
    time and `DeployedModel` is not imported in this module — the original
    raised NameError on import.
    """

    def __init__(self):
        # Live deployments keyed by "name:version:order_id".
        self.models: Dict[str, 'DeployedModel'] = {}
        # Static configs keyed by "name:version".
        self.configs: Dict[str, 'ModelConfig'] = {}
        self._health_check_interval = 30  # seconds between health sweeps
        self._health_check_task: Optional[asyncio.Task] = None

    def register_config(self, config: 'ModelConfig'):
        """Register a model configuration under "name:version"."""
        key = f"{config.name}:{config.version}"
        self.configs[key] = config

    def add_deployed_model(self, model: 'DeployedModel'):
        """Add a deployed model instance."""
        key = f"{model.model_name}:{model.model_version}:{model.order_id}"
        self.models[key] = model

    def remove_model(self, order_id: int):
        """Remove every registry entry backed by the given Clore order."""
        to_remove = [k for k, v in self.models.items() if v.order_id == order_id]
        for key in to_remove:
            del self.models[key]

    def get_models_by_name(self, name: str) -> List['DeployedModel']:
        """Get all instances of a model by name (any version)."""
        return [m for m in self.models.values() if m.model_name == name]

    def get_all_models(self) -> List['DeployedModel']:
        """Get all deployed models."""
        return list(self.models.values())

    def get_healthy_models(self) -> List['DeployedModel']:
        """Get only models currently marked healthy."""
        return [m for m in self.models.values() if m.health_status == "healthy"]

    async def start_health_checks(self):
        """Start the background health-checking task."""
        self._health_check_task = asyncio.create_task(self._health_check_loop())

    async def stop_health_checks(self):
        """Cancel the background health-checking task and wait for it to exit."""
        if self._health_check_task:
            self._health_check_task.cancel()
            try:
                await self._health_check_task
            except asyncio.CancelledError:
                pass

    async def _health_check_loop(self):
        """Run health sweeps forever, pausing between rounds."""
        while True:
            await self._check_all_health()
            await asyncio.sleep(self._health_check_interval)

    async def _check_all_health(self):
        """Probe every registered model concurrently.

        One aiohttp session is shared across the whole sweep instead of
        opening a fresh ClientSession per model per probe (the original
        wasted a connection pool on every single check).
        """
        if not self.models:
            return
        async with aiohttp.ClientSession() as session:
            tasks = [self._check_model_health(model, session)
                     for model in self.models.values()]
            await asyncio.gather(*tasks, return_exceptions=True)

    async def _check_model_health(self, model: 'DeployedModel',
                                  session: 'aiohttp.ClientSession'):
        """Probe one model's health endpoint; update its status and latency EMA."""
        config = self.configs.get(f"{model.model_name}:{model.model_version}")
        health_endpoint = config.health_endpoint if config else "/health"
        url = f"{model.endpoint}{health_endpoint}"
        try:
            start = time.time()
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                latency = (time.time() - start) * 1000
                if resp.status == 200:
                    model.health_status = "healthy"
                    # Exponential moving average keeps the latency estimate
                    # responsive without being noisy.
                    alpha = 0.1
                    model.avg_latency_ms = (
                        alpha * latency + (1 - alpha) * model.avg_latency_ms
                        if model.avg_latency_ms > 0 else latency
                    )
                else:
                    model.health_status = "unhealthy"
                    model.error_count += 1
        except Exception:
            # Network errors and timeouts count as unhealthy, like bad status.
            model.health_status = "unhealthy"
            model.error_count += 1

    def get_metrics(self) -> Dict:
        """Summarise registry state: counts, totals, and per-model details."""
        models = self.get_all_models()
        healthy = [m for m in models if m.health_status == "healthy"]
        return {
            "total_models": len(models),
            "healthy_models": len(healthy),
            "unhealthy_models": len(models) - len(healthy),
            "total_requests": sum(m.request_count for m in models),
            "total_errors": sum(m.error_count for m in models),
            "total_cost_per_hour": sum(m.cost_per_hour for m in healthy),
            "models": [
                {
                    "name": m.model_name,
                    "version": m.model_version,
                    "endpoint": m.endpoint,
                    "health": m.health_status,
                    "latency_ms": m.avg_latency_ms,
                    "requests": m.request_count,
                    "errors": m.error_count,
                    "cost_per_hour": m.cost_per_hour,
                    "is_canary": m.is_canary
                }
                for m in models
            ]
        }
# model_deployer.py
import time
from typing import Optional
from clore_client import CloreClient, DeployedModel
from model_registry import ModelRegistry, ModelConfig
class ModelDeployer:
    """Deploys models to Clore.ai GPUs and records them in a ModelRegistry."""

    def __init__(self, clore_client: 'CloreClient', registry: 'ModelRegistry'):
        self.client = clore_client
        self.registry = registry

    def deploy_model(self, config: 'ModelConfig',
                     ssh_password: str = None) -> Optional['DeployedModel']:
        """Deploy a model configuration to a Clore.ai GPU.

        Finds the cheapest suitable GPU, creates an order, waits for the
        server to start, resolves the public HTTP endpoint, and registers
        the resulting DeployedModel.

        Raises:
            Exception: if no suitable GPU is found, the order never starts,
                or the model endpoint cannot be determined.  (The original
                silently registered a model with endpoint=None in the last
                case, which made every later request hit "None/...".)
        """
        # Register config so health checks can look up the endpoints later.
        self.registry.register_config(config)

        # Find suitable GPU offers (client returns them sorted best-first).
        gpus = self.client.find_gpus_for_model(
            gpu_types=config.gpu_types,
            max_price_usd=config.max_price_usd,
            min_vram_gb=config.min_vram_gb
        )
        if not gpus:
            raise Exception(f"No suitable GPUs found for {config.name}:{config.version}")

        gpu = gpus[0]
        print(f"Deploying {config.name}:{config.version} to server {gpu['id']}")
        print(f" GPU: {gpu['gpus']}, Price: ${gpu['price_usd']:.2f}/hr")

        # Environment passed into the container.
        env = {
            "NVIDIA_VISIBLE_DEVICES": "all",
            "MODEL_NAME": config.name,
            "MODEL_VERSION": config.version,
            **config.env
        }

        # NOTE(review): the fallback SSH password is hardcoded and publicly
        # visible in this file; callers should always pass ssh_password.
        order = self.client.create_order(
            server_id=gpu["id"],
            image=config.docker_image,
            ports={
                "22": "tcp",
                str(config.port): "http"
            },
            env=env,
            ssh_password=ssh_password or "CloreRouter123!"
        )
        order_id = order["order_id"]
        print(f"Order created: {order_id}")

        print("Waiting for server to start...")
        active_order = self._wait_for_ready(order_id)
        if not active_order:
            raise Exception("Order failed to start")

        endpoint = self._resolve_endpoint(active_order, config.port)
        if not endpoint:
            raise Exception(f"Could not determine endpoint for order {order_id}")

        model = DeployedModel(
            order_id=order_id,
            server_id=gpu["id"],
            model_name=config.name,
            model_version=config.version,
            endpoint=endpoint,
            gpu_type=gpu["gpus"][0] if gpu["gpus"] else "unknown",
            cost_per_hour=gpu["price_usd"],
            weight=config.weight,
            is_canary=config.is_canary,
            health_status="unknown"
        )
        self.registry.add_deployed_model(model)

        print(f"✅ Deployed {config.name}:{config.version}")
        print(f" Endpoint: {endpoint}")
        print(f" Order ID: {order_id}")
        return model

    def _resolve_endpoint(self, active_order: dict, port: int) -> Optional[str]:
        """Extract the model's HTTP endpoint from an active order, or None."""
        connection = active_order.get("connection", {})
        http_ports = connection.get("http_ports", {})
        # Preferred: a published HTTP port that mentions the model port.
        for port_info in http_ports.values():
            if str(port) in str(port_info):
                return port_info
        # Fallback: derive the host from the SSH connection string
        # ("ssh root@host -p port") and attach the model port.
        ssh_info = connection.get("ssh", "")
        if "@" in ssh_info:
            host = ssh_info.split("@")[1].split()[0]
            return f"http://{host}:{port}"
        return None

    def _wait_for_ready(self, order_id: int, timeout: int = 180) -> Optional[dict]:
        """Poll every 3s until the order reports "running"; None on timeout."""
        for _ in range(timeout // 3):
            orders = self.client.get_orders()
            order = next((o for o in orders if o["order_id"] == order_id), None)
            if order and order.get("status") == "running":
                return order
            time.sleep(3)
        return None

    def undeploy_model(self, order_id: int):
        """Undeploy a model by canceling its order and dropping registry entries."""
        self.client.cancel_order(order_id)
        self.registry.remove_model(order_id)
        print(f"Undeployed order {order_id}")

    def scale_model(self, config: 'ModelConfig', replicas: int,
                    ssh_password: str = None) -> list['DeployedModel']:
        """Deploy multiple replicas of a model; failures are logged, not raised."""
        deployed = []
        for i in range(replicas):
            print(f"\nDeploying replica {i+1}/{replicas}...")
            try:
                model = self.deploy_model(config, ssh_password)
                deployed.append(model)
            except Exception as e:
                print(f"Failed to deploy replica {i+1}: {e}")
        return deployed
# router_service.py
import asyncio
import aiohttp
import time
import json
from typing import Optional, Dict, Any
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from clore_client import CloreClient, DeployedModel
from model_registry import ModelRegistry, ModelConfig
from model_deployer import ModelDeployer
from routing_strategies import (
RoutingStrategy, WeightedRandomStrategy, CanaryStrategy,
LowestLatencyStrategy, CostOptimizedStrategy, FailoverStrategy
)
# Prometheus metrics — labelled per model/version so A/B arms can be compared.
REQUESTS_TOTAL = Counter(
    "router_requests_total",
    "Total inference requests",
    ["model", "version", "strategy"]
)
REQUEST_LATENCY = Histogram(
    "router_request_latency_seconds",
    "Request latency in seconds",
    ["model", "version"]
)
ACTIVE_MODELS = Gauge(
    "router_active_models",
    "Number of active model instances"
)
ERRORS_TOTAL = Counter(
    "router_errors_total",
    "Total errors",
    ["model", "version", "error_type"]
)
# Global state shared across request handlers.
registry = ModelRegistry()                    # live model instances + configs
deployer: Optional[ModelDeployer] = None      # initialised in lifespan()
current_strategy: RoutingStrategy = WeightedRandomStrategy()  # swapped via POST /admin/strategy
class InferenceRequest(BaseModel):
    """Inference request body (OpenAI-style completions)."""
    model: Optional[str] = None  # If set, route only to this model name
    prompt: str                  # Text prompt forwarded to the chosen backend
    max_tokens: int = 256        # Generation cap forwarded upstream
    temperature: float = 0.7     # Sampling temperature forwarded upstream
    stream: bool = False         # If True, proxy an SSE byte stream back
class ModelDeployRequest(BaseModel):
    """Request to deploy a new model.

    Mirrors ModelConfig plus a replica count; consumed by POST /admin/deploy.
    """
    name: str
    version: str
    docker_image: str
    gpu_types: list[str] = ["RTX 4090", "RTX 3090", "A100"]  # acceptable GPU models
    min_vram_gb: int = 24
    max_price_usd: float = 1.0   # per-hour budget cap for the GPU
    port: int = 8000             # container port serving inference
    weight: float = 1.0          # routing weight (A/B traffic share)
    is_canary: bool = False      # mark instance for canary routing
    replicas: int = 1            # number of instances to deploy
    env: dict = {}               # extra container env vars (pydantic copies mutable defaults)
class StrategyConfig(BaseModel):
    """Routing strategy configuration for POST /admin/strategy."""
    strategy: str  # weighted_random, canary, lowest_latency, cost_optimized, failover
    canary_percentage: float = 10.0  # For canary strategy
    max_latency_ms: float = 5000     # For cost_optimized strategy
    max_error_rate: float = 0.05     # For cost_optimized strategy
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler.

    Startup: build the Clore client + deployer and start background health
    checks.  Shutdown: cancel the health-check task.
    """
    global deployer
    # Initialize Clore client
    import os
    api_key = os.environ.get("CLORE_API_KEY", "YOUR_API_KEY")  # placeholder if unset
    client = CloreClient(api_key)
    deployer = ModelDeployer(client, registry)
    # Start health checks
    await registry.start_health_checks()
    yield
    # Cleanup
    await registry.stop_health_checks()
# FastAPI application wired to the lifespan handler above.
app = FastAPI(
    title="Clore.ai Model Router",
    description="Multi-model inference routing with A/B testing and canary deployments",
    version="1.0.0",
    lifespan=lifespan
)
@app.post("/v1/completions")
async def inference(request: InferenceRequest, raw_request: Request):
    """Route an inference request to a model chosen by the current strategy.

    Non-streaming responses get a `_routing` metadata field; streaming
    responses get X-Router-* headers.  Raises 503 when no healthy model is
    available and 502 when the chosen model errors out.
    """
    global current_strategy

    # Candidate pool: a specific model family if requested, else all healthy.
    if request.model:
        models = registry.get_models_by_name(request.model)
        models = [m for m in models if m.health_status == "healthy"]
    else:
        models = registry.get_healthy_models()
    if not models:
        raise HTTPException(status_code=503, detail="No healthy models available")

    # Route request.
    decision = current_strategy.select_model(models)
    if not decision:
        raise HTTPException(status_code=503, detail="Routing failed")
    model = decision.model

    REQUESTS_TOTAL.labels(
        model=model.model_name,
        version=model.model_version,
        strategy=decision.strategy_used
    ).inc()

    # Resolve the upstream URL from the model's registered config.
    config = registry.configs.get(f"{model.model_name}:{model.model_version}")
    inference_endpoint = config.inference_endpoint if config else "/v1/completions"
    url = f"{model.endpoint}{inference_endpoint}"
    body = {
        "prompt": request.prompt,
        "max_tokens": request.max_tokens,
        "temperature": request.temperature,
        "stream": request.stream
    }

    start_time = time.time()

    def _record_success(latency: float):
        # Update per-model rolling latency (EMA) and the Prometheus histogram.
        model.request_count += 1
        alpha = 0.1
        model.avg_latency_ms = (
            alpha * (latency * 1000) + (1 - alpha) * model.avg_latency_ms
            if model.avg_latency_ms > 0 else latency * 1000
        )
        REQUEST_LATENCY.labels(
            model=model.model_name,
            version=model.model_version
        ).observe(latency)

    def _record_error(e: Exception):
        model.error_count += 1
        ERRORS_TOTAL.labels(
            model=model.model_name,
            version=model.model_version,
            error_type=type(e).__name__
        ).inc()

    if request.stream:
        # BUG FIX: the original opened the session and response inside
        # `async with` blocks and returned a StreamingResponse from within
        # them, so both were closed the moment the handler returned — before
        # FastAPI had consumed the stream.  Keep them open explicitly and
        # close them when the generator finishes.
        session = aiohttp.ClientSession()
        try:
            resp = await session.post(
                url, json=body, timeout=aiohttp.ClientTimeout(total=120)
            )
        except Exception as e:
            await session.close()
            _record_error(e)
            raise HTTPException(status_code=502, detail=f"Model error: {str(e)}")

        _record_success(time.time() - start_time)

        async def stream_generator():
            try:
                async for chunk in resp.content.iter_any():
                    yield chunk
            finally:
                # Release the upstream connection and session once the
                # client has consumed (or abandoned) the stream.
                resp.release()
                await session.close()

        return StreamingResponse(
            stream_generator(),
            media_type="text/event-stream",
            headers={
                "X-Router-Model": model.model_name,
                "X-Router-Version": model.model_version,
                "X-Router-Strategy": decision.strategy_used
            }
        )

    # Non-streaming: proxy the JSON response and attach routing metadata.
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url, json=body, timeout=aiohttp.ClientTimeout(total=120)
            ) as resp:
                latency = time.time() - start_time
                _record_success(latency)
                result = await resp.json()
                result["_routing"] = {
                    "model": model.model_name,
                    "version": model.model_version,
                    "strategy": decision.strategy_used,
                    "reason": decision.reason,
                    "latency_ms": latency * 1000
                }
                return result
    except Exception as e:
        _record_error(e)
        raise HTTPException(status_code=502, detail=f"Model error: {str(e)}")
@app.post("/admin/deploy")
async def deploy_model(request: ModelDeployRequest):
    """Deploy a new model to Clore.ai."""
    if deployer is None:
        raise HTTPException(status_code=500, detail="Deployer not initialized")

    # ModelDeployRequest mirrors ModelConfig exactly except for `replicas`.
    spec = request.model_dump(exclude={"replicas"})
    config = ModelConfig(**spec)

    try:
        if request.replicas <= 1:
            model = deployer.deploy_model(config)
            return {
                "status": "deployed",
                "order_id": model.order_id,
                "endpoint": model.endpoint
            }
        models = deployer.scale_model(config, request.replicas)
        return {
            "status": "deployed",
            "replicas": len(models),
            "order_ids": [m.order_id for m in models]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/admin/undeploy/{order_id}")
async def undeploy_model(order_id: int):
    """Undeploy a model by order ID."""
    if deployer is None:
        raise HTTPException(status_code=500, detail="Deployer not initialized")
    try:
        deployer.undeploy_model(order_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "undeployed", "order_id": order_id}
@app.post("/admin/strategy")
async def set_strategy(config: StrategyConfig):
    """Set the routing strategy."""
    global current_strategy

    # Map of every supported strategy name to a configured instance.
    available = {
        "weighted_random": WeightedRandomStrategy(),
        "canary": CanaryStrategy(canary_percentage=config.canary_percentage),
        "lowest_latency": LowestLatencyStrategy(),
        "cost_optimized": CostOptimizedStrategy(
            max_latency_ms=config.max_latency_ms,
            max_error_rate=config.max_error_rate
        ),
        "failover": FailoverStrategy()
    }

    chosen = available.get(config.strategy)
    if chosen is None:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown strategy: {config.strategy}. Available: {list(available.keys())}"
        )

    current_strategy = chosen
    return {
        "status": "ok",
        "strategy": config.strategy,
        "config": config.model_dump()
    }
@app.get("/admin/models")
async def list_models():
    """List all deployed models."""
    # Full registry snapshot: counts, totals, and per-model details.
    return registry.get_metrics()
@app.get("/admin/models/{model_name}")
async def get_model(model_name: str):
    """Get details for a specific model."""
    instances = registry.get_models_by_name(model_name)
    if not instances:
        raise HTTPException(status_code=404, detail=f"Model not found: {model_name}")

    def describe(m):
        # Serialise one deployed instance for the API response.
        return {
            "version": m.model_version,
            "endpoint": m.endpoint,
            "health": m.health_status,
            "latency_ms": m.avg_latency_ms,
            "requests": m.request_count,
            "errors": m.error_count,
            "cost_per_hour": m.cost_per_hour,
            "weight": m.weight,
            "is_canary": m.is_canary
        }

    return {
        "model": model_name,
        "instances": [describe(m) for m in instances]
    }
@app.patch("/admin/models/{order_id}/weight")
async def update_weight(order_id: int, weight: float):
    """Update routing weight for a model."""
    target = next(
        (m for m in registry.get_all_models() if m.order_id == order_id),
        None
    )
    if target is None:
        raise HTTPException(status_code=404, detail=f"Order not found: {order_id}")
    target.weight = weight
    return {"status": "updated", "order_id": order_id, "weight": weight}
@app.get("/health")
async def health_check():
    """Router health check."""
    all_models = registry.get_all_models()
    healthy_count = sum(1 for m in all_models if m.health_status == "healthy")
    # "degraded" means the router runs but has no healthy backend to serve.
    return {
        "status": "healthy" if healthy_count else "degraded",
        "models": {
            "total": len(all_models),
            "healthy": healthy_count
        }
    }
@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    # Refresh the gauge just-in-time; counters/histograms update per request.
    ACTIVE_MODELS.set(len(registry.get_healthy_models()))
    return Response(
        content=generate_latest(),
        media_type="text/plain"
    )
if __name__ == "__main__":
    # Run standalone for development; in production prefer
    # `uvicorn router_service:app --host 0.0.0.0 --port 8080`.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)
#!/usr/bin/env python3
"""
Complete Multi-Model Router Setup for Clore.ai
This script:
1. Deploys multiple model instances across Clore.ai GPUs
2. Sets up A/B testing between model versions
3. Enables canary deployment for new versions
4. Starts the routing service
Usage:
export CLORE_API_KEY=your_api_key
python deploy_router.py
"""
import os
import sys
import time
import requests
from clore_client import CloreClient
from model_registry import ModelRegistry, ModelConfig
from model_deployer import ModelDeployer
def check_balance(client: CloreClient, min_balance: float = 50):
    """Print the CLORE balance and abort unless it is at least min_balance."""
    print("💰 Checking balance...")
    balance = next(
        (w["balance"] for w in client.get_wallets() if "CLORE" in w["name"]),
        0
    )
    print(f" CLORE balance: {balance:.2f}")
    if balance < min_balance:
        print(f"❌ Insufficient balance (need {min_balance}+ CLORE)")
        sys.exit(1)
    return balance
def deploy_ab_test_setup(deployer: ModelDeployer):
    """Deploy A/B test setup with two model versions."""
    print("\n🧪 Deploying A/B Test Setup...")
    print(" Model A: Llama 70B v1.0 (control, 80% traffic)")
    print(" Model B: Llama 70B v1.1 (treatment, 20% traffic)")

    # Settings shared by both arms of the experiment.
    shared = dict(
        name="llama-70b",
        docker_image="ghcr.io/huggingface/text-generation-inference:latest",
        gpu_types=["RTX 4090", "RTX 3090"],
        min_vram_gb=24,
        max_price_usd=0.60,
        port=8000,
        is_canary=False,
    )

    variants = [
        # Control arm: stable version, 80% of traffic.
        ("A", ModelConfig(
            version="1.0.0",
            weight=0.8,
            env={
                "MODEL_ID": "meta-llama/Llama-2-70b-chat-hf",
                "QUANTIZE": "gptq"
            },
            **shared
        )),
        # Treatment arm: new version, 20% of traffic, new batch setting.
        ("B", ModelConfig(
            version="1.1.0",
            weight=0.2,
            env={
                "MODEL_ID": "meta-llama/Llama-2-70b-chat-hf",
                "QUANTIZE": "gptq",
                "MAX_BATCH_SIZE": "8"
            },
            **shared
        )),
    ]

    models = []
    for label, config in variants:
        try:
            deployed = deployer.deploy_model(config)
            models.append(deployed)
            print(f" ✅ Model {label} deployed: {deployed.endpoint}")
        except Exception as e:
            print(f" ❌ Model {label} failed: {e}")
    return models
def deploy_canary_setup(deployer: ModelDeployer):
    """Deploy canary setup with stable and canary versions."""
    print("\n🐤 Deploying Canary Setup...")
    print(" Stable: Llama 70B v1.0 (90% traffic)")
    print(" Canary: Llama 70B v2.0-beta (10% traffic)")

    # Stable version — deployed as 2 replicas for redundancy.
    stable_config = ModelConfig(
        name="llama-70b",
        version="1.0.0",
        docker_image="ghcr.io/huggingface/text-generation-inference:latest",
        gpu_types=["RTX 4090", "RTX 3090", "A100"],
        min_vram_gb=24,
        max_price_usd=0.70,
        port=8000,
        weight=1.0,
        is_canary=False,
        env={
            "MODEL_ID": "meta-llama/Llama-2-70b-chat-hf",
            "QUANTIZE": "gptq"
        }
    )

    # Canary version — experimental build flagged for canary routing.
    canary_config = ModelConfig(
        name="llama-70b",
        version="2.0.0-beta",
        docker_image="ghcr.io/huggingface/text-generation-inference:latest",
        gpu_types=["A100", "RTX 4090"],  # Prefer A100 for canary
        min_vram_gb=40,
        max_price_usd=1.50,
        port=8000,
        weight=1.0,
        is_canary=True,
        env={
            "MODEL_ID": "meta-llama/Llama-2-70b-chat-hf",
            "QUANTIZE": "awq",  # Testing new quantization
            "MAX_CONCURRENT_REQUESTS": "16"
        }
    )

    stable_models = deployer.scale_model(stable_config, replicas=2)
    models = list(stable_models)
    print(f" ✅ Stable: {len(stable_models)} replicas deployed")

    # A failed canary is non-fatal: the stable pool keeps serving.
    try:
        canary = deployer.deploy_model(canary_config)
    except Exception as e:
        print(f" ⚠️ Canary failed (continuing with stable only): {e}")
    else:
        models.append(canary)
        print(f" ✅ Canary deployed: {canary.endpoint}")
    return models
def deploy_cost_optimized_setup(deployer: ModelDeployer):
    """Deploy cost-optimized multi-tier setup."""
    print("\n💸 Deploying Cost-Optimized Setup...")
    print(" Tier 1: RTX 3090 (cheap, $0.20/hr)")
    print(" Tier 2: RTX 4090 (balanced, $0.40/hr)")
    print(" Tier 3: A100 (premium, $1.00/hr)")

    # (gpu_types, min_vram_gb, max_price_usd) per tier, cheapest first.
    tier_specs = [
        (["RTX 3090"], 24, 0.25),
        (["RTX 4090"], 24, 0.50),
        (["A100"], 40, 1.20),
    ]

    models = []
    for i, (gpu_types, vram, price_cap) in enumerate(tier_specs):
        config = ModelConfig(
            name="llama-70b",
            version="1.0.0",
            docker_image="ghcr.io/huggingface/text-generation-inference:latest",
            gpu_types=gpu_types,
            min_vram_gb=vram,
            max_price_usd=price_cap,
            port=8000,
            weight=1.0,
            env={"MODEL_ID": "meta-llama/Llama-2-70b-chat-hf", "QUANTIZE": "gptq"}
        )
        try:
            model = deployer.deploy_model(config)
            models.append(model)
            print(f" ✅ Tier {i+1} deployed: ${model.cost_per_hour:.2f}/hr")
        except Exception as e:
            print(f" ⚠️ Tier {i+1} unavailable: {e}")
    return models
def main():
    """Interactive entry point: deploy a chosen topology, then clean up."""
    api_key = os.environ.get("CLORE_API_KEY")
    if not api_key:
        print("❌ Set CLORE_API_KEY environment variable")
        sys.exit(1)

    # Wire up client, registry, and deployer.
    client = CloreClient(api_key)
    registry = ModelRegistry()
    deployer = ModelDeployer(client, registry)

    # Abort early if the wallet can't cover the deployment.
    check_balance(client, min_balance=50)

    print("\n📋 Deployment Options:")
    print(" 1. A/B Test Setup")
    print(" 2. Canary Deployment")
    print(" 3. Cost-Optimized Multi-Tier")

    # Dispatch table: menu choice -> (deploy function, routing strategy name).
    setups = {
        "1": (deploy_ab_test_setup, "weighted_random"),
        "2": (deploy_canary_setup, "canary"),
        "3": (deploy_cost_optimized_setup, "cost_optimized"),
    }
    choice = input("\nSelect option (1-3): ").strip()
    if choice not in setups:
        print("Invalid option")
        sys.exit(1)
    deploy_fn, strategy = setups[choice]
    models = deploy_fn(deployer)

    if not models:
        print("\n❌ No models deployed successfully")
        sys.exit(1)

    # Summary
    print("\n" + "="*50)
    print("📊 Deployment Summary")
    print("="*50)
    total_cost = sum(m.cost_per_hour for m in models)
    print(f"Models deployed: {len(models)}")
    print(f"Total cost: ${total_cost:.2f}/hr (${total_cost * 24:.2f}/day)")
    print(f"Routing strategy: {strategy}")
    print("\nModel endpoints:")
    for m in models:
        canary_tag = " [CANARY]" if m.is_canary else ""
        print(f" - {m.model_name}:{m.model_version}{canary_tag}")
        print(f" Endpoint: {m.endpoint}")
        print(f" Cost: ${m.cost_per_hour:.2f}/hr, Weight: {m.weight}")

    print("\n🚀 Starting router service...")
    print(" Run: uvicorn router_service:app --host 0.0.0.0 --port 8080")
    print("\n📡 API Endpoints:")
    print(" POST /v1/completions - Inference (routed)")
    print(" GET /admin/models - List models")
    print(" POST /admin/strategy - Change routing strategy")
    print(" GET /metrics - Prometheus metrics")

    # Cleanup prompt: cancel every order we created before exiting.
    input("\n⏸️ Press Enter to cleanup and cancel all orders...")
    print("\n🧹 Cleaning up...")
    for model in models:
        try:
            deployer.undeploy_model(model.order_id)
        except Exception as e:
            print(f" Warning: Failed to cancel {model.order_id}: {e}")
    print("✅ Done!")


if __name__ == "__main__":
    main()
# 1. Set API key (read by both deploy_router.py and router_service.py)
export CLORE_API_KEY=your_api_key
# 2. Deploy models (interactive menu: A/B, canary, or cost-optimized)
python deploy_router.py
# 3. Start router service (in separate terminal)
uvicorn router_service:app --host 0.0.0.0 --port 8080
# 4. Test inference (the router picks the backend model)
curl -X POST http://localhost:8080/v1/completions \
-H "Content-Type: application/json" \
-d '{"prompt": "What is AI?", "max_tokens": 100}'
# 5. Check routing info (per-model health, latency, cost)
curl http://localhost:8080/admin/models
# 6. Change strategy to canary (10% to new version)
curl -X POST http://localhost:8080/admin/strategy \
-H "Content-Type: application/json" \
-d '{"strategy": "canary", "canary_percentage": 10}'
# 7. Monitor metrics (Prometheus text format)
curl http://localhost:8080/metrics