Розробка AI-оркестратора агентів (Agent Orchestration)

Проектуємо та впроваджуємо системи штучного інтелекту: від прототипу до production-ready рішення. Наша команда поєднує експертизу в машинному навчанні, дата-інжинірингу та MLOps, щоб AI працював не в лабораторії, а в реальному бізнесі.
Показано 1 з 1Усі 1566 послуг
Розробка AI-оркестратора агентів (Agent Orchestration)
Складний
від 2 тижнів до 3 місяців
Часті запитання

Напрямки AI-розробки

Етапи розробки AI-рішення

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    901
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1119
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    853

Розробка систем оркестрації AI-агентів

Оркестратор агентів — система управління, що координує роботу кількох спеціалізованих AI-агентів: розподіляє завдання, управляє потоком даних між агентами, відслідковує стан виконання, обробляє помилки та забезпечує узгодженість результатів. Це верхній рівень мультиагентної архітектури.

Обов'язки оркестратора

Task Decomposition: розбивка складних завдань на підзавдання для спеціалізованих агентів.

Agent Selection: вибір підходящих агентів для кожного підзавдання на основі можливостей та поточного навантаження.

State Management: відслідковування стану кожного агента та загального прогресу.

Error Handling: обробка помилок агентів, retry-логіка, fallback-стратегії.

Result Aggregation: збирання результатів від різних агентів в один відповідь.

Реалізація оркестратора з LangGraph

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated
from langchain_openai import ChatOpenAI
import operator
import json

class OrchestratorState(TypedDict):
    user_request: str
    task_plan: list[dict]           # [{task_id, description, agent, status, result}]
    current_task_index: int
    agent_results: Annotated[dict, lambda a, b: {**a, **b}]
    final_response: str
    error_count: int

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Реєстр агентів
AGENT_REGISTRY = {
    "researcher": ResearcherAgent(),
    "analyst": AnalystAgent(),
    "writer": WriterAgent(),
    "sql_agent": SQLAgent(),
    "code_interpreter": CodeInterpreterAgent(),
    "file_processor": FileProcessorAgent(),
}

def plan_tasks(state: OrchestratorState) -> OrchestratorState:
    """Оркестратор розбиває завдання на підзавдання та призначає агентів"""

    available_agents = list(AGENT_REGISTRY.keys())

    response = llm.invoke(f"""
Розбий наступне завдання на підзавдання та признач агента для кожного.
Доступні агенти: {available_agents}

Завдання: {state["user_request"]}

Верни JSON-список:
[{{"task_id": "t1", "description": "...", "agent": "researcher", "dependencies": []}}]
Залежності: список task_id, які мають завершитися перед цим завданням.
""")

    task_plan = json.loads(response.content)
    for task in task_plan:
        task["status"] = "pending"
        task["result"] = None

    return {**state, "task_plan": task_plan, "current_task_index": 0}

def execute_next_task(state: OrchestratorState) -> OrchestratorState:
    """Виконай наступне готове завдання"""
    task_plan = state["task_plan"].copy()

    # Знайди наступне завдання з усіма виконаними залежностями
    next_task = None
    for task in task_plan:
        if task["status"] == "pending":
            deps_completed = all(
                any(t["task_id"] == dep and t["status"] == "completed"
                    for t in task_plan)
                for dep in task.get("dependencies", [])
            )
            if deps_completed:
                next_task = task
                break

    if not next_task:
        return {**state, "current_task_index": -1}  # Усі завдання завершені

    # Виконай завдання через відповідного агента
    agent = AGENT_REGISTRY.get(next_task["agent"])
    if not agent:
        next_task["status"] = "failed"
        next_task["result"] = f"Agent {next_task['agent']} not found"
    else:
        # Передай результати залежностей як контекст
        dependency_results = {
            dep: state["agent_results"].get(dep)
            for dep in next_task.get("dependencies", [])
        }

        try:
            result = agent.execute(
                task=next_task["description"],
                context=dependency_results,
            )
            next_task["status"] = "completed"
            next_task["result"] = result
        except Exception as e:
            next_task["status"] = "failed"
            next_task["result"] = str(e)

    # Оновлюємо план
    updated_plan = [
        task if task["task_id"] != next_task["task_id"] else next_task
        for task in task_plan
    ]

    return {
        **state,
        "task_plan": updated_plan,
        "agent_results": {next_task["task_id"]: next_task["result"]},
    }

