Development of AI System for Client Data Enrichment from Open Sources
An AI data enrichment system automatically supplements a client profile with information from open sources: LinkedIn, Crunchbase, GitHub, news sites, and company registries. This accelerates onboarding and improves both lead-scoring quality and personalization.
System Architecture
import asyncio
import json
import logging
from dataclasses import dataclass
from typing import Optional
@dataclass
class EnrichedProfile:
    """Client profile: caller-supplied identity plus optional enrichment data.

    ``email`` and ``company_name`` are required. Every enrichment field
    defaults to ``None`` and ``enrichment_score`` defaults to ``0.0``.
    """

    # --- Original data (required) ---
    email: str
    company_name: str

    # --- Enriched data (None by default) ---
    linkedin_profile: Optional[dict] = None
    company_info: Optional[dict] = None
    news_mentions: Optional[list] = None
    tech_stack: Optional[list] = None
    funding_info: Optional[dict] = None

    # Data confidence
    enrichment_score: float = 0.0
class DataEnrichmentPipeline:
    """Run every configured enricher concurrently and merge results into one profile.

    Each enricher is expected to expose ``async enrich(email, company)`` and
    ``apply_to_profile(profile, result)``; both are called from ``enrich`` below.
    """

    def __init__(self, linkedin_api_key: Optional[str] = None,
                 enrichers: Optional[list] = None):
        """Configure the set of enrichers.

        Args:
            linkedin_api_key: API key passed to ``LinkedInEnricher``, whose
                constructor requires one. When omitted, LinkedIn enrichment
                is left out of the default set instead of failing at
                construction time.
            enrichers: Explicit list of enricher objects. When given, it
                replaces the default set entirely.
        """
        if enrichers is not None:
            self.enrichers = list(enrichers)
            return

        self.enrichers = []
        if linkedin_api_key:
            self.enrichers.append(LinkedInEnricher(api_key=linkedin_api_key))
        else:
            logging.getLogger(__name__).warning(
                "No LinkedIn API key supplied; LinkedIn enrichment is disabled"
            )
        self.enrichers += [
            ClearbitEnricher(),
            CrunchbaseEnricher(),
            NewsEnricher(),
            GitHubEnricher(),
        ]

    async def enrich(self, email: str, company: str) -> "EnrichedProfile":
        """Enrich one contact from all sources in parallel.

        Args:
            email: Contact e-mail, forwarded to every enricher.
            company: Company name, forwarded to every enricher.

        Returns:
            The profile after every successful enricher has applied its
            result, with ``enrichment_score`` filled in.
        """
        profile = EnrichedProfile(email=email, company_name=company)

        # Parallel enrichment from all sources. return_exceptions=True makes
        # gather hand failures back as values instead of raising.
        results = await asyncio.gather(
            *(enricher.enrich(email, company) for enricher in self.enrichers),
            return_exceptions=True,
        )

        for enricher, result in zip(self.enrichers, results):
            # BaseException (not just Exception) so that a CancelledError
            # returned by gather is never passed on as if it were data.
            if isinstance(result, BaseException):
                # One source won't break the whole pipeline, but leave a trace.
                logging.getLogger(__name__).warning(
                    "Enricher %s failed: %r", type(enricher).__name__, result
                )
                continue
            profile = enricher.apply_to_profile(profile, result)

        profile.enrichment_score = self._compute_score(profile)
        return profile

    def _compute_score(self, profile: "EnrichedProfile") -> float:
        """Return the fraction of enrichment fields that hold data (0.0-1.0).

        Baseline coverage heuristic: a field counts when it is non-empty.
        It does not weigh source reliability.
        """
        enriched_fields = (
            profile.linkedin_profile,
            profile.company_info,
            profile.news_mentions,
            profile.tech_stack,
            profile.funding_info,
        )
        populated = sum(1 for value in enriched_fields if value)
        return populated / len(enriched_fields)
LinkedIn Enrichment via ProxyCurl
import httpx
class LinkedInEnricher:
    """Look up a LinkedIn profile by e-mail through the ProxyCurl HTTP API."""

    def __init__(self, api_key: str):
        """Store the API key sent as a Bearer token and the API base URL."""
        self.api_key = api_key
        self.base_url = "https://nubela.co/proxycurl/api"

    async def enrich(self, email: str, company: str) -> dict:
        """Resolve ``email`` to a LinkedIn profile URL and fetch the full profile.

        Args:
            email: E-mail address used for the profile lookup.
            company: Not used by this enricher; accepted so the call
                signature is ``enrich(email, company)``.

        Returns:
            The decoded JSON body of the profile request, or ``{}`` when
            either request does not return HTTP 200 or no profile URL is
            found for the e-mail.
        """
        headers = {"Authorization": f"Bearer {self.api_key}"}

        async with httpx.AsyncClient() as client:
            # Find profile by email
            response = await client.get(
                f"{self.base_url}/linkedin/profile/resolve/email",
                params={"email": email},
                headers=headers,
            )
            if response.status_code != 200:
                return {}

            profile_url = response.json().get('linkedin_profile_url')
            if not profile_url:
                return {}

            # Get full profile
            profile_response = await client.get(
                f"{self.base_url}/v2/linkedin",
                params={"url": profile_url, "skills": "include"},
                headers=headers,
            )
            # Treat a failed second call like a failed first call, so an
            # error payload is never returned as if it were profile data.
            if profile_response.status_code != 200:
                return {}
            return profile_response.json()
