Реалізація інкрементального імпорту товарів (лише зміни)
Повна перезагрузка каталогу кожен раз — марно. При 100 000 позицій за годину змінюється 1–5% даних, а система обробляє всі 100 000. Інкрементальний імпорт розв'язує це: передаються та обробляються лише товари, які реально змінилися.
Стратегії визначення змін
1. Часова мітка updated_at
Постачальник підтримує фільтр за датою зміни — найпоширеніший підхід:
GET /api/products?updated_after=2024-01-15T10:00:00Z&page=1&per_page=200
Система запам'ятовує час останньої успішної синхронізації та передає його при наступному запиті.
2. Курсор / change log
Постачальник ведить журнал змін зі зростаючим ID:
GET /api/changes?since_id=48291&limit=500
Надійніше, ніж timestamp: не пропускає зміни, які сталися під час обробки.
3. Порівняння хешів
Коли джерело не підтримує фільтрацію по змінам — скачуємо повний файл, але обробляємо лише зміни:
$hash = md5(serialize([
$row['price'], $row['qty'], $row['name'], $row['description']
]));
Рядок обробляється лише якщо хеш змінився.
4. Файли Diff
Постачальник публікує щогодинний diff-файл замість повного прайсу:
<changes>
<updated id="SKU-123"><price>4990</price><qty>15</qty></updated>
<updated id="SKU-456"><qty>0</qty></updated>
<deleted id="SKU-789"/>
<created id="SKU-999"><!-- повні дані --></created>
</changes>
Реалізація State Tracker
Стан синхронізації зберігається в БД:
CREATE TABLE import_sync_state (
source_id int PRIMARY KEY REFERENCES import_sources(id),
last_sync_at timestamptz,
last_cursor varchar(200), -- для cursor-based
last_change_id bigint, -- для changelog-based
items_synced bigint DEFAULT 0,
updated_at timestamptz DEFAULT now()
);
class SyncStateManager
{
public function getLastSyncAt(int $sourceId): ?\DateTimeInterface
{
return ImportSyncState::find($sourceId)?->last_sync_at;
}
public function markSyncStarted(int $sourceId): void
{
// Запам'ятовуємо час ПОЧАТКУ синхронізації, не КІНЦЯ
// Критично: за час обробки можуть появитися нові зміни
Cache::put("sync_start_{$sourceId}", now(), 3600);
}
public function markSyncCompleted(int $sourceId): void
{
ImportSyncState::updateOrCreate(
['source_id' => $sourceId],
['last_sync_at' => Cache::get("sync_start_{$sourceId}")]
);
}
}
Важливо: фіксуємо час початку синхронізації, а не кінця. Якщо за час обробки з'явилися нові зміни — вони попадуть у наступний цикл.
Pipeline інкрементального імпорту
class IncrementalImportJob implements ShouldQueue
{
public function handle(
SyncStateManager $state,
SupplierApiClient $client,
IncrementalProductSync $sync,
): void {
$since = $state->getLastSyncAt($this->sourceId);
$state->markSyncStarted($this->sourceId);
$stats = ['created' => 0, 'updated' => 0, 'deleted' => 0, 'skipped' => 0];
foreach ($client->fetchUpdatedSince($since) as $item) {
$result = $sync->process($item, $this->sourceId);
$stats[$result]++;
}
$state->markSyncCompleted($this->sourceId);
$this->logResult($stats);
}
}
Визначення типу зміни
class IncrementalProductSync
{
public function process(array $item, int $sourceId): string
{
// Видалені позиції
if ($item['deleted'] ?? false) {
Product::where('sku', $item['sku'])
->where('source_id', $sourceId)
->update(['deleted_at' => now()]);
return 'deleted';
}
$product = Product::where('sku', $item['sku'])
->where('source_id', $sourceId)
->first();
if (!$product) {
// Новий товар
Product::create($this->buildProductData($item, $sourceId));
return 'created';
}
// Перевіряємо хеш — оновлюємо лише якщо щось змінилось
$newHash = $this->computeHash($item);
if ($product->content_hash === $newHash) {
return 'skipped';
}
$product->update($this->buildProductData($item, $sourceId) + [
'content_hash' => $newHash,
]);
return 'updated';
}
private function computeHash(array $item): string
{
return md5(json_encode([
$item['price'] ?? null,
$item['qty'] ?? null,
$item['name'] ?? null,
]));
}
}
Порівняння хешів при відсутності delta-API
Коли джерело не підтримує фільтрацію — скачуємо повний файл, але обробляємо лише зміни:
class HashBasedDeltaProcessor
{
public function process(iterable $allItems, int $sourceId): DeltaResult
{
// Завантажуємо всі поточні хеші з БД (один запит)
$storedHashes = Product::where('source_id', $sourceId)
->pluck('content_hash', 'sku')
->all();
$toCreate = $toUpdate = $unchanged = 0;
$createBatch = $updateBatch = [];
foreach ($allItems as $item) {
$newHash = $this->computeHash($item);
$sku = $item['sku'];
if (!isset($storedHashes[$sku])) {
$createBatch[] = $item;
$toCreate++;
} elseif ($storedHashes[$sku] !== $newHash) {
$updateBatch[] = $item;
$toUpdate++;
} else {
$unchanged++;
}
}
// Batch insert/update лише змінених
if ($createBatch) Product::upsert($this->prepareRows($createBatch, $sourceId), ['sku'], [...]);
if ($updateBatch) Product::upsert($this->prepareRows($updateBatch, $sourceId), ['sku'], [...]);
return new DeltaResult($toCreate, $toUpdate, $unchanged);
}
}
Виявлення видалених позицій
Якщо джерело не надсилає явних сигналів про видалення — використовуємо «anti-join» підхід:
public function detectDeleted(array $currentSkus, int $sourceId): int
{
// SKU, які були в попередньому імпорті, але відсутні у поточному
$deletedCount = Product::where('source_id', $sourceId)
->whereNotIn('sku', $currentSkus)
->whereNull('deleted_at')
->update(['deleted_at' => now()]);
return $deletedCount;
}
Для 100 000 SKU запит WHERE sku NOT IN (...) неефективний. Краще тимчасова таблиця:
CREATE TEMP TABLE current_import_skus (sku varchar(100));
COPY current_import_skus FROM STDIN;
-- завантажуємо всі SKU поточного імпорту
UPDATE products
SET deleted_at = now()
WHERE source_id = $1
AND deleted_at IS NULL
AND sku NOT IN (SELECT sku FROM current_import_skus);
DROP TABLE current_import_skus;
Конкурентність: захист від подвійного запуску
public function handle(): void
{
$lock = Cache::lock("import_sync_{$this->sourceId}", 3600);
if (!$lock->get()) {
Log::info("Import for source {$this->sourceId} already running, skipping");
return;
}
try {
$this->runSync();
} finally {
$lock->release();
}
}
Терміни реалізації
- Timestamp-based інкремент, state manager, хеш-порівняння — 2 дні
- Виявлення видалених позицій (temp table), lock проти подвійного запуску — +1 день
- Cursor-based синхронізація + diff-файли + звітність по delta — +1–2 дні







