Реалізація пакетної обробки файлів (Batch Processing) на сервері
Пакетна обробка—це коли завдань багато, вони однотипні й потрібно виконати їх ефективно, не впавши під навантаженням. Імпорт 50 000 рядків з CSV, конвертація архіву з 3 000 зображень, нічна регенерація sitemap—все це batch-завдання з різними вимогами до часу виконання, пам'яті й обробки помилок.
Ключові проблеми batch-обробки
Пам'ять. Завантажувати весь CSV у масив—шлях до OOM. Правильний паттерн—потокове читання чанками.
Часткові помилки. Якщо з 10 000 рядків 50 невалідні—зупиняти весь процес невірно. Потрібна логіка: пропускаємо погані рядки, пишемо в лог, продовжуємо.
Воспроізводимість. Якщо процес упав на 7 000-й рядку—потрібна можливість продовжити з місця зупинки, а не починати заново.
Паралелізм. Послідовна обробка 50 000 записів по 100 мс кожна = майже 1,5 години. Розбивка на паралельні Job'и скорочує це кратно.
Архітектура: chunked + parallel jobs
Паттерн "Batch → Chunks → Jobs":
Завантаження файлу
↓
BatchImportJob (майстер-завдання)
↓ розбиває на чанки
[ChunkJob 1] [ChunkJob 2] ... [ChunkJob N] ← паралельно
↓
BatchCompletedJob (агрегація результатів)
Laravel надає Bus::batch() для цього паттерну.
Реалізація на прикладі імпорту CSV
namespace App\Services;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\LazyCollection;
class CsvImportService
{
private const CHUNK_SIZE = 500;
public function startImport(string $filePath, int $importId): string
{
$jobs = [];
// LazyCollection—потокове читання без завантаження в пам'ять
LazyCollection::make(function () use ($filePath) {
$handle = fopen($filePath, 'r');
$header = fgetcsv($handle); // перша строка—заголовки
while (($row = fgetcsv($handle)) !== false) {
yield array_combine($header, $row);
}
fclose($handle);
})
->chunk(self::CHUNK_SIZE)
->each(function ($chunk, $index) use (&$jobs, $importId) {
$jobs[] = new ProcessCsvChunkJob(
importId: $importId,
chunkIndex: $index,
rows: $chunk->values()->toArray()
);
});
$batch = Bus::batch($jobs)
->name("csv-import-{$importId}")
->allowFailures() // продовжуємо при помилці окремих Job'ів
->then(function (Batch $batch) use ($importId) {
Import::find($importId)?->update(['status' => 'completed']);
ImportCompletedEvent::dispatch($importId);
})
->catch(function (Batch $batch, \Throwable $e) use ($importId) {
Import::find($importId)?->update([
'status' => 'partially_failed',
'error_message' => $e->getMessage(),
]);
})
->finally(function (Batch $batch) use ($importId) {
$import = Import::find($importId);
$import?->update([
'total_jobs' => $batch->totalJobs,
'failed_jobs' => $batch->failedJobs,
'finished_at' => now(),
]);
})
->onQueue('batch-processing')
->dispatch();
Import::find($importId)?->update(['batch_id' => $batch->id]);
return $batch->id;
}
}
Job обробки чанка
// app/Jobs/ProcessCsvChunkJob.php
class ProcessCsvChunkJob implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $tries = 3;
public int $timeout = 120;
public int $backoff = 10;
public function __construct(
private int $importId,
private int $chunkIndex,
private array $rows
) {}
public function handle(): void
{
// Якщо весь batch був відмінений—зупиняємо
if ($this->batch()?->cancelled()) {
return;
}
$successCount = 0;
$errors = [];
foreach ($this->rows as $lineNum => $row) {
try {
$this->processRow($row);
$successCount++;
} catch (\Throwable $e) {
$errors[] = [
'chunk' => $this->chunkIndex,
'line' => $lineNum,
'data' => array_slice($row, 0, 3), // перші 3 поля для діагностики
'error' => $e->getMessage(),
];
}
}
// Зберігаємо статистику чанка
ImportChunkResult::create([
'import_id' => $this->importId,
'chunk_index' => $this->chunkIndex,
'processed' => count($this->rows),
'succeeded' => $successCount,
'failed' => count($errors),
'errors' => $errors,
]);
// Атомарно оновлюємо лічильники імпорту
Import::where('id', $this->importId)->increment('processed_rows', count($this->rows));
Import::where('id', $this->importId)->increment('success_rows', $successCount);
}
private function processRow(array $row): void
{
// Валідація і збереження строки тут
// Приклад:
$validated = validator($row, [
'email' => 'required|email',
'name' => 'required|string|max:255',
])->validate();
User::updateOrCreate(
['email' => $validated['email']],
['name' => $validated['name']]
);
}
}
Возобновлення переданого batch
Якщо сервер впав у середині обробки—Laravel Batch зберігає стан у таблиці job_batches. Завершені чанки не повторно запускаються. Незавершені—возобновляються автоматично при перезапуску воркера.
Принудовий рестарт незавершеного batch:
$batch = Bus::findBatch($batchId);
if ($batch && !$batch->finished()) {
// Пересоздаємо незавершені jobs
$pendingChunks = ImportChunkResult::where('import_id', $importId)
->pluck('chunk_index');
// Логіка визначення необроблених чанків і їхнього повторного диспатчу
}
Прогрес у реальному часі
// app/Http/Controllers/ImportController.php
public function progress(int $importId): JsonResponse
{
$import = Import::findOrFail($importId);
$batch = $import->batch_id ? Bus::findBatch($import->batch_id) : null;
return response()->json([
'status' => $import->status,
'processed_rows' => $import->processed_rows,
'success_rows' => $import->success_rows,
'total_rows' => $import->total_rows,
'percentage' => $import->total_rows > 0
? round($import->processed_rows / $import->total_rows * 100, 1)
: 0,
'batch' => $batch ? [
'total_jobs' => $batch->totalJobs,
'pending_jobs' => $batch->pendingJobs,
'failed_jobs' => $batch->failedJobs,
'progress' => $batch->progress(),
] : null,
]);
}
Обмеження навантаження
Для batch-очереди потрібен окремий пул воркерів з обмеженим паралелізмом, щоб не забити всю БД або CPU:
[program:batch-worker]
command=php artisan queue:work --queue=batch-processing --max-jobs=50 --sleep=3 --timeout=120
numprocs=4
autostart=true
autorestart=true
numprocs=4—чотири воркери, кожен обробляє чанки послідовно. --max-jobs=50—після 50 завдань воркер перезапускається, звільняючи пам'ять.
Обробка файлів у кількох форматах
Той же паттерн працює для зображень, JSON, XLSX. Для XLSX використовуємо PhpSpreadsheet у потоковому режимі:
use PhpOffice\PhpSpreadsheet\Reader\Xlsx;
$reader = new Xlsx();
$reader->setReadDataOnly(true);
$spreadsheet = $reader->load($filePath);
$worksheet = $spreadsheet->getActiveSheet();
$highestRow = $worksheet->getHighestDataRow();
// Читаємо чанками по 500 рядків
for ($startRow = 2; $startRow <= $highestRow; $startRow += 500) {
$endRow = min($startRow + 499, $highestRow);
$rows = $worksheet->rangeToArray("A{$startRow}:Z{$endRow}");
// диспатч Job для чанка
}
Таймлайн
Базовий імпорт CSV з чанками й прогресом—1 робочий день. Додавання возобновлення, детального лога помилок, endpoint прогресу—ще 6–8 годин. Підтримка XLSX і JSON форматів—плюс 4–6 годин.







