Embedding Pipeline for Data Indexing Implementation
Embedding pipeline is the infrastructural foundation of any RAG system and semantic search. The task: take heterogeneous data (texts, tables, images), transform them into vector representations, and write to a vector database so that similar entities are close in space.
Embedding Model Selection
| Model | Dimensions | Speed | Languages | Hosting |
|---|---|---|---|---|
| text-embedding-3-small | 1536 | 8K tokens/sec | 100+ | OpenAI API |
| text-embedding-3-large | 3072 | 3K tokens/sec | 100+ | OpenAI API |
| E5-large-v2 | 1024 | 2K tokens/sec | EN | Self-hosted |
| multilingual-e5-large | 1024 | 1.5K tokens/sec | 100+ | Self-hosted |
| BGE-M3 | 1024 | 1K tokens/sec | 100+ | Self-hosted |
| Cohere Embed v3 | 1024 | 5K tokens/sec | 100+ | Cohere API |
Basic Pipeline
import asyncio
import hashlib
from typing import Any

import numpy as np
import pandas as pd
from openai import AsyncOpenAI
import qdrant_client
from qdrant_client.models import Distance, VectorParams, PointStruct
class EmbeddingPipeline:
    """Index text documents into a Qdrant collection via OpenAI embeddings.

    Deduplicates documents by content hash, batches embedding requests,
    and retries transient API failures with exponential backoff.
    """

    def __init__(self, collection_name: str,
                 embedding_model: str = "text-embedding-3-small"):
        self.oai = AsyncOpenAI()
        self.qdrant = qdrant_client.QdrantClient(host="localhost", port=6333)
        self.model = embedding_model
        self.collection = collection_name
        self.batch_size = 100  # documents per embedding API request
        self._init_collection()

    def _init_collection(self):
        """Create the collection with the model's vector size if missing."""
        dims = {"text-embedding-3-small": 1536, "text-embedding-3-large": 3072}
        # Unknown models fall back to 1536 dims — TODO confirm for custom models.
        dim = dims.get(self.model, 1536)
        existing = [c.name for c in self.qdrant.get_collections().collections]
        if self.collection not in existing:
            self.qdrant.create_collection(
                collection_name=self.collection,
                vectors_config=VectorParams(size=dim, distance=Distance.COSINE)
            )

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed a batch of texts with retry and exponential backoff.

        Raises:
            The last API exception if all retry attempts fail.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = await self.oai.embeddings.create(
                    input=texts,
                    model=self.model
                )
                return [item.embedding for item in response.data]
            except Exception:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # 1s, 2s backoff
        raise RuntimeError("unreachable: retry loop returns or raises")

    async def index_documents(self, documents: list[dict],
                              text_field: str = "content") -> dict:
        """Embed and upsert documents, skipping already-indexed content.

        Args:
            documents: list of dicts with text and arbitrary metadata.
            text_field: key holding the text to embed.

        Returns:
            dict with 'indexed', 'skipped', and 'total' counts.
        """
        total = len(documents)
        indexed = 0
        skipped = 0
        # Deduplicate against content hashes already stored in the collection.
        existing_hashes = self._get_existing_hashes()
        new_docs = []
        for doc in documents:
            content_hash = hashlib.sha256(
                doc.get(text_field, "").encode()
            ).hexdigest()[:16]
            if content_hash not in existing_hashes:
                doc['_hash'] = content_hash
                new_docs.append(doc)
            else:
                skipped += 1
        # Batch processing
        for i in range(0, len(new_docs), self.batch_size):
            batch = new_docs[i:i + self.batch_size]
            texts = [doc.get(text_field, "") for doc in batch]
            # Truncate overly long texts (chars, not tokens — rough guard
            # against exceeding the embedding model's context window).
            texts = [t[:8000] for t in texts]
            embeddings = await self.embed_batch(texts)
            points = []
            for j, (doc, embedding) in enumerate(zip(batch, embeddings)):
                # str() coercion: doc ids may be ints or other non-strings;
                # the original .encode() on a raw id crashed for those.
                point_id = int(hashlib.sha256(
                    str(doc.get('id', i + j)).encode()
                ).hexdigest()[:8], 16) % (2**63)
                payload = {k: v for k, v in doc.items()
                           if k != text_field and not k.startswith('_')}
                payload['text_preview'] = doc.get(text_field, "")[:200]
                payload['_hash'] = doc['_hash']
                points.append(PointStruct(
                    id=point_id,
                    vector=embedding,
                    payload=payload
                ))
            self.qdrant.upsert(
                collection_name=self.collection,
                points=points
            )
            indexed += len(batch)
        return {'indexed': indexed, 'skipped': skipped, 'total': total}

    def _get_existing_hashes(self) -> set:
        """Collect content hashes of all indexed points via paginated scroll.

        Returns an empty set on any error (e.g. collection does not exist
        yet), which simply disables deduplication for that run.
        """
        hashes: set = set()
        try:
            offset = None
            while True:
                # scroll() returns (points, next_page_offset); offset=None
                # from the server means the last page was reached.
                points, offset = self.qdrant.scroll(
                    collection_name=self.collection,
                    scroll_filter=None,
                    with_payload=["_hash"],
                    limit=10000,
                    offset=offset
                )
                hashes.update(p.payload.get('_hash', '') for p in points)
                if offset is None:
                    break
        except Exception:
            return set()
        return hashes
