Реализация AI-пайплайна обработки данных ETL и AI

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
Реализация AI-пайплайна обработки данных ETL и AI
Средний
~1-2 недели
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    901
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1119
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    853

Реализация AI-ETL пайплайна обработки данных

Классический ETL не справляется с неструктурированными данными: PDF с таблицами, HTML с динамическим контентом, изображения с данными, аудио-транскрипты. AI-ETL добавляет слой понимания: извлечение данных из произвольных форматов, нормализация через LLM и интеллектуальная валидация с объяснением ошибок.

Архитектура AI-ETL

from anthropic import Anthropic
import pandas as pd
import json
from dataclasses import dataclass
from typing import Any, Callable
import logging

@dataclass
class ETLStep:
    name: str
    func: Callable
    depends_on: list[str] = None
    retry_on_failure: bool = True
    max_retries: int = 3

class AIETLPipeline:
    def __init__(self, pipeline_name: str):
        self.name = pipeline_name
        self.llm = Anthropic()
        self.steps = []
        self.context = {}  # Данные между шагами
        self.metrics = {}
        self.logger = logging.getLogger(pipeline_name)

    def add_step(self, step: ETLStep):
        self.steps.append(step)

    def run(self, initial_data: Any) -> dict:
        """Выполнение пайплайна"""
        self.context['input'] = initial_data
        errors = []

        for step in self.steps:
            try:
                self.logger.info(f"Running step: {step.name}")
                input_data = self.context.get(
                    step.depends_on[0] if step.depends_on else 'input'
                )

                result = step.func(input_data, self.context)
                self.context[step.name] = result
                self.metrics[step.name] = {'status': 'success'}

            except Exception as e:
                self.logger.error(f"Step {step.name} failed: {e}")
                errors.append({'step': step.name, 'error': str(e)})

                if step.retry_on_failure:
                    # AI-assisted recovery
                    fixed_result = self._ai_recover(step, input_data, str(e))
                    if fixed_result is not None:
                        self.context[step.name] = fixed_result
                        self.metrics[step.name] = {'status': 'recovered'}
                        continue

                self.metrics[step.name] = {'status': 'failed', 'error': str(e)}
                break

        return {'context': self.context, 'metrics': self.metrics, 'errors': errors}

    def _ai_recover(self, step: ETLStep, input_data: Any, error: str) -> Any:
        """Попытка восстановления после ошибки через LLM"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": f"""ETL step "{step.name}" failed.
Error: {error}
Input data type: {type(input_data).__name__}
Input sample: {str(input_data)[:500]}

Suggest recovery: should we skip this step, use default values, or transform input differently?
Respond with JSON: {{"action": "skip|default|transform", "reason": "...", "default_value": ...}}"""
            }]
        )
        try:
            decision = json.loads(response.content[0].text)
            if decision['action'] == 'skip':
                return input_data  # Pass through unchanged
            elif decision['action'] == 'default':
                return decision.get('default_value')
        except Exception:
            pass
        return None

Извлечение данных из неструктурированных источников

class AIExtractor:
    """Извлечение структурированных данных из произвольных форматов"""

    def __init__(self):
        self.llm = Anthropic()

    def extract_from_pdf(self, pdf_path: str, schema: dict) -> list[dict]:
        """PDF → структурированные записи"""
        import pdfplumber

        all_records = []

        with pdfplumber.open(pdf_path) as pdf:
            for page_num, page in enumerate(pdf.pages):
                # Таблицы
                for table in page.extract_tables():
                    if table and len(table) > 1:
                        df = pd.DataFrame(table[1:], columns=table[0])
                        records = self._normalize_table_with_ai(df, schema)
                        all_records.extend(records)

                # Текст
                text = page.extract_text()
                if text and len(text) > 100:
                    text_records = self._extract_from_text(text, schema)
                    all_records.extend(text_records)

        return all_records

    def _extract_from_text(self, text: str, schema: dict) -> list[dict]:
        """LLM-извлечение по схеме из произвольного текста"""
        schema_str = json.dumps(schema, ensure_ascii=False, indent=2)

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=800,
            messages=[{
                "role": "user",
                "content": f"""Extract structured data from this text according to the schema.
