LLM Provider Fallback Implementation on Unavailability

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 services. All 1566 services
LLM Provider Fallback Implementation on Unavailability
Medium
~2-3 business days
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_website-b2b-advance_0.png
    B2B ADVANCE company website development
    1212
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Website development for BELFINGROUP
    852
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    822

Реализация Fallback между LLM-провайдерами при недоступности

LLM-провайдеры периодически испытывают перебои: rate limits, maintenance windows, региональные сбои. Продакшн-система должна автоматически переключаться на резервный провайдер. Graded fallback — это не просто "если сломано, попробуй другое", а умная стратегия с учётом типа ошибки, времени ожидания и деградации качества.

Базовый Fallback с tenacity

from openai import OpenAI, RateLimitError, APIError
from anthropic import Anthropic
from groq import Groq
import anthropic
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import logging
from dataclasses import dataclass
from typing import Optional
import time

logger = logging.getLogger(__name__)

@dataclass
class ProviderConfig:
    """Static configuration for one LLM provider in the fallback chain."""

    name: str
    model: str
    priority: int  # Lower value = higher priority (tried first)
    max_retries: int = 3

class LLMFallbackClient:
    """Synchronous LLM client with priority-ordered provider fallback.

    Providers are tried in ascending ``priority`` order. Rate limits are
    retried with exponential backoff; hard API errors (and exhausted
    rate-limit retries) trip a per-provider circuit breaker so a failing
    provider is skipped for a cool-down period.
    """

    # Preference order; lower priority value = tried first.
    PROVIDERS = [
        ProviderConfig("anthropic", "claude-sonnet-4-5", priority=1),
        ProviderConfig("openai", "gpt-4o", priority=2),
        ProviderConfig("groq", "llama-3.1-70b-versatile", priority=3),
    ]

    # Circuit breaker tuning: open after this many recorded failures,
    # allow traffic again after this many seconds.
    _CB_FAILURE_THRESHOLD = 5
    _CB_RESET_SECONDS = 60

    def __init__(self):
        # SDK clients read their API keys from the environment.
        self.clients = {
            "anthropic": Anthropic(),
            "openai": OpenAI(),
            "groq": Groq(),
        }
        self._circuit_breakers: dict[str, dict] = {}

    def _is_circuit_open(self, provider: str) -> bool:
        """Return True while *provider* is blocked due to repeated failures."""
        cb = self._circuit_breakers.get(provider, {"failures": 0, "last_failure": 0})
        if cb["failures"] >= self._CB_FAILURE_THRESHOLD:
            # After the cool-down window, reset the breaker and let traffic through.
            if time.time() - cb["last_failure"] > self._CB_RESET_SECONDS:
                self._circuit_breakers[provider] = {"failures": 0, "last_failure": 0}
                return False
            return True
        return False

    def _record_failure(self, provider: str):
        """Count one failure against *provider*'s circuit breaker."""
        cb = self._circuit_breakers.get(provider, {"failures": 0, "last_failure": 0})
        cb["failures"] += 1
        cb["last_failure"] = time.time()
        self._circuit_breakers[provider] = cb

    def _record_success(self, provider: str):
        """Reset *provider*'s circuit breaker after a successful call."""
        self._circuit_breakers[provider] = {"failures": 0, "last_failure": 0}

    def _call_provider(self, provider: str, model: str, messages: list[dict], **kwargs) -> str:
        """Dispatch one request to a specific provider and return the text reply.

        Raises:
            ValueError: for a provider name not in the dispatch table.
        """
        if provider == "anthropic":
            params = {
                "model": model,
                "max_tokens": kwargs.get("max_tokens", 2048),
                "messages": messages,
            }
            # ``system`` is optional in the Anthropic Messages API; only
            # forward it when the caller actually supplied a prompt instead
            # of always sending an empty string.
            if kwargs.get("system"):
                params["system"] = kwargs["system"]
            response = self.clients["anthropic"].messages.create(**params)
            return response.content[0].text

        elif provider == "openai":
            # OpenAI-style APIs take the system prompt as a leading message.
            all_messages = []
            if kwargs.get("system"):
                all_messages.append({"role": "system", "content": kwargs["system"]})
            all_messages.extend(messages)
            response = self.clients["openai"].chat.completions.create(
                model=model,
                messages=all_messages,
                max_tokens=kwargs.get("max_tokens", 2048),
                temperature=kwargs.get("temperature", 0.1),
            )
            return response.choices[0].message.content

        elif provider == "groq":
            all_messages = []
            if kwargs.get("system"):
                all_messages.append({"role": "system", "content": kwargs["system"]})
            all_messages.extend(messages)
            response = self.clients["groq"].chat.completions.create(
                model=model,
                messages=all_messages,
            )
            return response.choices[0].message.content

        raise ValueError(f"Unknown provider: {provider}")

    def complete(self, messages: list[dict], **kwargs) -> tuple[str, str]:
        """Run the request with automatic fallback across providers.

        Returns:
            Tuple of (response_text, provider_name).
        Raises:
            RuntimeError: when every provider fails or is circuit-broken.
        """
        sorted_providers = sorted(self.PROVIDERS, key=lambda p: p.priority)

        last_error = None
        for config in sorted_providers:
            if self._is_circuit_open(config.name):
                logger.warning(f"Circuit open for {config.name}, skipping")
                continue

            for attempt in range(config.max_retries):
                try:
                    result = self._call_provider(config.name, config.model, messages, **kwargs)
                    self._record_success(config.name)

                    if config.priority > 1:
                        logger.warning(f"Used fallback provider: {config.name}")

                    return result, config.name

                except (RateLimitError, anthropic.RateLimitError) as e:
                    last_error = e
                    if attempt == config.max_retries - 1:
                        # Out of retries for this provider: count it against
                        # the circuit breaker and move on immediately instead
                        # of sleeping one last, pointless time.
                        self._record_failure(config.name)
                        break
                    wait_time = min(2 ** attempt, 30)
                    logger.warning(f"{config.name} rate limited, waiting {wait_time}s")
                    time.sleep(wait_time)

                except (APIError, anthropic.APIError) as e:
                    self._record_failure(config.name)
                    logger.error(f"{config.name} API error: {e}")
                    last_error = e
                    break  # Move on to the next provider.

                except Exception as e:
                    self._record_failure(config.name)
                    logger.error(f"{config.name} unexpected error: {e}")
                    last_error = e
                    break

        raise RuntimeError(f"All providers failed. Last error: {last_error}")

