# Batch Processing

Process large workloads efficiently on CLORE.AI GPUs.

{% hint style="success" %}
Find the right GPU at [CLORE.AI Marketplace](https://clore.ai/marketplace).
{% endhint %}

## Using clore-ai SDK for Batch Infrastructure (Recommended)

The official SDK makes batch GPU provisioning simple with async support:

```python
import asyncio
from clore_ai import AsyncCloreAI

async def batch_deploy(server_ids):
    """Deploy on multiple servers concurrently."""
    async with AsyncCloreAI() as client:
        tasks = [
            client.create_order(
                server_id=sid,
                image="cloreai/ubuntu22.04-cuda12",
                type="on-demand",
                currency="bitcoin",
                ssh_password="BatchPass123",  # Example only: use a strong, unique password
                ports={"22": "tcp"}
            )
            for sid in server_ids
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for sid, result in zip(server_ids, results):
            if isinstance(result, Exception):
                print(f"❌ Server {sid}: {result}")
            else:
                print(f"✅ Server {sid}: Order {result.id}")
        return results

# Deploy on 5 servers at once
asyncio.run(batch_deploy([142, 305, 891, 450, 612]))
```

→ See [Python SDK Guide](/guides/advanced/python-sdk.md) and [CLI Automation](/guides/advanced/cli-automation.md) for more.

***

## When to Use Batch Processing

* Processing hundreds/thousands of items
* Converting large datasets
* Generating many images/videos
* Bulk transcription
* Training data preparation

***

## LLM Batch Processing

### vLLM Batch API

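vLLM batches concurrent requests automatically through continuous batching. The simplest client is a sequential loop, shown here; it is easy to debug but sends one request at a time, so the server never has more than one request to batch: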

```python
from openai import OpenAI

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

# Synchronous batch
def process_batch_sync(prompts):
    results = []
    for prompt in prompts:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        results.append(response.choices[0].message.content)
    return results

# Process 100 prompts
prompts = [f"Summarize topic {i}" for i in range(100)]
results = process_batch_sync(prompts)
```

### Async Batch Processing (Faster)

```python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_single(prompt):
    response = await client.chat.completions.create(
        model="meta-llama/Llama-3.1-8B-Instruct",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch_async(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_process(prompt):
        async with semaphore:
            return await process_single(prompt)

    tasks = [limited_process(p) for p in prompts]
    return await asyncio.gather(*tasks)

# Process 1000 prompts with 10 concurrent requests
prompts = [f"Generate description for product {i}" for i in range(1000)]
results = asyncio.run(process_batch_async(prompts, max_concurrent=10))
```
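
`asyncio.gather` as called above raises on the first failed request, so one transient error can discard the rest of the batch. A minimal sketch of one way to tolerate such failures, assuming two hypothetical helpers (`with_retries` and `run_all`, neither part of the OpenAI client nor vLLM) that add exponential backoff and return failures as exception objects:

```python
import asyncio
import random

async def with_retries(make_call, max_attempts=4, base_delay=0.5):
    """Await make_call() up to max_attempts times with exponential backoff.

    make_call is a zero-argument function returning a fresh coroutine.
    Hypothetical helper, not a library API.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            # Delay doubles each attempt; jitter avoids synchronized retries
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))

async def run_all(calls, max_concurrent=10, max_attempts=4, base_delay=0.5):
    """Run calls with bounded concurrency; failed calls come back as exceptions."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(make_call):
        async with semaphore:
            return await with_retries(make_call, max_attempts, base_delay)

    return await asyncio.gather(
        *(limited(c) for c in calls), return_exceptions=True
    )
```

With the async example above, each entry of `calls` could be `lambda p=p: process_single(p)`; the returned list keeps the input order, and any entry that is an `Exception` instance marks a prompt that failed after all retries.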

### Batch with Progress Tracking

```python
import asyncio
from tqdm.asyncio import tqdm
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_with_progress(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def process_one(prompt, idx):
        async with semaphore:
            response = await client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompt}]
            )
            return idx, response.choices[0].message.content

    tasks = [process_one(p, i) for i, p in enumerate(prompts)]

    for coro in tqdm.as_completed(tasks, total=len(tasks)):
        idx, result = await coro
        results.append((idx, result))

    # Sort by original order
    results.sort(key=lambda x: x[0])
    return [r[1] for r in results]

# Run
prompts = ["..." for _ in range(500)]
results = asyncio.run(process_with_progress(prompts))
```

### Save Progress for Long Batches

```python
import json
from pathlib import Path
from openai import OpenAI

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

def process_batch_with_checkpoint(prompts, checkpoint_file="checkpoint.json"):
    # Load checkpoint
    checkpoint = Path(checkpoint_file)
    if checkpoint.exists():
        with open(checkpoint) as f:
            data = json.load(f)
            results = data['results']
            start_idx = data['last_completed'] + 1
        print(f"Resuming from index {start_idx}")
    else:
        results = [None] * len(prompts)
        start_idx = 0

    # Process remaining
    for i in range(start_idx, len(prompts)):
        try:
            response = client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompts[i]}]
            )
            results[i] = response.choices[0].message.content

            # Save checkpoint every 10 items
            if i % 10 == 0:
                with open(checkpoint_file, 'w') as f:
                    json.dump({'results': results, 'last_completed': i}, f)
                print(f"Checkpoint saved at {i}")

        except Exception as e:
            print(f"Error at {i}: {e}")
            # Save checkpoint on error
            with open(checkpoint_file, 'w') as f:
                json.dump({'results': results, 'last_completed': i - 1}, f)
            raise

    # Clean up checkpoint on completion
    if checkpoint.exists():
        checkpoint.unlink()

    return results
```
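
If the process is killed while `json.dump` is writing, the checkpoint file can be left truncated and the next run will fail to load it. A minimal sketch of an atomic save, assuming a hypothetical `save_checkpoint_atomic` helper: it writes to a temporary file in the same directory and then swaps it into place with `os.replace`.

```python
import json
import os
import tempfile

def save_checkpoint_atomic(path, results, last_completed):
    """Write the checkpoint to a temp file, then rename it over path."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"results": results, "last_completed": last_completed}, f)
            f.flush()
            os.fsync(f.fileno())  # Push the bytes to disk before the rename
        # The rename is atomic when source and target share a filesystem
        os.replace(tmp_path, path)
    except BaseException:
        # Keep any previous checkpoint intact and drop the partial file
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

A reader of the checkpoint then sees either the previous complete file or the new complete file, never a half-written one.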

***

## Image Generation Batch

### SD WebUI Batch

```python
import requests
import base64
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

SD_API = "http://server:7860"

def generate_image(prompt, output_path):
    response = requests.post(f'{SD_API}/sdapi/v1/txt2img', json={
        'prompt': prompt,
        'negative_prompt': 'blurry, low quality',
        'steps': 20,
        'width': 512,
        'height': 512
    }, timeout=300)
    response.raise_for_status()  # Stop on HTTP errors instead of decoding an error body

    image_data = base64.b64decode(response.json()['images'][0])

    with open(output_path, 'wb') as f:
        f.write(image_data)

    return output_path

def batch_generate(prompts, output_dir, max_workers=4):
    Path(output_dir).mkdir(exist_ok=True)

    tasks = [
        (prompt, f"{output_dir}/image_{i:04d}.png")
        for i, prompt in enumerate(prompts)
    ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(tqdm(
            executor.map(lambda x: generate_image(*x), tasks),
            total=len(tasks)
        ))

    return results

# Generate 100 images
prompts = [f"A beautiful landscape, style {i}" for i in range(100)]
batch_generate(prompts, "./outputs", max_workers=4)
```

### ComfyUI Batch with Queue

```python
import json
import urllib.request
import time
from pathlib import Path

SERVER = "server:8188"

def queue_prompt(workflow):
    data = json.dumps({"prompt": workflow}).encode('utf-8')
    req = urllib.request.Request(f"http://{SERVER}/prompt", data=data)
    return json.loads(urllib.request.urlopen(req).read())

def get_history(prompt_id):
    with urllib.request.urlopen(f"http://{SERVER}/history/{prompt_id}") as response:
        return json.loads(response.read())

def batch_generate_comfyui(prompts, base_workflow_path, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    # Load base workflow
    with open(base_workflow_path) as f:
        base_workflow = json.load(f)

    prompt_ids = []

    # Queue all prompts
    for i, prompt in enumerate(prompts):
        # Deep copy: dict.copy() is shallow, so edits would leak into base_workflow
        workflow = json.loads(json.dumps(base_workflow))
        # Modify prompt node (adjust node ID as needed)
        workflow["6"]["inputs"]["text"] = prompt
        # Set output filename
        workflow["9"]["inputs"]["filename_prefix"] = f"batch_{i:04d}"

        result = queue_prompt(workflow)
        prompt_ids.append(result['prompt_id'])
        print(f"Queued {i+1}/{len(prompts)}")

    # Wait for completion
    print("Waiting for generation...")
    completed = set()
    while len(completed) < len(prompt_ids):
        for pid in prompt_ids:
            if pid not in completed:
                history = get_history(pid)
                if pid in history:
                    completed.add(pid)
                    print(f"Completed {len(completed)}/{len(prompt_ids)}")
        time.sleep(1)

    print("All done!")
```

### FLUX Batch Processing

```python
import torch
from diffusers import FluxPipeline
from pathlib import Path
from tqdm import tqdm

# Load model once
pipe = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-schnell",
    torch_dtype=torch.bfloat16
)
pipe.to("cuda")

def batch_generate_flux(prompts, output_dir, batch_size=4):
    Path(output_dir).mkdir(exist_ok=True)

    for i in tqdm(range(0, len(prompts), batch_size)):
        batch_prompts = prompts[i:i + batch_size]

        # Generate batch
        images = pipe(
            batch_prompts,
            height=1024,
            width=1024,
            num_inference_steps=4,
            guidance_scale=0.0
        ).images

        # Save
        for j, img in enumerate(images):
            img.save(f"{output_dir}/image_{i+j:04d}.png")

# Generate 100 images in batches of 4
prompts = [f"A {animal} in a forest" for animal in (["cat", "dog", "fox"] * 34)[:100]]
batch_generate_flux(prompts, "./flux_outputs", batch_size=4)
```

***

## Audio Batch Processing

### Whisper Batch Transcription

```python
import whisper
from pathlib import Path
from tqdm import tqdm
import json

model = whisper.load_model("large-v3")

def batch_transcribe(audio_files, output_dir):
    Path(output_dir).mkdir(exist_ok=True)
    results = {}

    for audio_path in tqdm(audio_files):
        try:
            result = model.transcribe(str(audio_path))

            results[audio_path.name] = {
                'text': result['text'],
                'language': result['language'],
                'segments': result['segments']
            }

            # Save individual transcript
            output_file = Path(output_dir) / f"{audio_path.stem}.json"
            with open(output_file, 'w') as f:
                json.dump(results[audio_path.name], f, indent=2)

        except Exception as e:
            print(f"Error processing {audio_path}: {e}")
            results[audio_path.name] = {'error': str(e)}

    # Save combined results
    with open(f"{output_dir}/all_transcripts.json", 'w') as f:
        json.dump(results, f, indent=2)

    return results

# Transcribe all audio files in directory
audio_files = list(Path("./audio").glob("*.mp3"))
results = batch_transcribe(audio_files, "./transcripts")
```

### Parallel Whisper (Multiple GPUs)

```python
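import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

import whisper

def transcribe_shard(args):
    """Load the model once on one GPU, then transcribe that GPU's share of files."""
    audio_paths, gpu_id = args
    model = whisper.load_model("large-v3", device=f"cuda:{gpu_id}")
    return [(path, model.transcribe(str(path))['text']) for path in audio_paths]

def parallel_transcribe(audio_files, num_gpus=2):
    # Round-robin split: GPU k gets files k, k + num_gpus, k + 2 * num_gpus, ...
    shards = [(audio_files[k::num_gpus], k) for k in range(num_gpus)]

    # "spawn" gives each worker a clean CUDA state; call this function
    # from under `if __name__ == "__main__":` so workers can re-import the module
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=num_gpus, mp_context=ctx) as executor:
        shard_results = list(executor.map(transcribe_shard, shards))

    return {path: text for shard in shard_results for path, text in shard}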
```

***

## Video Batch Processing

### Batch Video Generation (SVD)

```python
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
from pathlib import Path
from tqdm import tqdm
import torch

pipe = StableVideoDiffusionPipeline.from_pretrained(
    "stabilityai/stable-video-diffusion-img2vid-xt",
    torch_dtype=torch.float16,
    variant="fp16"
)
pipe.to("cuda")

def batch_generate_videos(image_paths, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    for img_path in tqdm(image_paths):
        try:
            image = load_image(str(img_path))
            image = image.resize((1024, 576))

            frames = pipe(
                image,
                num_frames=25,
                decode_chunk_size=8
            ).frames[0]

            output_path = Path(output_dir) / f"{img_path.stem}.mp4"
            export_to_video(frames, str(output_path), fps=7)

        except Exception as e:
            print(f"Error with {img_path}: {e}")

# Process all images
images = list(Path("./input_images").glob("*.png"))
batch_generate_videos(images, "./output_videos")
```

***

## Data Pipeline Patterns

### Producer-Consumer Pattern

```python
import asyncio
from asyncio import Queue

async def process_item(item):
    """Placeholder: replace with your real async work (API call, inference, ...)"""
    await asyncio.sleep(0)
    return item

async def producer(queue, items, num_workers):
    """Add items to queue, then one stop signal per worker"""
    for item in items:
        await queue.put(item)
    # Signal completion
    for _ in range(num_workers):
        await queue.put(None)

async def consumer(queue, results, worker_id):
    """Process items from queue"""
    while True:
        item = await queue.get()
        if item is None:
            break

        try:
            result = await process_item(item)
            results.append(result)
        except Exception as e:
            print(f"Worker {worker_id} error: {e}")

        queue.task_done()

async def run_pipeline(items, num_workers=5):
    queue = Queue(maxsize=100)
    results = []

    # Start workers
    workers = [
        asyncio.create_task(consumer(queue, results, i))
        for i in range(num_workers)
    ]

    # Start producer (sends exactly num_workers stop signals)
    await producer(queue, items, num_workers)

    # Wait for completion
    await asyncio.gather(*workers)

    # Results are in completion order, not input order
    return results

items = list(range(1000))
results = asyncio.run(run_pipeline(items, num_workers=5))
```

### Map-Reduce Pattern

```python
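from concurrent.futures import ProcessPoolExecutor
from functools import reduce

def map_function(item):
    """Process single item"""
    # Placeholder logic: replace with your own processing
    return item * item

def reduce_function(results):
    """Combine results"""
    # Placeholder logic: replace with your own aggregation
    return reduce(lambda acc, value: acc + value, results, 0)

def map_reduce(items, num_workers=4):
    # Map phase
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(map_function, items))

    # Reduce phase
    result = reduce_function(mapped)

    return result

if __name__ == "__main__":  # Guard needed where worker processes are spawned
    print(map_reduce(range(10)))  # 285 with the placeholder logic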
```

***

## Optimization Tips

### 1. Right-Size Concurrency

```python
# LLM: start around 10 and raise it while throughput keeps improving;
# the server-side ceiling is vLLM's --max-num-seqs setting
max_concurrent = 10

# Image generation: 1-4 depending on VRAM
max_concurrent = 2  # SD WebUI
max_concurrent = 4  # FLUX on RTX 4090

# Transcription: 1 per GPU
max_concurrent = num_gpus
```

### 2. Batch Size Tuning

```python
# Too small: Underutilizes GPU
# Too large: OOM errors

# Image generation batch sizes:
# RTX 3060: batch_size = 1
# RTX 3090: batch_size = 2-4
# RTX 4090: batch_size = 4-8
# A100: batch_size = 8-16
```
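
Because the largest safe batch size depends on the model, resolution, and card, one option is to probe for it at runtime instead of hard-coding it. The sketch below assumes a hypothetical `run_with_backoff` helper that halves the batch size whenever the supplied function runs out of memory. With PyTorch you would pass `torch.cuda.OutOfMemoryError` as `oom_error`; the default `MemoryError` only keeps the example dependency-free.

```python
def run_with_backoff(items, run_batch, batch_size, oom_error=MemoryError):
    """Process items in batches, halving batch_size on each out-of-memory error.

    run_batch takes a list of items and returns a list of outputs.
    Returns (outputs, final_batch_size).
    """
    outputs = []
    i = 0
    while i < len(items):
        batch = items[i:i + batch_size]
        try:
            outputs.extend(run_batch(batch))
            i += len(batch)  # Advance only after the batch succeeded
        except oom_error:
            if batch_size == 1:
                raise  # Even a single item does not fit
            batch_size = max(1, batch_size // 2)
            # With PyTorch, call torch.cuda.empty_cache() here before retrying
    return outputs, batch_size
```

The returned batch size can be logged and reused as the starting value for the next run on the same GPU.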

### 3. Memory Management

```python
import gc
import torch

def clear_memory():
    gc.collect()
    torch.cuda.empty_cache()

# Call between large batches
for batch in batches:
    process_batch(batch)
    clear_memory()
```

### 4. Save Intermediate Results

```python
# Always checkpoint long-running jobs
CHECKPOINT_INTERVAL = 100

for i, item in enumerate(items):
    results.append(process(item))

    if i % CHECKPOINT_INTERVAL == 0:
        save_checkpoint(results, i)
```
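
Rewriting the whole results list at every checkpoint gets slower as the job grows. An alternative sketch, assuming a hypothetical JSON Lines log with one `{"id": ..., "result": ...}` record per item: each result is appended as soon as it is ready, and a restarted job skips every ID already on disk. The function names and record layout are illustrative.

```python
import json
from pathlib import Path

def load_done(log_path):
    """Return {id: result} for every complete record already in the log."""
    done = {}
    path = Path(log_path)
    if path.exists():
        with path.open() as f:
            for line in f:
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue  # Skip blank or truncated lines
                done[record["id"]] = record["result"]
    return done

def run_resumable(items, process, log_path):
    """Process items (a dict of id -> input), appending each result to the log."""
    done = load_done(log_path)
    with open(log_path, "a") as f:
        for item_id, item in items.items():
            if item_id in done:
                continue  # Finished in an earlier run
            result = process(item)
            f.write(json.dumps({"id": item_id, "result": result}) + "\n")
            f.flush()  # Hand the record to the OS right away
            done[item_id] = result
    return done
```

Calling `run_resumable` again with the same log path after an interruption reprocesses only the items that have no record yet.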

***

## Cost Optimization

### Estimate Before Running

```python
def estimate_cost(num_items, time_per_item_sec, hourly_rate):
    total_hours = (num_items * time_per_item_sec) / 3600
    total_cost = total_hours * hourly_rate
    return total_hours, total_cost

# Example: 10,000 images at 3 sec each on RTX 4090
hours, cost = estimate_cost(10000, 3, 0.10)
print(f"Estimated: {hours:.1f} hours, ${cost:.2f}")
# Output: Estimated: 8.3 hours, $0.83
```

### Use Spot Instances

* 30-50% cheaper
* Good for batch jobs that can tolerate interruption
* Save checkpoints frequently
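
To see what that discount range means for a concrete job, the sketch below applies it to the 10,000-image estimate from the previous section. The $0.10/hour on-demand rate is the illustrative figure from that example, not a quoted price, and `spot_cost_range` is a hypothetical helper.

```python
def spot_cost_range(on_demand_cost, min_discount=0.30, max_discount=0.50):
    """Return (lowest, highest) spot cost for a given on-demand cost."""
    lowest = on_demand_cost * (1 - max_discount)
    highest = on_demand_cost * (1 - min_discount)
    return lowest, highest

on_demand = (10000 * 3 / 3600) * 0.10  # 8.3 hours at the example rate
low, high = spot_cost_range(on_demand)
print(f"On-demand: ${on_demand:.2f}, spot: ${low:.2f}-${high:.2f}")
# Output: On-demand: $0.83, spot: $0.42-$0.58
```

The estimate ignores time lost to interruptions, so work redone since the last checkpoint adds to the spot figure.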

### Off-Peak Processing

* Queue jobs during low-demand hours
* Often better GPU availability
* Potentially lower spot prices

***

## Next Steps

* [API Integration](/guides/advanced/api-integration.md) - Build your APIs
* [Multi-GPU Setup](/guides/advanced/multi-gpu-setup.md) - Scale up
* [Cost Calculator](/guides/getting-started/cost-calculator.md) - Estimate costs


