Реалізація Saga Pattern для розподілених транзакцій
У архітектурі мікросервісів неможливо використовувати ACID-транзакції крізь межі сервісів. Saga Pattern вирішує задачу розподіленої узгодженості: бізнес-транзакція розбивається на послідовність локальних транзакцій у кожному сервісі, а при збої виконуються компенсуючі транзакції для відкату.
Два види Saga
Хореографія (Choreography) — сервіси реагують на події один одного без центрального координатора:
OrderService InventoryService PaymentService ShippingService
│ │ │ │
│── OrderCreated ───►│ │ │
│ │── StockReserved ──►│ │
│ │ │── PaymentProcessed►│
│ │ │ │── ShipmentCreated
│ │ │ │
│ (при помилці платежу) │ │
│ │◄── StockReleased ──│ │
Оркестрація (Orchestration) — центральний Saga Orchestrator явно управляє кроками:
class CreateOrderSaga {
async execute(context: OrderSagaContext): Promise<void> {
try {
// Крок 1: Зарезервувати товар
const reservation = await this.inventoryService.reserveStock(
context.orderId, context.items
);
context.reservationId = reservation.id;
// Крок 2: Списати оплату
const payment = await this.paymentService.charge(
context.customerId, context.totalAmount
);
context.paymentId = payment.id;
// Крок 3: Створити відправку
await this.shippingService.createShipment(
context.orderId, context.shippingAddress
);
// Крок 4: Підтвердити замовлення
await this.orderService.confirmOrder(context.orderId);
} catch (error) {
await this.compensate(context, error);
throw new SagaFailedError(context.orderId, error);
}
}
async compensate(context: OrderSagaContext, failedAt: Error): Promise<void> {
// Компенсації виконуються у зворотному порядку
if (context.paymentId) {
await this.paymentService.refund(context.paymentId)
.catch(e => this.logger.error('Refund failed', e));
}
if (context.reservationId) {
await this.inventoryService.releaseReservation(context.reservationId)
.catch(e => this.logger.error('Release failed', e));
}
await this.orderService.cancelOrder(context.orderId, 'Saga compensation');
}
}
Персистентна Saga зі станом
Saga повинна пережити перезавантаження сервісу. Стан зберігається в БД:
interface SagaState {
sagaId: string;
sagaType: string;
status: 'running' | 'completed' | 'failed' | 'compensating';
currentStep: number;
context: Record<string, unknown>;
completedSteps: string[];
failedStep?: string;
createdAt: Date;
updatedAt: Date;
}
class PersistentSagaOrchestrator {
async startSaga(sagaType: string, context: unknown): Promise<string> {
const sagaId = uuidv4();
await this.sagaRepo.save({
sagaId, sagaType, status: 'running',
currentStep: 0, context, completedSteps: []
});
await this.executeSaga(sagaId);
return sagaId;
}
async resumeSaga(sagaId: string): Promise<void> {
const state = await this.sagaRepo.findById(sagaId);
if (!state || state.status !== 'running') return;
// Відновлюємо з незавершеного кроку
await this.executeSaga(sagaId, state.currentStep);
}
}
Temporal.io для оркестрації саг
Temporal — production-ready рушій для довгоживучих workflow (включаючи саги):
import { proxyActivities, sleep } from '@temporalio/workflow';
const { reserveStock, chargePayment, createShipment, releaseStock, refund } =
proxyActivities({ startToCloseTimeout: '10 seconds' });
export async function createOrderWorkflow(input: CreateOrderInput): Promise<void> {
let stockReserved = false;
let paymentCharged = false;
try {
await reserveStock({ orderId: input.orderId, items: input.items });
stockReserved = true;
await chargePayment({ orderId: input.orderId, amount: input.amount });
paymentCharged = true;
await createShipment({ orderId: input.orderId, address: input.address });
} catch (error) {
// Temporal гарантує виконання компенсацій
if (paymentCharged) {
await refund({ orderId: input.orderId });
}
if (stockReserved) {
await releaseStock({ orderId: input.orderId });
}
throw error;
}
}
Temporal автоматично повторює activity, зберігає історію виконання, дозволяє інспектувати та налагоджувати workflow через UI.
Хореографія через Kafka
// Order Service публікує подію
await kafka.producer.send({
topic: 'order.events',
messages: [{ key: orderId, value: JSON.stringify({
type: 'OrderCreated', orderId, items, customerId
})}]
});
// Inventory Service слухає та резервує
kafka.consumer.subscribe({ topic: 'order.events' });
kafka.consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value.toString());
if (event.type !== 'OrderCreated') return;
try {
await inventoryService.reserveStock(event.orderId, event.items);
// Публікуємо успіх
await kafka.producer.send({
topic: 'inventory.events',
messages: [{ key: event.orderId, value: JSON.stringify({
type: 'StockReserved', orderId: event.orderId
})}]
});
} catch {
// Публікуємо невдачу — Order Service відкатиться
await kafka.producer.send({
topic: 'inventory.events',
messages: [{ key: event.orderId, value: JSON.stringify({
type: 'StockReservationFailed', orderId: event.orderId
})}]
});
}
}
});
Ідемпотентність — обов'язкова вимога
Кожна операція в саге повинна бути ідемпотентною: повторний виклик не створює дублікати.
async function reserveStock(orderId: string, items: Item[]): Promise<Reservation> {
// Перевіряємо, чи не було створено резервацію для цього замовлення
const existing = await reservationRepo.findByOrderId(orderId);
if (existing) return existing; // ідемпотентно
return reservationRepo.create({ orderId, items, status: 'reserved' });
}
Терміни реалізації
- Saga з оркестрацією (2–3 сервіси, без Temporal) — 1–2 тижні
- Saga з Temporal + моніторинг станів — 2–3 тижні
- Хореографія через Kafka з ідемпотентними обробниками — 2–4 тижні







