Development of Autonomous AI Request Processing System
An autonomous request processing system is an AI orchestrator that accepts incoming requests from multiple channels (email, forms, API, messengers), classifies them, extracts data, executes processing logic, and returns a response or creates tasks in business systems, with no operator involvement for standard cases.
Unlike a simple chatbot or single-tool agent, an autonomous system includes a complete cycle: intake → understanding → data enrichment → execution → notification → monitoring.
System Architecture
- Input channels: webhook (email parser), REST API, Telegram/WhatsApp bot, web form.
- Processing core: LangGraph state graph, classifier, executors, aggregator.
- Output channels: REST APIs of external systems (CRM, ERP, Service Desk), email/push notifications, task queue (Celery/Redis).
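Each channel delivers a different payload shape, so the first step is to normalize everything into the shared state before the graph runs. A minimal sketch; the per-channel payload keys shown here are assumptions, not a fixed contract:

```python
from datetime import datetime, timezone

def normalize_incoming(channel: str, payload: dict) -> dict:
    """Map a channel-specific payload onto the shared RequestState fields."""
    extractors = {
        # Each channel carries the text and the sender under different keys
        "email": lambda p: (p["body"], p["from"]),
        "telegram": lambda p: (p["message"]["text"], str(p["message"]["chat"]["id"])),
        "form": lambda p: (p["message"], p["email"]),
        "api": lambda p: (p["content"], p["client_id"]),
    }
    content, sender = extractors[channel](payload)
    return {
        "raw_content": content,
        "channel": channel,
        "sender_id": sender,
        "received_at": datetime.now(timezone.utc),
        "requires_human": False,
        "executed_actions": [],
    }
```

The graph then receives an identical state regardless of whether the request arrived by email or through the API.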
```python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from typing import TypedDict, Annotated, Optional
from datetime import datetime
import operator

class RequestState(TypedDict):
    # Incoming request
    raw_content: str
    channel: str                      # "email", "api", "telegram", "form"
    sender_id: str
    received_at: datetime

    # Classification
    request_type: Optional[str]       # One of the RequestClassification labels, e.g. "order_status"
    urgency: Optional[str]            # "critical", "high", "normal", "low"
    confidence: Optional[float]

    # Enrichment
    user_profile: Optional[dict]
    related_entities: Optional[list]  # Related orders, contracts, tickets

    # Processing
    action_plan: Optional[list[dict]]
    executed_actions: Annotated[list, operator.add]  # Add-reducer: nodes append, never overwrite
    requires_human: bool
    human_reason: Optional[str]

    # Result
    response_draft: Optional[str]
    outcome: Optional[str]
    processing_time_ms: Optional[int]
```
Request Classifier
```python
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from typing import Literal, Optional

class RequestClassification(BaseModel):
    request_type: Literal[
        "support_technical", "support_billing", "order_new",
        "order_status", "complaint", "refund_request", "general_inquiry",
    ]
    urgency: Literal["critical", "high", "normal", "low"]
    confidence: float
    extracted_entities: dict  # Order number, email, amount, etc.
    requires_human: bool
    human_reason: Optional[str] = None
    summary: str

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def classify_request(state: RequestState) -> dict:
    result = llm.with_structured_output(RequestClassification).invoke(
        f"""Classify the incoming request.
Channel: {state['channel']}
Request: {state['raw_content']}

Escalate to a human if:
- Legal threats or mention of litigation
- Refund request for an amount over 50,000 rubles
- Mention of physical damage
- Emotionally charged review with public threats"""
    )
    # Return only the updated keys: executed_actions uses an add-reducer,
    # so spreading the whole state back would duplicate its entries.
    return {
        "request_type": result.request_type,
        "urgency": result.urgency,
        "confidence": result.confidence,
        "requires_human": result.requires_human,
        "human_reason": result.human_reason,
    }
```
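Structured-output calls can fail (timeouts, schema validation errors), and the safe default for an unclassified request is escalation, not a guess. A possible wrapper around the node above; the injectable `classifier` parameter is an addition of this sketch, mainly so the fallback path can be exercised offline:

```python
def classify_with_fallback(state: dict, classifier=None) -> dict:
    """Run the classifier node; on any failure, escalate instead of guessing.

    `classifier` defaults to the classify_request node defined above.
    """
    try:
        return (classifier or classify_request)(state)
    except Exception as e:
        # Conservative defaults: never auto-process an unclassified request
        return {
            "request_type": "general_inquiry",
            "urgency": "normal",
            "confidence": 0.0,
            "requires_human": True,
            "human_reason": f"Classifier failure: {type(e).__name__}: {e}",
        }
```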
Data Enrichment
```python
import asyncio

async def enrich_request(state: RequestState) -> dict:
    """Loads user context and related entities (crm, order_service and
    helpdesk are the company's async API clients)."""
    # Parallel data loading; return_exceptions keeps one failed source
    # from breaking the whole enrichment step
    user_profile, orders, tickets = await asyncio.gather(
        crm.get_user_profile(state["sender_id"]),
        order_service.get_recent_orders(state["sender_id"], limit=5),
        helpdesk.get_open_tickets(state["sender_id"]),
        return_exceptions=True,
    )
    related_entities = []
    if not isinstance(orders, Exception):
        related_entities.extend([{"type": "order", **o} for o in orders])
    if not isinstance(tickets, Exception):
        related_entities.extend([{"type": "ticket", **t} for t in tickets])
    return {
        "user_profile": user_profile if not isinstance(user_profile, Exception) else {},
        "related_entities": related_entities,
    }
```
Action Planning and Execution
```python
def plan_actions(state: RequestState) -> dict:
    """Creates an action plan based on the request type"""
    action_templates = {
        "order_status": [
            {"action": "query_order_db", "params": {"order_id": "{extracted_order_id}"}},
            {"action": "generate_status_response", "params": {}},
            {"action": "send_response", "params": {}},
        ],
        "refund_request": [
            {"action": "verify_refund_eligibility", "params": {}},
            {"action": "create_refund_ticket", "params": {}},
            {"action": "notify_finance_team", "params": {}},
            {"action": "send_confirmation", "params": {}},
        ],
        "support_technical": [
            {"action": "search_knowledge_base", "params": {}},
            {"action": "generate_solution", "params": {}},
            {"action": "create_ticket_if_unsolved", "params": {}},
            {"action": "send_response", "params": {}},
        ],
    }
    # Fallback for types without a template: answer generically and queue a review
    base_plan = action_templates.get(state["request_type"], [
        {"action": "generate_generic_response", "params": {}},
        {"action": "create_manual_review_task", "params": {}},
    ])
    return {"action_plan": base_plan}
```
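The `"{extracted_order_id}"` placeholder in the templates still has to be filled from the entities the classifier extracted. One way to resolve it before execution; the `{key}` placeholder convention is an assumption of this sketch:

```python
def resolve_params(plan: list[dict], entities: dict) -> list[dict]:
    """Replace "{key}" placeholders in action params with extracted entity values."""
    resolved = []
    for action in plan:
        params = {}
        for name, value in action.get("params", {}).items():
            if isinstance(value, str) and value.startswith("{") and value.endswith("}"):
                params[name] = entities.get(value[1:-1])  # None if the entity is missing
            else:
                params[name] = value
        resolved.append({**action, "params": params})
    return resolved
```

Resolving eagerly, before execution, also makes a missing entity visible as an explicit `None` parameter rather than a failure deep inside a handler.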
```python
async def execute_actions(state: RequestState) -> dict:
    """Sequential execution of planned actions.
    action_registry maps action names to async handlers."""
    executed = []
    for action in state["action_plan"]:
        action_name = action["action"]
        params = action.get("params", {})
        try:
            result = await action_registry[action_name](state, **params)
            executed.append({"action": action_name, "status": "success", "result": result})
        except Exception as e:
            executed.append({"action": action_name, "status": "failed", "error": str(e)})
            # If a critical action fails, stop and escalate to a human
            if action.get("critical", False):
                return {"executed_actions": executed, "requires_human": True,
                        "human_reason": f"Critical action error: {e}"}
    return {"executed_actions": executed}
```
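Most action failures come from transient faults in the external systems behind the handlers, so a retry layer in front of the `action_registry` calls can cut down false escalations. A sketch with exponential backoff; the attempt count and delays are illustrative:

```python
import asyncio

async def with_retries(handler, *args, attempts: int = 3, base_delay: float = 0.5, **kwargs):
    """Run an async action handler, retrying transient failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            return await handler(*args, **kwargs)
        except Exception:
            if attempt == attempts - 1:
                raise  # Out of retries: let execute_actions record the failure
            await asyncio.sleep(base_delay * 2 ** attempt)
```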
Processing Graph
```python
def route_after_classification(state: RequestState) -> str:
    if state["requires_human"]:
        return "escalate_to_human"
    if state["confidence"] < 0.6:
        return "escalate_to_human"  # Low classification confidence
    return "enrich"

def route_after_enrichment(state: RequestState) -> str:
    # VIP users with urgent requests get the premium plan
    if state.get("user_profile", {}).get("tier") == "vip" and state["urgency"] in ("high", "critical"):
        return "plan_premium"
    return "plan"
```
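The graph registers `plan_premium_actions` as the `plan_premium` node, but its body is not shown; one plausible shape is the standard plan with an account-manager notification prepended and every step flagged as priority. The extra action name and the `priority` flag are hypothetical:

```python
def plan_premium_actions(state: dict) -> dict:
    """Hypothetical sketch of the plan_premium node."""
    standard = {  # subset of the action_templates used by plan_actions
        "refund_request": [
            {"action": "verify_refund_eligibility", "params": {}},
            {"action": "create_refund_ticket", "params": {}},
            {"action": "notify_finance_team", "params": {}},
            {"action": "send_confirmation", "params": {}},
        ],
    }
    base = standard.get(state["request_type"],
                        [{"action": "generate_generic_response", "params": {}}])
    # Notify the account manager first, then run the standard steps at priority
    plan = [{"action": "notify_account_manager", "params": {}}]
    plan += [{**a, "params": {**a["params"], "priority": True}} for a in base]
    return {"action_plan": plan}
```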
```python
graph = StateGraph(RequestState)
graph.add_node("classify", classify_request)
graph.add_node("enrich", enrich_request)
graph.add_node("plan", plan_actions)
graph.add_node("plan_premium", plan_premium_actions)
graph.add_node("execute", execute_actions)
graph.add_node("generate_response", generate_final_response)
graph.add_node("escalate_to_human", create_human_task)
graph.add_node("send_response", send_response_to_channel)

graph.set_entry_point("classify")
graph.add_conditional_edges("classify", route_after_classification)
graph.add_conditional_edges("enrich", route_after_enrichment)
graph.add_edge("plan", "execute")
graph.add_edge("plan_premium", "execute")
graph.add_edge("execute", "generate_response")
graph.add_edge("generate_response", "send_response")
graph.add_edge("send_response", END)
graph.add_edge("escalate_to_human", END)

# conn: a Postgres connection for checkpoint persistence
processor = graph.compile(checkpointer=PostgresSaver(conn))
```
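Once compiled, each request runs under its own `thread_id`, which is the key the Postgres checkpointer stores state against, and the wall-clock latency can be written back into `processing_time_ms`. A sketch of the calling side; the surrounding service code is assumed:

```python
import time

def run_request(processor, initial_state: dict, request_id: str) -> dict:
    """Invoke the compiled graph for one request and record wall-clock latency."""
    start = time.monotonic()
    final_state = processor.invoke(
        initial_state,
        config={"configurable": {"thread_id": request_id}},  # checkpoint key
    )
    final_state["processing_time_ms"] = int((time.monotonic() - start) * 1000)
    return final_state
```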
Practical Case: E-commerce, 2500 requests/day
Company: an online retailer with 2500 incoming requests daily and 12 operators.
Before the system: average first response time was 4.2 hours, night shifts stayed fully staffed despite low request volume, and operators spent 60% of their time on routine status requests.
Types of requests in the stream:
- Order status / tracking — 41%
- Return inquiries — 19%
- Technical issues — 14%
- General product questions — 17%
- Complaints and claims — 9%
After system implementation:
- Autonomous processing without operator involvement: 74%
- Average first response time: 4.2h → 2.1 minutes
- Night shift: reduced from 4 to 1 operator (monitoring escalations)
- Response accuracy (sample of 500 requests): 94.1%
- False escalations (unnecessary human transfers): 8.3%
- Incorrect auto-closure (required human): 2.1%
Launch challenges: the first 2 weeks went into retraining the classifier on real company data; initial classification accuracy rose from 81% to 94% after 500 corrections.
Monitoring and SLA Metrics
```python
class RequestMetrics:
    """Metrics for system monitoring (request_counter, processing_time and
    escalation_reason_counter are Prometheus collectors defined at module level)."""

    def track_request(self, state: RequestState):
        labels = {
            "channel": state["channel"],
            "request_type": state["request_type"],
            "outcome": "escalated" if state["requires_human"] else "automated",
        }
        request_counter.labels(**labels).inc()
        processing_time.labels(**labels).observe(state["processing_time_ms"] / 1000)
        if state["requires_human"]:
            escalation_reason_counter.labels(reason=state["human_reason"]).inc()
```
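The counters above feed live dashboards, but the case-study headline numbers (automation rate, escalation rate) can also be recomputed offline from stored final states. A minimal aggregation, independent of any metrics backend:

```python
def sla_summary(states: list[dict]) -> dict:
    """Aggregate processed requests into headline SLA figures."""
    total = len(states)
    escalated = sum(1 for s in states if s["requires_human"])
    times = [s["processing_time_ms"] for s in states if s.get("processing_time_ms")]
    return {
        "total": total,
        "automation_rate": round((total - escalated) / total, 3) if total else 0.0,
        "escalation_rate": round(escalated / total, 3) if total else 0.0,
        "avg_processing_ms": sum(times) / len(times) if times else None,
    }
```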
Timeline
- System architecture and graph: 1–2 weeks
- Classifier + data enrichment: 2–3 weeks
- Executors for each request type: 2–4 weeks
- Integration with channels (email, messengers): 1–2 weeks
- Calibration and production launch: 2 weeks
- Total: 8–13 weeks