Реалізація Event Sourcing для веб-додатку
Event Sourcing — патерн збереження стану додатку через послідовність невіддільних подій замість оновлюваних записів. Поточний стан об'єкту відновлюється «прокруткою» його історії подій. Не срібна куля: патерн додає складність та виправданий для доменів з багатою історією змін, аудитом або потребою в temporal queries.
Коли застосовувати Event Sourcing
Підходить:
- Фінансові трансакції, бухгалтерія (потрібна повна історія кожної зміни балансу)
- Системи замовлень та логістики (OrderPlaced → PaymentProcessed → Shipped → Delivered)
- Медичні карти (кожна зміна повинна бути задокументована)
- Системи з undo/redo або откатом стану
Не підходить:
- CRUD-довідники без історії
- Аналітичні сховища (краще використовувати CDC + data warehouse)
- Прості блоги та лендинги
Структура події
interface DomainEvent {
eventId: string; // UUID
aggregateId: string; // ID сутності (orderId, userId)
aggregateType: string; // 'Order', 'Account'
eventType: string; // 'OrderPlaced', 'ItemAdded'
eventVersion: number; // для schema evolution
occurredAt: Date;
payload: Record<string, unknown>;
metadata: {
causedBy?: string; // eventId батьківської події
userId?: string;
correlationId: string;
};
}
Event Store
Основна таблиця — append-only. Немає UPDATE та DELETE:
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
event_id UUID UNIQUE NOT NULL,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_version INT NOT NULL DEFAULT 1,
payload JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
sequence_nr BIGINT NOT NULL -- глобальний порядок
);
CREATE INDEX idx_es_aggregate ON event_store (aggregate_id, aggregate_type, id);
CREATE INDEX idx_es_sequence ON event_store (sequence_nr);
Оптимістична блокування — перевірка sequence_nr перед записом нової події запобігає конфліктам конкурентних записів.
Aggregate та Replay
class OrderAggregate {
private state: OrderState = { status: 'new', items: [], total: 0 };
private version = 0;
private uncommittedEvents: DomainEvent[] = [];
static rehydrate(events: DomainEvent[]): OrderAggregate {
const order = new OrderAggregate();
for (const event of events) {
order.apply(event);
}
return order;
}
placeOrder(items: OrderItem[]) {
// валідація бізнес-правил
if (this.state.status !== 'new') throw new Error('Order already placed');
this.raise({
eventType: 'OrderPlaced',
payload: { items, placedAt: new Date() }
});
}
private apply(event: DomainEvent) {
switch (event.eventType) {
case 'OrderPlaced':
this.state.status = 'placed';
this.state.items = event.payload.items;
break;
case 'PaymentProcessed':
this.state.status = 'paid';
this.state.paidAmount = event.payload.amount;
break;
case 'OrderShipped':
this.state.status = 'shipped';
this.state.trackingNumber = event.payload.trackingNumber;
break;
}
this.version++;
}
}
Снапшоти (Snapshots)
При великій кількості подій на агрегат (>500) повний replay стає повільним. Снапшот — серіалізований стан на момент N-ї події. При завантаженні читається останній снапшот + події після нього:
async loadAggregate(aggregateId: string): Promise<OrderAggregate> {
const snapshot = await this.snapshotRepo.findLatest(aggregateId);
const fromSequence = snapshot?.version ?? 0;
const events = await this.eventStore.getEvents(
aggregateId, { fromVersion: fromSequence }
);
const aggregate = snapshot
? OrderAggregate.fromSnapshot(snapshot)
: new OrderAggregate();
return aggregate.rehydrate(events);
}
Снапшоти створюються асинхронно кожні 100–500 подій на агрегат.
Проекції (Read Models)
Event Sourcing диктує розділення Write Model (події) та Read Model (проекції для запитів). Проекція підписується на потік подій та будує денормалізовану таблицю для швидкого читання:
class OrderProjection {
async on(event: DomainEvent) {
switch (event.eventType) {
case 'OrderPlaced':
await db.query(`
INSERT INTO orders_view (id, status, customer_id, total, created_at)
VALUES ($1, 'placed', $2, $3, $4)
`, [event.aggregateId, event.payload.customerId,
event.payload.total, event.occurredAt]);
break;
case 'OrderShipped':
await db.query(`
UPDATE orders_view SET status = 'shipped',
tracking_number = $2, shipped_at = $3
WHERE id = $1
`, [event.aggregateId, event.payload.trackingNumber, event.occurredAt]);
break;
}
}
}
Проекції можна видалити та пересобрати з нуля — історія подій повна.
Технічний стек
Готові Event Store:
- EventStoreDB — спеціалізована СУБД, підтримує підписки, catchup subscriptions, проекції
- Marten (.NET) — PostgreSQL як Event Store + документна БД
- Axon Framework (Java) — повний ES/CQRS фреймворк
Самописаний на PostgreSQL — достатній для більшості проектів. LISTEN/NOTIFY для сповіщення проекцій про нові події.
Брокер подій для розподілу між сервісами: Kafka, RabbitMQ, NATS JetStream.
Schema Evolution
Версіонування схем подій — обов'язкова практика. Стратегії:
- Upcasting — при читанні старої події трансформувати її до нової схеми
- Weak schema — JSON дозволяє додавати поля без поломки
-
Event versioning — зберігати
eventVersion, читати з різними хендлерами
Часова складність
| Операція | Без снапшотів | Зі снапшотами |
|---|---|---|
| Завантаження агрегату (N подій) | O(N) | O(recent events) |
| Запис події | O(1) | O(1) |
| Запит по стану | O(N) projection rebuild | O(1) read model |
Терміни реалізації
- Базовий Event Store на PostgreSQL з append-only таблицею — 2–3 дні
- Один агрегат з кількома типами подій — 3–5 днів
- Проекції + підписки + снапшоти — ще 5–7 днів
- Повна система з кількома агрегатами, schema evolution та моніторингом — 3–5 тижнів







