Embedding Pipeline for Data Indexing Development

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 services. All 1566 services.
Embedding Pipeline for Data Indexing Development
Medium
~3-5 business days
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_website-b2b-advance_0.png
    B2B ADVANCE company website development
    1221
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Website development for BELFINGROUP
    855
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1056
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    828

Embedding Pipeline for Data Indexing Implementation

Embedding pipeline is the infrastructural foundation of any RAG system and semantic search. The task: take heterogeneous data (texts, tables, images), transform them into vector representations, and write to a vector database so that similar entities are close in space.

Embedding Model Selection

Model Dimensions Speed Languages Hosting
text-embedding-3-small 1536 8K tokens/sec 100+ OpenAI API
text-embedding-3-large 3072 3K tokens/sec 100+ OpenAI API
E5-large-v2 1024 2K tokens/sec EN Self-hosted
multilingual-e5-large 1024 1.5K tokens/sec 100+ Self-hosted
BGE-M3 1024 1K tokens/sec 100+ Self-hosted
Cohere Embed v3 1024 5K tokens/sec 100+ Cohere API

Basic Pipeline

import asyncio
import hashlib
from typing import Any

import numpy as np
import pandas as pd
from openai import AsyncOpenAI
import qdrant_client
from qdrant_client.models import Distance, VectorParams, PointStruct

class EmbeddingPipeline:
    """Index documents into a Qdrant collection using OpenAI embeddings.

    Documents are deduplicated by a SHA-256 content hash stored in each
    point's payload under ``_hash``, so re-running ``index_documents``
    embeds only new or changed documents.
    """

    def __init__(self, collection_name: str,
                 embedding_model: str = "text-embedding-3-small"):
        self.oai = AsyncOpenAI()
        self.qdrant = qdrant_client.QdrantClient(host="localhost", port=6333)
        self.model = embedding_model
        self.collection = collection_name
        self.batch_size = 100  # documents per embeddings API request
        self._init_collection()

    def _init_collection(self):
        """Create the collection (cosine distance) if it does not exist."""
        # Known output dimensions per model; unknown models fall back to 1536.
        dims = {"text-embedding-3-small": 1536, "text-embedding-3-large": 3072}
        dim = dims.get(self.model, 1536)

        existing = {c.name for c in self.qdrant.get_collections().collections}
        if self.collection not in existing:
            self.qdrant.create_collection(
                collection_name=self.collection,
                vectors_config=VectorParams(size=dim, distance=Distance.COSINE)
            )

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed a batch of texts with exponential-backoff retry.

        Retries up to 3 times (1s, 2s back-off); re-raises the last error.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = await self.oai.embeddings.create(
                    input=texts,
                    model=self.model
                )
                return [item.embedding for item in response.data]
            except Exception:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)

    async def index_documents(self, documents: list[dict],
                               text_field: str = "content") -> dict:
        """Embed and upsert *documents*, skipping already-indexed content.

        documents: list of dicts with text and metadata
        text_field: key with text for embedding
        Returns ``{'indexed': n, 'skipped': n, 'total': n}``.
        """
        total = len(documents)
        indexed = 0
        skipped = 0

        # Check already indexed documents (deduplication by hash)
        existing_hashes = self._get_existing_hashes()

        # Filter duplicates — both against the collection and within this
        # call (the original never updated the set, so identical new docs
        # in one batch were all embedded).
        new_docs = []
        for doc in documents:
            content_hash = hashlib.sha256(
                doc.get(text_field, "").encode()
            ).hexdigest()[:16]
            if content_hash in existing_hashes:
                skipped += 1
                continue
            existing_hashes.add(content_hash)
            doc['_hash'] = content_hash
            new_docs.append(doc)

        # Batch processing
        for i in range(0, len(new_docs), self.batch_size):
            batch = new_docs[i:i + self.batch_size]

            # Truncate overly long texts (char-level guard for token limits)
            texts = [doc.get(text_field, "")[:8000] for doc in batch]

            embeddings = await self.embed_batch(texts)

            points = []
            for j, (doc, embedding) in enumerate(zip(batch, embeddings)):
                # Deterministic 63-bit point id from the document id (or its
                # position). str() guards against non-string ids (e.g. ints),
                # which would crash .encode() in the original.
                raw_id = str(doc.get('id', str(i + j)))
                point_id = int(hashlib.sha256(
                    raw_id.encode()
                ).hexdigest()[:8], 16) % (2**63)

                payload = {k: v for k, v in doc.items()
                           if k != text_field and not k.startswith('_')}
                payload['text_preview'] = doc.get(text_field, "")[:200]
                payload['_hash'] = doc['_hash']

                points.append(PointStruct(
                    id=point_id,
                    vector=embedding,
                    payload=payload
                ))

            self.qdrant.upsert(
                collection_name=self.collection,
                points=points
            )
            indexed += len(batch)

        return {'indexed': indexed, 'skipped': skipped, 'total': total}

    def _get_existing_hashes(self) -> set:
        """Get hashes of all already indexed documents.

        Pages through the whole collection; the original issued a single
        scroll capped at 10k points, so deduplication silently broke once
        the collection grew past that.
        """
        hashes: set = set()
        try:
            offset = None
            while True:
                points, offset = self.qdrant.scroll(
                    collection_name=self.collection,
                    scroll_filter=None,
                    with_payload=["_hash"],
                    limit=10000,
                    offset=offset,
                )
                hashes.update(p.payload.get('_hash', '') for p in points)
                if offset is None:  # no more pages
                    break
        except Exception:
            # Best effort: a missing/empty collection means nothing indexed yet.
            return hashes
        return hashes

