AI-based programmatic advertising system
Programmatic advertising is the automated purchase of ad impressions through real-time auctions. Each banner impression is a separate auction that takes about 100 milliseconds from bid request to win and display. An AI model decides whether to participate in each auction and, if so, at what price.
Programmatic ecosystem architecture
Publisher → SSP → Ad Exchange → DSP → Advertiser
↕ RTB (100ms)
Bid Request/Response
Full stack: Supply Side Platform (SSP) manages the publisher's inventory. Demand Side Platform (DSP) manages the buying for the advertiser. Ad Exchange is the exchange that connects the two sides.
Bid Prediction Model
import json
import zlib

import lightgbm as lgb
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.calibration import CalibratedClassifierCV
from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor
class BidRequestFeaturizer:
    """Turn an OpenRTB bid request into a fixed-length numeric feature vector.

    The vector has 16 float32 entries in a fixed order: user (4), device (3),
    impression (3), site (2), time (3) and floor price (1).  The method is
    meant to run on the bidding hot path, so it only does dictionary lookups
    and cheap hashing.
    """

    # Fallback values used when the request omits (or nulls) a field.
    _DEFAULT_YOB = 1990
    _DEFAULT_WIDTH = 300
    _DEFAULT_HEIGHT = 250
    _DEFAULT_CATEGORY = 'IAB1'

    def featurize(self, bid_request: dict, now=None) -> np.ndarray:
        """Build the feature vector for a single bid request.

        Args:
            bid_request: Bid request as a dict (OpenRTB 2.5 style keys such as
                ``user``, ``device``, ``imp``, ``site``).  Missing, null or
                empty sections fall back to defaults instead of raising.
            now: Optional timestamp-like object exposing ``.hour`` and
                ``.weekday()`` (e.g. ``pd.Timestamp`` or ``datetime``).  When
                omitted, the current local time is read once.  Pass the
                logged request time when featurizing historical data so that
                training and serving see the same time features.

        Returns:
            A 1-D ``float32`` array with 16 features.
        """
        user = bid_request.get('user') or {}
        device = bid_request.get('device') or {}
        site = bid_request.get('site') or {}
        # Only the first impression object is used; an empty/missing list
        # is treated as one impression with no attributes.
        imp = (bid_request.get('imp') or [{}])[0]
        banner = imp.get('banner') or {}
        categories = site.get('cat') or [self._DEFAULT_CATEGORY]

        # Read the clock once so hour / weekday / weekend are mutually
        # consistent even when the call straddles a boundary.
        moment = pd.Timestamp.now() if now is None else now
        weekday = moment.weekday()

        return np.array([
            # User
            self._hash_encode(self._value(user, 'id', ''), 100),
            self._value(user, 'yob', self._DEFAULT_YOB),
            int(user.get('gender') == 'M'),
            len(user.get('segments') or []),
            # Device
            self._device_type_encode(device.get('devicetype')),
            int(self._value(device, 'os', '') in ('iOS', 'Android')),
            self._hash_encode(self._value(device, 'model', ''), 50),
            # Impression context
            self._value(banner, 'w', self._DEFAULT_WIDTH),
            self._value(banner, 'h', self._DEFAULT_HEIGHT),
            int(imp.get('instl') == 1),  # interstitial flag
            # Site
            self._hash_encode(self._value(site, 'domain', ''), 200),
            self._hash_encode(categories[0], 20),
            # Time context
            moment.hour,
            weekday,
            int(weekday >= 5),  # Saturday / Sunday
            # Floor price
            self._value(imp, 'bidfloor', 0),
        ], dtype=np.float32)

    @staticmethod
    def _value(section: dict, key: str, default):
        """Return ``section[key]``, or ``default`` when missing or ``None``."""
        found = section.get(key)
        return default if found is None else found

    def _hash_encode(self, value: str, n_buckets: int) -> int:
        """Map a string to a stable bucket index in ``[0, n_buckets)``.

        Uses CRC32 rather than the built-in ``hash()``: string hashes are
        salted per interpreter process, so ``hash()`` would assign different
        buckets in the training job and in each serving process.
        """
        text = '' if value is None else str(value)
        return zlib.crc32(text.encode('utf-8')) % n_buckets

    def _device_type_encode(self, device_type) -> int:
        """Pass through device type codes 1-5; anything else becomes 0."""
        mapping = {1: 1, 2: 2, 3: 3, 4: 4, 5: 5}
        return mapping.get(device_type, 0)
