AI-система автоматической генерации ETL-пайплайнов

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
AI-система автоматической генерации ETL-пайплайнов
Средний
~2-4 недели
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    901
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1119
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    853

Реализация AI-автогенерации ETL-пайплайнов

Автогенерация ETL — это когда дата-инженер описывает задачу на русском языке, а система генерирует готовый код пайплайна: Airflow DAG, dbt модели или Python скрипт. Снижает время от постановки задачи до работающего пайплайна с 1-3 дней до 2-4 часов.

Движок генерации пайплайнов

from anthropic import Anthropic
import json
import yaml
from dataclasses import dataclass

@dataclass
class PipelineSpec:
    name: str
    description: str
    source: dict     # {type, connection, table/path}
    target: dict     # {type, connection, table/path}
    transformations: list[str]
    schedule: str = "@daily"
    framework: str = "airflow"  # airflow, prefect, dbt, pandas

class ETLAutoGenerator:
    def __init__(self):
        self.llm = Anthropic()

    def generate_from_description(self, description: str,
                                   source_schema: dict = None,
                                   framework: str = "airflow") -> dict:
        """Генерация полного ETL из текстового описания"""
        # Шаг 1: Структурирование требований
        spec = self._parse_requirements(description, source_schema)

        # Шаг 2: Генерация кода
        if framework == "airflow":
            code = self._generate_airflow_dag(spec)
        elif framework == "dbt":
            code = self._generate_dbt_model(spec)
        elif framework == "prefect":
            code = self._generate_prefect_flow(spec)
        else:
            code = self._generate_pandas_script(spec)

        # Шаг 3: Тесты и документация
        tests = self._generate_tests(spec, code)
        docs = self._generate_documentation(spec)

        return {
            'spec': spec,
            'code': code,
            'tests': tests,
            'documentation': docs
        }

    def _parse_requirements(self, description: str,
                              schema: dict = None) -> PipelineSpec:
        """LLM структурирует текстовые требования"""
        schema_str = json.dumps(schema, indent=2) if schema else "Not provided"

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=600,
            messages=[{
                "role": "user",
                "content": f"""Parse this ETL requirement into a structured spec.

Description: {description}
Available schema: {schema_str}

Return JSON:
{{
  "name": "pipeline_snake_case_name",
  "description": "one sentence description",
  "source": {{
    "type": "postgres|mysql|s3|api|kafka",
    "table_or_path": "table or path name"
  }},
  "target": {{
    "type": "postgres|bigquery|s3|snowflake",
    "table_or_path": "output table"
  }},
  "transformations": [
    "list of transformation steps in order"
  ],
  "schedule": "cron expression or @daily/@hourly",
  "quality_checks": ["list of data quality validations needed"]
}}"""
            }]
        )

        try:
            data = json.loads(response.content[0].text)
            return PipelineSpec(
                name=data.get('name', 'generated_pipeline'),
                description=data.get('description', ''),
                source=data.get('source', {}),
                target=data.get('target', {}),
                transformations=data.get('transformations', []),
                schedule=data.get('schedule', '@daily')
            )
        except Exception:
            return PipelineSpec(
                name='generated_pipeline',
                description=description,
                source={},
                target={},
                transformations=[]
            )

    def _generate_airflow_dag(self, spec: PipelineSpec) -> str:
        """Генерация Airflow DAG"""
        transforms_str = "\n".join(f"- {t}" for t in spec.transformations)

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1500,
            system="""You are a senior data engineer. Generate production-quality Airflow 2.x DAG code.
Use TaskFlow API (@task decorator). Include: error handling, retries, SLA, proper connections.
Return only Python code.""",
            messages=[{
                "role": "user",
                "content": f"""Generate Airflow DAG for this pipeline:

Name: {spec.name}
Description: {spec.description}
Source: {json.dumps(spec.source)}
Target: {json.dumps(spec.target)}
Schedule: {spec.schedule}

Transformations to implement:
{transforms_str}

Include:
1. Proper imports
2. DAG configuration with retries=2, retry_delay=5min, SLA=1hour
3. Modular @task functions for each transformation step
4. Data quality validation task
5. Email alert on failure"""
            }]
        )
        return response.content[0].text

    def _generate_dbt_model(self, spec: PipelineSpec) -> dict:
        """Генерация dbt модели + schema.yml"""
        transforms_str = "\n".join(f"- {t}" for t in spec.transformations)

        sql_response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=800,
            messages=[{
                "role": "user",
                "content": f"""Generate a dbt SQL model.

Model name: {spec.name}
Description: {spec.description}
Source: {json.dumps(spec.source)}

Transformations:
{transforms_str}

Use dbt {{ config() }}, {{ ref() }}, {{ source() }} macros.
Include comments explaining each transformation."""
            }]
        )

        yaml_response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"""Generate dbt schema.yml for model "{spec.name}".
Include: description, column descriptions, not_null/unique/accepted_values tests.
Base on: {spec.description}
Return valid YAML."""
            }]
        )

        return {
            f"{spec.name}.sql": sql_response.content[0].text,
            f"{spec.name}.yml": yaml_response.content[0].text
        }

    def _generate_prefect_flow(self, spec: PipelineSpec) -> str:
        """Генерация Prefect 2.x Flow"""
        transforms_str = "\n".join(f"- {t}" for t in spec.transformations)

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            system="Generate Prefect 2.x flow code. Use @task and @flow decorators. Include retries and logging.",
            messages=[{
                "role": "user",
                "content": f"""Generate Prefect flow:
Name: {spec.name}
Source: {json.dumps(spec.source)}
Target: {json.dumps(spec.target)}
Transformations: {transforms_str}
Schedule: {spec.schedule}"""
            }]
        )
        return response.content[0].text

    def _generate_pandas_script(self, spec: PipelineSpec) -> str:
        """Простой Python/pandas скрипт для небольших датасетов"""
        transforms_str = "\n".join(f"- {t}" for t in spec.transformations)

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=800,
            system="Generate production Python ETL script. Include logging, error handling, type hints.",
            messages=[{
                "role": "user",
                "content": f"""Generate Python ETL script:
Source: {json.dumps(spec.source)}
Target: {json.dumps(spec.target)}
Transformations: {transforms_str}"""
            }]
        )
        return response.content[0].text

    def _generate_tests(self, spec: PipelineSpec, code: str) -> str:
        """Генерация unit тестов для пайплайна"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=600,
            messages=[{
                "role": "user",
                "content": f"""Generate pytest unit tests for this ETL pipeline.

Pipeline description: {spec.description}
Code snippet: {code[:500]}

Include:
1. Tests for each transformation function
2. Edge cases (empty input, null values, duplicates)
3. Data type validation tests"""
            }]
        )
        return response.content[0].text

Итеративное уточнение через диалог

    def refine_pipeline(self, generated_code: str,
                         feedback: str) -> str:
        """Уточнение сгенерированного пайплайна через обратную связь"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            messages=[
                {
                    "role": "user",
                    "content": f"Here's a generated ETL pipeline:\n\n{generated_code}"
                },
                {
                    "role": "assistant",
                    "content": "I've generated this ETL pipeline based on your requirements."
                },
                {
                    "role": "user",
                    "content": f"Please modify it: {feedback}"
                }
            ]
        )
        return response.content[0].text

Типичный workflow: описание задачи (5 минут) → генерация кода (2-3 минуты) → ревью и итерация (30-60 минут) → тест и деплой. Против традиционного: понимание требований (1 час) → разработка (1-2 дня) → тестирование (полдня). Экономия: 80-85% времени на типовые ETL-задачи.