AI Agent Automatic Failover System Development

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 services · All 1,566 services
AI Agent Automatic Failover System Development
Complex
from 1 week to 3 months
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_website-b2b-advance_0.png
    B2B ADVANCE company website development
    1212
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Website development for BELFINGROUP
    852
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    822

Development of a failover and auto-replacement system for AI agents

AI agent failover automatically switches tasks from a failed agent to a healthy one without losing progress. It is critical for systems with continuous processes: monitoring, queue processing, and long-running tasks.

Failover architecture

               ┌─────────────────────────────────┐
               │       Task Queue (Redis/Kafka)   │
               └────────────┬────────────────────┘
                            │
              ┌─────────────▼──────────────┐
              │    Orchestrator / Scheduler  │
              │   (health checks, failover)  │
              └──────┬─────────────┬────────┘
                     │             │
              ┌──────▼───┐  ┌──────▼───┐
              │  Agent 1  │  │  Agent 2  │
              │  (active) │  │ (standby) │
              └──────┬───┘  └──────────┘
                     │ crash
              ┌──────▼──────────────────────┐
               │  Failover: Agent 1 tasks    │
               │  handed over to Agent 2     │
               │  (from checkpoint or queue) │
              └─────────────────────────────┘

Health Check system

import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta

import aiohttp

@dataclass
class AgentHealth:
    """Mutable per-agent health record, maintained by AgentHealthMonitor."""

    agent_id: str  # agent identifier; also used as its hostname for health checks
    last_heartbeat: datetime
    last_task_completed: datetime | None  # None until the agent reports a completed task
    consecutive_failures: int  # count of back-to-back failed health checks
    status: str  # healthy / degraded / failed

class AgentHealthMonitor:
    """Polls agent health endpoints and tracks per-agent health state.

    An agent is marked "failed" after ``failure_threshold`` consecutive
    unsuccessful checks; a successful check resets the counter and restores
    "healthy" status.
    """

    def __init__(self, failure_threshold: int = 3, heartbeat_timeout: int = 30):
        # Consecutive failed checks before status flips to "failed".
        self.failure_threshold = failure_threshold
        # NOTE(review): stored but not yet consulted anywhere visible —
        # presumably intended for a "degraded" heartbeat-age check; confirm.
        self.heartbeat_timeout = heartbeat_timeout
        self.agent_health: "dict[str, AgentHealth]" = {}

    async def check_agents(self, agent_ids: list[str]) -> "dict[str, AgentHealth]":
        """Check all agents concurrently and update their health records.

        Exceptions raised by individual checks (timeouts, connection errors)
        are captured via ``return_exceptions=True`` and recorded as failures
        instead of aborting the whole sweep.
        """
        tasks = [self._check_agent(agent_id) for agent_id in agent_ids]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for agent_id, result in zip(agent_ids, results):
            if isinstance(result, Exception):
                self._record_failure(agent_id, str(result))
            else:
                self._record_success(agent_id, result)

        return self.agent_health

    async def _check_agent(self, agent_id: str) -> dict:
        """GET the agent's /health endpoint; raises on timeout or connection error."""
        url = f"http://{agent_id}:8080/health"
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5)) as session:
            async with session.get(url) as response:
                return await response.json()

    def _ensure_health(self, agent_id: str) -> "AgentHealth":
        """Return the record for *agent_id*, creating a fresh one on first sight.

        Replaces the previous ``setdefault`` call, which constructed a throwaway
        AgentHealth on every lookup even when the record already existed.
        """
        record = self.agent_health.get(agent_id)
        if record is None:
            # NOTE(review): datetime.utcnow() is naive and deprecated since 3.12;
            # switching to datetime.now(timezone.utc) needs all call sites to
            # agree on aware datetimes — verify before changing.
            record = AgentHealth(
                agent_id=agent_id,
                last_heartbeat=datetime.utcnow(),
                last_task_completed=None,
                consecutive_failures=0,
                status="healthy",
            )
            self.agent_health[agent_id] = record
        return record

    def _record_success(self, agent_id: str, payload: dict):
        """Refresh heartbeat and reset the failure counter after a passing check.

        Bug fix: this method was called from check_agents() but was missing
        entirely, so every *successful* health check raised an error.
        """
        health = self._ensure_health(agent_id)
        health.last_heartbeat = datetime.utcnow()
        health.consecutive_failures = 0
        health.status = "healthy"

    def _record_failure(self, agent_id: str, error: str):
        """Record one failed check; flip to "failed" once the threshold is hit."""
        health = self._ensure_health(agent_id)
        health.consecutive_failures += 1
        if health.consecutive_failures >= self.failure_threshold:
            health.status = "failed"
            logging.getLogger(__name__).critical(
                "Agent %s marked as FAILED after %d failures (last error: %s)",
                agent_id, health.consecutive_failures, error,
            )

    def get_failed_agents(self) -> list[str]:
        """IDs of agents currently marked "failed"."""
        return [aid for aid, h in self.agent_health.items() if h.status == "failed"]

    def get_healthy_agents(self) -> list[str]:
        """IDs of agents currently marked "healthy"."""
        return [aid for aid, h in self.agent_health.items() if h.status == "healthy"]

Failover orchestrator

