Реалізація Event Sourcing через брокер повідомлень

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Реалізація Event Sourcing через брокер повідомлень
Складна
~1-2 тижні
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Реалізація Event Sourcing через брокер сообщень

Event Sourcing — це зберігання стану системи не як поточного снімка, а як послідовності подій. Замість UPDATE orders SET status='shipped' в БД додається запис OrderShipped{orderId: 42, timestamp: ..., trackingCode: ...}. Поточний стан обчислюється воспроізведенням усіх подій.

Брокер сообщень (Kafka, EventStoreDB) служить Event Log — незмінним журналом, з якого можна восстановити будь-який минулий стан.

Коли Event Sourcing виправданий

Event Sourcing додає складність. Виправданий, коли:

  • Потрібний повний аудит-лог (фінтех, медицина, e-commerce)
  • Потрібна можливість воспроізвести минулий стан на конкретний момент
  • Кілька read-моделей з одного джерела (CQRS)
  • Undo/redo операції

Не виправданий для більшості CRUD-додатків без суворих вимог до аудиту.

Event Store на базі Kafka

Kafka ідеально підходить як Event Store: топіки — це append-only логи, події впорядковані в межах партиції, retention налаштовується вплоть до log.retention.ms=-1 (назавжди).

Схема топіків:

  • orders-events — усі події замовлень (ключ = order_id, гарантує порядок у партиції)
  • users-events — події користувачів
  • inventory-events — переміщення товарних залишків
// Базовий клас доменної події
public abstract class DomainEvent {
    private final String eventId;
    private final String aggregateId;
    private final String aggregateType;
    private final long version;          // монотонно зростаючий номер версії агрегату
    private final Instant occurredAt;
    private final String causedBy;       // ID команди, яка викликала подію

    // події незмінні
}

// Конкретні події
public class OrderCreated extends DomainEvent {
    private final Long userId;
    private final List<OrderItem> items;
    private final BigDecimal totalAmount;
    private final String currency;
}

public class OrderPaid extends DomainEvent {
    private final String paymentId;
    private final String paymentMethod;
    private final BigDecimal amount;
}

public class OrderShipped extends DomainEvent {
    private final String carrier;
    private final String trackingCode;
    private final Instant estimatedDelivery;
}

public class OrderCancelled extends DomainEvent {
    private final String reason;
    private final String cancelledBy; // "customer" | "system" | "support"
}

Агрегат з event sourcing

public class Order {
    private Long id;
    private Long userId;
    private OrderStatus status;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private long version = 0;

    private final List<DomainEvent> pendingEvents = new ArrayList<>();

    // Статичний метод восстановлення з подій
    public static Order reconstitute(List<DomainEvent> events) {
        Order order = new Order();
        for (DomainEvent event : events) {
            order.apply(event);
        }
        return order;
    }

    // Команда: створити замовлення
    public void create(Long userId, List<OrderItem> items) {
        if (this.status != null) throw new IllegalStateException("Order already exists");

        BigDecimal total = items.stream()
            .map(i -> i.getPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);

        OrderCreated event = new OrderCreated(
            UUID.randomUUID().toString(),
            String.valueOf(id),
            "Order",
            version + 1,
            Instant.now(),
            userId,
            items,
            total,
            "USD"
        );

        apply(event);
        pendingEvents.add(event);
    }

    public void pay(String paymentId, String method, BigDecimal amount) {
        if (status != OrderStatus.CREATED) {
            throw new InvalidOrderStateException("Cannot pay order in status: " + status);
        }

        OrderPaid event = new OrderPaid(
            UUID.randomUUID().toString(),
            String.valueOf(id),
            "Order",
            version + 1,
            Instant.now(),
            paymentId,
            method,
            amount
        );

        apply(event);
        pendingEvents.add(event);
    }

    // apply — мутирує стан без побічних ефектів
    private void apply(DomainEvent event) {
        version = event.getVersion();

        if (event instanceof OrderCreated e) {
            this.userId = e.getUserId();
            this.items = e.getItems();
            this.totalAmount = e.getTotalAmount();
            this.status = OrderStatus.CREATED;
        } else if (event instanceof OrderPaid) {
            this.status = OrderStatus.PAID;
        } else if (event instanceof OrderShipped e) {
            this.status = OrderStatus.SHIPPED;
        } else if (event instanceof OrderCancelled) {
            this.status = OrderStatus.CANCELLED;
        }
    }

    public List<DomainEvent> pullPendingEvents() {
        List<DomainEvent> events = new ArrayList<>(pendingEvents);
        pendingEvents.clear();
        return events;
    }
}

Event Store Repository