class CTRPredictor:
    """Click-through-rate (CTR) model used to price a bid.

    Wraps a LightGBM classifier; gradient-boosted trees are the original
    author's choice over neural networks for tabular bid data.
    """

    def __init__(self):
        self.model = lgb.LGBMClassifier(
            n_estimators=500,
            learning_rate=0.05,
            num_leaves=127,
            min_child_samples=50,
            subsample=0.8,
            # Row subsampling is only active when subsample_freq > 0;
            # with the default of 0 the subsample=0.8 setting is ignored.
            subsample_freq=1,
            colsample_bytree=0.8,
            random_state=42,
            n_jobs=-1,
        )

    def train(self, X: np.ndarray, y: np.ndarray,
              X_val: np.ndarray, y_val: np.ndarray):
        """Fit the classifier with early stopping on a validation set.

        Args:
            X: Training feature matrix.
            y: Training click labels.
            X_val: Validation feature matrix used for early stopping.
            y_val: Validation click labels.
        """
        self.model.fit(
            X, y,
            eval_set=[(X_val, y_val)],
            eval_metric='auc',
            # Stop after 50 rounds without AUC improvement; log every 100.
            callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)],
        )

    def predict_ctr(self, X: np.ndarray) -> np.ndarray:
        """Return the positive-class probability for each row of ``X``."""
        return self.model.predict_proba(X)[:, 1]
class ConversionRatePredictor:
    """Conversion-rate (CVR) model: probability of a conversion given a click."""

    def __init__(self):
        # CVR data is usually sparser than click data, hence fewer trees,
        # fewer leaves and a larger minimum leaf size than the CTR model.
        self.model = lgb.LGBMClassifier(
            n_estimators=200,
            learning_rate=0.05,
            num_leaves=63,
            min_child_samples=100,
            random_state=42,
        )

    def train(self, X: np.ndarray, y: np.ndarray,
              X_val: np.ndarray, y_val: np.ndarray):
        """Fit the classifier with early stopping on a validation set.

        Mirrors ``CTRPredictor.train`` so both predictors can be trained
        through the same interface.

        Args:
            X: Training feature matrix.
            y: Training conversion labels.
            X_val: Validation feature matrix used for early stopping.
            y_val: Validation conversion labels.
        """
        self.model.fit(
            X, y,
            eval_set=[(X_val, y_val)],
            eval_metric='auc',
            callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)],
        )

    def predict_cvr(self, X: np.ndarray) -> np.ndarray:
        """Return the positive-class probability for each row of ``X``."""
        return self.model.predict_proba(X)[:, 1]
