Разработка AI-оркестратора агентов (Agent Orchestration)

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
Разработка AI-оркестратора агентов (Agent Orchestration)
Сложный
от 2 недель до 3 месяцев
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    901
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1119
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    853

Разработка AI-оркестратора агентов (Agent Orchestration)

Оркестратор агентов — система управления, координирующая работу нескольких специализированных AI-агентов: распределяет задачи, управляет потоком данных между агентами, отслеживает состояние выполнения, обрабатывает ошибки и обеспечивает согласованность результатов. Это верхний уровень мульти-агентной архитектуры.

Responsibilities оркестратора

Task Decomposition: разбивка сложной задачи на подзадачи для специализированных агентов.

Agent Selection: выбор подходящего агента для каждой подзадачи на основе возможностей и текущей нагрузки.

State Management: отслеживание состояния каждого агента и общего прогресса.

Error Handling: обработка сбоев агентов, retry-логика, fallback.

Result Aggregation: сборка результатов от разных агентов в единый ответ.

Реализация оркестратора с LangGraph

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated
from langchain_openai import ChatOpenAI
import operator
import json

class OrchestratorState(TypedDict):
    user_request: str
    task_plan: list[dict]           # [{task_id, description, agent, status, result}]
    current_task_index: int
    agent_results: Annotated[dict, lambda a, b: {**a, **b}]
    final_response: str
    error_count: int

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Реестр агентов
AGENT_REGISTRY = {
    "researcher": ResearcherAgent(),
    "analyst": AnalystAgent(),
    "writer": WriterAgent(),
    "sql_agent": SQLAgent(),
    "code_interpreter": CodeInterpreterAgent(),
    "file_processor": FileProcessorAgent(),
}

def plan_tasks(state: OrchestratorState) -> OrchestratorState:
    """Оркестратор разбивает задачу на подзадачи и назначает агентов"""

    available_agents = list(AGENT_REGISTRY.keys())

    response = llm.invoke(f"""
Разбей следующую задачу на подзадачи и назначь агента для каждой.
Доступные агенты: {available_agents}

Задача: {state["user_request"]}

Верни JSON-список:
[{{"task_id": "t1", "description": "...", "agent": "researcher", "dependencies": []}}]
Зависимости: список task_id, которые должны завершиться перед этой задачей.
""")

    task_plan = json.loads(response.content)
    for task in task_plan:
        task["status"] = "pending"
        task["result"] = None

    return {**state, "task_plan": task_plan, "current_task_index": 0}

def execute_next_task(state: OrchestratorState) -> OrchestratorState:
    """Выполняет следующую готовую задачу"""
    task_plan = state["task_plan"].copy()

    # Находим следующую задачу, все зависимости которой выполнены
    next_task = None
    for task in task_plan:
        if task["status"] == "pending":
            deps_completed = all(
                any(t["task_id"] == dep and t["status"] == "completed"
                    for t in task_plan)
                for dep in task.get("dependencies", [])
            )
            if deps_completed:
                next_task = task
                break

    if not next_task:
        return {**state, "current_task_index": -1}  # Все задачи завершены

    # Выполняем задачу через соответствующего агента
    agent = AGENT_REGISTRY.get(next_task["agent"])
    if not agent:
        next_task["status"] = "failed"
        next_task["result"] = f"Agent {next_task['agent']} not found"
    else:
        # Передаём результаты зависимостей как контекст
        dependency_results = {
            dep: state["agent_results"].get(dep)
            for dep in next_task.get("dependencies", [])
        }

        try:
            result = agent.execute(
                task=next_task["description"],
                context=dependency_results,
            )
            next_task["status"] = "completed"
            next_task["result"] = result
        except Exception as e:
            next_task["status"] = "failed"
            next_task["result"] = str(e)

    # Обновляем plan
    updated_plan = [
        task if task["task_id"] != next_task["task_id"] else next_task
        for task in task_plan
    ]

    return {
        **state,
        "task_plan": updated_plan,
        "agent_results": {next_task["task_id"]: next_task["result"]},
    }

