# Batch processing

Process large workloads efficiently on CLORE.AI GPUs.

{% hint style="success" %}
Find the right GPU in the [CLORE.AI Marketplace](https://clore.ai/marketplace).
{% endhint %}

## Using the clore-ai SDK for batch infrastructure (recommended)

The official SDK simplifies batch GPU provisioning with async support:

```python
import asyncio
from clore_ai import AsyncCloreAI

async def batch_deploy(server_ids):
    """Deploy to multiple servers concurrently."""
    async with AsyncCloreAI() as client:
        tasks = [
            client.create_order(
                server_id=sid,
                image="cloreai/ubuntu22.04-cuda12",
                type="on-demand",
                currency="bitcoin",
                ssh_password="BatchPass123",
                ports={"22": "tcp"}
            )
            for sid in server_ids
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for sid, result in zip(server_ids, results):
            if isinstance(result, Exception):
                print(f"❌ Server {sid}: {result}")
            else:
                print(f"✅ Server {sid}: Order {result.id}")
        return results

# Deploy to 5 servers at once
asyncio.run(batch_deploy([142, 305, 891, 450, 612]))
```

→ See the [Python SDK Guide](/guides/guides_v2-es/avanzado/python-sdk.md) and [CLI Automation](/guides/guides_v2-es/avanzado/cli-automation.md) for more information.

***

## When to use batch processing

* Processing hundreds or thousands of items
* Converting large datasets
* Generating many images or videos
* Bulk transcription
* Preparing training data

***

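## Batch processing for LLMs

### vLLM batch API

vLLM applies continuous batching on the server, so requests that arrive concurrently are batched automatically. The synchronous loop below sends one request at a time, which is the simplest approach but leaves most of that batching unused: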

```python
from openai import OpenAI

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

# Synchronous batch: one request at a time
def process_batch_sync(prompts):
    results = []
    for prompt in prompts:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        results.append(response.choices[0].message.content)
    return results

# Process 100 prompts
prompts = [f"Summarize topic {i}" for i in range(100)]
results = process_batch_sync(prompts)
```

### Async batch processing (faster)

```python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_single(prompt):
    response = await client.chat.completions.create(
        model="meta-llama/Llama-3.1-8B-Instruct",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch_async(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_process(prompt):
        async with semaphore:
            return await process_single(prompt)

    tasks = [limited_process(p) for p in prompts]
    return await asyncio.gather(*tasks)

# Process 1000 prompts with 10 concurrent requests
prompts = [f"Generate a description for product {i}" for i in range(1000)]
results = asyncio.run(process_batch_async(prompts, max_concurrent=10))
```
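With a plain `asyncio.gather`, one failed request raises and the results of the rest of the batch are lost. The following is a minimal sketch of one way to soften that, assuming transient errors are worth retrying. `with_retries`, `max_attempts`, and `base_delay` are hypothetical names for illustration, not part of the OpenAI client or vLLM:

```python
import asyncio
import random

async def with_retries(make_call, max_attempts=3, base_delay=0.5):
    """Await make_call(), retrying with exponential backoff plus jitter.

    make_call is a zero-argument callable that returns a fresh coroutine,
    because a coroutine object can only be awaited once.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the error
            # Delay doubles each attempt, with up to 10% random jitter
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay * (1 + 0.1 * random.random()))
```

Inside `limited_process`, the call would become `await with_retries(lambda: process_single(prompt))`. Passing `return_exceptions=True` to `asyncio.gather` additionally keeps the successful results when an item still fails after all attempts.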

### Batch with progress tracking

```python
import asyncio
from tqdm.asyncio import tqdm
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_with_progress(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def process_one(prompt, idx):
        async with semaphore:
            response = await client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompt}]
            )
            return idx, response.choices[0].message.content

    tasks = [process_one(p, i) for i, p in enumerate(prompts)]

    for coro in tqdm.as_completed(tasks, total=len(tasks)):
        idx, result = await coro
        results.append((idx, result))

    # Restore the original order
    results.sort(key=lambda x: x[0])
    return [r[1] for r in results]

# Run
prompts = ["..." for _ in range(500)]
results = asyncio.run(process_with_progress(prompts))
```

### Saving progress for long batches

```python
import json
from pathlib import Path

from openai import OpenAI

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

def save_checkpoint(checkpoint_file, results, last_completed):
    with open(checkpoint_file, 'w') as f:
        json.dump({'results': results, 'last_completed': last_completed}, f)

def process_batch_with_checkpoint(prompts, checkpoint_file="checkpoint.json"):
    # Load the checkpoint if one exists
    checkpoint = Path(checkpoint_file)
    if checkpoint.exists():
        with open(checkpoint) as f:
            data = json.load(f)
        results = data['results']
        start_idx = data['last_completed'] + 1
        print(f"Resuming from index {start_idx}")
    else:
        results = [None] * len(prompts)
        start_idx = 0

    # Process the remaining prompts
    for i in range(start_idx, len(prompts)):
        try:
            response = client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompts[i]}]
            )
            results[i] = response.choices[0].message.content

            # Save a checkpoint after every 10 completed items
            if (i + 1) % 10 == 0:
                save_checkpoint(checkpoint_file, results, i)
                print(f"Checkpoint saved at {i}")

        except Exception as e:
            print(f"Error at {i}: {e}")
            # Save a checkpoint on error so item i is retried on resume
            save_checkpoint(checkpoint_file, results, i - 1)
            raise

    # Remove the checkpoint once the batch completes
    if checkpoint.exists():
        checkpoint.unlink()

    return results
```
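If the process is killed while the checkpoint file is being written, the file can end up truncated and `json.load` will fail on resume. A minimal sketch of an atomic write, assuming a POSIX-style filesystem where renaming within one directory is atomic. `save_checkpoint_atomic` and `load_checkpoint` are hypothetical helper names that mirror the `results` / `last_completed` layout used above:

```python
import json
import os
import tempfile
from pathlib import Path

def save_checkpoint_atomic(path, results, last_completed):
    """Write the checkpoint to a temp file, then rename it into place."""
    path = Path(path)
    # The temp file lives in the same directory so the rename stays on
    # one filesystem, which is what makes os.replace atomic.
    fd, tmp_name = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"results": results, "last_completed": last_completed}, f)
            f.flush()
            os.fsync(f.fileno())  # push the data to disk before renaming
        os.replace(tmp_name, path)
    except BaseException:
        # Clean up the partial temp file; the old checkpoint is untouched
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise

def load_checkpoint(path, total):
    """Return (results, start_idx); start fresh if no checkpoint exists."""
    path = Path(path)
    if not path.exists():
        return [None] * total, 0
    with open(path) as f:
        data = json.load(f)
    return data["results"], data["last_completed"] + 1
```

A reader resuming a job then sees either the previous complete checkpoint or the new complete one, never a half-written file.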

***

## Batch image generation

### SD WebUI batch

```python
import requests
import base64
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

SD_API = "http://server:7860"

def generate_image(prompt, output_path):
    response = requests.post(f'{SD_API}/sdapi/v1/txt2img', json={
        'prompt': prompt,
        'negative_prompt': 'blurry, low quality',
        'steps': 20,
        'width': 512,
        'height': 512
    })
    response.raise_for_status()  # fail fast on HTTP errors instead of a KeyError below

    image_data = base64.b64decode(response.json()['images'][0])

    with open(output_path, 'wb') as f:
        f.write(image_data)

    return output_path

def batch_generate(prompts, output_dir, max_workers=4):
    Path(output_dir).mkdir(exist_ok=True)

    tasks = [
        (prompt, f"{output_dir}/image_{i:04d}.png")
        for i, prompt in enumerate(prompts)
    ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(tqdm(
            executor.map(lambda x: generate_image(*x), tasks),
            total=len(tasks)
        ))

    return results

# Generate 100 images
prompts = [f"A beautiful landscape, style {i}" for i in range(100)]
batch_generate(prompts, "./outputs", max_workers=4)
```

### ComfyUI batch with queue

```python
import json
import urllib.request
import time
from pathlib import Path

SERVER = "server:8188"

def queue_prompt(workflow):
    data = json.dumps({"prompt": workflow}).encode('utf-8')
    req = urllib.request.Request(f"http://{SERVER}/prompt", data=data)
    return json.loads(urllib.request.urlopen(req).read())

def get_history(prompt_id):
    with urllib.request.urlopen(f"http://{SERVER}/history/{prompt_id}") as response:
        return json.loads(response.read())

def batch_generate_comfyui(prompts, base_workflow_path, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    # Load the base workflow
    with open(base_workflow_path) as f:
        base_workflow = json.load(f)

    prompt_ids = []

    # Queue all prompts
    for i, prompt in enumerate(prompts):
        # Deep copy via JSON so edits to nested nodes never touch base_workflow
        workflow = json.loads(json.dumps(base_workflow))
        # Modify the prompt node (adjust the node ID as needed)
        workflow["6"]["inputs"]["text"] = prompt
        # Set the output filename prefix
        workflow["9"]["inputs"]["filename_prefix"] = f"batch_{i:04d}"

        result = queue_prompt(workflow)
        prompt_ids.append(result['prompt_id'])
        print(f"Queued {i+1}/{len(prompts)}")

    # Wait for completion
    print("Waiting for generation...")
    completed = set()
    while len(completed) < len(prompt_ids):
        for pid in prompt_ids:
            if pid not in completed:
                history = get_history(pid)
                if pid in history:
                    completed.add(pid)
                    print(f"Completed {len(completed)}/{len(prompt_ids)}")
        time.sleep(1)

    print("All done!")
```

### FLUX batch processing

```python
import torch
from diffusers import FluxPipeline
from pathlib import Path
from tqdm import tqdm

# Load the model once
pipe = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-schnell",
    torch_dtype=torch.bfloat16
)
pipe.to("cuda")

def batch_generate_flux(prompts, output_dir, batch_size=4):
    Path(output_dir).mkdir(exist_ok=True)

    for i in tqdm(range(0, len(prompts), batch_size)):
        batch_prompts = prompts[i:i + batch_size]

        # Generate a batch
        images = pipe(
            batch_prompts,
            height=1024,
            width=1024,
            num_inference_steps=4,
            guidance_scale=0.0
        ).images

        # Save
        for j, img in enumerate(images):
            img.save(f"{output_dir}/image_{i+j:04d}.png")

# Generate 100 images in batches of 4 (the repeated list has 102 entries, so trim it)
prompts = [f"A {animal} in a forest" for animal in (["cat", "dog", "fox"] * 34)[:100]]
batch_generate_flux(prompts, "./flux_outputs", batch_size=4)
```

***

## Batch audio processing

### Batch transcription with Whisper

```python
import whisper
from pathlib import Path
from tqdm import tqdm
import json

model = whisper.load_model("large-v3")

def batch_transcribe(audio_files, output_dir):
    Path(output_dir).mkdir(exist_ok=True)
    results = {}

    for audio_path in tqdm(audio_files):
        try:
            result = model.transcribe(str(audio_path))

            results[audio_path.name] = {
                'text': result['text'],
                'language': result['language'],
                'segments': result['segments']
            }

            # Save the individual transcript
            output_file = Path(output_dir) / f"{audio_path.stem}.json"
            with open(output_file, 'w') as f:
                json.dump(results[audio_path.name], f, indent=2)

        except Exception as e:
            print(f"Error processing {audio_path}: {e}")
            results[audio_path.name] = {'error': str(e)}

    # Save the combined results
    with open(f"{output_dir}/all_transcripts.json", 'w') as f:
        json.dump(results, f, indent=2)

    return results

# Transcribe every audio file in the directory
audio_files = list(Path("./audio").glob("*.mp3"))
results = batch_transcribe(audio_files, "./transcripts")
```

### Parallel Whisper (multiple GPUs)

```python
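import multiprocessing
from concurrent.futures import ProcessPoolExecutor

import whisper

def transcribe_shard(args):
    """Transcribe one shard of files on a single GPU, loading the model once."""
    audio_paths, gpu_id = args
    model = whisper.load_model("large-v3", device=f"cuda:{gpu_id}")
    return [(str(p), model.transcribe(str(p))['text']) for p in audio_paths]

def parallel_transcribe(audio_files, num_gpus=2):
    # Distribute files across GPUs round-robin: one shard and one process per GPU
    shards = [(audio_files[i::num_gpus], i) for i in range(num_gpus)]

    # "spawn" avoids inheriting CUDA state from the parent process;
    # call this function from under `if __name__ == "__main__":`
    ctx = multiprocessing.get_context("spawn")
    with ProcessPoolExecutor(max_workers=num_gpus, mp_context=ctx) as executor:
        shard_results = list(executor.map(transcribe_shard, shards))

    return {path: text for shard in shard_results for path, text in shard}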
```

***

## Batch video processing

### Batch video generation (SVD)

```python
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
from pathlib import Path
from tqdm import tqdm
import torch

pipe = StableVideoDiffusionPipeline.from_pretrained(
    "stabilityai/stable-video-diffusion-img2vid-xt",
    torch_dtype=torch.float16,
    variant="fp16"
)
pipe.to("cuda")

def batch_generate_videos(image_paths, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    for img_path in tqdm(image_paths):
        try:
            image = load_image(str(img_path))
            image = image.resize((1024, 576))

            frames = pipe(
                image,
                num_frames=25,
                decode_chunk_size=8
            ).frames[0]

            output_path = Path(output_dir) / f"{img_path.stem}.mp4"
            export_to_video(frames, str(output_path), fps=7)

        except Exception as e:
            print(f"Error with {img_path}: {e}")

# Process all images
images = list(Path("./input_images").glob("*.png"))
batch_generate_videos(images, "./output_videos")
```

***

## Data pipeline patterns

### Producer-consumer pattern

```python
import asyncio
from asyncio import Queue

async def process_item(item):
    """Placeholder: replace with your real async work (API call, inference, ...)."""
    await asyncio.sleep(0)
    return item

async def producer(queue, items, num_workers):
    """Add items to the queue, then one stop signal per worker"""
    for item in items:
        await queue.put(item)
    # Signal completion
    for _ in range(num_workers):
        await queue.put(None)

async def consumer(queue, results, worker_id):
    """Process items from the queue"""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break

        try:
            result = await process_item(item)
            results.append(result)
        except Exception as e:
            print(f"Worker {worker_id} error: {e}")
        finally:
            queue.task_done()

async def run_pipeline(items, num_workers=5):
    queue = Queue(maxsize=100)
    results = []

    # Start workers
    workers = [
        asyncio.create_task(consumer(queue, results, i))
        for i in range(num_workers)
    ]

    # Run the producer; it waits whenever the queue is full
    await producer(queue, items, num_workers)

    # Wait for completion
    await asyncio.gather(*workers)

    # Results arrive in completion order, not input order
    return results

items = list(range(1000))
results = asyncio.run(run_pipeline(items))
```

### Map-reduce pattern

```python
from concurrent.futures import ProcessPoolExecutor

def map_function(item):
    """Process a single item"""
    # Placeholder: replace with your processing logic
    return item * item

def reduce_function(results):
    """Combine results"""
    # Placeholder: replace with your combining logic
    return sum(results)

def map_reduce(items, num_workers=4):
    # Map phase
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(map_function, items))

    # Reduce phase
    return reduce_function(mapped)

if __name__ == "__main__":
    print(map_reduce(range(10)))  # 285
```
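As a concrete instance of the pattern above, here is a small word-count sketch. The function names are illustrative, and it uses a thread pool so that it runs anywhere without pickling concerns; for CPU-bound map steps a `ProcessPoolExecutor` is usually the better fit:

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

def count_words(text):
    """Map step: turn one document into a word-frequency Counter."""
    return Counter(text.lower().split())

def merge_counts(left, right):
    """Reduce step: combine two partial counts into one."""
    return left + right

def word_count(documents, num_workers=4):
    # Map phase: count each document independently
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        partials = list(executor.map(count_words, documents))
    # Reduce phase: fold the partial counts together
    return reduce(merge_counts, partials, Counter())

totals = word_count(["a b a", "b c", "a"])
print(totals.most_common(1))  # [('a', 3)]
```

The map step has no shared state, which is what lets it be spread across workers; only the reduce step sees all partial results.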

***

## Optimization tips

### 1. Right-size the concurrency

```python
# LLM: keep enough requests in flight for vLLM to batch
max_concurrent = 10  # conservative starting point; the server-side cap is vLLM's max_num_seqs

# Image generation: 1-4 depending on VRAM
max_concurrent = 2  # SD WebUI
max_concurrent = 4  # FLUX on RTX 4090

# Transcription: 1 per GPU
max_concurrent = num_gpus
```
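The best value depends on the model, the GPU, and the size of each request, so it can help to measure rather than guess. The following is a rough sketch of a concurrency sweep. `measure_throughput` and `sweep_concurrency` are hypothetical helper names, and `worker` stands for any coroutine function that handles one item, such as `process_single` from the async example:

```python
import asyncio
import time

async def measure_throughput(worker, items, max_concurrent):
    """Run worker(item) over items with a concurrency cap; return items/sec."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(item):
        async with semaphore:
            return await worker(item)

    start = time.perf_counter()
    await asyncio.gather(*(limited(item) for item in items))
    elapsed = time.perf_counter() - start
    return len(items) / elapsed

async def sweep_concurrency(worker, items, levels=(1, 2, 4, 8, 16)):
    """Return {level: items_per_second} for each concurrency level."""
    return {
        level: await measure_throughput(worker, items, level)
        for level in levels
    }
```

Running the sweep on a small sample of real prompts and picking the level where throughput stops improving is usually enough; going higher mostly adds queueing latency.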

### 2. Tune the batch size

```python
# Too small: underutilizes the GPU
# Too large: out-of-memory (OOM) errors

# Rough batch sizes for image generation:
# RTX 3060: batch_size = 1
# RTX 3090: batch_size = 2-4
# RTX 4090: batch_size = 4-8
# A100: batch_size = 8-16
```
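Because the largest batch that fits depends on the model and resolution, one option is to start high and shrink on failure. A minimal sketch, assuming that an out-of-memory condition surfaces as an exception: `run_with_backoff` and its parameters are hypothetical names, and `MemoryError` is only a stand-in default so the example runs without a GPU.

```python
def run_with_backoff(process_batch, items, batch_size, oom_errors=(MemoryError,)):
    """Process items in batches, halving batch_size whenever a batch hits OOM.

    Returns (results, final_batch_size).
    """
    results = []
    i = 0
    while i < len(items):
        batch = items[i:i + batch_size]
        try:
            results.extend(process_batch(batch))
        except oom_errors:
            if batch_size == 1:
                raise  # a single item does not fit; nothing left to shrink
            batch_size = max(1, batch_size // 2)
            continue  # retry the same position with a smaller batch
        i += len(batch)
    return results, batch_size
```

With PyTorch, the intent would be to pass `oom_errors=(torch.cuda.OutOfMemoryError,)` (available in recent PyTorch releases) and to call `torch.cuda.empty_cache()` before retrying.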

### 3. Memory management

```python
import gc
import torch

def clear_memory():
    gc.collect()
    torch.cuda.empty_cache()

# Call between large batches
for batch in batches:
    process_batch(batch)
    clear_memory()
```

### 4. Save intermediate results

```python
# Always checkpoint long-running jobs
CHECKPOINT_INTERVAL = 100

for i, item in enumerate(items):
    results.append(process(item))

    # Checkpoint after every CHECKPOINT_INTERVAL completed items
    if (i + 1) % CHECKPOINT_INTERVAL == 0:
        save_checkpoint(results, i)
```

***

## Cost optimization

### Estimate before running

```python
def estimate_cost(num_items, time_per_item_sec, hourly_rate):
    total_hours = (num_items * time_per_item_sec) / 3600
    total_cost = total_hours * hourly_rate
    return total_hours, total_cost

# Example: 10,000 images at 3 s each on an RTX 4090, assuming $0.10/hour
hours, cost = estimate_cost(10000, 3, 0.10)
print(f"Estimated: {hours:.1f} hours, ${cost:.2f}")
# Output: Estimated: 8.3 hours, $0.83
```

### Use spot instances

* Often 30-50% cheaper than on-demand
* Well suited to batch jobs, which tolerate interruption
* Save checkpoints frequently
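
Interruptions mean some work is repeated, which eats into the discount. A back-of-the-envelope sketch of that trade-off follows; `estimate_spot_cost`, `discount`, and `rework_fraction` are hypothetical names, and the 30% discount and 10% rework figures are illustrative assumptions, not measured CLORE.AI numbers:

```python
def estimate_spot_cost(total_hours, on_demand_rate, discount, rework_fraction):
    """Estimate spot cost including work repeated after interruptions.

    discount: fraction off the on-demand rate (0.30 means 30% cheaper).
    rework_fraction: extra compute redone after interruptions
                     (0.10 means 10% of the work is repeated).
    """
    spot_rate = on_demand_rate * (1 - discount)
    effective_hours = total_hours * (1 + rework_fraction)
    return effective_hours * spot_rate

on_demand = 8.33 * 0.10
spot = estimate_spot_cost(8.33, 0.10, discount=0.30, rework_fraction=0.10)
print(f"On-demand: ${on_demand:.2f}, spot: ${spot:.2f}")
# Output: On-demand: $0.83, spot: $0.64
```

Spot comes out ahead whenever `(1 - discount) * (1 + rework_fraction)` is below 1, which is why frequent checkpoints, and therefore a small rework fraction, matter.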

### Off-peak processing

* Queue jobs during low-demand hours
* GPU availability is often better
* Spot prices may be lower

***

## Next steps

* [API Integration](/guides/guides_v2-es/avanzado/api-integration.md) - Build your own APIs
* [Multi-GPU Setup](/guides/guides_v2-es/avanzado/multi-gpu-setup.md) - Scale up
* [Cost Calculator](/guides/guides_v2-es/primeros-pasos/cost-calculator.md) - Estimate costs


