Key Takeaway
By the end of this blueprint you will have a production-ready multi-agent system built on LangGraph where a Supervisor agent routes tasks to specialized Research, Code, and Review agents — each with their own tools, memory, and failure boundaries — all observable through LangSmith and deployable behind a FastAPI service.
Prerequisites
- Python 3.11+ with a working knowledge of async/await and type hints
- Familiarity with LLM APIs (Anthropic or OpenAI) and the concept of tool/function calling
- Basic understanding of directed graphs (nodes, edges, cycles)
- A LangSmith account (free tier works) for tracing and evaluation
- Docker installed locally for the PostgreSQL checkpointer
- An Anthropic API key or OpenAI API key for LLM calls
Why Multi-Agent Architecture?
Single-agent systems hit a wall once the task surface area grows beyond what a single system prompt can handle cleanly. A monolithic agent that researches, writes code, reviews that code, runs tests, and summarizes results ends up with a sprawling prompt that confuses the LLM, burns excessive tokens re-reading irrelevant instructions, and makes failures hard to isolate. When a code-generation step fails, you have no clean way to retry just that step — you restart the entire chain.
Multi-agent architectures solve this by decomposing the workflow into specialized agents, each with a focused prompt and its own tool set. A Supervisor agent acts as the orchestrator: it reads the user request, decides which specialist to invoke next, and inspects each specialist's output before routing to the next step. This gives you three immediate wins: narrower prompts that produce better outputs, isolated failure domains with targeted retries, and the ability to swap or upgrade individual agents without touching the rest of the graph.
Multi-agent is not always the right call. If your task is linear and well-scoped (e.g., summarize a document, classify a ticket), a single agent or a simple chain is simpler and cheaper. Reach for multi-agent when you have branching logic, multiple tool domains, or steps that benefit from different system prompts.
Consider multi-agent when you see these signals: your single prompt is over 2,000 tokens of instructions; you need different tools for different phases of the workflow; you want human review at specific checkpoints; or you need to parallelize independent sub-tasks for latency reduction. This blueprint addresses all four scenarios.
Architecture Overview
The system is built on a LangGraph StateGraph where each node is an agent or a utility function, and edges define the routing logic between them. At the center sits the Supervisor — a lightweight LLM call whose sole job is to read the current state and decide which agent should act next (or whether the task is complete). Each specialist agent (Research, Code, Review) runs its own tool-calling loop, writes results back to shared state, and returns control to the Supervisor. A PostgreSQL-backed checkpointer persists graph state at every step so long-running workflows can survive process restarts.
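Before introducing any LangGraph APIs, the control flow can be sketched in plain Python (the function names and dict shapes here are illustrative stand-ins, not the real API):

```python
# Bird's-eye sketch of the supervisor loop (illustrative, not the LangGraph API)
def run(task, supervisor, agents, max_iters=10):
    state = {"task": task, "next_agent": None, "iteration": 0}
    for _ in range(max_iters):
        state["next_agent"] = supervisor(state)  # lightweight routing decision
        if state["next_agent"] == "FINISH":
            break
        agents[state["next_agent"]](state)       # specialist does the real work
        state["iteration"] += 1
    return state

# Toy wiring: research once, then finish
toy_supervisor = lambda s: "research" if s["iteration"] == 0 else "FINISH"
toy_agents = {"research": lambda s: s.setdefault("research_notes", []).append("finding")}
final = run("demo task", toy_supervisor, toy_agents)
```

Everything that follows is this loop, made durable: LangGraph supplies the state merging, the conditional edges, and the checkpointing that let it survive restarts.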
Core Concepts
Agent State Management
LangGraph models state as a Python TypedDict that flows through every node in the graph. Each node receives the current state, performs its work, and returns a partial update that gets merged back. The key design decision is choosing your state schema carefully — it is the contract between all agents. Fields that use the Annotated type with a reducer function (like operator.add for lists) allow multiple agents to append to the same field without overwriting each other's results. This is how you accumulate messages, research findings, and code artifacts across the graph.
"""Agent state schema — the shared contract between all agents."""
from __future__ import annotations
import operator
from typing import Annotated, Literal, TypedDict
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
"""Shared state that flows through the entire graph.
Fields using Annotated[..., operator.add] are append-only —
multiple agents can write to them without overwriting each other.
"""
# The original user request (immutable after entry)
task: str
# Conversation history — appended to by every agent
messages: Annotated[list[BaseMessage], operator.add]
# Which agent should act next (set by supervisor)
next_agent: Literal["research", "code", "review", "FINISH"]
# Accumulated research findings
research_notes: Annotated[list[str], operator.add]
# Generated code artifacts
code_artifacts: Annotated[list[dict], operator.add]
# Review feedback items
review_comments: Annotated[list[str], operator.add]
# Iteration counter to prevent infinite loops
iteration_count: int
# Final synthesized response
    final_response: str

Use `Annotated[list[...], operator.add]` for any field that multiple agents write to. This prevents the classic bug where Agent B overwrites Agent A's output. The reducer merges updates by concatenation rather than replacement.
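To see the reducer concretely, here is a simplified sketch of what happens when two agents return updates for the same `operator.add` field (the real merge happens inside the LangGraph runtime):

```python
import operator
from typing import Annotated, TypedDict

class MiniState(TypedDict):
    research_notes: Annotated[list[str], operator.add]

# Agent A and Agent B each return a partial update for the same field...
update_a = {"research_notes": ["finding from agent A"]}
update_b = {"research_notes": ["finding from agent B"]}

# ...and the reducer concatenates instead of overwriting:
merged = operator.add(update_a["research_notes"], update_b["research_notes"])
print(merged)  # ['finding from agent A', 'finding from agent B']
```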
Message Passing Patterns
Agents communicate through the shared state, not by calling each other directly. When the Research Agent finishes, it appends its findings to research_notes and adds an AIMessage to messages summarizing what it found. The Supervisor reads these updates on its next invocation and decides whether to send the task to the Code Agent, ask for more research, or finalize the response. This decoupled message-passing pattern means you can add, remove, or reorder agents without changing any agent's internal logic — they only need to read from and write to agreed-upon state fields.
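A stripped-down sketch of this decoupling (stub functions, no LLM calls): each agent only touches agreed-upon state fields, and the supervisor routes purely from what it reads there.

```python
# Agents never call each other: they read and write shared state fields.
def research_stub(state: dict) -> dict:
    # Writes findings and a summary message; knows nothing about other agents
    return {"research_notes": ["token bucket is a standard rate-limit algorithm"],
            "messages": ["[Research] summarized 1 finding"]}

def supervisor_stub(state: dict) -> str:
    # Routes purely from state, so agents can be swapped without touching this
    return "code" if state["research_notes"] else "research"

state = {"messages": [], "research_notes": []}
assert supervisor_stub(state) == "research"          # nothing gathered yet

update = research_stub(state)
state["research_notes"] += update["research_notes"]  # reducer-style merge
state["messages"] += update["messages"]
assert supervisor_stub(state) == "code"              # routing follows the state
```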
Conditional Edges and Routing
LangGraph's add_conditional_edges method is how you implement the Supervisor's routing decisions. After the Supervisor node runs, a routing function inspects the state's next_agent field and returns the name of the node to invoke next. If the Supervisor sets next_agent to "FINISH", the routing function returns the special END constant which terminates the graph. This gives you deterministic, inspectable routing — unlike chain-of-thought routing where the LLM implicitly decides what to do next inside a single prompt.
"""Routing logic for the supervisor's conditional edges."""
from langgraph.graph import END
from state import AgentState
def route_supervisor(state: AgentState) -> str:
"""Read the supervisor's routing decision from state.
Returns the node name to invoke next, or END to terminate.
"""
next_agent = state["next_agent"]
if next_agent == "FINISH":
return END
# Guard against infinite loops
if state.get("iteration_count", 0) >= 10:
return END
    return next_agent

Step-by-Step Implementation
Step 1: Project Setup
Start by creating a clean project with pinned dependencies. We will use langgraph for the orchestration graph, langchain-anthropic for Claude LLM calls (you can swap in langchain-openai if you prefer GPT-4), langsmith for tracing, and pydantic for structured tool outputs. The psycopg driver is needed for the PostgreSQL checkpointer that makes your graph resumable.
# Create project directory
mkdir multi-agent-system && cd multi-agent-system
python -m venv .venv && source .venv/bin/activate
# Install core dependencies
pip install \
"langgraph>=0.2.60" \
"langchain-core>=0.3.30" \
"langchain-anthropic>=0.3.12" \
"langsmith>=0.2.10" \
"pydantic>=2.10" \
"psycopg[binary]>=3.2" \
"python-dotenv>=1.0" \
"tavily-python>=0.5"
# Create .env file
cat <<'DOTENV' > .env
ANTHROPIC_API_KEY=sk-ant-...
LANGSMITH_API_KEY=lsv2_...
LANGSMITH_PROJECT=multi-agent-blueprint
LANGSMITH_TRACING=true
TAVILY_API_KEY=tvly-...
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/agents
DOTENV

# Start PostgreSQL for the checkpointer (Docker)
docker run -d \
--name agent-postgres \
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_DB=agents \
-p 5432:5432 \
  postgres:16-alpine

Pin your langchain ecosystem packages to compatible minor versions. The LangChain ecosystem moves fast — a mismatch between langchain-core and langgraph versions is the most common source of import errors in new projects.
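One lightweight way to enforce that pinning is a pip constraints file. The upper bounds below are assumptions based on the versions installed above; check the langgraph release notes for the actually compatible langchain-core range.

```
# constraints.txt — illustrative upper bounds, not verified compatibility claims
langgraph>=0.2.60,<0.3
langchain-core>=0.3.30,<0.4
langchain-anthropic>=0.3.12,<0.4
```

Then install with `pip install -r requirements.txt -c constraints.txt` so upgrades can never silently cross a breaking minor version.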
Step 2: Define the Agent State Schema
We already previewed the state schema in the Core Concepts section. Here is the production version with additional fields for error tracking and human-in-the-loop support. The human_feedback field is used by the checkpoint interrupt mechanism — when the graph pauses for human review, the reviewer's feedback is written here before resuming.
"""Production agent state with error tracking and HITL support."""
from __future__ import annotations
import operator
from typing import Annotated, Literal, TypedDict
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
"""Shared state flowing through the multi-agent graph."""
# --- Input ---
task: str
# --- Message history (append-only) ---
messages: Annotated[list[BaseMessage], operator.add]
# --- Routing ---
next_agent: Literal["research", "code", "review", "FINISH"]
# --- Agent outputs (append-only) ---
research_notes: Annotated[list[str], operator.add]
code_artifacts: Annotated[list[dict], operator.add]
review_comments: Annotated[list[str], operator.add]
# --- Control flow ---
iteration_count: int
max_iterations: int # configurable safety limit
# --- Error tracking ---
errors: Annotated[list[dict], operator.add]
# --- Human-in-the-loop ---
human_feedback: str | None
# --- Final output ---
    final_response: str

Step 3: Build the Supervisor Agent
The Supervisor is the brain of the system. It receives the full state on every invocation, reads what has happened so far, and decides which agent should act next. The key insight is that the Supervisor should be a lightweight, fast LLM call — it does not do substantive work itself. Its system prompt is short and focused on routing decisions. We use structured output with Pydantic to force the Supervisor to return a clean routing decision rather than free-form text.
"""Supervisor agent — routes tasks to specialist agents."""
from __future__ import annotations
import logging
from typing import Literal
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
from pydantic import BaseModel, Field
from state import AgentState
logger = logging.getLogger(__name__)
# ---- Structured output for routing decisions ----
class RoutingDecision(BaseModel):
"""The supervisor's routing decision."""
next_agent: Literal["research", "code", "review", "FINISH"] = Field(
description=(
"Which agent to invoke next. Use 'research' for information "
"gathering, 'code' for code generation or modification, "
"'review' for quality checks, or 'FINISH' when the task "
"is fully complete."
)
)
reasoning: str = Field(
description="Brief explanation of why this agent was chosen."
)
SUPERVISOR_SYSTEM_PROMPT = """You are a task supervisor managing a team of specialist agents.
Your team:
- **research**: Searches the web, retrieves documents, gathers facts. Use when the task requires external information.
- **code**: Writes, refactors, or debugs code. Use when the task requires code generation or modification.
- **review**: Reviews code for bugs, security issues, and best practices. Use after code generation.
Current state summary:
- Research notes collected: {research_count}
- Code artifacts generated: {code_count}
- Review comments: {review_count}
- Iteration: {iteration} / {max_iterations}
Rules:
1. Always research before coding if the task requires external knowledge.
2. Always review after code generation.
3. If review feedback requires changes, route back to code.
4. Set next_agent to FINISH only when the task is fully addressed.
5. Never exceed max_iterations — set FINISH if you are close to the limit."""
def create_supervisor(model_name: str = "claude-sonnet-4-20250514"):
    """Create the supervisor LLM wrapped with structured output.

    Returns a Runnable that emits RoutingDecision objects rather than raw text.
    """
llm = ChatAnthropic(
model=model_name,
temperature=0,
max_tokens=1024,
)
return llm.with_structured_output(RoutingDecision)
def supervisor_node(state: AgentState) -> dict:
"""Supervisor node — reads state, returns routing decision."""
llm = create_supervisor()
system_msg = SUPERVISOR_SYSTEM_PROMPT.format(
research_count=len(state.get("research_notes", [])),
code_count=len(state.get("code_artifacts", [])),
review_count=len(state.get("review_comments", [])),
iteration=state.get("iteration_count", 0),
max_iterations=state.get("max_iterations", 10),
)
messages = [
SystemMessage(content=system_msg),
*state["messages"],
HumanMessage(content=f"Task: {state['task']}\n\nDecide the next step."),
]
try:
decision: RoutingDecision = llm.invoke(messages)
logger.info(
"Supervisor routed to %s: %s",
decision.next_agent,
decision.reasoning,
)
return {
"next_agent": decision.next_agent,
"iteration_count": state.get("iteration_count", 0) + 1,
}
except Exception as exc:
logger.exception("Supervisor failed")
return {
"next_agent": "FINISH",
"errors": [{"agent": "supervisor", "error": str(exc)}],
        }

Always set a max_iterations guard on the Supervisor. Without it, a confused LLM can loop between agents indefinitely, burning tokens. Ten iterations is a reasonable default — most workflows complete in three to five.
Step 4: Build Specialist Worker Agents
Each specialist agent follows the same pattern: it receives the state, constructs a focused prompt, calls the LLM with its tool set, and writes results back to the appropriate state fields. The critical difference between agents is their system prompt and available tools. By keeping each agent's prompt under 500 tokens and giving it only the tools it needs, you get sharper outputs and lower costs than a monolithic approach.
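That shared pattern can be distilled into a few lines. This is an illustrative stub (`make_worker`, `call_llm`, and `output_field` are inventions for this sketch); the real agents below replace the plain function call with a Claude call plus a tool loop and error handling.

```python
def make_worker(name: str, call_llm, output_field: str):
    """Factory for a specialist node: focused prompt in, partial update out."""
    def node(state: dict) -> dict:
        prompt = f"You are the {name} specialist.\nTask: {state['task']}"
        result = call_llm(prompt)  # real agents invoke an LLM with bound tools here
        # Return only a partial update; reducers merge it into shared state
        return {output_field: [result], "messages": [f"[{name}] {result[:80]}"]}
    return node

research = make_worker("research", lambda p: "token bucket notes", "research_notes")
update = research({"task": "rate limiting"})
```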
Research Agent
"""Research agent — gathers information from external sources."""
from __future__ import annotations
import logging
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage, SystemMessage
from langchain_core.tools import tool
from tavily import TavilyClient
from state import AgentState
logger = logging.getLogger(__name__)
# ---- Tools ----
@tool
def web_search(query: str) -> str:
"""Search the web for current information on a topic.
Args:
query: The search query string.
Returns:
A summary of the top search results.
"""
client = TavilyClient()
results = client.search(query=query, max_results=5)
formatted = []
for r in results.get("results", []):
formatted.append(f"**{r['title']}**\n{r['content']}\nSource: {r['url']}")
return "\n\n---\n\n".join(formatted) if formatted else "No results found."
@tool
def extract_url_content(url: str) -> str:
"""Extract and summarize content from a specific URL.
Args:
url: The URL to extract content from.
Returns:
The extracted text content.
"""
client = TavilyClient()
result = client.extract(urls=[url])
if result.get("results"):
return result["results"][0].get("raw_content", "")[:4000]
return "Could not extract content from URL."
RESEARCH_TOOLS = [web_search, extract_url_content]
RESEARCH_SYSTEM_PROMPT = """You are a research specialist. Your job is to gather accurate, relevant information for the task at hand.
Guidelines:
1. Use web_search to find current information.
2. Use extract_url_content to get details from specific pages.
3. Cite your sources — include URLs in your notes.
4. Focus on facts, not opinions.
5. Summarize findings concisely — other agents will use your notes.
Provide your findings as clear, structured notes."""
def create_research_agent():
    """Create the research agent LLM with tools bound (returns a Runnable)."""
llm = ChatAnthropic(
model="claude-sonnet-4-20250514",
temperature=0,
max_tokens=4096,
)
return llm.bind_tools(RESEARCH_TOOLS)
def research_node(state: AgentState) -> dict:
"""Research agent node — gathers information and writes notes."""
llm = create_research_agent()
messages = [
SystemMessage(content=RESEARCH_SYSTEM_PROMPT),
*state["messages"][-10:], # Limit context window
]
try:
response = llm.invoke(messages)
# Handle tool calls via a simple loop
while response.tool_calls:
tool_results = []
for tc in response.tool_calls:
tool_fn = {t.name: t for t in RESEARCH_TOOLS}[tc["name"]]
result = tool_fn.invoke(tc["args"])
tool_results.append(
{"role": "tool", "content": str(result), "tool_call_id": tc["id"]}
)
            # Accumulate each round so earlier tool results are not dropped
            messages = messages + [response] + tool_results
            response = llm.invoke(messages)
findings = response.content if isinstance(response.content, str) else str(response.content)
logger.info("Research agent produced %d chars of notes", len(findings))
return {
"research_notes": [findings],
"messages": [AIMessage(content=f"[Research] {findings[:500]}...")],
}
except Exception as exc:
logger.exception("Research agent failed")
return {
"errors": [{"agent": "research", "error": str(exc)}],
"messages": [AIMessage(content=f"[Research] Error: {exc}")],
        }

Code Agent
"""Code agent — generates, refactors, and debugs code."""
from __future__ import annotations
import logging
import subprocess
import tempfile
from pathlib import Path
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage, SystemMessage
from langchain_core.tools import tool
from state import AgentState
logger = logging.getLogger(__name__)
# ---- Tools ----
@tool
def execute_python(code: str) -> str:
"""Execute Python code in a sandboxed subprocess and return output.
Args:
code: Python code to execute.
Returns:
stdout and stderr from execution.
"""
with tempfile.NamedTemporaryFile(
mode="w", suffix=".py", delete=False
) as f:
f.write(code)
f.flush()
try:
result = subprocess.run(
["python", f.name],
capture_output=True,
text=True,
timeout=30,
cwd=tempfile.gettempdir(),
)
output = result.stdout
if result.stderr:
output += f"\n\nSTDERR:\n{result.stderr}"
return output[:4000] if output else "(no output)"
except subprocess.TimeoutExpired:
return "ERROR: Execution timed out after 30 seconds."
finally:
Path(f.name).unlink(missing_ok=True)
@tool
def lint_python(code: str) -> str:
"""Run basic Python syntax checking on the provided code.
Args:
code: Python code to check.
Returns:
Any syntax errors found, or 'No issues found.'
"""
try:
compile(code, "<lint>", "exec")
return "No syntax errors found."
except SyntaxError as e:
return f"SyntaxError at line {e.lineno}: {e.msg}"
CODE_TOOLS = [execute_python, lint_python]
CODE_SYSTEM_PROMPT = """You are a senior software engineer. Your job is to write clean, production-grade Python code.
Guidelines:
1. Write well-typed code with docstrings and type hints.
2. Include error handling — never let exceptions propagate silently.
3. Use the execute_python tool to test your code.
4. Use the lint_python tool to check for syntax errors before finalizing.
5. If research notes are available, incorporate those findings.
6. Return code as a structured artifact with filename and content.
Format your final code artifact as:
FILENAME: [filename.py]
[code content]"""
def create_code_agent():
    """Create the code agent LLM with tools bound (returns a Runnable)."""
llm = ChatAnthropic(
model="claude-sonnet-4-20250514",
temperature=0,
max_tokens=8192,
)
return llm.bind_tools(CODE_TOOLS)
def code_node(state: AgentState) -> dict:
"""Code agent node — generates code based on task and research."""
llm = create_code_agent()
# Build context from research notes
research_ctx = ""
if state.get("research_notes"):
research_ctx = "\n\nResearch findings:\n" + "\n".join(
state["research_notes"][-3:]
)
# Include review feedback if this is a revision cycle
review_ctx = ""
if state.get("review_comments"):
review_ctx = "\n\nReview feedback to address:\n" + "\n".join(
state["review_comments"][-3:]
)
    # Anthropic models expect the system prompt first, so fold the extra
    # context into the single system message instead of appending more later.
    messages = [
        SystemMessage(content=CODE_SYSTEM_PROMPT + research_ctx + review_ctx),
        *state["messages"][-10:],
    ]
try:
response = llm.invoke(messages)
# Handle tool calls
while response.tool_calls:
tool_results = []
for tc in response.tool_calls:
tool_fn = {t.name: t for t in CODE_TOOLS}[tc["name"]]
result = tool_fn.invoke(tc["args"])
tool_results.append(
{"role": "tool", "content": str(result), "tool_call_id": tc["id"]}
)
            # Accumulate each round so earlier tool results are not dropped
            messages = messages + [response] + tool_results
            response = llm.invoke(messages)
content = response.content if isinstance(response.content, str) else str(response.content)
artifact = {
"content": content,
"iteration": state.get("iteration_count", 0),
}
logger.info("Code agent produced artifact at iteration %d", artifact["iteration"])
return {
"code_artifacts": [artifact],
"messages": [AIMessage(content=f"[Code] Generated code artifact (iteration {artifact['iteration']})")],
}
except Exception as exc:
logger.exception("Code agent failed")
return {
"errors": [{"agent": "code", "error": str(exc)}],
"messages": [AIMessage(content=f"[Code] Error: {exc}")],
        }

Review Agent
"""Review agent — examines code for quality, bugs, and security issues."""
from __future__ import annotations
import logging
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage, SystemMessage
from state import AgentState
logger = logging.getLogger(__name__)
REVIEW_SYSTEM_PROMPT = """You are a senior code reviewer. Your job is to evaluate code for quality, correctness, security, and adherence to best practices.
Review criteria:
1. **Correctness**: Does the code do what the task requires?
2. **Error handling**: Are edge cases covered? Are exceptions handled?
3. **Type safety**: Are type hints present and accurate?
4. **Security**: Any injection risks, hardcoded secrets, or unsafe operations?
5. **Performance**: Any obvious inefficiencies or N+1 patterns?
Output format:
- Start with APPROVED or NEEDS_REVISION
- List specific issues with line references
- Suggest concrete fixes for each issue
- Rate severity: CRITICAL / MAJOR / MINOR"""
def create_review_agent() -> ChatAnthropic:
"""Create the review agent LLM (no tools needed)."""
return ChatAnthropic(
model="claude-sonnet-4-20250514",
temperature=0,
max_tokens=4096,
)
def review_node(state: AgentState) -> dict:
"""Review agent node — examines the latest code artifact."""
llm = create_review_agent()
# Get the latest code artifact
artifacts = state.get("code_artifacts", [])
if not artifacts:
return {
"review_comments": ["No code artifacts to review."],
"messages": [AIMessage(content="[Review] No code to review.")],
}
latest_code = artifacts[-1].get("content", "")
    # Keep the system prompt first (Anthropic rejects mid-conversation
    # system messages) and attach the code under review to it.
    messages = [
        SystemMessage(
            content=f"{REVIEW_SYSTEM_PROMPT}\n\nCode to review:\n\n{latest_code}"
        ),
        *state["messages"][-5:],
    ]
try:
response = llm.invoke(messages)
review = response.content if isinstance(response.content, str) else str(response.content)
logger.info("Review agent verdict: %s", review[:50])
return {
"review_comments": [review],
"messages": [AIMessage(content=f"[Review] {review[:300]}...")],
}
except Exception as exc:
logger.exception("Review agent failed")
return {
"errors": [{"agent": "review", "error": str(exc)}],
"messages": [AIMessage(content=f"[Review] Error: {exc}")],
        }

Step 5: Wire Up the Graph
Now we assemble all the pieces into a LangGraph StateGraph. The graph starts at the Supervisor node, which routes to one of the three specialist agents via conditional edges. Each specialist returns to the Supervisor after completing its work. The Supervisor inspects the updated state and either routes to the next agent or terminates by setting next_agent to FINISH. The compile() step validates the graph structure and returns a runnable that can be invoked like any LangChain runnable.
"""Assemble the multi-agent graph with conditional routing."""
from __future__ import annotations
import logging
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import END, StateGraph
from agents.code import code_node
from agents.research import research_node
from agents.review import review_node
from state import AgentState
from supervisor import supervisor_node
logger = logging.getLogger(__name__)
def route_supervisor(state: AgentState) -> str:
"""Route based on the supervisor's decision."""
next_agent = state.get("next_agent", "FINISH")
if next_agent == "FINISH":
return END
# Safety: enforce max iterations
if state.get("iteration_count", 0) >= state.get("max_iterations", 10):
logger.warning("Max iterations reached — forcing FINISH")
return END
return next_agent
def build_graph(checkpointer=None):
"""Build and compile the multi-agent orchestration graph.
Args:
checkpointer: Optional LangGraph checkpointer for persistence.
Returns:
A compiled StateGraph ready for invocation.
"""
graph = StateGraph(AgentState)
# ---- Add nodes ----
graph.add_node("supervisor", supervisor_node)
graph.add_node("research", research_node)
graph.add_node("code", code_node)
graph.add_node("review", review_node)
# ---- Set entry point ----
graph.set_entry_point("supervisor")
# ---- Add conditional edges from supervisor ----
graph.add_conditional_edges(
"supervisor",
route_supervisor,
{
"research": "research",
"code": "code",
"review": "review",
END: END,
},
)
# ---- All specialists return to supervisor ----
graph.add_edge("research", "supervisor")
graph.add_edge("code", "supervisor")
graph.add_edge("review", "supervisor")
# ---- Compile with optional checkpointer ----
compiled = graph.compile(checkpointer=checkpointer)
logger.info("Multi-agent graph compiled with %d nodes", len(graph.nodes))
return compiled
def create_graph_with_persistence(db_url: str):
"""Create a graph backed by PostgreSQL checkpointing.
Args:
db_url: PostgreSQL connection string.
Returns:
A compiled graph with persistence enabled.
"""
    # In recent langgraph-checkpoint-postgres releases, from_conn_string()
    # is a context manager, so open a long-lived connection explicitly.
    from psycopg import Connection
    from psycopg.rows import dict_row

    conn = Connection.connect(db_url, autocommit=True, row_factory=dict_row)
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()  # Create tables if they don't exist
    return build_graph(checkpointer=checkpointer)

The routing map passed to `add_conditional_edges` serves as documentation and a safety net. LangGraph validates that every possible return value from your routing function has a corresponding entry in the map. If your routing function returns a string that is not in the map, you get a clear error at compile time rather than a silent failure at runtime.
Step 6: Add Human-in-the-Loop Checkpoints
For high-stakes workflows, you want a human to review the Supervisor's routing decisions or the Code Agent's output before the graph continues. LangGraph supports this through the interrupt_before and interrupt_after parameters on the compiled graph. When the graph hits an interrupt, it persists its state to the checkpointer and returns a partial result. Your application can then show the state to a human reviewer, collect their feedback, and resume the graph from exactly where it paused.
"""Graph with human-in-the-loop interrupts before code execution."""
from __future__ import annotations
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import END, StateGraph
from agents.code import code_node
from agents.research import research_node
from agents.review import review_node
from state import AgentState
from supervisor import supervisor_node
from graph import route_supervisor
def build_graph_with_hitl(db_url: str):
"""Build graph that pauses for human review before code generation.
The graph interrupts before the 'code' node, allowing a human
to review the research findings and approve code generation.
"""
graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("research", research_node)
graph.add_node("code", code_node)
graph.add_node("review", review_node)
graph.set_entry_point("supervisor")
graph.add_conditional_edges(
"supervisor",
route_supervisor,
{"research": "research", "code": "code", "review": "review", END: END},
)
graph.add_edge("research", "supervisor")
graph.add_edge("code", "supervisor")
graph.add_edge("review", "supervisor")
    # from_conn_string() is a context manager in recent releases, so open
    # the connection explicitly for a long-lived checkpointer.
    from psycopg import Connection
    from psycopg.rows import dict_row

    conn = Connection.connect(db_url, autocommit=True, row_factory=dict_row)
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()
# Interrupt BEFORE the code node — human must approve
compiled = graph.compile(
checkpointer=checkpointer,
interrupt_before=["code"],
)
return compiled
# ---- Usage: running with human-in-the-loop ----
async def run_with_human_review(graph, task: str, thread_id: str):
"""Run the graph, pausing for human review before code generation.
This function demonstrates the interrupt/resume pattern.
"""
config = {"configurable": {"thread_id": thread_id}}
initial_state = {
"task": task,
"messages": [],
"next_agent": "research",
"research_notes": [],
"code_artifacts": [],
"review_comments": [],
"iteration_count": 0,
"max_iterations": 10,
"errors": [],
"human_feedback": None,
"final_response": "",
}
# Run until interrupt
result = None
async for event in graph.astream(initial_state, config):
result = event
# Graph will pause here when it reaches the 'code' node
# At this point, the graph is paused.
# In production, you would send the state to a UI for review.
state = await graph.aget_state(config)
print(f"Graph paused. Research notes: {state.values.get('research_notes', [])}")
# Simulate human approval and resume
await graph.aupdate_state(
config,
{"human_feedback": "Approved. Proceed with code generation."},
)
# Resume from the interrupt point
async for event in graph.astream(None, config):
result = event
    return result

Use `interrupt_before` for approval gates ("should we proceed?") and `interrupt_after` for review gates ("was this output acceptable?"). You can interrupt at multiple points in the same graph — for example, interrupt after research for fact-checking and before code for scope approval.
Step 7: Run the Graph End-to-End
With all agents and the graph defined, here is the entry point that ties everything together. This module creates the graph, invokes it with an initial state, and streams events so you can observe progress in real time. The thread_id in the config is what enables the checkpointer to track separate workflow instances — each user session or API request should use a unique thread ID.
"""Entry point — run the multi-agent system."""
from __future__ import annotations
import asyncio
import logging
import os
import uuid
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage
from graph import create_graph_with_persistence
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger(__name__)
async def run_task(task: str) -> dict:
"""Run a task through the multi-agent system.
Args:
task: The user's task description.
Returns:
The final graph state with all agent outputs.
"""
db_url = os.environ["DATABASE_URL"]
graph = create_graph_with_persistence(db_url)
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
initial_state = {
"task": task,
"messages": [HumanMessage(content=task)],
"next_agent": "research",
"research_notes": [],
"code_artifacts": [],
"review_comments": [],
"iteration_count": 0,
"max_iterations": 10,
"errors": [],
"human_feedback": None,
"final_response": "",
}
logger.info("Starting task: %s (thread=%s)", task[:80], thread_id)
final_state = None
async for event in graph.astream(initial_state, config, stream_mode="values"):
agent = event.get("next_agent", "unknown")
iteration = event.get("iteration_count", 0)
logger.info("Step %d — next agent: %s", iteration, agent)
final_state = event
logger.info(
"Task complete. Research notes: %d, Code artifacts: %d, Reviews: %d, Errors: %d",
len(final_state.get("research_notes", [])),
len(final_state.get("code_artifacts", [])),
len(final_state.get("review_comments", [])),
len(final_state.get("errors", [])),
)
return final_state
if __name__ == "__main__":
result = asyncio.run(
run_task(
"Research best practices for rate limiting in Python APIs, "
"then write a production-grade rate limiter using a "
"token bucket algorithm with Redis. Include tests."
)
)
print("\n" + "=" * 60)
print("FINAL CODE ARTIFACTS:")
for artifact in result.get("code_artifacts", []):
        print(artifact.get("content", "")[:2000])

LangSmith Integration
Multi-agent systems are notoriously hard to debug without observability. When the output is wrong, you need to know which agent made a bad decision, what state it saw, and how the Supervisor routed the workflow. LangSmith gives you distributed tracing for every LLM call, tool invocation, and state transition in your graph. With the environment variables from Step 1 already set (LANGSMITH_TRACING=true), all LangGraph runs are automatically traced. Here is how to add custom metadata, evaluations, and cost tracking.
"""LangSmith tracing, evaluation, and cost tracking."""
from __future__ import annotations

from langsmith import Client, traceable
from langsmith.evaluation import evaluate
from langsmith.schemas import Example, Run
# ---- Custom tracing with metadata ----
@traceable(
name="multi-agent-run",
tags=["production", "multi-agent"],
metadata={"version": "1.0", "graph": "supervisor-workers"},
)
async def traced_run(graph, initial_state: dict, config: dict) -> dict:
"""Run the graph with full LangSmith tracing.
The @traceable decorator captures this as a top-level trace span.
All nested LLM calls and tool invocations appear as child spans.
"""
final = None
async for event in graph.astream(initial_state, config, stream_mode="values"):
final = event
return final
# ---- Custom evaluator for multi-agent outputs ----
def task_completion_evaluator(run: Run, example: Example) -> dict:
"""Evaluate whether the multi-agent system completed the task.
This evaluator checks:
1. Did the system produce code artifacts?
2. Did the review pass (APPROVED)?
3. Were there errors?
"""
outputs = run.outputs or {}
code_artifacts = outputs.get("code_artifacts", [])
review_comments = outputs.get("review_comments", [])
errors = outputs.get("errors", [])
has_code = len(code_artifacts) > 0
review_passed = any("APPROVED" in str(c) for c in review_comments)
no_errors = len(errors) == 0
score = sum([has_code, review_passed, no_errors]) / 3.0
return {
"key": "task_completion",
"score": score,
"comment": (
f"Code: {'yes' if has_code else 'no'}, "
f"Review: {'passed' if review_passed else 'failed'}, "
f"Errors: {len(errors)}"
),
}
# ---- Run evaluation against a dataset ----
def run_evaluation(dataset_name: str = "multi-agent-tasks"):
"""Evaluate the multi-agent system against a LangSmith dataset.
Prerequisites:
- Create a dataset in LangSmith with input/output examples
- Each example should have an 'input' with a 'task' field
"""
client = Client()
results = evaluate(
        # Placeholder target: in a real run, replace this lambda with a function
        # that executes the compiled graph on inputs["task"] and returns the
        # final state for the evaluator to score.
        lambda inputs: {"task": inputs["task"]},
data=dataset_name,
evaluators=[task_completion_evaluator],
experiment_prefix="multi-agent-v1",
max_concurrency=2,
)
print(f"Evaluation results: {results}")
    return results

Create a LangSmith dataset with 10-20 representative tasks and run evaluations after every change to your agent prompts or routing logic. This catches regressions that unit tests cannot — like a prompt change that makes the Supervisor skip the Review agent for certain task types.
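If you do not yet have such a dataset, it can be seeded from the SDK. A minimal sketch, assuming the `langsmith` client API; the dataset name and seed tasks are illustrative placeholders:

```python
"""Seed a LangSmith dataset for regression evaluation (sketch).

Usage: seed_dataset(Client()) with `from langsmith import Client`.
"""
from __future__ import annotations

# Hypothetical seed tasks; replace with examples drawn from real traffic.
SEED_EXAMPLES = [
    {"task": "Write a retry decorator with exponential backoff"},
    {"task": "Research asyncio cancellation semantics and summarize"},
]


def seed_dataset(client, dataset_name: str = "multi-agent-tasks"):
    """Create the dataset and upload seed examples via the LangSmith client."""
    dataset = client.create_dataset(
        dataset_name=dataset_name,
        description="Representative tasks for multi-agent regression evaluation",
    )
    client.create_examples(
        inputs=SEED_EXAMPLES,
        outputs=[{"expects_code": True}, {"expects_code": False}],
        dataset_id=dataset.id,
    )
    return dataset
```

The reference outputs (`expects_code` here) are whatever your evaluators need to score against; keep them minimal at first and enrich them as your evaluators mature.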
Production Deployment
FastAPI Service Layer
Wrap the graph in a FastAPI application that provides async HTTP endpoints, request validation, and streaming responses. Each incoming request gets a unique thread ID and runs through the graph asynchronously. The /run endpoint returns the final result, while the /stream endpoint uses server-sent events to stream intermediate state updates to the client.
"""FastAPI service layer for the multi-agent system."""
from __future__ import annotations
import os
import uuid
import logging
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
from pydantic import BaseModel, Field
from graph import create_graph_with_persistence
load_dotenv()
logger = logging.getLogger(__name__)
app = FastAPI(
title="Multi-Agent Orchestration API",
version="1.0.0",
)
# Initialize graph once at startup
_graph = None
def get_graph():
global _graph
if _graph is None:
db_url = os.environ["DATABASE_URL"]
_graph = create_graph_with_persistence(db_url)
return _graph
class TaskRequest(BaseModel):
"""Incoming task request."""
task: str = Field(min_length=10, max_length=5000)
max_iterations: int = Field(default=10, ge=1, le=25)
class TaskResponse(BaseModel):
"""Task completion response."""
thread_id: str
research_notes: list[str]
code_artifacts: list[dict]
review_comments: list[str]
errors: list[dict]
iterations: int
@app.post("/run", response_model=TaskResponse)
async def run_task(request: TaskRequest):
"""Run a task through the multi-agent system."""
graph = get_graph()
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
initial_state = {
"task": request.task,
"messages": [HumanMessage(content=request.task)],
"next_agent": "research",
"research_notes": [],
"code_artifacts": [],
"review_comments": [],
"iteration_count": 0,
"max_iterations": request.max_iterations,
"errors": [],
"human_feedback": None,
"final_response": "",
}
try:
final = None
async for event in graph.astream(initial_state, config, stream_mode="values"):
final = event
if final is None:
raise HTTPException(status_code=500, detail="Graph produced no output")
return TaskResponse(
thread_id=thread_id,
research_notes=final.get("research_notes", []),
code_artifacts=final.get("code_artifacts", []),
review_comments=final.get("review_comments", []),
errors=final.get("errors", []),
iterations=final.get("iteration_count", 0),
)
except Exception as exc:
logger.exception("Task execution failed")
raise HTTPException(status_code=500, detail=str(exc)) from exc
@app.get("/health")
async def health():
    return {"status": "ok"}

Deployment Patterns
Multi-agent systems have different deployment requirements than typical web services because graph executions can run for minutes, not milliseconds. The checkpointer enables a decoupled architecture where a lightweight API server accepts requests and a pool of worker processes execute the graphs. Here are the primary deployment options, ordered by operational complexity.
| Pattern | Best For | Latency | Complexity | Cost |
|---|---|---|---|---|
| Single Process (FastAPI + Uvicorn) | Development, low-traffic internal tools | Variable (seconds to minutes) | Low | Low — single VM or container |
| Task Queue (Celery / Dramatiq) | Background processing, async workflows | Higher (queue overhead) | Medium | Medium — queue infra + workers |
| Container Orchestration (ECS / Kubernetes) | Production multi-tenant, auto-scaling | Low (dedicated workers) | High | Variable — scales with demand |
| LangGraph Cloud | Managed deployment with built-in persistence | Low | Low (managed) | Usage-based pricing |
Error Handling and Retry Patterns
Agent failures are inevitable in production — LLM APIs have rate limits, tool calls time out, and models occasionally return malformed output. The key principle is to contain failures at the agent level. Each agent catches its own exceptions and writes them to the shared errors list in state. The Supervisor reads these errors and can decide to retry the failed agent, route to a different agent, or terminate gracefully. This is far more resilient than letting exceptions bubble up and kill the entire graph execution.
"""Retry wrapper for agent nodes with exponential backoff."""
from __future__ import annotations
import asyncio
import functools
import logging
from typing import Callable
logger = logging.getLogger(__name__)
def with_retry(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
retryable_exceptions: tuple = (Exception,),
):
"""Decorator that adds retry logic to agent node functions.
    Uses exponential backoff capped at max_delay. In high-concurrency
    deployments, add jitter on top of the delay to avoid thundering-herd
    retries.
Args:
max_retries: Maximum number of retry attempts.
base_delay: Initial delay in seconds.
max_delay: Maximum delay between retries.
retryable_exceptions: Tuple of exception types to retry on.
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
last_exc = None
for attempt in range(max_retries + 1):
try:
                    if asyncio.iscoroutinefunction(func):
                        return await func(*args, **kwargs)
                    return func(*args, **kwargs)
except retryable_exceptions as exc:
last_exc = exc
if attempt < max_retries:
delay = min(base_delay * (2 ** attempt), max_delay)
logger.warning(
"Attempt %d/%d for %s failed: %s. Retrying in %.1fs",
attempt + 1,
max_retries + 1,
func.__name__,
exc,
delay,
)
await asyncio.sleep(delay)
else:
logger.error(
"All %d attempts for %s exhausted",
max_retries + 1,
func.__name__,
)
raise last_exc # type: ignore[misc]
return wrapper
    return decorator

Never retry blindly on all exceptions. Rate limit errors (HTTP 429) should be retried with backoff, but a 400 Bad Request from the LLM API means your input is malformed — retrying will just waste tokens. Check the exception type before deciding to retry.
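To make the exception-type check concrete, here is a minimal sketch. `RateLimitError` and `BadRequestError` are stand-ins for your SDK's real exception classes (for example, the 429 and 400 errors raised by the Anthropic or OpenAI clients):

```python
"""Sketch: retry transient errors only; fail fast on everything else."""
from __future__ import annotations

import asyncio


class RateLimitError(Exception):
    """Stand-in for an HTTP 429: transient, worth retrying."""


class BadRequestError(Exception):
    """Stand-in for an HTTP 400: the input is malformed, retrying wastes tokens."""


async def call_llm_with_retry(call, max_retries: int = 3, base_delay: float = 0.01):
    """Retry only RateLimitError; any other exception propagates immediately."""
    for attempt in range(max_retries + 1):
        try:
            return await call()
        except RateLimitError:
            if attempt == max_retries:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))
```

With the `with_retry` decorator above, the same selectivity comes from passing the transient types explicitly, e.g. `with_retry(retryable_exceptions=(RateLimitError, TimeoutError))`.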
Cost Optimization
Multi-agent systems can be expensive because every routing decision and every specialist invocation is an LLM call. Here are concrete strategies to keep costs manageable without sacrificing quality.
1. **Use a smaller model for the Supervisor.** The Supervisor makes routing decisions, not substantive judgments. Claude Haiku or GPT-4o-mini can route as effectively as larger models at a fraction of the cost. Reserve your frontier model for specialist agents that need reasoning power.
2. **Limit the context window per agent.** Pass only the last 5-10 messages to each agent, not the full conversation history. Each agent has a focused task and does not need to re-read the entire workflow. This reduces input token costs significantly on multi-iteration runs.
3. **Cache tool results.** If the Research Agent searches for the same query twice (common in revision cycles), cache the results. A simple in-memory dict keyed by the search query can eliminate redundant API calls to Tavily or your search provider.
4. **Short-circuit when possible.** If the task is simple enough (e.g., a straightforward code generation with no research needed), let the Supervisor skip agents entirely. This requires a well-crafted Supervisor prompt that recognizes trivial tasks.
5. **Set token budgets per agent.** Use the `max_tokens` parameter on each agent's LLM to cap output length. The Research Agent rarely needs more than 2,000 tokens. The Code Agent may need 8,000. The Supervisor should never exceed 512.
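Context trimming and tool-result caching both fit in a few lines. A minimal sketch with illustrative window sizes:

```python
"""Sketches for per-agent context trimming and tool-result caching."""
from __future__ import annotations

from typing import Callable


def trim_messages(messages: list, keep_last: int = 8) -> list:
    """Keep the first message (the original task) plus the last keep_last.

    The leading message anchors the agent to the task; the middle of the
    history is dropped to cut input-token costs on long runs.
    """
    if len(messages) <= keep_last + 1:
        return messages
    return [messages[0]] + messages[-keep_last:]


_search_cache: dict[str, str] = {}


def cached_search(query: str, search_fn: Callable[[str], str]) -> str:
    """Memoize tool results by query to skip duplicate search-API calls."""
    if query not in _search_cache:
        _search_cache[query] = search_fn(query)
    return _search_cache[query]
```

For production, swap the bare dict for a bounded cache (an LRU or a TTL cache) so revision-heavy workloads cannot grow memory without limit.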
Production Checklist

Work through each category before launch:

- Pre-Launch
- Monitoring
- Security
- Performance
Common Pitfalls
Infinite routing loops: Without a max iteration guard, the Supervisor can get stuck routing between the Code and Review agents indefinitely — the Review says "needs changes" and the Code agent makes changes that the Review still rejects. Always enforce a hard iteration limit and add a "good enough" exit condition to the Supervisor prompt.
State bloat: Every message and artifact appended to state persists in the checkpointer and is passed to subsequent nodes. On long-running workflows with many iterations, the state can grow to hundreds of kilobytes, increasing LLM input costs and checkpointer write latency. Implement a message pruning strategy — summarize old messages or keep only the last N per agent.
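A minimal pruning sketch, assuming the state fields from this blueprint use plain replacement semantics (an append-style reducer like `add_messages` would instead need explicit message removal); the caps are illustrative:

```python
"""Sketch: cap unbounded state fields before they hit the checkpointer."""
from __future__ import annotations


def prune_state(state: dict, max_messages: int = 12, max_notes: int = 5) -> dict:
    """Return a partial state update that bounds the list fields.

    Call at the end of each agent node (or in the Supervisor) so the
    checkpointer writes, and the next agent reads, a bounded state.
    """
    update: dict = {}
    if len(state.get("messages", [])) > max_messages:
        update["messages"] = state["messages"][-max_messages:]
    if len(state.get("research_notes", [])) > max_notes:
        # Newest notes win; older ones could be summarized instead of dropped.
        update["research_notes"] = state["research_notes"][-max_notes:]
    return update
```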
Tool output explosion: If a search tool returns large raw HTML or a code execution tool produces verbose output, it inflates the agent's context window. Always truncate tool outputs to a reasonable length (2,000-4,000 characters) and summarize rather than pass raw results to the LLM.
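A truncation helper can be as small as this; the 3,000-character default is illustrative:

```python
def truncate_tool_output(text: str, limit: int = 3000) -> str:
    """Truncate a tool result, marking the cut so the LLM knows it is partial."""
    if len(text) <= limit:
        return text
    return text[:limit] + f"\n... [truncated {len(text) - limit} characters]"
```

Marking the cut matters: an unmarked truncation can lead the model to treat a partial result as complete.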
Missing error isolation: The most common production failure is an unhandled exception in one agent crashing the entire graph. Every agent node must have a try/except that captures the error, writes it to the shared errors list, and returns a valid state update. The Supervisor then handles the failure gracefully.
Over-engineering the Supervisor: The Supervisor prompt should be short and focused on routing. If you add detailed instructions about how each agent should behave, the Supervisor starts trying to do the agents' jobs itself. Keep the Supervisor under 500 tokens of instructions and let the specialist agents own their domains.
Cost Analysis
The cost of a multi-agent run depends on the number of routing iterations, the models used for each agent, and the volume of tool calls. Below are representative cost ranges for a typical task that involves one research step, one code generation step, and one review step (three iterations through the Supervisor). Costs scale linearly with iteration count. These figures are based on published Anthropic pricing as of early 2026 and will vary with your specific usage patterns.
| Metric | Typical value | Notes |
|---|---|---|
| Per-run cost (3 iterations) | $0.02 - $0.05 | Using Claude Haiku for Supervisor, Claude Sonnet for specialists |
| Per-run cost (8 iterations) | $0.08 - $0.15 | Complex tasks with research-code-review-revise cycles |
| Supervisor routing latency | 3-5 sec | Time for the Supervisor to read state and make a routing decision |
| Full run wall-clock time | 30-120 sec | End-to-end time including all agent calls and tool executions |
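These figures can be sanity-checked with a back-of-envelope model. All prices and token counts below are illustrative placeholders, not quoted rates; substitute your provider's current price sheet:

```python
"""Back-of-envelope cost model for one multi-agent run (illustrative numbers)."""

# USD per million tokens -- hypothetical placeholder prices.
PRICE_PER_MTOK = {
    "router": {"in": 0.80, "out": 4.00},       # small routing model
    "specialist": {"in": 3.00, "out": 15.00},  # frontier specialist model
}


def run_cost(
    iterations: int,
    router_in: int = 1500,  # Supervisor reads a state summary
    router_out: int = 100,  # and emits a short routing decision
    spec_in: int = 1500,    # specialist reads the task plus trimmed context
    spec_out: int = 600,    # and produces notes, code, or a review
) -> float:
    """Each iteration is one Supervisor call plus one specialist call."""
    p_r, p_s = PRICE_PER_MTOK["router"], PRICE_PER_MTOK["specialist"]
    per_iteration = (
        router_in * p_r["in"] + router_out * p_r["out"]
        + spec_in * p_s["in"] + spec_out * p_s["out"]
    ) / 1_000_000
    return iterations * per_iteration
```

Under this model cost scales linearly with iteration count, matching the shape (if not the exact values) of the table above.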
Testing Your Multi-Agent System
Testing multi-agent systems requires a layered approach. Unit tests verify individual agent nodes in isolation with mocked LLM responses. Integration tests run the full graph against a real LLM but with a small, predictable task. Evaluation tests use a LangSmith dataset to measure quality across a range of tasks and catch regressions. The combination of all three gives you confidence that changes to any single agent do not break the system as a whole.
"""Integration tests for the multi-agent graph."""
from __future__ import annotations
import pytest
from langchain_core.messages import HumanMessage
from state import AgentState
from graph import build_graph
@pytest.fixture
def graph():
"""Create an in-memory graph (no checkpointer) for testing."""
return build_graph(checkpointer=None)
@pytest.fixture
def initial_state() -> AgentState:
"""Create a minimal initial state for testing."""
return {
"task": "Write a Python function that reverses a string",
"messages": [HumanMessage(content="Write a Python function that reverses a string")],
"next_agent": "code",
"research_notes": [],
"code_artifacts": [],
"review_comments": [],
"iteration_count": 0,
"max_iterations": 5,
"errors": [],
"human_feedback": None,
"final_response": "",
}
def test_graph_compiles(graph):
"""Verify the graph compiles without errors."""
assert graph is not None
def test_graph_has_expected_nodes(graph):
    """Verify all expected nodes are in the compiled graph."""
    node_names = {"supervisor", "research", "code", "review"}
    # get_graph() returns an inspectable representation of the compiled graph.
    assert node_names <= set(graph.get_graph().nodes)
    # The compiled graph should be invocable
    assert callable(graph.invoke)
def test_max_iterations_enforced(graph, initial_state):
"""Verify the graph terminates at max_iterations."""
initial_state["max_iterations"] = 2
initial_state["iteration_count"] = 2
# With iteration_count already at max, supervisor
# should route to END
from graph import route_supervisor
result = route_supervisor(initial_state)
from langgraph.graph import END
assert result == END
def test_route_supervisor_returns_agent():
"""Test routing function returns correct agent names."""
from graph import route_supervisor
state = {"next_agent": "research", "iteration_count": 0, "max_iterations": 10}
assert route_supervisor(state) == "research"
state["next_agent"] = "code"
assert route_supervisor(state) == "code"
state["next_agent"] = "FINISH"
from langgraph.graph import END
    assert route_supervisor(state) == END

Final Project Structure
multi-agent-system/
├── .env # API keys and config
├── pyproject.toml # Dependencies and metadata
├── docker-compose.yml # PostgreSQL for checkpointer
├── src/
│ ├── state.py # AgentState TypedDict
│ ├── supervisor.py # Supervisor agent + routing logic
│ ├── graph.py # Graph assembly + compilation
│ ├── graph_with_hitl.py # Human-in-the-loop variant
│ ├── main.py # CLI entry point
│ ├── api.py # FastAPI service layer
│ ├── retry.py # Retry decorator with backoff
│ ├── observability.py # LangSmith tracing + evaluation
│ └── agents/
│ ├── __init__.py
│ ├── research.py # Research agent + tools
│ ├── code.py # Code agent + tools
│ └── review.py # Review agent
└── tests/
├── test_graph.py # Integration tests
├── test_supervisor.py # Supervisor unit tests
└── test_agents.py # Agent unit tests

Where to Go from Here
This blueprint gives you a solid foundation for multi-agent orchestration. As your system matures, consider these extensions: add a Planning Agent that decomposes complex tasks into sub-tasks before routing begins; implement parallel agent execution for independent sub-tasks using LangGraph's branching primitives; build a memory layer that persists key learnings across workflow runs so agents can reference past solutions; and add an output synthesis node that combines all agent outputs into a polished final response. Each of these can be added as a new node in the existing graph without rewriting the core architecture.
The real power of the graph-based approach is composability. Once you have a working Supervisor-Worker pattern, adding a new specialist agent is as simple as writing a node function, binding its tools, and adding an edge from the Supervisor. The graph grows horizontally without increasing the complexity of any individual component.
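As a sketch of how small that change is, here is a hypothetical Docs agent. The node body and the "docs" routing key are illustrative placeholders; the wiring calls shown in the comment are standard LangGraph `StateGraph` API:

```python
"""Sketch: adding a hypothetical Docs agent to the existing graph."""
from __future__ import annotations


def docs_node(state: dict) -> dict:
    """Hypothetical specialist: draft documentation for each code artifact."""
    drafted = [
        f"Docs for artifact {i}"  # placeholder for a real LLM call
        for i, _ in enumerate(state.get("code_artifacts", []))
    ]
    return {"research_notes": drafted, "next_agent": "supervisor"}


# In graph.py, the only changes are one node, one edge back to the
# Supervisor, and a "docs" entry in the routing map:
#
#   builder.add_node("docs", docs_node)
#   builder.add_edge("docs", "supervisor")
#   builder.add_conditional_edges(
#       "supervisor", route_supervisor,
#       {"research": "research", "code": "code", "review": "review",
#        "docs": "docs", END: END},
#   )
```

The Supervisor's prompt also needs one new line describing when to route to "docs"; nothing else in the graph changes.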
Version History
1.0.0 · 2026-03-01
- Initial publication with Supervisor + 3 specialist agents (Research, Code, Review)
- LangGraph 0.2.x with StateGraph, conditional edges, and PostgreSQL checkpointer
- LangSmith integration for tracing and evaluation
- Human-in-the-loop checkpoint pattern
- FastAPI service layer with streaming support
- Production checklist, cost analysis, and common pitfalls