Developing a billing system for AI-as-a-Service (tokens, GPU hours, requests)
A billing system for AI SaaS must accurately account for several disparate resources: LLM tokens, GPU time, API requests, and storage. Each resource has its own unit of measurement, price, and calculation logic.
Pricing models and their combinations
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from enum import Enum
from math import ceil
class BillingUnit(Enum):
    """Units of measurement for the resources this billing system meters."""

    # LLM tokens (rates are quoted per 1M tokens in the pricing comments).
    TOKEN = "token"
    # GPU time, measured in seconds.
    GPU_SECOND = "gpu_second"
    # A single API request.
    REQUEST = "request"
    # Storage, measured in gigabyte-months.
    GB_MONTH = "gb_month"
@dataclass
class PricingRule:
    """Price definition for one billable resource.

    A rule combines a base per-unit price with optional volume tiers and
    an optional minimum charge. How tiers and the minimum are applied is
    decided by the code that consumes the rule, not by this class.
    """

    # Unit of measurement the price refers to.
    resource: BillingUnit
    # Base price for a single unit of ``resource``.
    unit_price: Decimal
    # Volume tiers as (volume_threshold, discounted_price) pairs.
    tier_breaks: list
    # Minimum amount to charge; zero means no minimum.
    minimum_charge: Decimal = Decimal("0")
# Price list keyed by billable item. Token prices are stored per single
# token; the comments quote the equivalent rate per 1M tokens.
PRICING = {
    "llm_input_token": PricingRule(
        resource=BillingUnit.TOKEN,
        unit_price=Decimal("0.000005"),  # $5 per 1M tokens
        tier_breaks=[
            (10_000_000, Decimal("0.000004")),   # above 10M tokens: $4 per 1M
            (100_000_000, Decimal("0.000003")),  # above 100M tokens: $3 per 1M
        ],
    ),
    # Invoice generation looks up this key for output tokens; without it
    # every invoice with usage fails with KeyError.
    # NOTE(review): the rates below are placeholders that mirror the input
    # discount ladder (-20% / -40%) -- confirm the real output-token prices
    # with the pricing owner before shipping.
    "llm_output_token": PricingRule(
        resource=BillingUnit.TOKEN,
        unit_price=Decimal("0.000015"),  # $15 per 1M tokens (placeholder)
        tier_breaks=[
            (10_000_000, Decimal("0.000012")),   # above 10M tokens: $12 per 1M
            (100_000_000, Decimal("0.000009")),  # above 100M tokens: $9 per 1M
        ],
    ),
    "gpu_a100_second": PricingRule(
        resource=BillingUnit.GPU_SECOND,
        unit_price=Decimal("0.00089"),  # ~$3.20/hour per A100
        tier_breaks=[],
        # Minimum charge per job.
        # NOTE(review): the original comment describes this as "10 seconds
        # of billing", but 10 s at this rate is $0.0089 -- confirm that
        # $0.01 is the intended floor.
        minimum_charge=Decimal("0.01"),
    ),
}
Real-time Usage Metering
class UsageMeter:
    """Records resource usage as live Redis counters and Kafka events.

    Redis holds per-customer monthly counters (used for rate limiting and
    balance checks); Kafka receives one detailed event per usage record
    for audit and billing.
    """

    # Kafka topic that receives every usage event.
    USAGE_TOPIC = "usage_events"
    # GPU jobs shorter than this are billed as if they ran this long.
    MIN_GPU_BILLABLE_SECONDS = 10
    # Lifetime of a monthly Redis counter hash: 40 days, in seconds.
    USAGE_KEY_TTL_SECONDS = 60 * 60 * 24 * 40

    def __init__(self, redis_client, kafka_producer):
        """Store the Redis client and Kafka producer used for metering."""
        self.redis = redis_client
        self.kafka = kafka_producer

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        ``datetime.utcnow()`` is deprecated since Python 3.12. The tzinfo
        is dropped after reading an aware clock so the emitted string keeps
        the exact format the previous ``utcnow().isoformat()`` produced.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    async def record_llm_usage(self, customer_id: str, model_id: str,
                               input_tokens: int, output_tokens: int,
                               request_id: str):
        """Record the token usage of one LLM request.

        Args:
            customer_id: Customer the usage is attributed to.
            model_id: Model that served the request.
            input_tokens: Number of prompt tokens consumed.
            output_tokens: Number of generated tokens.
            request_id: Identifier of the request, stored in the event.
        """
        # 1. Real-time counters in Redis (for rate limiting and balance checks).
        # NOTE(review): ``self.current_month()`` is not defined in the visible
        # part of this class; it is assumed to be provided elsewhere.
        pipe = self.redis.pipeline()
        month_key = f"usage:{customer_id}:{self.current_month()}"
        pipe.hincrby(month_key, f"{model_id}:input_tokens", input_tokens)
        pipe.hincrby(month_key, f"{model_id}:output_tokens", output_tokens)
        pipe.expire(month_key, self.USAGE_KEY_TTL_SECONDS)
        await pipe.execute()

        # 2. Detailed event in Kafka for audit and billing.
        await self.kafka.send(self.USAGE_TOPIC, {
            "event_type": "llm_inference",
            "customer_id": customer_id,
            "model_id": model_id,
            "request_id": request_id,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "timestamp": self._utc_timestamp(),
        })

    async def record_gpu_job(self, customer_id: str, job_id: str,
                             gpu_type: str, duration_seconds: float):
        """Record a finished GPU job as a billable usage event.

        The duration is rounded up to a whole second and raised to the
        minimum billable duration before it is published.

        Args:
            customer_id: Customer the job is attributed to.
            job_id: Identifier of the GPU job.
            gpu_type: GPU model the job ran on.
            duration_seconds: Measured wall-clock duration of the job.
        """
        billable_seconds = max(ceil(duration_seconds),
                               self.MIN_GPU_BILLABLE_SECONDS)
        await self.kafka.send(self.USAGE_TOPIC, {
            "event_type": "gpu_training",
            "customer_id": customer_id,
            "job_id": job_id,
            "gpu_type": gpu_type,
            "duration_seconds": billable_seconds,
            "timestamp": self._utc_timestamp(),
        })
