LangGraph is a framework for building stateful, multi-step AI agent workflows as directed graphs. Where LangChain excels at linear chains, LangGraph handles the messy reality of production agents: loops, branches, retries, parallel execution, human approval gates, and long-running workflows that can pause and resume. In 2026, it is the standard choice for production agentic systems built on top of LangChain.
This guide covers LangGraph's core concepts — graphs, state, nodes, edges, and checkpointing — then builds progressively more complex agents: a simple ReAct agent, a branching workflow, a multi-agent system, and a human-in-the-loop approval pattern.
LangGraph models agent execution as a directed graph. State is a TypedDict that flows through the graph — every node reads the current state and returns an updated partial state. Nodes are Python functions (or LangChain runnables) that perform one unit of work. Edges define how execution flows between nodes, including conditional edges that route based on state values.
This is fundamentally different from LangChain's chains: state is explicit and persisted, loops are first-class citizens, and the graph can be interrupted mid-execution to wait for human input or external events.
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
import operator
# Define the state schema
class AgentState(TypedDict):
    """State schema shared by every node in the planner/executor graph."""

    # Reducer field: a list returned by a node is combined with the existing
    # value through operator.add (concatenation) instead of overwriting it.
    messages: Annotated[list, operator.add]
    # Name of the step the planner selected.
    next_step: str
    # Counter that the planner increments on each pass.
    iteration: int
# Define nodes — each receives state, returns partial update
def planner(state: AgentState) -> dict:
    """Choose the next step and advance the iteration counter.

    A state without an ``iteration`` key is treated as being at 0, so the
    first pass reports iteration 1.

    Returns:
        Partial state update with ``next_step`` and the new ``iteration``.
    """
    completed = state.get("iteration", 0)
    update = {"next_step": "execute"}
    update["iteration"] = completed + 1
    return update
def executor(state: AgentState) -> dict:
    """Record that the step for the current iteration was executed.

    Returns:
        Partial state update holding a one-element ``messages`` list.
    """
    step_number = state["iteration"]
    log_entry = f"Executed step {step_number}"
    return {"messages": [log_entry]}
def should_continue(state: AgentState) -> str:
    """Return the routing key used by the conditional edge.

    Yields "done" once ``iteration`` has reached 3 or more, otherwise
    "continue".
    """
    return "done" if state["iteration"] >= 3 else "continue"
# Assemble the graph: planner -> executor, then loop back or stop
builder = StateGraph(AgentState)
builder.add_node("planner", planner)
builder.add_node("executor", executor)
builder.add_edge(START, "planner")
builder.add_edge("planner", "executor")
# Keys are the strings should_continue returns; values are the next node.
routes = {
    "continue": "planner",
    "done": END,
}
builder.add_conditional_edges("executor", should_continue, routes)
graph = builder.compile()
Annotated[list, operator.add] on the messages field tells LangGraph to merge updates with operator.add: the list a node returns is concatenated onto the existing list rather than replacing it. This is how LangGraph accumulates conversation history across nodes.
Let's build a practical graph — a document analysis workflow that summarizes and then classifies a document in sequence, with error handling that routes to a fallback node when summarization fails.
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
# Chat model shared by the summarize and classify nodes below
llm = ChatOpenAI(model="gpt-4o", temperature=0)
class DocState(TypedDict):
    """State carried through the document analysis workflow."""

    # Raw document text supplied by the caller.
    text: str
    # Written by the summarize node, or by the error handler on failure.
    summary: Optional[str]
    # Written by the classify node, or set to "unknown" by the error handler.
    category: Optional[str]
    # Exception message recorded when a node's model call fails.
    error: Optional[str]
def summarize(state: DocState) -> dict:
    """Ask the model for a two-sentence summary of the document.

    Only the first 2000 characters of ``state["text"]`` are sent. Any
    exception is caught and reported through the ``error`` key rather than
    raised, so routing can be decided from the state.

    Returns:
        ``{"summary": ...}`` on success, ``{"error": ...}`` on failure.
    """
    try:
        excerpt = state["text"][:2000]
        reply = llm.invoke(f"Summarize in 2 sentences:\n\n{excerpt}")
        return {"summary": reply.content}
    except Exception as exc:  # deliberate catch-all: the failure becomes state
        return {"error": str(exc)}
def classify(state: DocState) -> dict:
    """Ask the model to label the summary with a single category.

    The reply is stripped and lower-cased before being stored. Any exception
    is caught and reported through the ``error`` key rather than raised.

    Returns:
        ``{"category": ...}`` on success, ``{"error": ...}`` on failure.
    """
    try:
        prompt = (
            f"Classify as one of: technical/legal/financial/other.\n"
            f"Summary: {state['summary']}\nRespond with just the category."
        )
        reply = llm.invoke(prompt)
        label = reply.content.strip().lower()
        return {"category": label}
    except Exception as exc:  # deliberate catch-all: the failure becomes state
        return {"error": str(exc)}
def handle_error(state: DocState) -> dict:
    """Fallback node: turn the recorded error into placeholder results.

    Returns:
        Partial state update with ``category`` set to "unknown" and a
        ``summary`` that embeds the error message.
    """
    failure = state["error"]
    return {
        "category": "unknown",
        "summary": f"Processing failed: {failure}",
    }
def route_after_summary(state: DocState) -> str:
    """Return "error" when the state holds a truthy ``error``, else "classify"."""
    if state.get("error"):
        return "error"
    return "classify"
# Wire the workflow: summarize, then classify — or fall back if summarizing failed
builder = StateGraph(DocState)
builder.add_node("summarize", summarize)
builder.add_node("classify", classify)
builder.add_node("error", handle_error)
builder.add_edge(START, "summarize")
# Keys are the strings route_after_summary returns; values are node names.
routes = {
    "classify": "classify",
    "error": "error",
}
builder.add_conditional_edges("summarize", route_after_summary, routes)
# Both branches finish the run; a failure inside classify is not rerouted.
builder.add_edge("classify", END)
builder.add_edge("error", END)
graph = builder.compile()

# Only "text" is supplied up front; the nodes write the remaining keys.
result = graph.invoke({"text": "Python is a high-level programming language..."})
print(result)
LangGraph's create_react_agent builds a standard Reason+Act loop that calls tools until it decides to return a final answer. Unlike the LangChain agent executor, it exposes the full graph so you can modify any part of the logic.
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
import requests
# Chat model that drives the ReAct agent built below
llm = ChatOpenAI(model="gpt-4o", temperature=0)
@tool
def search_web(query: str) -> str:
    """Search the web for current information. Returns top results summary."""
    # Placeholder implementation: no request is made, the reply is canned.
    # In production use Tavily, SerpAPI, or similar.
    simulated = f"[simulated results about {query}]"
    return f"Search results for '{query}': {simulated}"
def _evaluate_arithmetic(expression: str):
    """Evaluate a purely arithmetic expression by walking its syntax tree.

    Accepted syntax: int/float literals, parentheses, unary ``+``/``-`` and
    the binary operators ``+ - * / // % **``.

    Raises:
        SyntaxError: if the text is not a valid Python expression.
        ValueError: if the expression uses anything outside the whitelist,
            or an exponent whose magnitude exceeds the cap.
        ArithmeticError: e.g. ZeroDivisionError from the operation itself.
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Best-effort guard against expressions like 9**9**9 tying up the process.
    max_exponent = 1000

    def evaluate(node):
        # type() rather than isinstance() so that bool literals are rejected.
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = evaluate(node.left)
            right = evaluate(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > max_exponent:
                raise ValueError("exponent too large")
            return binary_ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        raise ValueError("unsupported expression")

    # eval() tolerates surrounding whitespace; ast.parse() does not, so strip.
    tree = ast.parse(expression.strip(), mode="eval")
    return evaluate(tree.body)


@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression. Input should be a valid Python math expression."""
    # SECURITY: the expression is model-generated, i.e. untrusted. eval() with
    # an emptied __builtins__ is not a sandbox (attribute access on literals
    # can still reach arbitrary objects), so it has been replaced by a
    # whitelist-based arithmetic evaluator.
    try:
        return str(_evaluate_arithmetic(expression))
    except Exception as e:  # report the failure to the model instead of raising
        return f"Error: {e}"
@tool
def get_weather(city: str) -> str:
    """Get current weather for a city."""
    # Stub: the conditions are hard-coded, only the city name varies.
    conditions = "22°C, partly cloudy"
    return f"Weather in {city}: {conditions}"
# ReAct agent — keeps calling tools until the model returns a final answer
agent = create_react_agent(llm, tools=[search_web, calculate, get_weather])

question = "What's the weather in Paris, and what is 42 * 17 + 100?"
result = agent.invoke({"messages": [{"role": "user", "content": question}]})

# The last message in the returned state is printed as the answer
final_message = result["messages"][-1]
print(final_message.content)
LangGraph checkpointers persist graph state after each node execution, enabling pause-resume workflows, conversation history across sessions, and error recovery without re-running from scratch. The MemorySaver is for development; use PostgresSaver or RedisSaver for production.
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
checkpointer = MemorySaver()  # Production: use PostgresSaver
agent = create_react_agent(llm, tools=[], checkpointer=checkpointer)

# Calls that share this thread_id belong to the same conversation session
config = {"configurable": {"thread_id": "user-123-session-456"}}

# First turn
first_turn = {"messages": [{"role": "user", "content": "My name is Alice."}]}
result1 = agent.invoke(first_turn, config=config)
print(result1["messages"][-1].content)

# Second turn — same config, so the earlier context is available
second_turn = {"messages": [{"role": "user", "content": "What's my name?"}]}
result2 = agent.invoke(second_turn, config=config)
print(result2["messages"][-1].content)  # Will say "Alice"

# Inspect the state saved for this thread
state = agent.get_state(config)
message_count = len(state.values["messages"])
print(f"Messages in memory: {message_count}")
LangGraph's interrupt mechanism lets you pause graph execution mid-run to wait for human input — approval, correction, or additional context. This is essential for high-stakes agentic workflows like sending emails, executing database writes, or making API calls with real-world consequences.
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class WorkflowState(TypedDict):
    """State for the human-approval workflow."""

    # The action awaiting sign-off, as text.
    action: str
    # True only when the reviewer answered "yes".
    approved: bool
    # Outcome message written by the final node.
    result: str
def prepare_action(state: WorkflowState) -> dict:
    """Stage the action that the reviewer will be asked to approve.

    The incoming state is not consulted; the action is a fixed example.
    """
    proposed = "DELETE FROM users WHERE inactive=true"
    return {"action": proposed}
def request_approval(state: WorkflowState) -> Command:
    """Interrupt execution and wait for human approval.

    The payload handed to ``interrupt`` carries the question and the allowed
    answers. The value that comes back is compared to "yes"; anything else
    is recorded as a rejection.

    Returns:
        A ``Command`` that updates ``approved`` in the state.
    """
    prompt = {
        "question": f"Approve this action?\n{state['action']}",
        "options": ["yes", "no"],
    }
    answer = interrupt(prompt)
    is_approved = answer == "yes"
    return Command(update={"approved": is_approved})
def execute_or_abort(state: WorkflowState) -> dict:
    """Report the outcome according to the reviewer's decision.

    Returns:
        Partial state update whose ``result`` either echoes the action as
        executed or states that it was rejected.
    """
    if not state["approved"]:
        return {"result": "Action rejected by human reviewer."}
    return {"result": f"Executed: {state['action']}"}
from langgraph.checkpoint.memory import MemorySaver

# Linear flow: prepare -> approve -> execute
builder = StateGraph(WorkflowState)
builder.add_node("prepare", prepare_action)
builder.add_node("approve", request_approval)
builder.add_node("execute", execute_or_abort)
builder.add_edge(START, "prepare")
builder.add_edge("prepare", "approve")
builder.add_edge("approve", "execute")
builder.add_edge("execute", END)

# A checkpointer is supplied so the paused run can be resumed later
graph = builder.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["approve"],
)
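interrupt_before=["approve"] pauses the graph before entering the approval node. Resume by calling graph.invoke(Command(resume="yes"), config), where config is the same {"configurable": {"thread_id": ...}} dict that was passed when the run was started. The graph reloads that thread's checkpoint and continues from where it stopped.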
LangGraph supports rich streaming: stream token-by-token LLM output, stream node-level events (which node is running, what state it produced), or stream custom values. This is critical for production UIs where users need immediate feedback during long agent runs.
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o", streaming=True)
agent = create_react_agent(llm, tools=[search_web, calculate])

# Stream state updates as the agent runs — great for debugging
request = {"messages": [{"role": "user", "content": "What is 99 * 99?"}]}
for event in agent.stream(request, stream_mode="values"):
    last_message = event["messages"][-1]
    # Skip messages that have no content attribute or an empty one
    text = getattr(last_message, "content", None)
    if text:
        print(text, end="", flush=True)
# Stream only LLM tokens — for chat UIs
async def stream_to_ui(user_message: str):
    """Yield the model's output piece by piece for one user message.

    Filters the agent's v2 event stream down to ``on_chat_model_stream``
    events and yields the content of each chunk that is not empty.
    """
    payload = {"messages": [{"role": "user", "content": user_message}]}
    async for event in agent.astream_events(payload, version="v2"):
        if event["event"] != "on_chat_model_stream":
            continue
        token = event["data"]["chunk"].content
        if token:
            yield token  # Send to WebSocket / SSE