Multi-Agent Orchestration

Patterns and frameworks for coordinating multiple specialized AI agents to solve complex tasks.

Why Multi-Agent Systems?

Single agents hit limits on complex tasks. Multi-agent systems address this in four ways:

Specialization

Different agents excel at different tasks (coding, research, writing)

Parallelization

Multiple agents can work on subtasks simultaneously

Verification

Agents can check each other's work (critic patterns)

Robustness

Multiple perspectives reduce single points of failure

Complexity Trade-off

Multi-agent systems add coordination overhead, debugging complexity, and cost. Use them when a single agent genuinely can't handle the task.

Orchestration Patterns

Multi-Agent Orchestration Patterns
SUPERVISOR                 PEER-TO-PEER              DEBATE
────────────────────       ────────────────────      ────────────────────

    ┌──────────┐               ┌───┐                   ┌──────────┐
    │SUPERVISOR│           ┌───┤ A ├───┐               │ PROPOSER │
    └────┬─────┘           │   └───┘   │               └────┬─────┘
         │                 │     │     │                    │
    ┌────┼────┐           ┌▼─┐  │   ┌─▼┐              ┌────▼─────┐
    │    │    │           │B │◄─┼──►│ C│              │  CRITIC  │
    ▼    ▼    ▼           └──┘  │   └──┘              └────┬─────┘
  ┌──┐ ┌──┐ ┌──┐               ┌▼─┐                        │
  │W1│ │W2│ │W3│               │ D│                   ┌────▼─────┐
  └──┘ └──┘ └──┘               └──┘                   │  JUDGE   │
                                                      └──────────┘


MIXTURE OF EXPERTS         HIERARCHICAL              SEQUENTIAL
────────────────────       ────────────────────      ────────────────────

      ┌────────┐                ┌────┐               ┌──┐   ┌──┐   ┌──┐
      │ ROUTER │                │LEAD│               │A1├──►│A2├──►│A3│
      └───┬────┘                └─┬──┘               └──┘   └──┘   └──┘
          │                   ┌──┼──┐
     ┌────┼────┐              ▼  ▼  ▼
     │    │    │            ┌──┐┌──┐┌──┐
     ▼    ▼    ▼            │M1││M2││M3│
   ┌──┐ ┌──┐ ┌──┐           └┬─┘└┬─┘└┬─┘
   │E1│ │E2│ │E3│            │   │   │
   └──┘ └──┘ └──┘          ┌─┼───┼───┼─┐
     │    │    │           ▼ ▼   ▼   ▼ ▼
     └────┼────┘          ┌──┐ ┌──┐ ┌──┐
          ▼               │W1│ │W2│ │W3│
     ┌────────┐           └──┘ └──┘ └──┘
     │COMBINER│
     └────────┘

1. Supervisor Pattern

A central supervisor agent coordinates specialized worker agents, deciding who handles each subtask:

Supervisor Pattern Implementation
# Supervisor Pattern: One agent coordinates others

class SupervisorOrchestrator:
    supervisor: Agent  # Makes routing decisions
    workers: Map<string, Agent>  # Specialized workers

    function process(task):
        conversation = [task]

        while not isComplete(conversation):
            # Supervisor decides next step
            decision = supervisor.decide(
                task: task,
                conversation: conversation,
                availableWorkers: workers.keys()
            )

            if decision.action == "DELEGATE":
                # Route to worker agent
                worker = workers.get(decision.targetWorker)
                result = worker.execute(decision.subTask)
                conversation.append(result)

            elif decision.action == "RESPOND":
                # Supervisor provides final answer
                return decision.response

            elif decision.action == "CLARIFY":
                # Need more information
                return requestInput(decision.question)

        return synthesize(conversation)

# Supervisor prompt
SUPERVISOR_PROMPT = """
You are a supervisor coordinating specialized workers.

Available workers:
- researcher: Searches and gathers information
- analyst: Analyzes data and creates reports
- writer: Drafts documents and communications
- coder: Writes and reviews code

For each user request, decide:
1. Which worker(s) should handle it
2. What specific task to assign them
3. When you have enough information to respond

Output your decision as:
{
    "action": "DELEGATE" | "RESPOND" | "CLARIFY",
    "targetWorker": "worker_name",  // if DELEGATE
    "subTask": "specific instructions",  // if DELEGATE
    "response": "final answer"  // if RESPOND
}
"""
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from typing import TypedDict, Literal

# Define state
class OrchestratorState(TypedDict):
    messages: list
    next_worker: str | None

# Create specialized agents
llm = ChatOpenAI(model="gpt-4")

researcher = create_react_agent(
    llm,
    tools=[search_tool, browse_tool],
    state_modifier="You are a research specialist."
)

analyst = create_react_agent(
    llm,
    tools=[analyze_tool, chart_tool],
    state_modifier="You are a data analyst."
)

writer = create_react_agent(
    llm,
    tools=[write_tool, format_tool],
    state_modifier="You are a technical writer."
)

# Supervisor decides routing
def supervisor_node(state: OrchestratorState):
    """Supervisor decides which worker to invoke next."""
    messages = state["messages"]

    response = llm.invoke([
        {"role": "system", "content": SUPERVISOR_PROMPT},
        *messages,
        {"role": "user", "content": "What should happen next?"}
    ])

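    # Parse decision (parse_supervisor_response is a helper you supply)
    decision = parse_supervisor_response(response.content)

    # RESPOND and CLARIFY both end the run; the last message carries the text
    if decision["action"] != "DELEGATE":
        return {"messages": messages + [response], "next_worker": None}

    # Key name matches the "targetWorker" field requested in SUPERVISOR_PROMPT
    return {"messages": messages, "next_worker": decision["targetWorker"]}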

def route_to_worker(state: OrchestratorState) -> Literal["researcher", "analyst", "writer", "end"]:
    """Route to appropriate worker or end."""
    if state["next_worker"] is None:
        return "end"
    return state["next_worker"]

# Build the graph
workflow = StateGraph(OrchestratorState)

# Add nodes
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("researcher", researcher)
workflow.add_node("analyst", analyst)
workflow.add_node("writer", writer)

# Add edges
workflow.set_entry_point("supervisor")
workflow.add_conditional_edges(
    "supervisor",
    route_to_worker,
    {
        "researcher": "researcher",
        "analyst": "analyst",
        "writer": "writer",
        "end": END
    }
)

# Workers return to supervisor
for worker in ["researcher", "analyst", "writer"]:
    workflow.add_edge(worker, "supervisor")

# Compile
app = workflow.compile()

# Run
result = app.invoke({
    "messages": [{"role": "user", "content": "Research AI trends and write a summary"}],
    "next_worker": None
})
using System.Text.Json;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using OpenAI;

public class SupervisorOrchestrator
{
    private readonly AIAgent _supervisor;
    private readonly Dictionary<string, AIAgent> _workers;
    private readonly IChatClient _chatClient;

    public SupervisorOrchestrator(string apiKey)
    {
        _chatClient = new OpenAIClient(apiKey)
            .GetChatClient("gpt-4o")
            .AsIChatClient();

        _supervisor = _chatClient.CreateAIAgent(
            name: "Supervisor",
            instructions: SUPERVISOR_PROMPT
        );

        _workers = new Dictionary<string, AIAgent>
        {
            ["researcher"] = _chatClient.CreateAIAgent(
                name: "Researcher",
                instructions: "You are a research specialist..."
            ),
            ["analyst"] = _chatClient.CreateAIAgent(
                name: "Analyst",
                instructions: "You are a data analyst..."
            ),
            ["writer"] = _chatClient.CreateAIAgent(
                name: "Writer",
                instructions: "You are a technical writer..."
            )
        };
    }

    public async Task<string> ProcessAsync(string task)
    {
        var supervisorThread = _supervisor.GetNewThread();
        var context = new List<string> { task };

        while (true)
        {
            // Get supervisor decision
            var decisionPrompt = $"Task: {task}\nContext: {string.Join("\n", context)}";
            var response = await _supervisor.RunAsync(decisionPrompt, supervisorThread);
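            // RunAsync returns a response object; its Text property holds the model output.
            // Case-insensitive matching lets the prompt's "action" bind to Action, and so on.
            var decision = JsonSerializer.Deserialize<SupervisorDecision>(
                response.Text,
                new JsonSerializerOptions { PropertyNameCaseInsensitive = true })!;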

            if (decision.Action == "RESPOND")
            {
                return decision.Response!;
            }

            if (decision.Action == "DELEGATE")
            {
                var worker = _workers[decision.TargetWorker!];
                var workerThread = worker.GetNewThread();

                // Execute worker task
                var workerResult = await worker.RunAsync(
                    decision.SubTask!,
                    workerThread
                );

                context.Add($"[{decision.TargetWorker}]: {workerResult}");
            }
        }
    }
}

public record SupervisorDecision(
    string Action,
    string? TargetWorker = null,
    string? SubTask = null,
    string? Response = null
);

// Usage
var orchestrator = new SupervisorOrchestrator("your-api-key");
var result = await orchestrator.ProcessAsync(
    "Research AI trends and write a summary"
);
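The Python example above calls parse_supervisor_response without defining it. The following is a minimal sketch of such a helper, assuming the supervisor replies with one JSON object (possibly wrapped in prose) that uses the field names from SUPERVISOR_PROMPT. The fallback to CLARIFY and the "question" text are illustrative choices, not part of any framework.

```python
import json
import re

VALID_ACTIONS = {"DELEGATE", "RESPOND", "CLARIFY"}


def parse_supervisor_response(text: str) -> dict:
    """Extract the supervisor's JSON decision from raw model output.

    Assumes the reply contains a single JSON object, possibly surrounded by
    prose. Anything unparseable or invalid becomes a CLARIFY decision.
    """
    fallback = {
        "action": "CLARIFY",
        "question": "Could not parse the supervisor decision.",
    }
    # Grab everything from the first "{" to the last "}"
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        return fallback
    try:
        decision = json.loads(match.group(0))
    except json.JSONDecodeError:
        return fallback
    if not isinstance(decision, dict) or decision.get("action") not in VALID_ACTIONS:
        return fallback
    # A DELEGATE decision is useless without a target worker
    if decision["action"] == "DELEGATE" and not decision.get("targetWorker"):
        return fallback
    return decision
```

Returning a CLARIFY decision instead of raising keeps the routing code simple: every malformed reply ends the run in a controlled way rather than with a KeyError.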

Pros

  • Clear control flow
  • Easy to debug and trace
  • Central point for monitoring

Cons

  • Single point of failure
  • Supervisor can become bottleneck
  • Requires good supervisor prompting
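One way to soften the single-point-of-failure risk is to cap how many decisions the supervisor may make, so a confused supervisor cannot loop forever. The sketch below is framework-free and uses plain callables as stand-ins for the supervisor and workers; run_supervisor, scripted_decide, and max_steps are hypothetical names chosen for illustration.

```python
from typing import Callable

Decision = dict  # keys: "action", "targetWorker", "subTask", "response"


def run_supervisor(
    task: str,
    decide: Callable[[str, list], Decision],
    workers: dict,
    max_steps: int = 8,
) -> str:
    """Run the delegate/respond loop, giving up after max_steps decisions."""
    history: list = []
    for _ in range(max_steps):
        decision = decide(task, history)
        if decision["action"] == "RESPOND":
            return decision["response"]
        if decision["action"] == "DELEGATE":
            name = decision["targetWorker"]
            worker = workers.get(name)
            if worker is None:
                # Unknown worker: record it so the supervisor can re-plan
                history.append(f"[error]: no worker named {name}")
                continue
            history.append(f"[{name}]: {worker(decision['subTask'])}")
            continue
        break  # CLARIFY or an unexpected action ends the run
    return "Stopped without a final answer. Partial results:\n" + "\n".join(history)


# Stub supervisor: delegate once, then answer from the worker's output
def scripted_decide(task: str, history: list) -> Decision:
    if not history:
        return {"action": "DELEGATE", "targetWorker": "researcher", "subTask": task}
    return {"action": "RESPOND", "response": f"Summary based on {history[-1]}"}


answer = run_supervisor(
    "AI trends",
    scripted_decide,
    {"researcher": lambda sub_task: f"notes on {sub_task}"},
)
print(answer)
```

In a real system decide would wrap an LLM call, and the partial-results message could be replaced by a fallback agent or an escalation to a human.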

2. Peer-to-Peer Pattern

Agents communicate directly with each other without a central coordinator:

Peer-to-Peer Pattern Implementation
# Peer-to-Peer Pattern: Agents communicate directly

class PeerNetwork:
    agents: Map<string, Agent>
    messageQueue: Queue

    function broadcast(sender, message):
        for agent in agents.values():
            if agent.name != sender:
                messageQueue.enqueue({
                    from: sender,
                    to: agent.name,
                    content: message
                })

    function sendTo(sender, recipient, message):
        messageQueue.enqueue({
            from: sender,
            to: recipient,
            content: message
        })

    function processMessages():
        while not messageQueue.isEmpty():
            msg = messageQueue.dequeue()
            recipient = agents.get(msg.to)
            response = recipient.receive(msg.from, msg.content)

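            if response.reply:
                sendTo(msg.to, msg.from, response.reply)
            elif response.forward:
                # Hand the original request to the peer chosen by the recipient
                sendTo(msg.from, response.forward, response.message)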

# Agent with peer communication
class PeerAgent:
    name: string
    capabilities: []
    peers: Map<string, PeerInfo>

    function receive(sender, message):
        # Decide how to handle incoming message
        if canHandle(message):
            result = process(message)
            return { reply: result }
        else:
            # Find a peer who can help
            bestPeer = findBestPeer(message)
            if bestPeer:
                return { forward: bestPeer, message: message }
            else:
                return { reply: "Cannot handle this request" }

    function requestHelp(task):
        # Broadcast request to peers
        responses = broadcast(self.name, {
            type: "HELP_REQUEST",
            task: task,
            requiredCapabilities: inferCapabilities(task)
        })

        # Collect and synthesize responses
        return synthesize(responses)
import asyncio
from dataclasses import dataclass, field
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

@dataclass
class Message:
    sender: str
    recipient: str
    content: dict
    message_type: str = "REQUEST"

@dataclass
class PeerAgent:
    name: str
    capabilities: list[str]
    system_prompt: str
    llm: ChatOpenAI = field(default_factory=lambda: ChatOpenAI(model="gpt-4"))
    inbox: asyncio.Queue = field(default_factory=asyncio.Queue)

    async def receive(self, message: Message) -> dict:
        """Handle incoming message."""
        if message.message_type == "REQUEST":
            if self._can_handle(message.content):
                result = await self._process(message.content)
                return {"status": "completed", "result": result}
            return {"status": "cannot_handle"}

        elif message.message_type == "HELP_REQUEST":
            if self._can_help(message.content.get("required_capabilities", [])):
                return {
                    "status": "can_help",
                    "agent": self.name,
                    "capabilities": self.capabilities
                }
            return {"status": "cannot_help"}

        return {"status": "unknown_message_type"}

    async def _process(self, content: dict) -> str:
        """Process task using LangChain."""
        prompt = ChatPromptTemplate.from_messages([
            ("system", self.system_prompt),
            ("user", "{task}")
        ])
        chain = prompt | self.llm
        response = await chain.ainvoke({"task": content.get("task", str(content))})
        return response.content

    def _can_handle(self, content: dict) -> bool:
        required = content.get("required_capabilities", [])
        return all(cap in self.capabilities for cap in required)

    def _can_help(self, required: list[str]) -> bool:
        return any(cap in self.capabilities for cap in required)


class PeerNetwork:
    def __init__(self):
        self.agents: dict[str, PeerAgent] = {}

    def register(self, agent: PeerAgent):
        self.agents[agent.name] = agent

    async def send(self, message: Message) -> dict:
        recipient = self.agents.get(message.recipient)
        if not recipient:
            return {"status": "agent_not_found"}
        return await recipient.receive(message)

    async def broadcast(
        self, sender: str, content: dict, message_type: str = "REQUEST"
    ) -> list[dict]:
        tasks = [
            agent.receive(Message(sender, name, content, message_type))
            for name, agent in self.agents.items() if name != sender
        ]
        return await asyncio.gather(*tasks)

    async def request_help(
        self, requester: str, task: dict, required_capabilities: list[str]
    ) -> str | None:
        responses = await self.broadcast(
            requester,
            {"task": task, "required_capabilities": required_capabilities},
            "HELP_REQUEST"
        )
        helpers = [r for r in responses if r.get("status") == "can_help"]
        if helpers:
            return max(helpers, key=lambda h: len(
                set(h["capabilities"]) & set(required_capabilities)
            ))["agent"]
        return None


# Usage
async def main():
    network = PeerNetwork()

    researcher = PeerAgent(
        name="researcher",
        capabilities=["search", "browse", "summarize"],
        system_prompt="You are a research specialist. Search and summarize information."
    )

    coder = PeerAgent(
        name="coder",
        capabilities=["code", "debug", "test"],
        system_prompt="You are an expert programmer. Write clean, efficient code."
    )

    network.register(researcher)
    network.register(coder)

    # Find helper and execute task
    helper = await network.request_help(
        requester="user",
        task={"description": "Write a Python script"},
        required_capabilities=["code"]
    )

    if helper:
        result = await network.send(Message(
            "user", helper, {"task": "Write a hello world script"}, "REQUEST"
        ))
        print(result)

asyncio.run(main())

When to Use

Best for loosely-coupled tasks where agents have distinct capabilities and can self-organize. Avoid for tasks requiring tight coordination.
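Self-organizing peers need a guard against requests bouncing between agents indefinitely. The sketch below shows one possible approach, assuming each peer knows a list of neighbours and advertises a set of capabilities: requests are forwarded breadth-first with a hop limit and a visited set. Peer, route_request, and max_hops are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Peer:
    name: str
    capabilities: set
    neighbours: list = field(default_factory=list)


def route_request(
    peers: dict, start: str, capability: str, max_hops: int = 3
) -> Optional[list]:
    """Forward a request peer to peer until one has the capability.

    Returns the path of peer names taken, or None if no peer within
    max_hops can handle it. Breadth-first, so the shortest path wins.
    """
    frontier = [[start]]
    visited = {start}
    for _ in range(max_hops + 1):
        next_frontier = []
        for path in frontier:
            current = peers[path[-1]]
            if capability in current.capabilities:
                return path
            for name in current.neighbours:
                if name in peers and name not in visited:
                    visited.add(name)  # never revisit: prevents forwarding loops
                    next_frontier.append(path + [name])
        frontier = next_frontier
    return None


peers = {
    "A": Peer("A", {"search"}, ["B", "C"]),
    "B": Peer("B", {"write"}, ["A", "D"]),
    "C": Peer("C", {"chart"}, ["A"]),
    "D": Peer("D", {"code"}, ["B"]),
}
print(route_request(peers, "A", "code"))
print(route_request(peers, "A", "code", max_hops=1))
```

The hop limit bounds latency and cost, while the visited set ensures that mutual neighbours such as A and B never pass the same request back and forth.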

3. Debate Pattern

Agents argue and critique each other to reach better conclusions:

Debate Pattern Implementation
# Debate Pattern: Agents argue to reach better conclusions

class DebateOrchestrator:
    proposer: Agent    # Makes initial proposal
    critic: Agent      # Challenges and finds flaws
    judge: Agent       # Evaluates arguments

    function debate(topic, maxRounds = 3):
        # Initial proposal
        proposal = proposer.generate(topic)

        for round in range(maxRounds):
            # Critic challenges the proposal
            critique = critic.challenge(
                topic: topic,
                proposal: proposal,
                findFlaws: true,
                suggestAlternatives: true
            )

            if critique.noSignificantFlaws:
                break

            # Proposer defends or revises
            response = proposer.respond(
                originalProposal: proposal,
                critique: critique,
                canRevise: true
            )

            proposal = response.revisedProposal or proposal

        # Judge evaluates final proposal
        verdict = judge.evaluate(
            topic: topic,
            finalProposal: proposal,
            debateHistory: getAllExchanges()
        )

        return {
            conclusion: proposal,
            confidence: verdict.confidence,
            reasoning: verdict.reasoning
        }

# Multi-perspective variant
class MultiPerspectiveDebate:
    perspectives: [
        { name: "optimist", bias: "focus on benefits and opportunities" },
        { name: "pessimist", bias: "focus on risks and downsides" },
        { name: "pragmatist", bias: "focus on feasibility and implementation" },
        { name: "ethicist", bias: "focus on moral implications" }
    ]

    function analyze(topic):
        analyses = []
        for perspective in perspectives:
            analysis = llm.generate(
                prompt: analyzeFromPerspective(topic, perspective)
            )
            analyses.append(analysis)

        # Synthesize all perspectives
        synthesis = llm.generate(
            prompt: synthesizeAnalyses(topic, analyses)
        )

        return synthesis
from dataclasses import dataclass
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser

@dataclass
class DebateRound:
    proposal: str
    critique: str
    response: str
    round_number: int

class DebateOrchestrator:
    def __init__(self, model: str = "gpt-4o"):
        # The JSON response_format below requires a model with JSON mode (for example gpt-4o)
        self.llm = ChatOpenAI(model=model)
        self.json_llm = ChatOpenAI(model=model).bind(
            response_format={"type": "json_object"}
        )

    def debate(self, topic: str, max_rounds: int = 3) -> dict:
        """Run a debate between proposer and critic."""
        rounds: list[DebateRound] = []

        # Initial proposal
        proposal = self._generate_proposal(topic)

        for round_num in range(max_rounds):
            # Critic challenges
            critique = self._generate_critique(topic, proposal, rounds)

            # Check if critique found significant issues
            if self._no_significant_flaws(critique):
                break

            # Proposer responds
            response, revised = self._generate_response(
                topic, proposal, critique
            )

            rounds.append(DebateRound(
                proposal=proposal,
                critique=critique,
                response=response,
                round_number=round_num
            ))

            proposal = revised if revised else proposal

        # Judge evaluates
        verdict = self._judge_debate(topic, proposal, rounds)

        return {
            "conclusion": proposal,
            "confidence": verdict["confidence"],
            "reasoning": verdict["reasoning"],
            "rounds": len(rounds)
        }

    def _generate_proposal(self, topic: str) -> str:
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a thoughtful proposer. Present a well-reasoned position."),
            ("user", "Topic: {topic}\n\nPresent your proposal with supporting arguments.")
        ])
        chain = prompt | self.llm
        return chain.invoke({"topic": topic}).content

    def _generate_critique(
        self, topic: str, proposal: str, history: list[DebateRound]
    ) -> str:
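        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a rigorous critic. Find flaws, gaps, and weaknesses.
Be specific about what's wrong and suggest alternatives.
If the proposal is solid, reply with the exact phrase NO SIGNIFICANT FLAWS."""),
            ("user", """Topic: {topic}

Current proposal:
{proposal}

Previous debate:
{history}

Provide your critique. Be thorough but fair.""")
        ])
        chain = prompt | self.llm
        return chain.invoke({
            "topic": topic,
            "proposal": proposal,
            "history": self._format_history(history)
        }).content

    def _no_significant_flaws(self, critique: str) -> bool:
        # Relies on the marker phrase requested in the critic's system prompt
        return "NO SIGNIFICANT FLAWS" in critique.upper()

    def _format_history(self, history: list[DebateRound]) -> str:
        # Render earlier rounds as plain text for the critic and judge prompts
        if not history:
            return "(no previous rounds)"
        return "\n\n".join(
            f"Round {r.round_number + 1}\nCritique: {r.critique}\nResponse: {r.response}"
            for r in history
        )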

    def _generate_response(
        self, topic: str, proposal: str, critique: str
    ) -> tuple[str, str | None]:
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are the proposer defending your position.
Address valid criticisms and revise your proposal if needed."""),
            ("user", """Topic: {topic}

Your proposal:
{proposal}

Critique: {critique}

Respond. If revising, mark as REVISED PROPOSAL:""")
        ])
        chain = prompt | self.llm
        content = chain.invoke({
            "topic": topic, "proposal": proposal, "critique": critique
        }).content

        if "REVISED PROPOSAL:" in content:
            parts = content.split("REVISED PROPOSAL:")
            return parts[0].strip(), parts[1].strip()
        return content, None

    def _judge_debate(
        self, topic: str, final_proposal: str, rounds: list[DebateRound]
    ) -> dict:
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an impartial judge.
Return JSON: {{"confidence": 0-100, "reasoning": "..."}}"""),
            ("user", "Topic: {topic}\nProposal: {proposal}\nHistory: {history}")
        ])
        chain = prompt | self.json_llm | JsonOutputParser()
        return chain.invoke({
            "topic": topic,
            "proposal": final_proposal,
            "history": self._format_history(rounds)
        })

# Usage
orchestrator = DebateOrchestrator()
result = orchestrator.debate("Should AI systems be given autonomy in hiring decisions?")
print(f"Conclusion (confidence: {result['confidence']}%): {result['conclusion']}")

Research Backing

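Multi-agent debate has been reported to improve factual accuracy and reduce hallucinations (for example, Du et al., 2023, "Improving Factuality and Reasoning in Language Models through Multiagent Debate"), because agents must defend their claims against criticism.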
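The multi-perspective variant from the pseudocode has no Python counterpart above. A minimal sketch follows, assuming generate is any prompt-to-text function (an LLM call in practice); the prompt wording, multi_perspective_analysis, and fake_generate are illustrative assumptions, and the fake model exists only so the example runs offline.

```python
from typing import Callable

PERSPECTIVES = {
    "optimist": "focus on benefits and opportunities",
    "pessimist": "focus on risks and downsides",
    "pragmatist": "focus on feasibility and implementation",
    "ethicist": "focus on moral implications",
}


def multi_perspective_analysis(topic: str, generate: Callable[[str], str]) -> str:
    """Collect one analysis per perspective, then ask for a synthesis."""
    analyses = []
    for name, bias in PERSPECTIVES.items():
        prompt = f"As the {name}, {bias}.\nAnalyze: {topic}"
        analyses.append(f"[{name}] {generate(prompt)}")

    synthesis_prompt = (
        f"Topic: {topic}\n\nAnalyses:\n"
        + "\n".join(analyses)
        + "\n\nSynthesize these into one balanced conclusion."
    )
    return generate(synthesis_prompt)


# Stand-in for a model call so the sketch runs without network access
calls = []


def fake_generate(prompt: str) -> str:
    calls.append(prompt)
    return f"reply #{len(calls)}"


result = multi_perspective_analysis("remote work", fake_generate)
print(result)  # the fifth call is the synthesis
```

Unlike the proposer/critic loop, the perspectives here do not respond to each other, so the per-perspective calls are independent and could be issued in parallel.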

4. Mixture of Experts (MoE)

A router selects which specialized experts should handle each task:

Mixture of Experts Implementation
# Mixture of Experts: Route to specialized agents

class MixtureOfExperts:
    router: Agent  # Decides which expert(s) to use
    experts: Map<string, Agent>
    combiner: Agent  # Synthesizes expert outputs

    function process(task):
        # Router analyzes task and selects experts
        routing = router.analyze(task)

        # Get outputs from selected experts
        expertOutputs = []
        for expert in routing.selectedExperts:
            weight = routing.weights[expert.name]
            output = experts.get(expert.name).process(task)
            expertOutputs.append({
                expert: expert.name,
                output: output,
                weight: weight
            })

        # Combine expert outputs
        if routing.combinationStrategy == "WEIGHTED_MERGE":
            return weightedMerge(expertOutputs)
        elif routing.combinationStrategy == "BEST_OF":
            return selectBest(expertOutputs)
        else:
            return combiner.synthesize(task, expertOutputs)

# Router prompt
ROUTER_PROMPT = """
Analyze this task and select the best expert(s).

Available experts:
- code_expert: Programming, debugging, code review
- data_expert: Data analysis, statistics, visualization
- writing_expert: Documentation, communication, editing
- research_expert: Information gathering, summarization

For the given task:
1. Which expert(s) should handle it? (1-3 experts)
2. What weight should each have? (0.0-1.0, sum to 1.0)
3. How should outputs be combined?

Return as JSON:
{
    "selected_experts": ["expert1", "expert2"],
    "weights": {"expert1": 0.7, "expert2": 0.3},
    "combination_strategy": "WEIGHTED_MERGE" | "BEST_OF" | "SYNTHESIZE"
}
"""
from dataclasses import dataclass
from typing import Literal
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser

@dataclass
class RoutingDecision:
    selected_experts: list[str]
    weights: dict[str, float]
    combination_strategy: Literal["WEIGHTED_MERGE", "BEST_OF", "SYNTHESIZE"]

@dataclass
class ExpertOutput:
    expert_name: str
    output: str
    weight: float

class MixtureOfExperts:
    def __init__(self, model: str = "gpt-4o"):
        # The JSON response_format below requires a model with JSON mode (for example gpt-4o)
        self.llm = ChatOpenAI(model=model)
        self.json_llm = ChatOpenAI(model=model).bind(
            response_format={"type": "json_object"}
        )
        self.experts: dict[str, str] = {}  # name -> system prompt

    def register_expert(self, name: str, system_prompt: str):
        self.experts[name] = system_prompt

    def process(self, task: str) -> str:
        # Route to experts
        routing = self._route(task)

        # Get expert outputs
        outputs = []
        for expert_name in routing.selected_experts:
            output = self._query_expert(expert_name, task)
            outputs.append(ExpertOutput(
                expert_name=expert_name,
                output=output,
                weight=routing.weights.get(expert_name, 1.0)
            ))

        # Combine outputs
        return self._combine(task, outputs, routing.combination_strategy)

    def _route(self, task: str) -> RoutingDecision:
        expert_list = "\n".join([
            f"- {name}: {prompt[:100]}..."
            for name, prompt in self.experts.items()
        ])

        # Plain (non-f) string: {expert_list} and {task} are template variables,
        # and the doubled braces escape the literal JSON example.
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a routing agent. Select the best expert(s).

Available experts:
{expert_list}

Return JSON: {{"selected_experts": [...], "weights": {{}}, "combination_strategy": "..."}}"""),
            ("user", "Task: {task}")
        ])

        chain = prompt | self.json_llm | JsonOutputParser()
        data = chain.invoke({"task": task, "expert_list": expert_list})
        return RoutingDecision(**data)

    def _query_expert(self, expert_name: str, task: str) -> str:
        system_prompt = self.experts.get(expert_name, "You are a helpful assistant.")
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("user", "{task}")
        ])
        chain = prompt | self.llm
        return chain.invoke({"task": task}).content

    def _combine(
        self, task: str, outputs: list[ExpertOutput], strategy: str
    ) -> str:
        if strategy == "BEST_OF":
            return max(outputs, key=lambda o: o.weight).output

        elif strategy == "WEIGHTED_MERGE":
            parts = [
                f"[{o.expert_name} ({o.weight:.0%})]: {o.output}"
                for o in sorted(outputs, key=lambda o: -o.weight)
            ]
            return "\n\n".join(parts)

        else:  # SYNTHESIZE
            outputs_text = "\n\n".join([
                f"Expert: {o.expert_name} (weight: {o.weight})\n{o.output}"
                for o in outputs
            ])
            prompt = ChatPromptTemplate.from_messages([
                ("system", "Synthesize these expert opinions into a coherent response."),
                ("user", "Task: {task}\n\nExpert outputs:\n{outputs}")
            ])
            chain = prompt | self.llm
            return chain.invoke({"task": task, "outputs": outputs_text}).content

# Usage
moe = MixtureOfExperts()

moe.register_expert("code_expert", "You are an expert programmer...")
moe.register_expert("data_expert", "You are a data scientist...")
moe.register_expert("writing_expert", "You are a technical writer...")

result = moe.process("Create a Python script that analyzes sales data and generates a report")
print(result)
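The router prompt asks for weights that sum to 1.0, but a model may return unknown expert names, negative numbers, or weights that do not add up. The sketch below shows one way to sanitize a routing decision before using it; normalize_routing is a hypothetical helper, and the even-split fallback is an assumption rather than part of the pattern.

```python
def normalize_routing(selected: list, weights: dict, known_experts: set) -> dict:
    """Return weights over valid experts only, rescaled to sum to 1.0."""
    # Drop duplicates (keeping order) and experts that were never registered
    valid = [name for name in dict.fromkeys(selected) if name in known_experts]
    if not valid:
        raise ValueError("router selected no known experts")

    # Missing weights count as 0; negative weights are clamped to 0
    raw = {name: max(float(weights.get(name, 0.0)), 0.0) for name in valid}
    total = sum(raw.values())
    if total == 0:
        # No usable weights: fall back to an even split
        return {name: 1.0 / len(valid) for name in valid}
    return {name: value / total for name, value in raw.items()}


known = {"code_expert", "data_expert", "writing_expert"}
cleaned = normalize_routing(
    ["code_expert", "data_expert", "ghost_expert"],
    {"code_expert": 7, "data_expert": 3},
    known,
)
print(cleaned)
```

Calling a helper like this between the routing step and the expert queries keeps the BEST_OF and WEIGHTED_MERGE strategies from acting on malformed weights.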

Pattern Comparison

Pattern        Best For                    Coordination     Complexity
────────────   ─────────────────────────   ──────────────   ──────────
Supervisor     Clear task decomposition    Centralized      Medium
Peer-to-Peer   Loosely-coupled tasks       Decentralized    High
Debate         Quality/accuracy critical   Turn-based       Medium
MoE            Varied task types           Router-based     Medium
Sequential     Pipeline workflows          Linear           Low
Hierarchical   Large-scale systems         Tree structure   High
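The hierarchical pattern appears in the diagram and the comparison but has no implementation in the pattern sections. The following is a minimal, framework-free sketch of the lead, manager, worker tree, assuming workers are plain functions and the lead receives a ready-made plan; in practice an LLM would produce the plan and the workers would be agents. Lead, Manager, and plan are hypothetical names.

```python
from typing import Callable

Worker = Callable[[str], str]


class Manager:
    """Middle tier: gives its assignment to each of its own workers."""

    def __init__(self, name: str, workers: dict):
        self.name = name
        self.workers = workers

    def handle(self, assignment: str) -> str:
        results = [f"{w}: {run(assignment)}" for w, run in self.workers.items()]
        return f"{self.name} report -> " + "; ".join(results)


class Lead:
    """Top tier: hands one assignment to each manager and merges the reports."""

    def __init__(self, managers: list):
        self.managers = managers

    def run(self, task: str, plan: dict) -> str:
        # plan maps manager name -> assignment; managers without an entry get the task
        reports = [m.handle(plan.get(m.name, task)) for m in self.managers]
        return "\n".join(reports)


lead = Lead([
    Manager("research", {"search": lambda a: f"found sources for '{a}'"}),
    Manager("writing", {
        "draft": lambda a: f"drafted '{a}'",
        "edit": lambda a: f"edited '{a}'",
    }),
])
report = lead.run(
    "AI trends",
    {"research": "collect sources", "writing": "write summary"},
)
print(report)
```

Each tier only talks to the tier directly below it, which is what keeps large systems manageable and also why the pattern rates as high complexity: every extra layer adds another place where instructions can be lost or distorted.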

Framework Comparison

Several frameworks provide multi-agent orchestration out of the box:

Framework Implementations
# LangGraph: Graph-based orchestration
from langgraph.graph import StateGraph, END
from typing import TypedDict

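class State(TypedDict):
    messages: list
    current_agent: str
    research_complete: bool  # read by the researcher routing below
    approved: bool           # read by the reviewer routing below

# researcher_node, writer_node, and reviewer_node are assumed to be defined elsewhere
def create_langgraph_workflow():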
    workflow = StateGraph(State)

    # Add nodes for each agent
    workflow.add_node("researcher", researcher_node)
    workflow.add_node("writer", writer_node)
    workflow.add_node("reviewer", reviewer_node)

    # Add routing logic
    workflow.add_conditional_edges(
        "researcher",
        lambda s: "writer" if s["research_complete"] else "researcher"
    )
    workflow.add_edge("writer", "reviewer")
    workflow.add_conditional_edges(
        "reviewer",
        lambda s: END if s["approved"] else "writer"
    )

    workflow.set_entry_point("researcher")
    return workflow.compile()

# Key characteristics:
# - Explicit state management
# - Visual graph representation
# - Built-in persistence and streaming
# - Good for complex, stateful workflows
# LangGraph: Sequential multi-agent with handoffs
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated
from operator import add

class TeamState(TypedDict):
    task: str
    research: str
    draft: str
    feedback: str
    final: str
    messages: Annotated[list, add]

def create_sequential_team():
    llm = ChatOpenAI(model="gpt-4")

    def researcher_node(state: TeamState) -> dict:
        response = llm.invoke([
            ("system", "You are a research analyst. Research the topic thoroughly."),
            ("user", f"Research this: {state['task']}")
        ])
        return {"research": response.content}

    def writer_node(state: TeamState) -> dict:
        response = llm.invoke([
            ("system", "You are a technical writer. Write clear content."),
            ("user", f"Write about: {state['task']}\n\nResearch: {state['research']}")
        ])
        return {"draft": response.content}

    def reviewer_node(state: TeamState) -> dict:
        response = llm.invoke([
            ("system", "You are an editor. Review and provide feedback."),
            ("user", f"Review this draft: {state['draft']}")
        ])
        return {"feedback": response.content, "final": state["draft"]}

    # Build sequential workflow
    workflow = StateGraph(TeamState)
    workflow.add_node("researcher", researcher_node)
    workflow.add_node("writer", writer_node)
    workflow.add_node("reviewer", reviewer_node)

    workflow.set_entry_point("researcher")
    workflow.add_edge("researcher", "writer")
    workflow.add_edge("writer", "reviewer")
    workflow.add_edge("reviewer", END)

    return workflow.compile()

# Key characteristics:
# - Explicit handoffs between agents
# - Shared state across all agents
# - Clear sequential flow
# - Good for pipeline-style workflows
# LangGraph: Supervisor-based multi-agent
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Literal
import json

class SupervisorState(TypedDict):
    task: str
    messages: list
    next_agent: str | None

def create_supervisor_team():
    llm = ChatOpenAI(model="gpt-4")
    agents = ["researcher", "writer", "critic"]

    def supervisor_node(state: SupervisorState) -> dict:
        response = llm.invoke([
            ("system", f"""You are a supervisor managing: {agents}.
            Based on the task and conversation, decide:
            - Which agent should act next, OR
            - If task is complete, respond with "FINISH"
            Return JSON: {{"next": "agent_name" or "FINISH", "instruction": "..."}}"""),
            ("user", f"Task: {state['task']}\nHistory: {state['messages']}")
        ])
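        # Model output is not guaranteed to be valid JSON; finish instead of crashing
        try:
            decision = json.loads(response.content)
        except (json.JSONDecodeError, TypeError):
            decision = {"next": "FINISH", "instruction": ""}
        return {"next_agent": decision.get("next", "FINISH"), "messages": state["messages"] + [decision]}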

    def agent_node(agent_name: str):
        def node(state: SupervisorState) -> dict:
            instruction = state["messages"][-1].get("instruction", state["task"])
            response = llm.invoke([
                ("system", f"You are the {agent_name}. Complete your assigned task."),
                ("user", instruction)
            ])
            return {"messages": state["messages"] + [{"agent": agent_name, "response": response.content}]}
        return node

    def route(state: SupervisorState) -> Literal["researcher", "writer", "critic", "__end__"]:
        # Anything other than a known agent name (including "FINISH") ends the run
        if state["next_agent"] not in agents:
            return END
        return state["next_agent"]

    workflow = StateGraph(SupervisorState)
    workflow.add_node("supervisor", supervisor_node)
    for agent in agents:
        workflow.add_node(agent, agent_node(agent))
        workflow.add_edge(agent, "supervisor")

    workflow.set_entry_point("supervisor")
    workflow.add_conditional_edges("supervisor", route)

    return workflow.compile()

# Key characteristics:
# - Dynamic agent selection
# - Supervisor controls workflow
# - Flexible task routing
# - Good for complex, adaptive workflows

// Microsoft Agent Framework: Multi-agent workflows
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using OpenAI;

async Task CreateAgentFrameworkWorkflow()
{
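    // Assumes the key is supplied via the OPENAI_API_KEY environment variable
    var apiKey = Environment.GetEnvironmentVariable("OPENAI_API_KEY")
        ?? throw new InvalidOperationException("OPENAI_API_KEY is not set.");

    var chatClient = new OpenAIClient(apiKey)
        .GetChatClient("gpt-4o")
        .AsIChatClient();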

    // Create specialized agents
    var researcher = chatClient.CreateAIAgent(
        name: "Researcher",
        instructions: "You research topics thoroughly..."
    );

    var writer = chatClient.CreateAIAgent(
        name: "Writer",
        instructions: "You write clear articles..."
    );

    var critic = chatClient.CreateAIAgent(
        name: "Critic",
        instructions: "You provide constructive feedback..."
    );

    // Sequential workflow with thread-based memory
    var researchThread = researcher.GetNewThread();
    var researchResult = await researcher.RunAsync(
        "Research AI agents and their applications",
        researchThread
    );

    var writerThread = writer.GetNewThread();
    var article = await writer.RunAsync(
        $"Write an article based on: {researchResult}",
        writerThread
    );

    var criticThread = critic.GetNewThread();
    var feedback = await critic.RunAsync(
        $"Review this article: {article}",
        criticThread
    );

    // Single revision pass; the writer's thread still holds the original draft
    var finalArticle = await writer.RunAsync(
        $"Revise based on feedback: {feedback}",
        writerThread
    );

    Console.WriteLine(finalArticle);
}

// Key characteristics:
// - .NET native with Python parity
// - Thread-based conversation memory
// - Simple, intuitive API
// - Enterprise-ready (Azure integration)

Framework                 Language   Paradigm        Best For
──────────────────────────────────────────────────────────────────────────────
LangGraph (Graph)         Python     Graph-based     Complex stateful workflows
LangGraph (Sequential)    Python     Pipeline        Linear multi-step workflows
LangGraph (Supervisor)    Python     Hierarchical    Dynamic agent routing
Agent Framework           C#/.NET    Thread-based    Enterprise applications

Framework comparison

Evaluation Metrics

Metric                   What it Measures                          How to Calculate
────────────────────────────────────────────────────────────────────────────────────────────────────────────
Task Completion          Did the system solve the task?            Binary or graded evaluation
Coordination Overhead    Extra cost from multi-agent coordination  Multi-agent total tokens / single-agent baseline tokens
Latency                  Time to completion                        Wall clock time
Agent Utilization        Are all agents contributing?              Messages per agent
Redundancy               Duplicate work across agents              Semantic similarity of outputs

Key metrics for multi-agent systems
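
Two of these metrics can be computed directly from a run log. The sketch below assumes a hypothetical log format (a list of `Turn` records with `agent` and `tokens` fields) and a separately measured single-agent baseline; neither comes from any framework's API.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class Turn:
    """One agent turn in a run log (hypothetical format, for illustration)."""
    agent: str
    tokens: int  # prompt + completion tokens used by this turn


def coordination_overhead(turns: list[Turn], single_agent_tokens: int) -> float:
    """Total multi-agent tokens divided by the single-agent baseline."""
    if single_agent_tokens <= 0:
        raise ValueError("single_agent_tokens must be positive")
    return sum(t.tokens for t in turns) / single_agent_tokens


def agent_utilization(turns: list[Turn]) -> dict[str, float]:
    """Share of turns contributed by each agent."""
    counts = Counter(t.agent for t in turns)
    total = sum(counts.values())
    return {agent: n / total for agent, n in counts.items()}


# Made-up numbers, only to exercise the functions
log = [
    Turn("researcher", 1200),
    Turn("writer", 900),
    Turn("critic", 400),
    Turn("writer", 500),
]
overhead = coordination_overhead(log, single_agent_tokens=1500)
utilization = agent_utilization(log)
print(f"overhead: {overhead:.2f}x")
print(utilization)
```

An overhead well above 1.0 with no gain in task completion suggests the extra agents are not paying for themselves.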

Common Pitfalls

Infinite Loops

Agents can get stuck passing tasks back and forth. Always implement maximum iteration limits.
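
A minimal sketch of such a limit, written as plain Python so it runs without a framework. The `steps` counter, `MAX_STEPS`, and the `"END"` sentinel are illustrative assumptions; in a supervisor-style graph the same check would sit in the routing function. LangGraph also accepts a `recursion_limit` entry in the run config, which can serve as a backstop.

```python
from typing import TypedDict

MAX_STEPS = 6  # assumed budget; tune per task
END = "END"    # stand-in for the framework's end-of-graph sentinel


class LoopState(TypedDict):
    next_agent: str
    steps: int  # number of handoffs made so far


def route_with_limit(state: LoopState) -> str:
    """Return the next agent, or END once the step budget is exhausted."""
    if state["next_agent"] == "FINISH" or state["steps"] >= MAX_STEPS:
        return END
    return state["next_agent"]


def run(first_agent: str) -> LoopState:
    """Simulate two agents that keep handing the task back to each other."""
    state: LoopState = {"next_agent": first_agent, "steps": 0}
    while (current := route_with_limit(state)) != END:
        # Each agent bounces the task to the other and never says FINISH
        handoff = "critic" if current == "writer" else "writer"
        state = {"next_agent": handoff, "steps": state["steps"] + 1}
    return state


final_state = run("writer")
print(final_state["steps"])
```

Without the `steps` check, the simulated writer and critic would never terminate; with it, the run stops as soon as the budget is spent.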

Context Explosion

Each agent turn appends to the shared history, so multi-agent conversations can quickly exceed the model's context limit.
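
One common mitigation is to pass each agent only the most recent part of the history. The sketch below assumes messages shaped like `{"agent": ..., "response": ...}` and uses a rough four-characters-per-token estimate instead of a real tokenizer; both are assumptions for illustration.

```python
def estimate_tokens(text: str) -> int:
    """Rough estimate: about 4 characters per token (an assumption, not a tokenizer)."""
    return max(1, len(text) // 4)


def trim_history(messages: list[dict], budget: int) -> list[dict]:
    """Keep the most recent messages whose estimated tokens fit within budget."""
    kept: list[dict] = []
    used = 0
    for message in reversed(messages):
        cost = estimate_tokens(message["response"])
        if used + cost > budget:
            break
        kept.append(message)
        used += cost
    kept.reverse()  # restore chronological order
    return kept


history = [
    {"agent": "researcher", "response": "a" * 400},  # ~100 tokens
    {"agent": "writer", "response": "b" * 200},      # ~50 tokens
    {"agent": "critic", "response": "c" * 80},       # ~20 tokens
]
recent = trim_history(history, budget=80)
print([m["agent"] for m in recent])
```

Dropping old turns loses information, so a summary of the trimmed portion is often kept alongside the recent messages.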

Role Confusion

Agents may not stay in their assigned roles. Use clear, distinct system prompts.
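
One way to keep prompts distinct is to generate them from an explicit role specification that states both the job and its boundaries. The `Role` class, the prompt wording, and the example roles below are hypothetical, a sketch of the idea and not a prescribed template.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Role:
    """Hypothetical role spec: what the agent does and what it leaves to others."""
    name: str
    responsibility: str
    out_of_scope: tuple[str, ...]


def build_system_prompt(role: Role) -> str:
    """Render a system prompt that states the role's job and its boundaries."""
    forbidden = "; ".join(role.out_of_scope)
    return (
        f"You are the {role.name}. Your only job: {role.responsibility}. "
        f"Do not: {forbidden}. "
        "If asked to do something outside this role, say so and hand the task back."
    )


ROLES = [
    Role("researcher", "gather and summarize sources", ("write the article", "review drafts")),
    Role("writer", "turn research notes into a draft", ("do new research", "approve the draft")),
    Role("critic", "list concrete problems in the draft", ("rewrite the draft",)),
]
prompts = {role.name: build_system_prompt(role) for role in ROLES}
print(prompts["critic"])
```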

Premature Multi-Agent

Don't use a multi-agent system when a single agent suffices. The added complexity should bring clear, measurable benefits.

Related Topics