Development of a failover and auto-replacement system for AI agents
AI agent failover—automatically switches tasks from a faulty agent to a healthy one without losing progress. Critical for systems with continuous processes: monitoring, queue processing, long-running tasks.
Failover architecture
┌─────────────────────────────────┐
│ Task Queue (Redis/Kafka) │
└────────────┬────────────────────┘
│
┌─────────────▼──────────────┐
│ Orchestrator / Scheduler │
│ (health checks, failover) │
└──────┬─────────────┬────────┘
│ │
┌──────▼───┐ ┌──────▼───┐
│ Agent 1 │ │ Agent 2 │
│ (active) │ │ (standby) │
└──────┬───┘ └──────────┘
│ crash
┌──────▼──────────────────────┐
│ Failover: Agent 1 tasks    │
│ handed over to Agent 2     │
│ (from checkpoint or queue) │
└─────────────────────────────┘
Health Check system
from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta

import aiohttp
@dataclass
class AgentHealth:
    """Point-in-time health record the monitor keeps for one agent."""
    agent_id: str  # stable identifier; also used as the health-check hostname
    last_heartbeat: datetime  # when the agent last answered a probe
    last_task_completed: datetime | None  # None until the first task finishes
    consecutive_failures: int  # failed probes in a row; reset on success
    status: str  # healthy / degraded / failed
class AgentHealthMonitor:
def __init__(self, failure_threshold: int = 3, heartbeat_timeout: int = 30):
self.failure_threshold = failure_threshold
self.heartbeat_timeout = heartbeat_timeout
self.agent_health: dict[str, AgentHealth] = {}
async def check_agents(self, agent_ids: list[str]) -> dict[str, AgentHealth]:
tasks = [self._check_agent(agent_id) for agent_id in agent_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
for agent_id, result in zip(agent_ids, results):
if isinstance(result, Exception):
self._record_failure(agent_id, str(result))
else:
self._record_success(agent_id, result)
return self.agent_health
async def _check_agent(self, agent_id: str) -> dict:
url = f"http://{agent_id}:8080/health"
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5)) as session:
async with session.get(url) as response:
return await response.json()
def _record_failure(self, agent_id: str, error: str):
health = self.agent_health.setdefault(agent_id, AgentHealth(
agent_id=agent_id, last_heartbeat=datetime.utcnow(),
last_task_completed=None, consecutive_failures=0, status="healthy"
))
health.consecutive_failures += 1
if health.consecutive_failures >= self.failure_threshold:
health.status = "failed"
logger.critical(f"Agent {agent_id} marked as FAILED after {health.consecutive_failures} failures")
def get_failed_agents(self) -> list[str]:
return [aid for aid, h in self.agent_health.items() if h.status == "failed"]
def get_healthy_agents(self) -> list[str]:
return [aid for aid, h in self.agent_health.items() if h.status == "healthy"]
Failover orchestrator
class FailoverOrchestrator:
    """Watches agent health and moves work away from failed agents."""

    def __init__(self, task_queue: TaskQueue, health_monitor: AgentHealthMonitor,
                 checkpoint_manager: CheckpointManager, is_kubernetes: bool = False):
        self.task_queue = task_queue
        self.health_monitor = health_monitor
        self.checkpoint_manager = checkpoint_manager
        # Deployment flavor decides how a failed agent gets replaced.
        # Fix: read in _trigger_agent_replacement but never set -> AttributeError.
        self.is_kubernetes = is_kubernetes
        # Failover tasks handed to each healthy agent so far; used as a cheap
        # load proxy by _select_least_loaded_agent.
        self._assignment_counts: dict[str, int] = {}

    async def run_failover_loop(self):
        """Poll health every 15 s forever, failing over any failed agents."""
        while True:
            await asyncio.sleep(15)  # check every 15 seconds
            failed_agents = self.health_monitor.get_failed_agents()
            if not failed_agents:
                continue
            healthy_agents = self.health_monitor.get_healthy_agents()
            if not healthy_agents:
                logging.getLogger(__name__).critical("NO HEALTHY AGENTS AVAILABLE - alerting on-call")
                await self._page_oncall("All agents failed")
                continue
            for failed_agent in failed_agents:
                await self._failover_agent(failed_agent, healthy_agents)

    async def _failover_agent(self, failed_agent: str, healthy_agents: list[str]):
        """Reassign every task of `failed_agent` onto healthy agents, then replace it."""
        logging.getLogger(__name__).info(f"Starting failover for agent {failed_agent}")
        # Collect the tasks that were assigned to the crashed agent
        assigned_tasks = await self.task_queue.get_tasks_for_agent(failed_agent)
        for task in assigned_tasks:
            # Try to resume from the latest checkpoint
            checkpoint = await self.checkpoint_manager.load(task.id)
            # Pick the least-loaded healthy agent
            target_agent = self._select_least_loaded_agent(healthy_agents)
            # Reassign the task together with its checkpoint
            await self.task_queue.reassign_task(
                task_id=task.id,
                new_agent=target_agent,
                checkpoint=checkpoint,
                priority=TaskPriority.HIGH  # bump priority for failover tasks
            )
            logging.getLogger(__name__).info(f"Task {task.id} reassigned from {failed_agent} to {target_agent}")
        # Mark the agent as needing replacement
        await self._trigger_agent_replacement(failed_agent)

    def _select_least_loaded_agent(self, healthy_agents: list[str]) -> str:
        """Return the healthy agent that has received the fewest failover tasks.

        Fix: referenced by _failover_agent but never defined (AttributeError on
        the first failover). The local assignment counter is only a load proxy —
        TODO(review): replace with real queue-depth metrics from the task queue.
        """
        target = min(healthy_agents, key=lambda a: self._assignment_counts.get(a, 0))
        self._assignment_counts[target] = self._assignment_counts.get(target, 0) + 1
        return target

    async def _page_oncall(self, message: str):
        """Escalate to a human operator.

        Fix: referenced by run_failover_loop but never defined. NOTE(review):
        stub that only logs — wire up the real paging integration.
        """
        logging.getLogger(__name__).critical(f"PAGING ON-CALL: {message}")

    async def _trigger_agent_replacement(self, agent_id: str):
        """Launch a new agent instance via K8s or Docker."""
        # Kubernetes: delete the pod; the ReplicaSet creates a new one.
        # NOTE(review): k8s_client / docker_client are not defined in this
        # file — presumably injected elsewhere; confirm.
        if self.is_kubernetes:
            await k8s_client.delete_pod(agent_id)
        else:
            await docker_client.restart_container(agent_id)
