Розробка схеми подій (Event Schema) для мікросервісів
Контракт між мікросервісами через черги — це Event Schema. Без строгої схеми — це хаос: продюсер переіменував поле, консьюмер упав о 3:00 ночі. Правильно спроектована схема подій з версіонуванням робить зміни явними та контрольованими.
Принципи проектування Event Schema
Події описують факти, не команди. OrderShipped — це факт. ShipOrder — це команда. Подія трапилася та не може бути скасована (тільки компенсована іншою подією).
Схема повинна бути самодостатньою. Консьюмер не повинен робити додаткові запити для обробки події. Всі потрібні дані — у тілі події.
Обратна сумісність за замовчуванням. Старі консьюмери повинні працювати з новими подіями без змін.
Структура події
{
"eventId": "01HQ2XK4VB8M9QXYZ123456789",
"eventType": "order.shipped",
"eventVersion": "1.2",
"occurredAt": "2026-03-28T14:22:00.000Z",
"producedBy": "order-service",
"correlationId": "req-abc-123",
"causationId": "cmd-xyz-456",
"aggregateType": "Order",
"aggregateId": "12345",
"aggregateVersion": 7,
"payload": {
"orderId": 12345,
"userId": 67890,
"carrier": "DHL",
"trackingCode": "JD123456789DE",
"estimatedDelivery": "2026-03-31",
"items": [
{"sku": "PROD-001", "quantity": 2, "warehouseId": "WH-MSK"}
]
}
}
Обов'язкові поля конверта:
-
eventId— ULID або UUID, унікальний ідентифікатор для ідемпотентності -
eventType— ієрархічний,domain.aggregate.action -
eventVersion— semantic versioning схеми payload -
occurredAt— UTC ISO 8601 -
correlationId— для трасування ланцюга запитів -
aggregateId+aggregateVersion— для оптимістичної блокування
Avro-схема з еволюцією
{
"type": "record",
"name": "OrderShipped",
"namespace": "com.example.orders.events",
"doc": "Подія відгрузки замовлення зі складу",
"fields": [
{"name": "eventId", "type": "string"},
{"name": "eventType", "type": "string", "default": "order.shipped"},
{"name": "occurredAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "orderId", "type": "long"},
{"name": "userId", "type": "long"},
{"name": "carrier", "type": "string"},
{"name": "trackingCode", "type": "string"},
{
"name": "estimatedDelivery",
"type": ["null", "string"],
"default": null,
"doc": "ISO дата, може відсутствувати для деяких перевозників"
},
{
"name": "warehouseId",
"type": ["null", "string"],
"default": null,
"doc": "Додано в v1.1 — необов'язкове поле для backward compatibility"
},
{
"name": "shippingCost",
"type": ["null", {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}],
"default": null,
"doc": "Додано в v1.2"
}
]
}
Правила еволюції для backward compatibility:
- Нові поля — завжди з
default(null або значення) - Не можна видаляти обов'язкові поля
- Не можна змінювати тип поля
- Не можна переіменовувати поля (додайте alias, потім переіменуйте у major-версії)
Версіонування та стратегії сумісності
# Налаштування Schema Registry — BACKWARD сумісність для всіх подій orders
curl -X PUT http://schema-registry:8081/config/order-events-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD_TRANSITIVE"}'
# BACKWARD_TRANSITIVE — нова схема сумісна з УСІМИпопередніми версіями,
# не тільки з останньою
Мажорна зміна (breaking change) — новий топік:
order-events-v1 → для консьюмерів на старій схемі
order-events-v2 → нова схема, консьюмери мігрують поступово
Перехідний період: продюсер публікує в обидва топіки. Після повної міграції — order-events-v1 deprecated.
Event Catalog — документування схем
Для команди з кількох сервісів критично мати центральний реєстр подій. Використовуємо AsyncAPI:
# asyncapi.yaml
asyncapi: 3.0.0
info:
title: Order Service Events
version: 1.0.0
description: Події, публіковані Order Service
channels:
order-events:
address: order-events
messages:
OrderCreated:
$ref: '#/components/messages/OrderCreated'
OrderShipped:
$ref: '#/components/messages/OrderShipped'
OrderCancelled:
$ref: '#/components/messages/OrderCancelled'
components:
messages:
OrderCreated:
name: OrderCreated
title: Замовлення створене
summary: Публікується при успішному створенні нового замовлення
contentType: application/avro
headers:
type: object
properties:
correlationId:
type: string
description: ID вхідного HTTP-запиту
payload:
type: object
required: [eventId, orderId, userId, items, totalAmount]
properties:
eventId:
type: string
format: ulid
orderId:
type: integer
format: int64
userId:
type: integer
format: int64
items:
type: array
items:
type: object
properties:
sku:
type: string
quantity:
type: integer
price:
type: number
totalAmount:
type: number
createdAt:
type: string
format: date-time
Типізований Event Publisher (TypeScript/Node.js)
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { Kafka } from 'kafkajs';
interface EventEnvelope<T> {
eventId: string;
eventType: string;
eventVersion: string;
occurredAt: string;
producedBy: string;
correlationId?: string;
aggregateType: string;
aggregateId: string;
aggregateVersion: number;
payload: T;
}
interface OrderShippedPayload {
orderId: number;
userId: number;
carrier: string;
trackingCode: string;
estimatedDelivery?: string;
}
class OrderEventPublisher {
private registry: SchemaRegistry;
private producer: ReturnType<Kafka['producer']>;
async publishOrderShipped(data: OrderShippedPayload, correlationId?: string): Promise<void> {
const envelope: EventEnvelope<OrderShippedPayload> = {
eventId: ulid(),
eventType: 'order.shipped',
eventVersion: '1.2',
occurredAt: new Date().toISOString(),
producedBy: 'order-service',
correlationId,
aggregateType: 'Order',
aggregateId: String(data.orderId),
aggregateVersion: await this.getAggregateVersion(data.orderId),
payload: data,
};
const schemaId = await this.registry.getLatestSchemaId('order-events-value');
const encoded = await this.registry.encode(schemaId, envelope);
await this.producer.send({
topic: 'order-events',
messages: [{
key: String(data.orderId),
value: encoded,
headers: {
'correlation-id': correlationId ?? '',
'event-type': 'order.shipped',
},
}],
});
}
}
Тестування Event Schema
// Contract testing — перевіряємо, що продюсер публікує те, що консьюмер очікує
@SpringBootTest
class OrderEventContractTest {
@Test
void orderShippedEvent_shouldMatchConsumerExpectations() throws Exception {
OrderShipped event = OrderShipped.newBuilder()
.setEventId(UUID.randomUUID().toString())
.setOrderId(12345L)
.setUserId(67890L)
.setCarrier("DHL")
.setTrackingCode("JD123456789DE")
.build();
// Серіалізуємо з Avro
byte[] serialized = avroSerializer.serialize("order-events", event);
// Десеріалізуємо як консьюмер (інший сервіс)
OrderShipped deserialized = (OrderShipped) avroDeserializer.deserialize("order-events", serialized);
assertThat(deserialized.getOrderId()).isEqualTo(12345L);
assertThat(deserialized.getTrackingCode()).isEqualTo("JD123456789DE");
// Перевіряємо обратну сумісність: старий консьюмер без поля warehouseId
OldOrderShipped oldDeserialized = (OldOrderShipped) oldDeserializer.deserialize("order-events", serialized);
assertThat(oldDeserialized.getOrderId()).isEqualTo(12345L);
// warehouseId відсутня — не падаємо
}
}
Графік
День 1 — воркшоп з командами сервісів: складаємо Event Storming карту, визначаємо всі доменні події та їхні межі.
День 2 — розробка Avro-схем для кожного типу події, визначення правил найменування та структури конверта. Реєстрація в Schema Registry.
День 3 — реалізація типізованих Event Publisher'ів у кожному сервісі-продюсері, AsyncAPI-документація.
День 4 — contract тесты, інтеграція перевірки сумісності в CI/CD pipeline, інструкція для команди по правилах еволюції схем.







