Розробка системи резервування та відмовостійкості AI-агентів

Проектуємо та впроваджуємо системи штучного інтелекту: від прототипу до production-ready рішення. Наша команда поєднує експертизу в машинному навчанні, дата-інжинірингу та MLOps, щоб AI працював не в лабораторії, а в реальному бізнесі.
Показано 1 з 1Усі 1566 послуг
Розробка системи резервування та відмовостійкості AI-агентів
Складний
від 1 тижня до 3 місяців
Часті запитання

Напрямки AI-розробки

Етапи розробки AI-рішення

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1285
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    902
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1121
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    857

Розробка системи резервування та відмовостійкості AI-агентів

Нестійкість AI-агентів - забезпечення безперервності сервісу при збоях: недоступності LLM API, помилках інструментів, винятках в коді агента. Критично для виробництва систем з SLA.

Шари відмовостійкості

Шар LLM: fallback на альтернативні моделі за недоступності основної. OpenAI → Anthropic → локальна модель.

Шар інструментів: retry з exponential backoff, circuit breaker, альтернативні інструменти.

Шар агента: checkpoint'и прогресу, відновлення після падіння, human escalation при критичних помилках.

Шар оркестрації: багатоагентна резервація, health checks, автоматичне перепризначення завдань.

Fallback LLM Provider

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import anthropic
import openai

class ResilientLLMClient:
    def __init__(self):
        self.providers = [
            {"name": "openai", "client": openai.AsyncOpenAI(), "model": "gpt-4o"},
            {"name": "anthropic", "client": anthropic.AsyncAnthropic(), "model": "claude-3-5-sonnet-20241022"},
            {"name": "local_vllm", "client": openai.AsyncOpenAI(base_url="http://gpu1:8000/v1"), "model": "llama-3-8b"},
        ]
        self.circuit_breakers = {p["name"]: CircuitBreaker() for p in self.providers}

    async def generate(self, messages: list, **kwargs) -> str:
        last_error = None

        for provider in self.providers:
            cb = self.circuit_breakers[provider["name"]]
            if cb.is_open():
                continue  # провайдер отключён по circuit breaker

            try:
                result = await self._call_provider(provider, messages, **kwargs)
                cb.record_success()
                return result
            except Exception as e:
                cb.record_failure()
                last_error = e
                logger.warning(f"Provider {provider['name']} failed: {e}")

        raise RuntimeError(f"All LLM providers failed. Last error: {last_error}")

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((openai.RateLimitError, openai.APIConnectionError))
    )
    async def _call_provider(self, provider: dict, messages: list, **kwargs) -> str:
        response = await provider["client"].chat.completions.create(
            model=provider["model"],
            messages=messages,
            **kwargs
        )
        return response.choices[0].message.content

Circuit Breaker

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"       # нормальная работа
    OPEN = "open"           # провайдер отключён
    HALF_OPEN = "half_open" # тестируем восстановление

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED

    def is_open(self) -> bool:
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = CircuitState.HALF_OPEN
                return False
            return True
        return False

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.error(f"Circuit breaker OPEN after {self.failure_count} failures")

    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            logger.info("Circuit breaker CLOSED - provider recovered")

Checkpoint та відновлення завдання

Для тривалих агентських завдань (> 1 хв) зберігаємо прогрес:

@dataclass
class AgentCheckpoint:
    task_id: str
    step_index: int
    completed_steps: list[StepResult]
    state: dict             # произвольное состояние агента
    saved_at: datetime

class CheckpointManager:
    def __init__(self, storage: Redis):
        self.storage = storage

    async def save(self, checkpoint: AgentCheckpoint):
        key = f"checkpoint:{checkpoint.task_id}"
        await self.storage.setex(
            key,
            3600,  # 1 час TTL
            pickle.dumps(checkpoint)
        )

    async def load(self, task_id: str) -> AgentCheckpoint | None:
        key = f"checkpoint:{task_id}"
        data = await self.storage.get(key)
        return pickle.loads(data) if data else None

    async def resume_or_start(self, task: AgentTask) -> AgentCheckpoint:
        existing = await self.load(task.id)
        if existing:
            logger.info(f"Resuming task {task.id} from step {existing.step_index}")
            return existing
        return AgentCheckpoint(task_id=task.id, step_index=0, completed_steps=[], state={}, saved_at=datetime.utcnow())

Human escalation при нерозв'язних помилках

class EscalationPolicy:
    MAX_RETRIES = 3
    MAX_FALLBACK_ATTEMPTS = 2

    async def handle_failure(
        self,
        task: AgentTask,
        error: Exception,
        retry_count: int
    ) -> EscalationDecision:

        if retry_count < self.MAX_RETRIES and self._is_retriable(error):
            return EscalationDecision(action="retry", delay_seconds=2 ** retry_count)

        if isinstance(error, ToolUnavailableError):
            return EscalationDecision(action="use_fallback_tool", tool=self._get_fallback_tool(error.tool))

        # Эскалация к человеку
        await self.notify_human(task, error, retry_count)
        return EscalationDecision(
            action="escalate",
            message=f"Task {task.id} requires human intervention after {retry_count} retries: {error}"
        )

Терміни впровадження

Тиждень 1–2: Fallback LLM providers, retry логіка, базові circuit breakers

Тиждень 3–4: Checkpoint система, human escalation workflow

Місяць 2: Multi-agent redundancy, chaos testing, SLA dashboard