AI Dispatcher — Digital Worker for Task Coordination
The AI Dispatcher automatically receives incoming requests, classifies them, assigns executors based on workload and competencies, tracks execution, and escalates overdue tasks. It is used in technical support services, field operations, transport dispatching, and maintenance request management.
Core: Intake and Request Routing
from datetime import datetime, timedelta
from typing import Literal, Optional

from openai import AsyncOpenAI
from pydantic import BaseModel
# Module-level OpenAI client shared by the classes below.
# NOTE(review): constructed with no arguments, so credentials presumably come
# from the environment — confirm for the target deployment.
client = AsyncOpenAI()
class TaskClassification(BaseModel):
    """Structured result of classifying one incoming request.

    Used as the ``response_format`` of the classification call, so the model's
    answer is parsed straight into this schema.
    """

    # Free-form request category chosen by the model.
    category: str
    # Urgency level; restricted to these four values.
    priority: Literal["critical", "high", "normal", "low"]
    # Skill used to look up available executors and to award the
    # specialization bonus during scoring.
    required_skill: str
    # Model's estimate of how long the work takes, in minutes.
    estimated_duration_minutes: int
    # Location extracted from the request text, if any. Declared without a
    # default, so the field is required but may be null.
    location: Optional[str]
    # SLA window in hours; added to the creation time to get the deadline.
    sla_hours: float
    # Any extra constraints the model extracted from the request.
    special_requirements: list[str]
class DispatcherAgent:
    """Classify incoming requests, pick an executor and create the task record.

    ``team_db`` and ``task_db`` are injected storage gateways. The only calls
    made on them here are ``team_db.get_available(skill=..., shift=...)``,
    ``task_db.count_active(person_id)`` and ``task_db.create(payload)``.

    NOTE(review): ``notify_assignee`` is awaited below but is not defined in
    this class as shown; it has to be provided elsewhere (subclass/mixin).
    """

    # Scoring weights used by ``select_best_assignee``.
    BASE_SCORE = 100
    LOAD_PENALTY_PER_TASK = 10      # subtracted for every active task
    DISTANCE_DIVISOR = 10           # distance / divisor = penalty points
    MAX_DISTANCE_PENALTY = 30       # cap so distance never dominates the score
    SPECIALIZATION_BONUS = 20

    def __init__(self, team_db, task_db):
        self.team_db = team_db
        self.task_db = task_db

    async def process_incoming_request(self, request: dict) -> dict:
        """Receive a request, classify it and assign an executor.

        Args:
            request: Must contain ``description``, ``client`` and ``contact``;
                ``title`` and ``location`` are optional.

        Returns:
            A dict with ``task_id``, ``assignee`` (the chosen person dict with
            its ``score``, or ``None``) and ``sla`` (the stored deadline).

        Raises:
            KeyError: If a required request field is missing.
            RuntimeError: If the model returned no parsed classification.
        """
        # Classification
        completion = await client.beta.chat.completions.parse(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "Classify incoming request for dispatching. Determine priority, category, required competencies and expected execution time.",
                },
                {
                    "role": "user",
                    "content": f"Request: {request['description']}\nFrom: {request['client']}\nContact: {request['contact']}",
                },
            ],
            response_format=TaskClassification,
            temperature=0,
        )
        task_class = completion.choices[0].message.parsed
        if task_class is None:
            # Without this guard the failure would surface later as an opaque
            # AttributeError on ``None``.
            raise RuntimeError(
                "Model returned no parsed classification (possible refusal)"
            )

        # Select executor
        assignee = await self.select_best_assignee(task_class, request.get("location"))

        # Create task. The deadline is a naive local timestamp.
        task = await self.task_db.create({
            "title": request.get("title", f"Request from {request['client']}"),
            "description": request["description"],
            "category": task_class.category,
            "priority": task_class.priority,
            "required_skill": task_class.required_skill,
            "assignee_id": assignee["id"] if assignee else None,
            "sla_deadline": datetime.now() + timedelta(hours=task_class.sla_hours),
            "status": "assigned" if assignee else "pending",
        })

        # Notify executor (only when someone was actually assigned)
        if assignee:
            await self.notify_assignee(assignee, task)

        return {"task_id": task["id"], "assignee": assignee, "sla": task["sla_deadline"]}

    async def select_best_assignee(
        self,
        task: "TaskClassification",
        location: Optional[str],
    ) -> Optional[dict]:
        """Select the highest-scoring available executor.

        Every candidate starts from ``BASE_SCORE``, loses points for each
        active task and (when both locations are known) for distance, and
        gains a bonus when the required skill is among their specializations.

        Args:
            task: Classification of the request; only ``required_skill`` is read.
            location: Request location, or ``None`` to skip proximity scoring.

        Returns:
            A copy of the best person's dict with an added ``score`` key, or
            ``None`` when nobody is available. On ties the earliest candidate
            returned by ``team_db.get_available`` wins.
        """
        candidates = await self.team_db.get_available(
            skill=task.required_skill,
            shift="current",
        )
        if not candidates:
            return None

        scored = []
        for person in candidates:
            score = self.BASE_SCORE

            # Workload: fewer active tasks means a higher score.
            active_count = await self.task_db.count_active(person["id"])
            score -= active_count * self.LOAD_PENALTY_PER_TASK

            # Proximity (field tasks). ``calculate_distance`` is defined
            # outside this class — NOTE(review): its units are not visible here.
            if location and person.get("current_location"):
                distance = calculate_distance(location, person["current_location"])
                score -= min(distance / self.DISTANCE_DIVISOR, self.MAX_DISTANCE_PENALTY)

            # Specialization
            if task.required_skill in person.get("specializations", []):
                score += self.SPECIALIZATION_BONUS

            scored.append({**person, "score": score})

        # ``scored`` is non-empty here because ``candidates`` was non-empty.
        return max(scored, key=lambda entry: entry["score"])
