
# Batch Processing

Process large workloads efficiently on CLORE.AI GPUs.

{% hint style="success" %}
Find the right GPU on the [CLORE.AI Marketplace](https://clore.ai/marketplace).
{% endhint %}

## Using the clore-ai SDK for batch infrastructure (recommended)

The official SDK simplifies provisioning GPUs in batches, with async support:

```python
import asyncio
from clore_ai import AsyncCloreAI

async def batch_deploy(server_ids):
    """Deploy to several servers concurrently."""
    async with AsyncCloreAI() as client:
        tasks = [
            client.create_order(
                server_id=sid,
                image="cloreai/ubuntu22.04-cuda12",
                type="on-demand",
                currency="bitcoin",
                ssh_password="BatchPass123",
                ports={"22": "tcp"}
            )
            for sid in server_ids
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for sid, result in zip(server_ids, results):
            if isinstance(result, Exception):
                print(f"❌ Server {sid}: {result}")
            else:
                print(f"✅ Server {sid}: Order {result.id}")
        return results

# Deploy to 5 servers at once
asyncio.run(batch_deploy([142, 305, 891, 450, 612]))
```

→ See the [Python SDK guide](/guides/guides_v2-fr/avance/python-sdk.md) and [CLI automation](/guides/guides_v2-fr/avance/cli-automation.md) for more information.

***

## When to use batch processing

* Processing hundreds or thousands of items
* Converting large datasets
* Generating many images or videos
* Bulk transcription
* Preparing training data

***

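## Batch processing for LLMs

### vLLM batch API

vLLM batches concurrent requests automatically through continuous batching. The synchronous loop below sends one request at a time, so treat it as the simplest baseline: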

```python
from openai import OpenAI

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

# Synchronous batch: one request at a time
def process_batch_sync(prompts):
    results = []
    for prompt in prompts:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        results.append(response.choices[0].message.content)
    return results

# Process 100 prompts
prompts = [f"Summarize topic {i}" for i in range(100)]
results = process_batch_sync(prompts)
```

### Async batch processing (faster)

```python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_single(prompt):
    response = await client.chat.completions.create(
        model="meta-llama/Llama-3.1-8B-Instruct",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch_async(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_process(prompt):
        async with semaphore:
            return await process_single(prompt)

    tasks = [limited_process(p) for p in prompts]
    return await asyncio.gather(*tasks)

# Process 1000 prompts with 10 concurrent requests
prompts = [f"Generate a description for product {i}" for i in range(1000)]
results = asyncio.run(process_batch_async(prompts, max_concurrent=10))
```

### Batch with progress tracking

```python
import asyncio
from tqdm.asyncio import tqdm
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_with_progress(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def process_one(prompt, idx):
        async with semaphore:
            response = await client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompt}]
            )
            return idx, response.choices[0].message.content

    tasks = [process_one(p, i) for i, p in enumerate(prompts)]

    for coro in tqdm.as_completed(tasks, total=len(tasks)):
        idx, result = await coro
        results.append((idx, result))

    # Restore the original order
    results.sort(key=lambda x: x[0])
    return [r[1] for r in results]

# Run
prompts = ["..." for _ in range(500)]
results = asyncio.run(process_with_progress(prompts))
```

### Saving progress for long batches

```python
import json
from pathlib import Path

from openai import OpenAI

# Synchronous client; this block previously relied on one defined elsewhere
client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

def process_batch_with_checkpoint(prompts, checkpoint_file="checkpoint.json"):
    # Load the checkpoint if one exists
    checkpoint = Path(checkpoint_file)
    if checkpoint.exists():
        with open(checkpoint) as f:
            data = json.load(f)
        results = data['results']
        start_idx = data['last_completed'] + 1
        print(f"Resuming from index {start_idx}")
    else:
        results = [None] * len(prompts)
        start_idx = 0

    # Process the remaining prompts
    for i in range(start_idx, len(prompts)):
        try:
            response = client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompts[i]}]
            )
            results[i] = response.choices[0].message.content

            # Save a checkpoint after every 10 completed items
            if (i + 1) % 10 == 0:
                with open(checkpoint_file, 'w') as f:
                    json.dump({'results': results, 'last_completed': i}, f)
                print(f"Checkpoint saved at {i}")

        except Exception as e:
            print(f"Error at {i}: {e}")
            # Save a checkpoint on failure so the next run resumes at item i
            with open(checkpoint_file, 'w') as f:
                json.dump({'results': results, 'last_completed': i - 1}, f)
            raise

    # Remove the checkpoint once the batch has finished
    if checkpoint.exists():
        checkpoint.unlink()

    return results
```
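
If the process is killed while `json.dump` is writing, the checkpoint file can be left truncated and the next run will fail to load it. A minimal sketch of an atomic write, assuming the checkpoint lives on a local filesystem. `save_checkpoint_atomic` and `load_checkpoint` are hypothetical helper names, not part of any library:

```python
import json
import os
import tempfile

def save_checkpoint_atomic(path, results, last_completed):
    """Write the checkpoint to a temp file, then swap it into place in one step."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"results": results, "last_completed": last_completed}, f)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes reach the disk first
        os.replace(tmp_path, path)  # readers see the old file or the new one, never half
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

def load_checkpoint(path, num_items):
    """Return (results, start_idx); start fresh when no checkpoint exists."""
    if not os.path.exists(path):
        return [None] * num_items, 0
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    return data["results"], data["last_completed"] + 1
```

The temp file is created in the same directory as the target so that `os.replace` stays on one filesystem.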

***

## Batch image generation

### SD WebUI batch

```python
import requests
import base64
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

SD_API = "http://server:7860"

def generate_image(prompt, output_path):
    response = requests.post(f'{SD_API}/sdapi/v1/txt2img', json={
        'prompt': prompt,
        'negative_prompt': 'blurry, low quality',
        'steps': 20,
        'width': 512,
        'height': 512
    })
    response.raise_for_status()  # fail loudly on HTTP errors instead of decoding an error body

    image_data = base64.b64decode(response.json()['images'][0])

    with open(output_path, 'wb') as f:
        f.write(image_data)

    return output_path

def batch_generate(prompts, output_dir, max_workers=4):
    Path(output_dir).mkdir(exist_ok=True)

    tasks = [
        (prompt, f"{output_dir}/image_{i:04d}.png")
        for i, prompt in enumerate(prompts)
    ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(tqdm(
            executor.map(lambda x: generate_image(*x), tasks),
            total=len(tasks)
        ))

    return results

# Generate 100 images
prompts = [f"A beautiful landscape, style {i}" for i in range(100)]
batch_generate(prompts, "./outputs", max_workers=4)
```

### ComfyUI batch with a queue

```python
import json
import urllib.request
import time
from pathlib import Path

SERVER = "server:8188"

def queue_prompt(workflow):
    data = json.dumps({"prompt": workflow}).encode('utf-8')
    req = urllib.request.Request(f"http://{SERVER}/prompt", data=data)
    return json.loads(urllib.request.urlopen(req).read())

def get_history(prompt_id):
    with urllib.request.urlopen(f"http://{SERVER}/history/{prompt_id}") as response:
        return json.loads(response.read())

def batch_generate_comfyui(prompts, base_workflow_path, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    # Load the base workflow
    with open(base_workflow_path) as f:
        base_workflow = json.load(f)

    prompt_ids = []

    # Queue every prompt
    for i, prompt in enumerate(prompts):
        # Deep-copy so edits to nested nodes never leak into base_workflow
        workflow = json.loads(json.dumps(base_workflow))
        # Set the prompt text (adjust the node ID to match your workflow)
        workflow["6"]["inputs"]["text"] = prompt
        # Set the output filename prefix
        workflow["9"]["inputs"]["filename_prefix"] = f"batch_{i:04d}"

        result = queue_prompt(workflow)
        prompt_ids.append(result['prompt_id'])
        print(f"Queued {i+1}/{len(prompts)}")

    # Wait for completion
    print("Waiting for generation...")
    completed = set()
    while len(completed) < len(prompt_ids):
        for pid in prompt_ids:
            if pid not in completed:
                history = get_history(pid)
                if pid in history:
                    completed.add(pid)
                    print(f"Completed {len(completed)}/{len(prompt_ids)}")
        time.sleep(1)

    print("All done!")
```

### FLUX batch processing

```python
import torch
from diffusers import FluxPipeline
from pathlib import Path
from tqdm import tqdm

# Load the model once
pipe = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-schnell",
    torch_dtype=torch.bfloat16
)
pipe.to("cuda")

def batch_generate_flux(prompts, output_dir, batch_size=4):
    Path(output_dir).mkdir(exist_ok=True)

    for i in tqdm(range(0, len(prompts), batch_size)):
        batch_prompts = prompts[i:i + batch_size]

        # Generate the batch
        images = pipe(
            batch_prompts,
            height=1024,
            width=1024,
            num_inference_steps=4,
            guidance_scale=0.0
        ).images

        # Save
        for j, img in enumerate(images):
            img.save(f"{output_dir}/image_{i+j:04d}.png")

# Generate 100 images in batches of 4 (the repeated list has 102 entries, so trim it)
prompts = [f"A {animal} in a forest" for animal in (["cat", "dog", "fox"] * 34)[:100]]
batch_generate_flux(prompts, "./flux_outputs", batch_size=4)
```

***

## Batch audio processing

### Batch transcription with Whisper

```python
import whisper
from pathlib import Path
from tqdm import tqdm
import json

model = whisper.load_model("large-v3")

def batch_transcribe(audio_files, output_dir):
    Path(output_dir).mkdir(exist_ok=True)
    results = {}

    for audio_path in tqdm(audio_files):
        try:
            result = model.transcribe(str(audio_path))

            results[audio_path.name] = {
                'text': result['text'],
                'language': result['language'],
                'segments': result['segments']
            }

            # Save the individual transcript
            output_file = Path(output_dir) / f"{audio_path.stem}.json"
            with open(output_file, 'w') as f:
                json.dump(results[audio_path.name], f, indent=2)

        except Exception as e:
            print(f"Error while processing {audio_path}: {e}")
            results[audio_path.name] = {'error': str(e)}

    # Save the combined results
    with open(f"{output_dir}/all_transcripts.json", 'w') as f:
        json.dump(results, f, indent=2)

    return results

# Transcribe every audio file in the directory
audio_files = list(Path("./audio").glob("*.mp3"))
results = batch_transcribe(audio_files, "./transcripts")
```

### Parallel Whisper (multiple GPUs)

```python
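import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

import torch
import whisper

_model = None  # one model per worker process, loaded once in init_worker

def init_worker(gpu_queue):
    """Pin this worker process to one GPU and load the model a single time."""
    global _model
    gpu_id = gpu_queue.get()
    torch.cuda.set_device(gpu_id)
    _model = whisper.load_model("large-v3", device=f"cuda:{gpu_id}")

def transcribe_file(audio_path):
    result = _model.transcribe(str(audio_path))
    return str(audio_path), result['text']

def parallel_transcribe(audio_files, num_gpus=2):
    # Use "spawn" so CUDA initializes cleanly in each worker; call this
    # function from under `if __name__ == "__main__":`
    ctx = mp.get_context("spawn")

    # Hand out one GPU id per worker process
    gpu_queue = ctx.Queue()
    for gpu_id in range(num_gpus):
        gpu_queue.put(gpu_id)

    with ProcessPoolExecutor(max_workers=num_gpus, mp_context=ctx,
                             initializer=init_worker,
                             initargs=(gpu_queue,)) as executor:
        results = list(executor.map(transcribe_file, audio_files))

    return dict(results)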
```

***

## Batch video processing

### Batch video generation (SVD)

```python
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
from pathlib import Path
from tqdm import tqdm
import torch

pipe = StableVideoDiffusionPipeline.from_pretrained(
    "stabilityai/stable-video-diffusion-img2vid-xt",
    torch_dtype=torch.float16,
    variant="fp16"
)
pipe.to("cuda")

def batch_generate_videos(image_paths, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    for img_path in tqdm(image_paths):
        try:
            image = load_image(str(img_path))
            image = image.resize((1024, 576))

            frames = pipe(
                image,
                num_frames=25,
                decode_chunk_size=8
            ).frames[0]

            output_path = Path(output_dir) / f"{img_path.stem}.mp4"
            export_to_video(frames, str(output_path), fps=7)

        except Exception as e:
            print(f"Error with {img_path}: {e}")

# Process all images
images = list(Path("./input_images").glob("*.png"))
batch_generate_videos(images, "./output_videos")
```

***

## Data pipeline patterns

### Producer-consumer pattern

```python
import asyncio
from asyncio import Queue

async def process_item(item):
    """Placeholder: replace with your real async work (API call, inference, ...)."""
    await asyncio.sleep(0)
    return item

async def producer(queue, items, num_workers):
    """Add items to the queue, then one stop signal per worker."""
    for item in items:
        await queue.put(item)
    # Signal completion: each worker consumes exactly one None
    for _ in range(num_workers):
        await queue.put(None)

async def consumer(queue, results, worker_id):
    """Process items from the queue until the stop signal arrives."""
    while True:
        item = await queue.get()
        try:
            if item is None:
                break
            results.append(await process_item(item))
        except Exception as e:
            print(f"Worker {worker_id} error: {e}")
        finally:
            queue.task_done()

async def run_pipeline(items, num_workers=5):
    queue = Queue(maxsize=100)
    results = []  # filled in completion order, not input order

    # Start the workers
    workers = [
        asyncio.create_task(consumer(queue, results, i))
        for i in range(num_workers)
    ]

    # Run the producer; it waits whenever the queue is full
    await producer(queue, items, num_workers)

    # Wait for every worker to finish
    await asyncio.gather(*workers)

    return results

items = list(range(1000))
results = asyncio.run(run_pipeline(items, num_workers=5))
```

### Map-reduce pattern

```python
from concurrent.futures import ProcessPoolExecutor

def map_function(item):
    """Process a single item (placeholder: replace with your own logic)."""
    return item * item

def reduce_function(results):
    """Combine the mapped results (placeholder: replace with your own logic)."""
    return sum(results)

def map_reduce(items, num_workers=4):
    # Map phase: process items in parallel worker processes
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(map_function, items))

    # Reduce phase: combine everything in the parent process
    result = reduce_function(mapped)

    return result

if __name__ == "__main__":  # guard needed when worker processes are spawned
    print(map_reduce(range(10)))
```

***

## Optimization tips

### 1. Size concurrency correctly

```python
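# LLM: 10 is a conservative starting point; raise it while throughput keeps improving.
# vLLM caps how many sequences it batches with --max-num-seqs, usually far above 10.
max_concurrent = 10

# Image generation: 1-4 depending on VRAM
max_concurrent = 2  # SD WebUI
max_concurrent = 4  # FLUX on RTX 4090

# Transcription: 1 per GPU
num_gpus = 2  # set to the number of GPUs on your server
max_concurrent = num_gpus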
```
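
The right concurrency level is easiest to find by measuring. A minimal sketch that sweeps a few levels and reports throughput, assuming your work can be expressed as a coroutine. `measure_throughput`, `sweep`, and `fake_request` are hypothetical names, and `fake_request` only simulates latency with `asyncio.sleep`:

```python
import asyncio
import time

async def measure_throughput(worker, items, max_concurrent):
    """Run worker over items with a concurrency cap; return items per second."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(item):
        async with semaphore:
            return await worker(item)

    start = time.perf_counter()
    await asyncio.gather(*(limited(item) for item in items))
    elapsed = time.perf_counter() - start
    return len(items) / elapsed

async def fake_request(item):
    # Stand-in for a real API call; replace with your own coroutine.
    await asyncio.sleep(0.01)
    return item

async def sweep(levels=(1, 4, 16)):
    sample = list(range(32))  # a small sample is enough to compare levels
    return {n: await measure_throughput(fake_request, sample, n) for n in levels}

throughput = asyncio.run(sweep())
for level, rate in throughput.items():
    print(f"max_concurrent={level}: {rate:.0f} items/s")
```

Against a real server, throughput usually stops improving at some level; picking the smallest level near that plateau keeps latency and memory pressure down.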

### 2. Tune the batch size

```python
# Too small: underuses the GPU
# Too large: out-of-memory (OOM) errors

# Rough batch sizes for image generation (they vary with model and resolution):
# RTX 3060: batch_size = 1
# RTX 3090: batch_size = 2-4
# RTX 4090: batch_size = 4-8
# A100: batch_size = 8-16
```
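
When the right batch size is not known in advance, one option is to start high and back off on failure. A minimal sketch, assuming out-of-memory conditions surface as an exception. `process_with_backoff` is a hypothetical helper; it catches `MemoryError` by default, and with PyTorch you would likely pass `oom_errors=(torch.cuda.OutOfMemoryError,)` and free cached memory before retrying:

```python
def process_with_backoff(items, process_batch, batch_size, oom_errors=(MemoryError,)):
    """Process items in batches, halving the batch size whenever a batch runs out of memory.

    Returns the results and the batch size that finally worked.
    """
    results = []
    i = 0
    while i < len(items):
        batch = items[i:i + batch_size]
        try:
            results.extend(process_batch(batch))
            i += len(batch)  # advance only after the batch succeeded
        except oom_errors:
            if batch_size == 1:
                raise  # even a single item does not fit
            batch_size = max(1, batch_size // 2)
    return results, batch_size

def fake_batch(batch):
    # Simulated workload that "runs out of memory" above 3 items per batch.
    if len(batch) > 3:
        raise MemoryError("simulated OOM")
    return [x * 2 for x in batch]

results, final_size = process_with_backoff(list(range(10)), fake_batch, batch_size=8)
print(f"Processed {len(results)} items, settled on batch_size={final_size}")
```

The failed batch is retried from the same position, so no item is skipped or processed twice.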

### 3. Memory management

```python
import gc
import torch

def clear_memory():
    gc.collect()
    torch.cuda.empty_cache()

# Call between large batches
for batch in batches:
    process_batch(batch)
    clear_memory()
```

### 4. Save intermediate results

```python
# Always checkpoint long-running jobs
CHECKPOINT_INTERVAL = 100

for i, item in enumerate(items):
    results.append(process(item))

    # Checkpoint after every CHECKPOINT_INTERVAL completed items
    if (i + 1) % CHECKPOINT_INTERVAL == 0:
        save_checkpoint(results, i)
```

***

## Cost optimization

### Estimate before you run

```python
def estimate_cost(num_items, time_per_item_sec, hourly_rate):
    total_hours = (num_items * time_per_item_sec) / 3600
    total_cost = total_hours * hourly_rate
    return total_hours, total_cost

# Example: 10,000 images at 3 s each on an RTX 4090 at $0.10/hour
hours, cost = estimate_cost(10000, 3, 0.10)
print(f"Estimate: {hours:.1f} hours, ${cost:.2f}")
# Output: Estimate: 8.3 hours, $0.83
```
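
Splitting the same job across several GPUs shortens the wall-clock time but leaves the total cost roughly unchanged. A minimal sketch under simplifying assumptions: every GPU has the same hourly rate and speed, and startup and data-transfer overhead are ignored. `estimate_parallel` is a hypothetical helper:

```python
import math

def estimate_parallel(num_items, time_per_item_sec, hourly_rate, num_gpus):
    """Wall-clock hours and total cost when items are split evenly across GPUs."""
    items_per_gpu = math.ceil(num_items / num_gpus)
    wall_hours = items_per_gpu * time_per_item_sec / 3600
    total_cost = wall_hours * hourly_rate * num_gpus  # every GPU is billed for the full run
    return wall_hours, total_cost

for n in (1, 2, 4):
    wall_hours, total_cost = estimate_parallel(10000, 3, 0.10, n)
    print(f"{n} GPU(s): {wall_hours:.1f} hours, ${total_cost:.2f}")
```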

### Use spot instances

* Typically 30-50% cheaper than on-demand
* Well suited to batch jobs, which tolerate interruption
* Save checkpoints frequently
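
For interruptible jobs, one way to checkpoint after every item is to append each result to a JSON Lines file and skip finished ids on restart. A minimal sketch, assuming each item carries a unique id. `load_done_ids`, `ends_mid_line`, and `process_resumable` are hypothetical helpers:

```python
import json
import os

def load_done_ids(path):
    """Collect the ids already written to the JSONL output file."""
    done = set()
    if os.path.exists(path):
        with open(path, encoding="utf-8") as f:
            for line in f:
                try:
                    done.add(json.loads(line)["id"])
                except (json.JSONDecodeError, KeyError):
                    continue  # blank line or partial record left by an interruption
    return done

def ends_mid_line(path):
    """True when the file ends without a newline, i.e. the last write was cut off."""
    if not os.path.exists(path) or os.path.getsize(path) == 0:
        return False
    with open(path, "rb") as f:
        f.seek(-1, os.SEEK_END)
        return f.read(1) != b"\n"

def process_resumable(items, process_fn, path):
    """Process (id, payload) pairs, skipping ids already present in the output file.

    Returns the number of items that were skipped because they were already done.
    """
    done = load_done_ids(path)
    repair = ends_mid_line(path)
    with open(path, "a", encoding="utf-8") as f:
        if repair:
            f.write("\n")  # close the truncated line so new records stay parseable
        for item_id, payload in items:
            if item_id in done:
                continue
            record = {"id": item_id, "result": process_fn(payload)}
            f.write(json.dumps(record) + "\n")
            f.flush()  # push each record out before starting the next item
    return len(done)
```

Rerunning `process_resumable` with the same arguments after an interruption redoes at most the one item that was in flight.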

### Off-peak processing

* Queue jobs during low-demand hours
* GPU availability is often better
* Spot prices may be lower

***

## Next steps

* [API integration](/guides/guides_v2-fr/avance/api-integration.md) - Build your own APIs
* [Multi-GPU setup](/guides/guides_v2-fr/avance/multi-gpu-setup.md) - Scale up
* [Cost calculator](/guides/guides_v2-fr/prise-en-main/cost-calculator.md) - Estimate costs


