LangGraph: Stateful AI Agent Workflows Guide

LangGraph is a framework for building stateful, multi-step AI agent workflows as directed graphs. Where LangChain excels at linear chains, LangGraph handles the messy reality of production agents: loops, branches, retries, parallel execution, human approval gates, and long-running workflows that can pause and resume. In 2026, it is the standard choice for production agentic systems built on top of LangChain.

This guide covers LangGraph's core concepts — graphs, state, nodes, edges, and checkpointing — then builds progressively more complex agents: a simple ReAct agent, a branching workflow, a multi-agent system, and a human-in-the-loop approval pattern.

Table of Contents

Core Concepts: State, Nodes, and Edges

LangGraph models agent execution as a directed graph. State is a TypedDict that flows through the graph — every node reads the current state and returns an updated partial state. Nodes are Python functions (or LangChain runnables) that perform one unit of work. Edges define how execution flows between nodes, including conditional edges that route based on state values.

This is fundamentally different from LangChain's chains: state is explicit and persisted, loops are first-class citizens, and the graph can be interrupted mid-execution to wait for human input or external events.

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
import operator

# Define the state schema
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]   # operator.add = append, not replace
    next_step: str
    iteration: int

# Define nodes — each receives state, returns partial update
def planner(state: AgentState) -> dict:
    """Decide what to do next."""
    return {"next_step": "execute", "iteration": state.get("iteration", 0) + 1}

def executor(state: AgentState) -> dict:
    """Execute the planned action."""
    return {"messages": [f"Executed step {state['iteration']}"]}

def should_continue(state: AgentState) -> str:
    """Conditional edge — returns the name of the next node."""
    if state["iteration"] >= 3:
        return "done"
    return "continue"

# Build the graph
builder = StateGraph(AgentState)
builder.add_node("planner", planner)
builder.add_node("executor", executor)
builder.add_edge(START, "planner")
builder.add_edge("planner", "executor")
builder.add_conditional_edges("executor", should_continue,
                               {"continue": "planner", "done": END})
graph = builder.compile()
Note: Using Annotated[list, operator.add] for the messages field means each node's returned messages are appended to the list rather than replacing it. This is how LangGraph accumulates conversation history across nodes.

Building Your First Graph

Let's build a practical graph — a document analysis workflow that extracts, summarizes, and classifies a document in sequence, with error handling that routes to a fallback node on failure.

from typing import TypedDict, Optional
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", temperature=0)

class DocState(TypedDict):
    text: str
    summary: Optional[str]
    category: Optional[str]
    error: Optional[str]

def summarize(state: DocState) -> dict:
    try:
        response = llm.invoke(f"Summarize in 2 sentences:\n\n{state['text'][:2000]}")
        return {"summary": response.content}
    except Exception as e:
        return {"error": str(e)}

def classify(state: DocState) -> dict:
    try:
        response = llm.invoke(
            f"Classify as one of: technical/legal/financial/other.\n"
            f"Summary: {state['summary']}\nRespond with just the category."
        )
        return {"category": response.content.strip().lower()}
    except Exception as e:
        return {"error": str(e)}

def handle_error(state: DocState) -> dict:
    return {"category": "unknown", "summary": f"Processing failed: {state['error']}"}

def route_after_summary(state: DocState) -> str:
    return "error" if state.get("error") else "classify"

builder = StateGraph(DocState)
builder.add_node("summarize", summarize)
builder.add_node("classify", classify)
builder.add_node("error", handle_error)
builder.add_edge(START, "summarize")
builder.add_conditional_edges("summarize", route_after_summary,
                               {"classify": "classify", "error": "error"})
builder.add_edge("classify", END)
builder.add_edge("error", END)
graph = builder.compile()

result = graph.invoke({"text": "Python is a high-level programming language..."})
print(result)

ReAct Agent with Tools

LangGraph's create_react_agent builds a standard Reason+Act loop that calls tools until it decides to return a final answer. Unlike the LangChain agent executor, it exposes the full graph so you can modify any part of the logic.

from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
import requests

llm = ChatOpenAI(model="gpt-4o", temperature=0)

@tool
def search_web(query: str) -> str:
    """Search the web for current information. Returns top results summary."""
    # In production use Tavily, SerpAPI, or similar
    return f"Search results for '{query}': [simulated results about {query}]"

