RAG Pipeline Architecture Design


The architecture of a RAG pipeline determines the quality, scalability, and cost of the entire system. A basic RAG prototype "works" in a day, but a production-ready system with reliable retrieval, monitoring, and controlled costs requires careful design.

Components of a Modern RAG Pipeline

┌─────────────────────────────────────────────────────┐
│                 INGESTION PIPELINE                   │
│  Sources → Loaders → Parsers → Chunkers → Embedder  │
│           → Metadata Extractor → Vector Store        │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│                 RETRIEVAL PIPELINE                   │
│  Query → Query Transformer → Multi-Index Search     │
│        → Reranker → Context Assembler               │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│                GENERATION PIPELINE                   │
│  Context + Query → Prompt Builder → LLM             │
│                  → Response Validator → User         │
└─────────────────────────────────────────────────────┘

Ingestion Pipeline: Architectural Solutions

Document Loaders: the choice of loader is critical for quality. PDFs with tables require pdfplumber or LlamaParse, not PyPDF2. For Word documents use python-docx or Unstructured; for HTML, BeautifulSoup with custom cleanup rules.

from llama_parse import LlamaParse
from langchain_community.document_loaders import (
    PyPDFLoader, UnstructuredWordDocumentLoader,
    ConfluenceLoader, NotionDBLoader
)

# For complex PDFs (tables, columns, images)
parser = LlamaParse(
    api_key="...",
    result_type="markdown",  # Preserves table structure
    language="en",
)

# Configurable loading pipeline, dispatched by file extension
LOADERS = {
    ".pdf": lambda path: parser.load_data(path),  # Reuse the configured LlamaParse instance
    ".docx": lambda path: UnstructuredWordDocumentLoader(path).load(),
    ".html": lambda path: custom_html_loader(path),  # Project-specific BeautifulSoup cleanup
}
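Dispatch by extension should fail loudly on unsupported formats rather than silently dropping documents during ingestion; a minimal, library-agnostic sketch (the `load_document` helper and its `loaders` registry argument are illustrative, mirroring the LOADERS dict above):

```python
from pathlib import Path

def load_document(path: str, loaders: dict) -> list:
    """Pick a loader by file extension; raise on unsupported formats
    so gaps surface during ingestion, not at query time."""
    suffix = Path(path).suffix.lower()
    if suffix not in loaders:
        raise ValueError(f"No loader registered for {suffix!r}")
    return loaders[suffix](path)
```

Failing fast here is deliberate: a silently skipped document shows up much later as "the system doesn't know about X", which is far harder to debug.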

Metadata enrichment: enriching chunks with metadata is critical for filtering and attribution:

def enrich_chunk_metadata(chunk, source_doc):
    """Adds structured metadata to chunk"""
    chunk.metadata.update({
        "source": source_doc.metadata.get("source"),
        "page": source_doc.metadata.get("page"),
        "doc_type": detect_doc_type(source_doc),     # "contract", "regulation", "faq"
        "department": extract_department(source_doc),
        "date": extract_date(source_doc),
        "version": extract_version(source_doc),
        "chunk_index": chunk.metadata.get("chunk_index"),
        "parent_chunk_id": chunk.metadata.get("parent_id"),
    })
    return chunk
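The Chunkers stage from the ingestion diagram is not shown above; a minimal sliding-window chunker with overlap (a stand-in for a production splitter such as a recursive or semantic one) looks like this:

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split text into fixed-size chunks that overlap, so a sentence cut
    at one chunk boundary still appears whole in the neighboring chunk."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
    return chunks
```

In practice chunk boundaries should respect sentence and section structure, but the size/overlap trade-off shown here carries over unchanged.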

Retrieval Pipeline: Strategies

Sparse + Dense Hybrid Search:

from qdrant_client import QdrantClient
from qdrant_client.models import Fusion, FusionQuery, Prefetch, SparseVector

client = QdrantClient(url="http://localhost:6333")

# Hybrid search in Qdrant: BM25-style sparse + embedding dense
def hybrid_search(query: str, top_k: int = 10) -> list:
    # Dense embedding
    dense_vector = embedder.embed_query(query)

    # Sparse (BM25-style) via SPLADE or FastEmbed
    sparse_vector = sparse_encoder.encode(query)

    results = client.query_points(
        collection_name="docs",
        prefetch=[
            Prefetch(query=dense_vector, using="dense", limit=30),
            Prefetch(
                query=SparseVector(indices=sparse_vector.indices,
                                   values=sparse_vector.values),
                using="sparse",
                limit=30,
            ),
        ],
        query=FusionQuery(fusion=Fusion.RRF),  # Reciprocal Rank Fusion
        limit=top_k,
    )
    return results.points
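The fusion step merges the dense and sparse ranked lists with Reciprocal Rank Fusion rather than comparing their incompatible raw scores. A pure-Python sketch of the formula (using the conventional constant k = 60):

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge several ranked lists: each document earns 1/(k + rank)
    per list it appears in, so items ranked high anywhere float up."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)
```

Because only ranks matter, RRF needs no score normalization between BM25 and cosine similarity, which is exactly why it is the default fusion choice for hybrid search.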

Reranking Pipeline:

from flashrank import Ranker, RerankRequest

ranker = Ranker(model_name="ms-marco-MiniLM-L-12-v2")

def rerank_results(query: str, candidates: list[str]) -> list[str]:
    rerank_request = RerankRequest(
        query=query,
        passages=[{"id": i, "text": c} for i, c in enumerate(candidates)]
    )
    results = ranker.rerank(rerank_request)
    # Sort by score, take top-5
    top_passages = [candidates[r["id"]] for r in sorted(results, key=lambda x: -x["score"])[:5]]
    return top_passages

Query Transformation: Improving Query Before Search

A poorly formulated query means poor retrieval. Useful query transformations:

import json

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Multi-Query: generate 3 paraphrases of query
def multi_query_transform(original_query: str) -> list[str]:
    response = llm.invoke(f"""Generate 3 different paraphrases of the following question.
Each variant should search for the same information but using different words.
Return JSON list of strings.

Question: {original_query}""")
    queries = json.loads(response.content)
    return [original_query] + queries  # Original + 3 paraphrases

# Step-back prompting: abstract to more general question
def step_back_transform(specific_query: str) -> str:
    response = llm.invoke(f"""Formulate a more general question, the answer to which
would help answer the specific question: "{specific_query}"
Return only the question, without explanations.""")
    return response.content
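Each paraphrase produces its own ranked result list, so the lists must be merged and deduplicated before reranking; a round-robin merge sketch (the document IDs here are placeholders):

```python
from itertools import zip_longest

def merge_multi_query_results(result_lists: list[list[str]], limit: int = 10) -> list[str]:
    """Interleave the ranked lists from each query variant round-robin,
    keeping only the first occurrence of every document."""
    seen: set[str] = set()
    merged: list[str] = []
    for tier in zip_longest(*result_lists):
        for doc in tier:
            if doc is not None and doc not in seen:
                seen.add(doc)
                merged.append(doc)
    return merged[:limit]
```

Round-robin interleaving preserves each list's internal ordering while giving every paraphrase an equal voice; a reranker then restores a single global order.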

Multi-Index Architecture

For systems with heterogeneous data sources, separate indexes are more effective:

class MultiIndexRAG:
    def __init__(self):
        self.indexes = {
            "contracts": QdrantRetriever(collection="contracts"),
            "regulations": QdrantRetriever(collection="regulations"),
            "faq": QdrantRetriever(collection="faq"),
            "procedures": QdrantRetriever(collection="procedures"),
        }
        self.router = QueryRouter()  # Query classifier

    def retrieve(self, query: str, top_k: int = 5) -> list:
        # Determine relevant indexes
        relevant_indexes = self.router.route(query)

        # Parallel search across all relevant indexes
        all_results = []
        for index_name in relevant_indexes:
            results = self.indexes[index_name].retrieve(query, k=top_k)
            for r in results:
                r.metadata["source_index"] = index_name
            all_results.extend(results)

        # Rerank the merged results (rerank_results operates on raw text)
        texts = [r.page_content for r in all_results]
        top_texts = set(rerank_results(query, texts))
        return [r for r in all_results if r.page_content in top_texts][:top_k]
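`QueryRouter` above is left undefined; the simplest workable version is a keyword matcher that falls back to searching every index (an LLM classifier or embedding similarity is the usual production upgrade — the keyword lists here are illustrative):

```python
class QueryRouter:
    """Route a query to the indexes whose keywords it mentions;
    search everything when nothing matches."""

    ROUTES = {
        "contracts": ("contract", "agreement", "clause"),
        "regulations": ("regulation", "policy", "compliance"),
        "faq": ("how do i", "what is", "where can"),
        "procedures": ("procedure", "step-by-step", "process"),
    }

    def route(self, query: str) -> list[str]:
        q = query.lower()
        matched = [name for name, keywords in self.ROUTES.items()
                   if any(kw in q for kw in keywords)]
        return matched or list(self.ROUTES)
```

The fallback matters: a router that returns an empty list silently turns ambiguous queries into "no results", whereas searching all indexes only costs latency.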

Monitoring Retrieval Quality

# Tracing each query for analysis
import time

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def traced_retrieval(query: str, span_name: str = "rag_retrieval"):
    with tracer.start_as_current_span(span_name) as span:
        start_time = time.time()
        results = retriever.retrieve(query)
        latency = time.time() - start_time

        span.set_attributes({
            "query.length": len(query),
            "results.count": len(results),
            "results.top_score": results[0].score if results else 0,
            "retrieval.latency_ms": latency * 1000,
        })
        return results
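The recorded latency attribute is only useful in aggregate; a nearest-rank percentile helper for offline analysis of collected latencies (the reporting setup around it is up to your trace backend):

```python
import math

def latency_percentile(latencies_ms: list[float], p: float) -> float:
    """Nearest-rank percentile: the smallest sample value such that at
    least p percent of all samples are at or below it."""
    if not latencies_ms:
        raise ValueError("no samples")
    ordered = sorted(latencies_ms)
    idx = max(0, math.ceil(p / 100 * len(ordered)) - 1)
    return ordered[idx]
```

Tracking p95/p99 rather than the mean is what surfaces the slow tail — reranking and query transformation add their latency precisely there.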

Design and Development Timeline

  • Architecture design: 1 week
  • Basic ingestion pipeline: 1–2 weeks
  • Advanced retrieval (hybrid search, reranking): 2–3 weeks
  • Evaluation framework: 1–2 weeks
  • Production hardening: 1–2 weeks
  • Total: 6–10 weeks