Реализация AI-каталога данных с автоклассификацией
Data Catalog — это инвентаризация всех датасетов организации: где данные хранятся, что означают, кто владелец, насколько свежие и чувствительные. AI-классификация автоматизирует тегирование (PII, финансы, здоровье), генерирует описания из DDL и данных, и обнаруживает семантические связи между таблицами.
Автоматическое сканирование и классификация
from anthropic import Anthropic
import sqlalchemy
import pandas as pd
import json
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class DataAsset:
asset_id: str
name: str
asset_type: str # table, view, file, api, topic
location: str
schema: dict
row_count: int
owner: Optional[str] = None
description: Optional[str] = None
tags: list = field(default_factory=list)
pii_columns: list = field(default_factory=list)
sensitivity_level: str = "internal"
last_updated: Optional[str] = None
class AIDataCatalog:
def __init__(self):
self.llm = Anthropic()
self.assets = {}
def scan_database(self, connection_string: str,
database_name: str) -> list[DataAsset]:
"""Сканирование базы данных и создание assets"""
engine = sqlalchemy.create_engine(connection_string)
inspector = sqlalchemy.inspect(engine)
assets = []
for table_name in inspector.get_table_names():
columns = inspector.get_columns(table_name)
schema = {col['name']: str(col['type']) for col in columns}
# Получение семпла данных
try:
sample_df = pd.read_sql(f"SELECT * FROM {table_name} LIMIT 5", engine)
sample_data = sample_df.to_dict('records')
row_count = pd.read_sql(
f"SELECT COUNT(*) as cnt FROM {table_name}", engine
)['cnt'].iloc[0]
except Exception:
sample_data = []
row_count = 0
# AI-классификация
classification = self._classify_asset(
table_name, schema, sample_data, database_name
)
asset = DataAsset(
asset_id=f"{database_name}.{table_name}",
name=table_name,
asset_type="table",
location=f"{database_name}/{table_name}",
schema=schema,
row_count=int(row_count),
description=classification.get('description'),
tags=classification.get('tags', []),
pii_columns=classification.get('pii_columns', []),
sensitivity_level=classification.get('sensitivity_level', 'internal')
)
assets.append(asset)
self.assets[asset.asset_id] = asset
return assets
def _classify_asset(self, table_name: str, schema: dict,
sample_data: list, context: str = "") -> dict:
"""LLM-классификация датасета"""
schema_str = json.dumps(schema)
sample_str = json.dumps(sample_data[:3], ensure_ascii=False)[:500]
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=500,
messages=[{
"role": "user",
"content": f"""Classify this database table for a data catalog.
Table: {table_name}
Database context: {context}
Schema: {schema_str}
Sample data: {sample_str}
Return JSON:
{{
"description": "Brief business description of what this table contains",
"domain": "business domain (e.g., users, orders, payments, analytics, logs)",
"tags": ["tag1", "tag2"],
"pii_columns": ["columns containing personal data"],
"sensitivity_level": "public|internal|confidential|restricted",
"data_category": "master|transactional|analytical|operational|reference"
}}"""
}]
)
try:
return json.loads(response.content[0].text)
except Exception:
return {'description': 'Auto-discovered table', 'tags': [], 'pii_columns': []}
Поиск по каталогу
def search(self, query: str, filters: dict = None) -> list[DataAsset]:
"""Семантический поиск по каталогу"""
# Подготовка описаний всех активов
asset_descriptions = []
for asset_id, asset in self.assets.items():
desc = f"{asset.name}: {asset.description or 'No description'}"
desc += f" Tags: {', '.join(asset.tags)}"
desc += f" Columns: {', '.join(list(asset.schema.keys())[:10])}"
asset_descriptions.append({'id': asset_id, 'description': desc})
# LLM ищет релевантные активы
descriptions_text = "\n".join([
f"{i+1}. {a['id']}: {a['description']}"
for i, a in enumerate(asset_descriptions[:50])
])
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=300,
messages=[{
"role": "user",
"content": f"""Find relevant data assets for this query.
Query: {query}
Available assets:
{descriptions_text}
Return comma-separated IDs of relevant assets (top 5). No explanation."""
}]
)
relevant_ids = [id.strip() for id in response.content[0].text.split(',')]
results = [self.assets[id] for id in relevant_ids if id in self.assets]
# Применение фильтров
if filters:
if 'sensitivity_level' in filters:
results = [r for r in results
if r.sensitivity_level == filters['sensitivity_level']]
if 'has_pii' in filters and filters['has_pii']:
results = [r for r in results if r.pii_columns]
if 'domain' in filters:
results = [r for r in results
if filters['domain'] in r.tags]
return results
def find_related_assets(self, asset_id: str) -> list[dict]:
"""Поиск связанных датасетов по семантическому сходству"""
if asset_id not in self.assets:
return []
source_asset = self.assets[asset_id]
# Описания всех других активов
other_assets = {id: asset for id, asset in self.assets.items() if id != asset_id}
others_desc = "\n".join([
f"- {id}: {asset.description}, columns: {list(asset.schema.keys())[:5]}"
for id, asset in list(other_assets.items())[:30]
])
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=300,
messages=[{
"role": "user",
"content": f"""Find assets related to:
{source_asset.name}: {source_asset.description}
Columns: {list(source_asset.schema.keys())}
Other assets:
{others_desc}
Return JSON array: [{{"id": "...", "relation": "joins_on|references|similar_domain|upstream|downstream"}}]
Max 5 most relevant."""
}]
)
try:
return json.loads(response.content[0].text)
except Exception:
return []
def generate_data_dictionary(self, asset_id: str) -> dict:
"""Автогенерация data dictionary для датасета"""
if asset_id not in self.assets:
return {}
asset = self.assets[asset_id]
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=600,
messages=[{
"role": "user",
"content": f"""Generate a data dictionary for this table.
Table: {asset.name}
Description: {asset.description}
Schema: {json.dumps(asset.schema)}
Return JSON: {{"column_name": {{"description": "...", "example": "...", "notes": "..."}}}}"""
}]
)
try:
return json.loads(response.content[0].text)
except Exception:
return {}
Обнаружение PII и управление чувствительностью
def audit_pii_exposure(self) -> dict:
"""Аудит PII данных по всему каталогу"""
pii_report = {
'total_assets': len(self.assets),
'assets_with_pii': [],
'pii_columns_by_type': {}
}
for asset_id, asset in self.assets.items():
if asset.pii_columns:
pii_report['assets_with_pii'].append({
'asset': asset_id,
'pii_columns': asset.pii_columns,
'sensitivity': asset.sensitivity_level,
'owner': asset.owner
})
return pii_report
OpenMetadata и DataHub — наиболее зрелые open-source решения для корпоративного каталога. AI-слой поверх них добавляет автоматическую классификацию при обнаружении новых таблиц: тегирование занимает 30-60 секунд вместо ручного заполнения за 15-30 минут на актив. Для организации с 500+ таблицами это экономит 100-200 часов при первоначальном заполнении каталога.







