AI Agent Redundancy and Fault Tolerance System Development

We design and deploy artificial intelligence systems, from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering, and MLOps to make AI work in real business settings, not just in the lab.

Development of a redundancy and fault-tolerance system for AI agents

AI agent fault tolerance ensures service continuity during failures, such as LLM API unavailability, tool errors, or exceptions in agent code. This is critical for production systems with SLAs.

Fault tolerance layers

LLM layer: fallback to alternative models when the primary one is unavailable. OpenAI → Anthropic → local model.

Tools layer: retry with exponential backoff, circuit breaker, alternative tools.

Agent layer: progress checkpoints, crash recovery, human escalation on critical errors.

Orchestration layer: multi-agent redundancy, health checks, automatic task reassignment.
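The tools-layer pattern (retry with exponential backoff, then switch to an alternative tool) can be sketched in a few lines. All function and tool names here are illustrative, not part of any real API:

```python
import asyncio

async def call_with_fallback(primary, fallback, *args, retries=3, base_delay=0.01):
    """Try `primary` with exponential backoff; switch to `fallback` if it keeps failing."""
    for attempt in range(retries):
        try:
            return await primary(*args)
        except Exception:
            await asyncio.sleep(base_delay * 2 ** attempt)  # exponential backoff
    return await fallback(*args)  # alternative tool after retries are exhausted

# Example: a flaky primary tool that always fails, and a reliable fallback.
async def flaky_search(q):
    raise TimeoutError("search API down")

async def cached_search(q):
    return f"cached:{q}"

result = asyncio.run(call_with_fallback(flaky_search, cached_search, "llm"))
# result == "cached:llm" — the fallback tool answered after three failed attempts
```

In production the same helper wraps real tool calls; the circuit breaker shown below layers on top of it.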

Fallback LLM Provider

import logging

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import anthropic
import openai

logger = logging.getLogger(__name__)

class ResilientLLMClient:
    def __init__(self):
        self.providers = [
            {"name": "openai", "client": openai.AsyncOpenAI(), "model": "gpt-4o"},
            {"name": "anthropic", "client": anthropic.AsyncAnthropic(), "model": "claude-3-5-sonnet-20241022"},
            {"name": "local_vllm", "client": openai.AsyncOpenAI(base_url="http://gpu1:8000/v1"), "model": "llama-3-8b"},
        ]
        self.circuit_breakers = {p["name"]: CircuitBreaker() for p in self.providers}

    async def generate(self, messages: list, **kwargs) -> str:
        last_error = None

        for provider in self.providers:
            cb = self.circuit_breakers[provider["name"]]
            if cb.is_open():
                continue  # provider disabled by its circuit breaker

            try:
                result = await self._call_provider(provider, messages, **kwargs)
                cb.record_success()
                return result
            except Exception as e:
                cb.record_failure()
                last_error = e
                logger.warning(f"Provider {provider['name']} failed: {e}")

        raise RuntimeError(f"All LLM providers failed. Last error: {last_error}")

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((openai.RateLimitError, openai.APIConnectionError))
    )
    async def _call_provider(self, provider: dict, messages: list, **kwargs) -> str:
        if provider["name"] == "anthropic":
            # The Anthropic SDK exposes messages.create, not the OpenAI chat API
            response = await provider["client"].messages.create(
                model=provider["model"], max_tokens=kwargs.pop("max_tokens", 1024),
                messages=messages, **kwargs,
            )
            return response.content[0].text
        response = await provider["client"].chat.completions.create(
            model=provider["model"], messages=messages, **kwargs
        )
        return response.choices[0].message.content
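The failover loop itself can be exercised without network access by stubbing providers as async callables. A minimal self-contained sketch (the stub names are ours, not part of the class above):

```python
import asyncio

async def failover_generate(providers, messages):
    """Walk the provider list in priority order; return the first success."""
    last_error = None
    for name, call in providers:
        try:
            return name, await call(messages)
        except Exception as e:
            last_error = e  # remember the failure and try the next provider
    raise RuntimeError(f"All providers failed: {last_error}")

# Stub providers: the primary is down, the secondary answers.
async def down(_msgs):
    raise ConnectionError("primary unavailable")

async def up(msgs):
    return f"answer to {len(msgs)} messages"

name, text = asyncio.run(failover_generate(
    [("openai", down), ("anthropic", up)],
    [{"role": "user", "content": "hi"}],
))
# name == "anthropic" — the second provider served the request
```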

Circuit Breaker

from enum import Enum
import logging
import time

logger = logging.getLogger(__name__)

class CircuitState(Enum):
    CLOSED = "closed"       # normal operation
    OPEN = "open"           # provider disabled
    HALF_OPEN = "half_open" # probing for recovery

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED

    def is_open(self) -> bool:
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = CircuitState.HALF_OPEN
                return False
            return True
        return False

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.error(f"Circuit breaker OPEN after {self.failure_count} failures")

    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            logger.info("Circuit breaker CLOSED - provider recovered")

Checkpoint and task recovery

For long-running agent tasks (over a minute), persist progress after each step:

import pickle
from dataclasses import dataclass
from datetime import datetime

from redis.asyncio import Redis

@dataclass
class AgentCheckpoint:
    task_id: str
    step_index: int
    completed_steps: list[StepResult]
    state: dict             # arbitrary agent state
    saved_at: datetime

class CheckpointManager:
    def __init__(self, storage: Redis):
        self.storage = storage

    async def save(self, checkpoint: AgentCheckpoint):
        key = f"checkpoint:{checkpoint.task_id}"
        await self.storage.setex(
            key,
            3600,  # 1 hour TTL
            pickle.dumps(checkpoint)
        )

    async def load(self, task_id: str) -> AgentCheckpoint | None:
        key = f"checkpoint:{task_id}"
        data = await self.storage.get(key)
        return pickle.loads(data) if data else None

    async def resume_or_start(self, task: AgentTask) -> AgentCheckpoint:
        existing = await self.load(task.id)
        if existing:
            logger.info(f"Resuming task {task.id} from step {existing.step_index}")
            return existing
        return AgentCheckpoint(task_id=task.id, step_index=0, completed_steps=[], state={}, saved_at=datetime.utcnow())
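The save/resume cycle can be illustrated with a plain dict standing in for Redis. This is a simplified sketch of the pattern, not the production manager above:

```python
from dataclasses import dataclass, field

@dataclass
class Checkpoint:
    task_id: str
    step_index: int = 0
    completed: list = field(default_factory=list)

def run_with_checkpoints(steps, store, task_id):
    """Execute `steps`, persisting a checkpoint after each one.

    `store` is any dict-like storage (Redis in production); after a
    restart the loop resumes from the saved step index, not from zero.
    """
    cp = store.get(task_id) or Checkpoint(task_id)
    for i in range(cp.step_index, len(steps)):
        cp.completed.append(steps[i]())   # run the step
        cp.step_index = i + 1
        store[task_id] = cp               # persist progress after every step
    return cp

store = {}
# Simulate a crash after the first step by pre-seeding a checkpoint.
store["t1"] = Checkpoint("t1", step_index=1, completed=["a"])
cp = run_with_checkpoints([lambda: "a", lambda: "b", lambda: "c"], store, "t1")
# cp.completed == ["a", "b", "c"]; only steps 2 and 3 were re-executed
```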

Human escalation for unresolvable errors

class EscalationPolicy:
    MAX_RETRIES = 3
    MAX_FALLBACK_ATTEMPTS = 2

    async def handle_failure(
        self,
        task: AgentTask,
        error: Exception,
        retry_count: int
    ) -> EscalationDecision:

        if retry_count < self.MAX_RETRIES and self._is_retriable(error):
            return EscalationDecision(action="retry", delay_seconds=2 ** retry_count)

        if isinstance(error, ToolUnavailableError):
            return EscalationDecision(action="use_fallback_tool", tool=self._get_fallback_tool(error.tool))

        # Escalate to a human
        await self.notify_human(task, error, retry_count)
        return EscalationDecision(
            action="escalate",
            message=f"Task {task.id} requires human intervention after {retry_count} retries: {error}"
        )
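The retry-versus-escalate branch reduces to a small pure function, which makes the policy easy to unit-test. A sketch under our own naming, mirroring the logic above:

```python
def decide(retriable: bool, retry_count: int, max_retries: int = 3):
    """Retry with exponential backoff while the error is retriable
    and attempts remain; otherwise hand the task to a human."""
    if retriable and retry_count < max_retries:
        return ("retry", 2 ** retry_count)  # delay in seconds: 1, 2, 4
    return ("escalate", None)

assert decide(True, 0) == ("retry", 1)
assert decide(True, 3) == ("escalate", None)   # retries exhausted
assert decide(False, 0) == ("escalate", None)  # non-retriable error
```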

Implementation timeframes

Week 1–2: Fallback LLM providers, retry logic, basic circuit breakers

Week 3–4: Checkpoint system, human escalation workflow

Month 2: Multi-agent redundancy, chaos testing, SLA dashboard