Разработка автономной AI-системы обработки запросов
Автономная система обработки запросов — AI-оркестратор, принимающий входящие запросы из различных каналов (email, форм, API, мессенджеров), классифицирующий их, извлекающий данные, исполняющий логику обработки и возвращающий ответ или создающий задачи в бизнес-системах — без участия оператора для типовых случаев.
В отличие от простого чат-бота или агента с одним инструментом, автономная система включает полный цикл: прием → понимание → обогащение данных → исполнение → уведомление → мониторинг.
Архитектура системы
Входные каналы: webhook (email-парсер), REST API, Telegram/WhatsApp Bot, web-форма.
Ядро обработки: LangGraph-граф с состоянием, классификатор, исполнители, агрегатор.
Выходные каналы: REST API внешних систем (CRM, ERP, Service Desk), email/push уведомления, очередь задач (Celery/Redis).
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from typing import TypedDict, Annotated, Optional
from datetime import datetime
import operator
class RequestState(TypedDict):
# Входящий запрос
raw_content: str
channel: str # "email", "api", "telegram", "form"
sender_id: str
received_at: datetime
# Классификация
request_type: Optional[str] # "support", "order", "complaint", "inquiry", "refund"
urgency: Optional[str] # "critical", "high", "normal", "low"
confidence: Optional[float]
# Обогащение
user_profile: Optional[dict]
related_entities: Optional[list] # Связанные заказы, договоры, тикеты
# Обработка
action_plan: Optional[list[dict]]
executed_actions: Annotated[list, operator.add]
requires_human: bool
human_reason: Optional[str]
# Результат
response_draft: Optional[str]
outcome: Optional[str]
processing_time_ms: Optional[int]
Классификатор запросов
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from typing import Literal
class RequestClassification(BaseModel):
request_type: Literal["support_technical", "support_billing", "order_new",
"order_status", "complaint", "refund_request", "general_inquiry"]
urgency: Literal["critical", "high", "normal", "low"]
confidence: float
extracted_entities: dict # Номер заказа, email, сумма и т.д.
requires_human: bool
human_reason: Optional[str] = None
summary: str
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def classify_request(state: RequestState) -> RequestState:
result = llm.with_structured_output(RequestClassification).invoke(
f"""Классифицируй входящий запрос.
Канал: {state['channel']}
Запрос: {state['raw_content']}
Передай человеку если:
- Юридические угрозы или упоминание судебных разбирательств
- Запрос на возврат суммы > 50 000 руб
- Упоминание о физическом ущербе
- Эмоционально заряженный отзыв с публичными угрозами"""
)
return {
**state,
"request_type": result.request_type,
"urgency": result.urgency,
"confidence": result.confidence,
"requires_human": result.requires_human,
"human_reason": result.human_reason,
}
Обогащение данных
async def enrich_request(state: RequestState) -> RequestState:
"""Загружает контекст пользователя и связанные сущности"""
# Параллельная загрузка данных
user_task = asyncio.create_task(crm.get_user_profile(state["sender_id"]))
orders_task = asyncio.create_task(
order_service.get_recent_orders(state["sender_id"], limit=5)
)
tickets_task = asyncio.create_task(
helpdesk.get_open_tickets(state["sender_id"])
)
user_profile, orders, tickets = await asyncio.gather(
user_task, orders_task, tickets_task, return_exceptions=True
)
related_entities = []
if not isinstance(orders, Exception):
related_entities.extend([{"type": "order", **o} for o in orders])
if not isinstance(tickets, Exception):
related_entities.extend([{"type": "ticket", **t} for t in tickets])
return {
**state,
"user_profile": user_profile if not isinstance(user_profile, Exception) else {},
"related_entities": related_entities,
}
Планирование и исполнение действий
def plan_actions(state: RequestState) -> RequestState:
"""Агент составляет план действий на основе типа запроса"""
action_templates = {
"order_status": [
{"action": "query_order_db", "params": {"order_id": "{extracted_order_id}"}},
{"action": "generate_status_response", "params": {}},
{"action": "send_response", "params": {}},
],
"refund_request": [
{"action": "verify_refund_eligibility", "params": {}},
{"action": "create_refund_ticket", "params": {}},
{"action": "notify_finance_team", "params": {}},
{"action": "send_confirmation", "params": {}},
],
"support_technical": [
{"action": "search_knowledge_base", "params": {}},
{"action": "generate_solution", "params": {}},
{"action": "create_ticket_if_unsolved", "params": {}},
{"action": "send_response", "params": {}},
],
}
base_plan = action_templates.get(state["request_type"], [
{"action": "generate_generic_response", "params": {}},
{"action": "create_manual_review_task", "params": {}},
])
return {**state, "action_plan": base_plan}
async def execute_actions(state: RequestState) -> RequestState:
"""Последовательное выполнение запланированных действий"""
executed = []
for action in state["action_plan"]:
action_name = action["action"]
params = action.get("params", {})
try:
result = await action_registry[action_name](state, **params)
executed.append({"action": action_name, "status": "success", "result": result})
except Exception as e:
executed.append({"action": action_name, "status": "failed", "error": str(e)})
# Если критическое действие упало — передаём человеку
if action.get("critical", False):
return {**state, "executed_actions": executed, "requires_human": True,
"human_reason": f"Ошибка критического действия: {e}"}
return {**state, "executed_actions": executed}
Граф обработки
def route_after_classification(state: RequestState) -> str:
if state["requires_human"]:
return "escalate_to_human"
if state["confidence"] < 0.6:
return "escalate_to_human" # Низкая уверенность в классификации
return "enrich"
def route_after_enrichment(state: RequestState) -> str:
# Проверяем VIP-статус — VIP всегда получает приоритет
if state.get("user_profile", {}).get("tier") == "vip" and state["urgency"] in ("high", "critical"):
return "plan_premium"
return "plan"
graph = StateGraph(RequestState)
graph.add_node("classify", classify_request)
graph.add_node("enrich", enrich_request)
graph.add_node("plan", plan_actions)
graph.add_node("plan_premium", plan_premium_actions)
graph.add_node("execute", execute_actions)
graph.add_node("generate_response", generate_final_response)
graph.add_node("escalate_to_human", create_human_task)
graph.add_node("send_response", send_response_to_channel)
graph.set_entry_point("classify")
graph.add_conditional_edges("classify", route_after_classification)
graph.add_conditional_edges("enrich", route_after_enrichment)
graph.add_edge("plan", "execute")
graph.add_edge("plan_premium", "execute")
graph.add_edge("execute", "generate_response")
graph.add_edge("generate_response", "send_response")
graph.add_edge("send_response", END)
graph.add_edge("escalate_to_human", END)
processor = graph.compile(checkpointer=PostgresSaver(conn))
Практический кейс: e-commerce, 2500 запросов/день
Компания: онлайн-ретейлер, 2500 входящих запросов в сутки, 12 операторов.
До системы: среднее время первого ответа 4.2 часа, ночные смены с низкой загрузкой, операторы тратили 60% времени на типовые статусные запросы.
Типы запросов в потоке:
- Статус заказа / трекинг — 41%
- Вопросы о возвратах — 19%
- Технические проблемы — 14%
- Общие вопросы о товарах — 17%
- Жалобы и претензии — 9%
После внедрения системы:
- Автономная обработка без участия оператора: 74%
- Среднее время первого ответа: 4.2ч → 2.1 минуты
- Ночная смена: сокращена с 4 до 1 оператора (мониторинг эскалаций)
- Точность ответов (оценка на выборке 500 запросов): 94.1%
- Ложные эскалации (ненужные передачи человеку): 8.3%
- Ошибочное автоматическое закрытие (требовал человека): 2.1%
Проблемы при запуске: первые 2 недели — дообучение классификатора на реальных данных компании. Изначальная точность классификации 81% → 94% после 500 корректировок.
Мониторинг и SLA-метрики
class RequestMetrics:
"""Метрики для мониторинга системы"""
def track_request(self, state: RequestState):
labels = {
"channel": state["channel"],
"request_type": state["request_type"],
"outcome": "automated" if not state["requires_human"] else "escalated",
}
request_counter.labels(**labels).inc()
processing_time.labels(**labels).observe(state["processing_time_ms"] / 1000)
if state["requires_human"]:
escalation_reason_counter.labels(
reason=state["human_reason"]
).inc()
Сроки
- Архитектура системы и граф: 1–2 недели
- Классификатор + обогащение данных: 2–3 недели
- Исполнители для каждого типа запросов: 2–4 недели
- Интеграция с каналами (email, мессенджеры): 1–2 недели
- Калибровка и запуск в production: 2 недели
- Итого: 8–13 недель







