Реалізація міграції даних між типами БД (PostgreSQL ↔ MySQL ↔ MongoDB)
Перехід між різними СУБД — один з найскладніших типів міграцій. Відмінності у типах даних, поведінці NULL, автоінкременти, підтримка JSON та семантика транзакцій потребують ретельного маппінгу та тестування.
Поширені сценарії
- MySQL → PostgreSQL (вихід з vendor lock-in, краща підтримка JSON/JSONB, window functions)
- MongoDB → PostgreSQL (нормалізація даних, ACID-транзакції)
- PostgreSQL → MongoDB (shardding, гнучка схема для певних сутностей)
- MySQL → MySQL інша мажорна версія (через dump/restore з трансформацією)
MySQL → PostgreSQL
Інструмент: pgloader
# Установка
apt install pgloader
# Базова міграція
pgloader mysql://user:pass@mysql-host/myapp \
postgresql://user:pass@pg-host/myapp
pgloader автоматично:
- Конвертує типи даних
- Мігрує індекси та первинні ключі
- Переносить зовнішні ключі
- Обробляє
AUTO_INCREMENT→SERIAL/BIGSERIAL
Користувацька конфігурація pgloader:
LOAD DATABASE
FROM mysql://user:pass@mysql-host/myapp
INTO postgresql://user:pass@pg-host/myapp
WITH include no drop,
create tables,
create indexes,
reset sequences
SET work_mem to '256MB',
maintenance_work_mem to '512MB'
CAST type datetime to timestamptz using midnight-in-utc,
type tinyint(1) to boolean using tinyint-to-boolean,
type enum to text,
column orders.status to text
ALTER SCHEMA 'myapp' RENAME TO 'public'
EXCLUDING TABLE NAMES MATCHING 'cache_*', 'sessions'
;
Проблеми при MySQL → PostgreSQL
| Проблема | MySQL | PostgreSQL | Рішення |
|---|---|---|---|
| Case sensitivity | user = User |
user ≠ User |
Нормалізувати регістр |
| ENUM | Native type | Нема нативного | Конвертувати в text + CHECK |
| Datetime | Нема timezone | timestamptz | Явно вказати часовий пояс |
| GROUP BY | Гнучкий | Строгий (ONLY_FULL_GROUP_BY) | Переписати запити |
| Zero values | 0000-00-00 |
Не підтримується | Конвертувати на NULL |
| Backtick quotes | Допустимі | Не допустимі | Замінити на " |
MongoDB → PostgreSQL
ETL-підхід з Python
from pymongo import MongoClient
import psycopg2
from psycopg2.extras import execute_batch
import json
mongo = MongoClient('mongodb://localhost:27017')
pg = psycopg2.connect('host=pg-host dbname=myapp user=app')
# Вихідна колекція MongoDB
source = mongo.myapp.users
cursor = pg.cursor()
batch = []
for doc in source.find():
batch.append((
str(doc['_id']),
doc.get('email'),
doc.get('name'),
json.dumps(doc.get('metadata', {})), # JSONB у PG
doc.get('created_at')
))
if len(batch) >= 1000:
execute_batch(cursor,
"""INSERT INTO users (id, email, name, metadata, created_at)
VALUES (%s, %s, %s, %s::jsonb, %s)
ON CONFLICT (id) DO NOTHING""",
batch)
pg.commit()
batch = []
# Останній батч
if batch:
execute_batch(cursor, query, batch)
pg.commit()
Нормалізація вкладених документів
MongoDB-документ:
{
"_id": "user_123",
"email": "[email protected]",
"addresses": [
{ "type": "home", "city": "Moscow", "zip": "101000" },
{ "type": "work", "city": "St. Petersburg", "zip": "190000" }
]
}
PostgreSQL-схема:
CREATE TABLE users (
id TEXT PRIMARY KEY,
email TEXT UNIQUE NOT NULL
);
CREATE TABLE user_addresses (
id BIGSERIAL PRIMARY KEY,
user_id TEXT REFERENCES users(id),
type TEXT,
city TEXT,
zip TEXT
);
Трансформація при міграції:
for doc in source.find():
cursor.execute("INSERT INTO users (id, email) VALUES (%s, %s)",
(str(doc['_id']), doc['email']))
for addr in doc.get('addresses', []):
cursor.execute(
"INSERT INTO user_addresses (user_id, type, city, zip) VALUES (%s, %s, %s, %s)",
(str(doc['_id']), addr.get('type'), addr.get('city'), addr.get('zip'))
)
PostgreSQL → MySQL
Прямого шляху через mysqldump з трансформацією не існує. Використовуйте:
- Експорт в CSV:
COPY table TO '/tmp/table.csv' CSV HEADER - Створити схему вручну з урахуванням типів MySQL
- Імпорт:
LOAD DATA INFILE '/tmp/table.csv' INTO TABLE table FIELDS TERMINATED BY ',' ENCLOSED BY '"'
Або через ETL-інструмент Airbyte / dbt / Apache Nifi.
Стратегія zero-downtime при зміні типу БД
- Запустити нову БД паралельно зі старою
- Написати dual-write шар у застосунку: кожна запис іде в обидві БД
- Запустити фоновий процес синхронізації історичних даних
- Після вирівнювання — перевести читання на нову БД
- Через тиждень після стабільної роботи — вимкнути dual-write та стару БД
class DualWriteRepository:
def __init__(self, primary, secondary):
self.primary = primary
self.secondary = secondary
def create_user(self, data):
result = self.primary.create_user(data)
try:
self.secondary.create_user(data)
except Exception as e:
logger.error(f"Secondary write failed: {e}")
# Не перериваємо запит — кладемо в чергу для retry
queue.put(('create_user', data))
return result
Верифікація після міграції
-- Порівняння кількості записів
SELECT 'users' as table_name, COUNT(*) FROM users
UNION ALL
SELECT 'orders', COUNT(*) FROM orders
UNION ALL
SELECT 'products', COUNT(*) FROM products;
-- Порівняння контрольних сум (PostgreSQL)
SELECT md5(array_agg(md5(id::text || email))::text)
FROM (SELECT id, email FROM users ORDER BY id) t;
Тривалість виконання
MySQL → PostgreSQL для бази до 50GB — 3–5 робочих днів. MongoDB → PostgreSQL з нормалізацією схеми — 1–2 тижні.







