# Milvus

> **面向人工智能应用的最具可扩展性的开源向量数据库 —— 为数十亿向量而构建**

Milvus 是一个为可扩展相似性搜索和人工智能应用专门构建的开源向量数据库。最初由 Zilliz 创建并捐赠给 LF AI & Data Foundation，Milvus 为包括 NVIDIA、AT\&T、IBM 和 Salesforce 在内的公司提供生产级 AI 工作负载支持。当您需要扩展到数十亿向量时，它是首选。

**GitHub：** [milvus-io/milvus](https://github.com/milvus-io/milvus) — 32K+ ⭐

***

## Milvus 与 Qdrant — 何时选择哪个

| 评估标准            | Milvus         | Qdrant       |
| --------------- | -------------- | ------------ |
| 扩展能力            | 数十亿向量          | 数亿级别         |
| 架构              | 分布式（多个服务）      | 单一二进制        |
| 安装复杂度           | 较高             | 较低           |
| GPU 索引支持        | ✅ 原生 GPU FAISS | 有限           |
| 多租户             | ✅ 分区 + 别名      | 基于集合         |
| 流式写入（ingestion） | ✅ Kafka/Pulsar | 有限           |
| 混合检索            | ✅ 稠密 + 稀疏      | ✅            |
| 云托管选项           | Zilliz Cloud   | Qdrant Cloud |

{% hint style="success" %}
**何时选择 Milvus：** 当您需要扩展到数十亿向量、需要 GPU 加速索引（如 IVF\_FLAT\_GPU），或需要企业功能（如多租户、流式写入和基于角色的访问控制）时，请选择 Milvus。
{% endhint %}

***

## Milvus 架构

Milvus 的独立模式（单服务器）包含：

* **milvus** — 主服务（proxy、query、data、index 协调器）
* **etcd** — 元数据存储和服务发现
* **MinIO** — 用于段数据的对象存储

在分布式模式（集群）中，每个组件可独立扩展。

***

## 先决条件

* 具有 GPU 租用的 Clore.ai 帐户
* Docker Compose（通常已预装）
* 基础的 Python 知识
* 16GB+ 内存（生产环境建议 32GB）

***

## 步骤 1 — 在 Clore.ai 上租用 GPU 服务器

1. 前往 [clore.ai](https://clore.ai) → **市场**
2. **推荐 GPU：** 用于 GPU 加速索引的 RTX 4090 或 A100
3. **CPU 替代方案：** 任何具有 32GB+ 内存的服务器用于基于 CPU 的索引

**最低要求：**

* CPU：8 核
* 内存：16GB（建议 32GB）
* 磁盘：50GB SSD/NVMe
* GPU：可选（仅在使用 GPU 索引类型时需要）

{% hint style="info" %}
**Milvus 中的 GPU 索引类型** （IVF\_FLAT\_GPU、IVFSQ8\_GPU）需要支持 CUDA 的 GPU，并能显著加速大集合的索引构建。如果您计划频繁为 1000 万级以上的向量构建索引，GPU 索引能很快收回成本。
{% endhint %}

***

## 步骤 2 — 部署 Milvus 独立版

**Docker 镜像：**

```
milvusdb/milvus:v2.4.0
```

Milvus 独立版需要 etcd 和 MinIO。对于最简单的安装，请使用 Docker Compose。

**端口：**

```
22
19530
```

* **端口 19530：** Milvus SDK/gRPC 端口（主要端口）
* **端口 9091：** Milvus REST API 和健康检查（内部）

**环境变量：**

```
NVIDIA_VISIBLE_DEVICES=all
NVIDIA_DRIVER_CAPABILITIES=compute,utility
```

***

## 步骤 3 — 使用 Docker Compose 进行设置

通过 SSH 登录到您的 Clore.ai 服务器并创建 compose 文件：

```bash
ssh root@<server-ip> -p <ssh-port>

# 如果未安装则安装 Docker Compose
which docker-compose || pip install docker-compose
# 或使用 Docker 插件：
docker compose version

# 创建项目目录
mkdir -p /opt/milvus && cd /opt/milvus

# 下载官方 Milvus 独立版 compose 文件
wget https://github.com/milvus-io/milvus/releases/download/v2.4.0/milvus-standalone-docker-compose.yml \
    -O docker-compose.yml

# 查看 compose 文件
cat docker-compose.yml
```

### 自定义 docker-compose.yml

```yaml
version: '3.5'

services:
  etcd：
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - /opt/milvus/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
    healthcheck:
      test: ["CMD", "etcdctl", "endpoint", "health"]
      interval: 30s
      timeout: 20s
      retries: 3

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-13T19-46-17Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      - "9001:9001"
      - "9000:9000"
    volumes:
      - /opt/milvus/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  standalone：
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.4.0
    command: ["milvus", "run", "standalone"]
    security_opt：
      - seccomp:unconfined
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - /opt/milvus/milvus:/var/lib/milvus
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
      interval: 30s
      start_period: 90s
      timeout: 20s
      retries: 3
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - "etcd"
      - "minio"
    deploy:
      resources:
        reservations:
          devices:
            - capabilities: [gpu]  # 启用 GPU 访问
```

### 启动 Milvus

```bash
cd /opt/milvus
docker compose up -d

# 等待服务启动（约 60 秒）
sleep 60

# 检查所有服务是否健康
docker compose ps

# 检查 Milvus 健康状态
curl http://localhost:9091/healthz
# 预期：{"status":"ok"}

# 查看日志
docker compose logs -f standalone --tail 50
```

***

## 步骤 4 — 安装 Python 客户端

```bash
pip install pymilvus sentence-transformers numpy tqdm

# 验证连接
python3 << 'EOF'
from pymilvus import connections, utility

connections.connect("default", host="localhost", port="19530")
print(f"Milvus connected!")
print(f"Version: {utility.get_server_version()}")
EOF
```

***

## 步骤 5 — 创建集合

在 Milvus 中， **collection（集合）** 类似于数据库表。它有一个带类型字段的 schema，其中包括向量字段。

```python
from pymilvus import (
    connections,
    FieldSchema,
    CollectionSchema,
    DataType,
    Collection,
    utility
)

# 连接
connections.connect("default", host="localhost", port="19530")

# 定义 schema
fields = [
    FieldSchema(
        name="id",
        dtype=DataType.INT64,
        is_primary=True,
        auto_id=True           # 自动生成 ID
    ),
    FieldSchema(
        name="text",
        dtype=DataType.VARCHAR,
        max_length=2048        # 最大文本长度
    ),
    FieldSchema(
        name="source",
        dtype=DataType.VARCHAR,
        max_length=256
    ),
    FieldSchema(
        name="category",
        dtype=DataType.VARCHAR,
        max_length=128
    ),
    FieldSchema(
        name="year",
        dtype=DataType.INT32
    ),
    FieldSchema(
        name="embedding",
        dtype=DataType.FLOAT_VECTOR,
        dim=384                # 您的嵌入模型的维度
    )
]

schema = CollectionSchema(
    fields=fields,
    description="用于语义搜索的文档嵌入",
    enable_dynamic_field=True  # 允许添加不在 schema 中的字段
)

# 创建集合
collection_name = "documents"
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)

collection = Collection(
    name=collection_name,
    schema=schema,
    using="default"
)
print(f"Collection '{collection_name}' created!")
```

***

## 步骤 6 — 创建索引

在加载用于搜索的数据之前，创建合适的索引：

```python
from pymilvus import Collection

collection = Collection("documents")

# HNSW 索引（适用于大多数用例，低延迟）
hnsw_params = {
    "metric_type": "COSINE",     # COSINE、L2 或 IP（内积）
    "index_type": "HNSW",
    "params": {
        "M": 16,                 # HNSW 图的连通性（8-64）
        "efConstruction": 200    # 构建时的搜索深度
    }
}

# IVF_FLAT 索引（CPU，适合大集合）
ivf_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {
        "nlist": 1024            # 聚类数（通常取数据量平方根）
    }
}

# GPU_IVF_FLAT 索引（需要 CUDA GPU — 对批量查询最快）
gpu_ivf_params = {
    "metric_type": "L2",
    "index_type": "GPU_IVF_FLAT",
    "params": {
        "nlist": 1024,
        "cache_dataset_on_device": True
    }
}

# 在 embedding 字段上创建索引
collection.create_index(
    field_name="embedding",
    index_params=hnsw_params,
    index_name="embedding_idx"
)

# 为过滤搜索创建标量索引
collection.create_index(field_name="category", index_name="category_idx")
collection.create_index(field_name="year", index_name="year_idx")

print("索引已创建！")
collection.load()  # 加载到内存以便搜索
```

***

## 步骤 7 — 插入数据

```python
from pymilvus import Collection
from sentence_transformers import SentenceTransformer
import tqdm

collection = Collection("documents")
model = SentenceTransformer("all-MiniLM-L6-v2", device="cuda")

# 您的文档
documents = [
    {
        "text": "Milvus is an open-source vector database for scalable AI applications.",
        "source": "documentation",
        "category": "database",
        "year": 2024
    },
    {
        "text": "HNSW provides fast approximate nearest neighbor search with high recall.",
        "source": "research",
        "category": "algorithm",
        "year": 2023
    },
    {
        "text": "GPU-accelerated indexing dramatically reduces build time for large vector collections.",
        "source": "blog",
        "category": "performance",
        "year": 2024
    },
    # 在此处添加数千个文档
]

def insert_batch(docs: list, batch_size: int = 1000):
    texts = [d["text"] for d in docs]
    
    # GPU 加速的嵌入生成
    embeddings = model.encode(
        texts,
        batch_size=256,
        show_progress_bar=False,
        normalize_embeddings=True
    )
    
    # 插入到 Milvus
    data = {
        "text": [d["text"] for d in docs],
        "source": [d["source"] for d in docs],
        "category": [d["category"] for d in docs],
        "year": [d["year"] for d in docs],
        "embedding": embeddings.tolist()
    }
    
    result = collection.insert(data)
    return result.insert_count

# 分批插入
BATCH_SIZE = 1000
total_inserted = 0

for i in range(0, len(documents), BATCH_SIZE):
    batch = documents[i:i + BATCH_SIZE]
    count = insert_batch(batch)
    total_inserted += count
    print(f"Inserted {total_inserted}/{len(documents)} documents")

# Flush 以确保存储并建立索引
collection.flush()
print(f"Total inserted and flushed: {total_inserted}")
```

***

## 步骤 8 — 搜索与查询

### 基础语义搜索

```python
from pymilvus import Collection
from sentence_transformers import SentenceTransformer

collection = Collection("documents")
collection.load()

model = SentenceTransformer("all-MiniLM-L6-v2", device="cuda")

def search(query: str, top_k: int = 10):
    query_embedding = model.encode(
        [query],
        normalize_embeddings=True
    )[0].tolist()
    
    results = collection.search(
        data=[query_embedding],
        anns_field="embedding",
        param={
            "metric_type": "COSINE",
            "params": {"ef": 64}    # HNSW 搜索时参数（ef >= top_k）
        },
        limit=top_k,
        output_fields=["text", "source", "category", "year"]
    )
    
    return results[0]

# 搜索
hits = search("how does vector similarity search work")
for hit in hits:
    print(f"Score: {hit.score:.4f}")
    print(f"Text: {hit.entity.get('text')[:100]}")
    print(f"Source: {hit.entity.get('source')}")
    print()
```

### 过滤搜索

```python
from pymilvus import Collection

collection = Collection("documents")

# 使用元数据过滤进行搜索（布尔表达式）
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param={"metric_type": "COSINE", "params": {"ef": 64}},
    limit=10,
    expr='category == "database" and year >= 2023',  # 布尔过滤
    output_fields=["text", "category", "year"]
)
```

### 混合搜索（稠密 + 稀疏）

```python
# Milvus 2.4+ 支持稠密+稀疏混合搜索
from pymilvus import AnnSearchRequest, WeightedRanker, Collection

collection = Collection("documents")

# 稠密搜索请求
dense_req = AnnSearchRequest(
    data=[dense_embedding],
    anns_field="embedding",
    param={"metric_type": "COSINE", "params": {"ef": 64}},
    limit=20
)

# 稀疏搜索请求（需要稀疏向量字段）
sparse_req = AnnSearchRequest(
    data=[sparse_embedding],
    anns_field="sparse_embedding",
    param={"metric_type": "IP"},
    limit=20
)

# 使用互惠排名融合（Reciprocal Rank Fusion）合并
results = collection.hybrid_search(
    [dense_req, sparse_req],
    rerank=WeightedRanker(0.7, 0.3),  # 70% 稠密，30% 稀疏
    limit=10,
    output_fields=["text"]
)
```

***

## 步骤 9 — 构建 RAG 服务

```bash
pip install fastapi uvicorn openai

cat > /workspace/milvus_rag.py << 'EOF'
from fastapi import FastAPI
from pydantic import BaseModel
from pymilvus import Collection, connections
from sentence_transformers import SentenceTransformer
from openai import OpenAI
import os

app = FastAPI(title="Milvus RAG API")

# 在启动时初始化
connections.connect("default", host="localhost", port="19530")
collection = Collection("documents")
collection.load()
embedder = SentenceTransformer("all-MiniLM-L6-v2", device="cuda")
llm = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

class QueryRequest(BaseModel):
    question: str
    n_results: int = 5

@app.get("/health")
async def health():
    return {"status": "ok", "vectors": collection.num_entities}

@app.post("/search")
async def semantic_search(req: QueryRequest):
    embedding = embedder.encode(
        [req.question],
        normalize_embeddings=True
    )[0].tolist()
    
    results = collection.search(
        data=[embedding],
        anns_field="embedding",
        param={"metric_type": "COSINE", "params": {"ef": 64}},
        limit=req.n_results,
        output_fields=["text", "source", "category"]
    )
    
    return {
        "results": [
            {
                "text": hit.entity.get("text"),
                "source": hit.entity.get("source"),
                "score": hit.score
            }
            for hit in results[0]
        ]
    }

@app.post("/rag")
async def rag(req: QueryRequest):
    embedding = embedder.encode([req.question], normalize_embeddings=True)[0].tolist()
    
    hits = collection.search(
        data=[embedding],
        anns_field="embedding",
        param={"metric_type": "COSINE", "params": {"ef": 64}},
        limit=req.n_results,
        output_fields=["text", "source"]
    )[0]
    
    context = "\n\n".join([
        f"[{hit.entity.get('source')}]: {hit.entity.get('text')}"
        for hit in hits if hit.score > 0.4
    ])
    
    response = llm.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Answer based on context. Be concise."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {req.question}"}
        ]
    )
    
    return {"answer": response.choices[0].message.content, "context_used": len(hits)}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
EOF

python3 /workspace/milvus_rag.py
```

***

## 步骤 10 — 监控与管理

```python
from pymilvus import connections, utility, Collection

connections.connect("default", host="localhost", port="19530")

# 列出所有集合
print("Collections:", utility.list_collections())

# 集合统计信息
col = Collection("documents")
print(f"Entity count: {col.num_entities:,}")
print(f"Schema: {col.schema}")

# 分区管理
col.create_partition("2024_docs")
col.create_partition("2023_docs")

# 使用分区插入
col.insert(data, partition_name="2024_docs")

# 搜索指定分区
results = col.search(
    data=[query_vec],
    anns_field="embedding",
    param={"metric_type": "COSINE", "params": {"ef": 64}},
    limit=10,
    partition_names=["2024_docs"]  # 仅搜索此分区
)
```

***

## 故障排除

### 服务未启动

```bash
# 检查容器日志
docker compose logs etcd
docker compose logs minio
docker compose logs standalone

# 检查磁盘空间
df -h /opt/milvus

# 重启服务
docker compose restart
```

### 19530 端口连接被拒绝

```bash
# 验证 Milvus 是否在监听
netstat -tlnp | grep 19530

# 检查健康状态
curl http://localhost:9091/healthz

# 给启动留出时间（90 秒）
docker compose logs standalone | tail -20
```

### 大集合索引构建超时

```python
# 增加大型索引构建的超时
from pymilvus import Collection

collection = Collection("documents")
collection.create_index(
    field_name="embedding",
    index_params=hnsw_params,
    timeout=3600  # 1 小时超时
)
```

### 高内存使用

```bash
# 在 docker-compose.yml 中配置 Milvus 内存限制
# 添加到 standalone 服务：
deploy:
  resources:
    limits：
      memory: 16g
```

***

## 索引类型选择指南

| 索引类型           | 适合用于          | 内存     | 适用场景    | 是否需要 GPU |
| -------------- | ------------- | ------ | ------- | -------- |
| FLAT           | 小规模（<1M），精确搜索 | 高      | 较慢      | 否        |
| IVF\_FLAT      | 中等（1M–10M）    | 高细节    | 快速      | 否        |
| HNSW           | 低延迟，<100M     | 高      | 适合照片级写实 | 否        |
| IVF\_SQ8       | 压缩，适合大规模      | 低      | 快速      | 否        |
| GPU\_IVF\_FLAT | 快速批量查询        | GPU+内存 | 最佳选择    | 是        |
| DISKANN        | 十亿级规模         | 低（磁盘）  | 快速      | 否        |

***

## 性能基准

| 集合大小    | 索引             | GPU      | QPS（每秒查询数） |
| ------- | -------------- | -------- | ---------- |
| 1M 向量   | HNSW           | RTX 3090 | \~8,000    |
| 10M 向量  | IVF\_FLAT      | RTX 4090 | \~2,500    |
| 10M 向量  | GPU\_IVF\_FLAT | A100     | \~12,000   |
| 100M 向量 | DISKANN        | A100     | \~1,200    |

***

## 附加资源

* [Milvus 文档](https://milvus.io/docs)
* [Milvus GitHub](https://github.com/milvus-io/milvus)
* [PyMilvus 文档](https://milvus.io/api-reference/pymilvus/v2.4.x/About.md)
* [Milvus 训练营（Bootcamp）](https://github.com/milvus-io/bootcamp) — 示例应用
* [Zilliz Cloud](https://cloud.zilliz.com/) — 托管 Milvus
* [向量数据库比较](https://milvus.io/docs/benchmark.md)
* [Attu GUI](https://github.com/zilliztech/attu) — Milvus 管理的 Web UI

***

*在 Clore.ai 上的 Milvus 是需要超越数亿向量扩展的人工智能应用的理想解决方案。结合 GPU 加速的嵌入生成，您可以以远低于托管云成本的价格构建一流的语义搜索和 RAG 系统。*

***

## Clore.ai 的 GPU 建议

| 在 Clore.ai 上的预估费用 | 开发/测试 | RTX 3090（24GB） |
| ----------------- | ----- | -------------- |
| \~$0.12/每 GPU/每小时 | 生产    | RTX 4090（24GB） |
| 生产级向量搜索           | 生产    | RTX 4090（24GB） |
| 高吞吐量嵌入            | 大规模   | A100 80GB      |

> GPU 服务器上。浏览可用 GPU 并按小时租用 — 无需承诺，提供完整的 root 访问权限。 [Clore.ai](https://clore.ai/marketplace) GPU 服务器。浏览可用 GPU 并按小时租用 — 无需承诺，提供完整的 root 访问权限。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.clore.ai/guides/guides_v2-zh/rag-yu-xiang-liang-shu-ju-ku/milvus.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
