Реалізація Event Sourcing через брокер сообщень
Event Sourcing — це зберігання стану системи не як поточного снімка, а як послідовності подій. Замість UPDATE orders SET status='shipped' в БД додається запис OrderShipped{orderId: 42, timestamp: ..., trackingCode: ...}. Поточний стан обчислюється воспроізведенням усіх подій.
Брокер сообщень (Kafka, EventStoreDB) служить Event Log — незмінним журналом, з якого можна восстановити будь-який минулий стан.
Коли Event Sourcing виправданий
Event Sourcing додає складність. Виправданий, коли:
- Потрібний повний аудит-лог (фінтех, медицина, e-commerce)
- Потрібна можливість воспроізвести минулий стан на конкретний момент
- Кілька read-моделей з одного джерела (CQRS)
- Undo/redo операції
Не виправданий для більшості CRUD-додатків без суворих вимог до аудиту.
Event Store на базі Kafka
Kafka ідеально підходить як Event Store: топіки — це append-only логи, події впорядковані в межах партиції, retention налаштовується вплоть до log.retention.ms=-1 (назавжди).
Схема топіків:
-
orders-events— усі події замовлень (ключ = order_id, гарантує порядок у партиції) -
users-events— події користувачів -
inventory-events— переміщення товарних залишків
// Базовий клас доменної події
public abstract class DomainEvent {
private final String eventId;
private final String aggregateId;
private final String aggregateType;
private final long version; // монотонно зростаючий номер версії агрегату
private final Instant occurredAt;
private final String causedBy; // ID команди, яка викликала подію
// події незмінні
}
// Конкретні події
public class OrderCreated extends DomainEvent {
private final Long userId;
private final List<OrderItem> items;
private final BigDecimal totalAmount;
private final String currency;
}
public class OrderPaid extends DomainEvent {
private final String paymentId;
private final String paymentMethod;
private final BigDecimal amount;
}
public class OrderShipped extends DomainEvent {
private final String carrier;
private final String trackingCode;
private final Instant estimatedDelivery;
}
public class OrderCancelled extends DomainEvent {
private final String reason;
private final String cancelledBy; // "customer" | "system" | "support"
}
Агрегат з event sourcing
public class Order {
private Long id;
private Long userId;
private OrderStatus status;
private List<OrderItem> items;
private BigDecimal totalAmount;
private long version = 0;
private final List<DomainEvent> pendingEvents = new ArrayList<>();
// Статичний метод восстановлення з подій
public static Order reconstitute(List<DomainEvent> events) {
Order order = new Order();
for (DomainEvent event : events) {
order.apply(event);
}
return order;
}
// Команда: створити замовлення
public void create(Long userId, List<OrderItem> items) {
if (this.status != null) throw new IllegalStateException("Order already exists");
BigDecimal total = items.stream()
.map(i -> i.getPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
OrderCreated event = new OrderCreated(
UUID.randomUUID().toString(),
String.valueOf(id),
"Order",
version + 1,
Instant.now(),
userId,
items,
total,
"USD"
);
apply(event);
pendingEvents.add(event);
}
public void pay(String paymentId, String method, BigDecimal amount) {
if (status != OrderStatus.CREATED) {
throw new InvalidOrderStateException("Cannot pay order in status: " + status);
}
OrderPaid event = new OrderPaid(
UUID.randomUUID().toString(),
String.valueOf(id),
"Order",
version + 1,
Instant.now(),
paymentId,
method,
amount
);
apply(event);
pendingEvents.add(event);
}
// apply — мутирує стан без побічних ефектів
private void apply(DomainEvent event) {
version = event.getVersion();
if (event instanceof OrderCreated e) {
this.userId = e.getUserId();
this.items = e.getItems();
this.totalAmount = e.getTotalAmount();
this.status = OrderStatus.CREATED;
} else if (event instanceof OrderPaid) {
this.status = OrderStatus.PAID;
} else if (event instanceof OrderShipped e) {
this.status = OrderStatus.SHIPPED;
} else if (event instanceof OrderCancelled) {
this.status = OrderStatus.CANCELLED;
}
}
public List<DomainEvent> pullPendingEvents() {
List<DomainEvent> events = new ArrayList<>(pendingEvents);
pendingEvents.clear();
return events;
}
}
Event Store Repository
@Repository
public class OrderEventStoreRepository {
private final KafkaTemplate<String, DomainEvent> kafkaTemplate;
private final KafkaConsumer<String, DomainEvent> replayConsumer;
private static final String TOPIC = "order-events";
public void save(Order order) {
List<DomainEvent> events = order.pullPendingEvents();
if (events.isEmpty()) return;
// Оптимістична блокування — версія події у заголовку
for (DomainEvent event : events) {
Headers headers = new RecordHeaders();
headers.add("aggregate-version", String.valueOf(event.getVersion()).getBytes());
headers.add("event-type", event.getClass().getSimpleName().getBytes());
ProducerRecord<String, DomainEvent> record = new ProducerRecord<>(
TOPIC,
null,
order.getId().toString(),
event,
headers
);
// Синхронна відправка для гарантії запису
kafkaTemplate.send(record).get(5, TimeUnit.SECONDS);
}
}
public Order load(Long orderId) {
// Воспроізводимо всі події для агрегату
List<DomainEvent> events = replayEvents(TOPIC, orderId.toString());
if (events.isEmpty()) throw new OrderNotFoundException(orderId);
return Order.reconstitute(events);
}
private List<DomainEvent> replayEvents(String topic, String aggregateId) {
// Читаємо всі партиції та фільтруємо за ключем агрегату
// У реальному продукті: використовуємо окремий топік per-aggregate
// або EventStoreDB замість Kafka для кращої підтримки читання за aggregate ID
List<DomainEvent> events = new ArrayList<>();
// ... реалізація читання за ключем
return events;
}
}
Проекції та Read Models
З Event Stream будуються Read Models — денормалізовані представлення для конкретних запитів:
@Component
public class OrderReadModelProjection {
@Autowired
private OrderReadModelRepository readRepo;
@KafkaListener(topics = "order-events", groupId = "order-read-model-projector")
public void project(ConsumerRecord<String, DomainEvent> record) {
DomainEvent event = record.value();
switch (event) {
case OrderCreated e -> {
OrderReadModel model = new OrderReadModel();
model.setOrderId(Long.parseLong(e.getAggregateId()));
model.setUserId(e.getUserId());
model.setStatus("CREATED");
model.setTotalAmount(e.getTotalAmount());
model.setItemCount(e.getItems().size());
model.setCreatedAt(e.getOccurredAt());
readRepo.save(model);
}
case OrderPaid e -> readRepo.updateStatus(
Long.parseLong(e.getAggregateId()), "PAID", e.getOccurredAt()
);
case OrderShipped e -> readRepo.updateStatusWithTracking(
Long.parseLong(e.getAggregateId()), "SHIPPED",
e.getTrackingCode(), e.getEstimatedDelivery()
);
case OrderCancelled e -> readRepo.updateStatus(
Long.parseLong(e.getAggregateId()), "CANCELLED", e.getOccurredAt()
);
default -> {}
}
}
}
Snapshots — оптимізація воспроізведення
При тисячах подій на агрегат воспроізведення з початку займає час. Snapshots зберігають стан на певній версії:
@Service
public class SnapshotService {
public void createSnapshotIfNeeded(Order order) {
if (order.getVersion() % 100 == 0) { // кожні 100 подій
OrderSnapshot snapshot = new OrderSnapshot(
order.getId(),
order.getVersion(),
objectMapper.writeValueAsString(order),
Instant.now()
);
snapshotRepo.save(snapshot);
}
}
public Order loadWithSnapshot(Long orderId) {
Optional<OrderSnapshot> snapshot = snapshotRepo.findLatest(orderId);
if (snapshot.isPresent()) {
Order order = objectMapper.readValue(snapshot.get().getState(), Order.class);
// Загружаємо тільки події після версії snapshots
List<DomainEvent> newEvents = eventRepo.loadAfterVersion(
orderId, snapshot.get().getVersion()
);
for (DomainEvent event : newEvents) {
order.applyHistorical(event);
}
return order;
}
return eventRepo.load(orderId);
}
}
Графік
День 1–2 — проектування событійної моделі: які події, їхні поля, версіонування. Avro-схеми або Protobuf для кожного типу події.
День 3–4 — розробка агрегатів та Event Store Repository, реалізація apply-методів, оптимістична блокування.
День 5–6 — реалізація проекцій для Read Models, тестування воспроізведення.
День 7 — snapshots, нагрузочне тестування, оцінка часу воспроізведення без snapshots vs зі snapshots.
День 8 — моніторинг відставання проекцій, інструмент для пересоздання Read Model з Event Log (full replay).







