Реализация Fallback между LLM-провайдерами при недоступности
LLM-провайдеры периодически испытывают перебои: rate limits, maintenance windows, региональные сбои. Продакшн-система должна автоматически переключаться на резервный провайдер. Graded fallback — это не просто "если сломано, попробуй другое", а умная стратегия с учётом типа ошибки, времени ожидания и деградации качества.
Базовый Fallback с tenacity
from openai import OpenAI, RateLimitError, APIError
from anthropic import Anthropic
from groq import Groq
import anthropic
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import logging
from dataclasses import dataclass
from typing import Optional
import time
logger = logging.getLogger(__name__)
@dataclass
class ProviderConfig:
name: str
model: str
priority: int # Меньше = выше приоритет
max_retries: int = 3
class LLMFallbackClient:
PROVIDERS = [
ProviderConfig("anthropic", "claude-sonnet-4-5", priority=1),
ProviderConfig("openai", "gpt-4o", priority=2),
ProviderConfig("groq", "llama-3.1-70b-versatile", priority=3),
]
def __init__(self):
self.clients = {
"anthropic": Anthropic(),
"openai": OpenAI(),
"groq": Groq(),
}
self._circuit_breakers: dict[str, dict] = {}
def _is_circuit_open(self, provider: str) -> bool:
"""Circuit breaker: блокируем провайдер при частых ошибках"""
cb = self._circuit_breakers.get(provider, {"failures": 0, "last_failure": 0})
if cb["failures"] >= 5:
# Переоткрываем через 60 секунд
if time.time() - cb["last_failure"] > 60:
self._circuit_breakers[provider] = {"failures": 0, "last_failure": 0}
return False
return True
return False
def _record_failure(self, provider: str):
cb = self._circuit_breakers.get(provider, {"failures": 0, "last_failure": 0})
cb["failures"] += 1
cb["last_failure"] = time.time()
self._circuit_breakers[provider] = cb
def _record_success(self, provider: str):
self._circuit_breakers[provider] = {"failures": 0, "last_failure": 0}
def _call_provider(self, provider: str, model: str, messages: list[dict], **kwargs) -> str:
"""Вызов конкретного провайдера"""
if provider == "anthropic":
response = self.clients["anthropic"].messages.create(
model=model,
max_tokens=kwargs.get("max_tokens", 2048),
messages=messages,
system=kwargs.get("system", ""),
)
return response.content[0].text
elif provider == "openai":
all_messages = []
if kwargs.get("system"):
all_messages.append({"role": "system", "content": kwargs["system"]})
all_messages.extend(messages)
response = self.clients["openai"].chat.completions.create(
model=model,
messages=all_messages,
max_tokens=kwargs.get("max_tokens", 2048),
temperature=kwargs.get("temperature", 0.1),
)
return response.choices[0].message.content
elif provider == "groq":
all_messages = []
if kwargs.get("system"):
all_messages.append({"role": "system", "content": kwargs["system"]})
all_messages.extend(messages)
response = self.clients["groq"].chat.completions.create(
model=model,
messages=all_messages,
)
return response.choices[0].message.content
raise ValueError(f"Unknown provider: {provider}")
def complete(self, messages: list[dict], **kwargs) -> tuple[str, str]:
"""Выполняет запрос с автоматическим fallback.
Возвращает (ответ, имя_провайдера)"""
sorted_providers = sorted(self.PROVIDERS, key=lambda p: p.priority)
last_error = None
for config in sorted_providers:
if self._is_circuit_open(config.name):
logger.warning(f"Circuit open for {config.name}, skipping")
continue
for attempt in range(config.max_retries):
try:
result = self._call_provider(config.name, config.model, messages, **kwargs)
self._record_success(config.name)
if config.priority > 1:
logger.warning(f"Used fallback provider: {config.name}")
return result, config.name
except (RateLimitError, anthropic.RateLimitError) as e:
wait_time = min(2 ** attempt, 30)
logger.warning(f"{config.name} rate limited, waiting {wait_time}s")
time.sleep(wait_time)
last_error = e
except (APIError, anthropic.APIError) as e:
self._record_failure(config.name)
logger.error(f"{config.name} API error: {e}")
last_error = e
break # Переходим к следующему провайдеру
except Exception as e:
self._record_failure(config.name)
logger.error(f"{config.name} unexpected error: {e}")
last_error = e
break
raise RuntimeError(f"All providers failed. Last error: {last_error}")
Async fallback с параллельными попытками
import asyncio
class AsyncFallbackClient:
"""Для критически важных запросов — параллельный запрос к нескольким провайдерам"""
async def complete_parallel(self, messages: list[dict], **kwargs) -> str:
"""Отправляет запрос параллельно в 2 провайдера, берёт первый ответ"""
tasks = [
self._async_anthropic(messages, **kwargs),
self._async_openai(messages, **kwargs),
]
# race — возвращаем первый успешный результат
done, pending = await asyncio.wait(
[asyncio.create_task(t) for t in tasks],
return_when=asyncio.FIRST_COMPLETED
)
# Отменяем оставшиеся запросы
for task in pending:
task.cancel()
return await next(iter(done))
Мониторинг здоровья провайдеров
class ProviderHealthMonitor:
"""Периодически проверяет доступность провайдеров"""
async def check_all(self) -> dict[str, bool]:
results = {}
test_message = [{"role": "user", "content": "Say 'ok'"}]
for provider_name in ["anthropic", "openai", "groq"]:
try:
# Быстрый тест с коротким таймаутом
await asyncio.wait_for(
self._test_provider(provider_name, test_message),
timeout=5.0
)
results[provider_name] = True
except Exception:
results[provider_name] = False
return results
Сроки
- Базовый fallback с retry: 1–2 дня
- Circuit breaker + мониторинг: 2–3 дня
- Async параллельный fallback: 2–3 дня
- Полная production-система с алертингом: 1 неделя