Warm standby agents
For critical processes, use warm standby: the backup agent is loaded and ready, but does not process tasks. Failover takes <5 seconds versus 3–10 minutes with a cold start.
class WarmStandbyPool:
    """Keeps warm standby agents loaded and ready for fast failover."""

    def __init__(self, standby_count: int = 2):
        # All standbys ever launched; entries whose status is not "ready"
        # no longer count toward the pool target.
        self.standbys: list[StandbyAgent] = []
        # Target number of agents in "ready" state at all times.
        self.standby_count = standby_count

    async def maintain_pool(self):
        """Keep the required number of standby agents running, forever."""
        while True:
            ready_count = sum(1 for s in self.standbys if s.status == "ready")
            # Fix: launch the whole deficit at once instead of one agent per
            # 30 s cycle, so losing several standbys doesn't take minutes to heal.
            for _ in range(self.standby_count - ready_count):
                self.standbys.append(await self._launch_standby_agent())
            await asyncio.sleep(30)

    async def _launch_standby_agent(self) -> StandbyAgent:
        """Provision one standby agent instance.

        Fix: referenced by maintain_pool but never defined in this file.
        NOTE(review): stub — the deployment layer must override/patch this.
        """
        raise NotImplementedError("standby provisioning must be supplied by the deployment layer")

    async def promote_standby(self, tasks: list[AgentTask]) -> StandbyAgent:
        """Activate the first ready standby and hand it `tasks`.

        Raises:
            RuntimeError: if no standby is currently in "ready" state.
        """
        agent = next((s for s in self.standbys if s.status == "ready"), None)
        if agent is None:
            raise RuntimeError("No standby agents available")
        agent.status = "active"
        await agent.assign_tasks(tasks)
        return agent
RTO and RPO for AI agents
RTO (Recovery Time Objective): recovery time after a failure. With warm standby: < 30 sec. With checkpoint + cold start: 5–15 min.
RPO (Recovery Point Objective): data/progress loss. With a checkpoint every 60 seconds: no more than 60 seconds of work lost. For financial transactions, a checkpoint is performed after each step.







