Kafka Schema Registry Setup for Message Validation

Without Schema Registry, Kafka topics are opaque byte streams. If a producer changes its message format, consumers find out at runtime, typically with a deserialization error or a NullPointerException. Schema Registry solves this: message schemas are versioned, evolution is controlled, and incompatible changes are rejected before they ever reach the topic.

It is used together with the Avro, Protobuf, or JSON Schema formats.

Architecture

Schema Registry is a separate HTTP service that stores schemas in the Kafka topic _schemas. When a producer sends a message with a given schema for the first time, its serializer registers the schema and receives a schema_id (an integer). After that, each message carries only a 5-byte header (a magic byte plus the 4-byte schema_id) instead of the full schema; this is the Confluent wire format.

Producer → [magic byte 0x00][schema_id 4 bytes][serialized payload] → Kafka
Consumer → reads schema_id → requests schema from Registry → deserializes
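
For illustration, a minimal Python sketch that splits this header off a raw record value (the function name is mine, not part of any client library; real deserializers do this internally):

import struct

def split_confluent_frame(raw: bytes) -> tuple[int, bytes]:
    """Return (schema_id, payload) from a Confluent-framed message value."""
    if len(raw) < 5 or raw[0] != 0x00:
        raise ValueError("not in Confluent wire format (magic byte 0x00 missing)")
    # '>bI' = big-endian: 1-byte magic, 4-byte unsigned schema id
    _magic, schema_id = struct.unpack(">bI", raw[:5])
    return schema_id, raw[5:]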

Installing Confluent Schema Registry

# Via Docker Compose (typical setup)
version: '3.8'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: "_schemas"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "BACKWARD"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081/subjects"]
      interval: 10s
      retries: 5

For production, run at least two instances behind a load balancer; one is elected leader (historically called "master") and handles all schema writes, while the other instances forward write requests to it.
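
A quick readiness probe once the container is up, sketched with the third-party requests package (port 8081 as mapped above): GET /config returns the global compatibility level.

import requests  # pip install requests

r = requests.get("http://localhost:8081/config", timeout=5)
r.raise_for_status()
print(r.json())  # e.g. {"compatibilityLevel": "BACKWARD"}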

Defining Avro Schema

{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.example.orders",
  "doc": "Order change event",
  "fields": [
    {
      "name": "event_id",
      "type": "string",
      "doc": "Event UUID"
    },
    {
      "name": "order_id",
      "type": "long"
    },
    {
      "name": "user_id",
      "type": "long"
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["CREATED", "PAID", "SHIPPED", "DELIVERED", "CANCELLED"]
      }
    },
    {
      "name": "amount",
      "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}
    },
    {
      "name": "created_at",
      "type": {"type": "long", "logicalType": "timestamp-millis"}
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null,
      "doc": "Optional metadata—new field, backward-compatible"
    }
  ]
}
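
Before registering, it is worth validating the schema locally and checking that a sample record actually conforms to it. A sketch using the third-party fastavro package (the file path schemas/order_event.avsc is assumed):

import io
import json
import uuid
from datetime import datetime, timezone
from decimal import Decimal

import fastavro  # pip install fastavro

with open("schemas/order_event.avsc") as f:
    schema = fastavro.parse_schema(json.load(f))  # raises on an invalid schema

sample = {
    "event_id": str(uuid.uuid4()),
    "order_id": 12345,
    "user_id": 67890,
    "status": "CREATED",
    "amount": Decimal("1999.99"),              # bytes + decimal logical type
    "created_at": datetime.now(timezone.utc),  # timestamp-millis
    "metadata": None,
}

buf = io.BytesIO()
fastavro.schemaless_writer(buf, schema, sample)  # raises if the record doesn't conform
print(f"OK, serialized to {buf.getbuffer().nbytes} bytes")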

Registering Schema via REST API

# Register a schema for the subject "order-events-value"
# Subject naming strategy: {topic}-value (default) or custom
# The escaped schema below is a trimmed v1 of OrderEvent
# (status as a plain string, amount as double, no user_id/metadata yet)

curl -X POST http://schema-registry:8081/subjects/order-events-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"OrderEvent\",\"namespace\":\"com.example.orders\",\"fields\":[{\"name\":\"event_id\",\"type\":\"string\"},{\"name\":\"order_id\",\"type\":\"long\"},{\"name\":\"status\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}"
  }'

# Response: {"id": 1}

# Get all schema versions
curl http://schema-registry:8081/subjects/order-events-value/versions

# Get schema by version
curl http://schema-registry:8081/subjects/order-events-value/versions/1

# Get schema by ID
curl http://schema-registry:8081/schemas/ids/1

# Check compatibility of new schema before registration
curl -X POST http://schema-registry:8081/compatibility/subjects/order-events-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "..."}'
# Response: {"is_compatible": true}

Configuring Compatibility Modes

# Global mode
curl -X PUT http://schema-registry:8081/config \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

# Override for specific subject
curl -X PUT http://schema-registry:8081/config/order-events-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL"}'

Modes:

  • BACKWARD (the default): a consumer using the new schema can read data written with the old one. Allowed changes: deleting fields and adding fields with defaults.
  • FORWARD: a consumer using the old schema can read data written with the new one. Allowed changes: adding fields and deleting fields that have defaults.
  • FULL: both directions at once. Only adding or deleting fields with defaults (optional fields) is allowed.
  • NONE: no checks. Development only.

Each mode also has a _TRANSITIVE variant (e.g. BACKWARD_TRANSITIVE) that checks the new schema against all registered versions of the subject, not just the latest one.
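
For example, adding a required field (one without a default) violates BACKWARD compatibility, because a consumer on the new schema could not fill that field from old records. A sketch probing this against the REST API with the third-party requests package (subject and URL as above; the base fields match the v1 schema registered earlier):

import json
import requests  # pip install requests

REGISTRY = "http://schema-registry:8081"
SUBJECT = "order-events-value"

# OrderEvent with a new *required* field: incompatible under BACKWARD
schema = {
    "type": "record",
    "name": "OrderEvent",
    "namespace": "com.example.orders",
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "order_id", "type": "long"},
        {"name": "status", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "region", "type": "string"},  # no default -> old records can't supply it
    ],
}

resp = requests.post(
    f"{REGISTRY}/compatibility/subjects/{SUBJECT}/versions/latest",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schema": json.dumps(schema)},
)
print(resp.json())  # expected: {"is_compatible": false}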

Java Producer with Avro

<!-- pom.xml -->
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.3</version>
</dependency>
// Generate Java classes from Avro schema (via avro-maven-plugin)
// or use GenericRecord for dynamic schema

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("auto.register.schemas", false); // In production—disable auto-registration
props.put("use.latest.version", true);

KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);

OrderEvent event = OrderEvent.newBuilder()
    .setEventId(UUID.randomUUID().toString())
    .setOrderId(12345L)
    .setUserId(67890L)
    .setStatus(OrderStatus.CREATED)
    // the decimal logical type maps to BigDecimal when avro-maven-plugin has
    // <enableDecimalLogicalType>true</enableDecimalLogicalType>
    .setAmount(new BigDecimal("1999.99"))
    // Avro 1.10+ generates java.time.Instant for timestamp-millis
    .setCreatedAt(Instant.now())
    .build();

// key by order id so all events for one order land in the same partition
producer.send(new ProducerRecord<>("order-events",
    String.valueOf(event.getOrderId()), event));

Python Client with Avro

import time
import uuid

from confluent_kafka import avro
# AvroProducer is deprecated in newer confluent-kafka releases in favor of
# AvroSerializer + SerializingProducer, but it remains the most common API
from confluent_kafka.avro import AvroProducer

value_schema = avro.load('schemas/order_event.avsc')
key_schema = avro.loads('"string"')  # AvroProducer also requires a schema for the key

producer = AvroProducer(
    {
        'bootstrap.servers': 'kafka-1:9092',
        'schema.registry.url': 'http://schema-registry:8081',
        'acks': 'all',
    },
    default_key_schema=key_schema,
    default_value_schema=value_schema,
)

order_id = 12345
producer.produce(
    topic='order-events',
    key=str(order_id),
    value={
        'event_id': str(uuid.uuid4()),
        'order_id': order_id,
        'user_id': 67890,
        'status': 'CREATED',
        # assumes the .avsc declares amount as double (as in the registered v1);
        # the bytes/decimal variant would need a bytes value instead of a float
        'amount': 1999.99,
        'created_at': int(time.time() * 1000),
    }
)
producer.flush()
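
The matching consumer, sketched with the same client library; the deserializer reads the schema_id from each message and fetches the writer's schema from the Registry automatically (the group id is illustrative):

from confluent_kafka.avro import AvroConsumer

consumer = AvroConsumer({
    'bootstrap.servers': 'kafka-1:9092',
    'group.id': 'order-events-reader',
    'schema.registry.url': 'http://schema-registry:8081',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['order-events'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f'consumer error: {msg.error()}')
        continue
    event = msg.value()  # already a dict, deserialized with the writer's schema
    print(event['order_id'], event['status'])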

Monitoring Schema Registry

# Count registered subjects
curl -s http://schema-registry:8081/subjects | jq '. | length'

# List all subjects
curl -s http://schema-registry:8081/subjects | jq '.[]'

# Metrics are exposed via JMX (domain kafka.schema.registry); the open-source
# build has no built-in Prometheus /metrics endpoint, so attach the Prometheus
# JMX exporter javaagent to scrape them

Prometheus: with the JMX exporter attached, the key series are kafka_schema_registry_master_slave_role (exactly one instance should report the leader role) and kafka_schema_registry_registered_count (the number of registered schemas).
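
For recurring checks, a small script is handier than ad-hoc curl. A sketch that prints every subject with its latest version and schema id (third-party requests package assumed):

import requests  # pip install requests

REGISTRY = 'http://schema-registry:8081'

for subject in requests.get(f'{REGISTRY}/subjects', timeout=5).json():
    latest = requests.get(f'{REGISTRY}/subjects/{subject}/versions/latest', timeout=5).json()
    print(f"{subject}: v{latest['version']} (schema id {latest['id']})")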

CI/CD Integration

In the pipeline, before deploying a new service version, check that the new schema is compatible:

#!/bin/bash
# schema-compatibility-check.sh
set -euo pipefail

SCHEMA_FILE="src/main/avro/OrderEvent.avsc"
SUBJECT="order-events-value"
REGISTRY_URL="http://schema-registry:8081"

SCHEMA_JSON=$(jq -c . "$SCHEMA_FILE")

# jq -n builds the request body with the schema safely embedded as a JSON string
RESPONSE=$(curl -s -X POST \
    "${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "$(jq -n --arg schema "$SCHEMA_JSON" '{schema: $schema}')")

COMPATIBLE=$(echo "$RESPONSE" | jq -r '.is_compatible')

if [ "$COMPATIBLE" != "true" ]; then
    echo "FAIL: schema is not compatible: $RESPONSE"
    exit 1
fi

echo "OK: Schema is backward compatible"

Timeline

Day 1—install Schema Registry, define Avro schemas for all topics, configure compatibility mode.

Day 2—integrate producers and consumers with KafkaAvroSerializer/Deserializer, test wire format.

Day 3—integrate the compatibility check into CI/CD and document the schema evolution process for the team. As a final test, push a deliberately incompatible change and confirm the pipeline rejects it before deployment.