Реализация Text-to-SQL AI-интерфейса к базе данных

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
Реализация Text-to-SQL AI-интерфейса к базе данных
Средний
~5 дней
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1281
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    901
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1119
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    853

Реализация Text-to-SQL интерфейса к базам данных

Text-to-SQL позволяет бизнес-пользователям получать данные из реляционных БД без знания SQL. Задача технически сложнее, чем кажется: нужно правильно обработать JOIN между 10+ таблицами, учесть диалект БД (PostgreSQL vs MySQL vs BigQuery), избежать дорогостоящих full-table scans и вернуть корректный результат при амбигуитивных запросах.

Архитектура Text-to-SQL системы

from anthropic import Anthropic
import sqlglot
import sqlparse
import pandas as pd
from dataclasses import dataclass

@dataclass
class TableSchema:
    name: str
    columns: list[dict]  # [{name, type, description, example}]
    row_count: int
    sample_rows: list[dict]
    foreign_keys: list[dict]  # [{from_col, to_table, to_col}]

class TextToSQLEngine:
    def __init__(self, db_connection, db_dialect: str = 'postgres'):
        self.db = db_connection
        self.dialect = db_dialect
        self.llm = Anthropic()
        self.schema = self._extract_full_schema()
        self.query_history = []

    def _extract_full_schema(self) -> dict[str, TableSchema]:
        """Автоматическое извлечение схемы из БД"""
        if self.dialect == 'postgres':
            return self._extract_postgres_schema()
        elif self.dialect == 'mysql':
            return self._extract_mysql_schema()
        return {}

    def _extract_postgres_schema(self) -> dict[str, TableSchema]:
        tables = {}

        # Получение списка таблиц
        tables_df = pd.read_sql("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'
              AND table_type = 'BASE TABLE'
        """, self.db)

        for table_name in tables_df['table_name']:
            # Колонки с типами и комментариями
            cols_df = pd.read_sql(f"""
                SELECT
                    c.column_name,
                    c.data_type,
                    c.is_nullable,
                    col_description('{table_name}'::regclass, c.ordinal_position) as description
                FROM information_schema.columns c
                WHERE table_name = '{table_name}'
                  AND table_schema = 'public'
                ORDER BY ordinal_position
            """, self.db)

            # FK связи
            fks_df = pd.read_sql(f"""
                SELECT
                    kcu.column_name as from_col,
                    ccu.table_name as to_table,
                    ccu.column_name as to_col
                FROM information_schema.table_constraints tc
                JOIN information_schema.key_column_usage kcu
                    ON tc.constraint_name = kcu.constraint_name
                JOIN information_schema.constraint_column_usage ccu
                    ON ccu.constraint_name = tc.constraint_name
                WHERE tc.constraint_type = 'FOREIGN KEY'
                  AND tc.table_name = '{table_name}'
            """, self.db)

            # Примеры данных
            sample_df = pd.read_sql(
                f"SELECT * FROM {table_name} LIMIT 3", self.db
            )

            row_count = pd.read_sql(
                f"SELECT COUNT(*) as cnt FROM {table_name}", self.db
            )['cnt'].iloc[0]

            tables[table_name] = TableSchema(
                name=table_name,
                columns=cols_df.to_dict('records'),
                row_count=int(row_count),
                sample_rows=sample_df.to_dict('records'),
                foreign_keys=fks_df.to_dict('records')
            )

        return tables

Генерация SQL с контекстом

    def _build_schema_context(self, relevant_tables: list[str]) -> str:
        """Компактное представление схемы для LLM"""
        lines = []
        for table_name in relevant_tables:
            if table_name not in self.schema:
                continue
            t = self.schema[table_name]
            lines.append(f"Table: {table_name} ({t.row_count:,} rows)")

            for col in t.columns:
                desc = f" -- {col['description']}" if col.get('description') else ""
                lines.append(f"  {col['column_name']} {col['data_type']}{desc}")

            for fk in t.foreign_keys:
                lines.append(f"  FK: {fk['from_col']} → {fk['to_table']}.{fk['to_col']}")

            if t.sample_rows:
                lines.append(f"  Sample: {t.sample_rows[0]}")
            lines.append("")

        return "\n".join(lines)

    def _select_relevant_tables(self, question: str) -> list[str]:
        """Выбор нужных таблиц через LLM"""
        all_tables_desc = "\n".join([
            f"- {name}: {[c['column_name'] for c in t.columns[:5]]}..."
            for name, t in self.schema.items()
        ])

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=200,
            messages=[{
                "role": "user",
                "content": f"""Tables available:
{all_tables_desc}

Question: {question}

List only the table names needed, comma-separated."""
            }]
        )
        names = [n.strip() for n in response.content[0].text.split(',')]
        return [n for n in names if n in self.schema]

    def generate_sql(self, question: str) -> dict:
        """Генерация SQL из естественного языка"""
        relevant_tables = self._select_relevant_tables(question)
        schema_context = self._build_schema_context(relevant_tables)

        # Учитываем историю для контекстных запросов ("а теперь по регионам")
        conversation_context = ""
        if self.query_history:
            last = self.query_history[-1]
            conversation_context = f"\nПредыдущий вопрос: {last['question']}\nПредыдущий SQL:\n{last['sql']}\n"

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=800,
            system=f"""You are a SQL expert for {self.dialect}.
Generate syntactically correct SQL queries.
Return ONLY the SQL query, no explanations.
Use proper {self.dialect} syntax.
Avoid SELECT *. Always use column aliases for aggregates.
Limit results to 1000 rows unless user asks for aggregation.

Schema:
{schema_context}
{conversation_context}""",
            messages=[{"role": "user", "content": question}]
        )

        raw_sql = response.content[0].text.strip()
        # Убрать markdown-обёртку
        if '```' in raw_sql:
            raw_sql = raw_sql.split('```')[1]
            if raw_sql.startswith('sql\n'):
                raw_sql = raw_sql[4:]

        return {
            'sql': raw_sql,
            'relevant_tables': relevant_tables,
            'question': question
        }

Валидация и безопасное выполнение

    def validate_sql(self, sql: str) -> tuple[bool, str]:
        """Проверка SQL перед выполнением"""
        try:
            # Парсинг через sqlglot
            parsed = sqlglot.parse_one(sql, dialect=self.dialect)
        except Exception as e:
            return False, f"Parse error: {e}"

        # Проверка на опасные операции
        sql_upper = sql.upper()
        forbidden = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'TRUNCATE', 'ALTER', 'CREATE']
        for keyword in forbidden:
            if keyword in sql_upper:
                return False, f"Forbidden operation: {keyword}"

        # Проверка наличия LIMIT для non-aggregate запросов
        if 'GROUP BY' not in sql_upper and 'LIMIT' not in sql_upper:
            sql += "\nLIMIT 1000"

        return True, sql

    def execute(self, question: str) -> dict:
        """Полный pipeline: вопрос → результат"""
        generation = self.generate_sql(question)
        sql = generation['sql']

        is_valid, validated_sql = self.validate_sql(sql)
        if not is_valid:
            # Попытка починить SQL
            sql = self._fix_sql(sql, validated_sql)
            is_valid, validated_sql = self.validate_sql(sql)
            if not is_valid:
                return {'error': validated_sql, 'sql': sql}

        try:
            df = pd.read_sql(validated_sql, self.db)
            self.query_history.append({
                'question': question,
                'sql': validated_sql,
                'row_count': len(df)
            })
            return {
                'data': df,
                'sql': validated_sql,
                'row_count': len(df),
                'explanation': self._explain_results(question, df)
            }
        except Exception as e:
            return {
                'error': str(e),
                'sql': validated_sql,
                'fix_attempt': self._fix_sql(validated_sql, str(e))
            }

    def _fix_sql(self, sql: str, error: str) -> str:
        """Попытка исправить SQL через LLM"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"""Fix this {self.dialect} SQL:
{sql}

Error: {error}

Return only the fixed SQL."""
            }]
        )
        return response.content[0].text.strip()

Качество генерации по типу запроса

Тип запроса Точность Примечание
Агрегации (SUM, COUNT, AVG) 95%+ Простые GROUP BY
Фильтрация с датами 88% Форматы дат — частая ошибка
JOIN 2 таблиц 92% С правильными FK в схеме
JOIN 3+ таблиц 75% Нужны примеры в промпте
Оконные функции 70% LAG, RANK, ROW_NUMBER
Рекурсивные CTE 55% Иерархии, деревья
Subquery оптимизация 65% Часто генерирует медленные N+1

Self-correction loop

При ошибке выполнения система автоматически запускает второй цикл генерации с текстом ошибки в контексте. На практике 85% ошибок исправляются с первой попытки. Критические ошибки (неправильные имена таблиц, отсутствующие колонки) встречаются реже при использовании полной схемы в промпте.

Типовой ROI: аналитик обрабатывает в 3-5 раз больше запросов в день. Бизнес-пользователи закрывают 70% простых запросов самостоятельно. Время онбординга новых сотрудников на аналитический стек сокращается с нескольких недель до 1-2 дней.