AI Microservice Development
An AI microservice is a separate HTTP/gRPC service that encapsulates an ML model or AI logic. Because it is isolated from the main application, it can be deployed, scaled, and evolved on its own technology stack independently. This is the standard pattern for production AI systems.
AI Microservice Structure
ai-service/
├── app/
│   ├── main.py            # FastAPI application
│   ├── model.py           # Model loading and inference
│   ├── schemas.py         # Pydantic request/response models
│   ├── preprocessing.py   # Same transformations as during training
│   └── monitoring.py      # Metrics, logging
├── tests/
│   ├── test_model.py
│   ├── test_api.py
│   └── test_preprocessing.py
├── Dockerfile
├── requirements.txt
└── model_artifacts/       # Model versioned separately (S3/MLflow)
FastAPI Implementation
from fastapi import FastAPI, HTTPException, Response
from pydantic import BaseModel
import time

from prometheus_client import Counter, Histogram, generate_latest

from app.model import model_registry  # model loading/versioning lives in app/model.py

app = FastAPI(title="Sentiment Analysis Service", version="1.0.0")

# Metrics
REQUEST_COUNT = Counter("requests_total", "Total requests", ["endpoint", "status"])
INFERENCE_TIME = Histogram("inference_duration_seconds", "Inference time",
                           buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0])

class PredictRequest(BaseModel):
    texts: list[str]
    model_version: str | None = None  # None = latest

class PredictionResult(BaseModel):
    text: str
    label: str
    score: float
    model_version: str

class PredictResponse(BaseModel):
    predictions: list[PredictionResult]
    processing_time_ms: float

@app.post("/predict", response_model=PredictResponse)
async def predict(request: PredictRequest):
    if len(request.texts) > 100:
        raise HTTPException(422, "Max 100 texts per request")
    start = time.time()
    try:
        predictions = model_registry.get(request.model_version).predict(request.texts)
        elapsed = (time.time() - start) * 1000
        REQUEST_COUNT.labels(endpoint="/predict", status="success").inc()
        INFERENCE_TIME.observe(elapsed / 1000)
        return PredictResponse(
            predictions=[
                PredictionResult(text=t, label=p.label, score=p.score,
                                 model_version=model_registry.current_version)
                for t, p in zip(request.texts, predictions)
            ],
            processing_time_ms=elapsed,
        )
    except Exception as e:
        REQUEST_COUNT.labels(endpoint="/predict", status="error").inc()
        raise HTTPException(500, str(e))

@app.get("/health")
async def health():
    return {"status": "ok", "model_loaded": model_registry.is_loaded()}

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type="text/plain")
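The endpoint's contract (input cap, timing, per-item results) can be exercised without running FastAPI. The sketch below mirrors the handler's logic against a stub model; `StubModel` and `predict_texts` are illustrative names, not part of the service:

```python
import time

class StubModel:
    """Stand-in for the real sentiment model."""
    def predict(self, texts):
        return [("positive" if "good" in t else "negative", 0.9) for t in texts]

def predict_texts(model, texts, max_batch=100):
    # Mirrors the /predict handler: reject oversized requests, time inference
    if len(texts) > max_batch:
        raise ValueError("Max 100 texts per request")
    start = time.time()
    preds = model.predict(texts)
    elapsed_ms = (time.time() - start) * 1000
    return {
        "predictions": [
            {"text": t, "label": label, "score": score}
            for t, (label, score) in zip(texts, preds)
        ],
        "processing_time_ms": elapsed_ms,
    }

result = predict_texts(StubModel(), ["good movie", "bad plot"])
```

This keeps the validation and response-shaping logic testable in plain unit tests (`tests/test_api.py`) without spinning up the HTTP layer.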
Dockerfile for AI Service
FROM python:3.11-slim
WORKDIR /app
# System dependencies for ML libraries
RUN apt-get update && apt-get install -y --no-install-recommends \
        build-essential libgomp1 && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app/ ./app/
# Model is loaded from S3 at startup (not in image — too large)
ENV MODEL_BUCKET=s3://models
ENV MODEL_KEY=sentiment/v2.1/model.pkl
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "2"]
Batching for Efficiency
For GPU models, batching is critical: 100 requests with 1 text each can be close to 100x slower than 1 request with 100 texts, because every call pays the fixed kernel-launch and memory-transfer overhead. Dynamic batching groups concurrent requests into one batch:
import asyncio
import time

class DynamicBatcher:
    def __init__(self, model, max_batch_size=32, max_wait_ms=20):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue: asyncio.Queue = asyncio.Queue()
        # Launch once at startup: asyncio.create_task(batcher._batch_worker())

    async def predict(self, texts: list[str]) -> list:
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((texts, future))
        return await future

    async def _batch_worker(self):
        while True:
            # Block until the first request arrives, then accumulate
            # until the batch is full or the deadline passes
            batch_items = [await self.queue.get()]
            deadline = time.monotonic() + self.max_wait_ms / 1000
            while len(batch_items) < self.max_batch_size:
                timeout = deadline - time.monotonic()
                if timeout <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self.queue.get(), timeout=timeout)
                    batch_items.append(item)
                except asyncio.TimeoutError:
                    break
            all_texts = [t for texts, _ in batch_items for t in texts]
            try:
                all_preds = self.model.predict_batch(all_texts)
            except Exception as e:
                for _, future in batch_items:
                    future.set_exception(e)
                continue
            # Distribute results back to the waiting futures
            idx = 0
            for texts, future in batch_items:
                future.set_result(all_preds[idx:idx + len(texts)])
                idx += len(texts)
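The batching effect can be verified without a GPU. In this self-contained sketch, a fake model counts how many batch calls it receives while ten concurrent callers each submit a single text; the names (`FakeModel`, `batch_worker`, `call`) are illustrative:

```python
import asyncio

class FakeModel:
    def __init__(self):
        self.batch_calls = 0

    def predict_batch(self, texts):
        self.batch_calls += 1
        return [t.upper() for t in texts]

async def batch_worker(model, queue, max_batch_size=32, max_wait_ms=20):
    while True:
        # First item blocks; the rest are drained until the deadline
        items = [await queue.get()]
        deadline = asyncio.get_running_loop().time() + max_wait_ms / 1000
        while len(items) < max_batch_size:
            timeout = deadline - asyncio.get_running_loop().time()
            if timeout <= 0:
                break
            try:
                items.append(await asyncio.wait_for(queue.get(), timeout))
            except asyncio.TimeoutError:
                break
        texts = [t for ts, _ in items for t in ts]
        preds = model.predict_batch(texts)
        i = 0
        for ts, fut in items:
            fut.set_result(preds[i:i + len(ts)])
            i += len(ts)

async def main():
    model = FakeModel()
    queue: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(batch_worker(model, queue))

    async def call(text):
        fut = asyncio.get_running_loop().create_future()
        await queue.put(([text], fut))
        return (await fut)[0]

    # Ten concurrent single-text calls are served by far fewer batch calls
    results = await asyncio.gather(*(call(f"text {i}") for i in range(10)))
    worker.cancel()
    return model.batch_calls, results

batch_calls, results = asyncio.run(main())
```

Because all ten callers enqueue before the 20 ms window closes, the model typically sees one or two batch calls instead of ten, which is exactly the GPU-utilization win described above.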
Versioned Model Loading
When deploying a new model version, aim for zero-downtime replacement: the new version is loaded into memory in parallel with the old one, and only after a successful health check is traffic switched to it and the old version unloaded.
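A minimal sketch of that swap, assuming a loader callable and a health-check predicate (both names, and the `ModelRegistry` class itself, are hypothetical):

```python
import threading

class ModelRegistry:
    """Holds the live model and swaps versions atomically."""
    def __init__(self):
        self._lock = threading.Lock()
        self._model = None
        self.current_version = None

    def get(self, version=None):
        # Readers always see a fully loaded, health-checked model
        with self._lock:
            return self._model

    def deploy(self, version, load_fn, health_check):
        # 1. Load the new version in parallel with the old one
        candidate = load_fn(version)
        # 2. Switch traffic only after a successful health check
        if not health_check(candidate):
            raise RuntimeError(f"Health check failed for {version}")
        # 3. Atomic swap; the old model is released for garbage collection
        with self._lock:
            self._model = candidate
            self.current_version = version

registry = ModelRegistry()
registry.deploy("v2.1", load_fn=lambda v: {"version": v}, health_check=lambda m: True)
```

The key property is that `deploy` never touches the live model until the candidate has passed its health check, so in-flight requests are never served by a half-loaded model.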
API Contract and Backward Compatibility
Endpoint versioning (/v1/predict, /v2/predict) allows changing the contract without breaking existing clients; /v1 remains supported for at least 3 months after the /v2 release.
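One way to picture both contracts coexisting, sketched here without a web framework (the handler names and response shapes are illustrative):

```python
def predict_v1(payload):
    # v1 contract: flat list of labels (kept for old clients)
    return {"labels": ["positive" for _ in payload["texts"]]}

def predict_v2(payload):
    # v2 contract: per-item objects with scores
    return {"predictions": [{"text": t, "label": "positive", "score": 0.9}
                            for t in payload["texts"]]}

ROUTES = {"/v1/predict": predict_v1, "/v2/predict": predict_v2}

def handle(path, payload):
    # Both versions stay routable during the deprecation window
    handler = ROUTES.get(path)
    if handler is None:
        return {"error": "unknown endpoint"}
    return handler(payload)

old_client = handle("/v1/predict", {"texts": ["ok"]})
new_client = handle("/v2/predict", {"texts": ["ok"]})
```

In FastAPI the same idea maps naturally to two `APIRouter`s mounted with `prefix="/v1"` and `prefix="/v2"`, each with its own Pydantic response models.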