# Batch Processing

Process large workloads efficiently on CLORE.AI GPUs.

{% hint style="success" %}
Find the right GPU on the [CLORE.AI Marketplace](https://clore.ai/marketplace).
{% endhint %}

## Using the clore-ai SDK for batch infrastructure (recommended)

The official SDK makes it easy to deploy batch GPUs, with async support:

```python
import asyncio
from clore_ai import AsyncCloreAI

async def batch_deploy(server_ids):
    """Auf mehreren Servern gleichzeitig bereitstellen."""
    async with AsyncCloreAI() as client:
        tasks = [
            client.create_order(
                server_id=sid,
                image="cloreai/ubuntu22.04-cuda12",
                type="on-demand",
                currency="bitcoin",
                ssh_password="BatchPass123",
                ports={"22": "tcp"}
            )
            for sid in server_ids
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for sid, result in zip(server_ids, results):
            if isinstance(result, Exception):
                print(f"❌ Server {sid}: {result}")
            else:
                print(f"✅ Server {sid}: Bestellung {result.id}")
        return results

# Deploy to 5 servers concurrently
asyncio.run(batch_deploy([142, 305, 891, 450, 612]))
```

→ See the [Python SDK guide](https://docs.clore.ai/guides/guides_v2-de/fortgeschritten/python-sdk) and [CLI automation](https://docs.clore.ai/guides/guides_v2-de/fortgeschritten/cli-automation) for more.

***

## When to use batch processing

* Processing hundreds or thousands of items
* Converting large datasets
* Generating many images/videos
* Bulk transcription
* Preparing training data

***

## LLM batch processing

### vLLM batch API

vLLM handles batching automatically via continuous batching:

```python
from openai import OpenAI

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

# Synchronous batch
def process_batch_sync(prompts):
    results = []
    for prompt in prompts:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        results.append(response.choices[0].message.content)
    return results

# Process 100 prompts
prompts = [f"Summarize topic {i}" for i in range(100)]
results = process_batch_sync(prompts)
```

### Asynchronous batch processing (faster)

```python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_single(prompt):
    response = await client.chat.completions.create(
        model="meta-llama/Llama-3.1-8B-Instruct",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch_async(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_process(prompt):
        async with semaphore:
            return await process_single(prompt)

    tasks = [limited_process(p) for p in prompts]
    return await asyncio.gather(*tasks)

# Process 1000 prompts with 10 concurrent requests
prompts = [f"Generate a description for product {i}" for i in range(1000)]
results = asyncio.run(process_batch_async(prompts, max_concurrent=10))
```

### Batch with progress tracking

```python
import asyncio
from tqdm.asyncio import tqdm
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_with_progress(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def process_one(prompt, idx):
        async with semaphore:
            response = await client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompt}]
            )
            return idx, response.choices[0].message.content

    tasks = [process_one(p, i) for i, p in enumerate(prompts)]

    for coro in tqdm.as_completed(tasks, total=len(tasks)):
        idx, result = await coro
        results.append((idx, result))

    # Restore the original order
    results.sort(key=lambda x: x[0])
    return [r[1] for r in results]

# Run
prompts = ["..." for _ in range(500)]
results = asyncio.run(process_with_progress(prompts))
```

### Saving progress for long batches

```python
import json
from pathlib import Path
from openai import OpenAI

# Reuses the vLLM endpoint from the examples above
client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

def process_batch_with_checkpoint(prompts, checkpoint_file="checkpoint.json"):
    # Load the checkpoint if one exists
    checkpoint = Path(checkpoint_file)
    if checkpoint.exists():
        with open(checkpoint) as f:
            data = json.load(f)
            results = data['results']
            start_idx = data['last_completed'] + 1
        print(f"Fortsetzen ab Index {start_idx}")
    else:
        results = [None] * len(prompts)
        start_idx = 0

    # Process the remainder
    for i in range(start_idx, len(prompts)):
        try:
            response = client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompts[i]}]
            )
            results[i] = response.choices[0].message.content

            # Save a checkpoint every 10 items
            if i % 10 == 0:
                with open(checkpoint_file, 'w') as f:
                    json.dump({'results': results, 'last_completed': i}, f)
                print(f"Checkpoint bei {i} gespeichert")

        except Exception as e:
            print(f"Fehler bei {i}: {e}")
            # Checkpoint bei Fehler speichern
            with open(checkpoint_file, 'w') as f:
                json.dump({'results': results, 'last_completed': i - 1}, f)
            raise

    # Clean up the checkpoint after completion
    if checkpoint.exists():
        checkpoint.unlink()

    return results
```
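
A minimal usage sketch: call the function and, if a `checkpoint.json` is left over from an interrupted run, the job resumes where it stopped (the prompts here are illustrative):

```python
prompts = [f"Summarize document {i}" for i in range(5000)]
results = process_batch_with_checkpoint(prompts)
```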

***

## Batch image generation

### SD WebUI batch

```python
import requests
import base64
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

SD_API = "http://server:7860"

def generate_image(prompt, output_path):
    response = requests.post(f'{SD_API}/sdapi/v1/txt2img', json={
        'prompt': prompt,
        'negative_prompt': 'blurry, low quality',
        'steps': 20,
        'width': 512,
        'height': 512
    })

    image_data = base64.b64decode(response.json()['images'][0])

    with open(output_path, 'wb') as f:
        f.write(image_data)

    return output_path

def batch_generate(prompts, output_dir, max_workers=4):
    Path(output_dir).mkdir(exist_ok=True)

    tasks = [
        (prompt, f"{output_dir}/image_{i:04d}.png")
        for i, prompt in enumerate(prompts)
    ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(tqdm(
            executor.map(lambda x: generate_image(*x), tasks),
            total=len(tasks)
        ))

    return results

# Generate 100 images
prompts = [f"A beautiful landscape, style {i}" for i in range(100)]
batch_generate(prompts, "./outputs", max_workers=4)
```

### ComfyUI batch with a queue

```python
import copy
import json
import urllib.request
import time
from pathlib import Path

SERVER = "server:8188"

def queue_prompt(workflow):
    data = json.dumps({"prompt": workflow}).encode('utf-8')
    req = urllib.request.Request(f"http://{SERVER}/prompt", data=data)
    return json.loads(urllib.request.urlopen(req).read())

def get_history(prompt_id):
    with urllib.request.urlopen(f"http://{SERVER}/history/{prompt_id}") as response:
        return json.loads(response.read())

def batch_generate_comfyui(prompts, base_workflow_path, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    # Load the base workflow
    with open(base_workflow_path) as f:
        base_workflow = json.load(f)

    prompt_ids = []

    # Queue all prompts
    for i, prompt in enumerate(prompts):
        # Deep copy: .copy() is shallow and would mutate the shared node dicts
        workflow = copy.deepcopy(base_workflow)
        # Set the prompt node (adjust the node ID to match your workflow)
        workflow["6"]["inputs"]["text"] = prompt
        # Set the output filename
        workflow["9"]["inputs"]["filename_prefix"] = f"batch_{i:04d}"

        result = queue_prompt(workflow)
        prompt_ids.append(result['prompt_id'])
        print(f"In Warteschlange: {i+1}/{len(prompts)}")

    # Wait for completion
    print("Waiting for generation...")
    completed = set()
    while len(completed) < len(prompt_ids):
        for pid in prompt_ids:
            if pid not in completed:
                history = get_history(pid)
                if pid in history:
                    completed.add(pid)
                    print(f"Abgeschlossen {len(completed)}/{len(prompt_ids)}")
        time.sleep(1)

    print("Alles erledigt!")
```
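
A usage sketch, assuming a workflow exported via ComfyUI's "Save (API Format)"; the node IDs `"6"` and `"9"` above and the `workflow_api.json` path here must match your own export:

```python
prompts = [f"A product photo, variation {i}" for i in range(50)]
batch_generate_comfyui(prompts, "workflow_api.json", "./comfy_outputs")
```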

### FLUX batch processing

```python
import torch
from diffusers import FluxPipeline
from pathlib import Path
from tqdm import tqdm

# Load the model once
pipe = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-schnell",
    torch_dtype=torch.bfloat16
)
pipe.to("cuda")

def batch_generate_flux(prompts, output_dir, batch_size=4):
    Path(output_dir).mkdir(exist_ok=True)

    for i in tqdm(range(0, len(prompts), batch_size)):
        batch_prompts = prompts[i:i + batch_size]

        # Generate a batch
        images = pipe(
            batch_prompts,
            height=1024,
            width=1024,
            num_inference_steps=4,
            guidance_scale=0.0
        ).images

        # Save
        for j, img in enumerate(images):
            img.save(f"{output_dir}/image_{i+j:04d}.png")

# Generate 100 images in batches of 4
prompts = [f"A {animal} in a forest" for animal in ["cat", "dog", "fox"] * 34][:100]
batch_generate_flux(prompts, "./flux_outputs", batch_size=4)
```

***

## Audio batch processing

### Whisper batch transcription

```python
import whisper
from pathlib import Path
from tqdm import tqdm
import json

model = whisper.load_model("large-v3")

def batch_transcribe(audio_files, output_dir):
    Path(output_dir).mkdir(exist_ok=True)
    results = {}

    for audio_path in tqdm(audio_files):
        try:
            result = model.transcribe(str(audio_path))

            results[audio_path.name] = {
                'text': result['text'],
                'language': result['language'],
                'segments': result['segments']
            }

            # Save the individual transcript
            output_file = Path(output_dir) / f"{audio_path.stem}.json"
            with open(output_file, 'w') as f:
                json.dump(results[audio_path.name], f, indent=2)

        except Exception as e:
            print(f"Fehler bei der Verarbeitung von {audio_path}: {e}")
            results[audio_path.name] = {'error': str(e)}

    # Save the combined results
    with open(f"{output_dir}/all_transcripts.json", 'w') as f:
        json.dump(results, f, indent=2)

    return results

# Transcribe all audio files in a directory
audio_files = list(Path("./audio").glob("*.mp3"))
results = batch_transcribe(audio_files, "./transcripts")
```

### Parallel Whisper (multiple GPUs)

```python
import whisper
from concurrent.futures import ProcessPoolExecutor
import torch

def transcribe_on_gpu(args):
    audio_path, gpu_id = args
    torch.cuda.set_device(gpu_id)
    model = whisper.load_model("large-v3", device=f"cuda:{gpu_id}")
    result = model.transcribe(str(audio_path))
    return audio_path, result['text']

def parallel_transcribe(audio_files, num_gpus=2):
    # Distribute files across the GPUs
    tasks = [(f, i % num_gpus) for i, f in enumerate(audio_files)]

    with ProcessPoolExecutor(max_workers=num_gpus) as executor:
        results = list(executor.map(transcribe_on_gpu, tasks))

    return dict(results)
```
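
Usage, passing paths as strings so the tasks pickle cleanly into the worker processes:

```python
from pathlib import Path

audio_files = [str(p) for p in Path("./audio").glob("*.mp3")]
transcripts = parallel_transcribe(audio_files, num_gpus=2)
```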

***

## Video batch processing

### Batch video generation (SVD)

```python
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
from pathlib import Path
from tqdm import tqdm
import torch

pipe = StableVideoDiffusionPipeline.from_pretrained(
    "stabilityai/stable-video-diffusion-img2vid-xt",
    torch_dtype=torch.float16,
    variant="fp16"
)
pipe.to("cuda")

def batch_generate_videos(image_paths, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    for img_path in tqdm(image_paths):
        try:
            image = load_image(str(img_path))
            image = image.resize((1024, 576))

            frames = pipe(
                image,
                num_frames=25,
                decode_chunk_size=8
            ).frames[0]

            output_path = Path(output_dir) / f"{img_path.stem}.mp4"
            export_to_video(frames, str(output_path), fps=7)

        except Exception as e:
            print(f"Fehler bei {img_path}: {e}")

# Process all images
images = list(Path("./input_images").glob("*.png"))
batch_generate_videos(images, "./output_videos")
```

***

## Data pipeline patterns

### Producer-consumer pattern

```python
import asyncio
from asyncio import Queue

async def producer(queue, items, num_workers):
    """Feed items into the queue"""
    for item in items:
        await queue.put(item)
    # Signal completion: one sentinel per worker
    for _ in range(num_workers):
        await queue.put(None)

async def consumer(queue, results, worker_id):
    """Elemente aus der Warteschlange verarbeiten"""
    while True:
        item = await queue.get()
        if item is None:
            break

        try:
            result = await process_item(item)
            results.append(result)
        except Exception as e:
            print(f"Worker {worker_id} Fehler: {e}")

        queue.task_done()

async def run_pipeline(items, num_workers=5):
    queue = Queue(maxsize=100)
    results = []

    # Start the workers
    workers = [
        asyncio.create_task(consumer(queue, results, i))
        for i in range(num_workers)
    ]

    # Run the producer
    await producer(queue, items, num_workers)

    # Wait for the workers to finish
    await asyncio.gather(*workers)

    return results

items = list(range(1000))
results = asyncio.run(run_pipeline(items))
```
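
`process_item` is left undefined above; the pipeline only requires it to be a coroutine. A trivial placeholder for testing the pattern:

```python
import asyncio

async def process_item(item):
    # Stand-in for a real API call or GPU task
    await asyncio.sleep(0.01)
    return item * 2
```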

### Map-reduce pattern

```python
from concurrent.futures import ProcessPoolExecutor
from functools import reduce

def map_function(item):
    """Process a single item"""
    # Your processing logic goes here
    return process(item)

def reduce_function(results):
    """Combine the results"""
    return combine(results)

def map_reduce(items, num_workers=4):
    # Map phase
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(map_function, items))

    # Reduce phase
    result = reduce_function(mapped)

    return result
```
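
`process` and `combine` above are placeholders. As a concrete instance of the same pattern, a word-count sketch:

```python
from collections import Counter
from concurrent.futures import ProcessPoolExecutor

def count_words(text):
    """Map: word frequencies for a single document"""
    return Counter(text.lower().split())

def merge_counts(counters):
    """Reduce: sum the per-document counters"""
    total = Counter()
    for c in counters:
        total.update(c)
    return total

def word_count(documents, num_workers=4):
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(count_words, documents))
    return merge_counts(mapped)

# word_count(["the cat sat", "the cat ran"]) -> Counter({'the': 2, 'cat': 2, ...})
```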

***

## Optimization tips

### 1. Appropriate concurrency

```python
# LLMs: match vLLM's maximum batch size
max_concurrent = 10  # vLLM default

# Image generation: 1-4 depending on VRAM
max_concurrent = 2  # SD WebUI
max_concurrent = 4  # FLUX on an RTX 4090

# Transcription: 1 per GPU
max_concurrent = num_gpus
```

### 2. Batch size tuning

```python
# Too small: the GPU is underutilized
# Too large: OOM errors

# Batch sizes for image generation:
# RTX 3060: batch_size = 1
# RTX 3090: batch_size = 2-4
# RTX 4090: batch_size = 4-8
# A100: batch_size = 8-16
```
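
If the right batch size is unknown up front, a common fallback is to halve it on OOM. A minimal sketch, assuming a diffusers-style pipeline where `pipe(prompts).images` returns a list:

```python
import torch

def generate_with_backoff(pipe, prompts, batch_size=8):
    """Halve the batch size on CUDA OOM until generation succeeds."""
    images = []
    i = 0
    while i < len(prompts):
        try:
            images.extend(pipe(prompts[i:i + batch_size]).images)
            i += batch_size
        except torch.cuda.OutOfMemoryError:
            if batch_size == 1:
                raise  # a single prompt does not fit; nothing left to shrink
            batch_size //= 2
            torch.cuda.empty_cache()
            print(f"OOM - retrying with batch_size={batch_size}")
    return images
```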

### 3. Memory management

```python
import gc
import torch

def clear_memory():
    gc.collect()
    torch.cuda.empty_cache()

# Call between large batches
for batch in batches:
    process_batch(batch)
    clear_memory()
```

### 4. Save intermediate results

```python
# Always checkpoint long jobs
CHECKPOINT_INTERVAL = 100

for i, item in enumerate(items):
    results.append(process(item))

    if i % CHECKPOINT_INTERVAL == 0:
        save_checkpoint(results, i)
```
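
`save_checkpoint` is not defined in the snippet above; a minimal JSON version, matching the checkpoint format used in the LLM example earlier:

```python
import json

def save_checkpoint(results, last_index, path="checkpoint.json"):
    # Persist partial results plus the last completed index
    with open(path, "w") as f:
        json.dump({"results": results, "last_completed": last_index}, f)
```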

***

## Cost optimization

### Estimate before you run

```python
def estimate_cost(num_items, time_per_item_sec, hourly_rate):
    total_hours = (num_items * time_per_item_sec) / 3600
    total_cost = total_hours * hourly_rate
    return total_hours, total_cost

# Example: 10,000 images at 3 s each on an RTX 4090
hours, cost = estimate_cost(10000, 3, 0.10)
print(f"Geschätzt: {hours:.1f} Stunden, ${cost:.2f}")
# Ausgabe: Geschätzt: 8.3 Stunden, $0.83
```

### Use spot instances

* 30–50% cheaper
* Good for batch jobs (interruptible)
* Checkpoint frequently (see the sketch below)
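
A sketch of a spot deployment with the SDK; this assumes `create_order` accepts `type="spot"` (only `type="on-demand"` is shown above, so verify against the SDK docs):

```python
import asyncio
from clore_ai import AsyncCloreAI

async def deploy_spot(server_id):
    async with AsyncCloreAI() as client:
        # Assumption: type="spot" requests an interruptible order
        return await client.create_order(
            server_id=server_id,
            image="cloreai/ubuntu22.04-cuda12",
            type="spot",
            currency="bitcoin",
            ssh_password="BatchPass123",
            ports={"22": "tcp"}
        )

asyncio.run(deploy_spot(142))
```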

### Process during off-peak hours

* Queue jobs during low-demand periods (see the scheduling sketch below)
* Often better GPU availability
* Potentially lower spot prices
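
A simple scheduling sketch that delays a job until a target hour; `run_batch_job` is a hypothetical stand-in for your batch entry point:

```python
import time
from datetime import datetime, timedelta

def wait_until_hour(target_hour=2):
    """Sleep until the next occurrence of target_hour (local time)."""
    now = datetime.now()
    start = now.replace(hour=target_hour, minute=0, second=0, microsecond=0)
    if start <= now:
        start += timedelta(days=1)
    time.sleep((start - now).total_seconds())

wait_until_hour(2)   # e.g. start at 02:00 local time
# run_batch_job()    # hypothetical batch entry point
```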

***

## Next steps

* [API integration](https://docs.clore.ai/guides/guides_v2-de/fortgeschritten/api-integration) - Build your own APIs
* [Multi-GPU setup](https://docs.clore.ai/guides/guides_v2-de/fortgeschritten/multi-gpu-setup) - Scale up
* [Cost calculator](https://docs.clore.ai/guides/guides_v2-de/erste-schritte/cost-calculator) - Estimate costs