class BiddingEngine:
"""Движок принятия решений о ставках"""
def __init__(self, ctr_model: CTRPredictor,
cvr_model: ConversionRatePredictor,
featurizer: BidRequestFeaturizer):
self.ctr_model = ctr_model
self.cvr_model = cvr_model
self.featurizer = featurizer
def compute_bid(self, bid_request: dict,
campaign_config: dict) -> dict:
"""
Вычисление оптимальной ставки.
За < 10ms (latency constraint RTB).
"""
features = self.featurizer.featurize(bid_request)
# Предсказание CTR и CVR
ctr = float(self.ctr_model.predict_ctr(features.reshape(1, -1))[0])
cvr = float(self.cvr_model.predict_cvr(features.reshape(1, -1))[0])
# pCTCVR = P(click) × P(conversion|click)
pctcvr = ctr * cvr
# Ожидаемая ценность = pCTCVR × ценность конверсии
target_cpa = campaign_config.get('target_cpa_usd', 10)
expected_value = pctcvr * target_cpa
# Добавляем budget pacing adjustment
pacing_factor = self._compute_pacing_factor(campaign_config)
bid_price = expected_value * pacing_factor
# Ограничения
floor_price = bid_request.get('imp', [{}])[0].get('bidfloor', 0)
max_bid = campaign_config.get('max_bid_cpm', 10)
if bid_price < floor_price:
return {'bid': 0, 'reason': 'below_floor', 'predicted_ctr': ctr}
final_bid = min(bid_price, max_bid)
return {
'bid': round(final_bid, 4),
'predicted_ctr': round(ctr, 5),
'predicted_cvr': round(cvr, 5),
'predicted_pctcvr': round(pctcvr, 6),
'pacing_factor': round(pacing_factor, 3),
'auction_win_probability': self._estimate_win_prob(final_bid, floor_price)
}
def _compute_pacing_factor(self, campaign: dict) -> float:
"""
Budget pacing: корректируем ставки, чтобы равномерно расходовать бюджет.
Если тратим слишком быстро → снижаем, слишком медленно → повышаем.
"""
budget_total = campaign.get('daily_budget_usd', 1000)
spent_today = campaign.get('spent_today_usd', 0)
hours_elapsed = campaign.get('hours_elapsed_today', 12)
total_hours = 24
expected_spent_ratio = hours_elapsed / total_hours
actual_spent_ratio = spent_today / max(budget_total, 1)
if actual_spent_ratio > expected_spent_ratio * 1.1:
return 0.8 # Тратим слишком быстро — снижаем ставки
elif actual_spent_ratio < expected_spent_ratio * 0.9:
return 1.2 # Тратим слишком медленно — повышаем
return 1.0
def _estimate_win_prob(self, bid: float, floor: float) -> float:
"""Упрощённая оценка вероятности победы в аукционе"""
if bid < floor:
return 0.0
margin = (bid - floor) / max(floor, 0.01)
return min(0.95, 0.3 + margin * 0.5)
class BudgetPacingController:
    """Keep campaign spend even over the day and limit per-user frequency."""

    _DEFAULT_CAP = {'hour': 2, 'day': 5, 'week': 15}
    # Tighter caps for users who already clicked (no need to keep pushing).
    _CAP_AFTER_CLICK = {'hour': 1, 'day': 2, 'week': 5}
    # Tighter caps for users who keep seeing the ad without clicking.
    _CAP_UNRESPONSIVE = {'hour': 0, 'day': 1, 'week': 3}
    _UNRESPONSIVE_IMPRESSIONS = 20

    def throttle_bid_rate(self, campaign_stats: dict,
                          current_qps: float) -> float:
        """Return the fraction of bid requests to process, in ``[0, 1]``.

        Spend per elapsed hour is compared with an even ``budget / 24``
        rate.  When actual spend runs more than 20% ahead, the returned
        fraction is ``target / actual`` clipped to ``[0.1, 1.0]``.  Once the
        daily budget is fully spent the result is 0.0.

        Args:
            campaign_stats: Keys read: ``daily_budget`` (default 1000),
                ``spent`` (default 0), ``hours_elapsed`` (default 12).
            current_qps: Currently unused; kept for interface compatibility.
        """
        budget = campaign_stats.get('daily_budget', 1000)
        spent = campaign_stats.get('spent', 0)
        hours = campaign_stats.get('hours_elapsed', 12)

        # Nothing left to spend: stop processing requests entirely rather
        # than keep bidding at the 10% clip floor below.
        if spent >= budget:
            return 0.0

        target_spend_rate = budget / 24  # even spend across the day
        # A 0.1h minimum avoids division by zero right after midnight.
        actual_spend_rate = spent / max(hours, 0.1)

        if actual_spend_rate > target_spend_rate * 1.2:
            throttle = target_spend_rate / actual_spend_rate
            return float(np.clip(throttle, 0.1, 1.0))
        return 1.0

    def compute_optimal_frequency_cap(self, user_stats: dict,
                                      campaign_config: dict) -> dict:
        """Return per-user impression caps for the hour / day / week windows.

        Starts from the campaign's ``frequency_cap`` (or the default
        ``{'hour': 2, 'day': 5, 'week': 15}``) and tightens it for users who
        already clicked or who saw more than 20 impressions without a click.
        The tightened cap never exceeds the campaign's own cap.

        Args:
            user_stats: Keys read: ``has_clicked``, ``impressions_no_click``.
            campaign_config: Key read: ``frequency_cap``.

        Returns:
            A new dict; the campaign configuration is not mutated or shared.
        """
        base_cap = dict(campaign_config.get('frequency_cap', self._DEFAULT_CAP))

        if user_stats.get('has_clicked'):
            return self._tighten(base_cap, self._CAP_AFTER_CLICK)

        impressions_without_click = user_stats.get('impressions_no_click', 0)
        if impressions_without_click > self._UNRESPONSIVE_IMPRESSIONS:
            return self._tighten(base_cap, self._CAP_UNRESPONSIVE)

        return base_cap

    @staticmethod
    def _tighten(base_cap: dict, reduced_cap: dict) -> dict:
        """Combine two caps, taking the stricter limit for each window."""
        combined = dict(base_cap)
        for window, limit in reduced_cap.items():
            combined[window] = min(limit, base_cap.get(window, limit))
        return combined
