AI-система для Data Engineering

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
AI-система для Data Engineering
Сложный
от 2 недель до 3 месяцев
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    901
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1119
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    853

Разработка AI-системы дата-инжиниринга

AI-система дата-инжиниринга — это оркестрация всего жизненного цикла данных: автоматическое профилирование источников, генерация ETL-трансформаций, контроль качества, обнаружение аномалий и самовосстановление при сбоях. Вместо того чтобы писать каждый пайплайн вручную, система генерирует его из описания бизнес-требований.

Архитектура системы

[Data Sources]                    ← API, DB, S3, Kafka, files
        ↓
[Auto-Discovery & Profiling]      ← схема, статистика, качество
        ↓
[AI Pipeline Generation]          ← LLM → DAG код (Airflow/Prefect)
        ↓
[Transformation Engine]           ← dbt, Spark, pandas
        ↓
[Quality Gate]                    ← Great Expectations, custom rules
        ↓
[Data Catalog & Lineage]          ← OpenMetadata, DataHub
        ↓
[ML Feature Store]                ← Feast, Hopsworks
        ↓
[Consumers]                       ← BI, ML models, APIs

Автогенерация ETL-пайплайнов

from anthropic import Anthropic
import pandas as pd
import yaml
import json
from dataclasses import dataclass

@dataclass
class DataSource:
    name: str
    type: str  # postgres, s3, api, kafka
    connection: dict
    schema: dict = None

class AIDataEngineeringSystem:
    def __init__(self):
        self.llm = Anthropic()
        self.pipelines = {}
        self.quality_rules = {}

    def generate_pipeline(self, source: DataSource, target: dict,
                          business_requirements: str) -> dict:
        """Генерация ETL пайплайна из бизнес-требований"""

        # Профилирование источника
        if source.schema is None:
            source.schema = self._profile_source(source)

        # Генерация трансформаций через LLM
        pipeline_code = self._generate_transformations(
            source, target, business_requirements
        )

        # Генерация правил качества
        quality_rules = self._generate_quality_rules(source.schema, business_requirements)

        # Сборка DAG
        dag = self._generate_airflow_dag(source, target, pipeline_code, quality_rules)

        return {
            'pipeline_code': pipeline_code,
            'quality_rules': quality_rules,
            'dag': dag,
            'source_schema': source.schema
        }

    def _profile_source(self, source: DataSource) -> dict:
        """Автоматическое профилирование источника данных"""
        if source.type == 'postgres':
            import sqlalchemy
            engine = sqlalchemy.create_engine(source.connection['url'])

            # Получение схемы
            inspector = sqlalchemy.inspect(engine)
            schema = {}

            for table_name in inspector.get_table_names():
                columns = inspector.get_columns(table_name)
                schema[table_name] = {
                    'columns': {col['name']: str(col['type']) for col in columns},
                    'row_count': pd.read_sql(
                        f"SELECT COUNT(*) as cnt FROM {table_name}", engine
                    )['cnt'].iloc[0]
                }

            return schema

        elif source.type == 's3':
            import boto3
            s3 = boto3.client('s3', **source.connection)
            # Профилирование S3 объектов
            return self._profile_s3_files(s3, source.connection)

        return {}

    def _generate_transformations(self, source: DataSource, target: dict,
                                   requirements: str) -> str:
        """LLM генерирует код трансформаций"""
        schema_str = json.dumps(source.schema, indent=2)

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1500,
            system="""You are a senior data engineer. Generate production-quality Python ETL code.
Use pandas/SQLAlchemy. Include error handling, logging, and type hints.
Return only Python code.""",
            messages=[{
                "role": "user",
                "content": f"""Generate ETL transformation code.

Source: {source.type}
Source schema: {schema_str}

Target: {json.dumps(target)}

Business requirements:
{requirements}

Generate Python function def transform(df: pd.DataFrame) -> pd.DataFrame that implements the requirements."""
            }]
        )

        return response.content[0].text

    def _generate_quality_rules(self, schema: dict, requirements: str) -> dict:
        """Автогенерация правил качества данных"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=800,
            messages=[{
                "role": "user",
                "content": f"""Generate Great Expectations data quality rules as JSON.

Schema: {json.dumps(schema, indent=2)[:1000]}
Requirements: {requirements}

Return JSON with expectations:
{{
  "expectations": [
    {{"type": "expect_column_values_to_not_be_null", "column": "id"}},
    {{"type": "expect_column_values_to_be_between", "column": "amount", "min_value": 0}},
    ...
  ]
}}"""
            }]
        )

        try:
            return json.loads(response.content[0].text)
        except Exception:
            return {"expectations": []}

    def _generate_airflow_dag(self, source: DataSource, target: dict,
                               pipeline_code: str, quality_rules: dict) -> str:
        """Генерация Airflow DAG"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            messages=[{
                "role": "user",
                "content": f"""Generate an Airflow DAG that:
1. Extracts data from {source.type}
2. Applies transformations
3. Validates quality rules
4. Loads to target: {json.dumps(target)}
5. Sends alerts on failure

Include: proper retries, SLA, email alerts.
Use Airflow 2.x TaskFlow API."""
            }]
        )
        return response.content[0].text

