Development of a redundancy and fault-tolerance system for AI agents
AI agent fault tolerance ensures service continuity during failures, such as LLM API unavailability, tool errors, or exceptions in agent code. This is critical for production systems with SLAs.
Fault tolerance layers
LLM layer: fallback to alternative models when the primary one is unavailable. OpenAI → Anthropic → local model.
Tools layer: retry with exponential backoff, circuit breaker, alternative tools.
Agent layer: progress checkpoints, crash recovery, human escalation on critical errors.
Orchestration layer: multi-agent redundancy, health checks, automatic task reassignment.
Fallback LLM Provider
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import anthropic
import openai
class ResilientLLMClient:
    """LLM client with provider failover: OpenAI -> Anthropic -> local vLLM.

    Each provider is guarded by its own CircuitBreaker; individual provider
    calls are retried with exponential backoff on transient errors.
    """

    def __init__(self):
        # Providers are tried in priority order; the local vLLM server is the
        # last resort and exposes an OpenAI-compatible API.
        self.providers = [
            {"name": "openai", "client": openai.AsyncOpenAI(), "model": "gpt-4o"},
            {"name": "anthropic", "client": anthropic.AsyncAnthropic(), "model": "claude-3-5-sonnet-20241022"},
            {"name": "local_vllm", "client": openai.AsyncOpenAI(base_url="http://gpu1:8000/v1"), "model": "llama-3-8b"},
        ]
        self.circuit_breakers = {p["name"]: CircuitBreaker() for p in self.providers}

    async def generate(self, messages: list, **kwargs) -> str:
        """Return a completion from the first healthy provider.

        Raises:
            RuntimeError: when every provider either failed or was skipped
                because its circuit breaker is open (in the latter case
                ``last_error`` in the message is ``None``).
        """
        last_error = None
        for provider in self.providers:
            cb = self.circuit_breakers[provider["name"]]
            if cb.is_open():
                continue  # provider disabled by its circuit breaker
            try:
                result = await self._call_provider(provider, messages, **kwargs)
                cb.record_success()
                return result
            except Exception as e:
                cb.record_failure()
                last_error = e
                logger.warning(f"Provider {provider['name']} failed: {e}")
        raise RuntimeError(f"All LLM providers failed. Last error: {last_error}")

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        # FIX: also retry on Anthropic's transient errors — the original
        # predicate only matched OpenAI exception types, so Anthropic calls
        # were never retried.
        retry=retry_if_exception_type((
            openai.RateLimitError,
            openai.APIConnectionError,
            anthropic.RateLimitError,
            anthropic.APIConnectionError,
        ))
    )
    async def _call_provider(self, provider: dict, messages: list, **kwargs) -> str:
        """Call one provider, translating to its native API.

        FIX: the Anthropic SDK has no ``.chat.completions`` surface; it uses
        ``client.messages.create`` and requires ``max_tokens``, so the
        original code raised AttributeError whenever failover reached it.
        """
        if provider["name"] == "anthropic":
            # NOTE(review): assumes `messages` contains no "system"-role
            # entries; Anthropic takes system prompts via the `system`
            # kwarg — confirm against callers.
            response = await provider["client"].messages.create(
                model=provider["model"],
                messages=messages,
                max_tokens=kwargs.pop("max_tokens", 1024),
                **kwargs
            )
            return response.content[0].text
        # OpenAI and OpenAI-compatible (vLLM) providers.
        response = await provider["client"].chat.completions.create(
            model=provider["model"],
            messages=messages,
            **kwargs
        )
        return response.choices[0].message.content
Circuit Breaker
from enum import Enum
import time
class CircuitState(Enum):
    # Lifecycle states of a CircuitBreaker.
    CLOSED = "closed"  # normal operation, calls flow through
    OPEN = "open"  # provider disabled, calls are skipped
    HALF_OPEN = "half_open"  # probing whether the provider recovered