Auction Mechanics and Optimization
class AuctionOptimizer:
    """Bidding strategy helpers for first-price and second-price auctions."""

    def optimal_bid_second_price(self, valuation: float,
                                 bid_landscape: np.ndarray) -> float:
        """Return the bid for a second-price (Vickrey) auction.

        Bidding the true valuation is optimal there, so ``valuation`` is
        returned unchanged.  ``bid_landscape`` is accepted but not used.
        Shading (bidding below the valuation) only applies to first-price
        auctions; see ``bid_shading_first_price``.
        """
        return valuation

    def bid_shading_first_price(self, valuation: float,
                                historical_clearing_prices: np.ndarray) -> float:
        """Pick a shaded bid for a first-price auction.

        Tries bids from 50% to 95% of ``valuation`` in 5% steps.  For each
        candidate the win probability is estimated as the share of historical
        clearing prices strictly below it, and the candidate with the highest
        expected profit ``win_prob * (valuation - bid)`` is returned (the
        lowest such bid on ties).

        Args:
            valuation: Value of winning the impression.
            historical_clearing_prices: Past clearing prices (array-like).

        Returns:
            The chosen bid rounded to 4 decimals, or ``valuation * 0.8`` as a
            conservative default when there is no price history.
        """
        prices = np.asarray(historical_clearing_prices, dtype=float)
        if prices.size == 0:
            return valuation * 0.8

        best_bid = valuation * 0.5
        best_profit = -float('inf')
        for bid_pct in np.arange(0.5, 1.0, 0.05):
            bid = valuation * bid_pct
            win_prob = (prices < bid).mean()
            expected_profit = win_prob * (valuation - bid)
            if expected_profit > best_profit:
                best_profit = expected_profit
                best_bid = bid
        return round(float(best_bid), 4)

    def evaluate_campaign_performance(self, impressions: pd.DataFrame) -> dict:
        """Summarize campaign metrics from an impression log.

        Args:
            impressions: One row per impression with columns ``clicked``,
                ``converted`` and ``bid_price``; ``revenue`` is optional and
                counts as 0 when absent.

        Returns:
            Dict with ``impressions``, ``clicks``, ``conversions``,
            ``spend_usd``, ``ctr``, ``cvr``, ``cpa_usd``, ``roas`` and
            ``effective_cpm``.  Rate metrics are 0.0 for an empty log.
        """
        n_impressions = len(impressions)
        clicks = impressions['clicked'].sum()
        conversions = impressions['converted'].sum()
        spend = impressions['bid_price'].sum()
        revenue = impressions['revenue'].sum() if 'revenue' in impressions else 0.0

        return {
            'impressions': n_impressions,
            'clicks': clicks,
            'conversions': conversions,
            'spend_usd': spend,
            'ctr': impressions['clicked'].mean() if n_impressions else 0.0,
            # Counts are whole numbers, so max(..., 1) only guards zero.
            'cvr': conversions / max(clicks, 1),
            # NOTE: with zero conversions this reports the total spend.
            'cpa_usd': spend / max(conversions, 1),
            # Spend can be a fraction of a dollar, so divide by the real
            # spend instead of clamping the denominator to 1.
            'roas': revenue / spend if spend > 0 else 0.0,
            'effective_cpm': impressions['bid_price'].mean() * 1000 if n_impressions else 0.0,
        }
Latency requirements and infrastructure
RTB requires a response within 100 ms on most exchanges (Google, OpenX) and within 50 ms on the Google Display Network. This is a strict SLA: a late response means a lost auction.
| Component | Latency budget |
|---|---|
| Network latency | ~20ms |
| Feature extraction | ~5ms |
| CTR/CVR prediction | ~3ms |
| Bid price calculation | ~1ms |
| Reply to the exchange | ~1ms |
| Total | ~30ms (leaves headroom within the 100ms limit) |
To maintain latency: models on ONNX Runtime (5-10x faster than sklearn), feature serving via Redis (<1ms), horizontal scaling of the bid service on K8s with autoscaling by QPS.