Invoice generation
class InvoiceGenerator:
    """Builds monthly invoices from aggregated usage and submits them to Stripe.

    The class relies on collaborators that are not defined in this block:
    ``self.clickhouse``, ``self.stripe``, ``self.get_customer_credits`` and
    ``self.compute_tiered_price``.
    """

    async def generate_monthly_invoice(self, customer_id: str,
                                       billing_period: str) -> Invoice:
        """Aggregate one customer's usage for a period into an invoice.

        Usage is aggregated per model in ClickHouse, priced with
        ``compute_tiered_price``, reduced by the customer's credits and,
        when a positive amount remains, sent to Stripe.

        Args:
            customer_id: Customer to invoice.
            billing_period: Period compared against ``toYYYYMM(timestamp)``
                in the usage query.

        Returns:
            The generated ``Invoice``. It is returned even when nothing is
            due and no Stripe invoice was created.
        """
        # Aggregate usage events from ClickHouse, one row per model.
        usage = await self.clickhouse.query("""
            SELECT
                model_id,
                sum(input_tokens) as total_input,
                sum(output_tokens) as total_output,
                sum(gpu_seconds) as total_gpu,
                count() as total_requests
            FROM usage_events
            WHERE customer_id = %(customer_id)s
                AND toYYYYMM(timestamp) = %(period)s
            GROUP BY model_id
        """, {"customer_id": customer_id, "period": billing_period})

        # (label, aggregated column, PRICING key) for every billed token kind.
        token_charges = (
            ("Input tokens", 'total_input', 'llm_input_token'),
            ("Output tokens", 'total_output', 'llm_output_token'),
        )

        line_items = []
        total = Decimal('0')
        for row in usage:
            for label, column, pricing_key in token_charges:
                rule = PRICING[pricing_key]
                cost = self.compute_tiered_price(row[column], rule)
                # Every charge that enters the total gets its own line item,
                # so the line items always add up to the subtotal.
                # NOTE(review): 'quantity' is a raw token count while 'unit'
                # and 'unit_price' are expressed per 1M tokens -- confirm
                # which convention the invoice consumer expects.
                line_items.append({
                    'description': f"{row['model_id']} - {label}",
                    'quantity': row[column],
                    'unit': "1M tokens",
                    'unit_price': float(rule.unit_price * 1_000_000),
                    'amount': float(cost)
                })
                total += cost
            # NOTE(review): row['total_gpu'] and row['total_requests'] are
            # queried but never billed here -- confirm whether GPU time is
            # invoiced elsewhere.

        # Apply credits; the amount due never goes below zero.
        credits = await self.get_customer_credits(customer_id)
        final_amount = max(total - credits, Decimal('0'))

        # Amounts are converted to float only at the Invoice boundary; all
        # arithmetic above stays in Decimal.
        invoice = Invoice(
            customer_id=customer_id,
            period=billing_period,
            line_items=line_items,
            subtotal=float(total),
            credits_applied=float(min(credits, total)),
            total=float(final_amount)
        )

        # Send through Stripe only when there is something to collect.
        if final_amount > 0:
            await self.stripe.create_invoice(customer_id, invoice)
        return invoice
Alerts and budget control
async def check_budget_alerts(customer_id: str):
    """Send a budget alert for each threshold the customer has reached.

    The current month's spend is compared with 50%, 80%, 90% and 100% of
    the customer's monthly budget. Each reached threshold triggers one
    alert; a Redis marker key prevents sending it again while the key
    lives (31 days).

    NOTE(review): the thresholds are floats; if ``monthly_budget`` is a
    ``Decimal`` the multiplication below raises ``TypeError`` -- confirm
    the type. The exists/setex pair is not atomic, so concurrent callers
    may send the same alert twice.
    """
    customer = await db.get_customer(customer_id)
    spent = await usage_meter.get_current_month_spend(customer_id)

    # Thresholds are fractions of the monthly budget.
    for threshold in (0.5, 0.8, 0.9, 1.0):
        trigger_amount = customer.monthly_budget * threshold
        if not spent >= trigger_amount:
            continue

        dedupe_key = f"budget_alert:{customer_id}:{threshold}:{current_month()}"
        already_sent = await redis.exists(dedupe_key)
        if already_sent:
            continue

        # Mark the alert as sent only after it has actually gone out.
        await send_budget_alert(customer, threshold, spent)
        await redis.setex(dedupe_key, 86400 * 31, "1")
A successful billing system is not only a technical challenge but also a business process: transparency for customers, accurate accounting, and easy resolution of invoice disputes reduce churn and increase trust in the platform.







