Unset sentinel value.
The last (maybe virtual) node in graph-style Pregel.
Special value to indicate that graph should interrupt on all nodes.
Durability mode for the graph execution.
'sync': Changes are persisted synchronously before the next step starts.
'async': Changes are persisted asynchronously while the next step executes.
'exit': Changes are persisted only when the graph exits.
How the stream method should emit outputs.
"values": Emit all values in the state after each step, including interrupts. When used with the functional API, values are emitted once at the end of the workflow.
"updates": Emit only the node or task names and the updates returned by the nodes or tasks after each step. If multiple updates are made in the same step (e.g. multiple nodes are run), those updates are emitted separately.
"custom": Emit custom data from inside nodes or tasks using StreamWriter.
"messages": Emit LLM messages token-by-token together with metadata for any LLM invocations inside nodes or tasks.
"checkpoints": Emit an event when a checkpoint is created, in the same format as returned by get_state().
"tasks": Emit events when tasks start and finish, including their results and errors.
"debug": Emit "checkpoints" and "tasks" events for debugging purposes.
Type variable used to represent graph run scoped context.
Defaults to None.
Type variable used to represent the input to a StateGraph.
Defaults to StateT.
Type variable used to represent the output of a StateGraph.
Defaults to StateT.
Type variable used to represent the state in a graph.
Return a config with all keys, merging any provided configs.
Merge multiple configs into one.
Patch a config with new values.
Remove task IDs from checkpoint namespace.
Create a pydantic model with the given field definitions.
Coerce a runnable-like object into a Runnable.
Normalize a timeout value to positive-second policy fields.
Build an async graph lifecycle callback manager from a runnable config.
This helper filters config["callbacks"] down to handlers that inherit
from GraphCallbackHandler and binds the provided run_id onto the
returned manager.
Build a sync graph lifecycle callback manager from a runnable config.
This helper filters config["callbacks"] down to handlers that inherit
from GraphCallbackHandler and binds the provided run_id onto the
returned manager.
Apply writes from a set of tasks (usually the tasks from a Pregel step) to the checkpoint and channels, and return managed values writes to be applied externally.
Function injected under CONFIG_KEY_READ in task config, to read current state. Used by conditional edges to read a copy of the state reflecting the writes from that node only.
Prepare the set of tasks that will make up the next Pregel step.
Return the module and name of an object.
Async version of channels_from_checkpoint. See docstring there.
Hydrate channels from a checkpoint.
For most channels, spec.from_checkpoint(checkpoint["channel_values"][k])
is sufficient. DeltaChannel is the exception: when the channel is
absent from channel_values, an ancestor walk via
saver.get_delta_channel_history is required to find the nearest seed
(_DeltaSnapshot blob or pre-migration plain value) and accumulate
the writes between it and the target. All delta channels needing
replay are batched into a single saver call.
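The ancestor walk described above can be pictured with a toy model. This is only a sketch under stated assumptions: ToyCheckpoint, its seed and writes fields, and the list-append accumulation are all hypothetical stand-ins, and no saver is called.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class ToyCheckpoint:
    # Hypothetical stand-in: seed is a stored value (snapshot or plain value),
    # writes are the deltas recorded on the way to this checkpoint.
    seed: Optional[List[Any]] = None
    writes: List[Any] = field(default_factory=list)


def replay(history):
    """Rebuild a value from history ordered newest first (target first)."""
    pending = []
    value = []  # used only when no ancestor carries a seed
    for checkpoint in history:
        if checkpoint.seed is not None:
            # Nearest seed found; its own writes are already part of the seed.
            value = list(checkpoint.seed)
            break
        pending.append(checkpoint.writes)
    # Apply the collected writes oldest first.
    for writes in reversed(pending):
        value.extend(writes)
    return value


history = [
    ToyCheckpoint(writes=["d"]),                  # target
    ToyCheckpoint(writes=["c"]),
    ToyCheckpoint(seed=["a", "b"], writes=["b"]),  # nearest seed
    ToyCheckpoint(writes=["a"]),                  # never visited
]
rebuilt = replay(history)
print(rebuilt)
```

The walk stops at the first seed it meets, so older ancestors are never read.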
Build a new Checkpoint from the previous one and live channel state.
For each name in channels_to_snapshot, a _DeltaSnapshot(value) blob
is written into channel_values[k]. Other delta channels are omitted
from channel_values — the ancestor walk reconstructs their state
from checkpoint_writes. Callers compute the set via
delta_channels_to_snapshot(channels, counters); defaults to empty
(no snapshots) when not provided.
Get the graph for this Pregel instance.
Map input chunk to a sequence of pending writes in the form (channel, value).
Get subset of current_versions that are newer than previous_versions.
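A minimal sketch of that comparison, assuming versions are plain integers keyed by channel name. The helper name newer_versions is hypothetical, and the library's versions may be other comparable types.

```python
def newer_versions(previous_versions, current_versions):
    """Return the entries of current_versions that are newer than previous_versions."""
    return {
        name: version
        for name, version in current_versions.items()
        # A channel missing from previous_versions is treated as new.
        if name not in previous_versions or version > previous_versions[name]
    }


previous = {"messages": 2, "summary": 1}
current = {"messages": 3, "summary": 1, "draft": 1}
changed = newer_versions(previous, current)
print(changed)
```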
Get bolded text.
Get colored text.
Apply writes / subgraph states to tasks to be returned in a StateSnapshot.
Async unbounded FIFO queue with a wait() method.
Subclassed from asyncio.Queue, adding a wait() method.
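One way such a wait() method could be added is sketched below. WaitableQueue and its event-based approach are assumptions made for illustration, not the library's implementation.

```python
import asyncio


class WaitableQueue(asyncio.Queue):
    """Unbounded FIFO queue whose wait() blocks until an item is available."""

    def __init__(self):
        super().__init__()  # maxsize=0 means unbounded
        self._not_empty = asyncio.Event()

    def put_nowait(self, item):
        super().put_nowait(item)
        self._not_empty.set()

    def get_nowait(self):
        item = super().get_nowait()
        if self.empty():
            self._not_empty.clear()
        return item

    async def wait(self):
        # Returns once at least one item is queued; the item is not consumed.
        await self._not_empty.wait()


async def demo():
    queue = WaitableQueue()
    # Schedule a producer that enqueues after a short delay.
    asyncio.get_running_loop().call_later(0.01, queue.put_nowait, "event")
    await queue.wait()
    size_after_wait = queue.qsize()  # wait() left the item in place
    return size_after_wait, queue.get_nowait()


result = asyncio.run(demo())
print(result)
```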
Unbounded FIFO queue with a wait() method. Adapted from pure Python implementation of queue.SimpleQueue.
Sequence of Runnable, where the output of each is the input of the next.
RunnableSeq is a simpler version of RunnableSequence that is internal to
LangGraph.
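The chaining idea can be sketched in plain Python. The run_sequence helper is hypothetical and uses ordinary callables rather than Runnable objects.

```python
from functools import reduce


def run_sequence(steps, value):
    """Feed value through steps in order; each output becomes the next input."""
    return reduce(lambda acc, step: step(acc), steps, value)


steps = [str.strip, str.upper, lambda text: text + "!"]
output = run_sequence(steps, "  hello ")
print(output)
```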
TypedDict to use for extra keyword arguments, enabling type checking warnings for deprecated arguments.
Graph lifecycle event emitted when execution pauses for interrupts.
Graph lifecycle event emitted when execution resumes from a checkpoint.
Base class for all channels.
A configurable PubSub Topic.
Raised when a graph run exits early due to a drain request.
This indicates the graph stopped cooperatively at a superstep boundary
because RunControl.request_drain() was called (e.g., in response to
SIGTERM). The checkpoint is saved and the run can be resumed later.
Raised when the graph has exhausted the maximum number of steps.
This prevents infinite loops. To increase the maximum number of steps,
run your graph with a config specifying a higher recursion_limit.
Troubleshooting guides:
Examples:
graph = builder.compile()
graph.invoke(
    {"messages": [("user", "Hello, world!")]},
    # The config is the second positional argument
    {"recursion_limit": 1000},
)
Raised when attempting to update a channel with an invalid set of updates.
Troubleshooting guides:
Simplest implementation of WritesProtocol, for usage with writes that don't originate from a runnable task, e.g. graph input, update_state, etc.
A callback handler that implements stream_mode=messages.
Collects messages from: (1) chat model stream events; and (2) node outputs.
v2 variant of StreamMessagesHandler.
Declaring _V2StreamingCallbackHandler as a base flips
BaseChatModel.invoke to route through _stream_chat_model_events
(firing on_stream_event) instead of _stream (firing
on_llm_new_token). Inherits on_stream_event from the parent,
which forwards protocol events onto the messages stream channel.
Pregel attaches this class instead of the v1 handler only when
StreamingHandler opts in via the internal
CONFIG_KEY_STREAM_MESSAGES_V2 config key; direct
graph.stream(stream_mode="messages") callers keep the v1
AIMessageChunk shape.
A node in a Pregel graph. This won't be invoked as a runnable by the graph itself, but instead acts as a container for the components necessary to make a PregelExecutableTask for a node.
Configuration for retrying nodes.
Responsible for executing a set of Pregel tasks concurrently, committing their writes, yielding control to caller when there is output to emit, and interrupting other tasks if appropriate.
Callback handler that emits tool-call lifecycle events on the stream.
Fires on LangChain's on_tool_* callbacks and pushes to the tools
stream mode. Emits tool-started / tool-output-delta /
tool-finished / tool-error payloads keyed by tool_call_id.
While a tool is executing, this handler sets _tool_call_writer to a
closure bound to that call's namespace and tool_call_id.
ToolRuntime.emit_output_delta reads that ContextVar so tool bodies
can stream partial output without threading the writer through their
own signature.
Attached by Pregel.stream / astream when "tools" is in
stream_modes. run_inline = True keeps event ordering
deterministic.
Implements the logic for sending writes to CONFIG_KEY_SEND. Can be used as a runnable or as a static method to call imperatively.
Run-scoped control surface for cooperative draining.
Intended for a single graph run. Create a fresh RunControl per run;
reusing a control after request_drain() leaves it drained.
Safe to call from any thread: the drain request is represented by a single attribute write, so no lock is needed for this signal. If more mutable state is added here, add synchronization.
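A toy model of cooperative draining, assuming the flag is checked only between steps. ToyRunControl, ToyDrained, and run_steps are hypothetical names, and nothing is checkpointed here.

```python
class ToyRunControl:
    """Hypothetical stand-in for a run-scoped drain flag."""

    def __init__(self):
        self.drain_requested = False

    def request_drain(self):
        # A single attribute write; no lock is taken.
        self.drain_requested = True


class ToyDrained(Exception):
    """Raised when the loop stops early at a step boundary."""


def run_steps(steps, control):
    completed = 0
    for step in steps:
        # The flag is checked between steps, never in the middle of one.
        if control.drain_requested:
            raise ToyDrained(f"stopped after {completed} steps")
        step(control)
        completed += 1
    return completed


log = []
control = ToyRunControl()  # a fresh control for this run


def step_a(ctl):
    log.append("a")


def step_b(ctl):
    log.append("b")
    ctl.request_drain()  # e.g. what a SIGTERM handler might do


def step_c(ctl):
    log.append("c")


try:
    run_steps([step_a, step_b, step_c], control)
    drained = False
except ToyDrained:
    drained = True
print(log, drained)
```

Step b finishes even though it requested the drain itself, which is the cooperative part: work in flight is never interrupted.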
Metadata injected by LangGraph Server. None when running open-source LangGraph without LangSmith deployments.
Central event dispatcher for the streaming infrastructure.
Owns the main event log and routes events through a transformer
pipeline. StreamChannels with a name discovered in transformer
projections are auto-wired so that every push() also injects a
ProtocolEvent into the main log. StreamChannels without a name
are local-only.
Pass is_async=True when the mux will be consumed via async
iteration (handler.astream()). All StreamChannel instances
discovered during registration are automatically bound to the
matching mode.
Extension point for custom stream projections.
Transformers observe protocol events flowing through the StreamMux and build typed derived projections (StreamChannels, promises, etc.).
Set _native = True on a transformer to have its projection keys
exposed as direct attributes on the run stream (in addition to
appearing in run.extensions).
Subclasses must implement init and override at least one of
process / aprocess. The finalize / afinalize and fail /
afail hooks are optional — the default implementations are no-ops.
StreamChannel instances in the projection dict are auto-closed /
auto-failed by the mux, so most transformers don't need finalize
or fail at all.
Transformers that need async work pick the async lane by:
overriding aprocess (and optionally afinalize / afail), or
calling self.schedule(coro) from inside a sync process, or
setting requires_async = True explicitly.
The mux detects these cases at registration and raises if they're
used under sync stream() — they only work under astream().
Use aprocess when the pump must wait for async work before the
next transformer sees the event (e.g. PII redaction that mutates
event in place). Use schedule() for decoupled async work whose
result lands on an independent projection (e.g. async moderation
scoring, cost lookup, external tracing).
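The relationship between a mux and its transformers can be pictured with a small synchronous toy. ToyMux, ToyTransformer, and CountByMethod are hypothetical, and the method signatures are assumptions rather than the library's actual interfaces.

```python
class ToyTransformer:
    """Hypothetical base: init() returns the projection, process() sees each event."""

    def init(self):
        raise NotImplementedError

    def process(self, event):
        return None

    def finalize(self):
        return None  # optional hook, a no-op by default


class CountByMethod(ToyTransformer):
    """Derived projection: how many events arrived per method."""

    def init(self):
        self.counts = {}
        return {"counts": self.counts}

    def process(self, event):
        method = event["method"]
        self.counts[method] = self.counts.get(method, 0) + 1


class ToyMux:
    def __init__(self, transformers):
        self.log = []
        self.transformers = list(transformers)
        self.extensions = {}
        for transformer in self.transformers:
            # Registration: collect each transformer's projection dict.
            self.extensions.update(transformer.init())

    def push(self, event):
        # Every transformer observes the event before it lands in the main log.
        for transformer in self.transformers:
            transformer.process(event)
        self.log.append(event)

    def close(self):
        for transformer in self.transformers:
            transformer.finalize()


mux = ToyMux([CountByMethod()])
for method in ["values", "messages", "values"]:
    mux.push({"method": method})
mux.close()
print(mux.extensions["counts"])
```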
Sync run stream with caller-driven pumping.
The caller's iteration on any projection (values, messages,
raw events, or output) drives the graph forward. No background
thread is used — the caller's for loop is the pump.
Projections are single-consumer — iterating run.values twice
raises. Use projection.tee(n) if you genuinely need fan-out.
All transformer projections live in extensions. Native transformer
projections (those with _native = True) are also set as direct
attributes on this instance (e.g. run.values, run.messages).
Returned by Pregel.stream_events(version="v3"), which is
experimental and may change.
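The single-consumer rule and the tee escape hatch can be sketched as follows. SingleConsumer is a hypothetical class built on itertools.tee, not the library's projection type.

```python
import itertools


class SingleConsumer:
    """Iterable that may be claimed by one consumer only."""

    def __init__(self, source):
        self._source = iter(source)
        self._claimed = False

    def __iter__(self):
        if self._claimed:
            raise RuntimeError("projection already has a consumer")
        self._claimed = True
        return self._source

    def tee(self, n):
        # Claim the source once and hand out n independent iterators.
        return itertools.tee(iter(self), n)


values = SingleConsumer([1, 2, 3])
first_pass = list(values)
try:
    list(values)
    second_pass_raised = False
except RuntimeError:
    second_pass_raised = True

left, right = SingleConsumer("ab").tee(2)
fan_out = (list(left), list(right))
print(first_pass, second_pass_raised, fan_out)
```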
Surface subgraph lifecycle as lifecycle protocol events.
Pushes LifecyclePayload to a StreamChannel named lifecycle.
The channel is auto-forwarded by the mux so payloads land in the
main event log under method = "lifecycle" (native transformer —
no custom: prefix) — visible to remote SDK clients over the
wire and to in-process consumers via run.lifecycle.
Tracks subgraphs at every depth strictly below the transformer's scope, so a graph → subgraph → subgraph chain produces lifecycle events for both nested levels in a flat stream.
Native transformer — projection key lifecycle is exposed as
run.lifecycle.
Capture messages events as ChatModelStream objects.
The messages projection yields one ChatModelStream (or
AsyncChatModelStream) per LLM call. Consumers iterate
run.messages to get stream handles, then use each handle's typed
projections (.text, .reasoning, .tool_calls, .usage,
.output) for per-message content.
Two input shapes are handled (via params["data"] = (payload, metadata) from StreamMessagesHandler):
Protocol events (dicts with an "event" key): emitted by stream_events(version="v3") / astream_events(version="v3") via the on_stream_event callback. Routed to an existing ChatModelStream by metadata["run_id"]. A message-start event creates a new stream; message-finish closes it.
AIMessage: emitted from on_chain_end when a node returns a finalized message. Replayed as a synthetic protocol event lifecycle via message_to_events, then the already-complete stream is pushed to the log.
V1 AIMessageChunk tuples (from on_llm_new_token) are not
streamed into this projection: chat models that want to populate
run.messages with content-block streaming must use
stream_events(version="v3") / astream_events(version="v3"). Models called via the legacy
stream() method still surface their final AIMessage via
on_chain_end when a node returns it as state.
Only events at the run's own level are projected; tokens from
deeper subgraphs are left in the main event log but excluded from
.messages. "Own level" is defined by scope, which
stream_events(version="v3") / astream_events(version="v3") populate from the caller's checkpoint
namespace so that a stream_events(version="v3") call inside a node still sees its
own root chat model streams on .messages. Consumers that need
subgraph tokens should iterate the raw event stream or register a
custom transformer.
Native transformer — the messages projection is exposed as a
direct attribute on the run stream.
Discover subgraph invocations as in-process navigation handles.
Per discovered direct-child subgraph, builds a SubgraphRunStream
(or AsyncSubgraphRunStream) wrapping a child mini-mux scoped to
the subgraph's namespace. Consumers iterate run.subgraphs to
receive handles, then drill into handle.values / handle.messages
/ handle.subgraphs (recursive grandchildren) / handle.lifecycle.
Each mini-mux owns its own scope and uses its own
SubgraphTransformer to discover its direct children, so
grandchildren live on the child handle — never on the root's
subgraphs log. Forwarding events into the matching child mini-mux
is what keeps the child's projections populated.
Native transformer — subgraphs is exposed as run.subgraphs.
Capture values events as a drainable stream of state snapshots.
Provides the run.values projection. run.output,
run.interrupted and run.interrupts are tracked directly
by the run stream and do not depend on this transformer.
Native transformer — projection keys are exposed as direct
attributes on the run stream (e.g. run.values).
Only values events at the run's own level are captured; snapshots
from deeper subgraphs are left in the main event log but excluded
from the projection. "Own level" is defined by scope, which
stream_events(version="v3") / astream_events(version="v3") populate from the caller's
checkpoint namespace so that a nested stream_events(version="v3") call still
sees its own root snapshots.
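The "own level" rule can be illustrated with a small filter. This is a sketch only: the own_level helper and the namespace key on each event are hypothetical shapes chosen for the example.

```python
def own_level(events, scope):
    """Keep events whose namespace equals scope exactly; deeper ones are skipped."""
    return [event for event in events if tuple(event["namespace"]) == tuple(scope)]


events = [
    {"namespace": (), "method": "values", "data": {"n": 1}},
    {"namespace": ("child",), "method": "values", "data": {"n": 2}},
    {"namespace": ("child", "grandchild"), "method": "values", "data": {"n": 3}},
]

# A root-level run sees only its own snapshot.
root_view = own_level(events, scope=())
# A run started inside the child uses the child's namespace as its scope.
child_view = own_level(events, scope=("child",))
print(root_view, child_view)
```

The skipped events are not discarded in this model; they simply stay in the full events list, mirroring the main event log.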
Configuration for caching nodes.
One or more commands to update the graph's state and send messages to nodes.
Typed container returned by invoke() / ainvoke() with version="v2".
Information about an interrupt that occurred in a node.
interrupt_id was introduced as a property.
The following attributes have been removed:
ns
when
resumable
interrupt_id, deprecated in favor of id
A message or packet to send to a specific node in the graph.
The Send class is used within a StateGraph's conditional edges to
dynamically invoke a node with a custom state at the next step.
Importantly, the sent state can differ from the core graph's state, allowing for flexible and dynamic workflow management.
One such example is a "map-reduce" workflow where your graph invokes the same node multiple times in parallel with different states, before aggregating the results back into the main graph's state.
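The shape of such a map-reduce workflow can be shown as a plain-Python analogy. It does not use the Send class: fan_out, write_joke, and map_reduce are hypothetical names, and a thread pool stands in for the graph running the node in parallel.

```python
from concurrent.futures import ThreadPoolExecutor


def write_joke(state):
    """The node run once per item; it sees only its own small state."""
    return {"jokes": [f"joke about {state['subject']}"]}


def fan_out(state):
    # One custom state per subject, analogous to returning one Send per item.
    return [{"subject": subject} for subject in state["subjects"]]


def map_reduce(state):
    with ThreadPoolExecutor() as pool:
        # map: same node, different states; results keep the input order.
        partials = list(pool.map(write_joke, fan_out(state)))
    # reduce: fold the partial results back into the main state.
    jokes = [joke for partial in partials for joke in partial["jokes"]]
    return {**state, "jokes": jokes}


result = map_reduce({"subjects": ["cats", "dogs"]})
print(result["jokes"])
```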
Snapshot of the state of the graph at the beginning of a step.
Configuration for timing out node attempts.
Timeouts rely on asyncio cancellation. If your node makes blocking synchronous calls such as time.sleep(), or does CPU-bound work that never yields to the event loop, the timeout cannot fire until control returns to the event loop.
Under refresh_on="auto", an internal handler refreshes the timeout on any
callback event that occurs in the execution of the node or its nested descendants.
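The effect of blocking the event loop can be reproduced with the standard library. In this sketch asyncio.wait_for stands in for the timeout handling, which is an assumption about mechanism, and blocking_node is a hypothetical node body.

```python
import asyncio
import time


async def blocking_node():
    time.sleep(0.2)         # blocks the event loop; the timeout timer cannot run
    await asyncio.sleep(1)  # first point where cancellation can be delivered


async def demo():
    start = time.monotonic()
    try:
        await asyncio.wait_for(blocking_node(), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return timed_out, time.monotonic() - start


timed_out, elapsed = asyncio.run(demo())
print(timed_out, round(elapsed, 2))
```

The timeout is set to 0.05 seconds, yet the cancellation is only delivered once the blocking sleep has finished and the coroutine reaches an await.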
A specific LangGraphDeprecationWarning subclass defining functionality deprecated since LangGraph v1.0.0
Pregel manages the runtime behavior for LangGraph applications.
Pregel combines actors and channels into a single application. Actors read data from channels and write data to channels. Pregel organizes the execution of the application into multiple steps, following the Pregel Algorithm/Bulk Synchronous Parallel model.
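Each step consists of three phases:
Plan: Determine which actors to execute in this step. For example, in the first step, select the actors that subscribe to the special input channels; in subsequent steps, select the actors that subscribe to channels updated in the previous step.
Execution: Execute all selected actors in parallel, until all complete, or one fails, or a timeout is reached. During this phase, channel updates are invisible to actors until the next step.
Update: Update the channels with the values written by the actors in this step.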
Repeat until no actors are selected for execution, or a maximum number of steps is reached.
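A toy version of this step loop is sketched below, split into plan, execute, and update stages. The node dictionaries, run_supersteps, and ToyStepLimit are hypothetical, each node subscribes to a single channel, and selected nodes run one after another rather than in parallel.

```python
class ToyStepLimit(Exception):
    """Raised when the loop hits max_steps without settling."""


def run_supersteps(nodes, inputs, max_steps=25):
    channels = dict(inputs)
    updated = set(inputs)
    for _ in range(max_steps):
        # Plan: select nodes subscribed to a channel updated in the previous step.
        selected = [node for node in nodes if node["subscribe"] in updated]
        if not selected:
            return channels
        # Execute: every selected node reads the same pre-step channel values.
        writes = [
            (node["write"], node["fn"](channels[node["subscribe"]]))
            for node in selected
        ]
        # Update: writes become visible only after all selected nodes have run.
        updated = set()
        for name, value in writes:
            channels[name] = value
            updated.add(name)
    raise ToyStepLimit("maximum number of steps reached")


nodes = [
    {"subscribe": "a", "write": "b", "fn": lambda text: text + text},
    {"subscribe": "b", "write": "c", "fn": str.upper},
]
final = run_supersteps(nodes, {"a": "foo"})
print(final)
```

A node that writes to its own subscribed channel never settles in this model, which is the situation the step limit guards against.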
An actor is a PregelNode.
It subscribes to channels, reads data from them, and writes data to them.
It can be thought of as an actor in the Pregel algorithm.
PregelNodes implement LangChain's
Runnable interface.
Channels are used to communicate between actors (PregelNodes).
Each channel has a value type, an update type, and an update function – which
takes a sequence of updates and
modifies the stored value. Channels can be used to send data from one chain to
another, or to send data from a chain to itself in a future step. LangGraph
provides a number of built-in channels:
LastValue: The default channel. Stores the last value sent to the channel; useful for input and output values, or for sending data from one step to the next.
Topic: A configurable PubSub Topic, useful for sending multiple values between actors, or for accumulating output. Can be configured to deduplicate values, and/or to accumulate values over the course of multiple steps.
Context: Exposes the value of a context manager, managing its lifecycle. Useful for accessing external resources that require setup and/or teardown, e.g. client = Context(httpx.Client)
BinaryOperatorAggregate: Stores a persistent value, updated by applying a binary operator to the current value and each update sent to the channel. Useful for computing aggregates over multiple steps, e.g. total = BinaryOperatorAggregate(int, operator.add)
Most users will interact with Pregel via a StateGraph (Graph API) or via an entrypoint (Functional API).
However, for advanced use cases, Pregel can be used directly. If you're not sure whether you need to use Pregel directly, then the answer is probably no.
Here are some examples to give you a sense of how it works:
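One way to picture the channel side of this is a toy model of update functions in plain Python. ToyLastValue and ToyAggregate are hypothetical classes, not the library's channels, and rejecting more than one update per step in ToyLastValue is an assumption made to keep the stored value unambiguous.

```python
import operator


class ToyLastValue:
    """Stores the most recent value written to it."""

    def __init__(self):
        self.value = None

    def update(self, updates):
        # Assumption of this sketch: at most one write per step is accepted.
        if len(updates) > 1:
            raise ValueError("more than one update in a single step")
        if updates:
            self.value = updates[0]


class ToyAggregate:
    """Folds every update into a persistent value with a binary operator."""

    def __init__(self, initial, op):
        self.value = initial
        self.op = op

    def update(self, updates):
        for item in updates:
            self.value = self.op(self.value, item)


total = ToyAggregate(0, operator.add)
total.update([1, 2])  # step 1: two actors wrote to the channel
total.update([3])     # step 2: one actor wrote

answer = ToyLastValue()
answer.update(["draft"])
answer.update(["final"])
print(total.value, answer.value)
```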
Type of the checkpointer to use for a subgraph.
True enables persistent checkpointing for this subgraph.
False disables checkpointing, even if the parent graph has a checkpointer.
None inherits checkpointer from the parent graph.
A discriminated union of all v2 stream part types.
Use part["type"] to narrow the type:
async for part in graph.astream(input, version="v2"):
    if part["type"] == "values":
        part["data"]  # OutputT: full state (pydantic/dataclass/dict)
    elif part["type"] == "messages":
        part["data"]  # tuple[BaseMessage, dict]: (message, metadata)
    elif part["type"] == "custom":
        part["data"]  # Any: user-defined
Convenience class that bundles run-scoped context and other runtime utilities.
This class is injected into graph nodes and middleware. It provides access to
context, store, stream_writer, previous, and execution_info.
Runtime does not include config. To access RunnableConfig, you can inject
it directly by adding a config: RunnableConfig parameter to your node function
(recommended), or use get_config() from langgraph.config.
ToolRuntime (from langgraph.prebuilt) is a subclass that provides similar
functionality but is designed specifically for tools. It shares context, store,
and stream_writer with Runtime, and adds tool-specific attributes like config,
state, and tool_call_id.
Example:
from typing import TypedDict
from langgraph.graph import StateGraph
from dataclasses import dataclass
from langgraph.runtime import Runtime
from langgraph.store.memory import InMemoryStore
@dataclass
class Context:
    user_id: str
class State(TypedDict, total=False):
    response: str
store = InMemoryStore()
store.put(("users",), "user_123", {"name": "Alice"})
def personalized_greeting(state: State, runtime: Runtime[Context]) -> State:
    '''Generate personalized greeting using runtime context and store.'''
    user_id = runtime.context.user_id
    name = "unknown_user"
    if runtime.store:
        if memory := runtime.store.get(("users",), user_id):
            name = memory.value["name"]
    response = f"Hello {name}! Nice to see you again."
    return {"response": response}
graph = (
    StateGraph(state_schema=State, context_schema=Context)
    .add_node("personalized_greeting", personalized_greeting)
    .set_entry_point("personalized_greeting")
    .set_finish_point("personalized_greeting")
    .compile(store=store)
)
result = graph.invoke({}, context=Context(user_id="user_123"))
print(result)
# > {'response': 'Hello Alice! Nice to see you again.'}
Async run stream with caller-driven pumping.
Async iteration on any projection drives the graph forward — there
is no background task. Concurrent consumers share a single-flight
pump via an asyncio.Lock, so each awaiting cursor contributes one
event per acquisition. Backpressure comes from the logs: when a
subscribed log's buffer reaches maxlen, apush awaits the
subscriber to drain, which holds back the pump and paces the graph.
Projections are single-consumer — a second aiter(run.values)
raises. Use projection.tee(n) for fan-out.
Use as an async context manager to guarantee clean shutdown on early exit:
async with await handler.astream(input) as run:
    async for msg in run.messages:
        ...
Awaited from Pregel.astream_events(version="v3"), which is
experimental and may change.
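The backpressure behaviour described for this stream can be approximated with a bounded asyncio.Queue. This is a sketch under assumptions: the queue stands in for a subscribed log with a maxlen, and pump is a hypothetical producer rather than the library's pump.

```python
import asyncio


async def demo():
    log = asyncio.Queue(maxsize=2)  # bounded buffer standing in for a log
    produced = []

    async def pump():
        for item in range(5):
            await log.put(item)  # suspends while the buffer is full
            produced.append(item)

    task = asyncio.create_task(pump())
    await asyncio.sleep(0.05)  # give the producer time to run ahead
    ahead = list(produced)     # it got only as far as the buffer allows

    consumed = []
    while len(consumed) < 5:
        consumed.append(await log.get())  # each get frees one slot
    await task
    return ahead, consumed


ahead, consumed = asyncio.run(demo())
print(ahead, consumed)
```

With no consumer attached, the producer stalls after filling the buffer, which is what paces it to the reader's speed.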