def should_continue(state: OrchestratorState) -> str:
    """Определяет следующий шаг оркестратора"""
    pending = [t for t in state["task_plan"] if t["status"] == "pending"]
    failed = [t for t in state["task_plan"] if t["status"] == "failed"]

    if failed and state["error_count"] >= 3:
        return "finalize_with_errors"
    if not pending:
        return "aggregate_results"
    return "execute_next"

def aggregate_results(state: OrchestratorState) -> OrchestratorState:
    """Собирает результаты всех агентов в финальный ответ"""
    all_results = {t["task_id"]: t["result"] for t in state["task_plan"]}

    final = llm.invoke(f"""
На основе результатов от разных агентов сформируй финальный ответ.
Оригинальный запрос: {state["user_request"]}
Результаты: {json.dumps(all_results, ensure_ascii=False)}
""").content

    return {**state, "final_response": final}

# Построение графа
graph = StateGraph(OrchestratorState)
graph.add_node("plan", plan_tasks)
graph.add_node("execute_next", execute_next_task)
graph.add_node("aggregate_results", aggregate_results)

graph.set_entry_point("plan")
graph.add_edge("plan", "execute_next")
graph.add_conditional_edges("execute_next", should_continue, {
    "execute_next": "execute_next",
    "aggregate_results": "aggregate_results",
    "finalize_with_errors": "aggregate_results",
})
graph.add_edge("aggregate_results", END)

orchestrator = graph.compile(checkpointer=MemorySaver())

Parallel Task Execution

import asyncio

async def execute_parallel_tasks(tasks_batch: list[dict]) -> list[dict]:
    """Параллельное выполнение независимых задач"""
    coroutines = []
    for task in tasks_batch:
        agent = AGENT_REGISTRY.get(task["agent"])
        if agent:
            coroutines.append(asyncio.to_thread(agent.execute, task=task["description"]))

    results = await asyncio.gather(*coroutines, return_exceptions=True)

    for task, result in zip(tasks_batch, results):
        if isinstance(result, Exception):
            task["status"] = "failed"
            task["result"] = str(result)
        else:
            task["status"] = "completed"
            task["result"] = result

    return tasks_batch

Практический кейс: оркестратор для due diligence

Задача: автоматизированная проверка компании при M&A. Параллельная работа 5 агентов:

  1. Financial Agent: анализ 3 лет отчётности
  2. Legal Agent: проверка судебных споров, ограничений
  3. HR Agent: структура персонала, текучесть
  4. Market Agent: положение на рынке, конкуренты
  5. Risk Agent: синтез рисков из всех источников

Граф выполнения:

  • t1 (financial), t2 (legal), t3 (hr), t4 (market) — параллельно
  • t5 (risk) — зависит от t1, t2, t3, t4
  • t6 (final_report) — зависит от t5

Результаты:

  • Время DD: 4 недели → 3 дня
  • Покрытие аспектов: 78% → 94%
  • Стоимость одного DD: -71%

Мониторинг и трассировка

import mlflow

def log_orchestration_run(state: OrchestratorState):
    with mlflow.start_run():
        mlflow.log_metrics({
            "total_tasks": len(state["task_plan"]),
            "completed_tasks": sum(1 for t in state["task_plan"] if t["status"] == "completed"),
            "failed_tasks": sum(1 for t in state["task_plan"] if t["status"] == "failed"),
        })
        mlflow.log_text(json.dumps(state["task_plan"], indent=2), "task_execution_log.json")

Сроки

  • Проектирование оркестратора: 1–2 недели
  • Реализация базовых агентов (3–5): 3–5 недель
  • Интеграция параллельного выполнения: 1 неделя
  • Error handling и мониторинг: 1–2 недели
  • Итого: 6–10 недель