Розробка системи резервування та відмовостійкості AI-агентів
Нестійкість AI-агентів - забезпечення безперервності сервісу при збоях: недоступності LLM API, помилках інструментів, винятках в коді агента. Критично для виробництва систем з SLA.
Шари відмовостійкості
Шар LLM: fallback на альтернативні моделі за недоступності основної. OpenAI → Anthropic → локальна модель.
Шар інструментів: retry з exponential backoff, circuit breaker, альтернативні інструменти.
Шар агента: checkpoint'и прогресу, відновлення після падіння, human escalation при критичних помилках.
Шар оркестрації: багатоагентна резервація, health checks, автоматичне перепризначення завдань.
Fallback LLM Provider
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import anthropic
import openai
class ResilientLLMClient:
def __init__(self):
self.providers = [
{"name": "openai", "client": openai.AsyncOpenAI(), "model": "gpt-4o"},
{"name": "anthropic", "client": anthropic.AsyncAnthropic(), "model": "claude-3-5-sonnet-20241022"},
{"name": "local_vllm", "client": openai.AsyncOpenAI(base_url="http://gpu1:8000/v1"), "model": "llama-3-8b"},
]
self.circuit_breakers = {p["name"]: CircuitBreaker() for p in self.providers}
async def generate(self, messages: list, **kwargs) -> str:
last_error = None
for provider in self.providers:
cb = self.circuit_breakers[provider["name"]]
if cb.is_open():
continue # провайдер отключён по circuit breaker
try:
result = await self._call_provider(provider, messages, **kwargs)
cb.record_success()
return result
except Exception as e:
cb.record_failure()
last_error = e
logger.warning(f"Provider {provider['name']} failed: {e}")
raise RuntimeError(f"All LLM providers failed. Last error: {last_error}")
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type((openai.RateLimitError, openai.APIConnectionError))
)
async def _call_provider(self, provider: dict, messages: list, **kwargs) -> str:
response = await provider["client"].chat.completions.create(
model=provider["model"],
messages=messages,
**kwargs
)
return response.choices[0].message.content
Circuit Breaker
from enum import Enum
import time
class CircuitState(Enum):
CLOSED = "closed" # нормальная работа
OPEN = "open" # провайдер отключён
HALF_OPEN = "half_open" # тестируем восстановление
class CircuitBreaker:
def __init__(self, failure_threshold=5, reset_timeout=60):
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.failure_count = 0
self.last_failure_time = 0
self.state = CircuitState.CLOSED
def is_open(self) -> bool:
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = CircuitState.HALF_OPEN
return False
return True
return False
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
logger.error(f"Circuit breaker OPEN after {self.failure_count} failures")
def record_success(self):
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
logger.info("Circuit breaker CLOSED - provider recovered")
Checkpoint та відновлення завдання
Для тривалих агентських завдань (> 1 хв) зберігаємо прогрес:
@dataclass
class AgentCheckpoint:
task_id: str
step_index: int
completed_steps: list[StepResult]
state: dict # произвольное состояние агента
saved_at: datetime
class CheckpointManager:
def __init__(self, storage: Redis):
self.storage = storage
async def save(self, checkpoint: AgentCheckpoint):
key = f"checkpoint:{checkpoint.task_id}"
await self.storage.setex(
key,
3600, # 1 час TTL
pickle.dumps(checkpoint)
)
async def load(self, task_id: str) -> AgentCheckpoint | None:
key = f"checkpoint:{task_id}"
data = await self.storage.get(key)
return pickle.loads(data) if data else None
async def resume_or_start(self, task: AgentTask) -> AgentCheckpoint:
existing = await self.load(task.id)
if existing:
logger.info(f"Resuming task {task.id} from step {existing.step_index}")
return existing
return AgentCheckpoint(task_id=task.id, step_index=0, completed_steps=[], state={}, saved_at=datetime.utcnow())
Human escalation при нерозв'язних помилках
class EscalationPolicy:
MAX_RETRIES = 3
MAX_FALLBACK_ATTEMPTS = 2
async def handle_failure(
self,
task: AgentTask,
error: Exception,
retry_count: int
) -> EscalationDecision:
if retry_count < self.MAX_RETRIES and self._is_retriable(error):
return EscalationDecision(action="retry", delay_seconds=2 ** retry_count)
if isinstance(error, ToolUnavailableError):
return EscalationDecision(action="use_fallback_tool", tool=self._get_fallback_tool(error.tool))
# Эскалация к человеку
await self.notify_human(task, error, retry_count)
return EscalationDecision(
action="escalate",
message=f"Task {task.id} requires human intervention after {retry_count} retries: {error}"
)
Терміни впровадження
Тиждень 1–2: Fallback LLM providers, retry логіка, базові circuit breakers
Тиждень 3–4: Checkpoint система, human escalation workflow
Місяць 2: Multi-agent redundancy, chaos testing, SLA dashboard







