The Complete Guide to LangGraph State Graphs and State Transitions: From Fundamentals to Advanced Techniques
State Graph Overview
Defining State
In LangGraph, state functions as a shared container for data exchanged between nodes. Each node receives the current state, returns a partial update (state update), and at the super-step boundary these updates are merged back into the state for downstream nodes to consume.
The state schema must be defined before constructing the graph. The most fundamental and commonly used approach is TypedDict:
from typing_extensions import TypedDict

class State(TypedDict):
    state_value1: int
    state_value2: str
    # additional fields
LangGraph also supports defining state with Pydantic's BaseModel, but TypedDict is more prevalent in real projects because it natively supports Annotated[type, reducer] field binding (details covered later).
In practice, you rarely define state from scratch; instead, you inherit the built-in MessagesState:
from langgraph.graph import MessagesState

class CleanerState(MessagesState):
    # define additional fields here
    column_decisions: dict[str, Decision]  # Decision is a project-specific type, assumed to be defined elsewhere
    schema_info: dict
MessagesState is essentially a TypedDict, but it includes a built-in messages field with its corresponding reducer (add_messages). This reducer automatically accumulates Message objects from humans, agents, and tool calls. Inheriting it gives you message history persistence out of the box.
Graph Construction and Invocation
LangGraph models agent workflows as a state graph, with StateGraph as the starting point in the SDK:
from langgraph.graph import StateGraph, START, END
# 1. Instantiate builder with state schema
builder = StateGraph(State)
# 2. Add nodes (each node is a state -> partial state function)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
# 3. Add edges (determine execution flow between nodes)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)
# 4. Compile
graph = builder.compile()
# Use IPython to visualize topology
# display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
compile() returns a callable object. To execute the graph, pass in the initial state:
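from langchain_core.messages import HumanMessage

# Assumes the graph's state schema has a messages field (e.g., it inherits MessagesState)
messages = [HumanMessage(content="Compute: add 3 and 24, multiply the result by 6, then divide by 3")]
graph.invoke({"messages": messages})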
Note that graph.invoke() receives the entire initial state, not just a single question. It triggers the full graph execution, not a single LLM call—this is a fundamental difference between LangGraph and a raw LLM API in terms of invocation paradigm. Even if the graph contains only one LLM node, you must follow this state-driven approach. The more complex the graph, the more control information the initial state carries (e.g., configuration flags, contextual data, external inputs).
BSP Execution Model and Super‑Step
LangGraph uses the BSP (Bulk Synchronous Parallel) model to organize node execution. This model underpins all subsequent discussions on reducers, commutativity, and concurrent safety.
Key points:
1. Nodes Return State Updates
Each node function receives the current state and returns a partial update:
def node_1(state):
    # state["state_value1"] is the current value
    return {"state_value1": 2}  # partial update; only changed keys
LangGraph does not immediately apply the update. Instead, it caches all updates until the super‑step boundary, where it merges them via reducers. This is why reducers exist—state updates always go through reducers, never via direct assignment.
2. Execution Is Divided into Discrete Super‑Steps
Graph execution is not a sequential node‑by‑node progression. It is partitioned into discrete synchronous phases called super‑steps. Within each super‑step:
- All nodes triggered in that step run logically in parallel.
- The updates they return are temporarily stored, not merged immediately.
- After the super‑step ends, all updates are merged into the state via reducers, creating the next checkpoint.
- Then the next super‑step begins.
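The phases above can be mimicked in plain Python. This is a toy model only, not LangGraph's actual scheduler: `run_super_step`, the `reducers` mapping, and the `seen` and `count` keys are hypothetical names chosen for illustration. It shows that every node in a super-step reads the same snapshot and that updates are merged only after all nodes have returned.

```python
import operator

def run_super_step(state, nodes, reducers):
    """Toy model of one super-step: run all nodes on one snapshot, then merge."""
    snapshot = dict(state)
    # Phase 1: every node sees the same snapshot; updates are only buffered.
    pending = [node(snapshot) for node in nodes]
    # Phase 2: at the boundary, fold the buffered updates into a new state.
    new_state = dict(state)
    for update in pending:
        for key, value in update.items():
            if key in reducers and key in new_state:
                new_state[key] = reducers[key](new_state[key], value)
            else:
                new_state[key] = value  # default: overwrite
    return new_state

def node_2(state):
    return {"seen": [("node_2", state["count"])], "count": 1}

def node_3(state):
    return {"seen": [("node_3", state["count"])], "count": 10}

reducers = {"seen": operator.add, "count": operator.add}
state = {"count": 0, "seen": []}
result = run_super_step(state, [node_2, node_3], reducers)
print(result)
```

Both nodes record `count` as 0, because neither can observe the other's update inside the same super-step.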
3. Execution Order Within a Super‑Step Is Undefined
Nodes triggered concurrently in the same super‑step can finish in any order—this is the starting point for all reducer design discussions.
Consider this structure:
flowchart LR
A["node 1"]
B["node 2"]
C["node 3"]
D[END]
A --> B
A --> C
B --> D
C --> D
After node 1 completes, node 2 and node 3 are triggered in the same super‑step. LangGraph cannot guarantee which runs first. It could be node 2 first one time and node 3 first the next.
If both node 2 and node 3 write updates to the same state key, LangGraph must combine the two updates into a single value at the super‑step boundary. The logic that determines “how to combine” is the Reducer.
Reducer
A reducer is a mapping from multiple state values to a single output; it essentially embodies the state‑update logic. The operation can be understood as:
for k in update.keys():
    new_state[k] = reducer(current_state[k], update[k])
where k is a key in the state (assuming the state is a TypedDict).
If no reducer is specified, the default behavior is to replace the old value with the new one:
for k in update.keys():
    new_state[k] = update[k]
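To make the two cases concrete, here is a small sketch in plain Python. It is illustrative only and does not reproduce LangGraph's internals: `collect_reducers` and `apply_update` are hypothetical helpers that read the reducer from an `Annotated[type, reducer]` hint and fall back to overwriting when no reducer is bound.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints

class State(TypedDict):
    status: str                              # no reducer: overwrite
    history: Annotated[list, operator.add]   # reducer: list concatenation

def collect_reducers(schema):
    """Map each key to the callable found in its Annotated metadata, if any."""
    reducers = {}
    for key, hint in get_type_hints(schema, include_extras=True).items():
        metadata = getattr(hint, "__metadata__", ())
        if metadata and callable(metadata[-1]):
            reducers[key] = metadata[-1]
    return reducers

def apply_update(state, update, reducers):
    new_state = dict(state)
    for key, value in update.items():
        if key in reducers and key in state:
            new_state[key] = reducers[key](state[key], value)
        else:
            new_state[key] = value
    return new_state

reducers = collect_reducers(State)
state = {"status": "pending", "history": ["start"]}
result = apply_update(state, {"status": "done", "history": ["finished"]}, reducers)
print(result)
```

`status` is replaced, while `history` keeps the old entry and appends the new one.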
Accumulative Reducers
When a new value arrives but the old value still holds useful context, an accumulative approach is needed. Typical scenarios: the LLM or downstream nodes need to see “history”, not just the latest state snapshot.
Common techniques:
- Monotonic accumulation: Annotated[list, operator.add]
- Semantic accumulation: add_messages, custom sliding windows (e.g., retain the last 10 records), key‑based deduplication (upsert)
The built-in MessagesState is itself declared this way, binding add_messages to the messages field through Annotated:
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
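The custom accumulative techniques mentioned above (sliding window, key-based upsert) could be sketched as follows. The function names, the `id` field, and `HistoryState` are hypothetical. Both reducers follow the `(old, new) -> merged` shape and return new lists instead of mutating their inputs.

```python
from typing import Annotated, TypedDict

def keep_last_10(old: list, new: list) -> list:
    """Sliding window: append the new records, retain only the last 10."""
    return (old + new)[-10:]

def upsert_by_id(old: list, new: list) -> list:
    """Key-based deduplication: a record with a known id replaces the old one."""
    merged = {item["id"]: item for item in old}
    for item in new:
        merged[item["id"]] = item
    return list(merged.values())

class HistoryState(TypedDict):
    events: Annotated[list, keep_last_10]
    records: Annotated[list, upsert_by_id]

window = keep_last_10(list(range(8)), [8, 9, 10, 11])
records = upsert_by_id(
    [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}],
    [{"id": 2, "v": "c"}, {"id": 3, "v": "d"}],
)
print(window)
print(records)
```

Both are order-sensitive by design (later writes win), which is acceptable for sequential history but is exactly what the next section rules out for parallel writers.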
Aggregative Reducers
When multiple nodes attempt to write to the same key, we need logic to integrate the results. This kind of reducer is used when nodes contribute different parts of the final output.
Moreover, such reducers must satisfy commutativity:
reducer(reducer(init, update_a), update_b) == reducer(reducer(init, update_b), update_a)
In other words, the merged result must not depend on the order of execution.
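One way to test this property is to fold the same updates in every possible order and compare the outcomes. The helper below is a sketch: `is_order_independent` is a hypothetical name, and the two sample reducers exist only to show a passing and a failing case.

```python
from functools import reduce
from itertools import permutations

def is_order_independent(reducer, init, updates):
    """Fold the updates in every arrival order; True if all results agree."""
    results = [reduce(reducer, order, init) for order in permutations(updates)]
    return all(r == results[0] for r in results)

def set_union(old, new):
    return old | new

def last_write_wins(old, new):
    return new

union_ok = is_order_independent(set_union, set(), [{1}, {2}, {3}])
overwrite_ok = is_order_independent(last_write_wins, 0, [1, 2, 3])
print(union_ok, overwrite_ok)
```

A check like this fits well in a unit test: the number of permutations grows factorially, but a parallel fan-out rarely has more than a handful of writers.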
Use Case: Parallel Schema Profiling
During EDA initialization, three sub‑agents profile the same data from different dimensions. Each agent writes to a different dimension of column_profile, without touching others’ output.
flowchart TD
A[START]
B["Assign profiling tasks"]
C["dtype_agent<br/>infer column data types"]
D["missing_agent<br/>compute missing rate"]
E["cardinality_agent<br/>compute unique value count"]
F["Merge (via reducer) & END"]
A --> B
B --> C
B --> D
B --> E
C --> F
D --> F
E --> F
The three agents belong to the same super‑step and fan‑in to the same state key:
class EDAState(MessagesState):
    column_profile: Annotated[dict, ???]  # pending reducer
The updates from the three agents are:
A = {"age": {"dtype": "int32"}, "city": {"dtype": "string"}}
B = {"age": {"missing_rate": 0.03}, "city": {"missing_rate": 0.00}}
C = {"age": {"n_unique": 87}, "city": {"n_unique": 142}}
Expected merged result:
{
"age": {"dtype": "int32", "missing_rate": 0.03, "n_unique": 87},
"city": {"dtype": "string", "missing_rate": 0.00, "n_unique": 142},
}
Incorrect Attempt: Shallow Merge
Intuitively, dictionary merging uses | or {**a, **b}:
def shallow_merge(old: dict, new: dict) -> dict:
    return {**old, **new}
Because LangGraph does not guarantee the merge order of A, B, and C, test two arrival orders by hand:
# Order 1: A → B → C
shallow_merge(shallow_merge(shallow_merge({}, A), B), C)
# {"age": {"n_unique": 87}, "city": {"n_unique": 142}}
# Order 2: C → A → B
shallow_merge(shallow_merge(shallow_merge({}, C), A), B)
# {"age": {"missing_rate": 0.03}, "city": {"missing_rate": 0.00}}
Two problems emerge:
- Data loss: {**a, **b} overwrites at the top level, so each column's entire sub-dictionary is replaced instead of merged.
- Order‑dependent results: which fields are lost changes from run to run. This is a classic Heisenbug: unit tests can pass while integration tests fail sporadically and production fails intermittently, which makes order‑dependent loss more dangerous and more expensive to debug than stable data loss.
Correct Implementation: Deep Merge
def deep_merge(old: dict, new: dict) -> dict:
    result = dict(old)
    for k, v in new.items():
        if isinstance(v, dict) and isinstance(result.get(k), dict):
            result[k] = deep_merge(result[k], v)
        else:
            result[k] = v
    return result

class EDAState(MessagesState):
    column_profile: Annotated[dict, deep_merge]
No matter the arrival order (A→B→C, C→A→B, B→C→A, etc.), the merged result is the expected complete profile; commutativity holds.
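The claim can be checked exhaustively rather than by sampling two orders. The sketch below folds A, B, and C in all six arrival orders with both reducers; `merged_results` is a hypothetical helper, and the two merge functions are repeated only to keep the block self-contained.

```python
from functools import reduce
from itertools import permutations

def shallow_merge(old: dict, new: dict) -> dict:
    return {**old, **new}

def deep_merge(old: dict, new: dict) -> dict:
    result = dict(old)
    for k, v in new.items():
        if isinstance(v, dict) and isinstance(result.get(k), dict):
            result[k] = deep_merge(result[k], v)
        else:
            result[k] = v
    return result

A = {"age": {"dtype": "int32"}, "city": {"dtype": "string"}}
B = {"age": {"missing_rate": 0.03}, "city": {"missing_rate": 0.00}}
C = {"age": {"n_unique": 87}, "city": {"n_unique": 142}}
expected = {
    "age": {"dtype": "int32", "missing_rate": 0.03, "n_unique": 87},
    "city": {"dtype": "string", "missing_rate": 0.00, "n_unique": 142},
}

def merged_results(reducer):
    """Merge A, B, C in all 6 arrival orders, starting from an empty profile."""
    return [reduce(reducer, order, {}) for order in permutations([A, B, C])]

deep_ok = all(r == expected for r in merged_results(deep_merge))
shallow_ok = all(r == expected for r in merged_results(shallow_merge))
print(deep_ok, shallow_ok)
```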
Key Insight: Merge Granularity Must Align with Business Responsibility Boundaries
The failure of shallow merge and success of deep merge are not fundamentally about recursion depth. The difference lies in whether the reducer’s merge granularity matches the actual responsibility boundaries of the business logic:
- All three agents overlap at the top‑level keys ("age", "city").
- But they do not overlap at the leaf level (dtype / missing_rate / n_unique).
Shallow merge stops recursion at the top level, incorrectly treating the overlap as a conflict and using last‑write‑wins, creating order dependence. Deep merge continues recursion to the leaf level, correctly recognizing that responsibility boundaries are non‑overlapping deeper down, thus merging conflict‑free and satisfying commutativity.
Therefore, the core design principle for aggregative reducers is to align merge granularity with the real responsibility boundaries of the business. If boundaries are correct, concurrency is safe; if boundaries are wrong (overwriting too early when non‑conflicting merge space still exists), commutativity is broken.
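Deep merge is only order-independent while the leaf-level boundaries really are disjoint. One optional way to enforce that assumption, sketched here as a design idea and not as something LangGraph provides, is a stricter variant that raises when two updates disagree on the same leaf. `strict_deep_merge` and its `path` parameter are hypothetical.

```python
def strict_deep_merge(old: dict, new: dict, path: tuple = ()) -> dict:
    """Deep merge that fails loudly when two writers disagree on one leaf."""
    result = dict(old)
    for key, value in new.items():
        current = result.get(key)
        if isinstance(value, dict) and isinstance(current, dict):
            result[key] = strict_deep_merge(current, value, path + (key,))
        elif key in result and current != value:
            where = ".".join(str(p) for p in path + (key,))
            raise ValueError(f"conflicting writes at {where}")
        else:
            result[key] = value
    return result

x = {"age": {"dtype": "int32"}}
y = {"age": {"missing_rate": 0.03}}
z = {"age": {"dtype": "float64"}}  # violates the boundary: second dtype writer

merged = strict_deep_merge(x, y)
try:
    strict_deep_merge(x, z)
    conflict_detected = False
except ValueError:
    conflict_detected = True
print(merged, conflict_detected)
```

The failure is raised in either arrival order, so a boundary violation surfaces as a stable error instead of an intermittent data loss.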
Arbitrative Reducers
When multiple nodes produce truly conflicting writes for the same value, business rules determine which one to keep. Typical scenarios:
- HITL where human decisions override agent decisions.
- Multi‑model aggregation where confidence selects the action.
- Time‑series where the latest value wins.
⚠️ Complexity of Arbitrative Reducers
An arbitrative reducer’s function body may contain only a few lines of if-else, but its actual complexity stems from the business logic itself. Here’s a comparison with aggregative reducers:
| Dimension | Aggregative | Arbitrative |
|---|---|---|
| Decision rule source | Data structure | Business semantics (must be manually coded) |
| Reusability across projects | High | Nearly zero |
| Structural requirements | Raw data suffices | Metadata required |
| Commutativity guarantee | Clear boundaries | Depends on precision of manual coding |
| Operated objects | No constraints, overlap allowed | Inevitable overlap and conflict |
Use Case: HITL Override for Data Cleaning Sub‑Agent
Consider a data cleaning sub‑agent:
flowchart TD
A[START]
B["Route by column"]
C["missing_value_agent"]
D["type_inference_agent"]
H["human_review (HITL)"]
E["Merge (via reducer) & Next step"]
A --> B
B --> C
B --> D
B -.HITL interrupt.-> H
C --> E
D --> E
H --> E
Business rules:
- Once a human makes a decision, agents must not override that decision.
- If multiple agents give different suggestions for the same column, a deterministic rule must determine the winner.
State value design:
class CleanerState(MessagesState):
    column_decisions: Annotated[dict[str, Decision], arbitrate_decisions]  # reducer defined below
column_decisions is a dictionary containing processing decisions per column. The arbitration rule also operates per column:
def arbitrate_decisions(old: dict, new: dict) -> dict:
    result = dict(old)
    for col, decision in new.items():
        result[col] = arbitrate_single(result.get(col), decision)
    return result
Below are two possible implementations of arbitrate_single.
Incorrect Attempt: Naive Human‑First
The simplest idea is to return whichever decision comes from a human:
def arbitrate_single(old, new):
    if new.get("source") == "human":
        return new
    if old and old.get("source") == "human":
        return old
    return new
Consider test data:
A = {"source": "agent", "op": "fill_mean"}
B = {"source": "agent", "op": "fill_median"}
C = {"source": "human", "op": "drop"}
Check commutativity:
- When C is involved, the logic is correct: the result is always the human decision C.
- When C is not involved, arbitrate_single(A, B) returns B while arbitrate_single(B, A) returns A. The results differ, so commutativity is broken.
So behavior is non‑deterministic when only agents are present.
Correct Approach: Introduce Confidence to Explicitly Model Business Logic
Update the field structure:
A = {"source": "agent", "op": "fill_mean", "confidence": 0.7}
B = {"source": "agent", "op": "fill_median", "confidence": 0.9}
C = {"source": "human", "op": "drop"} # human does not need confidence
Update the reducer implementation:
def arbitrate_single(old, new):
    # Rule 1: Human takes precedence
    if new.get("source") == "human":
        return new
    if old and old.get("source") == "human":
        return old
    # Rule 2: Without human intervention, higher confidence wins
    if old is None:
        return new
    if new["confidence"] != old["confidence"]:
        return new if new["confidence"] > old["confidence"] else old
    # Rule 3: When confidence ties, enforce a deterministic preference
    return new if new["op"] < old["op"] else old
Now the result is identical regardless of arrival order, provided each column receives at most one human decision: commutativity holds, and system behavior becomes predictable.
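The order-independence of the three rules can be verified by folding the per-column updates in every arrival order. The sketch below repeats the two reducers so it runs on its own; the `age` column, the tie case `T`, and the `outcomes` helper are made up for illustration.

```python
from functools import reduce
from itertools import permutations

def arbitrate_single(old, new):
    if new.get("source") == "human":           # Rule 1: human wins
        return new
    if old and old.get("source") == "human":
        return old
    if old is None:                            # first decision for this column
        return new
    if new["confidence"] != old["confidence"]:  # Rule 2: higher confidence
        return new if new["confidence"] > old["confidence"] else old
    return new if new["op"] < old["op"] else old  # Rule 3: tie-break on op

def arbitrate_decisions(old: dict, new: dict) -> dict:
    result = dict(old)
    for col, decision in new.items():
        result[col] = arbitrate_single(result.get(col), decision)
    return result

A = {"age": {"source": "agent", "op": "fill_mean", "confidence": 0.7}}
B = {"age": {"source": "agent", "op": "fill_median", "confidence": 0.9}}
C = {"age": {"source": "human", "op": "drop"}}
T = {"age": {"source": "agent", "op": "fill_mean", "confidence": 0.9}}  # ties with B

def outcomes(updates):
    """Winning decision for 'age' under every arrival order."""
    return [reduce(arbitrate_decisions, order, {})["age"]
            for order in permutations(updates)]

with_human = outcomes([A, B, C])
agents_only = outcomes([A, B])
tie = outcomes([B, T])
print(with_human[0]["op"], agents_only[0]["op"], tie[0]["op"])
```

Two different human decisions for the same column would still be resolved by arrival order under Rule 1, so that case needs its own rule if it can occur.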
