# Пакетная обработка

Обрабатывайте большие объемы работы эффективно на GPU CLORE.AI.

{% hint style="success" %}
Найдите подходящий GPU на [CLORE.AI Marketplace](https://clore.ai/marketplace).
{% endhint %}

## Использование clore-ai SDK для пакетной инфраструктуры (рекомендуется)

Официальный SDK упрощает предоставление пакетных GPU с поддержкой async:

```python
import asyncio
from clore_ai import AsyncCloreAI

async def batch_deploy(server_ids):
    """Развернуть на нескольких серверах одновременно."""
    async with AsyncCloreAI() as client:
        tasks = [
            client.create_order(
                server_id=sid,
                image="cloreai/ubuntu22.04-cuda12",
                type="on-demand",
                currency="bitcoin",
                ssh_password="BatchPass123",
                ports={"22": "tcp"}
            )
            for sid in server_ids
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for sid, result in zip(server_ids, results):
            if isinstance(result, Exception):
                print(f"❌ Server {sid}: {result}")
            else:
                print(f"✅ Server {sid}: Order {result.id}")
        return results

# Развернуть сразу на 5 серверах
asyncio.run(batch_deploy([142, 305, 891, 450, 612]))
```

→ См. [Руководство по Python SDK](https://docs.clore.ai/guides/guides_v2-ru/prodvinutoe/python-sdk) и [Автоматизация CLI](https://docs.clore.ai/guides/guides_v2-ru/prodvinutoe/cli-automation) для получения дополнительной информации.

***

## Когда использовать пакетную обработку

* Обработка сотен/тысяч элементов
* Преобразование больших наборов данных
* Генерация множества изображений/видео
* Массовая транскрипция
* Подготовка данных для обучения

***

## Пакетная обработка LLM

### vLLM Batch API

vLLM автоматически обрабатывает батчинг с непрерывной пакетной обработкой:

```python
from openai import OpenAI
import asyncio
import aiohttp

client = OpenAI(base_url="http://server:8000/v1", api_key="dummy")

# Синхронный батч
def process_batch_sync(prompts):
    results = []
    for prompt in prompts:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        results.append(response.choices[0].message.content)
    return results

# Обработать 100 подсказок
prompts = [f"Summarize topic {i}" for i in range(100)]
results = process_batch_sync(prompts)
```

### Асинхронная пакетная обработка (быстрее)

```python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_single(prompt):
    response = await client.chat.completions.create(
        model="meta-llama/Llama-3.1-8B-Instruct",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch_async(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_process(prompt):
        async with semaphore:
            return await process_single(prompt)

    tasks = [limited_process(p) for p in prompts]
    return await asyncio.gather(*tasks)

# Обработать 1000 подсказок с 10 одновременными запросами
prompts = [f"Generate description for product {i}" for i in range(1000)]
results = asyncio.run(process_batch_async(prompts, max_concurrent=10))
```

### Батч с отслеживанием прогресса

```python
import asyncio
from tqdm.asyncio import tqdm
from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="http://server:8000/v1", api_key="dummy")

async def process_with_progress(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def process_one(prompt, idx):
        async with semaphore:
            response = await client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompt}]
            )
            return idx, response.choices[0].message.content

    tasks = [process_one(p, i) for i, p in enumerate(prompts)]

    for coro in tqdm.as_completed(tasks, total=len(tasks)):
        idx, result = await coro
        results.append((idx, result))

    # Отсортировать по исходному порядку
    results.sort(key=lambda x: x[0])
    return [r[1] for r in results]

# Запуск
prompts = ["..." for _ in range(500)]
results = asyncio.run(process_with_progress(prompts))
```

### Сохранять прогресс для длинных пакетов

```python
import json
from pathlib import Path

def process_batch_with_checkpoint(prompts, checkpoint_file="checkpoint.json"):
    # Загрузить контрольную точку
    checkpoint = Path(checkpoint_file)
    if checkpoint.exists():
        with open(checkpoint) as f:
            data = json.load(f)
            results = data['results']
            start_idx = data['last_completed'] + 1
        print(f"Возобновление с индекса {start_idx}")
    else:
        results = [None] * len(prompts)
        start_idx = 0

    # Обработать оставшиеся
    for i in range(start_idx, len(prompts)):
        try:
            response = client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompts[i]}]
            )
            results[i] = response.choices[0].message.content

            # Сохранять контрольную точку каждые 10 элементов
            if i % 10 == 0:
                with open(checkpoint_file, 'w') as f:
                    json.dump({'results': results, 'last_completed': i}, f)
                print(f"Контрольная точка сохранена на {i}")

        except Exception as e:
            print(f"Ошибка на {i}: {e}")
            # Сохранить контрольную точку при ошибке
            with open(checkpoint_file, 'w') as f:
                json.dump({'results': results, 'last_completed': i - 1}, f)
            raise

    # Удалить контрольную точку по завершении
    if checkpoint.exists():
        checkpoint.unlink()

    return results
```

***

## Пакетная генерация изображений

### SD WebUI Batch

```python
import requests
import base64
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

SD_API = "http://server:7860"

def generate_image(prompt, output_path):
    response = requests.post(f'{SD_API}/sdapi/v1/txt2img', json={
        'prompt': prompt,
        'negative_prompt': 'blurry, low quality',
        'steps': 20,
        'width': 512,
        'height': 512
    })

    image_data = base64.b64decode(response.json()['images'][0])

    with open(output_path, 'wb') as f:
        f.write(image_data)

    return output_path

def batch_generate(prompts, output_dir, max_workers=4):
    Path(output_dir).mkdir(exist_ok=True)

    tasks = [
        (prompt, f"{output_dir}/image_{i:04d}.png")
        for i, prompt in enumerate(prompts)
    ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(tqdm(
            executor.map(lambda x: generate_image(*x), tasks),
            total=len(tasks)
        ))

    return results

# Сгенерировать 100 изображений
prompts = [f"A beautiful landscape, style {i}" for i in range(100)]
batch_generate(prompts, "./outputs", max_workers=4)
```

### ComfyUI Batch с очередью

```python
import json
import urllib.request
import time
from pathlib import Path

SERVER = "server:8188"

def queue_prompt(workflow):
    data = json.dumps({"prompt": workflow}).encode('utf-8')
    req = urllib.request.Request(f"http://{SERVER}/prompt", data=data)
    return json.loads(urllib.request.urlopen(req).read())

def get_history(prompt_id):
    with urllib.request.urlopen(f"http://{SERVER}/history/{prompt_id}") as response:
        return json.loads(response.read())

def batch_generate_comfyui(prompts, base_workflow_path, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    # Загрузить базовый workflow
    with open(base_workflow_path) as f:
        base_workflow = json.load(f)

    prompt_ids = []

    # Поместить все подсказки в очередь
    for i, prompt in enumerate(prompts):
        workflow = base_workflow.copy()
        # Изменить узел подсказки (при необходимости отрегулируйте ID узла)
        workflow["6"]["inputs"]["text"] = prompt
        # Установить имя выходного файла
        workflow["9"]["inputs"]["filename_prefix"] = f"batch_{i:04d}"

        result = queue_prompt(workflow)
        prompt_ids.append(result['prompt_id'])
        print(f"В очереди {i+1}/{len(prompts)}")

    # Ожидание завершения
    print("Ожидание генерации...")
    completed = set()
    while len(completed) < len(prompt_ids):
        for pid in prompt_ids:
            if pid not in completed:
                history = get_history(pid)
                if pid in history:
                    completed.add(pid)
                    print(f"Завершено {len(completed)}/{len(prompt_ids)}")
        time.sleep(1)

    print("Все готово!")
```

### FLUX пакетная обработка

```python
import torch
from diffusers import FluxPipeline
from pathlib import Path
from tqdm import tqdm

# Загрузить модель один раз
pipe = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-schnell",
    torch_dtype=torch.bfloat16
)
pipe.to("cuda")

def batch_generate_flux(prompts, output_dir, batch_size=4):
    Path(output_dir).mkdir(exist_ok=True)

    for i in tqdm(range(0, len(prompts), batch_size)):
        batch_prompts = prompts[i:i + batch_size]

        # Сгенерировать пакет
        images = pipe(
            batch_prompts,
            height=1024,
            width=1024,
            num_inference_steps=4,
            guidance_scale=0.0
        ).images

        # Сохранить
        for j, img in enumerate(images):
            img.save(f"{output_dir}/image_{i+j:04d}.png")

# Сгенерировать 100 изображений пакетами по 4
prompts = [f"A {animal} in a forest" for animal in ["cat", "dog", "fox"] * 34]
batch_generate_flux(prompts, "./flux_outputs", batch_size=4)
```

***

## Аудио пакетная обработка

### Whisper пакетная транскрипция

```python
import whisper
from pathlib import Path
from tqdm import tqdm
import json

model = whisper.load_model("large-v3")

def batch_transcribe(audio_files, output_dir):
    Path(output_dir).mkdir(exist_ok=True)
    results = {}

    for audio_path in tqdm(audio_files):
        try:
            result = model.transcribe(str(audio_path))

            results[audio_path.name] = {
                'text': result['text'],
                'language': result['language'],
                'segments': result['segments']
            }

            # Сохранить отдельную расшифровку
            output_file = Path(output_dir) / f"{audio_path.stem}.json"
            with open(output_file, 'w') as f:
                json.dump(results[audio_path.name], f, indent=2)

        except Exception as e:
            print(f"Ошибка при обработке {audio_path}: {e}")
            results[audio_path.name] = {'error': str(e)}

    # Сохранить объединенные результаты
    with open(f"{output_dir}/all_transcripts.json", 'w') as f:
        json.dump(results, f, indent=2)

    return results

# Расшифровать все аудиофайлы в каталоге
audio_files = list(Path("./audio").glob("*.mp3"))
results = batch_transcribe(audio_files, "./transcripts")
```

### Параллельный Whisper (несколько GPU)

```python
import whisper
from concurrent.futures import ProcessPoolExecutor
import torch

def transcribe_on_gpu(args):
    audio_path, gpu_id = args
    torch.cuda.set_device(gpu_id)
    model = whisper.load_model("large-v3", device=f"cuda:{gpu_id}")
    result = model.transcribe(audio_path)
    return audio_path, result['text']

def parallel_transcribe(audio_files, num_gpus=2):
    # Распределить файлы по GPU
    tasks = [(f, i % num_gpus) for i, f in enumerate(audio_files)]

    with ProcessPoolExecutor(max_workers=num_gpus) as executor:
        results = list(executor.map(transcribe_on_gpu, tasks))

    return dict(results)
```

***

## Видео пакетная обработка

### Пакетная генерация видео (SVD)

```python
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
from pathlib import Path
from tqdm import tqdm
import torch

pipe = StableVideoDiffusionPipeline.from_pretrained(
    "stabilityai/stable-video-diffusion-img2vid-xt",
    torch_dtype=torch.float16,
    variant="fp16"
)
pipe.to("cuda")

def batch_generate_videos(image_paths, output_dir):
    Path(output_dir).mkdir(exist_ok=True)

    for img_path in tqdm(image_paths):
        try:
            image = load_image(str(img_path))
            image = image.resize((1024, 576))

            frames = pipe(
                image,
                num_frames=25,
                decode_chunk_size=8
            ).frames[0]

            output_path = Path(output_dir) / f"{img_path.stem}.mp4"
            export_to_video(frames, str(output_path), fps=7)

        except Exception as e:
            print(f"Ошибка с {img_path}: {e}")

# Обработать все изображения
images = list(Path("./input_images").glob("*.png"))
batch_generate_videos(images, "./output_videos")
```

***

## Шаблоны конвейера данных

### Шаблон производитель-потребитель

```python
import asyncio
from asyncio import Queue

async def producer(queue, items):
    """Добавить элементы в очередь"""
    for item in items:
        await queue.put(item)
    # Сигнал о завершении
    for _ in range(NUM_WORKERS):
        await queue.put(None)

async def consumer(queue, results, worker_id):
    """Обрабатывать элементы из очереди"""
    while True:
        item = await queue.get()
        if item is None:
            break

        try:
            result = await process_item(item)
            results.append(result)
        except Exception as e:
            print(f"Worker {worker_id} error: {e}")

        queue.task_done()

async def run_pipeline(items, num_workers=5):
    queue = Queue(maxsize=100)
    results = []

    # Запустить воркеры
    workers = [
        asyncio.create_task(consumer(queue, results, i))
        for i in range(num_workers)
    ]

    # Запустить производителя
    await producer(queue, items)

    # Ожидание завершения
    await asyncio.gather(*workers)

    return results

NUM_WORKERS = 5
items = list(range(1000))
results = asyncio.run(run_pipeline(items))
```

### Шаблон Map-Reduce

```python
from concurrent.futures import ProcessPoolExecutor
from functools import reduce

def map_function(item):
    """Обработать один элемент"""
    # Ваша логика обработки
    return process(item)

def reduce_function(results):
    """Объединить результаты"""
    return combine(results)

def map_reduce(items, num_workers=4):
    # Фаза Map
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(map_function, items))

    # Фаза Reduce
    result = reduce_function(mapped)

    return result
```

***

## Советы по оптимизации

### 1. Подберите правильную степень параллелизма

```python
# LLM: Соответствовать максимальному размеру батча vLLM
max_concurrent = 10  # значение по умолчанию для vLLM

# Генерация изображений: 1-4 в зависимости от VRAM
max_concurrent = 2  # SD WebUI
max_concurrent = 4  # FLUX на RTX 4090

# Транскрипция: 1 на GPU
max_concurrent = num_gpus
```

### 2. Настройка размера батча

```python
# Слишком маленький: недостаточная загрузка GPU
# Слишком большой: ошибки OOM

# Размеры батчей для генерации изображений:
# RTX 3060: batch_size = 1
# RTX 3090: batch_size = 2-4
# RTX 4090: batch_size = 4-8
# A100: batch_size = 8-16
```

### 3. Управление памятью

```python
import gc
import torch

def clear_memory():
    gc.collect()
    torch.cuda.empty_cache()

# Вызывать между большими батчами
for batch in batches:
    process_batch(batch)
    clear_memory()
```

### 4. Сохранение промежуточных результатов

```python
# Всегда делайте контрольные точки для долгих задач
CHECKPOINT_INTERVAL = 100

for i, item in enumerate(items):
    results.append(process(item))

    if i % CHECKPOINT_INTERVAL == 0:
        save_checkpoint(results, i)
```

***

## Оптимизация затрат

### Оцените перед запуском

```python
def estimate_cost(num_items, time_per_item_sec, hourly_rate):
    total_hours = (num_items * time_per_item_sec) / 3600
    total_cost = total_hours * hourly_rate
    return total_hours, total_cost

# Пример: 10 000 изображений по 3 с каждое на RTX 4090
hours, cost = estimate_cost(10000, 3, 0.10)
print(f"Оценка: {hours:.1f} часов, ${cost:.2f}")
# Вывод: Оценка: 8.3 часов, $0.83
```

### Используйте Spot-инстансы

* Дешевле на 30–50%
* Подходят для пакетных задач (прерываемых)
* Часто сохраняйте контрольные точки

### Обработка в периоды низкой загрузки

* Ставьте задачи в очередь в часы низкого спроса
* Часто лучшая доступность GPU
* Возможно, более низкие цены на spot-инстансы

***

## Дальнейшие шаги

* [Интеграция API](https://docs.clore.ai/guides/guides_v2-ru/prodvinutoe/api-integration) - Постройте ваши API
* [Настройка Multi-GPU](https://docs.clore.ai/guides/guides_v2-ru/prodvinutoe/multi-gpu-setup) - Масштабируйтесь
* [Калькулятор затрат](https://docs.clore.ai/guides/guides_v2-ru/nachalo-raboty/cost-calculator) - Оцените затраты