class FailoverOrchestrator:
    """Watches agent health and moves work away from failed agents.

    Periodically asks the health monitor for failed agents, reassigns their
    queued tasks (resuming from checkpoints where available) to the
    least-loaded healthy agent, and triggers replacement of the dead instance.
    """

    def __init__(self, task_queue: "TaskQueue", health_monitor: "AgentHealthMonitor",
                 checkpoint_manager: "CheckpointManager", is_kubernetes: bool = False):
        self.task_queue = task_queue
        self.health_monitor = health_monitor
        self.checkpoint_manager = checkpoint_manager
        # Bug fix: is_kubernetes was read in _trigger_agent_replacement but
        # never initialized; default False preserves the Docker restart path.
        self.is_kubernetes = is_kubernetes
        # Failover tasks assigned per agent id; used for least-loaded selection.
        self._failover_load: dict[str, int] = {}

    async def run_failover_loop(self):
        """Run forever: every 15 s, fail over any agents marked as failed."""
        log = logging.getLogger(__name__)
        while True:
            await asyncio.sleep(15)  # check every 15 seconds

            failed_agents = self.health_monitor.get_failed_agents()
            if not failed_agents:
                continue

            healthy_agents = self.health_monitor.get_healthy_agents()
            if not healthy_agents:
                log.critical("NO HEALTHY AGENTS AVAILABLE - alerting on-call")
                await self._page_oncall("All agents failed")
                continue

            for failed_agent in failed_agents:
                await self._failover_agent(failed_agent, healthy_agents)

    def _select_least_loaded_agent(self, healthy_agents: list[str]) -> str:
        """Pick the healthy agent with the fewest failover tasks assigned so far.

        Bug fix: this method was called from _failover_agent but never defined.
        Ties resolve to the earliest agent in the list.
        """
        return min(healthy_agents, key=lambda aid: self._failover_load.get(aid, 0))

    async def _page_oncall(self, message: str):
        """Escalate to the on-call engineer.

        Bug fix: called from run_failover_loop but never defined. Hook the
        paging provider (PagerDuty, Opsgenie, ...) here; until then, log at
        critical so the condition is at least visible.
        """
        logging.getLogger(__name__).critical("PAGING ON-CALL: %s", message)

    async def _failover_agent(self, failed_agent: str, healthy_agents: list[str]):
        """Reassign every task of *failed_agent* and schedule its replacement."""
        log = logging.getLogger(__name__)
        log.info(f"Starting failover for agent {failed_agent}")

        # Fetch the tasks that were assigned to the failed agent.
        assigned_tasks = await self.task_queue.get_tasks_for_agent(failed_agent)

        for task in assigned_tasks:
            # Try to resume from the latest checkpoint for this task.
            checkpoint = await self.checkpoint_manager.load(task.id)

            # Choose the least-loaded healthy agent and account for the new work.
            target_agent = self._select_least_loaded_agent(healthy_agents)
            self._failover_load[target_agent] = self._failover_load.get(target_agent, 0) + 1

            # Reassign with the checkpoint; elevated priority lets recovered
            # work jump the queue.
            await self.task_queue.reassign_task(
                task_id=task.id,
                new_agent=target_agent,
                checkpoint=checkpoint,
                priority=TaskPriority.HIGH,
            )

            log.info(f"Task {task.id} reassigned from {failed_agent} to {target_agent}")

        # Mark the dead agent for replacement (fresh instance).
        await self._trigger_agent_replacement(failed_agent)

    async def _trigger_agent_replacement(self, agent_id: str):
        """Start a new agent instance via Kubernetes or Docker.

        Kubernetes: deleting the pod lets the ReplicaSet create a new one.
        """
        if self.is_kubernetes:
            await k8s_client.delete_pod(agent_id)
        else:
            await docker_client.restart_container(agent_id)

Warm standby agents

For critical processes, use warm standby: the backup agent is loaded and ready, but does not process tasks. Failover takes <5 seconds versus 3–10 minutes with a cold start.

class WarmStandbyPool:
    """Maintains warm standby agents that can take over within seconds."""

    def __init__(self, standby_count: int = 2):
        self.standbys: "list[StandbyAgent]" = []
        # Desired number of "ready" standby agents at all times.
        self.standby_count = standby_count

    async def maintain_pool(self):
        """Keep the required number of standby agents running.

        Bug fix: the original started at most one standby per 30 s cycle, so
        recovering from a double failure took an extra minute; now the full
        deficit is launched in one pass.
        """
        while True:
            ready_count = sum(1 for s in self.standbys if s.status == "ready")
            for _ in range(self.standby_count - ready_count):
                # _launch_standby_agent is expected to be provided elsewhere
                # (not visible here) — presumably boots a pre-loaded agent.
                new_standby = await self._launch_standby_agent()
                self.standbys.append(new_standby)
            await asyncio.sleep(30)

    async def promote_standby(self, tasks: "list[AgentTask]") -> "StandbyAgent":
        """Promote the first ready standby to active and hand it *tasks*.

        Raises:
            RuntimeError: if no standby agent is currently "ready".
        """
        ready_standbys = [s for s in self.standbys if s.status == "ready"]
        if not ready_standbys:
            raise RuntimeError("No standby agents available")

        agent = ready_standbys[0]
        # Flip status before assigning so the same standby cannot be
        # promoted twice concurrently.
        agent.status = "active"
        await agent.assign_tasks(tasks)
        return agent

RTO and RPO for AI agents

RTO (Recovery Time Objective): recovery time after a failure. With warm standby: < 30 sec. With checkpoint + cold start: 5–15 min.

RPO (Recovery Point Objective): data/progress loss. With a checkpoint every 60 seconds: no more than 60 seconds of work lost. For financial transactions, a checkpoint is performed after each step.