import requests
import time
class CloreClient:
"""Clore.ai REST API 的简单 Python SDK。"""
BASE_URL = "https://api.clore.ai/v1"
def __init__(self, api_key: str):
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({"auth": api_key})
def _get(self, endpoint: str, params: dict = None) -> dict:
url = f"{self.BASE_URL}/{endpoint}"
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
if data.get("code") != 0:
raise CloreAPIError(data)
return data
def _post(self, endpoint: str, body: dict) -> dict:
url = f"{self.BASE_URL}/{endpoint}"
self.session.headers.update({"Content-type": "application/json"})
response = self.session.post(url, json=body)
response.raise_for_status()
data = response.json()
if data.get("code") != 0:
raise CloreAPIError(data)
return data
def list_servers(self) -> list:
"""获取市场上所有可用的服务器。"""
data = self._get("marketplace")
return data["servers"]
def get_server_details(self, server_id: int) -> dict:
"""获取特定服务器的竞价市场信息。"""
data = self._get("spot_marketplace", params={"market": server_id})
return data["market"]
def create_order(
self,
server_id: int,
image: str,
order_type: str = "on-demand",
currency: str = "bitcoin",
spotprice: float = None,
ports: dict = None,
ssh_password: str = None,
ssh_key: str = None,
jupyter_token: str = None,
env: dict = None,
command: str = None,
) -> dict:
"""
创建按需或竞价订单。
参数:
server_id:要租用的服务器 ID
image:Docker 镜像(例如 'cloreai/ubuntu20.04-jupyter')
order_type:'on-demand' 或 'spot'
currency:'bitcoin'(默认)
spotprice:竞价订单必需 — 每天的 BTC 价格
ports:端口转发,例如 {"22": "tcp", "8888": "http"}
ssh_password:SSH 密码(仅字母数字字符)
ssh_key:SSH 公钥
jupyter_token:Jupyter 笔记本令牌
env:环境变量字典
command:容器启动后要运行的 shell 命令
"""
if order_type == "spot" and spotprice is None:
raise ValueError("spotprice is required for spot orders")
body = {
"currency": currency,
"image": image,
"renting_server": server_id,
"type": order_type,
}
if spotprice is not None:
body["spotprice"] = spotprice
if ports:
body["ports"] = ports
if ssh_password:
body["ssh_password"] = ssh_password
if ssh_key:
body["ssh_key"] = ssh_key
if jupyter_token:
body["jupyter_token"] = jupyter_token
if env:
body["env"] = env
if command:
body["command"] = command
return self._post("create_order", body)
def get_orders(self, include_completed: bool = False) -> list:
"""获取您的活动(可选包括已完成)订单。"""
params = {"return_completed": "true"} if include_completed else {}
data = self._get("my_orders", params=params)
return data["orders"]
def cancel_order(self, order_id: int, issue: str = None) -> dict:
"""取消订单。可选地报告问题。"""
body = {"id": order_id}
if issue:
body["issue"] = issue
return self._post("cancel_order", body)
def get_wallets(self) -> list:
"""获取您的钱包和余额。"""
data = self._get("wallets")
return data["wallets"]
def get_my_servers(self) -> list:
"""获取您提供给市场的服务器。"""
data = self._get("my_servers")
return data["servers"]
class CloreAPIError(Exception):
"""当 Clore API 返回非零代码时抛出。"""
ERROR_CODES = {
0: "正常",
1: "数据库错误",
2: "无效的输入数据",
3: "无效的 API 令牌",
4: "无效的端点",
5: "超过速率限制(1 次/秒)",
6: "错误(见 error 字段)",
}
def __init__(self, response: dict):
self.code = response.get("code")
self.error = response.get("error", "")
message = self.ERROR_CODES.get(self.code, f"Unknown code {self.code}")
if self.error:
message = f"{message}: {self.error}"
super().__init__(f"Clore API error {self.code}: {message}")