Реализация массового импорта товаров (Bulk Import) на сайт
Массовый импорт — это единовременная загрузка тысяч или сотен тысяч позиций. В отличие от инкрементального обновления, здесь задача другая: за разумное время обработать весь объём, не положив базу данных и не заняв все воркеры очереди на сутки.
Размер имеет значение: стратегии по объёму
| Объём | Метод | Время обработки |
|---|---|---|
| До 1 000 позиций | Синхронно в запросе | Секунды |
| 1 000 – 50 000 | Одна очередная задача с чанками | Минуты |
| 50 000 – 500 000 | Fan-out: N параллельных Jobs | 10–60 минут |
| Свыше 500 000 | Batch insert + отдельный pipeline | Часы |
Принцип chunk + queue
Файл с 100 000 строк не обрабатывается в один Job. Файл разбивается на чанки, каждый чанк — отдельный Job:
class BulkImportDispatcher
{
private const CHUNK_SIZE = 500;
public function dispatch(ImportFile $file): void
{
$import = ImportRun::create([
'file_id' => $file->id,
'status' => 'dispatching',
'total' => 0,
]);
$chunkIndex = 0;
foreach ($file->parser()->chunks(self::CHUNK_SIZE) as $chunk) {
ProcessImportChunkJob::dispatch($import->id, $chunkIndex, $chunk)
->onQueue('bulk-import');
$chunkIndex++;
}
$import->update([
'status' => 'processing',
'total_chunks' => $chunkIndex,
]);
}
}
После диспетчеризации все чанки находятся в очереди. Воркеры разбирают их параллельно.
Bulk Upsert вместо поштучных INSERT/UPDATE
Главный инструмент производительности — INSERT ... ON CONFLICT DO UPDATE (upsert). Laravel поддерживает это через Model::upsert():
class ProcessImportChunkJob implements ShouldQueue
{
public int $timeout = 120;
public function handle(): void
{
$rows = [];
foreach ($this->chunk as $item) {
$rows[] = [
'sku' => $item['sku'],
'name' => $item['name'],
'price' => $item['price'],
'qty' => $item['qty'],
'category_id' => $this->resolveCategory($item['category']),
'source_id' => $this->import->source_id,
'updated_at' => now(),
'created_at' => now(),
];
}
// Один SQL-запрос вместо 500 отдельных
Product::upsert(
$rows,
uniqueBy: ['sku'],
update: ['name', 'price', 'qty', 'category_id', 'updated_at']
);
// Обновить счётчик обработанных
DB::table('import_runs')
->where('id', $this->importId)
->increment('processed_chunks');
}
}
Одна операция upsert для 500 строк в PostgreSQL занимает ~50–200 мс — против 500 × 5 мс = 2500 мс для поштучных запросов.
Предзагрузка справочников в память
Самая дорогая операция при импорте — запросы к БД для разрешения зависимостей (категория по названию, поставщик по ID, тег по slug). Решение — загрузить все справочники в память перед обработкой:
class ImportContext
{
private array $categoryMap; // ['название' => id]
private array $supplierMap; // ['код' => id]
private array $existingSkus; // ['sku' => product_id]
public function preload(int $sourceId): void
{
$this->categoryMap = Category::pluck('id', 'name_normalized')->all();
$this->supplierMap = Supplier::pluck('id', 'code')->all();
$this->existingSkus = Product::where('source_id', $sourceId)
->pluck('id', 'sku')->all();
}
public function resolveCategoryId(string $name): ?int
{
return $this->categoryMap[mb_strtolower(trim($name))] ?? null;
}
public function productExists(string $sku): bool
{
return isset($this->existingSkus[$sku]);
}
}
Для 200 000 SKU этот словарь занимает ~10–20 МБ памяти — допустимо для воркера.
Финальный Job: агрегация результатов
Все чанки обработаны — нужно обновить статус импорта. Используем chain или polling:
// Вариант: финальный Job в цепочке, ждёт завершения всех предыдущих
Bus::batch(
collect($chunks)->map(fn($chunk, $i) => new ProcessImportChunkJob($importId, $i, $chunk))
)->then(function (Batch $batch) use ($importId) {
ImportRun::find($importId)->update([
'status' => 'completed',
'completed_at' => now(),
]);
// Запустить пересчёт: статус наличия, денормализация, поиск
PostImportPipeline::dispatch($importId);
})->onQueue('bulk-import')->dispatch();
Bus::batch() — встроенный в Laravel механизм группировки задач с коллбэком на завершение.
Post-import pipeline
После завершения импорта нужно обновить денормализованные данные:
class PostImportPipeline
{
public function handle(int $importId): void
{
$productIds = ImportedProduct::where('import_id', $importId)
->pluck('product_id');
// Пересчёт наличия и видимости
Product::whereIn('id', $productIds)->each(function (Product $p) {
$p->update(['in_stock' => $p->qty > 0]);
});
// Обновление поискового индекса (Scout / Elasticsearch)
Product::whereIn('id', $productIds)->searchable();
// Пересчёт фасетных данных
FilterValueRebuilder::dispatch($productIds);
}
}
Мониторинг прогресса
В admin-интерфейсе оператор должен видеть прогресс в реальном времени:
Импорт #1847: 68 420 / 100 000 (68.4%)
Время старта: 14:32:10
Прошло: 4 мин 12 сек
Ожидаемое завершение: ~14:38
Ошибок: 14
Данные из import_runs:
SELECT
total_items,
processed_items,
ROUND(processed_items::float / NULLIF(total_items, 0) * 100, 1) AS pct,
errors_count,
started_at,
now() - started_at AS elapsed
FROM import_runs WHERE id = $1;
Ограничение нагрузки
Во время массового импорта важно не деградировать сайт:
- Выделить отдельную очередь
bulk-importс ограниченным числом воркеров (2–4) - Основную очередь
defaultне трогать - Запускать тяжёлый импорт в ночное время через Laravel Scheduler
- Использовать транзакции по чанку, не по всему файлу
Сроки реализации
- Chunk-диспетчер, bulk upsert, предзагрузка справочников — 2 дня
- Bus::batch с финальным коллбэком, post-import pipeline — +1 день
- Прогресс в admin UI, ограничение нагрузки, ночной планировщик — +1 день







