Реализация хранения и обработки спарсенных данных
Сохранить данные в CSV — не система хранения. Промышленная система должна поддерживать инкрементальное обновление, версионирование изменений, полнотекстовый поиск и эффективные выборки по фильтрам.
Схема хранения в PostgreSQL
-- Основная таблица с историей изменений
CREATE TABLE scraped_items (
id BIGSERIAL PRIMARY KEY,
source_id INTEGER REFERENCES sources(id),
external_id TEXT NOT NULL, -- ID на стороне источника
url TEXT NOT NULL,
data JSONB NOT NULL, -- гибкая схема для разных источников
data_hash CHAR(64) NOT NULL, -- SHA-256 от data для детекции изменений
first_seen TIMESTAMPTZ DEFAULT NOW(),
last_seen TIMESTAMPTZ DEFAULT NOW(),
changed_at TIMESTAMPTZ,
UNIQUE (source_id, external_id)
);
-- История изменений
CREATE TABLE scraped_items_history (
id BIGSERIAL PRIMARY KEY,
item_id BIGINT REFERENCES scraped_items(id),
data JSONB NOT NULL,
recorded_at TIMESTAMPTZ DEFAULT NOW()
);
-- Индексы
CREATE INDEX ON scraped_items USING GIN (data); -- поиск по JSONB
CREATE INDEX ON scraped_items (source_id, last_seen);
CREATE INDEX ON scraped_items USING GIN (
to_tsvector('russian', data->>'title' || ' ' || COALESCE(data->>'description', ''))
);
Логика обновления
def upsert_item(source_id, external_id, url, data):
data_hash = hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
existing = db.query(
'SELECT id, data_hash FROM scraped_items WHERE source_id=%s AND external_id=%s',
(source_id, external_id)
).fetchone()
if existing is None:
# новый элемент
db.execute(
'INSERT INTO scraped_items (source_id, external_id, url, data, data_hash) '
'VALUES (%s, %s, %s, %s, %s)',
(source_id, external_id, url, json.dumps(data), data_hash)
)
elif existing['data_hash'] != data_hash:
# данные изменились — сохраняем историю
db.execute(
'INSERT INTO scraped_items_history (item_id, data) '
'SELECT id, data FROM scraped_items WHERE id=%s',
(existing['id'],)
)
db.execute(
'UPDATE scraped_items SET data=%s, data_hash=%s, last_seen=NOW(), changed_at=NOW() '
'WHERE id=%s',
(json.dumps(data), data_hash, existing['id'])
)
else:
# данные не изменились — обновляем only last_seen
db.execute(
'UPDATE scraped_items SET last_seen=NOW() WHERE id=%s',
(existing['id'],)
)
Постобработка данных
После сохранения сырых данных запускаются pipeline обработки:
- Нормализация — приведение цен к одной валюте, телефонов к E.164, адресов к стандартному формату
- Обогащение — геокодирование адресов, определение категории через классификатор
- Агрегация — подсчёт статистики по источнику: средняя цена, распределение категорий
Архивирование и TTL
Старые данные (не виденные более 90 дней) переводятся в архивный статус или удаляются — зависит от требований. История изменений хранится дольше основных данных.
Экспорт
-
CSV/XLSX — через
pandas.to_excel()илиcsv.DictWriter - REST API — FastAPI/Laravel с фильтрацией, пагинацией, сортировкой
- Webhook — отправка новых/изменённых записей в стороннюю систему в реальном времени
Время реализации системы хранения с историей изменений и API: 4–6 рабочих дней.