Мониторинг и самовосстановление

class PipelineMonitor:
    """AI-мониторинг пайплайнов с автовосстановлением"""

    def __init__(self, system: AIDataEngineeringSystem):
        self.system = system
        self.llm = Anthropic()
        self.failure_history = []

    def analyze_failure(self, pipeline_name: str, error: str,
                         context: dict) -> dict:
        """LLM-анализ сбоя и генерация fix"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=600,
            messages=[{
                "role": "user",
                "content": f"""Data pipeline "{pipeline_name}" failed.

Error: {error}

Context:
- Source: {context.get('source_type')}
- Records processed: {context.get('records_processed', 0)}
- Last successful run: {context.get('last_success')}
- Error stack: {context.get('traceback', '')[:500]}

Provide:
1. Root cause (1-2 sentences)
2. Immediate fix (code if applicable)
3. Long-term prevention
4. Severity: critical/warning/info"""
            }]
        )

        analysis = response.content[0].text

        # Автоматические действия при известных ошибках
        auto_fix = self._attempt_auto_fix(error, context)

        return {
            'analysis': analysis,
            'auto_fix_applied': auto_fix is not None,
            'auto_fix': auto_fix,
            'pipeline': pipeline_name
        }

    def _attempt_auto_fix(self, error: str, context: dict) -> str:
        """Автоматические исправления для типовых ошибок"""
        error_lower = error.lower()

        if 'connection refused' in error_lower or 'timeout' in error_lower:
            return "retry_with_backoff"
        elif 'schema mismatch' in error_lower or 'column not found' in error_lower:
            return "refresh_schema_and_retry"
        elif 'disk full' in error_lower or 'out of memory' in error_lower:
            return "reduce_batch_size_and_retry"
        elif 'duplicate key' in error_lower:
            return "switch_to_upsert_mode"

        return None

    def generate_pipeline_report(self, pipeline_name: str,
                                  metrics: dict) -> str:
        """Еженедельный отчёт по пайплайну"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": f"""Summarize pipeline health for ops report.

Pipeline: {pipeline_name}
Metrics (last 7 days):
{json.dumps(metrics, indent=2)}

Give: status assessment, key issues, trend, recommended actions. 3-5 sentences."""
            }]
        )
        return response.content[0].text

dbt интеграция для трансформаций

class DBTManager:
    """Управление dbt моделями через AI"""

    def __init__(self, project_dir: str):
        self.project_dir = project_dir
        self.llm = Anthropic()

    def generate_model(self, model_name: str, requirements: str,
                        source_tables: list[str]) -> str:
        """Генерация dbt модели из требований"""
        # Получение схем источников
        sources_info = self._get_sources_info(source_tables)

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=800,
            messages=[{
                "role": "user",
                "content": f"""Generate a dbt SQL model.

Model name: {model_name}
Requirements: {requirements}
Available source tables: {json.dumps(sources_info)}

Generate:
1. SQL model using dbt ref() and source() macros
2. Model config block (materialization, tags)
3. Column-level descriptions as SQL comments"""
            }]
        )

        model_sql = response.content[0].text

        # Сохранение модели
        model_path = f"{self.project_dir}/models/{model_name}.sql"
        with open(model_path, 'w') as f:
            f.write(model_sql)

        # Генерация schema.yml
        schema_yml = self._generate_schema_yaml(model_name, model_sql)
        schema_path = f"{self.project_dir}/models/{model_name}.yml"
        with open(schema_path, 'w') as f:
            f.write(schema_yml)

        return model_sql

    def _generate_schema_yaml(self, model_name: str, model_sql: str) -> str:
        """Автогенерация dbt schema.yml с тестами"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"""Generate dbt schema.yml for this model with data tests.

Model: {model_name}
SQL: {model_sql[:1000]}

Include: column descriptions, not_null tests, unique tests, accepted_values where relevant.
Return valid YAML."""
            }]
        )
        return response.content[0].text

Производительность системы

Задача Ручная работа С AI-системой Экономия
Новый источник данных 3-5 дней 4-6 часов 85%
ETL трансформация 1-2 дня 2-3 часа 80%
Правила качества 4-8 часов 30 минут 87%
Документация 1-2 дня 1-2 часа 88%
Диагностика сбоев 2-4 часа 15-30 минут 87%

Полноценная AI-система дата-инжиниринга разворачивается за 4-6 недель. Команда из 3 дата-инженеров с системой справляется с задачами, которые раньше требовали 7-8 человек.