Реализация Text-to-SQL интерфейса к базам данных
Text-to-SQL позволяет бизнес-пользователям получать данные из реляционных БД без знания SQL. Задача технически сложнее, чем кажется: нужно правильно обработать JOIN между 10+ таблицами, учесть диалект БД (PostgreSQL vs MySQL vs BigQuery), избежать дорогостоящих full-table scans и вернуть корректный результат при амбигуитивных запросах.
Архитектура Text-to-SQL системы
from anthropic import Anthropic
import sqlglot
import sqlparse
import pandas as pd
from dataclasses import dataclass
@dataclass
class TableSchema:
name: str
columns: list[dict] # [{name, type, description, example}]
row_count: int
sample_rows: list[dict]
foreign_keys: list[dict] # [{from_col, to_table, to_col}]
class TextToSQLEngine:
def __init__(self, db_connection, db_dialect: str = 'postgres'):
self.db = db_connection
self.dialect = db_dialect
self.llm = Anthropic()
self.schema = self._extract_full_schema()
self.query_history = []
def _extract_full_schema(self) -> dict[str, TableSchema]:
"""Автоматическое извлечение схемы из БД"""
if self.dialect == 'postgres':
return self._extract_postgres_schema()
elif self.dialect == 'mysql':
return self._extract_mysql_schema()
return {}
def _extract_postgres_schema(self) -> dict[str, TableSchema]:
tables = {}
# Получение списка таблиц
tables_df = pd.read_sql("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
""", self.db)
for table_name in tables_df['table_name']:
# Колонки с типами и комментариями
cols_df = pd.read_sql(f"""
SELECT
c.column_name,
c.data_type,
c.is_nullable,
col_description('{table_name}'::regclass, c.ordinal_position) as description
FROM information_schema.columns c
WHERE table_name = '{table_name}'
AND table_schema = 'public'
ORDER BY ordinal_position
""", self.db)
# FK связи
fks_df = pd.read_sql(f"""
SELECT
kcu.column_name as from_col,
ccu.table_name as to_table,
ccu.column_name as to_col
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage ccu
ON ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_name = '{table_name}'
""", self.db)
# Примеры данных
sample_df = pd.read_sql(
f"SELECT * FROM {table_name} LIMIT 3", self.db
)
row_count = pd.read_sql(
f"SELECT COUNT(*) as cnt FROM {table_name}", self.db
)['cnt'].iloc[0]
tables[table_name] = TableSchema(
name=table_name,
columns=cols_df.to_dict('records'),
row_count=int(row_count),
sample_rows=sample_df.to_dict('records'),
foreign_keys=fks_df.to_dict('records')
)
return tables
Генерация SQL с контекстом
def _build_schema_context(self, relevant_tables: list[str]) -> str:
"""Компактное представление схемы для LLM"""
lines = []
for table_name in relevant_tables:
if table_name not in self.schema:
continue
t = self.schema[table_name]
lines.append(f"Table: {table_name} ({t.row_count:,} rows)")
for col in t.columns:
desc = f" -- {col['description']}" if col.get('description') else ""
lines.append(f" {col['column_name']} {col['data_type']}{desc}")
for fk in t.foreign_keys:
lines.append(f" FK: {fk['from_col']} → {fk['to_table']}.{fk['to_col']}")
if t.sample_rows:
lines.append(f" Sample: {t.sample_rows[0]}")
lines.append("")
return "\n".join(lines)
def _select_relevant_tables(self, question: str) -> list[str]:
"""Выбор нужных таблиц через LLM"""
all_tables_desc = "\n".join([
f"- {name}: {[c['column_name'] for c in t.columns[:5]]}..."
for name, t in self.schema.items()
])
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=200,
messages=[{
"role": "user",
"content": f"""Tables available:
{all_tables_desc}
Question: {question}
List only the table names needed, comma-separated."""
}]
)
names = [n.strip() for n in response.content[0].text.split(',')]
return [n for n in names if n in self.schema]
def generate_sql(self, question: str) -> dict:
"""Генерация SQL из естественного языка"""
relevant_tables = self._select_relevant_tables(question)
schema_context = self._build_schema_context(relevant_tables)
# Учитываем историю для контекстных запросов ("а теперь по регионам")
conversation_context = ""
if self.query_history:
last = self.query_history[-1]
conversation_context = f"\nПредыдущий вопрос: {last['question']}\nПредыдущий SQL:\n{last['sql']}\n"
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=800,
system=f"""You are a SQL expert for {self.dialect}.
Generate syntactically correct SQL queries.
Return ONLY the SQL query, no explanations.
Use proper {self.dialect} syntax.
Avoid SELECT *. Always use column aliases for aggregates.
Limit results to 1000 rows unless user asks for aggregation.
Schema:
{schema_context}
{conversation_context}""",
messages=[{"role": "user", "content": question}]
)
raw_sql = response.content[0].text.strip()
# Убрать markdown-обёртку
if '```' in raw_sql:
raw_sql = raw_sql.split('```')[1]
if raw_sql.startswith('sql\n'):
raw_sql = raw_sql[4:]
return {
'sql': raw_sql,
'relevant_tables': relevant_tables,
'question': question
}
Валидация и безопасное выполнение
def validate_sql(self, sql: str) -> tuple[bool, str]:
"""Проверка SQL перед выполнением"""
try:
# Парсинг через sqlglot
parsed = sqlglot.parse_one(sql, dialect=self.dialect)
except Exception as e:
return False, f"Parse error: {e}"
# Проверка на опасные операции
sql_upper = sql.upper()
forbidden = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'TRUNCATE', 'ALTER', 'CREATE']
for keyword in forbidden:
if keyword in sql_upper:
return False, f"Forbidden operation: {keyword}"
# Проверка наличия LIMIT для non-aggregate запросов
if 'GROUP BY' not in sql_upper and 'LIMIT' not in sql_upper:
sql += "\nLIMIT 1000"
return True, sql
def execute(self, question: str) -> dict:
"""Полный pipeline: вопрос → результат"""
generation = self.generate_sql(question)
sql = generation['sql']
is_valid, validated_sql = self.validate_sql(sql)
if not is_valid:
# Попытка починить SQL
sql = self._fix_sql(sql, validated_sql)
is_valid, validated_sql = self.validate_sql(sql)
if not is_valid:
return {'error': validated_sql, 'sql': sql}
try:
df = pd.read_sql(validated_sql, self.db)
self.query_history.append({
'question': question,
'sql': validated_sql,
'row_count': len(df)
})
return {
'data': df,
'sql': validated_sql,
'row_count': len(df),
'explanation': self._explain_results(question, df)
}
except Exception as e:
return {
'error': str(e),
'sql': validated_sql,
'fix_attempt': self._fix_sql(validated_sql, str(e))
}
def _fix_sql(self, sql: str, error: str) -> str:
"""Попытка исправить SQL через LLM"""
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=500,
messages=[{
"role": "user",
"content": f"""Fix this {self.dialect} SQL:
{sql}
Error: {error}
Return only the fixed SQL."""
}]
)
return response.content[0].text.strip()
Качество генерации по типу запроса
| Тип запроса | Точность | Примечание |
|---|---|---|
| Агрегации (SUM, COUNT, AVG) | 95%+ | Простые GROUP BY |
| Фильтрация с датами | 88% | Форматы дат — частая ошибка |
| JOIN 2 таблиц | 92% | С правильными FK в схеме |
| JOIN 3+ таблиц | 75% | Нужны примеры в промпте |
| Оконные функции | 70% | LAG, RANK, ROW_NUMBER |
| Рекурсивные CTE | 55% | Иерархии, деревья |
| Subquery оптимизация | 65% | Часто генерирует медленные N+1 |
Self-correction loop
При ошибке выполнения система автоматически запускает второй цикл генерации с текстом ошибки в контексте. На практике 85% ошибок исправляются с первой попытки. Критические ошибки (неправильные имена таблиц, отсутствующие колонки) встречаются реже при использовании полной схемы в промпте.
Типовой ROI: аналитик обрабатывает в 3-5 раз больше запросов в день. Бизнес-пользователи закрывают 70% простых запросов самостоятельно. Время онбординга новых сотрудников на аналитический стек сокращается с нескольких недель до 1-2 дней.