@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression. Input should be a valid Python math expression."""
    try:
        result = eval(expression, {"__builtins__": {}}, {"__builtins__": {}})
        return str(result)
    except Exception as e:
        return f"Error: {e}"

@tool
def get_weather(city: str) -> str:
    """Get current weather for a city."""
    return f"Weather in {city}: 22°C, partly cloudy"

# ReAct agent — automatically loops until final answer
agent = create_react_agent(llm, tools=[search_web, calculate, get_weather])

result = agent.invoke({
    "messages": [{"role": "user", "content":
        "What's the weather in Paris, and what is 42 * 17 + 100?"}]
})
print(result["messages"][-1].content)

Checkpointing and Memory

LangGraph checkpointers persist graph state after each node execution, enabling pause-resume workflows, conversation history across sessions, and error recovery without re-running from scratch. The MemorySaver is for development; use PostgresSaver or RedisSaver for production.

from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")
checkpointer = MemorySaver()  # Production: use PostgresSaver

agent = create_react_agent(llm, tools=[], checkpointer=checkpointer)

# Thread ID groups messages into a conversation session
config = {"configurable": {"thread_id": "user-123-session-456"}}

# First message
result1 = agent.invoke(
    {"messages": [{"role": "user", "content": "My name is Alice."}]},
    config=config
)
print(result1["messages"][-1].content)

# Second message — agent remembers previous context
result2 = agent.invoke(
    {"messages": [{"role": "user", "content": "What's my name?"}]},
    config=config
)
print(result2["messages"][-1].content)  # Will say "Alice"

# Inspect saved state
state = agent.get_state(config)
print(f"Messages in memory: {len(state.values['messages'])}")

Human-in-the-Loop Approval

LangGraph's interrupt mechanism lets you pause graph execution mid-run to wait for human input — approval, correction, or additional context. This is essential for high-stakes agentic workflows like sending emails, executing database writes, or making API calls with real-world consequences.

from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class WorkflowState(TypedDict):
    action: str
    approved: bool
    result: str

def prepare_action(state: WorkflowState) -> dict:
    return {"action": "DELETE FROM users WHERE inactive=true"}

def request_approval(state: WorkflowState) -> Command:
    """Interrupt execution and wait for human approval."""
    human_decision = interrupt({
        "question": f"Approve this action?\n{state['action']}",
        "options": ["yes", "no"]
    })
    return Command(update={"approved": human_decision == "yes"})

def execute_or_abort(state: WorkflowState) -> dict:
    if state["approved"]:
        return {"result": f"Executed: {state['action']}"}
    return {"result": "Action rejected by human reviewer."}

builder = StateGraph(WorkflowState)
builder.add_node("prepare", prepare_action)
builder.add_node("approve", request_approval)
builder.add_node("execute", execute_or_abort)
builder.add_edge(START, "prepare")
builder.add_edge("prepare", "approve")
builder.add_edge("approve", "execute")
builder.add_edge("execute", END)

from langgraph.checkpoint.memory import MemorySaver
graph = builder.compile(checkpointer=MemorySaver(), interrupt_before=["approve"])
Note: interrupt_before=["approve"] pauses the graph before entering the approval node. Resume by calling graph.invoke(Command(resume="yes"), config). The graph reloads from the checkpoint and continues from where it stopped.

Streaming Events

LangGraph supports rich streaming: stream token-by-token LLM output, stream node-level events (which node is running, what state it produced), or stream custom values. This is critical for production UIs where users need immediate feedback during long agent runs.

from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", streaming=True)
agent = create_react_agent(llm, tools=[search_web, calculate])

# Stream all events — great for debugging
for event in agent.stream(
    {"messages": [{"role": "user", "content": "What is 99 * 99?"}]},
    stream_mode="values"
):
    last_message = event["messages"][-1]
    if hasattr(last_message, "content") and last_message.content:
        print(last_message.content, end="", flush=True)

# Stream only LLM tokens — for chat UIs
async def stream_to_ui(user_message: str):
    async for chunk in agent.astream_events(
        {"messages": [{"role": "user", "content": user_message}]},
        version="v2"
    ):
        if chunk["event"] == "on_chat_model_stream":
            token = chunk["data"]["chunk"].content
            if token:
                yield token  # Send to WebSocket / SSE