Async fallback с параллельными попытками

import asyncio

class AsyncFallbackClient:
    """For critical requests: race the same request against several providers."""

    async def complete_parallel(self, messages: list[dict], **kwargs) -> str:
        """Send the request to two providers concurrently; return the first
        *successful* answer.

        Bug fixed vs. the naive race: ``asyncio.wait(FIRST_COMPLETED)``
        returns the first task that *finishes*, which may have finished by
        raising. A provider that fails fast must not win the race, so failed
        tasks are discarded and the remaining ones are awaited. Losing tasks
        are cancelled once a winner is found.

        Raises:
            The last provider error, when every provider fails.
        """
        pending = {
            asyncio.create_task(self._async_anthropic(messages, **kwargs)),
            asyncio.create_task(self._async_openai(messages, **kwargs)),
        }

        last_error: BaseException | None = None
        try:
            while pending:
                done, pending = await asyncio.wait(
                    pending, return_when=asyncio.FIRST_COMPLETED
                )
                for task in done:
                    if task.exception() is None:
                        return task.result()
                    # Remember the failure but keep waiting on the others.
                    last_error = task.exception()
        finally:
            # Cancel whatever is still in flight (winner found or giving up).
            for task in pending:
                task.cancel()

        raise last_error

Мониторинг здоровья провайдеров

class ProviderHealthMonitor:
    """Periodically probes each LLM provider for availability."""

    async def check_all(self) -> dict[str, bool]:
        """Probe every known provider with a tiny request.

        Returns:
            Mapping of provider name -> True when the probe answered within
            the timeout, False on any error or timeout.
        """
        probe = [{"role": "user", "content": "Say 'ok'"}]
        health: dict[str, bool] = {}

        for name in ("anthropic", "openai", "groq"):
            ok = True
            try:
                # Short timeout so one hung provider can't stall the sweep.
                await asyncio.wait_for(self._test_provider(name, probe), timeout=5.0)
            except Exception:
                ok = False
            health[name] = ok

        return health

Сроки

  • Базовый fallback с retry: 1–2 дня
  • Circuit breaker + мониторинг: 2–3 дня
  • Async параллельный fallback: 2–3 дня
  • Полная production-система с алертингом: 1 неделя