Реализация Event Sourcing для веб-приложения
Event Sourcing — паттерн хранения состояния приложения через последовательность неизменяемых событий вместо обновляемых записей. Текущее состояние объекта восстанавливается «прокруткой» его истории событий. Это не серебряная пуля: паттерн добавляет сложность и оправдан для доменов с богатой историей изменений, аудитом или необходимостью temporal queries.
Когда применять Event Sourcing
Подходит:
- Финансовые транзакции, бухгалтерия (нужна полная история каждого изменения баланса)
- Системы заказов и логистики (OrderPlaced → PaymentProcessed → Shipped → Delivered)
- Медицинские карты (каждое изменение должно быть задокументировано)
- Системы с undo/redo или откатом состояния
Не подходит:
- CRUD-справочники без истории
- Аналитические хранилища (там лучше CDC + data warehouse)
- Простые блоги и лендинги
Структура события
interface DomainEvent {
eventId: string; // UUID
aggregateId: string; // ID сущности (orderId, userId)
aggregateType: string; // 'Order', 'Account'
eventType: string; // 'OrderPlaced', 'ItemAdded'
eventVersion: number; // для schema evolution
occurredAt: Date;
payload: Record<string, unknown>;
metadata: {
causedBy?: string; // eventId родительского события
userId?: string;
correlationId: string;
};
}
Event Store
Основная таблица — append-only. Никакого UPDATE и DELETE:
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
event_id UUID UNIQUE NOT NULL,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_version INT NOT NULL DEFAULT 1,
payload JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
sequence_nr BIGINT NOT NULL -- глобальный порядок
);
CREATE INDEX idx_es_aggregate ON event_store (aggregate_id, aggregate_type, id);
CREATE INDEX idx_es_sequence ON event_store (sequence_nr);
Оптимистичная блокировка — проверка sequence_nr перед записью нового события предотвращает конфликты конкурентных записей.
Aggregate и Replay
class OrderAggregate {
private state: OrderState = { status: 'new', items: [], total: 0 };
private version = 0;
private uncommittedEvents: DomainEvent[] = [];
static rehydrate(events: DomainEvent[]): OrderAggregate {
const order = new OrderAggregate();
for (const event of events) {
order.apply(event);
}
return order;
}
placeOrder(items: OrderItem[]) {
// валидация бизнес-правил
if (this.state.status !== 'new') throw new Error('Order already placed');
this.raise({
eventType: 'OrderPlaced',
payload: { items, placedAt: new Date() }
});
}
private apply(event: DomainEvent) {
switch (event.eventType) {
case 'OrderPlaced':
this.state.status = 'placed';
this.state.items = event.payload.items;
break;
case 'PaymentProcessed':
this.state.status = 'paid';
this.state.paidAmount = event.payload.amount;
break;
case 'OrderShipped':
this.state.status = 'shipped';
this.state.trackingNumber = event.payload.trackingNumber;
break;
}
this.version++;
}
}
Снапшоты (Snapshots)
При большом числе событий на агрегат (>500) полный replay становится медленным. Снапшот — сериализованное состояние на момент N-го события. При загрузке читается последний снапшот + события после него:
async loadAggregate(aggregateId: string): Promise<OrderAggregate> {
const snapshot = await this.snapshotRepo.findLatest(aggregateId);
const fromSequence = snapshot?.version ?? 0;
const events = await this.eventStore.getEvents(
aggregateId, { fromVersion: fromSequence }
);
const aggregate = snapshot
? OrderAggregate.fromSnapshot(snapshot)
: new OrderAggregate();
return aggregate.rehydrate(events);
}
Снапшоты создаются асинхронно каждые 100–500 событий на агрегат.
Проекции (Read Models)
Event Sourcing диктует разделение Write Model (события) и Read Model (проекции для запросов). Проекция подписывается на поток событий и строит денормализованную таблицу для быстрого чтения:
class OrderProjection {
async on(event: DomainEvent) {
switch (event.eventType) {
case 'OrderPlaced':
await db.query(`
INSERT INTO orders_view (id, status, customer_id, total, created_at)
VALUES ($1, 'placed', $2, $3, $4)
`, [event.aggregateId, event.payload.customerId,
event.payload.total, event.occurredAt]);
break;
case 'OrderShipped':
await db.query(`
UPDATE orders_view SET status = 'shipped',
tracking_number = $2, shipped_at = $3
WHERE id = $1
`, [event.aggregateId, event.payload.trackingNumber, event.occurredAt]);
break;
}
}
}
Проекции можно удалить и пересобрать с нуля — история событий полная.
Технический стек
Готовые Event Store:
- EventStoreDB — специализированная СУБД, поддерживает подписки, catchup subscriptions, проекции
- Marten (.NET) — PostgreSQL как Event Store + документная БД
- Axon Framework (Java) — полный ES/CQRS фреймворк
Самописный на PostgreSQL — достаточен для большинства проектов. LISTEN/NOTIFY для уведомления проекций о новых событиях.
Брокер событий для распределения между сервисами: Kafka, RabbitMQ, NATS JetStream.
Schema Evolution
Версионирование схем событий — обязательная практика. Стратегии:
- Upcasting — при чтении старого события трансформировать его к новой схеме
- Weak schema — JSON позволяет добавлять поля без поломки
-
Event versioning — хранить
eventVersion, читать с разными хендлерами
Временна́я сложность
| Операция | Без снапшотов | Со снапшотами |
|---|---|---|
| Загрузка агрегата (N событий) | O(N) | O(recent events) |
| Запись события | O(1) | O(1) |
| Запрос по состоянию | O(N) projection rebuild | O(1) read model |
Сроки реализации
- Базовый Event Store на PostgreSQL с append-only таблицей — 2–3 дня
- Один агрегат с несколькими типами событий — 3–5 дней
- Проекции + подписки + снапшоты — ещё 5–7 дней
- Полная система с несколькими агрегатами, schema evolution и мониторингом — 3–5 недель







