AI-система Data Catalog с автоматической классификацией

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
AI-система Data Catalog с автоматической классификацией
Средний
~2-4 недели
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    901
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1119
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    853

Реализация AI-каталога данных с автоклассификацией

Data Catalog — это инвентаризация всех датасетов организации: где данные хранятся, что означают, кто владелец, насколько свежие и чувствительные. AI-классификация автоматизирует тегирование (PII, финансы, здоровье), генерирует описания из DDL и данных, и обнаруживает семантические связи между таблицами.

Автоматическое сканирование и классификация

from anthropic import Anthropic
import sqlalchemy
import pandas as pd
import json
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class DataAsset:
    asset_id: str
    name: str
    asset_type: str  # table, view, file, api, topic
    location: str
    schema: dict
    row_count: int
    owner: Optional[str] = None
    description: Optional[str] = None
    tags: list = field(default_factory=list)
    pii_columns: list = field(default_factory=list)
    sensitivity_level: str = "internal"
    last_updated: Optional[str] = None

class AIDataCatalog:
    def __init__(self):
        self.llm = Anthropic()
        self.assets = {}

    def scan_database(self, connection_string: str,
                       database_name: str) -> list[DataAsset]:
        """Сканирование базы данных и создание assets"""
        engine = sqlalchemy.create_engine(connection_string)
        inspector = sqlalchemy.inspect(engine)
        assets = []

        for table_name in inspector.get_table_names():
            columns = inspector.get_columns(table_name)
            schema = {col['name']: str(col['type']) for col in columns}

            # Получение семпла данных
            try:
                sample_df = pd.read_sql(f"SELECT * FROM {table_name} LIMIT 5", engine)
                sample_data = sample_df.to_dict('records')
                row_count = pd.read_sql(
                    f"SELECT COUNT(*) as cnt FROM {table_name}", engine
                )['cnt'].iloc[0]
            except Exception:
                sample_data = []
                row_count = 0

            # AI-классификация
            classification = self._classify_asset(
                table_name, schema, sample_data, database_name
            )

            asset = DataAsset(
                asset_id=f"{database_name}.{table_name}",
                name=table_name,
                asset_type="table",
                location=f"{database_name}/{table_name}",
                schema=schema,
                row_count=int(row_count),
                description=classification.get('description'),
                tags=classification.get('tags', []),
                pii_columns=classification.get('pii_columns', []),
                sensitivity_level=classification.get('sensitivity_level', 'internal')
            )

            assets.append(asset)
            self.assets[asset.asset_id] = asset

        return assets

    def _classify_asset(self, table_name: str, schema: dict,
                         sample_data: list, context: str = "") -> dict:
        """LLM-классификация датасета"""
        schema_str = json.dumps(schema)
        sample_str = json.dumps(sample_data[:3], ensure_ascii=False)[:500]

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"""Classify this database table for a data catalog.

Table: {table_name}
Database context: {context}
Schema: {schema_str}
Sample data: {sample_str}

Return JSON:
{{
  "description": "Brief business description of what this table contains",
  "domain": "business domain (e.g., users, orders, payments, analytics, logs)",
  "tags": ["tag1", "tag2"],
  "pii_columns": ["columns containing personal data"],
  "sensitivity_level": "public|internal|confidential|restricted",
  "data_category": "master|transactional|analytical|operational|reference"
}}"""
            }]
        )

        try:
            return json.loads(response.content[0].text)
        except Exception:
            return {'description': 'Auto-discovered table', 'tags': [], 'pii_columns': []}

Поиск по каталогу

    def search(self, query: str, filters: dict = None) -> list[DataAsset]:
        """Семантический поиск по каталогу"""
        # Подготовка описаний всех активов
        asset_descriptions = []
        for asset_id, asset in self.assets.items():
            desc = f"{asset.name}: {asset.description or 'No description'}"
            desc += f" Tags: {', '.join(asset.tags)}"
            desc += f" Columns: {', '.join(list(asset.schema.keys())[:10])}"
            asset_descriptions.append({'id': asset_id, 'description': desc})

        # LLM ищет релевантные активы
        descriptions_text = "\n".join([
            f"{i+1}. {a['id']}: {a['description']}"
            for i, a in enumerate(asset_descriptions[:50])
        ])

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"""Find relevant data assets for this query.

Query: {query}

Available assets:
{descriptions_text}

Return comma-separated IDs of relevant assets (top 5). No explanation."""
            }]
        )

        relevant_ids = [id.strip() for id in response.content[0].text.split(',')]

        results = [self.assets[id] for id in relevant_ids if id in self.assets]

        # Применение фильтров
        if filters:
            if 'sensitivity_level' in filters:
                results = [r for r in results
                           if r.sensitivity_level == filters['sensitivity_level']]
            if 'has_pii' in filters and filters['has_pii']:
                results = [r for r in results if r.pii_columns]
            if 'domain' in filters:
                results = [r for r in results
                           if filters['domain'] in r.tags]

        return results

    def find_related_assets(self, asset_id: str) -> list[dict]:
        """Поиск связанных датасетов по семантическому сходству"""
        if asset_id not in self.assets:
            return []

        source_asset = self.assets[asset_id]

        # Описания всех других активов
        other_assets = {id: asset for id, asset in self.assets.items() if id != asset_id}
        others_desc = "\n".join([
            f"- {id}: {asset.description}, columns: {list(asset.schema.keys())[:5]}"
            for id, asset in list(other_assets.items())[:30]
        ])

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"""Find assets related to:
{source_asset.name}: {source_asset.description}
Columns: {list(source_asset.schema.keys())}

Other assets:
{others_desc}

Return JSON array: [{{"id": "...", "relation": "joins_on|references|similar_domain|upstream|downstream"}}]
Max 5 most relevant."""
            }]
        )

        try:
            return json.loads(response.content[0].text)
        except Exception:
            return []

    def generate_data_dictionary(self, asset_id: str) -> dict:
        """Автогенерация data dictionary для датасета"""
        if asset_id not in self.assets:
            return {}

        asset = self.assets[asset_id]

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=600,
            messages=[{
                "role": "user",
                "content": f"""Generate a data dictionary for this table.

Table: {asset.name}
Description: {asset.description}
Schema: {json.dumps(asset.schema)}

Return JSON: {{"column_name": {{"description": "...", "example": "...", "notes": "..."}}}}"""
            }]
        )

        try:
            return json.loads(response.content[0].text)
        except Exception:
            return {}

Обнаружение PII и управление чувствительностью

    def audit_pii_exposure(self) -> dict:
        """Аудит PII данных по всему каталогу"""
        pii_report = {
            'total_assets': len(self.assets),
            'assets_with_pii': [],
            'pii_columns_by_type': {}
        }

        for asset_id, asset in self.assets.items():
            if asset.pii_columns:
                pii_report['assets_with_pii'].append({
                    'asset': asset_id,
                    'pii_columns': asset.pii_columns,
                    'sensitivity': asset.sensitivity_level,
                    'owner': asset.owner
                })

        return pii_report

OpenMetadata и DataHub — наиболее зрелые open-source решения для корпоративного каталога. AI-слой поверх них добавляет автоматическую классификацию при обнаружении новых таблиц: тегирование занимает 30-60 секунд вместо ручного заполнения за 15-30 минут на актив. Для организации с 500+ таблицами это экономит 100-200 часов при первоначальном заполнении каталога.