# Batch Processing

Process large workloads efficiently on CLORE.AI GPUs.

{% hint style="success" %}
Find the right GPU at [CLORE.AI Marketplace](https://clore.ai/marketplace).
{% endhint %}

## Using clore-ai SDK for Batch Infrastructure (Recommended)

The official SDK makes batch GPU provisioning simple with async support:

```python
import asyncio
from clore_ai import AsyncCloreAI

async def batch_deploy(server_ids):
    """Deploy on multiple servers concurrently."""
    async with AsyncCloreAI() as client:
        # Build one create_order coroutine per target server
        pending = []
        for sid in server_ids:
            pending.append(
                client.create_order(
                    server_id=sid,
                    image="cloreai/ubuntu22.04-cuda12",
                    type="on-demand",
                    currency="bitcoin",
                    ssh_password="BatchPass123",
                    ports={"22": "tcp"}
                )
            )
        # return_exceptions=True: one failed server must not cancel the rest
        outcomes = await asyncio.gather(*pending, return_exceptions=True)
        for sid, outcome in zip(server_ids, outcomes):
            if isinstance(outcome, Exception):
                print(f"❌ Server {sid}: {outcome}")
            else:
                print(f"✅ Server {sid}: Order {outcome.id}")
        return outcomes

# Deploy on 5 servers at once
asyncio.run(batch_deploy([142, 305, 891, 450, 612]))
```

→ See [Python SDK Guide](https://docs.clore.ai/guides/advanced/python-sdk) and [CLI Automation](https://docs.clore.ai/guides/advanced/cli-automation) for more.

***

## When to Use Batch Processing

* Processing hundreds/thousands of items
* Converting large datasets
* Generating many images/videos
* Bulk transcription
* Training data preparation

***

## LLM Batch Processing

### vLLM Batch API

vLLM handles batching automatically with continuous batching:

```python
from openai import OpenAI

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

# Synchronous batch
def process_batch_sync(prompts):
    """Send each prompt sequentially and return replies in input order.

    Simple but slow — one in-flight request at a time. Use the async
    variant below when throughput matters.

    Args:
        prompts: List of prompt strings.

    Returns:
        List of response texts, one per prompt, same order.
    """
    results = []
    for prompt in prompts:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        results.append(response.choices[0].message.content)
    return results

# Process 100 prompts
prompts = [f"Summarize topic {i}" for i in range(100)]
results = process_batch_sync(prompts)
```

### Async Batch Processing (Faster)

```python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_single(prompt):
    """Send one chat-completion request and return the reply text."""
    completion = await client.chat.completions.create(
        model="meta-llama/Llama-3.1-8B-Instruct",
        messages=[{"role": "user", "content": prompt}]
    )
    return completion.choices[0].message.content

async def process_batch_async(prompts, max_concurrent=10):
    """Run all prompts concurrently, capped at max_concurrent in flight."""
    gate = asyncio.Semaphore(max_concurrent)

    async def guarded(prompt):
        # Semaphore limits how many requests hit the server at once
        async with gate:
            return await process_single(prompt)

    return await asyncio.gather(*(guarded(p) for p in prompts))

# Process 1000 prompts with 10 concurrent requests
prompts = [f"Generate description for product {i}" for i in range(1000)]
results = asyncio.run(process_batch_async(prompts, max_concurrent=10))
```

### Batch with Progress Tracking

```python
import asyncio
from tqdm.asyncio import tqdm
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_with_progress(prompts, max_concurrent=10):
    """Process prompts concurrently while showing a live progress bar.

    Completions are collected as they finish, then re-sorted so the
    returned list lines up with the input order.
    """
    gate = asyncio.Semaphore(max_concurrent)
    finished = []

    async def worker(prompt, position):
        async with gate:
            reply = await client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompt}]
            )
            return position, reply.choices[0].message.content

    jobs = [worker(prompt, position) for position, prompt in enumerate(prompts)]

    # as_completed yields in completion order and drives the progress bar
    for done in tqdm.as_completed(jobs, total=len(jobs)):
        finished.append(await done)

    # Restore the original prompt order before stripping the indices
    finished.sort(key=lambda pair: pair[0])
    return [text for _, text in finished]

# Run
prompts = ["..." for _ in range(500)]
results = asyncio.run(process_with_progress(prompts))
```

### Save Progress for Long Batches

```python
import json
from pathlib import Path

def process_batch_with_checkpoint(prompts, checkpoint_file="checkpoint.json",
                                  checkpoint_interval=10):
    """Process prompts one at a time, checkpointing progress to disk.

    If the checkpoint file exists, processing resumes after the last
    completed index instead of starting over. On success the checkpoint
    file is removed; on error it is written before re-raising so the
    next run can resume.

    Args:
        prompts: List of prompt strings to process.
        checkpoint_file: Path of the JSON checkpoint file.
        checkpoint_interval: Save a checkpoint every N completed items
            (default 10, matching the original behavior).

    Returns:
        List of response texts aligned with `prompts`.
    """
    # NOTE(review): assumes `client` is the OpenAI-compatible client
    # created in the earlier snippets — it must be in scope here.
    # Load checkpoint
    checkpoint = Path(checkpoint_file)
    if checkpoint.exists():
        with open(checkpoint) as f:
            data = json.load(f)
            results = data['results']
            start_idx = data['last_completed'] + 1
        print(f"Resuming from index {start_idx}")
    else:
        results = [None] * len(prompts)
        start_idx = 0

    # Process remaining
    for i in range(start_idx, len(prompts)):
        try:
            response = client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompts[i]}]
            )
            results[i] = response.choices[0].message.content

            # Periodic checkpoint: at most `checkpoint_interval` items
            # are redone after a crash
            if i % checkpoint_interval == 0:
                with open(checkpoint_file, 'w') as f:
                    json.dump({'results': results, 'last_completed': i}, f)
                print(f"Checkpoint saved at {i}")

        except Exception as e:
            print(f"Error at {i}: {e}")
            # Persist everything completed so far, then surface the error
            with open(checkpoint_file, 'w') as f:
                json.dump({'results': results, 'last_completed': i - 1}, f)
            raise

    # Clean up checkpoint on completion
    if checkpoint.exists():
        checkpoint.unlink()

    return results
```

***

## Image Generation Batch

### SD WebUI Batch

```python
import requests
import base64
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

SD_API = "http://server:7860"

def generate_image(prompt, output_path):
    """Render one txt2img result via the SD WebUI API and save it as PNG.

    Args:
        prompt: Positive prompt text.
        output_path: Destination file path for the decoded image.

    Returns:
        output_path, for convenience when mapping over many prompts.

    Raises:
        requests.HTTPError: if the API returns an error status (instead
        of a confusing KeyError when parsing the error body).
    """
    response = requests.post(f'{SD_API}/sdapi/v1/txt2img', json={
        'prompt': prompt,
        'negative_prompt': 'blurry, low quality',
        'steps': 20,
        'width': 512,
        'height': 512
    })
    response.raise_for_status()  # fail loudly on API errors

    # API returns the image as a base64 string in the `images` list
    image_data = base64.b64decode(response.json()['images'][0])

    with open(output_path, 'wb') as f:
        f.write(image_data)

    return output_path

def batch_generate(prompts, output_dir, max_workers=4):
    """Generate one image per prompt using a small thread pool."""
    Path(output_dir).mkdir(exist_ok=True)

    # Pair each prompt with a zero-padded, index-stable output path
    jobs = []
    for index, prompt in enumerate(prompts):
        jobs.append((prompt, f"{output_dir}/image_{index:04d}.png"))

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # tqdm wraps the lazy map so the bar advances as images finish
        mapped = pool.map(lambda job: generate_image(*job), jobs)
        results = list(tqdm(mapped, total=len(jobs)))

    return results

# Generate 100 images
prompts = [f"A beautiful landscape, style {i}" for i in range(100)]
batch_generate(prompts, "./outputs", max_workers=4)
```

### ComfyUI Batch with Queue

```python
import json
import urllib.request
import time
from pathlib import Path

SERVER = "server:8188"

def queue_prompt(workflow):
    """Submit a workflow to the ComfyUI queue and return the parsed reply."""
    payload = json.dumps({"prompt": workflow}).encode('utf-8')
    request = urllib.request.Request(f"http://{SERVER}/prompt", data=payload)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())

def get_history(prompt_id):
    """Fetch the ComfyUI history entry for a queued prompt id."""
    url = f"http://{SERVER}/history/{prompt_id}"
    with urllib.request.urlopen(url) as response:
        body = response.read()
    return json.loads(body)

def batch_generate_comfyui(prompts, base_workflow_path, output_dir):
    """Queue one ComfyUI job per prompt, then poll until all complete.

    Each job gets a *deep* copy of the base workflow: dict.copy() is
    shallow, so mutating nested node dicts would leak edits between
    iterations and corrupt base_workflow itself.

    Args:
        prompts: Positive prompt strings, one job each.
        base_workflow_path: Path to a workflow JSON exported from ComfyUI.
        output_dir: Directory created (if needed) for outputs.
    """
    Path(output_dir).mkdir(exist_ok=True)

    # Load base workflow
    with open(base_workflow_path) as f:
        base_workflow = json.load(f)

    prompt_ids = []

    # Queue all prompts
    for i, prompt in enumerate(prompts):
        # json round-trip = deep copy without importing `copy`
        workflow = json.loads(json.dumps(base_workflow))
        # Modify prompt node (adjust node ID as needed)
        workflow["6"]["inputs"]["text"] = prompt
        # Set output filename
        workflow["9"]["inputs"]["filename_prefix"] = f"batch_{i:04d}"

        result = queue_prompt(workflow)
        prompt_ids.append(result['prompt_id'])
        print(f"Queued {i+1}/{len(prompts)}")

    # Wait for completion: a prompt id appears in /history once finished
    print("Waiting for generation...")
    completed = set()
    while len(completed) < len(prompt_ids):
        for pid in prompt_ids:
            if pid not in completed:
                history = get_history(pid)
                if pid in history:
                    completed.add(pid)
                    print(f"Completed {len(completed)}/{len(prompt_ids)}")
        time.sleep(1)

    print("All done!")
```

### FLUX Batch Processing

```python
import torch
from diffusers import FluxPipeline
from pathlib import Path
from tqdm import tqdm

# Load model once, outside the batch loop
pipe = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-schnell",
    torch_dtype=torch.bfloat16
)
pipe.to("cuda")

def batch_generate_flux(prompts, output_dir, batch_size=4):
    """Generate an image per prompt, batch_size prompts per forward pass.

    FLUX.1-schnell runs with 4 steps and no CFG (guidance_scale=0.0),
    so batching several prompts per call keeps the GPU busy.

    Args:
        prompts: List of prompt strings.
        output_dir: Directory (created if needed) for PNG outputs.
        batch_size: Prompts per pipeline call; tune to available VRAM.
    """
    Path(output_dir).mkdir(exist_ok=True)

    for i in tqdm(range(0, len(prompts), batch_size)):
        batch_prompts = prompts[i:i + batch_size]

        # One pipeline call yields len(batch_prompts) images
        images = pipe(
            batch_prompts,
            height=1024,
            width=1024,
            num_inference_steps=4,
            guidance_scale=0.0
        ).images

        # Save with globally sequential, zero-padded filenames
        for j, img in enumerate(images):
            img.save(f"{output_dir}/image_{i+j:04d}.png")

# Generate 102 images (3 animals x 34) in batches of 4
prompts = [f"A {animal} in a forest" for animal in ["cat", "dog", "fox"] * 34]
batch_generate_flux(prompts, "./flux_outputs", batch_size=4)
```

***

## Audio Batch Processing

### Whisper Batch Transcription

```python
import whisper
from pathlib import Path
from tqdm import tqdm
import json

model = whisper.load_model("large-v3")

def batch_transcribe(audio_files, output_dir):
    """Transcribe each file, writing one JSON per file plus a combined file.

    A failure on one file is recorded under an 'error' key and does not
    abort the rest of the batch.
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(exist_ok=True)
    results = {}

    for audio_path in tqdm(audio_files):
        try:
            transcript = model.transcribe(str(audio_path))
            entry = {
                'text': transcript['text'],
                'language': transcript['language'],
                'segments': transcript['segments']
            }
            results[audio_path.name] = entry

            # Per-file transcript alongside the combined output
            with open(out_dir / f"{audio_path.stem}.json", 'w') as f:
                json.dump(entry, f, indent=2)

        except Exception as e:
            # Keep going; record the failure in the combined results
            print(f"Error processing {audio_path}: {e}")
            results[audio_path.name] = {'error': str(e)}

    # Save combined results
    with open(f"{output_dir}/all_transcripts.json", 'w') as f:
        json.dump(results, f, indent=2)

    return results

# Transcribe all audio files in directory
audio_files = list(Path("./audio").glob("*.mp3"))
results = batch_transcribe(audio_files, "./transcripts")
```

### Parallel Whisper (Multiple GPUs)

```python
import whisper
from concurrent.futures import ProcessPoolExecutor
import torch

# One loaded model per GPU, cached per worker process, so the multi-GB
# checkpoint is not reloaded for every single audio file.
_models = {}

def transcribe_on_gpu(args):
    """Worker entry point: transcribe one file on its assigned GPU.

    Runs inside a ProcessPoolExecutor worker. The module-level cache is
    per-process, so each worker loads the model at most once per GPU id
    instead of once per file (the original reloaded it every call).
    """
    audio_path, gpu_id = args
    model = _models.get(gpu_id)
    if model is None:
        torch.cuda.set_device(gpu_id)
        model = whisper.load_model("large-v3", device=f"cuda:{gpu_id}")
        _models[gpu_id] = model
    result = model.transcribe(audio_path)
    return audio_path, result['text']

def parallel_transcribe(audio_files, num_gpus=2):
    """Fan transcription out across num_gpus GPUs (one process per GPU)."""
    # Round-robin assignment: file i goes to GPU i % num_gpus
    tasks = [(f, i % num_gpus) for i, f in enumerate(audio_files)]

    with ProcessPoolExecutor(max_workers=num_gpus) as executor:
        results = list(executor.map(transcribe_on_gpu, tasks))

    return dict(results)
```

***

## Video Batch Processing

### Batch Video Generation (SVD)

```python
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
from pathlib import Path
from tqdm import tqdm
import torch

pipe = StableVideoDiffusionPipeline.from_pretrained(
    "stabilityai/stable-video-diffusion-img2vid-xt",
    torch_dtype=torch.float16,
    variant="fp16"
)
pipe.to("cuda")

def batch_generate_videos(image_paths, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    for img_path in tqdm(image_paths):
        try:
            image = load_image(str(img_path))
            image = image.resize((1024, 576))

            frames = pipe(
                image,
                num_frames=25,
                decode_chunk_size=8
            ).frames[0]

            output_path = Path(output_dir) / f"{img_path.stem}.mp4"
            export_to_video(frames, str(output_path), fps=7)

        except Exception as e:
            print(f"Error with {img_path}: {e}")

# Process all images
images = list(Path("./input_images").glob("*.png"))
batch_generate_videos(images, "./output_videos")
```

***

## Data Pipeline Patterns

### Producer-Consumer Pattern

```python
import asyncio
from asyncio import Queue

NUM_WORKERS = 5

async def producer(queue, items, num_workers=NUM_WORKERS):
    """Feed every item into the queue, then one None sentinel per worker.

    The sentinel count MUST equal the number of consumers; otherwise
    some workers never receive a shutdown signal and gather() hangs
    (the original hard-coded the global NUM_WORKERS here, which broke
    run_pipeline for any num_workers != 5).
    """
    for item in items:
        await queue.put(item)
    # Signal completion: one sentinel per consumer
    for _ in range(num_workers):
        await queue.put(None)

async def consumer(queue, results, worker_id):
    """Drain the queue until a None sentinel arrives, collecting results."""
    while True:
        item = await queue.get()
        if item is None:
            break

        try:
            result = await process_item(item)
            results.append(result)
        except Exception as e:
            # Log and keep consuming; one bad item must not kill the worker
            print(f"Worker {worker_id} error: {e}")

        queue.task_done()

async def run_pipeline(items, num_workers=5):
    """Run the producer/consumer pipeline and return collected results."""
    queue = Queue(maxsize=100)  # bounded: producer back-pressures at 100
    results = []

    # Start workers
    workers = [
        asyncio.create_task(consumer(queue, results, i))
        for i in range(num_workers)
    ]

    # Produce all items, with a sentinel count matching this run's workers
    await producer(queue, items, num_workers)

    # Wait for every worker to drain its sentinel and exit
    await asyncio.gather(*workers)

    return results

items = list(range(1000))
results = asyncio.run(run_pipeline(items))
```

### Map-Reduce Pattern

```python
from concurrent.futures import ProcessPoolExecutor
from functools import reduce

def map_function(item):
    """Process a single item (replace with your own logic)."""
    # Your processing logic
    return process(item)

def reduce_function(results):
    """Fold the mapped results into one value (replace as needed)."""
    return combine(results)

def map_reduce(items, num_workers=4):
    """Classic two-phase map-reduce over a process pool."""
    # Map phase: fan items out across worker processes
    with ProcessPoolExecutor(max_workers=num_workers) as pool:
        mapped_results = list(pool.map(map_function, items))

    # Reduce phase: combine everything in the parent process
    return reduce_function(mapped_results)
```

***

## Optimization Tips

### 1. Right-Size Concurrency

```python
# LLM: Match to vLLM's max batch size
max_concurrent = 10  # vLLM default

# Image generation: 1-4 depending on VRAM
max_concurrent = 2  # SD WebUI
max_concurrent = 4  # FLUX on RTX 4090

# Transcription: 1 per GPU
max_concurrent = num_gpus
```

### 2. Batch Size Tuning

```python
# Too small: Underutilizes GPU
# Too large: OOM errors

# Image generation batch sizes:
# RTX 3060: batch_size = 1
# RTX 3090: batch_size = 2-4
# RTX 4090: batch_size = 4-8
# A100: batch_size = 8-16
```

### 3. Memory Management

```python
import gc
import torch

def clear_memory():
    """Run Python GC, then release cached GPU memory back to the driver.

    gc.collect() runs first so objects that just became unreachable are
    freed before empty_cache() returns their blocks to the driver.
    """
    gc.collect()
    torch.cuda.empty_cache()

# Call between large batches
for batch in batches:
    process_batch(batch)
    clear_memory()
```

### 4. Save Intermediate Results

```python
# Always checkpoint long-running jobs
CHECKPOINT_INTERVAL = 100

for i, item in enumerate(items):
    results.append(process(item))

    if i % CHECKPOINT_INTERVAL == 0:
        save_checkpoint(results, i)
```

***

## Cost Optimization

### Estimate Before Running

```python
def estimate_cost(num_items, time_per_item_sec, hourly_rate):
    """Return (hours, dollars) estimate for a batch job before launching it."""
    total_seconds = num_items * time_per_item_sec
    hours_needed = total_seconds / 3600
    return hours_needed, hours_needed * hourly_rate

# Example: 10,000 images at 3 sec each on RTX 4090
hours, cost = estimate_cost(10000, 3, 0.10)
print(f"Estimated: {hours:.1f} hours, ${cost:.2f}")
# Output: Estimated: 8.3 hours, $0.83
```

### Use Spot Instances

* 30-50% cheaper
* Good for batch jobs (interruptible)
* Save checkpoints frequently

### Off-Peak Processing

* Queue jobs during low-demand hours
* Often better GPU availability
* Potentially lower spot prices

***

## Next Steps

* [API Integration](https://docs.clore.ai/guides/advanced/api-integration) - Build your APIs
* [Multi-GPU Setup](https://docs.clore.ai/guides/advanced/multi-gpu-setup) - Scale up
* [Cost Calculator](https://docs.clore.ai/guides/getting-started/cost-calculator) - Estimate costs