Advanced Pipeline with Support for Different Data Types

class MultimodalEmbeddingPipeline:
    """Index texts, tables, and structured (JSON-like) content.

    Text and table records are routed to two separate Qdrant collections
    via dedicated EmbeddingPipeline instances.
    """

    # Tables larger than this get only a schema document — individual rows
    # are not indexed (keeps the point count bounded).
    MAX_ROWS_PER_TABLE = 1000

    def __init__(self):
        self.text_pipeline = EmbeddingPipeline("text_collection")
        self.table_pipeline = EmbeddingPipeline("table_collection")

    async def process_table(self, df: "pd.DataFrame",
                              table_name: str,
                              description: str = "") -> list[dict]:
        """Table → set of indexable records.

        Emits one schema document (columns + first 3 rows) and, for small
        tables (<= MAX_ROWS_PER_TABLE rows), one document per row.
        """
        # NOTE: string annotation above avoids a NameError at class-definition
        # time when pandas is not imported in this module.
        schema_text = (
            f"Table: {table_name}. {description}\n"
            f"Columns: {', '.join(df.columns.tolist())}\n"
            f"Sample data:\n{df.head(3).to_string()}"
        )

        documents = [{
            'id': f"{table_name}_schema",
            'content': schema_text,
            'type': 'table_schema',
            'table_name': table_name
        }]

        # Each row as separate record (for small tables)
        if len(df) <= self.MAX_ROWS_PER_TABLE:
            for idx, row in df.iterrows():
                cells = ", ".join(f"{col}={val}" for col, val in row.items())
                documents.append({
                    'id': f"{table_name}_row_{idx}",
                    'content': f"Row {idx} in {table_name}: {cells}",
                    'type': 'table_row',
                    'table_name': table_name,
                    'row_index': idx
                })

        return documents

    async def process_structured_content(self, content: dict,
                                          content_type: str) -> list[dict]:
        """JSON/structured data → a single indexable record.

        Falls back to an MD5 of the flattened text as the id when *content*
        has no 'id' key (MD5 used as a fingerprint only, not for security).
        """
        import json

        # Flat text representation
        flat_text = self._flatten_dict(content)

        return [{
            'id': content.get('id', hashlib.md5(flat_text.encode()).hexdigest()),
            'content': flat_text,
            'type': content_type,
            'raw': json.dumps(content, ensure_ascii=False)[:1000]
        }]

    def _flatten_dict(self, d: dict, prefix: str = "") -> str:
        """Recursively flatten nested dicts to "key.subkey: value" sentences.

        Lists are truncated to their first 10 items.
        """
        parts = []
        for key, value in d.items():
            full_key = f"{prefix}.{key}" if prefix else key
            if isinstance(value, dict):
                parts.append(self._flatten_dict(value, full_key))
            elif isinstance(value, list):
                parts.append(f"{full_key}: {', '.join(str(v) for v in value[:10])}")
            else:
                parts.append(f"{full_key}: {value}")
        return ". ".join(parts)

Searching Through Indexed Data

    async def semantic_search(self, query: str,
                               limit: int = 5,
                               score_threshold: float = 0.7) -> list[dict]:
        """Vector search over the text collection.

        Embeds *query*, retrieves up to *limit* hits scoring at or above
        *score_threshold*, and returns dicts with score, preview text, and
        the remaining payload as metadata.
        """
        pipeline = self.text_pipeline
        vectors = await pipeline.embed_batch([query])
        query_vector = vectors[0]

        hits = pipeline.qdrant.search(
            collection_name=pipeline.collection,
            query_vector=query_vector,
            limit=limit,
            score_threshold=score_threshold,
            with_payload=True
        )

        formatted = []
        for hit in hits:
            payload = hit.payload
            # Everything except the preview and the dedup hash is metadata.
            metadata = {key: value for key, value in payload.items()
                        if key not in ['text_preview', '_hash']}
            formatted.append({
                'score': hit.score,
                'text': payload.get('text_preview', ''),
                'metadata': metadata,
            })
        return formatted

Pipeline Performance

Throughput with batch processing through OpenAI API: 500-800 documents/minute (text-embedding-3-small). Self-hosted BGE-M3 on A10G GPU: 1200-1500 documents/minute. Cost of indexing 1M documents via API: ~$1.5-2. Incremental update via hashing: only new and changed documents, saving 70-90% on daily updates.