> For the complete documentation index, see [llms.txt](https://docs.clore.ai/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://docs.clore.ai/guides/guides_v2-hi/advanced/batch-processing.md).

# बैच प्रोसेसिंग

CLORE.AI GPUs पर बड़े वर्कलोड को प्रभावी ढंग से प्रोसेस करें।

{% hint style="success" %}
सही GPU खोजें [CLORE.AI मार्केटप्लेस](https://clore.ai/marketplace).
{% endhint %}

## बैच इन्फ्रास्ट्रक्चर के लिए clore-ai SDK का उपयोग (अनुशंसित)

आधिकारिक SDK async समर्थन के साथ बैच GPU प्रोविज़निंग को सरल बनाता है:

```python
import asyncio
from clore_ai import AsyncCloreAI

async def batch_deploy(server_ids):
    """एक साथ कई सर्वरों पर तैनात करें."""
    async with AsyncCloreAI() as client:
        tasks = [
            client.create_order(
                server_id=sid,
                image="cloreai/ubuntu22.04-cuda12",
                type="on-demand",
                currency="bitcoin",
                ssh_password="BatchPass123",
                ports={"22": "tcp"}
            )
            for sid in server_ids
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for sid, result in zip(server_ids, results):
            if isinstance(result, Exception):
                print(f"❌ Server {sid}: {result}")
            else:
                print(f"✅ Server {sid}: Order {result.id}")
        return results

# एक साथ 5 सर्वरों पर तैनात करें
asyncio.run(batch_deploy([142, 305, 891, 450, 612]))
```

→ देखें [Python SDK गाइड](/guides/guides_v2-hi/advanced/python-sdk.md) और [CLI ऑटोमेशन](/guides/guides_v2-hi/advanced/cli-automation.md) और अधिक के लिए।

***

## कब बैच प्रोसेसिंग उपयोग करें

* सैकड़ों/हजारों आइटम प्रोसेस करना
* बड़े डेटासेट को परिवर्तित करना
* कई इमेज/वीडियो जनरेट करना
* बुल्क ट्रांसक्रिप्शन
* ट्रेनिंग डेटा की तैयारी

***

## LLM बैच प्रोसेसिंग

### vLLM बैच API

vLLM सतत बैचिंग के साथ स्वचालित रूप से बैचिंग को संभालता है:

```python
from openai import OpenAI
import asyncio
import aiohttp

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

# सिंक्रोनस बैच
def process_batch_sync(prompts):
    results = []
    for prompt in prompts:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        results.append(response.choices[0].message.content)
    return results

# 100 प्रॉम्प्ट्स प्रोसेस करें
prompts = [f"Summarize topic {i}" for i in range(100)]
results = process_batch_sync(prompts)
```

### Async बैच प्रोसेसिंग (तेज़)

```python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_single(prompt):
    response = await client.chat.completions.create(
        model="meta-llama/Llama-3.1-8B-Instruct",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch_async(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_process(prompt):
        async with semaphore:
            return await process_single(prompt)

    tasks = [limited_process(p) for p in prompts]
    return await asyncio.gather(*tasks)

# 10 समवर्ती अनुरोधों के साथ 1000 प्रॉम्प्ट्स प्रोसेस करें
prompts = [f"Generate description for product {i}" for i in range(1000)]
results = asyncio.run(process_batch_async(prompts, max_concurrent=10))
```

### प्रगति ट्रैकिंग के साथ बैच

```python
import asyncio
from tqdm.asyncio import tqdm
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_with_progress(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def process_one(prompt, idx):
        async with semaphore:
            response = await client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompt}]
            )
            return idx, response.choices[0].message.content

    tasks = [process_one(p, i) for i, p in enumerate(prompts)]

    for coro in tqdm.as_completed(tasks, total=len(tasks)):
        idx, result = await coro
        results.append((idx, result))

    # मूल क्रम के अनुसार सॉर्ट करें
    results.sort(key=lambda x: x[0])
    return [r[1] for r in results]

# चलाएँ
prompts = ["..." for _ in range(500)]
results = asyncio.run(process_with_progress(prompts))
```

### लंबे बैच के लिए प्रगति सहेजें

```python
import json
from pathlib import Path

def process_batch_with_checkpoint(prompts, checkpoint_file="checkpoint.json"):
    # चेकपॉइंट लोड करें
    checkpoint = Path(checkpoint_file)
    if checkpoint.exists():
        with open(checkpoint) as f:
            data = json.load(f)
            results = data['results']
            start_idx = data['last_completed'] + 1
        print(f"Resuming from index {start_idx}")
    else:
        results = [None] * len(prompts)
        start_idx = 0

    # शेष प्रोसेस करें
    for i in range(start_idx, len(prompts)):
        try:
            response = client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompts[i]}]
            )
            results[i] = response.choices[0].message.content

            # हर 10 आइटम पर चेकपॉइंट सहेजें
            if i % 10 == 0:
                with open(checkpoint_file, 'w') as f:
                    json.dump({'results': results, 'last_completed': i}, f)
                print(f"Checkpoint saved at {i}")

        except Exception as e:
            print(f"Error at {i}: {e}")
            # त्रुटि पर चेकपॉइंट सहेजें
            with open(checkpoint_file, 'w') as f:
                json.dump({'results': results, 'last_completed': i - 1}, f)
            raise

    # पूर्णता पर चेकपॉइंट हटाएँ
    if checkpoint.exists():
        checkpoint.unlink()

    return results
```

***

## इमेज जनरेशन बैच

### SD WebUI बैच

```python
import requests
import base64
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

SD_API = "http://server:7860"

def generate_image(prompt, output_path):
    response = requests.post(f'{SD_API}/sdapi/v1/txt2img', json={
        'prompt': prompt,
        'negative_prompt': 'blurry, low quality',
        'steps': 20,
        'width': 512,
        'height': 512
    })

    image_data = base64.b64decode(response.json()['images'][0])

    with open(output_path, 'wb') as f:
        f.write(image_data)

    return output_path

def batch_generate(prompts, output_dir, max_workers=4):
    Path(output_dir).mkdir(exist_ok=True)

    tasks = [
        (prompt, f"{output_dir}/image_{i:04d}.png")
        for i, prompt in enumerate(prompts)
    ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(tqdm(
            executor.map(lambda x: generate_image(*x), tasks),
            total=len(tasks)
        ))

    return results

# 100 इमेज जनरेट करें
prompts = [f"A beautiful landscape, style {i}" for i in range(100)]
batch_generate(prompts, "./outputs", max_workers=4)
```

### ComfyUI कतार के साथ बैच

```python
import json
import urllib.request
import time
from pathlib import Path

SERVER = "server:8188"

def queue_prompt(workflow):
    data = json.dumps({"prompt": workflow}).encode('utf-8')
    req = urllib.request.Request(f"http://{SERVER}/prompt", data=data)
    return json.loads(urllib.request.urlopen(req).read())

def get_history(prompt_id):
    with urllib.request.urlopen(f"http://{SERVER}/history/{prompt_id}") as response:
        return json.loads(response.read())

def batch_generate_comfyui(prompts, base_workflow_path, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    # बेस वर्कफ़्लो लोड करें
    with open(base_workflow_path) as f:
        base_workflow = json.load(f)

    prompt_ids = []

    # सभी प्रॉम्प्ट्स को कतार में डालें
    for i, prompt in enumerate(prompts):
        workflow = base_workflow.copy()
        # प्रॉम्प्ट नोड संशोधित करें (आवश्यकतानुसार नोड ID समायोजित करें)
        workflow["6"]["inputs"]["text"] = prompt
        # आउटपुट फ़ाइलनाम सेट करें
        workflow["9"]["inputs"]["filename_prefix"] = f"batch_{i:04d}"

        result = queue_prompt(workflow)
        prompt_ids.append(result['prompt_id'])
        print(f"Queued {i+1}/{len(prompts)}")

    # पूर्णता के लिए प्रतीक्षा करें
    print("Waiting for generation...")
    completed = set()
    while len(completed) < len(prompt_ids):
        for pid in prompt_ids:
            if pid not in completed:
                history = get_history(pid)
                if pid in history:
                    completed.add(pid)
                    print(f"Completed {len(completed)}/{len(prompt_ids)}")
        time.sleep(1)

    print("All done!")
```

### FLUX बैच प्रोसेसिंग

```python
import torch
from diffusers import FluxPipeline
from pathlib import Path
from tqdm import tqdm

# मॉडल एक बार लोड करें
pipe = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-schnell",
    torch_dtype=torch.bfloat16
)
pipe.to("cuda")

def batch_generate_flux(prompts, output_dir, batch_size=4):
    Path(output_dir).mkdir(exist_ok=True)

    for i in tqdm(range(0, len(prompts), batch_size)):
        batch_prompts = prompts[i:i + batch_size]

        # बैच जनरेट करें
        images = pipe(
            batch_prompts,
            height=1024,
            width=1024,
            num_inference_steps=4,
            guidance_scale=0.0
        ).images

        # सहेजें
        for j, img in enumerate(images):
            img.save(f"{output_dir}/image_{i+j:04d}.png")

# 4 के बैच में 100 इमेज जनरेट करें
prompts = [f"A {animal} in a forest" for animal in ["cat", "dog", "fox"] * 34]
batch_generate_flux(prompts, "./flux_outputs", batch_size=4)
```

***

## ऑडियो बैच प्रोसेसिंग

### Whisper बैच ट्रांसक्रिप्शन

```python
import whisper
from pathlib import Path
from tqdm import tqdm
import json

model = whisper.load_model("large-v3")

def batch_transcribe(audio_files, output_dir):
    Path(output_dir).mkdir(exist_ok=True)
    results = {}

    for audio_path in tqdm(audio_files):
        try:
            result = model.transcribe(str(audio_path))

            results[audio_path.name] = {
                'text': result['text'],
                'language': result['language'],
                'segments': result['segments']
            }

            # व्यक्तिगत ट्रांसक्रिप्ट सहेजें
            output_file = Path(output_dir) / f"{audio_path.stem}.json"
            with open(output_file, 'w') as f:
                json.dump(results[audio_path.name], f, indent=2)

        except Exception as e:
            print(f"Error processing {audio_path}: {e}")
            results[audio_path.name] = {'error': str(e)}

    # संयुक्त परिणाम सहेजें
    with open(f"{output_dir}/all_transcripts.json", 'w') as f:
        json.dump(results, f, indent=2)

    return results

# निर्देशिका में सभी ऑडियो फ़ाइलों का ट्रांसक्राइब करें
audio_files = list(Path("./audio").glob("*.mp3"))
results = batch_transcribe(audio_files, "./transcripts")
```

### पैरलल Whisper (कई GPUs)

```python
import whisper
from concurrent.futures import ProcessPoolExecutor
import torch

def transcribe_on_gpu(args):
    audio_path, gpu_id = args
    torch.cuda.set_device(gpu_id)
    model = whisper.load_model("large-v3", device=f"cuda:{gpu_id}")
    result = model.transcribe(audio_path)
    return audio_path, result['text']

def parallel_transcribe(audio_files, num_gpus=2):
    # GPUs में फ़ाइलें वितरित करें
    tasks = [(f, i % num_gpus) for i, f in enumerate(audio_files)]

    with ProcessPoolExecutor(max_workers=num_gpus) as executor:
        results = list(executor.map(transcribe_on_gpu, tasks))

    return dict(results)
```

***

## वीडियो बैच प्रोसेसिंग

### बैच वीडियो जनरेशन (SVD)

```python
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
from pathlib import Path
from tqdm import tqdm
import torch

pipe = StableVideoDiffusionPipeline.from_pretrained(
    "stabilityai/stable-video-diffusion-img2vid-xt",
    torch_dtype=torch.float16,
    variant="fp16"
)
pipe.to("cuda")

def batch_generate_videos(image_paths, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    for img_path in tqdm(image_paths):
        try:
            image = load_image(str(img_path))
            image = image.resize((1024, 576))

            frames = pipe(
                image,
                num_frames=25,
                decode_chunk_size=8
            ).frames[0]

            output_path = Path(output_dir) / f"{img_path.stem}.mp4"
            export_to_video(frames, str(output_path), fps=7)

        except Exception as e:
            print(f"Error with {img_path}: {e}")

# सभी इमेजेस प्रोसेस करें
images = list(Path("./input_images").glob("*.png"))
batch_generate_videos(images, "./output_videos")
```

***

## डेटा पाइपलाइन पैटर्न

### प्रोड्यूसर-कंज्यूमर पैटर्न

```python
import asyncio
from asyncio import Queue

async def producer(queue, items):
    """आइटम्स को कतार में जोड़ें"""
    for item in items:
        await queue.put(item)
    # पूर्णता का संकेत दें
    for _ in range(NUM_WORKERS):
        await queue.put(None)

async def consumer(queue, results, worker_id):
    """कतार से आइटम्स को प्रोसेस करें"""
    while True:
        item = await queue.get()
        if item is None:
            break

        try:
            result = await process_item(item)
            results.append(result)
        except Exception as e:
            print(f"Worker {worker_id} error: {e}")

        queue.task_done()

async def run_pipeline(items, num_workers=5):
    queue = Queue(maxsize=100)
    results = []

    # वर्कर्स शुरू करें
    workers = [
        asyncio.create_task(consumer(queue, results, i))
        for i in range(num_workers)
    ]

    # प्रोड्यूसर शुरू करें
    await producer(queue, items)

    # पूर्णता के लिए प्रतीक्षा करें
    await asyncio.gather(*workers)

    return results

NUM_WORKERS = 5
items = list(range(1000))
results = asyncio.run(run_pipeline(items))
```

### मैप-रिड्यूस पैटर्न

```python
from concurrent.futures import ProcessPoolExecutor
from functools import reduce

def map_function(item):
    """एक आइटम प्रोसेस करें"""
    # आपकी प्रोसेसिंग लॉजिक
    return process(item)

def reduce_function(results):
    """परिणामों को संयोजित करें"""
    return combine(results)

def map_reduce(items, num_workers=4):
    # मैप चरण
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(map_function, items))

    # रिड्यूस चरण
    result = reduce_function(mapped)

    return result
```

***

## ऑप्टिमाइज़ेशन टिप्स

### 1. समुचित समवर्तीता (Right-Size Concurrency)

```python
# LLM: vLLM के अधिकतम बैच आकार से मेल खाएँ
max_concurrent = 10  # vLLM डिफ़ॉल्ट

# इमेज जनरेशन: VRAM के अनुसार 1-4
max_concurrent = 2  # SD WebUI
max_concurrent = 4  # FLUX on RTX 4090

# ट्रांसक्रिप्शन: प्रति GPU 1
max_concurrent = num_gpus
```

### 2. बैच आकार ट्यूनिंग

```python
# बहुत छोटा: GPU का उपयोग कम करता है
# बहुत बड़ा: OOM त्रुटियाँ

# इमेज जनरेशन बैच आकार:
# RTX 3060: batch_size = 1
# RTX 3090: batch_size = 2-4
# RTX 4090: batch_size = 4-8
# A100: batch_size = 8-16
```

### 3. मेमोरी प्रबंधन

```python
import gc
import torch

def clear_memory():
    gc.collect()
    torch.cuda.empty_cache()

# बड़े बैच के बीच कॉल करें
for batch in batches:
    process_batch(batch)
    clear_memory()
```

### 4. मध्यवर्ती परिणाम सहेजें

```python
# लंबे चलने वाले जॉब्स के लिए हमेशा चेकपॉइंट करें
CHECKPOINT_INTERVAL = 100

for i, item in enumerate(items):
    results.append(process(item))

    if i % CHECKPOINT_INTERVAL == 0:
        save_checkpoint(results, i)
```

***

## लागत अनुकूलन

### चलाने से पहले अनुमान लगाएँ

```python
def estimate_cost(num_items, time_per_item_sec, hourly_rate):
    total_hours = (num_items * time_per_item_sec) / 3600
    total_cost = total_hours * hourly_rate
    return total_hours, total_cost

# उदाहरण: RTX 4090 पर 3 सेकंड प्रति इमेज के हिसाब से 10,000 इमेजेस
hours, cost = estimate_cost(10000, 3, 0.10)
print(f"Estimated: {hours:.1f} hours, ${cost:.2f}")
# आउटपुट: Estimated: 8.3 hours, $0.83
```

### Spot इंस्टेंस का उपयोग करें

* 30-50% सस्ता
* बैच जॉब्स के लिए अच्छा (इंटरप्ट होने योग्य)
* अक्सर चेकपॉइंट्स को बार-बार सहेजें

### ऑफ़-पीक प्रोसेसिंग

* कम मांग वाले घंटों के दौरान जॉब्स कतारबद्ध करें
* अक्सर बेहतर GPU उपलब्धता
* संभावित रूप से कम स्पॉट कीमतें

***

## अगले कदम

* [API एकीकरण](/guides/guides_v2-hi/advanced/api-integration.md) - अपने APIs बनाएं
* [मल्टी-GPU सेटअप](/guides/guides_v2-hi/advanced/multi-gpu-setup.md) - ऊपर स्केल करें
* [लागत कैलकुलेटर](/guides/guides_v2-hi/getting-started/cost-calculator.md) - लागत का अनुमान लगाएँ


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://docs.clore.ai/guides/guides_v2-hi/advanced/batch-processing.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
