Multi-Agent Orchestration
Patterns and frameworks for coordinating multiple specialized AI agents to solve complex tasks.
Why Multi-Agent Systems?
Single agents hit limitations on complex tasks. Multi-agent systems address this by:
- Specialization: different agents excel at different tasks (coding, research, writing)
- Parallelization: multiple agents can work on subtasks simultaneously (see the sketch below)
- Verification: agents can check each other's work (critic patterns)
- Robustness: multiple perspectives reduce single points of failure
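As a minimal sketch of the parallelization point, the snippet below fans hypothetical subtasks out to agent coroutines with asyncio.gather; run_agent is a stand-in for a real agent call, not part of any framework.

import asyncio

async def run_agent(name: str, subtask: str) -> str:
    """Stand-in for a real agent call (e.g., an LLM request)."""
    await asyncio.sleep(0.1)  # simulate I/O-bound agent work
    return f"{name} finished: {subtask}"

async def fan_out(subtasks: dict[str, str]) -> list[str]:
    # Run every specialized agent on its subtask at the same time.
    return await asyncio.gather(
        *(run_agent(name, task) for name, task in subtasks.items())
    )

results = asyncio.run(fan_out({
    "researcher": "gather sources on AI trends",
    "writer": "outline the summary",
}))
print(results)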
Complexity Trade-off
Multi-agent systems add coordination overhead, extra tokens and latency, and are harder to debug, so simple tasks are often better served by a single agent.
Orchestration Patterns
SUPERVISOR
────────────────────
    ┌──────────┐
    │SUPERVISOR│
    └────┬─────┘
    ┌────┼────┐
    ▼    ▼    ▼
  ┌──┐ ┌──┐ ┌──┐
  │W1│ │W2│ │W3│
  └──┘ └──┘ └──┘

PEER-TO-PEER
────────────────────
      ┌───┐
  ┌───┤ A ├───┐
  │   └─┬─┘   │
┌─▼┐    │    ┌▼─┐
│B │◄───┼───►│ C│
└──┘    │    └──┘
       ┌▼─┐
       │ D│
       └──┘

DEBATE
────────────────────
  ┌──────────┐
  │ PROPOSER │
  └────┬─────┘
       ▼
  ┌──────────┐
  │  CRITIC  │
  └────┬─────┘
       ▼
  ┌──────────┐
  │  JUDGE   │
  └──────────┘

MIXTURE OF EXPERTS
────────────────────
    ┌────────┐
    │ ROUTER │
    └───┬────┘
    ┌───┼───┐
    ▼   ▼   ▼
  ┌──┐┌──┐┌──┐
  │E1││E2││E3│
  └┬─┘└┬─┘└┬─┘
   │   │   │
   └───┼───┘
       ▼
   ┌────────┐
   │COMBINER│
   └────────┘

HIERARCHICAL
────────────────────
     ┌────┐
     │LEAD│
     └─┬──┘
   ┌───┼───┐
   ▼   ▼   ▼
 ┌──┐┌──┐┌──┐
 │M1││M2││M3│
 └┬─┘└┬─┘└┬─┘
  ▼   ▼   ▼
 ┌──┐┌──┐┌──┐
 │W1││W2││W3│
 └──┘└──┘└──┘

SEQUENTIAL
────────────────────
 ┌──┐   ┌──┐   ┌──┐
 │A1├──►│A2├──►│A3│
 └──┘   └──┘   └──┘

1. Supervisor Pattern
A central supervisor agent coordinates specialized worker agents, deciding who handles each subtask:
# Supervisor Pattern: One agent coordinates others
class SupervisorOrchestrator:
supervisor: Agent # Makes routing decisions
workers: Map<string, Agent> # Specialized workers
function process(task):
conversation = [task]
while not isComplete(conversation):
# Supervisor decides next step
decision = supervisor.decide(
task: task,
conversation: conversation,
availableWorkers: workers.keys()
)
if decision.action == "DELEGATE":
# Route to worker agent
worker = workers.get(decision.targetWorker)
result = worker.execute(decision.subTask)
conversation.append(result)
elif decision.action == "RESPOND":
# Supervisor provides final answer
return decision.response
elif decision.action == "CLARIFY":
# Need more information
return requestInput(decision.question)
return synthesize(conversation)
# Supervisor prompt
SUPERVISOR_PROMPT = """
You are a supervisor coordinating specialized workers.
Available workers:
- researcher: Searches and gathers information
- analyst: Analyzes data and creates reports
- writer: Drafts documents and communications
- coder: Writes and reviews code
For each user request, decide:
1. Which worker(s) should handle it
2. What specific task to assign them
3. When you have enough information to respond
Output your decision as:
{
"action": "DELEGATE" | "RESPOND" | "CLARIFY",
"targetWorker": "worker_name", // if DELEGATE
"subTask": "specific instructions", // if DELEGATE
"response": "final answer" // if RESPOND
}
""" from langgraph.graph import StateGraph, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from typing import TypedDict, Literal
# Define state
class OrchestratorState(TypedDict):
messages: list
next_worker: str | None
# Create specialized agents
llm = ChatOpenAI(model="gpt-4")
researcher = create_react_agent(
llm,
tools=[search_tool, browse_tool],
state_modifier="You are a research specialist."
)
analyst = create_react_agent(
llm,
tools=[analyze_tool, chart_tool],
state_modifier="You are a data analyst."
)
writer = create_react_agent(
llm,
tools=[write_tool, format_tool],
state_modifier="You are a technical writer."
)
# Supervisor decides routing
def supervisor_node(state: OrchestratorState):
"""Supervisor decides which worker to invoke next."""
messages = state["messages"]
response = llm.invoke([
{"role": "system", "content": SUPERVISOR_PROMPT},
*messages,
{"role": "user", "content": "What should happen next?"}
])
# Parse decision
decision = parse_supervisor_response(response.content)
if decision["action"] == "RESPOND":
return {"messages": messages + [response], "next_worker": None}
return {"messages": messages, "next_worker": decision["target_worker"]}
def route_to_worker(state: OrchestratorState) -> Literal["researcher", "analyst", "writer", "end"]:
"""Route to appropriate worker or end."""
if state["next_worker"] is None:
return "end"
return state["next_worker"]
# Build the graph
workflow = StateGraph(OrchestratorState)
# Add nodes
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("researcher", researcher)
workflow.add_node("analyst", analyst)
workflow.add_node("writer", writer)
# Add edges
workflow.set_entry_point("supervisor")
workflow.add_conditional_edges(
"supervisor",
route_to_worker,
{
"researcher": "researcher",
"analyst": "analyst",
"writer": "writer",
"end": END
}
)
# Workers return to supervisor
for worker in ["researcher", "analyst", "writer"]:
workflow.add_edge(worker, "supervisor")
# Compile
app = workflow.compile()
# Run
result = app.invoke({
"messages": [{"role": "user", "content": "Research AI trends and write a summary"}],
"next_worker": None
})

using System.Text.Json;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using OpenAI;
public class SupervisorOrchestrator
{
private readonly AIAgent _supervisor;
private readonly Dictionary<string, AIAgent> _workers;
private readonly IChatClient _chatClient;
public SupervisorOrchestrator(string apiKey)
{
_chatClient = new OpenAIClient(apiKey)
.GetChatClient("gpt-4o")
.AsIChatClient();
_supervisor = _chatClient.CreateAIAgent(
name: "Supervisor",
instructions: SUPERVISOR_PROMPT
);
_workers = new Dictionary<string, AIAgent>
{
["researcher"] = _chatClient.CreateAIAgent(
name: "Researcher",
instructions: "You are a research specialist..."
),
["analyst"] = _chatClient.CreateAIAgent(
name: "Analyst",
instructions: "You are a data analyst..."
),
["writer"] = _chatClient.CreateAIAgent(
name: "Writer",
instructions: "You are a technical writer..."
)
};
}
public async Task<string> ProcessAsync(string task)
{
var supervisorThread = _supervisor.GetNewThread();
var context = new List<string> { task };
while (true)
{
// Get supervisor decision
var decisionPrompt = $"Task: {task}\nContext: {string.Join("\n", context)}";
var response = await _supervisor.RunAsync(decisionPrompt, supervisorThread);
// Deserialize the supervisor's JSON decision from the response text
var decision = JsonSerializer.Deserialize<SupervisorDecision>(response.Text)!;
if (decision.Action == "RESPOND")
{
return decision.Response!;
}
if (decision.Action == "DELEGATE")
{
var worker = _workers[decision.TargetWorker!];
var workerThread = worker.GetNewThread();
// Execute worker task
var workerResult = await worker.RunAsync(
decision.SubTask!,
workerThread
);
context.Add($"[{decision.TargetWorker}]: {workerResult}");
}
}
}
}
public record SupervisorDecision(
string Action,
string? TargetWorker = null,
string? SubTask = null,
string? Response = null
);
// Usage
var orchestrator = new SupervisorOrchestrator("your-api-key");
var result = await orchestrator.ProcessAsync(
"Research AI trends and write a summary"
);

Pros
- Clear control flow
- Easy to debug and trace
- Central point for monitoring
Cons
- Single point of failure
- Supervisor can become bottleneck
- Requires good supervisor prompting
2. Peer-to-Peer Pattern
Agents communicate directly with each other without a central coordinator:
# Peer-to-Peer Pattern: Agents communicate directly
class PeerNetwork:
agents: Map<string, Agent>
messageQueue: Queue
function broadcast(sender, message):
for agent in agents.values():
if agent.name != sender:
messageQueue.enqueue({
from: sender,
to: agent.name,
content: message
})
function sendTo(sender, recipient, message):
messageQueue.enqueue({
from: sender,
to: recipient,
content: message
})
function processMessages():
while not messageQueue.isEmpty():
msg = messageQueue.dequeue()
recipient = agents.get(msg.to)
response = recipient.receive(msg.from, msg.content)
if response.hasReply:
sendTo(msg.to, msg.from, response.reply)
# Agent with peer communication
class PeerAgent:
name: string
capabilities: []
peers: Map<string, PeerInfo>
function receive(sender, message):
# Decide how to handle incoming message
if canHandle(message):
result = process(message)
return { reply: result }
else:
# Find a peer who can help
bestPeer = findBestPeer(message)
if bestPeer:
return { forward: bestPeer, message: message }
else:
return { reply: "Cannot handle this request" }
function requestHelp(task):
# Broadcast request to peers
responses = broadcast(self.name, {
type: "HELP_REQUEST",
task: task,
requiredCapabilities: inferCapabilities(task)
})
# Collect and synthesize responses
return synthesize(responses)

import asyncio
from dataclasses import dataclass, field
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
@dataclass
class Message:
sender: str
recipient: str
content: dict
message_type: str = "REQUEST"
@dataclass
class PeerAgent:
name: str
capabilities: list[str]
system_prompt: str
llm: ChatOpenAI = field(default_factory=lambda: ChatOpenAI(model="gpt-4"))
inbox: asyncio.Queue = field(default_factory=asyncio.Queue)
async def receive(self, message: Message) -> dict:
"""Handle incoming message."""
if message.message_type == "REQUEST":
if self._can_handle(message.content):
result = await self._process(message.content)
return {"status": "completed", "result": result}
return {"status": "cannot_handle"}
elif message.message_type == "HELP_REQUEST":
if self._can_help(message.content.get("required_capabilities", [])):
return {
"status": "can_help",
"agent": self.name,
"capabilities": self.capabilities
}
return {"status": "cannot_help"}
return {"status": "unknown_message_type"}
async def _process(self, content: dict) -> str:
"""Process task using LangChain."""
prompt = ChatPromptTemplate.from_messages([
("system", self.system_prompt),
("user", "{task}")
])
chain = prompt | self.llm
response = await chain.ainvoke({"task": content.get("task", str(content))})
return response.content
def _can_handle(self, content: dict) -> bool:
required = content.get("required_capabilities", [])
return all(cap in self.capabilities for cap in required)
def _can_help(self, required: list[str]) -> bool:
return any(cap in self.capabilities for cap in required)
class PeerNetwork:
def __init__(self):
self.agents: dict[str, PeerAgent] = {}
def register(self, agent: PeerAgent):
self.agents[agent.name] = agent
async def send(self, message: Message) -> dict:
recipient = self.agents.get(message.recipient)
if not recipient:
return {"status": "agent_not_found"}
return await recipient.receive(message)
async def broadcast(
self, sender: str, content: dict, message_type: str = "REQUEST"
) -> list[dict]:
tasks = [
agent.receive(Message(sender, name, content, message_type))
for name, agent in self.agents.items() if name != sender
]
return await asyncio.gather(*tasks)
async def request_help(
self, requester: str, task: dict, required_capabilities: list[str]
) -> str | None:
responses = await self.broadcast(
requester,
{"task": task, "required_capabilities": required_capabilities},
"HELP_REQUEST"
)
helpers = [r for r in responses if r.get("status") == "can_help"]
if helpers:
return max(helpers, key=lambda h: len(
set(h["capabilities"]) & set(required_capabilities)
))["agent"]
return None
# Usage
async def main():
network = PeerNetwork()
researcher = PeerAgent(
name="researcher",
capabilities=["search", "browse", "summarize"],
system_prompt="You are a research specialist. Search and summarize information."
)
coder = PeerAgent(
name="coder",
capabilities=["code", "debug", "test"],
system_prompt="You are an expert programmer. Write clean, efficient code."
)
network.register(researcher)
network.register(coder)
# Find helper and execute task
helper = await network.request_help(
requester="user",
task={"description": "Write a Python script"},
required_capabilities=["code"]
)
if helper:
result = await network.send(Message(
"user", helper, {"task": "Write a hello world script"}, "REQUEST"
))
print(result)
asyncio.run(main())

When to Use
Peer-to-peer networks suit loosely coupled tasks where agents can negotiate directly and no central coordinator is needed; the trade-off is higher coordination complexity and harder tracing than supervisor-based designs.
3. Debate Pattern
Agents argue and critique each other to reach better conclusions:
# Debate Pattern: Agents argue to reach better conclusions
class DebateOrchestrator:
proposer: Agent # Makes initial proposal
critic: Agent # Challenges and finds flaws
judge: Agent # Evaluates arguments
function debate(topic, maxRounds = 3):
# Initial proposal
proposal = proposer.generate(topic)
for round in range(maxRounds):
# Critic challenges the proposal
critique = critic.challenge(
topic: topic,
proposal: proposal,
findFlaws: true,
suggestAlternatives: true
)
if critique.noSignificantFlaws:
break
# Proposer defends or revises
response = proposer.respond(
originalProposal: proposal,
critique: critique,
canRevise: true
)
proposal = response.revisedProposal or proposal
# Judge evaluates final proposal
verdict = judge.evaluate(
topic: topic,
finalProposal: proposal,
debateHistory: getAllExchanges()
)
return {
conclusion: proposal,
confidence: verdict.confidence,
reasoning: verdict.reasoning
}
# Multi-perspective variant
class MultiPerspectiveDebate:
perspectives: [
{ name: "optimist", bias: "focus on benefits and opportunities" },
{ name: "pessimist", bias: "focus on risks and downsides" },
{ name: "pragmatist", bias: "focus on feasibility and implementation" },
{ name: "ethicist", bias: "focus on moral implications" }
]
function analyze(topic):
analyses = []
for perspective in perspectives:
analysis = llm.generate(
prompt: analyzeFromPerspective(topic, perspective)
)
analyses.append(analysis)
# Synthesize all perspectives
synthesis = llm.generate(
prompt: synthesizeAnalyses(topic, analyses)
)
return synthesis

from dataclasses import dataclass
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
@dataclass
class DebateRound:
proposal: str
critique: str
response: str
round_number: int
class DebateOrchestrator:
def __init__(self, model: str = "gpt-4"):
self.llm = ChatOpenAI(model=model)
self.json_llm = ChatOpenAI(model=model).bind(
response_format={"type": "json_object"}
)
def debate(self, topic: str, max_rounds: int = 3) -> dict:
"""Run a debate between proposer and critic."""
rounds: list[DebateRound] = []
# Initial proposal
proposal = self._generate_proposal(topic)
for round_num in range(max_rounds):
# Critic challenges
critique = self._generate_critique(topic, proposal, rounds)
# Check if critique found significant issues
if self._no_significant_flaws(critique):
break
# Proposer responds
response, revised = self._generate_response(
topic, proposal, critique
)
rounds.append(DebateRound(
proposal=proposal,
critique=critique,
response=response,
round_number=round_num
))
proposal = revised if revised else proposal
# Judge evaluates
verdict = self._judge_debate(topic, proposal, rounds)
return {
"conclusion": proposal,
"confidence": verdict["confidence"],
"reasoning": verdict["reasoning"],
"rounds": len(rounds)
}
def _generate_proposal(self, topic: str) -> str:
prompt = ChatPromptTemplate.from_messages([
("system", "You are a thoughtful proposer. Present a well-reasoned position."),
("user", "Topic: {topic}\n\nPresent your proposal with supporting arguments.")
])
chain = prompt | self.llm
return chain.invoke({"topic": topic}).content
def _generate_critique(
self, topic: str, proposal: str, history: list[DebateRound]
) -> str:
prompt = ChatPromptTemplate.from_messages([
("system", """You are a rigorous critic. Find flaws, gaps, and weaknesses.
Be specific about what's wrong and suggest alternatives.
If the proposal is solid, acknowledge it."""),
("user", """Topic: {topic}
Current proposal:
{proposal}
Previous debate:
{history}
Provide your critique. Be thorough but fair.""")
])
chain = prompt | self.llm
return chain.invoke({
"topic": topic,
"proposal": proposal,
"history": self._format_history(history)
}).content
def _generate_response(
self, topic: str, proposal: str, critique: str
) -> tuple[str, str | None]:
prompt = ChatPromptTemplate.from_messages([
("system", """You are the proposer defending your position.
Address valid criticisms and revise your proposal if needed."""),
("user", """Topic: {topic}
Your proposal:
{proposal}
Critique: {critique}
Respond. If revising, mark as REVISED PROPOSAL:""")
])
chain = prompt | self.llm
content = chain.invoke({
"topic": topic, "proposal": proposal, "critique": critique
}).content
if "REVISED PROPOSAL:" in content:
parts = content.split("REVISED PROPOSAL:")
return parts[0].strip(), parts[1].strip()
return content, None
def _judge_debate(
self, topic: str, final_proposal: str, rounds: list[DebateRound]
) -> dict:
prompt = ChatPromptTemplate.from_messages([
("system", """You are an impartial judge.
Return JSON: {{"confidence": 0-100, "reasoning": "..."}}"""),
("user", "Topic: {topic}\nProposal: {proposal}\nHistory: {history}")
])
chain = prompt | self.json_llm | JsonOutputParser()
return chain.invoke({
"topic": topic,
"proposal": final_proposal,
"history": self._format_history(rounds)
        })

    def _no_significant_flaws(self, critique: str) -> bool:
        # Simple heuristic stand-in: stop the debate once the critic concedes.
        return "no significant flaws" in critique.lower()

    def _format_history(self, rounds: list[DebateRound]) -> str:
        if not rounds:
            return "(no previous rounds)"
        return "\n\n".join(
            f"Round {r.round_number}:\nProposal: {r.proposal}\nCritique: {r.critique}\nResponse: {r.response}"
            for r in rounds
        )
# Usage
orchestrator = DebateOrchestrator()
result = orchestrator.debate("Should AI systems be given autonomy in hiring decisions?")
print(f"Conclusion (confidence: {result['confidence']}%): {result['conclusion']}") Research Backing
4. Mixture of Experts (MoE)
A router selects which specialized experts should handle each task:
# Mixture of Experts: Route to specialized agents
class MixtureOfExperts:
router: Agent # Decides which expert(s) to use
experts: Map<string, Agent>
combiner: Agent # Synthesizes expert outputs
function process(task):
# Router analyzes task and selects experts
routing = router.analyze(task)
# Get outputs from selected experts
expertOutputs = []
for expert in routing.selectedExperts:
weight = routing.weights[expert.name]
output = experts.get(expert.name).process(task)
expertOutputs.append({
expert: expert.name,
output: output,
weight: weight
})
# Combine expert outputs
if routing.combinationStrategy == "WEIGHTED_MERGE":
return weightedMerge(expertOutputs)
elif routing.combinationStrategy == "BEST_OF":
return selectBest(expertOutputs)
else:
return combiner.synthesize(task, expertOutputs)
# Router prompt
ROUTER_PROMPT = """
Analyze this task and select the best expert(s).
Available experts:
- code_expert: Programming, debugging, code review
- data_expert: Data analysis, statistics, visualization
- writing_expert: Documentation, communication, editing
- research_expert: Information gathering, summarization
For the given task:
1. Which expert(s) should handle it? (1-3 experts)
2. What weight should each have? (0.0-1.0, sum to 1.0)
3. How should outputs be combined?
Return as JSON:
{
"selected_experts": ["expert1", "expert2"],
"weights": {"expert1": 0.7, "expert2": 0.3},
"combination_strategy": "WEIGHTED_MERGE" | "BEST_OF" | "SYNTHESIZE"
}
""" from dataclasses import dataclass
from typing import Literal
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
@dataclass
class RoutingDecision:
selected_experts: list[str]
weights: dict[str, float]
combination_strategy: Literal["WEIGHTED_MERGE", "BEST_OF", "SYNTHESIZE"]
@dataclass
class ExpertOutput:
expert_name: str
output: str
weight: float
class MixtureOfExperts:
def __init__(self, model: str = "gpt-4"):
self.llm = ChatOpenAI(model=model)
self.json_llm = ChatOpenAI(model=model).bind(
response_format={"type": "json_object"}
)
self.experts: dict[str, str] = {} # name -> system prompt
def register_expert(self, name: str, system_prompt: str):
self.experts[name] = system_prompt
def process(self, task: str) -> str:
# Route to experts
routing = self._route(task)
# Get expert outputs
outputs = []
for expert_name in routing.selected_experts:
output = self._query_expert(expert_name, task)
outputs.append(ExpertOutput(
expert_name=expert_name,
output=output,
weight=routing.weights.get(expert_name, 1.0)
))
# Combine outputs
return self._combine(task, outputs, routing.combination_strategy)
def _route(self, task: str) -> RoutingDecision:
expert_list = "\n".join([
f"- {name}: {prompt[:100]}..."
for name, prompt in self.experts.items()
])
prompt = ChatPromptTemplate.from_messages([
("system", f"""You are a routing agent. Select the best expert(s).
Available experts:
{expert_list}
Return JSON: {{"selected_experts": [...], "weights": {{}}, "combination_strategy": "..."}}"""),
("user", "Task: {task}")
])
chain = prompt | self.json_llm | JsonOutputParser()
data = chain.invoke({"task": task})
return RoutingDecision(**data)
def _query_expert(self, expert_name: str, task: str) -> str:
system_prompt = self.experts.get(expert_name, "You are a helpful assistant.")
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("user", "{task}")
])
chain = prompt | self.llm
return chain.invoke({"task": task}).content
def _combine(
self, task: str, outputs: list[ExpertOutput], strategy: str
) -> str:
if strategy == "BEST_OF":
return max(outputs, key=lambda o: o.weight).output
elif strategy == "WEIGHTED_MERGE":
parts = [
f"[{o.expert_name} ({o.weight:.0%})]: {o.output}"
for o in sorted(outputs, key=lambda o: -o.weight)
]
return "\n\n".join(parts)
else: # SYNTHESIZE
outputs_text = "\n\n".join([
f"Expert: {o.expert_name} (weight: {o.weight})\n{o.output}"
for o in outputs
])
prompt = ChatPromptTemplate.from_messages([
("system", "Synthesize these expert opinions into a coherent response."),
("user", "Task: {task}\n\nExpert outputs:\n{outputs}")
])
chain = prompt | self.llm
return chain.invoke({"task": task, "outputs": outputs_text}).content
# Usage
moe = MixtureOfExperts()
moe.register_expert("code_expert", "You are an expert programmer...")
moe.register_expert("data_expert", "You are a data scientist...")
moe.register_expert("writing_expert", "You are a technical writer...")
result = moe.process("Create a Python script that analyzes sales data and generates a report")
print(result)

Pattern Comparison
| Pattern | Best For | Coordination | Complexity |
|---|---|---|---|
| Supervisor | Clear task decomposition | Centralized | Medium |
| Peer-to-Peer | Loosely-coupled tasks | Decentralized | High |
| Debate | Quality/accuracy critical | Turn-based | Medium |
| MoE | Varied task types | Router-based | Medium |
| Sequential | Pipeline workflows | Linear | Low |
| Hierarchical | Large-scale systems | Tree structure | High |
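Sequential workflows appear in the framework section below, but the hierarchical pattern (a lead agent delegating to managers, which in turn delegate to workers, as in the diagram above) has no dedicated example. The sketch below is one minimal way to express that tree with recursive delegation; the agent names, instructions, and the naive pass-through of the full task to each subordinate are illustrative assumptions, not a framework API.

from dataclasses import dataclass, field
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")

@dataclass
class HierarchicalAgent:
    name: str
    instructions: str
    subordinates: list["HierarchicalAgent"] = field(default_factory=list)

    def run(self, task: str) -> str:
        if not self.subordinates:
            # Leaf worker: handle the task directly.
            return llm.invoke(f"{self.instructions}\n\nTask: {task}").content
        # Lead/manager: delegate to each subordinate, then merge their results.
        parts = [sub.run(task) for sub in self.subordinates]
        return llm.invoke(
            f"{self.instructions}\n\nCombine these results into one answer:\n\n"
            + "\n\n".join(parts)
        ).content

# Lead -> managers -> workers, mirroring the HIERARCHICAL diagram
team = HierarchicalAgent("lead", "You coordinate a research report.", [
    HierarchicalAgent("research_manager", "You manage the research phase.", [
        HierarchicalAgent("w1", "You gather sources."),
        HierarchicalAgent("w2", "You summarize findings."),
    ]),
    HierarchicalAgent("writing_manager", "You manage the writing phase.", [
        HierarchicalAgent("w3", "You draft the report."),
    ]),
])
result = team.run("Report on current AI agent orchestration patterns")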
Framework Comparison
Several frameworks provide multi-agent orchestration out of the box:
# LangGraph: Graph-based orchestration
from langgraph.graph import StateGraph, END
from typing import TypedDict
class State(TypedDict):
    messages: list
    current_agent: str
    research_complete: bool  # set by the researcher node
    approved: bool           # set by the reviewer node
def create_langgraph_workflow():
workflow = StateGraph(State)
# Add nodes for each agent
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("reviewer", reviewer_node)
# Add routing logic
workflow.add_conditional_edges(
"researcher",
lambda s: "writer" if s["research_complete"] else "researcher"
)
workflow.add_edge("writer", "reviewer")
workflow.add_conditional_edges(
"reviewer",
lambda s: END if s["approved"] else "writer"
)
workflow.set_entry_point("researcher")
return workflow.compile()
# Key characteristics:
# - Explicit state management
# - Visual graph representation
# - Built-in persistence and streaming
# - Good for complex, stateful workflows

# LangGraph: Sequential multi-agent with handoffs
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated
from operator import add
class TeamState(TypedDict):
task: str
research: str
draft: str
feedback: str
final: str
messages: Annotated[list, add]
def create_sequential_team():
llm = ChatOpenAI(model="gpt-4")
def researcher_node(state: TeamState) -> dict:
response = llm.invoke([
("system", "You are a research analyst. Research the topic thoroughly."),
("user", f"Research this: {state['task']}")
])
return {"research": response.content}
def writer_node(state: TeamState) -> dict:
response = llm.invoke([
("system", "You are a technical writer. Write clear content."),
("user", f"Write about: {state['task']}\n\nResearch: {state['research']}")
])
return {"draft": response.content}
def reviewer_node(state: TeamState) -> dict:
response = llm.invoke([
("system", "You are an editor. Review and provide feedback."),
("user", f"Review this draft: {state['draft']}")
])
return {"feedback": response.content, "final": state["draft"]}
# Build sequential workflow
workflow = StateGraph(TeamState)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("reviewer", reviewer_node)
workflow.set_entry_point("researcher")
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "reviewer")
workflow.add_edge("reviewer", END)
return workflow.compile()
# Key characteristics:
# - Explicit handoffs between agents
# - Shared state across all agents
# - Clear sequential flow
# - Good for pipeline-style workflows

# LangGraph: Supervisor-based multi-agent
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Literal
import json
class SupervisorState(TypedDict):
task: str
messages: list
next_agent: str | None
def create_supervisor_team():
llm = ChatOpenAI(model="gpt-4")
agents = ["researcher", "writer", "critic"]
def supervisor_node(state: SupervisorState) -> dict:
response = llm.invoke([
("system", f"""You are a supervisor managing: {agents}.
Based on the task and conversation, decide:
- Which agent should act next, OR
- If task is complete, respond with "FINISH"
Return JSON: {{"next": "agent_name" or "FINISH", "instruction": "..."}}"""),
("user", f"Task: {state['task']}\nHistory: {state['messages']}")
])
decision = json.loads(response.content)
return {"next_agent": decision["next"], "messages": state["messages"] + [decision]}
def agent_node(agent_name: str):
def node(state: SupervisorState) -> dict:
instruction = state["messages"][-1].get("instruction", state["task"])
response = llm.invoke([
("system", f"You are the {agent_name}. Complete your assigned task."),
("user", instruction)
])
return {"messages": state["messages"] + [{"agent": agent_name, "response": response.content}]}
return node
def route(state: SupervisorState) -> Literal["researcher", "writer", "critic", "__end__"]:
if state["next_agent"] == "FINISH":
return END
return state["next_agent"]
workflow = StateGraph(SupervisorState)
workflow.add_node("supervisor", supervisor_node)
for agent in agents:
workflow.add_node(agent, agent_node(agent))
workflow.add_edge(agent, "supervisor")
workflow.set_entry_point("supervisor")
workflow.add_conditional_edges("supervisor", route)
return workflow.compile()
# Key characteristics:
# - Dynamic agent selection
# - Supervisor controls workflow
# - Flexible task routing
# - Good for complex, adaptive workflows

// Microsoft Agent Framework: Multi-agent workflows
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using OpenAI;
async Task CreateAgentFrameworkWorkflow()
{
var chatClient = new OpenAIClient(apiKey)
.GetChatClient("gpt-4o")
.AsIChatClient();
// Create specialized agents
var researcher = chatClient.CreateAIAgent(
name: "Researcher",
instructions: "You research topics thoroughly..."
);
var writer = chatClient.CreateAIAgent(
name: "Writer",
instructions: "You write clear articles..."
);
var critic = chatClient.CreateAIAgent(
name: "Critic",
instructions: "You provide constructive feedback..."
);
// Sequential workflow with thread-based memory
var researchThread = researcher.GetNewThread();
var researchResult = await researcher.RunAsync(
"Research AI agents and their applications",
researchThread
);
var writerThread = writer.GetNewThread();
var article = await writer.RunAsync(
$"Write an article based on: {researchResult}",
writerThread
);
var criticThread = critic.GetNewThread();
var feedback = await critic.RunAsync(
$"Review this article: {article}",
criticThread
);
// Revision loop
var finalArticle = await writer.RunAsync(
$"Revise based on feedback: {feedback}",
writerThread
);
Console.WriteLine(finalArticle);
}
// Key characteristics:
// - .NET native with Python parity
// - Thread-based conversation memory
// - Simple, intuitive API
// - Enterprise-ready (Azure integration)

| Framework | Language | Paradigm | Best For |
|---|---|---|---|
| LangGraph (Graph) | Python | Graph-based | Complex stateful workflows |
| LangGraph (Sequential) | Python | Pipeline | Linear multi-step workflows |
| LangGraph (Supervisor) | Python | Hierarchical | Dynamic agent routing |
| Agent Framework | C#/.NET | Thread-based | Enterprise applications |
Framework comparison
Evaluation Metrics
| Metric | What it Measures | How to Calculate |
|---|---|---|
| Task Completion | Did the system solve the task? | Binary or graded evaluation |
| Coordination Overhead | Extra cost from multi-agent | Total tokens / single-agent tokens |
| Latency | Time to completion | Wall clock time |
| Agent Utilization | Are all agents contributing? | Messages per agent |
| Redundancy | Duplicate work across agents | Semantic similarity of outputs |
Key metrics for multi-agent systems
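As a concrete starting point for the metrics above, the sketch below computes coordination overhead and agent utilization from a simple run trace. The trace format (one dict per message with an agent name and token count) is an assumption for illustration, not a framework API.

from collections import Counter

def coordination_overhead(multi_agent_tokens: int, single_agent_tokens: int) -> float:
    # Ratio > 1.0 means the multi-agent run cost more tokens than a single-agent baseline.
    return multi_agent_tokens / single_agent_tokens

def agent_utilization(trace: list[dict]) -> dict[str, int]:
    # Messages per agent: highlights agents that never contribute.
    return dict(Counter(msg["agent"] for msg in trace))

trace = [
    {"agent": "supervisor", "tokens": 350},
    {"agent": "researcher", "tokens": 1200},
    {"agent": "writer", "tokens": 900},
    {"agent": "supervisor", "tokens": 200},
]
total_tokens = sum(msg["tokens"] for msg in trace)
print(coordination_overhead(total_tokens, single_agent_tokens=1800))  # e.g. ~1.47
print(agent_utilization(trace))  # {'supervisor': 2, 'researcher': 1, 'writer': 1}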