Реалізація пакетної обробки файлів (Batch Processing) на сервері

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.
Розробка та обслуговування будь-яких видів сайтів:
Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Реалізація пакетної обробки файлів (Batch Processing) на сервері
Складна
~3-5 робочих днів
Часті питання
Наші компетенції:
Етапи розробки
Останні роботи
  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Реалізація пакетної обробки файлів (Batch Processing) на сервері

Пакетна обробка—це коли завдань багато, вони однотипні й потрібно виконати їх ефективно, не впавши під навантаженням. Імпорт 50 000 рядків з CSV, конвертація архіву з 3 000 зображень, нічна регенерація sitemap—все це batch-завдання з різними вимогами до часу виконання, пам'яті й обробки помилок.

Ключові проблеми batch-обробки

Пам'ять. Завантажувати весь CSV у масив—шлях до OOM. Правильний паттерн—потокове читання чанками.

Часткові помилки. Якщо з 10 000 рядків 50 невалідні—зупиняти весь процес невірно. Потрібна логіка: пропускаємо погані рядки, пишемо в лог, продовжуємо.

Воспроізводимість. Якщо процес упав на 7 000-й рядку—потрібна можливість продовжити з місця зупинки, а не починати заново.

Паралелізм. Послідовна обробка 50 000 записів по 100 мс кожна = майже 1,5 години. Розбивка на паралельні Job'и скорочує це кратно.

Архітектура: chunked + parallel jobs

Паттерн "Batch → Chunks → Jobs":

Завантаження файлу
     ↓
BatchImportJob (майстер-завдання)
     ↓ розбиває на чанки
[ChunkJob 1] [ChunkJob 2] ... [ChunkJob N]  ← паралельно
     ↓
BatchCompletedJob (агрегація результатів)

Laravel надає Bus::batch() для цього паттерну.

Реалізація на прикладі імпорту CSV

namespace App\Services;

use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\LazyCollection;

class CsvImportService
{
    private const CHUNK_SIZE = 500;

    public function startImport(string $filePath, int $importId): string
    {
        $jobs = [];

        // LazyCollection—потокове читання без завантаження в пам'ять
        LazyCollection::make(function () use ($filePath) {
            $handle = fopen($filePath, 'r');
            $header = fgetcsv($handle); // перша строка—заголовки

            while (($row = fgetcsv($handle)) !== false) {
                yield array_combine($header, $row);
            }

            fclose($handle);
        })
        ->chunk(self::CHUNK_SIZE)
        ->each(function ($chunk, $index) use (&$jobs, $importId) {
            $jobs[] = new ProcessCsvChunkJob(
                importId: $importId,
                chunkIndex: $index,
                rows: $chunk->values()->toArray()
            );
        });

        $batch = Bus::batch($jobs)
            ->name("csv-import-{$importId}")
            ->allowFailures() // продовжуємо при помилці окремих Job'ів
            ->then(function (Batch $batch) use ($importId) {
                Import::find($importId)?->update(['status' => 'completed']);
                ImportCompletedEvent::dispatch($importId);
            })
            ->catch(function (Batch $batch, \Throwable $e) use ($importId) {
                Import::find($importId)?->update([
                    'status'        => 'partially_failed',
                    'error_message' => $e->getMessage(),
                ]);
            })
            ->finally(function (Batch $batch) use ($importId) {
                $import = Import::find($importId);
                $import?->update([
                    'total_jobs'    => $batch->totalJobs,
                    'failed_jobs'   => $batch->failedJobs,
                    'finished_at'   => now(),
                ]);
            })
            ->onQueue('batch-processing')
            ->dispatch();

        Import::find($importId)?->update(['batch_id' => $batch->id]);

        return $batch->id;
    }
}

Job обробки чанка

