Налаштування Kafka Schema Registry для валідації повідомлень

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Налаштування Kafka Schema Registry для валідації повідомлень
Складна
~2-3 робочих дні
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Налаштування Kafka Schema Registry для валідації повідомлень

Без Schema Registry топіки Kafka—це сліпі потоки байтів. Якщо продюсер змінить формат JSON—консьюмер впадає з NullPointerException. Schema Registry вирішує цю проблему: схеми повідомлень версіюються, еволюція контролюється, несумісні зміни блокуються перед публікацією.

Використовується разом з форматами Avro, Protobuf або JSON Schema.

Архітектура

Schema Registry—це окремий HTTP-сервіс, який зберігає схеми в топіці Kafka _schemas. Коли продюсер вперше відправляє—реєструє схему й отримує schema_id (ціле число). Замість повної схеми кожне повідомлення містить тільки schema_id (4 байти)—це формат Confluent wire format.

Producer → [magic byte 0x00][schema_id 4 bytes][serialized payload] → Kafka
Consumer → читає schema_id → запитує схему з Registry → десеріалізує

Встановлення Confluent Schema Registry

# Через Docker Compose (типовий сетап)
version: '3.8'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: "_schemas"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "BACKWARD"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081/subjects"]
      interval: 10s
      retries: 5

Для прод—мінімум 2 екземпляри за балансувальником навантаження, один діє як master.

Визначення Avro-схеми

{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.example.orders",
  "doc": "Подія зміни замовлення",
  "fields": [
    {
      "name": "event_id",
      "type": "string",
      "doc": "UUID події"
    },
    {
      "name": "order_id",
      "type": "long"
    },
    {
      "name": "user_id",
      "type": "long"
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["CREATED", "PAID", "SHIPPED", "DELIVERED", "CANCELLED"]
      }
    },
    {
      "name": "amount",
      "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}
    },
    {
      "name": "created_at",
      "type": {"type": "long", "logicalType": "timestamp-millis"}
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null,
      "doc": "Опціональні метадані—нове поле, backward-compatible"
    }
  ]
}

Реєстрація схеми через REST API

# Реєструємо схему для subject "order-events-value"
# Subject naming strategy: {topic}-value (за замовчуванням) або кастомна

curl -X POST http://schema-registry:8081/subjects/order-events-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"OrderEvent\",\"namespace\":\"com.example.orders\",\"fields\":[{\"name\":\"event_id\",\"type\":\"string\"},{\"name\":\"order_id\",\"type\":\"long\"},{\"name\":\"status\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}"
  }'

# Відповідь: {"id": 1}

# Отримати всі версії схеми
curl http://schema-registry:8081/subjects/order-events-value/versions

# Отримати схему за версією
curl http://schema-registry:8081/subjects/order-events-value/versions/1

# Отримати схему за ID
curl http://schema-registry:8081/schemas/ids/1

# Перевірити сумісність нової схеми перед реєстрацією
curl -X POST http://schema-registry:8081/compatibility/subjects/order-events-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "..."}'
# Відповідь: {"is_compatible": true}

Налаштування режимів сумісності

# Глобальний режим
curl -X PUT http://schema-registry:8081/config \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

# Перевизначення для конкретного subject
curl -X PUT http://schema-registry:8081/config/order-events-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL"}'

Режими:

  • BACKWARD—нова схема читає дані, записані старою. Можна додавати поля зі значенням за замовчуванням, видаляти поля без значення.
  • FORWARD—стара схема читає дані, записані новою. Зворотний напрямок.
  • FULL—обидва напрямки. Тільки додавання/видалення optional-полів.
  • NONE—без перевірок. Тільки для розробки.

Java-продюсер з Avro

<!-- pom.xml -->
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.3</version>
</dependency>
// Генерування Java-класів з Avro-схеми (через avro-maven-plugin)
// або використання GenericRecord для динамічної схеми

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("auto.register.schemas", false); // У проді—забороняємо авто-реєстрацію
props.put("use.latest.version", true);

KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);

OrderEvent event = OrderEvent.newBuilder()
    .setEventId(UUID.randomUUID().toString())
    .setOrderId(12345L)
    .setUserId(67890L)
    .setStatus(OrderStatus.CREATED)
    .setCreatedAt(Instant.now().toEpochMilli())
    .build();

producer.send(new ProducerRecord<>("order-events", event.getOrderId().toString(), event));

Python-клієнт з Avro

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer

schema_registry_conf = {'url': 'http://schema-registry:8081'}

value_schema = avro.load('schemas/order_event.avsc')

producer = AvroProducer(
    {
        'bootstrap.servers': 'kafka-1:9092',
        'schema.registry.url': 'http://schema-registry:8081',
        'acks': 'all',
    },
    default_value_schema=value_schema
)

producer.produce(
    topic='order-events',
    key=str(order_id),
    value={
        'event_id': str(uuid.uuid4()),
        'order_id': order_id,
        'user_id': user_id,
        'status': 'CREATED',
        'amount': 1999.99,
        'created_at': int(time.time() * 1000),
    }
)
producer.flush()

Моніторинг Schema Registry

# Кількість зареєстрованих subjects
curl -s http://schema-registry:8081/subjects | jq '. | length'

# Список всіх subjects
curl -s http://schema-registry:8081/subjects | jq '.[]'

# Метрики через JMX або вбудований endpoint
curl http://schema-registry:8081/metrics

Prometheus: Schema Registry експортує метрики в форматі Prometheus на /metrics. Важливі: kafka_schema_registry_master_slave_role (має бути один master), kafka_schema_registry_registered_count.

Інтеграція CI/CD

У pipeline перед розгортанням нової версії сервісу—перевіряємо сумісність схеми:

#!/bin/bash
# schema-compatibility-check.sh

SCHEMA_FILE="src/main/avro/OrderEvent.avsc"
SUBJECT="order-events-value"
REGISTRY_URL="http://schema-registry:8081"

SCHEMA_JSON=$(jq -c . "$SCHEMA_FILE")
RESPONSE=$(curl -s -X POST \
    "${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "{\"schema\": $(echo $SCHEMA_JSON | jq -R .)}")

COMPATIBLE=$(echo $RESPONSE | jq -r '.is_compatible')

if [ "$COMPATIBLE" != "true" ]; then
    echo "FAIL: Schema is not compatible: $RESPONSE"
    exit 1
fi

echo "OK: Schema is backward compatible"

Таймлайн

День 1—встановлення Schema Registry, визначення Avro-схем для всіх топіків, налаштування режиму сумісності.

День 2—інтеграція продюсерів і консьюмерів з KafkaAvroSerializer/Deserializer, тестування wire format.

День 3—інтеграція перевірки сумісності в CI/CD, документування процесу еволюції схем для команди. Тест: навмисно зломана несумісна зміна має упасти в pipeline перед розгортанням.