Реалізація інкрементального імпорту товарів (тільки зміни)

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Реалізація інкрементального імпорту товарів (тільки зміни)
Середня
~3-5 робочих днів
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Реалізація інкрементального імпорту товарів (лише зміни)

Повна перезагрузка каталогу кожен раз — марно. При 100 000 позицій за годину змінюється 1–5% даних, а система обробляє всі 100 000. Інкрементальний імпорт розв'язує це: передаються та обробляються лише товари, які реально змінилися.

Стратегії визначення змін

1. Часова мітка updated_at

Постачальник підтримує фільтр за датою зміни — найпоширеніший підхід:

GET /api/products?updated_after=2024-01-15T10:00:00Z&page=1&per_page=200

Система запам'ятовує час останньої успішної синхронізації та передає його при наступному запиті.

2. Курсор / change log

Постачальник ведить журнал змін зі зростаючим ID:

GET /api/changes?since_id=48291&limit=500

Надійніше, ніж timestamp: не пропускає зміни, які сталися під час обробки.

3. Порівняння хешів

Коли джерело не підтримує фільтрацію по змінам — скачуємо повний файл, але обробляємо лише зміни:

$hash = md5(serialize([
    $row['price'], $row['qty'], $row['name'], $row['description']
]));

Рядок обробляється лише якщо хеш змінився.

4. Файли Diff

Постачальник публікує щогодинний diff-файл замість повного прайсу:

<changes>
  <updated id="SKU-123"><price>4990</price><qty>15</qty></updated>
  <updated id="SKU-456"><qty>0</qty></updated>
  <deleted id="SKU-789"/>
  <created id="SKU-999"><!-- повні дані --></created>
</changes>

Реалізація State Tracker

Стан синхронізації зберігається в БД:

CREATE TABLE import_sync_state (
    source_id       int PRIMARY KEY REFERENCES import_sources(id),
    last_sync_at    timestamptz,
    last_cursor     varchar(200),   -- для cursor-based
    last_change_id  bigint,         -- для changelog-based
    items_synced    bigint DEFAULT 0,
    updated_at      timestamptz DEFAULT now()
);
class SyncStateManager
{
    public function getLastSyncAt(int $sourceId): ?\DateTimeInterface
    {
        return ImportSyncState::find($sourceId)?->last_sync_at;
    }

    public function markSyncStarted(int $sourceId): void
    {
        // Запам'ятовуємо час ПОЧАТКУ синхронізації, не КІНЦЯ
        // Критично: за час обробки можуть появитися нові зміни
        Cache::put("sync_start_{$sourceId}", now(), 3600);
    }

    public function markSyncCompleted(int $sourceId): void
    {
        ImportSyncState::updateOrCreate(
            ['source_id' => $sourceId],
            ['last_sync_at' => Cache::get("sync_start_{$sourceId}")]
        );
    }
}

Важливо: фіксуємо час початку синхронізації, а не кінця. Якщо за час обробки з'явилися нові зміни — вони попадуть у наступний цикл.

Pipeline інкрементального імпорту

class IncrementalImportJob implements ShouldQueue
{
    public function handle(
        SyncStateManager       $state,
        SupplierApiClient      $client,
        IncrementalProductSync $sync,
    ): void {
        $since = $state->getLastSyncAt($this->sourceId);
        $state->markSyncStarted($this->sourceId);

        $stats = ['created' => 0, 'updated' => 0, 'deleted' => 0, 'skipped' => 0];

        foreach ($client->fetchUpdatedSince($since) as $item) {
            $result = $sync->process($item, $this->sourceId);
            $stats[$result]++;
        }

        $state->markSyncCompleted($this->sourceId);
        $this->logResult($stats);
    }
}

Визначення типу зміни

