Налаштування Kafka Connect для інтеграції з базами даних

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Налаштування Kafka Connect для інтеграції з базами даних
Складна
~3-5 робочих днів
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Налаштування Kafka Connect для інтеграції з базами даних

Kafka Connect — це фреймворк для потокової репліцирування даних між Kafka та зовнішніми системами без написання кастомного коду. Два типи коннекторів: Source (дані йдуть в Kafka) та Sink (дані йдуть з Kafka у сховище).

Практичний випадок: CDC (Change Data Capture) з PostgreSQL через Debezium → Kafka → ElasticSearch для пошукового індексу. При змінах строки в БД пошук оновляється за секунди без запитів до PostgreSQL.

Встановлення Kafka Connect

Kafka Connect входить у дистрибутив Kafka, але потребує окремого запуску у distributed-режимі:

# Конфігурація distributed-режиму
# /opt/kafka/config/connect-distributed.properties

bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092

# Внутрішні топіки для зберігання конфігурації коннекторів
group.id=kafka-connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
offset.flush.interval.ms=10000

# REST API для управління
rest.host.name=0.0.0.0
rest.port=8083
rest.advertised.host.name=connect-1.internal
rest.advertised.port=8083

# Плагіни (завантажені коннектори)
plugin.path=/opt/kafka/plugins

# Конвертери — Avro з Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

# Для простих випадків без Schema Registry:
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true

Модуль Systemd:

/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties

Debezium PostgreSQL Source Connector

Debezium перехоплює WAL (Write-Ahead Log) PostgreSQL та транслює кожну INSERT/UPDATE/DELETE в Kafka-подію.

Попередня налаштування PostgreSQL:

-- postgresql.conf
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;

-- Створюємо користувача для репліцирування
CREATE USER debezium WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE myapp TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;

-- Створюємо publication для потрібних таблиць
CREATE PUBLICATION debezium_pub FOR TABLE products, orders, users, categories;

Конфігурація коннектора через REST API:

curl -X POST http://connect-1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-source-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres.internal",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "secure_password",
        "database.dbname": "myapp",
        "database.server.name": "myapp-pg",
        "topic.prefix": "myapp",
        "table.include.list": "public.products,public.orders,public.users",
        "plugin.name": "pgoutput",
        "publication.name": "debezium_pub",
        "slot.name": "debezium_slot",
        "snapshot.mode": "initial",
        "snapshot.isolation.mode": "read_committed",
        "decimal.handling.mode": "double",
        "time.precision.mode": "connect",
        "tombstones.on.delete": "true",
        "heartbeat.interval.ms": "10000",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}'

Топіки створюються автоматично: myapp.public.products, myapp.public.orders.

JDBC Sink Connector — з Kafka в PostgreSQL

Зворотний випадок: события з Kafka пишемо в PostgreSQL (аналітична БД, Data Warehouse).

# Завантажуємо JDBC Connector
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar \
    -O /opt/kafka/plugins/kafka-connect-jdbc.jar

curl -X POST http://connect-1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "4",
        "topics": "myapp.analytics.events",
        "connection.url": "jdbc:postgresql://analytics-pg:5432/analytics",
        "connection.user": "kafka_writer",
        "connection.password": "secure_password",
        "auto.create": "false",
        "auto.evolve": "false",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "id",
        "table.name.format": "analytics.${topic}",
        "batch.size": "1000",
        "db.timezone": "UTC",
        "transforms": "dropPrefix",
        "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.dropPrefix.exclude": "__deleted,__op,__ts_ms"
    }
}'

Elasticsearch Sink Connector

Синхронізація каталогу продуктів з Kafka в Elasticsearch:

curl -X POST http://connect-1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "4",
        "topics": "myapp.public.products",
        "connection.url": "https://es-node-1:9200,https://es-node-2:9200",
        "connection.username": "elastic",
        "connection.password": "elastic_pass",
        "type.name": "_doc",
        "key.ignore": "false",
        "schema.ignore": "true",
        "behavior.on.null.values": "delete",
        "batch.size": "500",
        "flush.timeout.ms": "10000",
        "max.retries": "5",
        "retry.backoff.ms": "100",
        "linger.ms": "1000",
        "transforms": "extractKey",
        "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractKey.field": "id"
    }
}'

Управління та моніторинг

# Список коннекторів
curl http://connect-1:8083/connectors | jq .

# Статус коннектора
curl http://connect-1:8083/connectors/postgres-source-connector/status | jq .

# Перезапуск упалого task'а
curl -X POST http://connect-1:8083/connectors/postgres-source-connector/tasks/0/restart

# Пауза/відновлення
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/pause
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/resume

# Оновлення конфігурації
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/config \
  -H "Content-Type: application/json" \
  -d '{"heartbeat.interval.ms": "5000", ...}'

Prometheus JMX-метрики через JMX Exporter:

KAFKA_OPTS="-javaagent:/opt/jmx-exporter.jar=9404:/opt/kafka/config/jmx-connect.yml" \
    /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties

Типові проблеми

WAL bloat — Debezium не може встигати за PostgreSQL: якщо слот репліцирування не зміщується, WAL накопичується. Налаштовуємо max_slot_wal_keep_size у PostgreSQL та алерт на розмір WAL.

Schema evolution — при додаванні нової колонки в PostgreSQL Debezium автоматично оновить схему у Schema Registry. Sink-коннектор має бути готовий до нових полів (auto.evolve=true або ручне управління).

Tombstone messages — при DELETE Debezium відправляє два повідомлення: подію DELETE та tombstone (null value). Для compact-топіків tombstone використовується для видалення запису з журналу.

Таймлайн

День 1 — налаштування PostgreSQL для логічної репліцирування, встановлення Kafka Connect у distributed-режимі на 2–3 вузла.

День 2 — встановлення Debezium, первоначальний snapshot (може займати години для великих таблиць), налаштування коннектора, перевірка CDC-подій.

День 3 — налаштування Sink-коннектора (ES або PostgreSQL), трансформації через SMT (Single Message Transform), тестування повного пайплайну INSERT/UPDATE/DELETE.

День 4 — моніторинг, алерти на лаг та помилки, документація схеми топіків, нагрузочне тестування з піковим потоком змін.