Advanced Pipeline with Support for Different Data Types
class MultimodalEmbeddingPipeline:
    """Index texts, tables, and structured (JSON) content.

    The process_* methods only convert raw inputs into the list-of-dicts
    document format expected by EmbeddingPipeline.index_documents; actual
    embedding and storage are delegated to per-modality pipelines.
    """

    def __init__(self):
        self.text_pipeline = EmbeddingPipeline("text_collection")
        self.table_pipeline = EmbeddingPipeline("table_collection")

    async def process_table(self, df: pd.DataFrame,
                            table_name: str,
                            description: str = "") -> list[dict]:
        """Convert a DataFrame into indexable records.

        Always emits one schema document (columns + first 3 rows); for
        small tables (<= 1000 rows) additionally emits one document per
        row so individual rows are retrievable.
        """
        documents = []
        # Table schema description
        schema_text = f"Table: {table_name}. {description}\n"
        schema_text += f"Columns: {', '.join(df.columns.tolist())}\n"
        schema_text += f"Sample data:\n{df.head(3).to_string()}"
        documents.append({
            'id': f"{table_name}_schema",
            'content': schema_text,
            'type': 'table_schema',
            'table_name': table_name
        })
        # Each row as separate record (only for small tables, to bound cost)
        if len(df) <= 1000:
            for idx, row in df.iterrows():
                row_text = f"Row {idx} in {table_name}: " + \
                           ", ".join(f"{col}={val}" for col, val in row.items())
                documents.append({
                    'id': f"{table_name}_row_{idx}",
                    'content': row_text,
                    'type': 'table_row',
                    'table_name': table_name,
                    'row_index': idx
                })
        return documents

    async def process_structured_content(self, content: dict,
                                         content_type: str) -> list[dict]:
        """Flatten JSON/structured data into a single indexable document.

        Falls back to an MD5 hash of the flattened text as the document id
        when the payload has no 'id' field (MD5 used as a fingerprint only,
        not for security).
        """
        import json
        # Flat text representation of the nested structure
        flat_text = self._flatten_dict(content)
        return [{
            'id': content.get('id', hashlib.md5(flat_text.encode()).hexdigest()),
            'content': flat_text,
            'type': content_type,
            'raw': json.dumps(content, ensure_ascii=False)[:1000]
        }]

    def _flatten_dict(self, d: dict, prefix: str = "") -> str:
        """Recursively render a nested dict as dotted "key: value" text."""
        parts = []
        for key, value in d.items():
            full_key = f"{prefix}.{key}" if prefix else key
            if isinstance(value, dict):
                nested = self._flatten_dict(value, full_key)
                # Skip empty sub-dicts: the original appended "" here, which
                # produced blank ". "-separated segments in the output.
                if nested:
                    parts.append(nested)
            elif isinstance(value, list):
                # Cap list rendering at 10 elements to keep the text bounded.
                parts.append(f"{full_key}: {', '.join(str(v) for v in value[:10])}")
            else:
                parts.append(f"{full_key}: {value}")
        return ". ".join(parts)
Searching Through Indexed Data
async def semantic_search(self, query: str,
                          limit: int = 5,
                          score_threshold: float = 0.7) -> list[dict]:
    """Vector search over the text collection.

    Embeds the query, runs a similarity search in Qdrant, and returns
    hits above ``score_threshold`` with their score, text preview, and
    remaining payload fields as metadata.
    """
    pipeline = self.text_pipeline
    embeddings = await pipeline.embed_batch([query])
    hits = pipeline.qdrant.search(
        collection_name=pipeline.collection,
        query_vector=embeddings[0],
        limit=limit,
        score_threshold=score_threshold,
        with_payload=True
    )
    hidden = {'text_preview', '_hash'}
    formatted = []
    for hit in hits:
        metadata = {key: val for key, val in hit.payload.items()
                    if key not in hidden}
        formatted.append({
            'score': hit.score,
            'text': hit.payload.get('text_preview', ''),
            'metadata': metadata,
        })
    return formatted
Pipeline Performance
Throughput with batch processing through OpenAI API: 500-800 documents/minute (text-embedding-3-small). Self-hosted BGE-M3 on A10G GPU: 1200-1500 documents/minute. Cost of indexing 1M documents via API: ~$1.5-2. Incremental update via hashing: only new and changed documents, saving 70-90% on daily updates.







