Разработка мульти-агентной AI-системы
Мульти-агентная система (MAS) — архитектура, в которой несколько специализированных AI-агентов взаимодействуют для выполнения сложных задач, недоступных одному агенту. Разделение ответственности снижает сложность каждого агента, повышает качество в специализированных задачах и позволяет масштабировать систему горизонтально.
Архитектуры мульти-агентных систем
Supervisor (Оркестратор): центральный агент распределяет задачи между специализированными агентами и агрегирует результаты. Хорошо управляем, но есть узкое место в оркестраторе.
Peer-to-peer (P2P): агенты общаются напрямую, без центрального координатора. Более устойчив к отказам, сложнее в отладке.
Hierarchical (Иерархический): многоуровневая структура — агенты верхнего уровня управляют агентами нижнего уровня. Используется для очень сложных рабочих процессов.
Pipeline: каждый агент выполняет свою часть задачи и передаёт результат следующему. Простой, предсказуемый, ограничен линейностью.
Реализация Supervisor паттерна с LangGraph
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Literal
import operator
from langchain_core.messages import HumanMessage
class MultiAgentState(TypedDict):
messages: list
current_task: str
task_result: str
next_agent: str
# Специализированные агенты
def researcher_agent(state: MultiAgentState) -> MultiAgentState:
"""Агент для поиска информации"""
llm = ChatOpenAI(model="gpt-4o")
task = state["current_task"]
# Retrieval + анализ
docs = retriever.invoke(task)
context = "\n".join([d.page_content for d in docs])
result = llm.invoke([
HumanMessage(content=f"Задача исследования: {task}\n\nКонтекст:\n{context}\n\nВыдай ключевые факты:")
]).content
return {**state, "task_result": result, "next_agent": "writer"}
def writer_agent(state: MultiAgentState) -> MultiAgentState:
"""Агент для написания текста"""
llm = ChatOpenAI(model="gpt-4o")
research = state["task_result"]
original_task = state["current_task"]
result = llm.invoke([
HumanMessage(content=f"Напиши ответ на задачу: {original_task}\n\nМатериалы: {research}")
]).content
return {**state, "task_result": result, "next_agent": "reviewer"}
def reviewer_agent(state: MultiAgentState) -> MultiAgentState:
"""Агент для проверки качества"""
llm = ChatOpenAI(model="gpt-4o")
draft = state["task_result"]
review = llm.invoke([
HumanMessage(content=f"""Проверь следующий текст на:
1. Фактические ошибки
2. Полноту ответа
3. Структуру и ясность
Текст: {draft}
Если всё в порядке, ответь "APPROVED". Иначе укажи конкретные правки.""")
]).content
if "APPROVED" in review:
return {**state, "next_agent": "complete"}
else:
return {**state, "task_result": review, "next_agent": "writer"}
def supervisor_agent(state: MultiAgentState) -> MultiAgentState:
"""Оркестратор: определяет первый агент для задачи"""
return {**state, "next_agent": "researcher"}
def route_agent(state: MultiAgentState) -> str:
return state["next_agent"]
# Построение графа
graph = StateGraph(MultiAgentState)
graph.add_node("supervisor", supervisor_agent)
graph.add_node("researcher", researcher_agent)
graph.add_node("writer", writer_agent)
graph.add_node("reviewer", reviewer_agent)
graph.set_entry_point("supervisor")
graph.add_conditional_edges("supervisor", route_agent)
graph.add_conditional_edges("researcher", route_agent)
graph.add_conditional_edges("writer", route_agent)
graph.add_conditional_edges("reviewer", lambda s: END if s["next_agent"] == "complete" else s["next_agent"])
mas = graph.compile()
CrewAI: высокоуровневый фреймворк
from crewai import Agent, Task, Crew, Process
# Определение агентов с ролями
analyst = Agent(
role="Финансовый аналитик",
goal="Анализировать финансовые данные и выявлять тренды",
backstory="Опытный финансовый аналитик с 10 годами в investment banking",
tools=[search_tool, calculator_tool, db_query_tool],
llm=ChatOpenAI(model="gpt-4o"),
verbose=True,
)
report_writer = Agent(
role="Автор отчётов",
goal="Создавать профессиональные финансовые отчёты",
backstory="Специалист по бизнес-коммуникациям с опытом в финансах",
tools=[document_writer_tool],
llm=ChatOpenAI(model="gpt-4o"),
)
fact_checker = Agent(
role="Проверщик фактов",
goal="Верифицировать все цифры и утверждения в отчёте",
tools=[search_tool, calculator_tool],
llm=ChatOpenAI(model="gpt-4o"),
)
# Задачи
analysis_task = Task(
description="Проанализируй финансовые показатели компании X за Q1 2026",
expected_output="JSON с KPI: revenue, EBITDA, net_profit, growth_rates",
agent=analyst,
)
report_task = Task(
description="Создай инвестиционный меморандум на основе анализа",
expected_output="PDF-ready текст инвестиционного меморандума",
agent=report_writer,
context=[analysis_task],
)
# Экипаж
crew = Crew(
agents=[analyst, report_writer, fact_checker],
tasks=[analysis_task, report_task],
process=Process.sequential,
verbose=True,
)
result = crew.kickoff(inputs={"company": "ООО Пример", "period": "Q1 2026"})
Практический кейс: система автоматизации due diligence
Задача: комплексная проверка компании при M&A сделке. Включает юридический, финансовый, операционный и HR анализ.
Состав агентов:
- Financial Analyst Agent: анализ отчётности МСФО/РСБУ
- Legal Agent: проверка договорной базы, судебных споров
- HR Agent: анализ структуры персонала, текучести, ключевых сотрудников
- Risk Agent: сводный анализ рисков
- Report Agent: финальный отчёт Due Diligence
Инфраструктура: LangGraph, каждый агент с доступом к специализированным RAG-индексам.
Результаты:
- Сокращение времени DD: с 4 недель до 3 дней (для стандартных кейсов)
- Покрытие (доля проверяемых аспектов): 78% → 94%
- Человек-в-петле: финальная валидация каждого раздела
- Стоимость одного DD: снижение на 67%
Коммуникация между агентами
# Паттерн: агенты передают структурированные сообщения через shared state
class AgentMessage:
source_agent: str
target_agent: str
message_type: str # "request", "result", "error"
content: dict
priority: int
# Message queue для асинхронной коммуникации
import asyncio
from asyncio import Queue
class AgentCommunicationBus:
def __init__(self):
self.queues: dict[str, Queue] = {}
def register_agent(self, agent_id: str):
self.queues[agent_id] = Queue()
async def send(self, msg: AgentMessage):
await self.queues[msg.target_agent].put(msg)
async def receive(self, agent_id: str) -> AgentMessage:
return await self.queues[agent_id].get()
Сроки
- Проектирование архитектуры MAS: 1–2 недели
- Разработка базовых агентов (3–5): 3–5 недель
- Интеграция коммуникации и тестирование: 2–3 недели
- Мониторинг и production hardening: 1–2 недели
- Итого: 7–12 недель







