Налаштування Kafka Schema Registry для валідації повідомлень
Без Schema Registry топіки Kafka—це сліпі потоки байтів. Якщо продюсер змінить формат JSON—консьюмер впадає з NullPointerException. Schema Registry вирішує цю проблему: схеми повідомлень версіюються, еволюція контролюється, несумісні зміни блокуються перед публікацією.
Використовується разом з форматами Avro, Protobuf або JSON Schema.
Архітектура
Schema Registry—це окремий HTTP-сервіс, який зберігає схеми в топіці Kafka _schemas. Коли продюсер вперше відправляє—реєструє схему й отримує schema_id (ціле число). Замість повної схеми кожне повідомлення містить тільки schema_id (4 байти)—це формат Confluent wire format.
Producer → [magic byte 0x00][schema_id 4 bytes][serialized payload] → Kafka
Consumer → читає schema_id → запитує схему з Registry → десеріалізує
Встановлення Confluent Schema Registry
# Через Docker Compose (типовий сетап)
version: '3.8'
services:
schema-registry:
image: confluentinc/cp-schema-registry:7.6.0
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: "_schemas"
SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "BACKWARD"
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8081/subjects"]
interval: 10s
retries: 5
Для прод—мінімум 2 екземпляри за балансувальником навантаження, один діє як master.
Визначення Avro-схеми
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.example.orders",
"doc": "Подія зміни замовлення",
"fields": [
{
"name": "event_id",
"type": "string",
"doc": "UUID події"
},
{
"name": "order_id",
"type": "long"
},
{
"name": "user_id",
"type": "long"
},
{
"name": "status",
"type": {
"type": "enum",
"name": "OrderStatus",
"symbols": ["CREATED", "PAID", "SHIPPED", "DELIVERED", "CANCELLED"]
}
},
{
"name": "amount",
"type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}
},
{
"name": "created_at",
"type": {"type": "long", "logicalType": "timestamp-millis"}
},
{
"name": "metadata",
"type": ["null", {"type": "map", "values": "string"}],
"default": null,
"doc": "Опціональні метадані—нове поле, backward-compatible"
}
]
}
Реєстрація схеми через REST API
# Реєструємо схему для subject "order-events-value"
# Subject naming strategy: {topic}-value (за замовчуванням) або кастомна
curl -X POST http://schema-registry:8081/subjects/order-events-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"OrderEvent\",\"namespace\":\"com.example.orders\",\"fields\":[{\"name\":\"event_id\",\"type\":\"string\"},{\"name\":\"order_id\",\"type\":\"long\"},{\"name\":\"status\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}"
}'
# Відповідь: {"id": 1}
# Отримати всі версії схеми
curl http://schema-registry:8081/subjects/order-events-value/versions
# Отримати схему за версією
curl http://schema-registry:8081/subjects/order-events-value/versions/1
# Отримати схему за ID
curl http://schema-registry:8081/schemas/ids/1
# Перевірити сумісність нової схеми перед реєстрацією
curl -X POST http://schema-registry:8081/compatibility/subjects/order-events-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "..."}'
# Відповідь: {"is_compatible": true}
Налаштування режимів сумісності
# Глобальний режим
curl -X PUT http://schema-registry:8081/config \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Перевизначення для конкретного subject
curl -X PUT http://schema-registry:8081/config/order-events-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}'
Режими:
-
BACKWARD—нова схема читає дані, записані старою. Можна додавати поля зі значенням за замовчуванням, видаляти поля без значення. -
FORWARD—стара схема читає дані, записані новою. Зворотний напрямок. -
FULL—обидва напрямки. Тільки додавання/видалення optional-полів. -
NONE—без перевірок. Тільки для розробки.
Java-продюсер з Avro
<!-- pom.xml -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
// Генерування Java-класів з Avro-схеми (через avro-maven-plugin)
// або використання GenericRecord для динамічної схеми
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("auto.register.schemas", false); // У проді—забороняємо авто-реєстрацію
props.put("use.latest.version", true);
KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);
OrderEvent event = OrderEvent.newBuilder()
.setEventId(UUID.randomUUID().toString())
.setOrderId(12345L)
.setUserId(67890L)
.setStatus(OrderStatus.CREATED)
.setCreatedAt(Instant.now().toEpochMilli())
.build();
producer.send(new ProducerRecord<>("order-events", event.getOrderId().toString(), event));
Python-клієнт з Avro
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
schema_registry_conf = {'url': 'http://schema-registry:8081'}
value_schema = avro.load('schemas/order_event.avsc')
producer = AvroProducer(
{
'bootstrap.servers': 'kafka-1:9092',
'schema.registry.url': 'http://schema-registry:8081',
'acks': 'all',
},
default_value_schema=value_schema
)
producer.produce(
topic='order-events',
key=str(order_id),
value={
'event_id': str(uuid.uuid4()),
'order_id': order_id,
'user_id': user_id,
'status': 'CREATED',
'amount': 1999.99,
'created_at': int(time.time() * 1000),
}
)
producer.flush()
Моніторинг Schema Registry
# Кількість зареєстрованих subjects
curl -s http://schema-registry:8081/subjects | jq '. | length'
# Список всіх subjects
curl -s http://schema-registry:8081/subjects | jq '.[]'
# Метрики через JMX або вбудований endpoint
curl http://schema-registry:8081/metrics
Prometheus: Schema Registry експортує метрики в форматі Prometheus на /metrics. Важливі: kafka_schema_registry_master_slave_role (має бути один master), kafka_schema_registry_registered_count.
Інтеграція CI/CD
У pipeline перед розгортанням нової версії сервісу—перевіряємо сумісність схеми:
#!/bin/bash
# schema-compatibility-check.sh
SCHEMA_FILE="src/main/avro/OrderEvent.avsc"
SUBJECT="order-events-value"
REGISTRY_URL="http://schema-registry:8081"
SCHEMA_JSON=$(jq -c . "$SCHEMA_FILE")
RESPONSE=$(curl -s -X POST \
"${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "{\"schema\": $(echo $SCHEMA_JSON | jq -R .)}")
COMPATIBLE=$(echo $RESPONSE | jq -r '.is_compatible')
if [ "$COMPATIBLE" != "true" ]; then
echo "FAIL: Schema is not compatible: $RESPONSE"
exit 1
fi
echo "OK: Schema is backward compatible"
Таймлайн
День 1—встановлення Schema Registry, визначення Avro-схем для всіх топіків, налаштування режиму сумісності.
День 2—інтеграція продюсерів і консьюмерів з KafkaAvroSerializer/Deserializer, тестування wire format.
День 3—інтеграція перевірки сумісності в CI/CD, документування процесу еволюції схем для команди. Тест: навмисно зломана несумісна зміна має упасти в pipeline перед розгортанням.