@Repository
public class OrderEventStoreRepository {
    private final KafkaTemplate<String, DomainEvent> kafkaTemplate;
    private final KafkaConsumer<String, DomainEvent> replayConsumer;
    private static final String TOPIC = "order-events";

    public void save(Order order) {
        List<DomainEvent> events = order.pullPendingEvents();
        if (events.isEmpty()) return;

        // Оптимістична блокування — версія події у заголовку
        for (DomainEvent event : events) {
            Headers headers = new RecordHeaders();
            headers.add("aggregate-version", String.valueOf(event.getVersion()).getBytes());
            headers.add("event-type", event.getClass().getSimpleName().getBytes());

            ProducerRecord<String, DomainEvent> record = new ProducerRecord<>(
                TOPIC,
                null,
                order.getId().toString(),
                event,
                headers
            );

            // Синхронна відправка для гарантії запису
            kafkaTemplate.send(record).get(5, TimeUnit.SECONDS);
        }
    }

    public Order load(Long orderId) {
        // Воспроізводимо всі події для агрегату
        List<DomainEvent> events = replayEvents(TOPIC, orderId.toString());
        if (events.isEmpty()) throw new OrderNotFoundException(orderId);
        return Order.reconstitute(events);
    }

    private List<DomainEvent> replayEvents(String topic, String aggregateId) {
        // Читаємо всі партиції та фільтруємо за ключем агрегату
        // У реальному продукті: використовуємо окремий топік per-aggregate
        // або EventStoreDB замість Kafka для кращої підтримки читання за aggregate ID
        List<DomainEvent> events = new ArrayList<>();
        // ... реалізація читання за ключем
        return events;
    }
}

Проекції та Read Models

З Event Stream будуються Read Models — денормалізовані представлення для конкретних запитів:

@Component
public class OrderReadModelProjection {

    @Autowired
    private OrderReadModelRepository readRepo;

    @KafkaListener(topics = "order-events", groupId = "order-read-model-projector")
    public void project(ConsumerRecord<String, DomainEvent> record) {
        DomainEvent event = record.value();

        switch (event) {
            case OrderCreated e -> {
                OrderReadModel model = new OrderReadModel();
                model.setOrderId(Long.parseLong(e.getAggregateId()));
                model.setUserId(e.getUserId());
                model.setStatus("CREATED");
                model.setTotalAmount(e.getTotalAmount());
                model.setItemCount(e.getItems().size());
                model.setCreatedAt(e.getOccurredAt());
                readRepo.save(model);
            }
            case OrderPaid e -> readRepo.updateStatus(
                Long.parseLong(e.getAggregateId()), "PAID", e.getOccurredAt()
            );
            case OrderShipped e -> readRepo.updateStatusWithTracking(
                Long.parseLong(e.getAggregateId()), "SHIPPED",
                e.getTrackingCode(), e.getEstimatedDelivery()
            );
            case OrderCancelled e -> readRepo.updateStatus(
                Long.parseLong(e.getAggregateId()), "CANCELLED", e.getOccurredAt()
            );
            default -> {}
        }
    }
}

Snapshots — оптимізація воспроізведення

При тисячах подій на агрегат воспроізведення з початку займає час. Snapshots зберігають стан на певній версії:

@Service
public class SnapshotService {

    public void createSnapshotIfNeeded(Order order) {
        if (order.getVersion() % 100 == 0) { // кожні 100 подій
            OrderSnapshot snapshot = new OrderSnapshot(
                order.getId(),
                order.getVersion(),
                objectMapper.writeValueAsString(order),
                Instant.now()
            );
            snapshotRepo.save(snapshot);
        }
    }

    public Order loadWithSnapshot(Long orderId) {
        Optional<OrderSnapshot> snapshot = snapshotRepo.findLatest(orderId);

        if (snapshot.isPresent()) {
            Order order = objectMapper.readValue(snapshot.get().getState(), Order.class);
            // Загружаємо тільки події після версії snapshots
            List<DomainEvent> newEvents = eventRepo.loadAfterVersion(
                orderId, snapshot.get().getVersion()
            );
            for (DomainEvent event : newEvents) {
                order.applyHistorical(event);
            }
            return order;
        }

        return eventRepo.load(orderId);
    }
}

Графік

День 1–2 — проектування событійної моделі: які події, їхні поля, версіонування. Avro-схеми або Protobuf для кожного типу події.

День 3–4 — розробка агрегатів та Event Store Repository, реалізація apply-методів, оптимістична блокування.

День 5–6 — реалізація проекцій для Read Models, тестування воспроізведення.

День 7 — snapshots, нагрузочне тестування, оцінка часу воспроізведення без snapshots vs зі snapshots.

День 8 — моніторинг відставання проекцій, інструмент для пересоздання Read Model з Event Log (full replay).