Реалізація черги імпорту товарів (фонова обробка)
Імпорт тисяч товарів у синхронному режимі — це таймаут, завислий процес та зламаний користувацький досвід. Правильне рішення: прийняти файл, поставити завдання в чергу, повернути відповідь негайно. Обробка йде у фоні, прогрес відображається в інтерфейсі.
Архітектура
HTTP Upload (файл/URL)
↓
Import Job (запис у чергу)
↓
Queue (Redis / SQS / RabbitMQ)
↓
Worker Process (окремий процес/контейнер)
↓
Chunk Processing (батчі по 500 товарів)
↓
Database (upsert)
↓
Progress Event (WebSocket / SSE → UI)
Реалізація на Laravel
// Контроллер — приймає файл та ставить завдання у чергу
class ProductImportController extends Controller
{
public function upload(Request $request): JsonResponse
{
$path = $request->file('file')->store('imports');
$import = ImportJob::create([
'file_path' => $path,
'status' => 'pending',
'total' => 0,
'processed' => 0,
'errors' => 0,
]);
ProcessProductImport::dispatch($import->id);
return response()->json(['import_id' => $import->id]);
}
}
// Job — обробка у фоні
class ProcessProductImport implements ShouldQueue
{
use Dispatchable, InteractsWithQueue;
public int $timeout = 3600; // 1 година
public int $tries = 3;
public function handle(): void
{
$import = ImportJob::findOrFail($this->importId);
$import->update(['status' => 'processing', 'started_at' => now()]);
$reader = new CsvReader(storage_path('app/' . $import->file_path));
$total = $reader->count();
$import->update(['total' => $total]);
foreach ($reader->chunk(500) as $chunkIndex => $rows) {
try {
DB::transaction(function () use ($rows) {
foreach ($rows as $row) {
Product::updateOrCreate(
['sku' => $row['sku']],
$this->mapRow($row)
);
}
});
$processed = ($chunkIndex + 1) * 500;
$import->update(['processed' => min($processed, $total)]);
// Подія прогресу
event(new ImportProgressUpdated($import->id, min($processed, $total), $total));
} catch (\Exception $e) {
$import->increment('errors');
Log::error("Import chunk failed", ['chunk' => $chunkIndex, 'error' => $e->getMessage()]);
}
}
$import->update(['status' => 'completed', 'finished_at' => now()]);
}
}
WebSocket / SSE для прогресу
// Laravel Broadcasting: подія прогресу
class ImportProgressUpdated implements ShouldBroadcast
{
public function broadcastOn(): Channel
{
return new PrivateChannel("import.{$this->importId}");
}
public function broadcastWith(): array
{
return [
'processed' => $this->processed,
'total' => $this->total,
'percent' => round($this->processed / $this->total * 100),
];
}
}
На фронтенді — підписка через Laravel Echo або нативний EventSource (SSE).
Паралельна обробка
Для дуже великих файлів (100 000+ товарів) — розбивка на незалежні частини:
// Dispatch Fan-out: один координатор → N воркерів
class DispatchImportChunks implements ShouldQueue
{
public function handle(): void
{
$chunks = $this->splitFile($this->filePath, chunkSize: 1000);
Bus::batch(
array_map(fn($chunk) => new ProcessImportChunk($chunk), $chunks)
)
->then(fn(Batch $batch) => $this->onComplete($batch))
->catch(fn(Batch $batch, Throwable $e) => $this->onError($batch, $e))
->dispatch();
}
}
Bus::batch() у Laravel обробляє чанки паралельно через кілька воркерів та відслідковує загальний прогрес.
Обробка помилок
Строки з помилками не перериває імпорт, а зберігаються у таблицю import_errors:
CREATE TABLE import_errors (
id BIGSERIAL PRIMARY KEY,
import_id BIGINT,
row_number INT,
row_data JSONB,
error_msg TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
Після завершення користувач скачує CSV з помилками для виправлення.
Терміни
Система черги імпорту з прогресом у реальному часі та логуванням помилок: 4–6 робочих днів.







