RAG Development with Milvus Vector Database

We design and deploy artificial intelligence systems, from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering, and MLOps to make AI work not just in the lab, but in real business.

Milvus is an open-source vector database designed for production scale: billions of vectors, high throughput, horizontal scaling. It supports the HNSW, IVF_FLAT, IVF_SQ8, and DISKANN indexes as well as hybrid search (sparse + dense), which makes it a good fit for enterprise systems with large data volumes.

Installation and Connection

# Install: pip install pymilvus
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility

# Connect to Milvus
connections.connect(
    alias="default",
    host="localhost",
    port="19530"
)

# Or via URI (Milvus Lite for local development)
from pymilvus import MilvusClient
client = MilvusClient("./milvus_local.db")  # SQLite-like file

Creating Collection with Schema

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=4096),
    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="doc_type", dtype=DataType.VARCHAR, max_length=64),
    FieldSchema(name="page", dtype=DataType.INT32),
    FieldSchema(
        name="dense_vector",
        dtype=DataType.FLOAT_VECTOR,
        dim=1536  # text-embedding-3-small
    ),
    FieldSchema(
        name="sparse_vector",
        dtype=DataType.SPARSE_FLOAT_VECTOR  # BM25
    ),
]

schema = CollectionSchema(fields=fields, description="Corporate Knowledge Base")
collection = Collection(name="knowledge_base", schema=schema)

# Indexes for vector fields
collection.create_index(
    field_name="dense_vector",
    index_params={"metric_type": "COSINE", "index_type": "HNSW", "params": {"M": 16, "efConstruction": 200}}
)
collection.create_index(
    field_name="sparse_vector",
    index_params={"metric_type": "IP", "index_type": "SPARSE_INVERTED_INDEX"}
)

collection.load()
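Before the collection can serve queries, rows matching this schema have to be inserted. A minimal row-based ingestion sketch, assuming embeddings are computed elsewhere (the `make_row` helper and placeholder vectors below are illustrative, not part of the pymilvus API; with auto_id=True the id field is omitted):

```python
import random

def make_row(text: str, source: str, doc_type: str, page: int) -> dict:
    """Build one row matching the knowledge_base schema (id is auto-generated)."""
    return {
        "text": text,
        "source": source,
        "doc_type": doc_type,
        "page": page,
        # Placeholder dense vector; in practice: dense_embedder.embed_query(text)
        "dense_vector": [random.random() for _ in range(1536)],
        # Sparse vectors are passed as {dimension_index: weight} dicts
        "sparse_vector": {101: 0.8, 2048: 0.3},
    }

rows = [
    make_row("Milvus supports hybrid search.", "guide.pdf", "manual", 1),
    make_row("HNSW trades memory for recall.", "guide.pdf", "manual", 2),
]

# Row-based insert against a live collection:
# collection.insert(rows)
# collection.flush()
```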

Hybrid Search with RRF

from pymilvus import AnnSearchRequest, RRFRanker

def milvus_hybrid_search(query: str, top_k: int = 5) -> list:
    # dense_embedder and sparse_encoder are assumed to be initialized elsewhere,
    # e.g. an OpenAI embedding client and a BM25 encoder fitted on the corpus
    # (such as BM25EmbeddingFunction from pymilvus.model)

    # Dense vector
    dense_vec = dense_embedder.embed_query(query)

    # Sparse vector (BM25)
    sparse_vec = sparse_encoder.encode_queries([query])

    # Two queries for RRF
    dense_req = AnnSearchRequest(
        data=[dense_vec],
        anns_field="dense_vector",
        param={"metric_type": "COSINE", "params": {"ef": 100}},
        limit=30,
    )

    sparse_req = AnnSearchRequest(
        data=sparse_vec,
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=30,
    )

    # RRF fusion
    results = collection.hybrid_search(
        reqs=[dense_req, sparse_req],
        rerank=RRFRanker(k=60),
        limit=top_k,
        output_fields=["text", "source", "doc_type"],
    )

    return results
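RRFRanker(k=60) merges the dense and sparse result lists by Reciprocal Rank Fusion: each document scores 1/(k + rank) in every list where it appears, and the summed scores decide the final order. A pure-Python sketch of the same formula, with made-up document ids:

```python
def rrf_fuse(ranked_lists: list, k: int = 60) -> list:
    """Reciprocal Rank Fusion: score(id) = sum over lists of 1 / (k + rank)."""
    scores = {}
    for hits in ranked_lists:
        for rank, doc_id in enumerate(hits, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

dense_hits = ["doc3", "doc1", "doc7"]    # ranked by COSINE similarity
sparse_hits = ["doc1", "doc9", "doc3"]   # ranked by BM25 score
fused = rrf_fuse([dense_hits, sparse_hits])
# doc1 and doc3 appear in both lists, so they outrank single-list hits
```

Because only ranks matter, RRF needs no score normalization between the cosine and BM25 scales, which is why it is a common default for hybrid retrieval.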

Practical Case: Search in Product Documentation

Scale: 2.5M chunks (technical documentation in 6 languages, including Russian).

Index: HNSW (M=32, efConstruction=400) for dense; SPARSE_INVERTED_INDEX for sparse.

Throughput with hybrid search: 850 QPS at P99 latency under 400 ms on a 3-node cluster (8 vCPU, 32 GB RAM each).
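A back-of-envelope calculation shows why this fits a 3-node, 32 GB layout: raw float32 embeddings alone for 2.5M chunks at 1536 dimensions take about 15 GB, and HNSW neighbor links add comparatively little on top (the 2 × M × 4-byte link estimate below is a rough rule of thumb, not an exact Milvus figure):

```python
NUM_VECTORS = 2_500_000
DIM = 1536
FLOAT_BYTES = 4   # float32
M = 32            # HNSW connectivity from the case above

raw_gb = NUM_VECTORS * DIM * FLOAT_BYTES / 1e9       # raw embeddings
links_gb = NUM_VECTORS * 2 * M * FLOAT_BYTES / 1e9   # ~layer-0 graph links
print(f"raw vectors: {raw_gb:.1f} GB, HNSW links: ~{links_gb:.1f} GB")
# → raw vectors: 15.4 GB, HNSW links: ~0.6 GB
```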

Comparison with Pinecone Serverless at the same volume: self-hosted Milvus is ~8× cheaper at high QPS, but requires DevOps support.

Partitioning for Multi-Tenancy

# Create partitions for client isolation
collection.create_partition(partition_name="client_001")
collection.create_partition(partition_name="client_002")

# Search only within a client's partition (query_vector computed elsewhere;
# note that param is required by Collection.search)
results = collection.search(
    data=[query_vector],
    anns_field="dense_vector",
    param={"metric_type": "COSINE", "params": {"ef": 64}},
    partition_names=["client_001"],
    limit=5,
)
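To keep the ingest and query paths consistent, partition names are best derived from a single helper. A sketch assuming a `client_NNN` naming scheme (the helper names are illustrative, not pymilvus API):

```python
def tenant_partition(tenant_id: int) -> str:
    """Map a tenant id to its partition name, e.g. 7 -> 'client_007'."""
    return f"client_{tenant_id:03d}"

def search_for_tenant(collection, query_vector, tenant_id: int, limit: int = 5):
    """Scope an ANN search to a single tenant's partition."""
    return collection.search(
        data=[query_vector],
        anns_field="dense_vector",
        param={"metric_type": "COSINE", "params": {"ef": 64}},
        partition_names=[tenant_partition(tenant_id)],
        limit=limit,
    )
```

With many thousands of tenants, Milvus's partition-key feature is usually a better fit than one explicit partition per client, since the number of plain partitions per collection is capped.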

Timeline

  • Milvus cluster setup + schema: 3–5 days
  • Ingestion pipeline with hybrid indexing: 5–10 days
  • RAG pipeline and evaluation: 1–2 weeks
  • Total: 3–5 weeks