// app/Jobs/ProcessCsvChunkJob.php
class ProcessCsvChunkJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries   = 3;
    public int $timeout = 120;
    public int $backoff = 10;

    public function __construct(
        private int   $importId,
        private int   $chunkIndex,
        private array $rows
    ) {}

    public function handle(): void
    {
        // Якщо весь batch був відмінений—зупиняємо
        if ($this->batch()?->cancelled()) {
            return;
        }

        $successCount = 0;
        $errors       = [];

        foreach ($this->rows as $lineNum => $row) {
            try {
                $this->processRow($row);
                $successCount++;
            } catch (\Throwable $e) {
                $errors[] = [
                    'chunk'  => $this->chunkIndex,
                    'line'   => $lineNum,
                    'data'   => array_slice($row, 0, 3), // перші 3 поля для діагностики
                    'error'  => $e->getMessage(),
                ];
            }
        }

        // Зберігаємо статистику чанка
        ImportChunkResult::create([
            'import_id'     => $this->importId,
            'chunk_index'   => $this->chunkIndex,
            'processed'     => count($this->rows),
            'succeeded'     => $successCount,
            'failed'        => count($errors),
            'errors'        => $errors,
        ]);

        // Атомарно оновлюємо лічильники імпорту
        Import::where('id', $this->importId)->increment('processed_rows', count($this->rows));
        Import::where('id', $this->importId)->increment('success_rows', $successCount);
    }

    private function processRow(array $row): void
    {
        // Валідація і збереження строки тут
        // Приклад:
        $validated = validator($row, [
            'email' => 'required|email',
            'name'  => 'required|string|max:255',
        ])->validate();

        User::updateOrCreate(
            ['email' => $validated['email']],
            ['name'  => $validated['name']]
        );
    }
}

Возобновлення переданого batch

Якщо сервер впав у середині обробки—Laravel Batch зберігає стан у таблиці job_batches. Завершені чанки не повторно запускаються. Незавершені—возобновляються автоматично при перезапуску воркера.

Принудовий рестарт незавершеного batch:

$batch = Bus::findBatch($batchId);
if ($batch && !$batch->finished()) {
    // Пересоздаємо незавершені jobs
    $pendingChunks = ImportChunkResult::where('import_id', $importId)
        ->pluck('chunk_index');

    // Логіка визначення необроблених чанків і їхнього повторного диспатчу
}

Прогрес у реальному часі

// app/Http/Controllers/ImportController.php
public function progress(int $importId): JsonResponse
{
    $import = Import::findOrFail($importId);
    $batch  = $import->batch_id ? Bus::findBatch($import->batch_id) : null;

    return response()->json([
        'status'          => $import->status,
        'processed_rows'  => $import->processed_rows,
        'success_rows'    => $import->success_rows,
        'total_rows'      => $import->total_rows,
        'percentage'      => $import->total_rows > 0
            ? round($import->processed_rows / $import->total_rows * 100, 1)
            : 0,
        'batch' => $batch ? [
            'total_jobs'      => $batch->totalJobs,
            'pending_jobs'    => $batch->pendingJobs,
            'failed_jobs'     => $batch->failedJobs,
            'progress'        => $batch->progress(),
        ] : null,
    ]);
}

Обмеження навантаження

Для batch-очереди потрібен окремий пул воркерів з обмеженим паралелізмом, щоб не забити всю БД або CPU:

[program:batch-worker]
command=php artisan queue:work --queue=batch-processing --max-jobs=50 --sleep=3 --timeout=120
numprocs=4
autostart=true
autorestart=true

numprocs=4—чотири воркери, кожен обробляє чанки послідовно. --max-jobs=50—після 50 завдань воркер перезапускається, звільняючи пам'ять.

Обробка файлів у кількох форматах

Той же паттерн працює для зображень, JSON, XLSX. Для XLSX використовуємо PhpSpreadsheet у потоковому режимі:

use PhpOffice\PhpSpreadsheet\Reader\Xlsx;

$reader = new Xlsx();
$reader->setReadDataOnly(true);
$spreadsheet = $reader->load($filePath);

$worksheet = $spreadsheet->getActiveSheet();
$highestRow = $worksheet->getHighestDataRow();

// Читаємо чанками по 500 рядків
for ($startRow = 2; $startRow <= $highestRow; $startRow += 500) {
    $endRow = min($startRow + 499, $highestRow);
    $rows   = $worksheet->rangeToArray("A{$startRow}:Z{$endRow}");
    // диспатч Job для чанка
}

Таймлайн

Базовий імпорт CSV з чанками й прогресом—1 робочий день. Додавання возобновлення, детального лога помилок, endpoint прогресу—ще 6–8 годин. Підтримка XLSX і JSON форматів—плюс 4–6 годин.