Реалізація зберігання та обробки скрейплених даних
Зберегти дані у CSV — не система зберігання. Промислова система повинна підтримувати інкрементальне оновлення, версіонування змін, повнотекстовий пошук та ефективні вибірки за фільтрами.
Схема зберігання в PostgreSQL
-- Основна таблиця з історією змін
CREATE TABLE scraped_items (
id BIGSERIAL PRIMARY KEY,
source_id INTEGER REFERENCES sources(id),
external_id TEXT NOT NULL, -- ID на стороні джерела
url TEXT NOT NULL,
data JSONB NOT NULL, -- гнучка схема для різних джерел
data_hash CHAR(64) NOT NULL, -- SHA-256 від data для детекції змін
first_seen TIMESTAMPTZ DEFAULT NOW(),
last_seen TIMESTAMPTZ DEFAULT NOW(),
changed_at TIMESTAMPTZ,
UNIQUE (source_id, external_id)
);
-- Історія змін
CREATE TABLE scraped_items_history (
id BIGSERIAL PRIMARY KEY,
item_id BIGINT REFERENCES scraped_items(id),
data JSONB NOT NULL,
recorded_at TIMESTAMPTZ DEFAULT NOW()
);
-- Індекси
CREATE INDEX ON scraped_items USING GIN (data); -- пошук по JSONB
CREATE INDEX ON scraped_items (source_id, last_seen);
CREATE INDEX ON scraped_items USING GIN (
to_tsvector('russian', data->>'title' || ' ' || COALESCE(data->>'description', ''))
);
Логіка оновлення
def upsert_item(source_id, external_id, url, data):
data_hash = hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
existing = db.query(
'SELECT id, data_hash FROM scraped_items WHERE source_id=%s AND external_id=%s',
(source_id, external_id)
).fetchone()
if existing is None:
# новий елемент
db.execute(
'INSERT INTO scraped_items (source_id, external_id, url, data, data_hash) '
'VALUES (%s, %s, %s, %s, %s)',
(source_id, external_id, url, json.dumps(data), data_hash)
)
elif existing['data_hash'] != data_hash:
# дані змінились — зберігаємо історію
db.execute(
'INSERT INTO scraped_items_history (item_id, data) '
'SELECT id, data FROM scraped_items WHERE id=%s',
(existing['id'],)
)
db.execute(
'UPDATE scraped_items SET data=%s, data_hash=%s, last_seen=NOW(), changed_at=NOW() '
'WHERE id=%s',
(json.dumps(data), data_hash, existing['id'])
)
else:
# дані не змінились — оновлюємо тільки last_seen
db.execute(
'UPDATE scraped_items SET last_seen=NOW() WHERE id=%s',
(existing['id'],)
)
Постобробка даних
Після збереження сирих даних запускаються pipeline обробки:
- Нормалізація — приведення цін до однієї валюти, телефонів до E.164, адрес до стандартного формату
- Збагачення — геокодування адрес, визначення категорії через класифікатор
- Агрегація — підрахунок статистики за джерелом: середня ціна, розподіл категорій
Архівування та TTL
Старі дані (не видіні більше 90 днів) переводяться в архівний статус або видаляються — залежить від вимог. Історія змін зберігається довше основних даних.
Експорт
-
CSV/XLSX — через
pandas.to_excel()абоcsv.DictWriter - REST API — FastAPI/Laravel з фільтрацією, пагінацією, сортуванням
- Webhook — відправка нових/змінених записів у стороннню систему в режимі реального часу
Час реалізації системи зберігання з історією змін та API: 4–6 робочих днів.







