Реалізація паттерну Saga через черги повідомлень

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Реалізація паттерну Saga через черги повідомлень
Складна
~5 робочих днів
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Реалізація паттерну Saga через черги сообщень

Розподілені транзакції через двофазний commit (2PC) не працюють у мікросервісній архітектурі — вони створюють синхронні залежності та точки відмови. Паттерн Saga розв'язує цю проблему: довга транзакція розбивається на послідовність локальних транзакцій, кожна з яких публікує подію для наступного кроку. При помилці виконуються компенсуючі транзакції у зворотному порядку.

Два підходи до оркестрації Saga

Choreography (хореографія) — кожен сервіс реагує на події та публікує свої. Немає центрального координатора. Добре для простих сценаріїв з 2–3 кроками. Складно налагоджувати з збільшенням кількості учасників.

Orchestration (оркестрація) — виділений Saga Orchestrator знає весь сценарій, відправляє команди кожному сервісу та чекає відповідь. Легше налагоджувати та моніторити. Рекомендується для складних сценаріїв.

Приклад: Saga оформлення замовлення

Сценарій: CreateOrder → ReserveInventory → ProcessPayment → ShipOrder

При помилці на будь-якому кроці — компенсація у зворотному порядку.

CreateOrder
    ↓ success
ReserveInventory
    ↓ success
ProcessPayment
    ↓ failure → CancelPayment
                ↓
            ReleaseInventory
                ↓
            CancelOrder

Реалізація Orchestration Saga (Java/Spring)

// Стан Saga зберігається в БД — гарантує восстановлення після перезавантаження
@Entity
@Table(name = "order_sagas")
public class OrderSaga {
    @Id
    private String sagaId;
    private Long orderId;

    @Enumerated(EnumType.STRING)
    private SagaStatus status; // STARTED, INVENTORY_RESERVED, PAYMENT_PROCESSING, COMPLETED, COMPENSATING, FAILED

    @Enumerated(EnumType.STRING)
    private SagaStep currentStep;

    private String failureReason;
    private int retryCount;

    @Column(columnDefinition = "jsonb")
    private String context; // JSON з даними для компенсації
}

@Service
@Transactional
public class OrderSagaOrchestrator {

    @Autowired
    private OrderSagaRepository sagaRepo;

    @Autowired
    private MessagePublisher publisher;

    public void startSaga(CreateOrderCommand command) {
        // Локальна транзакція: створюємо замовлення + зберігаємо Saga
        Order order = orderService.createDraft(command);

        OrderSaga saga = new OrderSaga();
        saga.setSagaId(UUID.randomUUID().toString());
        saga.setOrderId(order.getId());
        saga.setStatus(SagaStatus.STARTED);
        saga.setCurrentStep(SagaStep.RESERVE_INVENTORY);
        sagaRepo.save(saga);

        // Публікуємо команду для наступного кроку
        publisher.publish("inventory-commands", new ReserveInventoryCommand(
            saga.getSagaId(),
            order.getId(),
            command.getItems()
        ));
    }

    @KafkaListener(topics = "inventory-events")
    public void onInventoryEvent(InventoryEvent event) {
        OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId())
            .orElseThrow(() -> new IllegalStateException("Saga not found: " + event.getSagaId()));

        if (event.getType() == EventType.INVENTORY_RESERVED) {
            saga.setStatus(SagaStatus.INVENTORY_RESERVED);
            saga.setCurrentStep(SagaStep.PROCESS_PAYMENT);
            sagaRepo.save(saga);

            publisher.publish("payment-commands", new ProcessPaymentCommand(
                saga.getSagaId(),
                saga.getOrderId(),
                event.getReservationId()
            ));
        } else if (event.getType() == EventType.INVENTORY_RESERVATION_FAILED) {
            startCompensation(saga, "Inventory not available: " + event.getReason());
        }
    }

    @KafkaListener(topics = "payment-events")
    public void onPaymentEvent(PaymentEvent event) {
        OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId()).orElseThrow();

        if (event.getType() == EventType.PAYMENT_COMPLETED) {
            saga.setStatus(SagaStatus.COMPLETED);
            saga.setCurrentStep(null);
            sagaRepo.save(saga);

            orderService.confirmOrder(saga.getOrderId());
            publisher.publish("shipping-commands", new CreateShipmentCommand(
                saga.getSagaId(), saga.getOrderId()
            ));
        } else if (event.getType() == EventType.PAYMENT_FAILED) {
            startCompensation(saga, "Payment failed: " + event.getErrorCode());
        }
    }

    private void startCompensation(OrderSaga saga, String reason) {
        saga.setStatus(SagaStatus.COMPENSATING);
        saga.setFailureReason(reason);
        sagaRepo.save(saga);

        // Визначаємо, які кроки потрібно компенсувати на основі currentStep
        switch (saga.getCurrentStep()) {
            case PROCESS_PAYMENT:
                // Інвентар зарезервований, платіж впав
                publisher.publish("inventory-commands", new ReleaseInventoryCommand(
                    saga.getSagaId(), saga.getOrderId()
                ));
                break;
            case RESERVE_INVENTORY:
                // Не вдалося зарезервувати — тільки скасовуємо замовлення
                orderService.cancelOrder(saga.getOrderId(), reason);
                saga.setStatus(SagaStatus.FAILED);
                sagaRepo.save(saga);
                break;
        }
    }
}