class CircuitBreaker:
    """Per-provider circuit breaker with a timed OPEN -> HALF_OPEN probe.

    After `failure_threshold` consecutive failures the breaker opens and the
    provider is skipped; after `reset_timeout` seconds one probe call is
    allowed through (HALF_OPEN), and a success closes the breaker again.
    """

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60):
        self.failure_threshold = failure_threshold  # consecutive failures before opening
        self.reset_timeout = reset_timeout  # seconds before allowing a probe call
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = CircuitState.CLOSED

    def is_open(self) -> bool:
        """Return True while calls to the provider should be skipped."""
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.reset_timeout:
                # Cool-down elapsed: let one probe call through.
                self.state = CircuitState.HALF_OPEN
                return False
            return True
        return False

    def record_failure(self):
        """Count a failure; open at the threshold or on a failed probe."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        # A failure during a HALF_OPEN probe reopens the breaker immediately.
        if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.error(f"Circuit breaker OPEN after {self.failure_count} failures")

    def record_success(self):
        """Close the breaker after a successful probe and reset the counter."""
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            logger.info("Circuit breaker CLOSED - provider recovered")
        # FIX: reset on *every* success (not only in HALF_OPEN) so the
        # threshold counts consecutive failures; previously, intermittent
        # failures spread over time accumulated and eventually tripped the
        # breaker even though the provider was mostly healthy.
        self.failure_count = 0
Checkpoint and task recovery
For long-running agent tasks (> 1 min), save progress:
@dataclass
class AgentCheckpoint:
    # Snapshot of a long-running agent task, persisted so work can resume
    # after a crash or restart.
    task_id: str  # id of the AgentTask this checkpoint belongs to
    step_index: int  # index of the next step to execute
    completed_steps: list[StepResult]  # results of steps already finished
    state: dict  # arbitrary agent state
    saved_at: datetime  # when the checkpoint was written
class CheckpointManager:
    """Persists and restores AgentCheckpoint snapshots in Redis."""

    def __init__(self, storage: Redis):
        self.storage = storage

    async def save(self, checkpoint: AgentCheckpoint):
        """Serialize the checkpoint and store it under a one-hour TTL."""
        payload = pickle.dumps(checkpoint)
        await self.storage.setex(
            f"checkpoint:{checkpoint.task_id}",
            3600,  # 1-hour TTL
            payload
        )

    async def load(self, task_id: str) -> AgentCheckpoint | None:
        """Return the stored checkpoint, or None when absent or expired."""
        raw = await self.storage.get(f"checkpoint:{task_id}")
        if not raw:
            return None
        # NOTE(review): pickle.loads on stored bytes is safe only while this
        # Redis instance is trusted/internal — never point it at untrusted data.
        return pickle.loads(raw)

    async def resume_or_start(self, task: AgentTask) -> AgentCheckpoint:
        """Load an existing checkpoint for the task, or create a fresh one."""
        checkpoint = await self.load(task.id)
        if checkpoint is None:
            return AgentCheckpoint(task_id=task.id, step_index=0, completed_steps=[], state={}, saved_at=datetime.utcnow())
        logger.info(f"Resuming task {task.id} from step {checkpoint.step_index}")
        return checkpoint
Human escalation for unresolvable errors
class EscalationPolicy:
    """Decide how to react to a task failure: retry, fall back, or escalate."""

    MAX_RETRIES = 3
    MAX_FALLBACK_ATTEMPTS = 2

    async def handle_failure(
        self,
        task: AgentTask,
        error: Exception,
        retry_count: int
    ) -> EscalationDecision:
        """Return the next action for a failed task.

        Preference order: exponential-backoff retry, fallback tool,
        and finally escalation to a human operator.
        """
        can_retry = retry_count < self.MAX_RETRIES and self._is_retriable(error)
        if can_retry:
            backoff = 2 ** retry_count
            return EscalationDecision(action="retry", delay_seconds=backoff)

        if isinstance(error, ToolUnavailableError):
            fallback = self._get_fallback_tool(error.tool)
            return EscalationDecision(action="use_fallback_tool", tool=fallback)

        # Escalate to a human operator
        await self.notify_human(task, error, retry_count)
        return EscalationDecision(
            action="escalate",
            message=f"Task {task.id} requires human intervention after {retry_count} retries: {error}"
        )
Implementation timeframes
Week 1–2: Fallback LLM providers, retry logic, basic circuit breakers
Week 3–4: Checkpoint system, human escalation workflow
Month 2: Multi-agent redundancy, chaos testing, SLA dashboard







