RAG Pipeline Architecture Design
The architecture of a RAG pipeline determines the quality, scalability, and cost of the entire system. Basic RAG "works" in a day, but a production-ready system with reliable retrieval, monitoring, and managed costs requires careful design.
Components of a Modern RAG Pipeline
┌─────────────────────────────────────────────────────┐
│ INGESTION PIPELINE │
│ Sources → Loaders → Parsers → Chunkers → Embedder │
│ → Metadata Extractor → Vector Store │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ RETRIEVAL PIPELINE │
│ Query → Query Transformer → Multi-Index Search │
│ → Reranker → Context Assembler │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ GENERATION PIPELINE │
│ Context + Query → Prompt Builder → LLM │
│ → Response Validator → User │
└─────────────────────────────────────────────────────┘
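Of the three pipelines above, generation gets no code in this section; its first and last steps can be sketched in a few lines. `build_prompt` and `validate_response` are hypothetical helpers standing in for a real prompt builder and response validator:

```python
# Minimal sketch of the generation stage: assemble retrieved chunks into a
# prompt, then validate the model's answer before returning it to the user.
def build_prompt(chunks: list[str], query: str) -> str:
    """Numbered context blocks let the model cite sources as [1], [2], ..."""
    context = "\n\n".join(f"[{i + 1}] {c}" for i, c in enumerate(chunks))
    return (
        "Answer the question using ONLY the context below. "
        "Cite sources as [n]. If the context is insufficient, say so.\n\n"
        f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
    )

def validate_response(answer: str) -> bool:
    """Reject empty answers and answers that cite no source at all."""
    return bool(answer.strip()) and "[" in answer
```

A real validator would also check that every cited `[n]` actually matches a context block, and could route failures back into retrieval instead of surfacing them to the user.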
Ingestion Pipeline: Architectural Solutions
Document Loaders: the choice of loader is critical for quality. PDFs with tables require pdfplumber or LlamaParse, not PyPDF2. For Word documents use python-docx; for HTML, BeautifulSoup with custom cleanup rules.
from llama_parse import LlamaParse
from langchain_community.document_loaders import (
    PyPDFLoader, UnstructuredWordDocumentLoader,
    ConfluenceLoader, NotionDBLoader
)

# For complex PDFs (tables, columns, images)
parser = LlamaParse(
    api_key="...",
    result_type="markdown",  # Preserves table structure
    language="en",
)

# Configurable loading pipeline: pick a loader by file extension
LOADERS = {
    ".pdf": lambda path: parser.load_data(path),  # Use the configured parser
    ".docx": lambda path: UnstructuredWordDocumentLoader(path).load(),
    ".html": lambda path: custom_html_loader(path),
}
Metadata enrichment: attaching structured metadata to each chunk is critical for filtering and attribution:
def enrich_chunk_metadata(chunk, source_doc):
    """Adds structured metadata to a chunk"""
    chunk.metadata.update({
        "source": source_doc.metadata.get("source"),
        "page": source_doc.metadata.get("page"),
        "doc_type": detect_doc_type(source_doc),  # "contract", "regulation", "faq"
        "department": extract_department(source_doc),
        "date": extract_date(source_doc),
        "version": extract_version(source_doc),
        "chunk_index": chunk.metadata.get("chunk_index"),
        "parent_chunk_id": chunk.metadata.get("parent_id"),
    })
    return chunk
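Once chunks carry this metadata, retrieval can be constrained before similarity search. Here is a minimal client-side sketch of the idea; in production the filtering is pushed down into the vector store (e.g. as Qdrant payload filters), and `filter_chunks` is purely illustrative:

```python
def filter_chunks(chunks: list[dict], **criteria) -> list[dict]:
    """Keep only chunks whose metadata matches every given criterion.

    chunks: dicts with a "metadata" mapping, as produced by enrichment.
    criteria: exact-match constraints, e.g. doc_type="faq", department="hr".
    """
    return [
        c for c in chunks
        if all(c["metadata"].get(key) == value for key, value in criteria.items())
    ]
```

For example, `filter_chunks(chunks, doc_type="contract", department="legal")` narrows the candidate set before (or after) vector search, which both improves precision and makes source attribution trivial.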
Retrieval Pipeline: Strategies
Sparse + Dense Hybrid Search:
from qdrant_client import QdrantClient, models

client = QdrantClient(url="http://localhost:6333")

# Hybrid search in Qdrant: BM25-style sparse + embedding dense, fused with RRF
def hybrid_search(query: str, top_k: int = 10) -> list:
    # Dense embedding
    dense_vector = embedder.embed_query(query)
    # Sparse (BM25-style) via SPLADE or FastEmbed
    sparse_vector = sparse_encoder.encode(query)
    results = client.query_points(
        collection_name="docs",
        prefetch=[
            models.Prefetch(query=dense_vector, using="dense", limit=30),
            models.Prefetch(
                query=models.SparseVector(indices=sparse_vector.indices,
                                          values=sparse_vector.values),
                using="sparse", limit=30),
        ],
        query=models.FusionQuery(fusion=models.Fusion.RRF),  # Reciprocal Rank Fusion
        limit=top_k,
    )
    return results.points
Reranking Pipeline:
from flashrank import Ranker, RerankRequest

ranker = Ranker(model_name="ms-marco-MiniLM-L-12-v2")

def rerank_results(query: str, candidates: list[str]) -> list[str]:
    rerank_request = RerankRequest(
        query=query,
        passages=[{"id": i, "text": c} for i, c in enumerate(candidates)]
    )
    results = ranker.rerank(rerank_request)
    # Sort by score descending and keep the top 5
    ranked = sorted(results, key=lambda x: -x["score"])
    return [candidates[r["id"]] for r in ranked[:5]]
Query Transformation: Improving the Query Before Search
A poorly formulated query means poor retrieval. Common query transformations:
import json

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Multi-Query: generate 3 paraphrases of the query
def multi_query_transform(original_query: str) -> list[str]:
    response = llm.invoke(f"""Generate 3 different paraphrases of the following question.
Each variant should search for the same information but using different words.
Return a JSON list of strings.
Question: {original_query}""")
    queries = json.loads(response.content)
    return [original_query] + queries  # Original + 3 paraphrases

# Step-back prompting: abstract to a more general question
def step_back_transform(specific_query: str) -> str:
    response = llm.invoke(f"""Formulate a more general question whose answer
would help answer the specific question: "{specific_query}"
Return only the question, without explanations.""")
    return response.content
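The transformed queries then need to be fanned out to the retriever and their results merged. A sketch of this step, assuming the retriever returns dicts with `id` and `score` keys (`fan_out_retrieve` is a hypothetical helper; deduplication keeps the best score per unique chunk):

```python
def fan_out_retrieve(queries: list[str], retrieve, top_k: int = 5) -> list[dict]:
    """Run each query variant through the retriever and merge the results,
    keeping the highest score seen for each unique chunk id."""
    best: dict[str, dict] = {}
    for q in queries:
        for r in retrieve(q):
            prev = best.get(r["id"])
            if prev is None or r["score"] > prev["score"]:
                best[r["id"]] = r
    # Merged candidates, best-first
    return sorted(best.values(), key=lambda r: -r["score"])[:top_k]
```

Scores from different query variants are only roughly comparable; a more robust merge uses rank-based fusion (RRF) instead of raw scores.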
Multi-Index Architecture
For systems with heterogeneous data sources, separate indexes are more effective than a single shared collection:
class MultiIndexRAG:
    def __init__(self):
        self.indexes = {
            "contracts": QdrantRetriever(collection="contracts"),
            "regulations": QdrantRetriever(collection="regulations"),
            "faq": QdrantRetriever(collection="faq"),
            "procedures": QdrantRetriever(collection="procedures"),
        }
        self.router = QueryRouter()  # Query classifier

    def retrieve(self, query: str, top_k: int = 5) -> list:
        # Determine which indexes are relevant to this query
        relevant_indexes = self.router.route(query)
        # Search each relevant index (can be parallelized)
        all_results = []
        for index_name in relevant_indexes:
            results = self.indexes[index_name].retrieve(query, k=top_k)
            for r in results:
                r.metadata["source_index"] = index_name
            all_results.extend(results)
        # Rerank the merged candidates by their text content
        by_text = {r.page_content: r for r in all_results}
        top_texts = rerank_results(query, list(by_text))
        return [by_text[t] for t in top_texts[:top_k]]
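`QueryRouter` is used above but not defined. A toy rule-based sketch follows; production routers more often use an LLM classifier or embedding similarity over index descriptions, and the keyword lists here are purely illustrative:

```python
class QueryRouter:
    """Toy rule-based router: map keyword hits to index names."""

    RULES = {
        "contracts": ("contract", "agreement", "clause"),
        "regulations": ("regulation", "compliance", "policy"),
        "faq": ("how do i", "what is", "can i"),
        "procedures": ("procedure", "process", "step"),
    }

    def route(self, query: str) -> list[str]:
        q = query.lower()
        hits = [name for name, keywords in self.RULES.items()
                if any(k in q for k in keywords)]
        # No keyword match: fall back to searching every index
        return hits or list(self.RULES)
```

The fallback matters: a router that silently drops queries it cannot classify is a common source of "retrieval found nothing" bugs.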
Monitoring Retrieval Quality
# Tracing each query for analysis
import time

from opentelemetry import trace

def traced_retrieval(query: str, span_name: str = "rag_retrieval"):
    with trace.get_tracer(__name__).start_as_current_span(span_name) as span:
        start_time = time.time()
        results = retriever.retrieve(query)
        latency = time.time() - start_time
        span.set_attributes({
            "query.length": len(query),
            "results.count": len(results),
            "results.top_score": results[0].score if results else 0,
            "retrieval.latency_ms": latency * 1000,
        })
        return results
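Tracing captures latency and scores, but retrieval quality itself is best measured offline against a labeled evaluation set. A minimal sketch, assuming each query is labeled with a single known-relevant chunk id (`hit_rate_and_mrr` is a hypothetical helper computing hit rate@k and MRR@k):

```python
def hit_rate_and_mrr(eval_set, retrieve, k: int = 5):
    """Offline retrieval metrics over a labeled evaluation set.

    eval_set: list of (query, relevant_chunk_id) pairs.
    retrieve: function mapping a query to a ranked list of chunk ids.
    Returns (hit_rate@k, MRR@k).
    """
    hits, rr_sum = 0, 0.0
    for query, relevant_id in eval_set:
        ranked = retrieve(query)[:k]
        if relevant_id in ranked:
            hits += 1
            rr_sum += 1.0 / (ranked.index(relevant_id) + 1)  # reciprocal rank
    n = len(eval_set)
    return hits / n, rr_sum / n
```

Tracking these two numbers per release catches retrieval regressions (a changed chunker, a new embedding model) before users do.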
Design and Development Timeline
- Architecture design: 1 week
- Basic ingestion pipeline: 1–2 weeks
- Advanced retrieval (hybrid search, reranking): 2–3 weeks
- Evaluation framework: 1–2 weeks
- Production hardening: 1–2 weeks
- Total: 6–10 weeks