Idempotency — захист від дублів

Кожен крок Saga повинен бути ідемпотентним: повторна команда не повинна створювати дублювання транзакції.

@Service
public class InventoryService {

    public void reserveInventory(ReserveInventoryCommand command) {
        // Перевіряємо: можливо, уже зарезервували для цієї Saga?
        Optional<InventoryReservation> existing =
            reservationRepo.findBySagaId(command.getSagaId());

        if (existing.isPresent()) {
            // Ідемпотентна відповідь — публікуємо ту саму подію
            publisher.publish("inventory-events", new InventoryReservedEvent(
                command.getSagaId(),
                existing.get().getId()
            ));
            return;
        }

        // Виконуємо резервування
        try {
            InventoryReservation reservation = performReservation(command);
            publisher.publish("inventory-events", new InventoryReservedEvent(
                command.getSagaId(), reservation.getId()
            ));
        } catch (InsufficientInventoryException e) {
            publisher.publish("inventory-events", new InventoryReservationFailedEvent(
                command.getSagaId(), e.getMessage()
            ));
        }
    }
}

Реалізація через RabbitMQ

Альтернатива Kafka — та ж логіка через RabbitMQ з topic exchange:

# Python-оркестратор
import pika
import json
import uuid
from enum import Enum

class SagaOrchestrator:
    def __init__(self, connection):
        self.channel = connection.channel()
        self.channel.exchange_declare('saga-commands', 'topic', durable=True)
        self.channel.exchange_declare('saga-events', 'topic', durable=True)

        # Черга для отримання відповідей від сервісів
        result = self.channel.queue_declare('orchestrator-responses', durable=True)
        self.channel.queue_bind(result.method.queue, 'saga-events', '#')
        self.channel.basic_consume(
            result.method.queue,
            self.on_event,
            auto_ack=False
        )

    def start_order_saga(self, order_data: dict) -> str:
        saga_id = str(uuid.uuid4())

        # Зберігаємо Saga в БД (psycopg2/SQLAlchemy)
        self.save_saga(saga_id, order_data['order_id'], 'STARTED', 'RESERVE_INVENTORY')

        # Відправляємо першу команду
        self.channel.basic_publish(
            exchange='saga-commands',
            routing_key='inventory.reserve',
            body=json.dumps({
                'saga_id': saga_id,
                'order_id': order_data['order_id'],
                'items': order_data['items'],
            }),
            properties=pika.BasicProperties(
                delivery_mode=2,
                message_id=str(uuid.uuid4()),
                correlation_id=saga_id,
            )
        )
        return saga_id

    def on_event(self, channel, method, properties, body):
        event = json.loads(body)
        saga = self.load_saga(event['saga_id'])

        if event['type'] == 'INVENTORY_RESERVED':
            self.proceed_to_payment(saga, event)
        elif event['type'] in ('INVENTORY_FAILED', 'PAYMENT_FAILED'):
            self.compensate(saga, event['reason'])

        channel.basic_ack(method.delivery_tag)

Моніторинг та налагодження Saga

Без видимості в стани Saga налагодження розподілених транзакцій крайньо складно.

-- Завислі Saga — не завершилися за 30 хвилин
SELECT saga_id, order_id, status, current_step,
       created_at, NOW() - created_at AS age
FROM order_sagas
WHERE status NOT IN ('COMPLETED', 'FAILED')
  AND created_at < NOW() - INTERVAL '30 minutes'
ORDER BY created_at;

-- Статистика за день
SELECT status, COUNT(*), AVG(EXTRACT(EPOCH FROM (updated_at - created_at))) as avg_duration_sec
FROM order_sagas
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY status;

Сигнал про завислі Saga:

- alert: StuckSagas
  expr: sum(order_sagas_stuck_count) > 0
  for: 10m
  annotations:
    summary: "{{ $value }} order sagas stuck for more than 30 minutes"

Графік

День 1–2 — проектування Saga: визначення кроків, компенсуючих дій, формату команд та подій. Схема БД для стану Saga.

День 3–4 — розробка Orchestrator: обробники подій, логіка компенсації, ідемпотентність команд.

День 5 — інтеграція зі всіма учасними сервісами, реалізація компенсуючих методів у кожному сервісі.

День 6 — тестування сценаріїв відмови: вбиваємо кожен сервіс на кожному кроці, перевіряємо коректність компенсації.

День 7 — моніторинг станів Saga, сигнали про завислі транзакції, інструмент для ручного управління (resume/compensate/retry).