Kafka Schema Registry Setup for Message Validation
Without Schema Registry, Kafka topics are opaque byte streams: if a producer changes its message format, consumers start failing at deserialization time. Schema Registry solves this by versioning message schemas, controlling their evolution, and blocking incompatible changes before they reach the topic.
It is used together with the Avro, Protobuf, or JSON Schema formats.
Architecture
Schema Registry is a separate HTTP service that stores schemas in the Kafka topic _schemas. When a producer sends a message with a new schema for the first time, it registers the schema and receives a schema_id (an integer). Each message then carries only the schema_id (4 bytes) instead of the full schema; this is the Confluent wire format.
Producer → [magic byte 0x00][schema_id 4 bytes][serialized payload] → Kafka
Consumer → reads schema_id → requests schema from Registry → deserializes
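The framing above can be sketched in a few lines of Python. This is purely illustrative, standard-library-only code; the Confluent serializers do this internally:

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format, version 0

def frame(schema_id: int, payload: bytes) -> bytes:
    """Prepend the 5-byte Confluent header: magic byte + big-endian schema_id."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload

def unframe(message: bytes) -> tuple[int, bytes]:
    """Split a framed message back into (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id, message[5:]

framed = frame(1, b"\x02a")   # schema_id 1, dummy Avro payload
print(unframe(framed))        # → (1, b'\x02a')
```

A consumer that sees an unknown magic byte knows the message was produced without a Registry-aware serializer.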
Installing Confluent Schema Registry
# Via Docker Compose (typical setup)
version: '3.8'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: "_schemas"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "BACKWARD"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081/subjects"]
      interval: 10s
      retries: 5
For production, run at least two instances behind a load balancer; the instances elect a single leader that handles schema registration, while any instance can serve reads.
Defining Avro Schema
{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.example.orders",
  "doc": "Order change event",
  "fields": [
    {
      "name": "event_id",
      "type": "string",
      "doc": "Event UUID"
    },
    {
      "name": "order_id",
      "type": "long"
    },
    {
      "name": "user_id",
      "type": "long"
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["CREATED", "PAID", "SHIPPED", "DELIVERED", "CANCELLED"]
      }
    },
    {
      "name": "amount",
      "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}
    },
    {
      "name": "created_at",
      "type": {"type": "long", "logicalType": "timestamp-millis"}
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null,
      "doc": "Optional metadata: a new field, backward-compatible"
    }
  ]
}
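The amount field uses the decimal logical type, which Avro stores as the big-endian two's-complement bytes of the unscaled integer. The Avro libraries do this conversion for you; the sketch below exists only to make the wire representation concrete:

```python
from decimal import Decimal

def decimal_to_avro_bytes(value: str, scale: int) -> bytes:
    """Encode a decimal as Avro bytes/decimal: big-endian two's-complement unscaled int."""
    unscaled = int(Decimal(value).scaleb(scale))
    length = max(1, (unscaled.bit_length() + 8) // 8)  # +8 leaves room for the sign bit
    return unscaled.to_bytes(length, byteorder="big", signed=True)

def avro_bytes_to_decimal(raw: bytes, scale: int) -> Decimal:
    """Decode the unscaled integer and shift the decimal point back."""
    return Decimal(int.from_bytes(raw, byteorder="big", signed=True)).scaleb(-scale)

raw = decimal_to_avro_bytes("1999.99", 2)  # the amount field: precision 10, scale 2
print(avro_bytes_to_decimal(raw, 2))       # → 1999.99
```

This is why the scale lives in the schema, not in the payload: the bytes alone are ambiguous without it.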
Registering Schema via REST API
# Register schema for subject "order-events-value"
# Subject naming strategy: {topic}-value (default) or custom
curl -X POST http://schema-registry:8081/subjects/order-events-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"OrderEvent\",\"namespace\":\"com.example.orders\",\"fields\":[{\"name\":\"event_id\",\"type\":\"string\"},{\"name\":\"order_id\",\"type\":\"long\"},{\"name\":\"status\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}"
}'
# Response: {"id": 1}
# Get all schema versions
curl http://schema-registry:8081/subjects/order-events-value/versions
# Get schema by version
curl http://schema-registry:8081/subjects/order-events-value/versions/1
# Get schema by ID
curl http://schema-registry:8081/schemas/ids/1
# Check compatibility of new schema before registration
curl -X POST http://schema-registry:8081/compatibility/subjects/order-events-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "..."}'
# Response: {"is_compatible": true}
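The subject name itself comes from a configurable strategy on the client. A small sketch of the three standard Confluent strategies (TopicNameStrategy, RecordNameStrategy, TopicRecordNameStrategy), assuming flat string inputs:

```python
def subject_name(strategy: str, topic: str, record_fullname: str,
                 is_key: bool = False) -> str:
    """Compute the Schema Registry subject for the three standard naming strategies."""
    suffix = "key" if is_key else "value"
    if strategy == "TopicNameStrategy":        # default: {topic}-key / {topic}-value
        return f"{topic}-{suffix}"
    if strategy == "RecordNameStrategy":       # subject is the record's full name
        return record_fullname
    if strategy == "TopicRecordNameStrategy":  # topic plus the record's full name
        return f"{topic}-{record_fullname}"
    raise ValueError(f"unknown strategy: {strategy}")

print(subject_name("TopicNameStrategy", "order-events",
                   "com.example.orders.OrderEvent"))  # → order-events-value
```

RecordNameStrategy lets one topic carry several record types at the cost of per-topic compatibility guarantees.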
Configuring Compatibility Modes
# Global mode
curl -X PUT http://schema-registry:8081/config \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Override for specific subject
curl -X PUT http://schema-registry:8081/config/order-events-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}'
Modes:
- BACKWARD: the new schema can read data written with the old schema. You may remove fields and add fields that have defaults.
- FORWARD: the old schema can read data written with the new schema. You may add fields and remove fields that have defaults.
- FULL: both directions at once; only addition or removal of fields with defaults.
- NONE: no validation. Development only.
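As a rough intuition for what the BACKWARD check enforces, here is a deliberately simplified sketch for flat records. It ignores unions, type promotion, and nested schemas, all of which the real Registry check handles:

```python
def backward_compatible(old_fields: dict, new_fields: dict) -> bool:
    """
    Toy BACKWARD check: can a reader using new_fields read data
    written with old_fields? Each dict maps field name to a spec like
    {"type": ..., "default": ...} (the "default" key is optional).
    """
    for name, spec in new_fields.items():
        if name not in old_fields:
            if "default" not in spec:
                return False          # new field without a default: reader can't fill it
        elif old_fields[name]["type"] != spec["type"]:
            return False              # type change (ignoring Avro promotion rules)
    return True                       # removed fields are fine under BACKWARD

old = {"order_id": {"type": "long"}, "status": {"type": "string"}}
print(backward_compatible(old, dict(old, metadata={"type": "map", "default": None})))  # → True
print(backward_compatible(old, dict(old, metadata={"type": "map"})))                   # → False
```

This is exactly why the metadata field in the schema above carries "default": null; without it, registration under BACKWARD would be rejected.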
Java Producer with Avro
<!-- pom.xml -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.11.3</version>
</dependency>
// Generate Java classes from the Avro schema (via avro-maven-plugin),
// or use GenericRecord for a dynamic schema
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("auto.register.schemas", false); // in production, disable auto-registration
props.put("use.latest.version", true);

KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);

OrderEvent event = OrderEvent.newBuilder()
    .setEventId(UUID.randomUUID().toString())
    .setOrderId(12345L)
    .setUserId(67890L)
    .setStatus(OrderStatus.CREATED)
    // amount is required by the schema; with enableDecimalLogicalType
    // in avro-maven-plugin, the decimal logical type maps to BigDecimal
    .setAmount(new BigDecimal("1999.99"))
    .setCreatedAt(Instant.now().toEpochMilli())
    .build();

// getOrderId() returns a primitive long, so convert via String.valueOf
producer.send(new ProducerRecord<>("order-events",
    String.valueOf(event.getOrderId()), event));
Python Client with Avro
import time
import uuid

# Note: AvroProducer is the legacy API; newer confluent-kafka releases
# recommend SerializingProducer with AvroSerializer from
# confluent_kafka.schema_registry.avro
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema = avro.load('schemas/order_event.avsc')

producer = AvroProducer(
    {
        'bootstrap.servers': 'kafka-1:9092',
        'schema.registry.url': 'http://schema-registry:8081',
        'acks': 'all',
    },
    default_value_schema=value_schema
)

order_id, user_id = 12345, 67890  # example values

producer.produce(
    topic='order-events',
    key=str(order_id),
    value={
        'event_id': str(uuid.uuid4()),
        'order_id': order_id,
        'user_id': user_id,
        'status': 'CREATED',
        'amount': 1999.99,
        'created_at': int(time.time() * 1000),
    }
)
producer.flush()
Monitoring Schema Registry
# Count registered subjects
curl -s http://schema-registry:8081/subjects | jq '. | length'
# List all subjects
curl -s http://schema-registry:8081/subjects | jq '.[]'
# Metrics are exposed via JMX; Schema Registry has no built-in Prometheus endpoint
# For Prometheus, attach the jmx_exporter Java agent to the Schema Registry JVM
Prometheus: expose the JMX metrics through the jmx_exporter agent. Important metrics (names as renamed by jmx_exporter): kafka_schema_registry_master_slave_role (exactly one instance should report itself as leader) and kafka_schema_registry_registered_count (number of registered schemas).
CI/CD Integration
In the pipeline, before deploying a new service version, check schema compatibility:
#!/bin/bash
# schema-compatibility-check.sh
set -euo pipefail

SCHEMA_FILE="src/main/avro/OrderEvent.avsc"
SUBJECT="order-events-value"
REGISTRY_URL="http://schema-registry:8081"

# Wrap the schema as an escaped JSON string inside the request body
PAYLOAD=$(jq -c '{schema: (. | tojson)}' "$SCHEMA_FILE")

RESPONSE=$(curl -s -X POST \
  "${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d "$PAYLOAD")

COMPATIBLE=$(echo "$RESPONSE" | jq -r '.is_compatible')
if [ "$COMPATIBLE" != "true" ]; then
  echo "FAIL: Schema is not compatible: $RESPONSE"
  exit 1
fi
echo "OK: Schema is backward compatible"
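The same check fits a Python-based pipeline. The sketch below builds the request body and interprets the response, leaving out the actual HTTP call (any HTTP client works against the endpoint shown above):

```python
import json

def build_compat_request(schema: dict) -> bytes:
    """Body for POST /compatibility/subjects/{subject}/versions/latest:
    the schema goes in as an escaped JSON *string*, not a nested object."""
    return json.dumps({"schema": json.dumps(schema)}).encode()

def is_compatible(response_body: bytes) -> bool:
    """Interpret the Registry's response; missing key counts as incompatible."""
    return json.loads(response_body).get("is_compatible", False)

body = build_compat_request({"type": "record", "name": "OrderEvent", "fields": []})
print(is_compatible(b'{"is_compatible": true}'))  # → True
```

The double json.dumps is the part that trips people up: the Registry expects the schema pre-serialized as a string field.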
Timeline
Day 1—install Schema Registry, define Avro schemas for all topics, configure compatibility mode.
Day 2—integrate producers and consumers with KafkaAvroSerializer/Deserializer, test wire format.
Day 3—integrate compatibility check into CI/CD, document schema evolution process for team. Test: intentionally broken incompatible change should fail in pipeline before deployment.