AI Technology Stack Extraction
class TechStackExtractor:
    """Extract a company's technology stack from scraped website text via an LLM."""

    def __init__(self):
        self.llm = Anthropic()

    async def extract_from_website(self, domain: str) -> list[str]:
        """Extracting tech stack from company website via AI

        Args:
            domain: Bare domain name; the about page is fetched from
                ``https://{domain}/about``.

        Returns:
            Technology names reported by the model, or ``[]`` when nothing
            could be scraped or the reply holds no parsable JSON array.
        """
        # Collecting content from website
        # NOTE(review): _scrape_job_postings / _scrape_page are not defined
        # in this class as shown; presumably they return a list of strings
        # and a string respectively - confirm.
        job_postings = await self._scrape_job_postings(domain)
        about_page = await self._scrape_page(f"https://{domain}/about")

        # Drop missing/empty pieces so a failed scrape cannot break the join.
        pieces = [about_page, *(job_postings or [])[:5]]
        combined_text = ' '.join(piece for piece in pieces if piece)
        if not combined_text:
            # Nothing to analyse; skip the model call entirely.
            return []

        prompt = f"""Extract technology stack from this company information.
Return JSON array of technology names (programming languages, frameworks, cloud platforms, databases).
Only include clearly mentioned technologies.
Text: {combined_text[:3000]}"""

        # messages.create is called synchronously here; run it in a worker
        # thread so it does not block the event loop.
        response = await asyncio.to_thread(
            self.llm.messages.create,
            model="claude-3-5-sonnet-20241022",
            max_tokens=300,
            messages=[{"role": "user", "content": prompt}],
        )
        raw_reply = response.content[0].text if response.content else ""
        return self._parse_tech_list(raw_reply)

    @staticmethod
    def _parse_tech_list(raw: str) -> list[str]:
        """Pull a JSON array of strings out of a model reply.

        Tolerates text or code fences around the array. Returns ``[]`` (and
        logs a warning) when no valid array is present. Non-string and blank
        items are dropped; duplicates are removed, keeping first occurrence.
        """
        start = raw.find("[")
        end = raw.rfind("]")
        if start == -1 or end <= start:
            logging.getLogger(__name__).warning("No JSON array in model reply")
            return []
        try:
            parsed = json.loads(raw[start:end + 1])
        except json.JSONDecodeError:
            logging.getLogger(__name__).warning("Model reply is not valid JSON")
            return []
        if not isinstance(parsed, list):
            return []
        names = [item.strip() for item in parsed
                 if isinstance(item, str) and item.strip()]
        return list(dict.fromkeys(names))
Quality and Deduplication of Enriched Data
Different sources often report conflicting values for the same company, so we need reconciliation logic:
def reconcile_company_info(sources: list[dict]) -> dict:
    """Combining company data from multiple sources

    For each of ``employee_count``, ``founded_year``, ``industry`` and
    ``headquarters``, take the value from the highest-priority source that
    actually provides one.

    Args:
        sources: Dicts that each carry a ``'source'`` key naming their
            origin, plus any of the reconciled fields. Entries whose
            ``'source'`` is not in the priority list are ignored.

    Returns:
        A dict holding only the fields for which some source had a usable
        value; fields nobody provided are omitted.
    """
    # Source priority: official registries > Clearbit > LinkedIn > Web scraping
    priority_order = ['company_registry', 'clearbit', 'linkedin', 'web_scraping']
    fields = ['employee_count', 'founded_year', 'industry', 'headquarters']

    # Order the sources once, highest priority first; entries sharing a
    # source name keep their input order.
    ranked = [
        entry
        for source_name in priority_order
        for entry in sources
        if entry.get('source') == source_name
    ]

    reconciled = {}
    for field in fields:
        for entry in ranked:
            value = entry.get(field)
            # A None/empty placeholder must not mask a real value from a
            # lower-priority source. 0 is kept: it can be a real value.
            if value is not None and value != "":
                reconciled[field] = value
                break
    return reconciled
Typical result: 80-90% of CRM contacts are enriched with additional data, taking 2-5 seconds per record. Data quality: 85-90% accuracy for basic fields (job title, industry, company size).







