Реалізація паттерну Saga через черги сообщень
Розподілені транзакції через двофазний commit (2PC) не працюють у мікросервісній архітектурі — вони створюють синхронні залежності та точки відмови. Паттерн Saga розв'язує цю проблему: довга транзакція розбивається на послідовність локальних транзакцій, кожна з яких публікує подію для наступного кроку. При помилці виконуються компенсуючі транзакції у зворотному порядку.
Два підходи до оркестрації Saga
Choreography (хореографія) — кожен сервіс реагує на події та публікує свої. Немає центрального координатора. Добре для простих сценаріїв з 2–3 кроками. Складно налагоджувати з збільшенням кількості учасників.
Orchestration (оркестрація) — виділений Saga Orchestrator знає весь сценарій, відправляє команди кожному сервісу та чекає відповідь. Легше налагоджувати та моніторити. Рекомендується для складних сценаріїв.
Приклад: Saga оформлення замовлення
Сценарій: CreateOrder → ReserveInventory → ProcessPayment → ShipOrder
При помилці на будь-якому кроці — компенсація у зворотному порядку.
CreateOrder
↓ success
ReserveInventory
↓ success
ProcessPayment
↓ failure → CancelPayment
↓
ReleaseInventory
↓
CancelOrder
Реалізація Orchestration Saga (Java/Spring)
// Стан Saga зберігається в БД — гарантує восстановлення після перезавантаження
@Entity
@Table(name = "order_sagas")
public class OrderSaga {
@Id
private String sagaId;
private Long orderId;
@Enumerated(EnumType.STRING)
private SagaStatus status; // STARTED, INVENTORY_RESERVED, PAYMENT_PROCESSING, COMPLETED, COMPENSATING, FAILED
@Enumerated(EnumType.STRING)
private SagaStep currentStep;
private String failureReason;
private int retryCount;
@Column(columnDefinition = "jsonb")
private String context; // JSON з даними для компенсації
}
@Service
@Transactional
public class OrderSagaOrchestrator {
@Autowired
private OrderSagaRepository sagaRepo;
@Autowired
private MessagePublisher publisher;
public void startSaga(CreateOrderCommand command) {
// Локальна транзакція: створюємо замовлення + зберігаємо Saga
Order order = orderService.createDraft(command);
OrderSaga saga = new OrderSaga();
saga.setSagaId(UUID.randomUUID().toString());
saga.setOrderId(order.getId());
saga.setStatus(SagaStatus.STARTED);
saga.setCurrentStep(SagaStep.RESERVE_INVENTORY);
sagaRepo.save(saga);
// Публікуємо команду для наступного кроку
publisher.publish("inventory-commands", new ReserveInventoryCommand(
saga.getSagaId(),
order.getId(),
command.getItems()
));
}
@KafkaListener(topics = "inventory-events")
public void onInventoryEvent(InventoryEvent event) {
OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId())
.orElseThrow(() -> new IllegalStateException("Saga not found: " + event.getSagaId()));
if (event.getType() == EventType.INVENTORY_RESERVED) {
saga.setStatus(SagaStatus.INVENTORY_RESERVED);
saga.setCurrentStep(SagaStep.PROCESS_PAYMENT);
sagaRepo.save(saga);
publisher.publish("payment-commands", new ProcessPaymentCommand(
saga.getSagaId(),
saga.getOrderId(),
event.getReservationId()
));
} else if (event.getType() == EventType.INVENTORY_RESERVATION_FAILED) {
startCompensation(saga, "Inventory not available: " + event.getReason());
}
}
@KafkaListener(topics = "payment-events")
public void onPaymentEvent(PaymentEvent event) {
OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId()).orElseThrow();
if (event.getType() == EventType.PAYMENT_COMPLETED) {
saga.setStatus(SagaStatus.COMPLETED);
saga.setCurrentStep(null);
sagaRepo.save(saga);
orderService.confirmOrder(saga.getOrderId());
publisher.publish("shipping-commands", new CreateShipmentCommand(
saga.getSagaId(), saga.getOrderId()
));
} else if (event.getType() == EventType.PAYMENT_FAILED) {
startCompensation(saga, "Payment failed: " + event.getErrorCode());
}
}
private void startCompensation(OrderSaga saga, String reason) {
saga.setStatus(SagaStatus.COMPENSATING);
saga.setFailureReason(reason);
sagaRepo.save(saga);
// Визначаємо, які кроки потрібно компенсувати на основі currentStep
switch (saga.getCurrentStep()) {
case PROCESS_PAYMENT:
// Інвентар зарезервований, платіж впав
publisher.publish("inventory-commands", new ReleaseInventoryCommand(
saga.getSagaId(), saga.getOrderId()
));
break;
case RESERVE_INVENTORY:
// Не вдалося зарезервувати — тільки скасовуємо замовлення
orderService.cancelOrder(saga.getOrderId(), reason);
saga.setStatus(SagaStatus.FAILED);
sagaRepo.save(saga);
break;
}
}
}
Idempotency — захист від дублів
Кожен крок Saga повинен бути ідемпотентним: повторна команда не повинна створювати дублювання транзакції.
@Service
public class InventoryService {
public void reserveInventory(ReserveInventoryCommand command) {
// Перевіряємо: можливо, уже зарезервували для цієї Saga?
Optional<InventoryReservation> existing =
reservationRepo.findBySagaId(command.getSagaId());
if (existing.isPresent()) {
// Ідемпотентна відповідь — публікуємо ту саму подію
publisher.publish("inventory-events", new InventoryReservedEvent(
command.getSagaId(),
existing.get().getId()
));
return;
}
// Виконуємо резервування
try {
InventoryReservation reservation = performReservation(command);
publisher.publish("inventory-events", new InventoryReservedEvent(
command.getSagaId(), reservation.getId()
));
} catch (InsufficientInventoryException e) {
publisher.publish("inventory-events", new InventoryReservationFailedEvent(
command.getSagaId(), e.getMessage()
));
}
}
}
Реалізація через RabbitMQ
Альтернатива Kafka — та ж логіка через RabbitMQ з topic exchange:
# Python-оркестратор
import pika
import json
import uuid
from enum import Enum
class SagaOrchestrator:
def __init__(self, connection):
self.channel = connection.channel()
self.channel.exchange_declare('saga-commands', 'topic', durable=True)
self.channel.exchange_declare('saga-events', 'topic', durable=True)
# Черга для отримання відповідей від сервісів
result = self.channel.queue_declare('orchestrator-responses', durable=True)
self.channel.queue_bind(result.method.queue, 'saga-events', '#')
self.channel.basic_consume(
result.method.queue,
self.on_event,
auto_ack=False
)
def start_order_saga(self, order_data: dict) -> str:
saga_id = str(uuid.uuid4())
# Зберігаємо Saga в БД (psycopg2/SQLAlchemy)
self.save_saga(saga_id, order_data['order_id'], 'STARTED', 'RESERVE_INVENTORY')
# Відправляємо першу команду
self.channel.basic_publish(
exchange='saga-commands',
routing_key='inventory.reserve',
body=json.dumps({
'saga_id': saga_id,
'order_id': order_data['order_id'],
'items': order_data['items'],
}),
properties=pika.BasicProperties(
delivery_mode=2,
message_id=str(uuid.uuid4()),
correlation_id=saga_id,
)
)
return saga_id
def on_event(self, channel, method, properties, body):
event = json.loads(body)
saga = self.load_saga(event['saga_id'])
if event['type'] == 'INVENTORY_RESERVED':
self.proceed_to_payment(saga, event)
elif event['type'] in ('INVENTORY_FAILED', 'PAYMENT_FAILED'):
self.compensate(saga, event['reason'])
channel.basic_ack(method.delivery_tag)
Моніторинг та налагодження Saga
Без видимості в стани Saga налагодження розподілених транзакцій крайньо складно.
-- Завислі Saga — не завершилися за 30 хвилин
SELECT saga_id, order_id, status, current_step,
created_at, NOW() - created_at AS age
FROM order_sagas
WHERE status NOT IN ('COMPLETED', 'FAILED')
AND created_at < NOW() - INTERVAL '30 minutes'
ORDER BY created_at;
-- Статистика за день
SELECT status, COUNT(*), AVG(EXTRACT(EPOCH FROM (updated_at - created_at))) as avg_duration_sec
FROM order_sagas
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY status;
Сигнал про завислі Saga:
- alert: StuckSagas
expr: sum(order_sagas_stuck_count) > 0
for: 10m
annotations:
summary: "{{ $value }} order sagas stuck for more than 30 minutes"
Графік
День 1–2 — проектування Saga: визначення кроків, компенсуючих дій, формату команд та подій. Схема БД для стану Saga.
День 3–4 — розробка Orchestrator: обробники подій, логіка компенсації, ідемпотентність команд.
День 5 — інтеграція зі всіма учасними сервісами, реалізація компенсуючих методів у кожному сервісі.
День 6 — тестування сценаріїв відмови: вбиваємо кожен сервіс на кожному кроці, перевіряємо коректність компенсації.
День 7 — моніторинг станів Saga, сигнали про завислі транзакції, інструмент для ручного управління (resume/compensate/retry).