class IncrementalProductSync
{
    public function process(array $item, int $sourceId): string
    {
        // Видалені позиції
        if ($item['deleted'] ?? false) {
            Product::where('sku', $item['sku'])
                ->where('source_id', $sourceId)
                ->update(['deleted_at' => now()]);
            return 'deleted';
        }

        $product = Product::where('sku', $item['sku'])
            ->where('source_id', $sourceId)
            ->first();

        if (!$product) {
            // Новий товар
            Product::create($this->buildProductData($item, $sourceId));
            return 'created';
        }

        // Перевіряємо хеш — оновлюємо лише якщо щось змінилось
        $newHash = $this->computeHash($item);
        if ($product->content_hash === $newHash) {
            return 'skipped';
        }

        $product->update($this->buildProductData($item, $sourceId) + [
            'content_hash' => $newHash,
        ]);
        return 'updated';
    }

    private function computeHash(array $item): string
    {
        return md5(json_encode([
            $item['price'] ?? null,
            $item['qty']   ?? null,
            $item['name']  ?? null,
        ]));
    }
}

Порівняння хешів при відсутності delta-API

Коли джерело не підтримує фільтрацію — скачуємо повний файл, але обробляємо лише зміни:

class HashBasedDeltaProcessor
{
    public function process(iterable $allItems, int $sourceId): DeltaResult
    {
        // Завантажуємо всі поточні хеші з БД (один запит)
        $storedHashes = Product::where('source_id', $sourceId)
            ->pluck('content_hash', 'sku')
            ->all();

        $toCreate = $toUpdate = $unchanged = 0;
        $createBatch = $updateBatch = [];

        foreach ($allItems as $item) {
            $newHash = $this->computeHash($item);
            $sku     = $item['sku'];

            if (!isset($storedHashes[$sku])) {
                $createBatch[] = $item;
                $toCreate++;
            } elseif ($storedHashes[$sku] !== $newHash) {
                $updateBatch[] = $item;
                $toUpdate++;
            } else {
                $unchanged++;
            }
        }

        // Batch insert/update лише змінених
        if ($createBatch) Product::upsert($this->prepareRows($createBatch, $sourceId), ['sku'], [...]);
        if ($updateBatch) Product::upsert($this->prepareRows($updateBatch, $sourceId), ['sku'], [...]);

        return new DeltaResult($toCreate, $toUpdate, $unchanged);
    }
}

Виявлення видалених позицій

Якщо джерело не надсилає явних сигналів про видалення — використовуємо «anti-join» підхід:

public function detectDeleted(array $currentSkus, int $sourceId): int
{
    // SKU, які були в попередньому імпорті, але відсутні у поточному
    $deletedCount = Product::where('source_id', $sourceId)
        ->whereNotIn('sku', $currentSkus)
        ->whereNull('deleted_at')
        ->update(['deleted_at' => now()]);

    return $deletedCount;
}

Для 100 000 SKU запит WHERE sku NOT IN (...) неефективний. Краще тимчасова таблиця:

CREATE TEMP TABLE current_import_skus (sku varchar(100));
COPY current_import_skus FROM STDIN;
-- завантажуємо всі SKU поточного імпорту

UPDATE products
SET deleted_at = now()
WHERE source_id = $1
  AND deleted_at IS NULL
  AND sku NOT IN (SELECT sku FROM current_import_skus);

DROP TABLE current_import_skus;

Конкурентність: захист від подвійного запуску

public function handle(): void
{
    $lock = Cache::lock("import_sync_{$this->sourceId}", 3600);

    if (!$lock->get()) {
        Log::info("Import for source {$this->sourceId} already running, skipping");
        return;
    }

    try {
        $this->runSync();
    } finally {
        $lock->release();
    }
}

Терміни реалізації

  • Timestamp-based інкремент, state manager, хеш-порівняння — 2 дні
  • Виявлення видалених позицій (temp table), lock проти подвійного запуску — +1 день
  • Cursor-based синхронізація + diff-файли + звітність по delta — +1–2 дні