SLA Monitoring and Escalations
class SLAMonitor:
    """Watch active tasks and remind or escalate as SLA deadlines approach.

    NOTE(review): ``send_urgent_reminder`` and ``escalate_to_supervisor`` are
    awaited below but are not defined in this class as shown, and
    ``slack_client`` is a module-level name defined outside it.
    """

    def __init__(self, task_db=None):
        """Store the task gateway.

        Args:
            task_db: Object providing ``get_active_tasks()`` and
                ``update(task_id, fields)``. Defaults to ``None`` so that
                ``SLAMonitor()`` still works; it must be set before a sweep.
        """
        self.task_db = task_db

    async def check_and_escalate(self):
        """Run one SLA sweep over all active tasks.

        Intended to be invoked periodically (the original note says every
        15 minutes); scheduling is left to the caller.

        Per task, exactly one action is taken:
        * deadline passed: ``handle_sla_breach``
        * under 30 minutes left: ``send_urgent_reminder``
        * under 1 hour left and still ``"assigned"``: ``escalate_to_supervisor``

        Tasks already marked ``sla_status == "breached"`` are skipped, so a
        breach is escalated once instead of on every sweep.
        """
        tasks = await self.task_db.get_active_tasks()
        # Naive local time, matching how ``sla_deadline`` is written at task
        # creation (``datetime.now() + timedelta(...)``).
        now = datetime.now()

        for task in tasks:
            if task.get("sla_status") == "breached":
                continue

            hours_left = (task["sla_deadline"] - now).total_seconds() / 3600

            if hours_left < 0:
                # SLA breached
                await self.handle_sla_breach(task)
            elif hours_left < 0.5:
                # Less than 30 minutes to SLA
                await self.send_urgent_reminder(task)
            elif hours_left < 1 and task["status"] == "assigned":
                # Less than 1 hour to SLA, task not started
                await self.escalate_to_supervisor(task, reason="task_not_started")

    async def handle_sla_breach(self, task: dict):
        """Escalate a breached task to the manager channel and mark it.

        Generates a short escalation text with the model, posts it to
        ``#dispatcher-escalations`` and sets ``sla_status`` to ``"breached"``.

        Args:
            task: Task record; ``id``, ``title`` and ``sla_deadline`` are read.
        """
        overdue_minutes = abs(
            int((datetime.now() - task["sla_deadline"]).total_seconds() / 60)
        )
        breach_message = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "Create a message about SLA breach escalation for the manager. Brief, specific.",
                },
                {
                    "role": "user",
                    "content": f"Task: {task['title']}, SLA breach by {overdue_minutes} min",
                },
            ],
        )

        await slack_client.post_message(
            channel="#dispatcher-escalations",
            text=f"🚨 SLA breached:\n{breach_message.choices[0].message.content}",
        )

        # Persist the flag last, so a failed post is retried on the next sweep.
        await self.task_db.update(task["id"], {"sla_status": "breached"})
Practical Case Study: Telecom, 800 Requests/Day
Company: a telecom operator with field engineers, handling 800 requests for technical site visits per day.
AI Dispatcher:
- Request intake via chat, email, CRM API
- Classification: 8 types (connection, repair, equipment replacement, etc.)
- Engineer assignment considering skills, workload, geolocation
- SLA control: 4h for critical, 24h for standard
- Automatic engineer reminders
- Overdue task escalation to manager
Results:
- SLA compliance: 71% → 89%
- Task assignment time: 12 min → 45 sec
- Individual engineer overload: -34% (balanced distribution)
Timeline
- Request classifier: 1–2 weeks
- Executor assignment algorithm: 1–2 weeks
- SLA monitoring and escalations: 1 week
- CRM and notification integration: 1–2 weeks
- Total: 4–7 weeks







