# Batch processing

Process large workloads efficiently on CLORE.AI GPUs.

{% hint style="success" %}
Find the right GPU on the [CLORE.AI Marketplace](https://clore.ai/marketplace).
{% endhint %}

## Using the clore-ai SDK for batch infrastructure (recommended)

The official SDK has async support, which makes it straightforward to rent several GPUs for a batch job at once:

```python
import asyncio
from clore_ai import AsyncCloreAI

async def batch_deploy(server_ids):
    """Deploy to several servers concurrently."""
    async with AsyncCloreAI() as client:
        tasks = [
            client.create_order(
                server_id=sid,
                image="cloreai/ubuntu22.04-cuda12",
                type="on-demand",
                currency="bitcoin",
                ssh_password="BatchPass123",  # placeholder: use your own strong password
                ports={"22": "tcp"}
            )
            for sid in server_ids
        ]
        # return_exceptions=True keeps one failed order from cancelling the rest
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for sid, result in zip(server_ids, results):
            if isinstance(result, Exception):
                print(f"❌ Server {sid}: {result}")
            else:
                print(f"✅ Server {sid}: order {result.id}")
        return results

# Deploy to 5 servers concurrently
asyncio.run(batch_deploy([142, 305, 891, 450, 612]))
```

→ See the [Python SDK guide](/guides/guides_v2-de/erweitert/python-sdk.md) and [CLI automation](/guides/guides_v2-de/erweitert/cli-automation.md) for more.

***

## When to use batch processing

* Processing hundreds or thousands of items
* Converting large datasets
* Generating many images or videos
* Bulk transcription
* Preparing training data

***

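## LLM batch processing

### vLLM batch API

vLLM batches concurrent requests automatically through continuous batching. The loop below sends one request at a time, so it is the simplest baseline and gains nothing from that server-side batching: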

```python
from openai import OpenAI

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

# Synchronous batch: one request at a time
def process_batch_sync(prompts):
    results = []
    for prompt in prompts:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        results.append(response.choices[0].message.content)
    return results

# Process 100 prompts
prompts = [f"Summarize topic {i}" for i in range(100)]
results = process_batch_sync(prompts)
```

### Asynchronous batch processing (faster)

```python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_single(prompt):
    response = await client.chat.completions.create(
        model="meta-llama/Llama-3.1-8B-Instruct",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch_async(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_process(prompt):
        async with semaphore:
            return await process_single(prompt)

    tasks = [limited_process(p) for p in prompts]
    return await asyncio.gather(*tasks)

# Process 1000 prompts with at most 10 requests in flight
prompts = [f"Generate a description for product {i}" for i in range(1000)]
results = asyncio.run(process_batch_async(prompts, max_concurrent=10))
```
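Long batches tend to hit occasional transient failures such as timeouts or dropped connections, and a bare `asyncio.gather` aborts on the first one. The following is a minimal sketch of retrying one call with exponential backoff. The names `with_retries`, `max_attempts`, and `base_delay` are illustrative assumptions, not part of the OpenAI client or vLLM, and a real job would probably catch only the exception types it considers transient.

```python
import asyncio
import random

async def with_retries(make_call, max_attempts=3, base_delay=0.5):
    """Await make_call() and retry on failure with exponential backoff.

    make_call is a zero-argument function that returns a fresh coroutine,
    because a coroutine object cannot be awaited twice.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Wait base_delay, then 2x, 4x, ... plus a little jitter
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 10))
```

Inside `limited_process`, the call could then be wrapped as `await with_retries(lambda: process_single(prompt))`, so the semaphore slot is held across the retries of a single prompt.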

### Batch with progress tracking

```python
import asyncio
from tqdm.asyncio import tqdm
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_with_progress(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def process_one(prompt, idx):
        async with semaphore:
            response = await client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompt}]
            )
            return idx, response.choices[0].message.content

    tasks = [process_one(p, i) for i, p in enumerate(prompts)]

    # Results arrive in completion order, so keep the index with each one
    for coro in tqdm.as_completed(tasks, total=len(tasks)):
        idx, result = await coro
        results.append((idx, result))

    # Restore the original order
    results.sort(key=lambda x: x[0])
    return [r[1] for r in results]

# Run
prompts = ["..." for _ in range(500)]
results = asyncio.run(process_with_progress(prompts))
```

### Saving progress for long batches

```python
import json
from pathlib import Path

from openai import OpenAI

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

def process_batch_with_checkpoint(prompts, checkpoint_file="checkpoint.json"):
    # Load the checkpoint if one exists
    checkpoint = Path(checkpoint_file)
    if checkpoint.exists():
        with open(checkpoint) as f:
            data = json.load(f)
            results = data['results']
            start_idx = data['last_completed'] + 1
        print(f"Resuming from index {start_idx}")
    else:
        results = [None] * len(prompts)
        start_idx = 0

    # Process the remaining prompts
    for i in range(start_idx, len(prompts)):
        try:
            response = client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompts[i]}]
            )
            results[i] = response.choices[0].message.content

            # Save a checkpoint every 10 items
            if i % 10 == 0:
                with open(checkpoint_file, 'w') as f:
                    json.dump({'results': results, 'last_completed': i}, f)
                print(f"Checkpoint saved at {i}")

        except Exception as e:
            print(f"Error at {i}: {e}")
            # Save a checkpoint on failure so the next run resumes at item i
            with open(checkpoint_file, 'w') as f:
                json.dump({'results': results, 'last_completed': i - 1}, f)
            raise

    # Remove the checkpoint once the batch has finished
    if checkpoint.exists():
        checkpoint.unlink()

    return results
```
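If the process is killed while the checkpoint file is being written, the file can be left truncated and the next run will fail to parse it. One common way to avoid this, sketched below under the assumption that the checkpoint lives on a local filesystem, is to write to a temporary file in the same directory and rename it over the target. `save_checkpoint_atomic` is a hypothetical helper name, not part of any library.

```python
import json
import os
import tempfile

def save_checkpoint_atomic(path, results, last_completed):
    """Write the checkpoint to a temp file, then rename it over the target."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must be on the same filesystem for the rename to be atomic
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"results": results, "last_completed": last_completed}, f)
            f.flush()
            os.fsync(f.fileno())  # push the data to disk before renaming
        os.replace(tmp_path, path)  # readers see the old or the new file, never half
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

A call such as `save_checkpoint_atomic(checkpoint_file, results, i)` could stand in for the two `open(..., 'w')` blocks in the function above.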

***

## Image generation batches

### SD WebUI batch

```python
import requests
import base64
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

SD_API = "http://server:7860"

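def generate_image(prompt, output_path):
    response = requests.post(f'{SD_API}/sdapi/v1/txt2img', json={
        'prompt': prompt,
        'negative_prompt': 'blurry, low quality',
        'steps': 20,
        'width': 512,
        'height': 512
    }, timeout=300)  # generous timeout (an assumed value) so a hung request cannot block a worker forever
    response.raise_for_status()  # fail loudly instead of decoding an error page

    # The API returns images as base64-encoded strings
    image_data = base64.b64decode(response.json()['images'][0])

    with open(output_path, 'wb') as f:
        f.write(image_data)

    return output_path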

def batch_generate(prompts, output_dir, max_workers=4):
    Path(output_dir).mkdir(exist_ok=True)

    tasks = [
        (prompt, f"{output_dir}/image_{i:04d}.png")
        for i, prompt in enumerate(prompts)
    ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(tqdm(
            executor.map(lambda x: generate_image(*x), tasks),
            total=len(tasks)
        ))

    return results

# Generate 100 images
prompts = [f"A beautiful landscape, style {i}" for i in range(100)]
batch_generate(prompts, "./outputs", max_workers=4)
```
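When a run like this is interrupted, starting it again regenerates every image. A small sketch of making it resumable is to skip prompts whose output file already exists. It assumes the same prompt order and the same `image_{i:04d}.png` naming as `batch_generate`; `pending_tasks` is a hypothetical helper name.

```python
from pathlib import Path

def pending_tasks(prompts, output_dir):
    """Return (prompt, path) pairs whose output file is missing or empty."""
    out = Path(output_dir)
    tasks = []
    for i, prompt in enumerate(prompts):
        path = out / f"image_{i:04d}.png"
        # An empty file usually means the previous run died mid-write
        if not path.exists() or path.stat().st_size == 0:
            tasks.append((prompt, str(path)))
    return tasks
```

The returned list has the same shape as the `tasks` list built inside `batch_generate`, so it could be passed to the executor in its place.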

### ComfyUI batch with a queue

```python
import copy
import json
import urllib.request
import time
from pathlib import Path

SERVER = "server:8188"

def queue_prompt(workflow):
    data = json.dumps({"prompt": workflow}).encode('utf-8')
    req = urllib.request.Request(f"http://{SERVER}/prompt", data=data)
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read())

def get_history(prompt_id):
    with urllib.request.urlopen(f"http://{SERVER}/history/{prompt_id}") as response:
        return json.loads(response.read())

def batch_generate_comfyui(prompts, base_workflow_path, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    # Load the base workflow
    with open(base_workflow_path) as f:
        base_workflow = json.load(f)

    prompt_ids = []

    # Queue all prompts
    for i, prompt in enumerate(prompts):
        # Deep copy: a shallow copy would share the nested node dicts with the base workflow
        workflow = copy.deepcopy(base_workflow)
        # Set the prompt node (adjust the node ID to match your workflow)
        workflow["6"]["inputs"]["text"] = prompt
        # Set the output filename prefix
        workflow["9"]["inputs"]["filename_prefix"] = f"batch_{i:04d}"

        result = queue_prompt(workflow)
        prompt_ids.append(result['prompt_id'])
        print(f"Queued: {i+1}/{len(prompts)}")

    # Wait for completion: a prompt appears in its history once it has finished
    print("Waiting for generation...")
    completed = set()
    while len(completed) < len(prompt_ids):
        for pid in prompt_ids:
            if pid not in completed:
                history = get_history(pid)
                if pid in history:
                    completed.add(pid)
                    print(f"Completed {len(completed)}/{len(prompt_ids)}")
        time.sleep(1)

    print("All done!")
```
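The wait loop above polls until every prompt shows up in the history, so one failed workflow keeps it running forever. A minimal sketch of the same polling with a deadline follows. `wait_for_all` and its parameters are illustrative names, and the 600-second default is an arbitrary assumption.

```python
import time

def wait_for_all(ids, is_done, timeout=600.0, interval=1.0):
    """Poll is_done(id) until every id is done or the timeout expires.

    Returns the set of ids still unfinished (an empty set on full success).
    """
    pending = set(ids)
    deadline = time.monotonic() + timeout
    while pending:
        # Keep only the ids that are still not done
        pending = {i for i in pending if not is_done(i)}
        if not pending or time.monotonic() >= deadline:
            break
        time.sleep(interval)
    return pending
```

With the functions above, `is_done` could be `lambda pid: pid in get_history(pid)`, and any ids returned can be logged or queued again.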

### FLUX batch processing

```python
import torch
from diffusers import FluxPipeline
from pathlib import Path
from tqdm import tqdm

# Load the model once
pipe = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-schnell",
    torch_dtype=torch.bfloat16
)
pipe.to("cuda")

def batch_generate_flux(prompts, output_dir, batch_size=4):
    Path(output_dir).mkdir(exist_ok=True)

    for i in tqdm(range(0, len(prompts), batch_size)):
        batch_prompts = prompts[i:i + batch_size]

        # Generate one batch
        images = pipe(
            batch_prompts,
            height=1024,
            width=1024,
            num_inference_steps=4,
            guidance_scale=0.0
        ).images

        # Save each image under its global index
        for j, img in enumerate(images):
            img.save(f"{output_dir}/image_{i+j:04d}.png")

# Generate 100 images in batches of 4
animals = ["cat", "dog", "fox"]
prompts = [f"A {animal} in a forest" for animal in (animals * 34)[:100]]
batch_generate_flux(prompts, "./flux_outputs", batch_size=4)
```

***

## Audio batch processing

### Whisper batch transcription

```python
import whisper
from pathlib import Path
from tqdm import tqdm
import json

model = whisper.load_model("large-v3")

def batch_transcribe(audio_files, output_dir):
    Path(output_dir).mkdir(exist_ok=True)
    results = {}

    for audio_path in tqdm(audio_files):
        try:
            result = model.transcribe(str(audio_path))

            results[audio_path.name] = {
                'text': result['text'],
                'language': result['language'],
                'segments': result['segments']
            }

            # Save the individual transcript
            output_file = Path(output_dir) / f"{audio_path.stem}.json"
            with open(output_file, 'w') as f:
                json.dump(results[audio_path.name], f, indent=2)

        except Exception as e:
            # Record the failure and continue with the next file
            print(f"Error processing {audio_path}: {e}")
            results[audio_path.name] = {'error': str(e)}

    # Save the combined results
    with open(f"{output_dir}/all_transcripts.json", 'w') as f:
        json.dump(results, f, indent=2)

    return results

# Transcribe all audio files in the directory
audio_files = list(Path("./audio").glob("*.mp3"))
results = batch_transcribe(audio_files, "./transcripts")
```

### Parallel Whisper (multiple GPUs)

```python
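import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

import whisper

def transcribe_on_gpu(args):
    """Load the model once on one GPU and transcribe that GPU's share of the files."""
    audio_paths, gpu_id = args
    model = whisper.load_model("large-v3", device=f"cuda:{gpu_id}")
    return [(str(p), model.transcribe(str(p))["text"]) for p in audio_paths]

def parallel_transcribe(audio_files, num_gpus=2):
    # Round-robin split: GPU i gets files i, i + num_gpus, i + 2 * num_gpus, ...
    tasks = [(audio_files[i::num_gpus], i) for i in range(num_gpus)]

    # CUDA does not mix well with fork, so start the workers with "spawn".
    # With "spawn", call this function from under `if __name__ == "__main__":`.
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=num_gpus, mp_context=ctx) as executor:
        chunks = list(executor.map(transcribe_on_gpu, tasks))

    # Flatten the per-GPU result lists into {path: text}
    return {path: text for chunk in chunks for path, text in chunk}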
```

***

## Video batch processing

### Batch video generation (SVD)

```python
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
from pathlib import Path
from tqdm import tqdm
import torch

pipe = StableVideoDiffusionPipeline.from_pretrained(
    "stabilityai/stable-video-diffusion-img2vid-xt",
    torch_dtype=torch.float16,
    variant="fp16"
)
pipe.to("cuda")

def batch_generate_videos(image_paths, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    for img_path in tqdm(image_paths):
        try:
            image = load_image(str(img_path))
            image = image.resize((1024, 576))

            frames = pipe(
                image,
                num_frames=25,
                decode_chunk_size=8
            ).frames[0]

            output_path = Path(output_dir) / f"{img_path.stem}.mp4"
            export_to_video(frames, str(output_path), fps=7)

        except Exception as e:
            # Log the failure and continue with the next image
            print(f"Error at {img_path}: {e}")

# Process all images
images = list(Path("./input_images").glob("*.png"))
batch_generate_videos(images, "./output_videos")
```

***

## Data pipeline patterns

### Producer-consumer pattern

```python
import asyncio
from asyncio import Queue

async def process_item(item):
    """Placeholder: replace with your own async processing logic."""
    await asyncio.sleep(0)
    return item

async def producer(queue, items, num_workers):
    """Add items to the queue, then one stop signal per worker."""
    for item in items:
        await queue.put(item)
    # Signal completion: each worker consumes exactly one None
    for _ in range(num_workers):
        await queue.put(None)

async def consumer(queue, results, worker_id):
    """Process items from the queue until a stop signal arrives."""
    while True:
        item = await queue.get()
        if item is None:
            break

        try:
            result = await process_item(item)
            results.append(result)
        except Exception as e:
            print(f"Worker {worker_id} error: {e}")

        queue.task_done()

async def run_pipeline(items, num_workers=5):
    queue = Queue(maxsize=100)
    results = []

    # Start the workers
    workers = [
        asyncio.create_task(consumer(queue, results, i))
        for i in range(num_workers)
    ]

    # Run the producer with the same worker count, so every worker gets a stop signal
    await producer(queue, items, num_workers)

    # Wait for the workers to finish
    await asyncio.gather(*workers)

    return results

items = list(range(1000))
results = asyncio.run(run_pipeline(items))
```

### Map-reduce pattern

```python
from concurrent.futures import ProcessPoolExecutor

def map_function(item):
    """Process a single item."""
    # Your processing logic; process() is a placeholder you must define
    return process(item)

def reduce_function(results):
    """Combine the results."""
    # combine() is likewise a placeholder
    return combine(results)

def map_reduce(items, num_workers=4):
    # Map phase: process the items in parallel worker processes
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(map_function, items))

    # Reduce phase: combine the per-item results in the parent process
    result = reduce_function(mapped)

    return result
```
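To make the skeleton concrete, here is a small word-count instance of the same pattern. This is only an illustrative sketch: `count_words`, `merge_counts`, `word_count`, and the `executor_cls` parameter are assumed names chosen for this example, and the parameter exists only so the executor can be swapped for a thread pool in tests or notebooks.

```python
from collections import Counter
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def count_words(text):
    """Map step: word frequencies for one document."""
    return Counter(text.lower().split())

def merge_counts(counters):
    """Reduce step: sum the per-document counters."""
    total = Counter()
    for c in counters:
        total.update(c)
    return total

def word_count(texts, num_workers=4, executor_cls=ProcessPoolExecutor):
    # Map phase runs in the pool; the reduce phase runs in the caller
    with executor_cls(max_workers=num_workers) as executor:
        mapped = list(executor.map(count_words, texts))
    return merge_counts(mapped)
```

When using the default process pool from a script, the call belongs under `if __name__ == "__main__":`, and the map function has to be defined at module level so it can be pickled.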

***

## Optimization tips

### 1. Appropriate concurrency

```python
# LLM: keep enough requests in flight for vLLM to batch them
max_concurrent = 10  # conservative starting point, not a vLLM default; tune it against the server's --max-num-seqs

# Image generation: 1-4 depending on VRAM
max_concurrent = 2  # SD WebUI
max_concurrent = 4  # FLUX on an RTX 4090

# Transcription: 1 per GPU
max_concurrent = num_gpus
```

### 2. Tuning the batch size

```python
# Too small: the GPU is underutilized
# Too large: out-of-memory (OOM) errors

# Rough starting points for image generation (actual limits depend on model and resolution):
# RTX 3060: batch_size = 1
# RTX 3090: batch_size = 2-4
# RTX 4090: batch_size = 4-8
# A100: batch_size = 8-16
```
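Rather than guessing the batch size up front, a job can start high and halve it whenever a batch runs out of memory. The sketch below shows that idea in a framework-independent form. `run_with_backoff` and the `is_oom` predicate are hypothetical names, and the default predicate checks for Python's built-in `MemoryError`; with PyTorch, a predicate that checks for `torch.cuda.OutOfMemoryError` would be the likely substitute.

```python
def run_with_backoff(items, process_batch, batch_size=8,
                     is_oom=lambda e: isinstance(e, MemoryError)):
    """Process items in batches, halving the batch size after an OOM error.

    Returns (results, final_batch_size).
    """
    results = []
    i = 0
    while i < len(items):
        batch = items[i:i + batch_size]
        try:
            results.extend(process_batch(batch))
        except Exception as e:
            # Give up on non-OOM errors, or if even a single item does not fit
            if not is_oom(e) or batch_size == 1:
                raise
            batch_size = max(1, batch_size // 2)
            continue  # retry the same items with the smaller batch
        i += len(batch)
    return results, batch_size
```

The returned batch size is the largest one that worked during the run, which makes a reasonable starting value for the next job on the same GPU.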

### 3. Memory management

```python
import gc
import torch

def clear_memory():
    gc.collect()
    torch.cuda.empty_cache()

# Call between large batches (batches and process_batch are placeholders)
for batch in batches:
    process_batch(batch)
    clear_memory()
```

### 4. Save intermediate results

```python
# Always checkpoint long jobs
CHECKPOINT_INTERVAL = 100

for i, item in enumerate(items):
    results.append(process(item))

    if i % CHECKPOINT_INTERVAL == 0:
        save_checkpoint(results, i)
```

***

## Cost optimization

### Estimate before you run

```python
def estimate_cost(num_items, time_per_item_sec, hourly_rate):
    total_hours = (num_items * time_per_item_sec) / 3600
    total_cost = total_hours * hourly_rate
    return total_hours, total_cost

# Example: 10,000 images at 3 s each on an RTX 4090, assuming $0.10 per hour
hours, cost = estimate_cost(10000, 3, 0.10)
print(f"Estimated: {hours:.1f} hours, ${cost:.2f}")
# Output: Estimated: 8.3 hours, $0.83
```
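The same arithmetic extends to several GPUs. The sketch below assumes perfectly linear scaling, the same hourly rate on every GPU, and no startup overhead, all of which are simplifications; `estimate_parallel` is an illustrative name.

```python
def estimate_parallel(num_items, time_per_item_sec, hourly_rate, num_gpus=1):
    """Return (wall_clock_hours, total_cost) for a job split across num_gpus."""
    gpu_hours = num_items * time_per_item_sec / 3600
    wall_hours = gpu_hours / num_gpus   # the job finishes sooner...
    total_cost = gpu_hours * hourly_rate  # ...but the GPU-hours paid for stay the same
    return wall_hours, total_cost

wall, cost = estimate_parallel(10000, 3, 0.10, num_gpus=4)
print(f"Estimated: {wall:.1f} hours wall clock, ${cost:.2f}")
```

Under these assumptions, adding GPUs shortens the wall-clock time without changing the total cost, so the number of GPUs is mainly a deadline decision.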

### Use spot instances

* Typically 30–50% cheaper
* A good fit for batch jobs, which tolerate interruption
* Checkpoint frequently

### Off-peak processing

* Queue jobs during periods of low demand
* GPU availability is often better
* Spot prices may be lower

***

## Next steps

* [API integration](/guides/guides_v2-de/erweitert/api-integration.md) - Build your APIs
* [Multi-GPU setup](/guides/guides_v2-de/erweitert/multi-gpu-setup.md) - Scale up
* [Cost calculator](/guides/guides_v2-de/erste-schritte/cost-calculator.md) - Estimate costs