Return JSON array of records. Use null for missing fields.

Schema:
{schema_str}

Text:
{text[:2000]}

Return only JSON array."""
            }]
        )

        try:
            text_response = response.content[0].text.strip()
            if '```' in text_response:
                text_response = text_response.split('```')[1]
                if text_response.startswith('json\n'):
                    text_response = text_response[5:]
            return json.loads(text_response)
        except Exception:
            return []

    def _normalize_table_with_ai(self, df: pd.DataFrame, schema: dict) -> list[dict]:
        """Нормализация таблицы с нестандартными заголовками"""
        columns_str = ", ".join(df.columns.tolist())
        schema_fields = list(schema.keys())

        # Маппинг колонок
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=200,
            messages=[{
                "role": "user",
                "content": f"""Map these table columns to schema fields.

Table columns: {columns_str}
Schema fields: {', '.join(schema_fields)}

Return JSON object: {{"table_column": "schema_field"}}. Use null for unmapped."""
            }]
        )

        try:
            column_map = json.loads(response.content[0].text)
            df_renamed = df.rename(columns={k: v for k, v in column_map.items() if v})
            return df_renamed[schema_fields].where(df_renamed.notna(), None).to_dict('records')
        except Exception:
            return df.to_dict('records')

Трансформации с AI-валидацией

class AITransformer:
    """Умные трансформации с объяснением аномалий"""

    def __init__(self):
        self.llm = Anthropic()

    def clean_and_normalize(self, df: pd.DataFrame,
                             business_rules: list[str]) -> dict:
        """Очистка + AI-объяснение найденных проблем"""
        issues = []
        original_count = len(df)

        # Стандартные проверки
        nulls = df.isnull().sum()
        duplicates = df.duplicated().sum()

        if nulls.sum() > 0:
            issues.append(f"Null values: {nulls[nulls > 0].to_dict()}")

        if duplicates > 0:
            issues.append(f"Duplicate rows: {duplicates}")

        # Проверка бизнес-правил через LLM
        if business_rules and len(df) > 0:
            sample = df.head(5).to_string()
            rules_str = "\n".join(f"- {r}" for r in business_rules)

            response = self.llm.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=400,
                messages=[{
                    "role": "user",
                    "content": f"""Check these data quality rules against the sample data.

Business rules:
{rules_str}

Data sample:
{sample}

List violations found (if any), be specific with row/column references.
If no violations, say "No violations found"."""
                }]
            )
            rule_check = response.content[0].text
            if "No violations" not in rule_check:
                issues.append(f"Business rule violations: {rule_check}")

        # Автоочистка
        df_clean = df.drop_duplicates()
        df_clean = df_clean.dropna(subset=[col for col in df.columns
                                           if df[col].isnull().mean() < 0.5])

        return {
            'data': df_clean,
            'original_count': original_count,
            'cleaned_count': len(df_clean),
            'removed': original_count - len(df_clean),
            'issues': issues,
            'quality_score': 1 - len(issues) * 0.1
        }

Мониторинг пайплайна

class ETLMonitor:
    """Метрики и алертинг для AI-ETL"""

    def generate_run_report(self, pipeline_result: dict,
                             expected_records: int = None) -> str:
        metrics = pipeline_result['metrics']
        errors = pipeline_result['errors']

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"""Summarize ETL run results for ops team.

Pipeline steps: {json.dumps(metrics)}
Errors: {errors}
Expected records: {expected_records}

Give: status (OK/WARNING/FAILED), key issues, recommended actions. 3-5 sentences."""
            }]
        )
        return response.content[0].text

AI-ETL типично снижает время разработки трансформаций для новых источников данных с 2-3 дней до 4-8 часов. Автовосстановление после ошибок обрабатывает 70-80% типовых сбоев (изменение схемы источника, временные проблемы сети) без участия инженера.