Реалізація Fallback між LLM-провайдерами при недоступності

Проектуємо та впроваджуємо системи штучного інтелекту: від прототипу до production-ready рішення. Наша команда поєднує експертизу в машинному навчанні, дата-інжинірингу та MLOps, щоб AI працював не в лабораторії, а в реальному бізнесі.
Показано 1 з 1Усі 1566 послуг
Реалізація Fallback між LLM-провайдерами при недоступності
Середній
~2-3 дні
Часті запитання

Напрямки AI-розробки

Етапи розробки AI-рішення

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    901
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1119
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    853

Реалізація Fallback між LLM-провайдерами при недоступності

LLM-провайдери періодично відчувають перебої: rate limits, maintenance windows, регіональні збої. Production-система повинна автоматично переключатися на резервного провайдера. Graded fallback — це не просто "якщо зломано, спробуй інше", а розумна стратегія з урахуванням типу помилки, часу очікування та деградації якості.

Базовий Fallback з tenacity

from openai import OpenAI, RateLimitError, APIError
from anthropic import Anthropic
from groq import Groq
import anthropic
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import logging
from dataclasses import dataclass
from typing import Optional
import time

logger = logging.getLogger(__name__)

@dataclass
class ProviderConfig:
    name: str
    model: str
    priority: int  # Менше = вищий пріоритет
    max_retries: int = 3

class LLMFallbackClient:

    PROVIDERS = [
        ProviderConfig("anthropic", "claude-sonnet-4-5", priority=1),
        ProviderConfig("openai", "gpt-4o", priority=2),
        ProviderConfig("groq", "llama-3.1-70b-versatile", priority=3),
    ]

    def __init__(self):
        self.clients = {
            "anthropic": Anthropic(),
            "openai": OpenAI(),
            "groq": Groq(),
        }
        self._circuit_breakers: dict[str, dict] = {}

    def _is_circuit_open(self, provider: str) -> bool:
        """Circuit breaker: блокуємо провайдера при частих помилках"""
        cb = self._circuit_breakers.get(provider, {"failures": 0, "last_failure": 0})
        if cb["failures"] >= 5:
            # Переоткриваємо через 60 секунд
            if time.time() - cb["last_failure"] > 60:
                self._circuit_breakers[provider] = {"failures": 0, "last_failure": 0}
                return False
            return True
        return False

    def _record_failure(self, provider: str):
        cb = self._circuit_breakers.get(provider, {"failures": 0, "last_failure": 0})
        cb["failures"] += 1
        cb["last_failure"] = time.time()
        self._circuit_breakers[provider] = cb

    def _record_success(self, provider: str):
        self._circuit_breakers[provider] = {"failures": 0, "last_failure": 0}

    def _call_provider(self, provider: str, model: str, messages: list[dict], **kwargs) -> str:
        """Виклик конкретного провайдера"""
        if provider == "anthropic":
            response = self.clients["anthropic"].messages.create(
                model=model,
                max_tokens=kwargs.get("max_tokens", 2048),
                messages=messages,
                system=kwargs.get("system", ""),
            )
            return response.content[0].text

        elif provider == "openai":
            all_messages = []
            if kwargs.get("system"):
                all_messages.append({"role": "system", "content": kwargs["system"]})
            all_messages.extend(messages)
            response = self.clients["openai"].chat.completions.create(
                model=model,
                messages=all_messages,
                max_tokens=kwargs.get("max_tokens", 2048),
                temperature=kwargs.get("temperature", 0.1),
            )
            return response.choices[0].message.content

        elif provider == "groq":
            all_messages = []
            if kwargs.get("system"):
                all_messages.append({"role": "system", "content": kwargs["system"]})
            all_messages.extend(messages)
            response = self.clients["groq"].chat.completions.create(
                model=model,
                messages=all_messages,
            )
            return response.choices[0].message.content

        raise ValueError(f"Unknown provider: {provider}")

    def complete(self, messages: list[dict], **kwargs) -> tuple[str, str]:
        """Виконує запит з автоматичним fallback.
        Повертає (відповідь, ім'я_провайдера)"""

        sorted_providers = sorted(self.PROVIDERS, key=lambda p: p.priority)

        last_error = None
        for config in sorted_providers:
            if self._is_circuit_open(config.name):
                logger.warning(f"Circuit open for {config.name}, skipping")
                continue

            for attempt in range(config.max_retries):
                try:
                    result = self._call_provider(config.name, config.model, messages, **kwargs)
                    self._record_success(config.name)

                    if config.priority > 1:
                        logger.warning(f"Used fallback provider: {config.name}")

                    return result, config.name

                except (RateLimitError, anthropic.RateLimitError) as e:
                    wait_time = min(2 ** attempt, 30)
                    logger.warning(f"{config.name} rate limited, waiting {wait_time}s")
                    time.sleep(wait_time)
                    last_error = e

                except (APIError, anthropic.APIError) as e:
                    self._record_failure(config.name)
                    logger.error(f"{config.name} API error: {e}")
                    last_error = e
                    break  # Переходимо до наступного провайдера

                except Exception as e:
                    self._record_failure(config.name)
                    logger.error(f"{config.name} unexpected error: {e}")
                    last_error = e
                    break

        raise RuntimeError(f"All providers failed. Last error: {last_error}")

Async fallback з паралельними спробами

import asyncio

class AsyncFallbackClient:
    """Для критично важливих запитів — паралельний запит до кількох провайдерів"""

    async def complete_parallel(self, messages: list[dict], **kwargs) -> str:
        """Відправляє запит паралельно в 2 провайдера, бере першу відповідь"""

        tasks = [
            self._async_anthropic(messages, **kwargs),
            self._async_openai(messages, **kwargs),
        ]

        # race — повертаємо перший успішний результат
        done, pending = await asyncio.wait(
            [asyncio.create_task(t) for t in tasks],
            return_when=asyncio.FIRST_COMPLETED
        )

        # Скасовуємо залишилися запити
        for task in pending:
            task.cancel()

        return await next(iter(done))

Моніторинг здоров'я провайдерів

class ProviderHealthMonitor:
    """Періодично перевіряє доступність провайдерів"""

    async def check_all(self) -> dict[str, bool]:
        results = {}
        test_message = [{"role": "user", "content": "Say 'ok'"}]

        for provider_name in ["anthropic", "openai", "groq"]:
            try:
                # Швидкий тест з коротким таймаутом
                await asyncio.wait_for(
                    self._test_provider(provider_name, test_message),
                    timeout=5.0
                )
                results[provider_name] = True
            except Exception:
                results[provider_name] = False

        return results

Строки реалізації

  • Базовий fallback з retry: 1–2 дні
  • Circuit breaker + моніторинг: 2–3 дні
  • Async паралельний fallback: 2–3 дні
  • Повна production-система з alertingом: 1 тиждень