def should_continue(state: OrchestratorState) -> str:
    """Визнач наступний крок оркестратора"""
    pending = [t for t in state["task_plan"] if t["status"] == "pending"]
    failed = [t for t in state["task_plan"] if t["status"] == "failed"]

    if failed and state["error_count"] >= 3:
        return "finalize_with_errors"
    if not pending:
        return "aggregate_results"
    return "execute_next"

def aggregate_results(state: OrchestratorState) -> OrchestratorState:
    """Збери результати всіх агентів в фінальну відповідь"""
    all_results = {t["task_id"]: t["result"] for t in state["task_plan"]}

    final = llm.invoke(f"""
На основі результатів від різних агентів сформуй фінальну відповідь.
Оригінальний запит: {state["user_request"]}
Результати: {json.dumps(all_results)}
""").content

    return {**state, "final_response": final}

# Побудуй граф
graph = StateGraph(OrchestratorState)
graph.add_node("plan", plan_tasks)
graph.add_node("execute_next", execute_next_task)
graph.add_node("aggregate_results", aggregate_results)

graph.set_entry_point("plan")
graph.add_edge("plan", "execute_next")
graph.add_conditional_edges("execute_next", should_continue, {
    "execute_next": "execute_next",
    "aggregate_results": "aggregate_results",
    "finalize_with_errors": "aggregate_results",
})
graph.add_edge("aggregate_results", END)

orchestrator = graph.compile(checkpointer=MemorySaver())

Паралельне виконання завдань

import asyncio

async def execute_parallel_tasks(tasks_batch: list[dict]) -> list[dict]:
    """Виконай незалежні завдання паралельно"""
    coroutines = []
    for task in tasks_batch:
        agent = AGENT_REGISTRY.get(task["agent"])
        if agent:
            coroutines.append(asyncio.to_thread(agent.execute, task=task["description"]))

    results = await asyncio.gather(*coroutines, return_exceptions=True)

    for task, result in zip(tasks_batch, results):
        if isinstance(result, Exception):
            task["status"] = "failed"
            task["result"] = str(result)
        else:
            task["status"] = "completed"
            task["result"] = result

    return tasks_batch

Практичний кейс: оркестратор для due diligence

Завдання: автоматизована перевірка компанії при M&A. Паралельна робота 5 агентів:

  1. Financial Agent: аналіз 3 років звітності
  2. Legal Agent: перевірка судових спорів, обмежень
  3. HR Agent: структура персоналу, плинність
  4. Market Agent: становище на ринку, конкуренти
  5. Risk Agent: синтез ризиків з усіх джерел

Граф виконання:

  • t1 (financial), t2 (legal), t3 (hr), t4 (market) — паралельно
  • t5 (risk) — залежить від t1, t2, t3, t4
  • t6 (final_report) — залежить від t5

Результати:

  • Часовий розклад DD: 4 тижні → 3 дні
  • Охоплення аспектів: 78% → 94%
  • Вартість одного DD: -71%

Моніторинг та трасування

import mlflow

def log_orchestration_run(state: OrchestratorState):
    with mlflow.start_run():
        mlflow.log_metrics({
            "total_tasks": len(state["task_plan"]),
            "completed_tasks": sum(1 for t in state["task_plan"] if t["status"] == "completed"),
            "failed_tasks": sum(1 for t in state["task_plan"] if t["status"] == "failed"),
        })
        mlflow.log_text(json.dumps(state["task_plan"], indent=2), "task_execution_log.json")

Часовий розклад

  • Проектування оркестратора: 1–2 тижні
  • Реалізація базових агентів (3–5): 3–5 тижнів
  • Інтеграція паралельного виконання: 1 тиждень
  • Обробка помилок та моніторинг: 1–2 тижні
  • Всього: 6–10 тижнів