Реализация очереди импорта товаров (фоновая обработка)
Импорт тысяч товаров в синхронном режиме — это таймаут, зависший процесс и сломанный пользовательский опыт. Правильное решение: принять файл, поставить задачу в очередь, вернуть ответ немедленно. Обработка идёт в фоне, прогресс отображается в интерфейсе.
Архитектура
HTTP Upload (файл/URL)
↓
Import Job (запись в очередь)
↓
Queue (Redis / SQS / RabbitMQ)
↓
Worker Process (отдельный процесс/контейнер)
↓
Chunk Processing (батчи по 500 товаров)
↓
Database (upsert)
↓
Progress Event (WebSocket / SSE → UI)
Реализация на Laravel
// Контроллер — принимает файл и ставит задачу
class ProductImportController extends Controller
{
public function upload(Request $request): JsonResponse
{
$path = $request->file('file')->store('imports');
$import = ImportJob::create([
'file_path' => $path,
'status' => 'pending',
'total' => 0,
'processed' => 0,
'errors' => 0,
]);
ProcessProductImport::dispatch($import->id);
return response()->json(['import_id' => $import->id]);
}
}
// Job — обработка в фоне
class ProcessProductImport implements ShouldQueue
{
use Dispatchable, InteractsWithQueue;
public int $timeout = 3600; // 1 час
public int $tries = 3;
public function handle(): void
{
$import = ImportJob::findOrFail($this->importId);
$import->update(['status' => 'processing', 'started_at' => now()]);
$reader = new CsvReader(storage_path('app/' . $import->file_path));
$total = $reader->count();
$import->update(['total' => $total]);
foreach ($reader->chunk(500) as $chunkIndex => $rows) {
try {
DB::transaction(function () use ($rows) {
foreach ($rows as $row) {
Product::updateOrCreate(
['sku' => $row['sku']],
$this->mapRow($row)
);
}
});
$processed = ($chunkIndex + 1) * 500;
$import->update(['processed' => min($processed, $total)]);
// Событие прогресса
event(new ImportProgressUpdated($import->id, min($processed, $total), $total));
} catch (\Exception $e) {
$import->increment('errors');
Log::error("Import chunk failed", ['chunk' => $chunkIndex, 'error' => $e->getMessage()]);
}
}
$import->update(['status' => 'completed', 'finished_at' => now()]);
}
}
WebSocket / SSE для прогресса
// Laravel Broadcasting: событие прогресса
class ImportProgressUpdated implements ShouldBroadcast
{
public function broadcastOn(): Channel
{
return new PrivateChannel("import.{$this->importId}");
}
public function broadcastWith(): array
{
return [
'processed' => $this->processed,
'total' => $this->total,
'percent' => round($this->processed / $this->total * 100),
];
}
}
На фронтенде — подписка через Laravel Echo или нативный EventSource (SSE).
Параллельная обработка
Для очень больших файлов (100 000+ товаров) — разбивка на независимые части:
// Dispatch Fan-out: один координатор → N воркеров
class DispatchImportChunks implements ShouldQueue
{
public function handle(): void
{
$chunks = $this->splitFile($this->filePath, chunkSize: 1000);
Bus::batch(
array_map(fn($chunk) => new ProcessImportChunk($chunk), $chunks)
)
->then(fn(Batch $batch) => $this->onComplete($batch))
->catch(fn(Batch $batch, Throwable $e) => $this->onError($batch, $e))
->dispatch();
}
}
Bus::batch() в Laravel обрабатывает чанки параллельно через несколько воркеров и отслеживает общий прогресс.
Обработка ошибок
Строки с ошибками не прерывают импорт, а сохраняются в таблицу import_errors:
CREATE TABLE import_errors (
id BIGSERIAL PRIMARY KEY,
import_id BIGINT,
row_number INT,
row_data JSONB,
error_msg TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
После завершения пользователь скачивает CSV с ошибками для исправления.
Сроки
Система очереди импорта с прогрессом в реальном времени и логированием ошибок: 4–6 рабочих дней.







