AI Load-Forecast-Based Application Auto-Scaling

Developing an AI-based autoscaling system driven by load forecasts

Predictive autoscaling for AI services means scaling resources based on a load forecast rather than reacting to current metrics. It addresses the main weakness of reactive scaling: by the time new resources are ready, the service has already degraded under load.

Problems with reactive autoscaling for LLM services

  • Cold start: starting a new GPU pod and loading the model takes 3-10 minutes
  • Latency cliff: once the service is overloaded, the request queue grows faster than it drains and response times degrade sharply
  • Cost spike: reactive scaling often overprovisions, because the resources become available after the load has already subsided

Predictive scaling addresses this: the forecast shows an approaching peak 15–30 minutes ahead, so resources are launched before the peak arrives.
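To make the effect of cold start concrete, here is a minimal simulation sketch. The traffic shape, the per-replica capacity and the 5-minute cold start are illustrative assumptions, not measurements; `simulate_backlog` is a hypothetical helper introduced only for this example.

```python
def simulate_backlog(load, capacity_per_replica, ready_replicas):
    """Return the queued-request backlog after each minute.

    load:           requests arriving in each minute
    ready_replicas: number of replicas already serving traffic in each minute
    """
    backlog, history = 0.0, []
    for arrivals, replicas in zip(load, ready_replicas):
        served = replicas * capacity_per_replica
        backlog = max(0.0, backlog + arrivals - served)
        history.append(backlog)
    return history


# Hypothetical traffic: 100 req/min for 10 minutes, then a 300 req/min peak for 10 minutes.
load = [100] * 10 + [300] * 10
CAPACITY = 100   # assumed requests per minute one replica can serve
COLD_START = 5   # assumed minutes until a new GPU pod is ready

# Reactive: extra replicas are requested when the peak starts (minute 10)
# and only become ready COLD_START minutes later.
reactive = [1] * (10 + COLD_START) + [3] * (10 - COLD_START)
# Predictive: extra replicas are requested COLD_START minutes early,
# so they are ready when the peak starts.
predictive = [1] * 10 + [3] * 10

reactive_backlog = simulate_backlog(load, CAPACITY, reactive)
predictive_backlog = simulate_backlog(load, CAPACITY, predictive)
print(max(reactive_backlog), max(predictive_backlog))  # 1000.0 0.0
```

In this toy scenario the reactive setup accumulates 1000 queued requests during the cold start and never drains them, because the three replicas only match the peak load; the predictive setup never queues.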

Load forecasting

import math
from datetime import datetime, timezone

import pandas as pd
from prophet import Prophet

class LoadForecaster:
    def __init__(self):
        self.model = None
        self.last_trained = None

    def train(self, historical_load: pd.DataFrame):
        """
        historical_load: DataFrame with columns 'ds' (timezone-naive datetime)
        and 'y' (requests per minute).
        """
        self.model = Prophet(
            seasonality_mode="multiplicative",
            weekly_seasonality=True,
            daily_seasonality=True,
            changepoint_prior_scale=0.05  # lower values make the trend less flexible
        )
        # Add public holidays. Planned marketing campaigns can be supplied
        # as custom events through Prophet's `holidays` argument.
        self.model.add_country_holidays(country_name="RU")
        self.model.fit(historical_load)
        self.last_trained = datetime.now(timezone.utc)

    def forecast(self, horizon_minutes: int = 60) -> pd.DataFrame:
        """Forecast the load for the next horizon_minutes, starting from now."""
        if self.model is None:
            raise RuntimeError("Call train() before forecast()")
        # Build the future frame from the current time rather than from the end of
        # the training data, so the forecast stays aligned between retrains.
        # Assumes the training timestamps are timezone-naive UTC.
        start = pd.Timestamp.now(tz="UTC").tz_localize(None).floor("min")
        future = pd.DataFrame(
            {"ds": pd.date_range(start=start, periods=horizon_minutes, freq="min")}  # per minute
        )
        forecast = self.model.predict(future)
        return forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]]

    def get_required_replicas(self, forecast: pd.DataFrame, capacity_per_replica: float) -> int:
        peak_load = forecast["yhat_upper"].max()  # use the upper bound (conservative)
        return max(1, math.ceil(peak_load / capacity_per_replica))
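The forecaster expects per-minute request counts in columns 'ds' and 'y'. How the history is stored is not specified here, so the following is only a sketch of one way to build that frame, assuming the raw data is a list of request timestamps; `to_prophet_frame` is a hypothetical helper name.

```python
import pandas as pd


def to_prophet_frame(request_times):
    """Aggregate raw request timestamps into per-minute counts ('ds', 'y')."""
    index = pd.to_datetime(list(request_times))
    # One row per request, then count requests in each one-minute bucket.
    # Minutes without traffic become 0 instead of being left out.
    per_minute = pd.Series(1, index=index).sort_index().resample("min").sum()
    return per_minute.rename_axis("ds").reset_index(name="y")


frame = to_prophet_frame([
    "2024-01-01 10:00:05",
    "2024-01-01 10:00:40",
    "2024-01-01 10:02:10",
])
print(frame["y"].tolist())  # [2, 0, 1]
```

Filling idle minutes with explicit zeros matters: otherwise the model treats them as missing observations rather than as periods with no load.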

Making scaling decisions

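import math
from collections import deque
from dataclasses import dataclass

# Assumed sustained throughput of one replica in requests per minute
# (illustrative value; measure it with a load test).
CAPACITY_PER_REPLICA = 100.0

@dataclass
class ScalingDecision:
    action: str  # "scale_up", "scale_down" or "no_change"
    target_replicas: int
    reason: str = ""

class PredictiveScalingController:
    def __init__(
        self,
        forecaster: LoadForecaster,
        lead_time_minutes: int = 15,   # how far ahead of the expected peak to act
        scale_up_buffer: float = 1.2,  # +20% headroom
        scale_down_delay_minutes: int = 30
    ):
        self.forecaster = forecaster
        self.lead_time = lead_time_minutes
        self.buffer = scale_up_buffer
        self.scale_down_delay = scale_down_delay_minutes
        # Recent load samples; assumes get_scaling_decision() is called once per minute.
        self._recent_load = deque(maxlen=scale_down_delay_minutes)

    def _is_load_decreasing(self, minutes: int) -> bool:
        """Placeholder heuristic: the newer half of the window averages below the older half."""
        samples = list(self._recent_load)[-minutes:]
        if minutes < 2 or len(samples) < minutes:
            return False  # not enough history to confirm a trend
        half = len(samples) // 2
        older, newer = samples[:half], samples[half:]
        return sum(newer) / len(newer) < sum(older) / len(older)

    def get_scaling_decision(
        self,
        current_replicas: int,
        current_load: float
    ) -> ScalingDecision:
        self._recent_load.append(current_load)

        # Forecast for the next 30 minutes
        forecast = self.forecaster.forecast(horizon_minutes=30)
        peak_in_lead_time = forecast.head(self.lead_time)["yhat_upper"].max()

        required = math.ceil(peak_in_lead_time * self.buffer / CAPACITY_PER_REPLICA)

        # Decision
        if required > current_replicas:
            return ScalingDecision(
                action="scale_up",
                target_replicas=required,
                reason=f"Predictive: peak {peak_in_lead_time:.0f} req/min in {self.lead_time}min"
            )
        elif required < current_replicas - 1:
            # Scale down only if the load has been decreasing steadily
            if self._is_load_decreasing(minutes=self.scale_down_delay):
                return ScalingDecision(
                    action="scale_down",
                    target_replicas=max(1, required),
                    reason="Load decreasing trend confirmed"
                )

        return ScalingDecision(
            action="no_change", target_replicas=current_replicas, reason="Within tolerance"
        )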
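The replica arithmetic used by the controller can be checked in isolation. The sketch below is a standalone version of that calculation; the capacity of 100 req/min per replica and the example peaks are assumed values, and `required_replicas` is a hypothetical helper, not part of the controller.

```python
import math


def required_replicas(peak_load, capacity_per_replica, buffer=1.2, min_replicas=1):
    """Replicas needed to serve peak_load with a safety buffer applied."""
    if capacity_per_replica <= 0:
        raise ValueError("capacity_per_replica must be positive")
    # Round up: a fractional replica still requires a whole pod.
    needed = math.ceil(peak_load * buffer / capacity_per_replica)
    # A forecast can dip to zero or below; never go under the floor.
    return max(min_replicas, needed)


print(required_replicas(950, 100))  # 950 * 1.2 / 100 = 11.4, rounded up to 12
print(required_replicas(40, 100))   # 0.48 rounds up to 1
print(required_replicas(-5, 100))   # negative forecast falls back to the floor of 1
```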

Kubernetes integration

import logging

from kubernetes import client, config

logger = logging.getLogger(__name__)

class K8sScaler:
    def __init__(self):
        # Works only when running inside a pod with a service account
        config.load_incluster_config()
        self.apps_v1 = client.AppsV1Api()

    def scale(self, namespace: str, deployment: str, replicas: int):
        # Patch only the scale subresource of the Deployment
        body = {"spec": {"replicas": replicas}}
        self.apps_v1.patch_namespaced_deployment_scale(
            name=deployment,
            namespace=namespace,
            body=body
        )
        logger.info(f"Scaled {namespace}/{deployment} to {replicas} replicas")

    def get_current_replicas(self, namespace: str, deployment: str) -> int:
        deployment_obj = self.apps_v1.read_namespaced_deployment(deployment, namespace)
        return deployment_obj.spec.replicas
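Before a forecast-driven target is sent to the cluster, it is usually worth bounding it. The following is a sketch of such a guard, not part of the scaler above; the limits (`min_replicas`, `max_replicas`, `max_step`) are assumed example values and `clamp_target` is a hypothetical helper.

```python
def clamp_target(current, target, min_replicas=1, max_replicas=20, max_step=4):
    """Bound a requested replica count before it is applied.

    First limit the change per scaling action to max_step replicas,
    then keep the result inside [min_replicas, max_replicas].
    """
    stepped = max(current - max_step, min(current + max_step, target))
    return max(min_replicas, min(max_replicas, stepped))


print(clamp_target(2, 50))   # 6: at most 4 replicas added per action
print(clamp_target(18, 50))  # 20: capped by max_replicas
print(clamp_target(5, 0))    # 1: never below min_replicas
```

A step limit slows down scale-up, so with long GPU cold starts it may be preferable to apply it only to scale-down; that trade-off depends on the budget and the service.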

Training on historical data

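import logging

logger = logging.getLogger(__name__)

class ContinuousLearner:
    def __init__(self, forecaster, metrics_db):
        self.forecaster = forecaster
        self.metrics_db = metrics_db  # must provide get_load_history(days=...)

    def update_model(self):
        """Retrain the model on fresh data; intended to run every 24 hours."""
        historical = self.metrics_db.get_load_history(days=90)
        df = pd.DataFrame(historical, columns=["ds", "y"])

        self.forecaster.train(df)
        logger.info(f"Model retrained on {len(df)} data points")

        # Evaluate forecast accuracy. evaluate_forecast_accuracy() is not shown here;
        # it is expected to return an object with a `mape` attribute (a fraction).
        accuracy = self.evaluate_forecast_accuracy()
        if accuracy.mape > 0.20:  # error above 20% triggers an alert
            logger.warning(f"Forecast accuracy degraded: MAPE={accuracy.mape:.1%}")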
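The accuracy check above relies on MAPE (mean absolute percentage error). As a reference for what that number means, here is a minimal standalone sketch; the `min_actual` cutoff for skipping near-zero traffic is an assumption added for this example, since the percentage error is undefined when the actual load is zero.

```python
def mape(actual, predicted, min_actual=1.0):
    """Mean absolute percentage error as a fraction (0.2 means 20%).

    Minutes where the actual load is below min_actual are skipped,
    because the relative error is undefined or unstable near zero.
    """
    errors = [abs(a - p) / a for a, p in zip(actual, predicted) if a >= min_actual]
    if not errors:
        raise ValueError("no actual values large enough to evaluate")
    return sum(errors) / len(errors)


# Errors of 10%, 10% and 0% average to about 6.7%.
print(f"{mape([100, 200, 400], [110, 180, 400]):.1%}")  # 6.7%
```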

Implementation timeframes

Week 1–2: Collecting historical load metrics, first Prophet model, backtesting

Week 3–4: Integration with K8s Deployment, shadow mode (we predict but do not scale)

Month 2: Transition to production mode, cost savings monitoring, continuous learning

Month 3: Parameter tuning, multi-service coordination, circuit breakers for abnormal forecasts
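A circuit breaker for abnormal forecasts can be as simple as a sanity check on the predicted peak. The sketch below shows one possible rule, assuming the forecast is compared with the recently observed peak; the `max_ratio` threshold and the `safe_peak` helper are illustrative assumptions.

```python
import math


def safe_peak(forecast_peak, recent_observed_peak, max_ratio=3.0):
    """Return (peak_to_use, tripped).

    The breaker trips when the forecast is missing, NaN, negative, or more than
    max_ratio times the recently observed peak. When it trips, the observed peak
    is used instead, which amounts to falling back to reactive behaviour.
    """
    invalid = (
        forecast_peak is None
        or math.isnan(forecast_peak)
        or forecast_peak < 0
    )
    if invalid or forecast_peak > max_ratio * max(recent_observed_peak, 1.0):
        return recent_observed_peak, True
    return forecast_peak, False


print(safe_peak(500, 400))           # (500, False)
print(safe_peak(5000, 400))          # (400, True)
print(safe_peak(float("nan"), 400))  # (400, True)
```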