Розробка AI-системи автоматичного масштабування застосунків на основі прогнозу навантаження

Проектуємо та впроваджуємо системи штучного інтелекту: від прототипу до production-ready рішення. Наша команда поєднує експертизу в машинному навчанні, дата-інжинірингу та MLOps, щоб AI працював не в лабораторії, а в реальному бізнесі.
Показано 1 з 1Усі 1566 послуг
Розробка AI-системи автоматичного масштабування застосунків на основі прогнозу навантаження
Складний
~1-2 тижні
Часті запитання

Напрямки AI-розробки

Етапи розробки AI-рішення

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    902
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1123
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    860

Розробка AI-системи автоскейлінгу на основі навантаження

Predictive autoscaling для AI-сервісів - масштабування ресурсів на основі прогнозу навантаження, а не реактивної відповіді на поточні метрики. Вирішує основну проблему реактивного скейлінгу: на момент додавання ресурсів навантаження вже призвело до деградації.

Проблеми реактивного autoscaling для LLM

  • Cold start: запуск нового GPU pod із завантаженням моделі займає 3–10 хвилин
  • Latency cliff: при перевантаженні черга росте експоненційно, якість деградує різко
  • Cost spike: реактивний скейл часто призводить до overprovision - ресурси виділяються, але навантаження вже спало

Predictive scaling вирішує це: бачимо пік, що наближається, за 15–30 хвилин → запускаємо ресурси заздалегідь.

Прогнозування навантаження

from prophet import Prophet
import pandas as pd
import numpy as np

class LoadForecaster:
    def __init__(self):
        self.model = None
        self.last_trained = None

    def train(self, historical_load: pd.DataFrame):
        """
        historical_load: DataFrame с колонками 'ds' (datetime) и 'y' (requests_per_minute)
        """
        self.model = Prophet(
            seasonality_mode="multiplicative",
            weekly_seasonality=True,
            daily_seasonality=True,
            changepoint_prior_scale=0.05  # сглаживание резких изменений
        )
        # Добавляем кастомные события (праздники, планируемые маркетинг-кампании)
        self.model.add_country_holidays(country_name="RU")
        self.model.fit(historical_load)
        self.last_trained = datetime.utcnow()

    def forecast(self, horizon_minutes: int = 60) -> pd.DataFrame:
        """Прогноз нагрузки на horizon_minutes вперёд."""
        future = self.model.make_future_dataframe(
            periods=horizon_minutes, freq="T"  # поминутно
        )
        forecast = self.model.predict(future)
        return forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail(horizon_minutes)

    def get_required_replicas(self, forecast: pd.DataFrame, capacity_per_replica: float) -> int:
        peak_load = forecast["yhat_upper"].max()  # берём верхнюю границу (conservative)
        return max(1, math.ceil(peak_load / capacity_per_replica))

Прийняття рішень про скейлінг

class PredictiveScalingController:
    def __init__(
        self,
        forecaster: LoadForecaster,
        lead_time_minutes: int = 15,   # заранее до ожидаемого пика
        scale_up_buffer: float = 1.2,  # +20% запас
        scale_down_delay_minutes: int = 30
    ):
        self.forecaster = forecaster
        self.lead_time = lead_time_minutes
        self.buffer = scale_up_buffer
        self.scale_down_delay = scale_down_delay_minutes

    def get_scaling_decision(
        self,
        current_replicas: int,
        current_load: float
    ) -> ScalingDecision:

        # Прогноз на следующие 30 минут
        forecast = self.forecaster.forecast(horizon_minutes=30)
        peak_in_lead_time = forecast.head(self.lead_time)["yhat_upper"].max()

        required = math.ceil(peak_in_lead_time * self.buffer / CAPACITY_PER_REPLICA)

        # Решение
        if required > current_replicas:
            return ScalingDecision(
                action="scale_up",
                target_replicas=required,
                reason=f"Predictive: peak {peak_in_lead_time:.0f} req/min in {self.lead_time}min"
            )
        elif required < current_replicas - 1:
            # Scale down только если нагрузка снижается стабильно
            recent_trend = self._is_load_decreasing(minutes=self.scale_down_delay)
            if recent_trend:
                return ScalingDecision(
                    action="scale_down",
                    target_replicas=max(1, required),
                    reason="Load decreasing trend confirmed"
                )

        return ScalingDecision(action="no_change", target_replicas=current_replicas)

Інтеграція з Kubernetes

from kubernetes import client, config

class K8sScaler:
    def __init__(self):
        config.load_incluster_config()
        self.apps_v1 = client.AppsV1Api()

    def scale(self, namespace: str, deployment: str, replicas: int):
        body = {"spec": {"replicas": replicas}}
        self.apps_v1.patch_namespaced_deployment_scale(
            name=deployment,
            namespace=namespace,
            body=body
        )
        logger.info(f"Scaled {namespace}/{deployment} to {replicas} replicas")

    def get_current_replicas(self, namespace: str, deployment: str) -> int:
        deployment_obj = self.apps_v1.read_namespaced_deployment(deployment, namespace)
        return deployment_obj.spec.replicas

Навчання на історичних даних

class ContinuousLearner:
    def update_model(self):
        """Переобучаем модель на новых данных каждые 24 часа."""
        historical = self.metrics_db.get_load_history(days=90)
        df = pd.DataFrame(historical, columns=["ds", "y"])

        self.forecaster.train(df)
        logger.info(f"Model retrained on {len(df)} data points")

        # Оценка точности прогноза
        accuracy = self.evaluate_forecast_accuracy()
        if accuracy.mape > 0.20:  # > 20% ошибка → алерт
            logger.warning(f"Forecast accuracy degraded: MAPE={accuracy.mape:.1%}")

Терміни впровадження

Тиждень 1–2: Збір історичних метрик навантаження, перша модель Prophet, backtesting

Тиждень 3–4: Інтеграція з K8s Deployment, shadow mode (прогнозуємо але не скейлім)

Місяць 2: Переклад у production режим, моніторинг cost savings, continuous learning

Місяць 3: Тюнінг параметрів, multi-service координація, circuit breakers для аномальних прогнозів