import { ... } from "@langchain/langgraph/web";Reducer function for combining two sets of messages in LangGraph's state system.
This reducer handles several tasks:
left and right message inputs to arrays.BaseMessage instances.RemoveMessage instance is encountered in right with the ID REMOVE_ALL_MESSAGES,
all previous messages are discarded and only the subsequent messages in right are returned.left and right messages together following these rules:
right shares an ID with a message in left:
RemoveMessage, that message (by ID) is marked for removal.left.right does not exist in left:
RemoveMessage, this is considered an error (cannot remove non-existent ID).Creates a GraphRunStream with built-in transformers and kicks off the background pump that feeds raw stream chunks through the transformer pipeline.
Built-in transformers are registered in this order:
_discoveries log.lifecycle channel events.run.values / run.output.run.messages / .messagesFrom.Subgraph discovery is registered first so that downstream transformers (notably lifecycle) observe child namespaces with their stream handles already in place. User-supplied transformer factories are registered afterwards.
Create the built-in lifecycle transformer.
Marked as a NativeStreamTransformer so the run stream
factory can expose _lifecycleLog via a dedicated getter
(run.lifecycle) rather than through run.extensions.
Creates a StreamTransformer that groups messages channel events into
per-message ChatModelStream instances.
A new ChatModelStream is created on message-start and closed on
message-finish. Content-block events in between are forwarded to the
active stream. Only events whose namespace exactly matches path
are processed; child namespaces are ignored.
Create the subgraph discovery transformer.
Registering this transformer against a mux replaces the legacy
inline behavior that previously lived in StreamMux.push.
The mux no longer knows about the subgraph factory: instead, this
transformer is the single component that materializes stream
handles and announces them on _discoveries.
Marked as a NativeStreamTransformer so the projection is
treated as internal wiring (not merged into run.extensions and
not auto-forwarded via StreamMux.wireChannels).
Creates a StreamTransformer that captures values channel events
into a local StreamChannel. Only events whose namespace exactly
matches path are recorded; events from child or sibling namespaces
are ignored.
The final snapshot is resolved by StreamMux.close directly; this transformer only accumulates intermediate values.
Define a LangGraph workflow using the entrypoint function.
The wrapped function must accept at most two parameters. The first parameter is the input to the function. The second (optional) parameter is a LangGraphRunnableConfig object. If you wish to pass multiple parameters to the function, you can pass them as an object.
To write data to the "custom" stream, use the getWriter function, or the LangGraphRunnableConfig.writer property.
The getPreviousState function can be used to access the previous state that was returned from the last invocation of the entrypoint on the same thread id.
If you wish to save state other than the return value, you can use the entrypoint.final function.
Filter a lifecycle StreamChannel to only the entries whose namespace lies within the subtree rooted at path.
Returns an AsyncIterable whose iterator yields every entry whose
namespace either equals path or is a descendant of it.
Iteration begins at startAt, so callers can capture the
log's current size at construction time to skip entries emitted
before the caller existed (e.g. a subgraph stream discovered
mid-run shouldn't replay the root's started).
Filter a SubgraphDiscovery channel to only the direct children of a given namespace.
Returns an AsyncIterable whose iterator yields stream handles for
discoveries whose namespace is exactly one segment deeper than
path and shares it as a prefix. Iteration begins at
startAt (so each caller picks up only discoveries added
after its construction) and terminates when the underlying log
closes or fails.
A helper utility function that returns the LangGraphRunnableConfig that was set when the graph was initialized.
Note: This only works when running in an environment that supports node:async_hooks and AsyncLocalStorage. If you're running this in a web environment, access the LangGraphRunnableConfig from the node function directly.
A helper utility function that returns the input for the currently executing task
Get the JSON schema from a SerializableSchema.
A helper utility function for use with the functional API that returns the previous state from the checkpoint from the last invocation of the current thread.
This function allows workflows to access state that was saved in previous runs using entrypoint.final.
Detect if a schema has a default value by validating undefined.
Uses the Standard Schema ~standard.validate API to detect defaults.
If the schema accepts undefined and returns a value, that value is the default.
This approach is library-agnostic and works with any Standard Schema compliant library (Zod, Valibot, ArkType, etc.) without needing to introspect internals.
A helper utility function that returns the BaseStore that was set when the graph was initialized
Used for subgraph detection.
A helper utility function that returns the LangGraphRunnableConfig#writer if "custom" stream mode is enabled, otherwise undefined.
Interrupts the execution of a graph node.
This function can be used to pause execution of a node, and return the value of the resume
input when the graph is re-invoked using Command.
Multiple interrupts can be called within a single node, and each will be handled sequentially.
When an interrupt is called:
resume value available (from a previous Command), it returns that value.GraphInterrupt with the provided valueCommand with a resume valueBecause the interrupt function propagates by throwing a special GraphInterrupt error,
you should avoid using try/catch blocks around the interrupt function,
or if you do, ensure that the GraphInterrupt error is thrown again within your catch block.
A type guard to check if the given value is a Command.
Useful for type narrowing when working with the Command object.
Checks if the given graph invoke / stream chunk contains interrupt.
Type guard that tests whether a transformer is a NativeStreamTransformer.
Type guard to check if a given value is a SerializableSchema, i.e.
both a Standard Schema and a Standard JSON Schema object.
Type guard to check if a given value is a Standard Schema V1 object.
Reducer function for combining two sets of messages in LangGraph's state system.
This reducer handles several tasks:
left and right message inputs to arrays.BaseMessage instances.RemoveMessage instance is encountered in right with the ID REMOVE_ALL_MESSAGES,
all previous messages are discarded and only the subsequent messages in right are returned.left and right messages together following these rules:
right shares an ID with a message in left:
RemoveMessage, that message (by ID) is marked for removal.left.right does not exist in left:
RemoveMessage, this is considered an error (cannot remove non-existent ID).Manually push a message to a message stream.
This is useful when you need to push a manually created message before the node has finished executing.
When a message is pushed, it will be automatically persisted to the state after the node has finished executing.
To disable persisting, set options.stateKey to null.
Define a LangGraph task using the task function.
Tasks can only be called from within an entrypoint or from within a StateGraph. A task can be called like a regular function with the following differences:
Abstract base class for persistent key-value stores.
Stores enable persistence and memory that can be shared across threads, scoped to user IDs, assistant IDs, or other arbitrary namespaces.
Features:
Abstract base class for persistent key-value stores.
Stores enable persistence and memory that can be shared across threads, scoped to user IDs, assistant IDs, or other arbitrary namespaces.
Features:
Stores the result of applying a binary operator to the current value and each new value.
The main stream object returned by chat model streaming.
Implements AsyncIterable<ChatModelStreamEvent> for raw event access
and PromiseLike<AIMessage> for simple await usage.
One or more commands to update the graph's state and send messages to nodes. Can be used to combine routing logic with state updates in lieu of conditional edges
Final result from building and compiling a StateGraph.
Should not be instantiated directly, only using the StateGraph .compile()
instance method.
A projection channel for StreamTransformers.
Implements AsyncIterable<T> so it can be iterated directly by
in-process consumers via run.extensions.<key>. Channels created with
StreamChannel.remote or new StreamChannel(name) are also
auto-forwarded to remote clients.
Primary run stream for a LangGraph execution.
Implements AsyncIterable over ProtocolEvent and exposes ergonomic projections for values, messages, subgraphs, output, and interrupts. Created by createGraphRunStream.
In-memory key-value store with optional vector search.
A lightweight store implementation using JavaScript Maps. Supports basic key-value operations and vector search when configured with embeddings.
Raised by a node to interrupt execution.
Bypass a reducer and write the wrapped value directly to a BinaryOperatorAggregate channel.
Receiving multiple Overwrite values for the same channel in a single
super-step will raise an InvalidUpdateError.
The Pregel class is the core runtime engine of LangGraph, implementing a message-passing graph computation model inspired by Google's Pregel system. It provides the foundation for building reliable, controllable agent workflows that can evolve state over time.
Key features:
The Pregel class is not intended to be instantiated directly by consumers. Instead, use the following higher-level APIs:
PregelPregel instance is returned by the entrypoint functionRepresents a state field whose value is computed and updated using a reducer function.
ReducedValue allows you to define accumulators, counters, aggregators, or other fields whose value is determined incrementally by applying a reducer to incoming updates.
Each time a new input is provided, the reducer function is called with the current output and the new input, producing an updated value. Input validation can be controlled separately from output validation by providing an explicit input schema.
Exception raised when an error occurs in the remote graph.
A message or packet to send to a specific node in the graph.
The Send class is used within a StateGraph's conditional edges to
dynamically invoke a node with a custom state at the next step.
Importantly, the sent state can differ from the core graph's state, allowing for flexible and dynamic workflow management.
One such example is a "map-reduce" workflow where your graph invokes the same node multiple times in parallel with different states, before aggregating the results back into the main graph's state.
A graph whose nodes communicate by reading and writing to a shared state.
Each node takes a defined State as input and returns a Partial<State>.
Each state key can optionally be annotated with a reducer function that will be used to aggregate the values of that key received from multiple nodes. The signature of a reducer function is (left: Value, right: UpdateValue) => Value.
See Annotation for more on defining state.
After adding nodes and edges to your graph, you must call .compile() on it before
you can use it.
Error thrown when invalid input is provided to a StateGraph.
This typically means that the input to the StateGraph constructor or builder did not match the required types. A valid input should be a StateDefinition, an Annotation.Root, or a Zod schema.
StateSchema provides a unified API for defining LangGraph state schemas.
A projection channel for StreamTransformers.
Implements AsyncIterable<T> so it can be iterated directly by
in-process consumers via run.extensions.<key>. Channels created with
StreamChannel.remote or new StreamChannel(name) are also
auto-forwarded to remote clients.
A run stream for a child subgraph within a parent graph execution.
Extends GraphRunStream with a parsed name and
index extracted from the last segment of the namespace path.
The segment is expected to follow the "name:index" convention;
when no numeric suffix is present, index defaults to 0.
Represents a state field whose value is transient and never checkpointed.
Use UntrackedValue for state fields that should be tracked for the lifetime of the process, but should not participate in durable checkpoints or recovery.
Should not be instantiated directly. See Annotation.
Stores the last value received, assumes that if multiple values are received, they are all equal.
Note: Unlike 'LastValue' if multiple nodes write to this channel in a single step, the values will be continuously overwritten.
The Pregel class is the core runtime engine of LangGraph, implementing a message-passing graph computation model inspired by Google's Pregel system. It provides the foundation for building reliable, controllable agent workflows that can evolve state over time.
Key features:
The Pregel class is not intended to be instantiated directly by consumers. Instead, use the following higher-level APIs:
PregelPregel instance is returned by the entrypoint functionType bag for ConditionalEdgeRouter that accepts schema types. Unlike GraphNodeTypes, conditional edges don't have separate input/output - they just read state and return routing decisions.
Converts a raw [ns, mode, payload, meta?] stream chunk emitted by
graph.stream() into one or more CDDL-aligned ProtocolEvents.
Most modes produce a single event. values chunks carrying
StreamChunkMeta.checkpoint additionally produce a companion
checkpoints event immediately after the values event, so clients
that subscribe only to checkpoints can build a branching timeline
without also paying for full-state values payloads, and clients that
subscribe to both can correlate the pair by (namespace, step) or by
adjacent seq numbers.
Returns an empty array for stream modes that have no protocol mapping.
Options accepted by createGraphRunStream.
A channel that switches between two states
Stores the value received in the step immediately preceding, clears after.
Read-only execution info/metadata for the execution of current thread/run/node.
Operation to retrieve an item by namespace and ID.
Type bag for GraphNode that accepts schema types. All fields are optional - unspecified fields use defaults.
This enables separate input/output schemas for nodes, which is useful when a node receives a subset of state fields and returns different fields.
Human-in-the-loop interrupt: stable id plus opaque payload for resume UIs.
Represents a stored item with metadata.
Single lifecycle entry surfaced by the in-process
GraphRunStream.lifecycle projection. Combines the CDDL
LifecycleData payload with the namespace it applies to so
consumers can filter or correlate without dipping into raw
ProtocolEvents.
Projection returned from the lifecycle transformer's init().
The local StreamChannel is closed automatically when the transformer
finalizes or fails. _lifecycleLog is intentionally underscore-prefixed to
signal that it is consumed by the run stream wiring
(see run-stream.ts) and not meant for direct user access -
consumers should read run.lifecycle instead.
The lifecycle iterable is the root-scoped projection (prefix
[], starting at offset 0) mirroring the pattern used by the
subgraph discovery transformer. Root stream wiring consumes it
via SET_LIFECYCLE_ITERABLE; child streams are wired with their
own path-scoped iterable produced by filterLifecycleEntries.
Configuration knobs for createLifecycleTransformer.
Operation to list and filter namespaces in the store.
A channel that waits until all named values are received before making the value available.
This ensures that if node N and node M both write to channel C, the value of C will not be updated until N and M have completed updating.
Marker interface for transformers provided by internal LangChain products (e.g. ReactAgent's ToolCallTransformer, DeepAgent's SubagentTransformer).
Native transformers differ from user-defined extension transformers in where their projection lands on the run stream:
Native — projections become direct getters on a
GraphRunStream subclass (e.g. run.toolCalls, run.subagents).
They emit events on protocol-defined channels (tools, lifecycle,
tasks, etc.).
Extension (user-defined) — projections are merged into
run.extensions. Events emitted via emit() use an
application-chosen method name (e.g. emit("a2a", data)) and are
accessible to remote clients via session.subscribe("custom:<name>").
The __native brand is used by downstream stream factory functions
to distinguish native transformers from extension transformers at
registration time. See docs/native-stream-transformers.md for the
full pattern.
An object representing a direct overwrite of a value for a channel. Used to signal that the channel value should be replaced with the given value, bypassing any reducer or binary operator logic.
Configuration options for executing a Pregel graph. These options control how the graph executes, what data is streamed, and how interrupts are handled.
Single envelope for a streaming protocol emission: sequence, channel
(method), and payload (params).
Operation to store, update, or delete an item.
Operation to search for items within a namespace prefix.
Metadata injected by LangGraph Server. Undefined when running open-source LangGraph without LangSmith deployments.
Narrow capability handle passed to StreamTransformer.onRegister. Exposes only the minimal mux surface required for synthetic event emission — intentionally does not expose close/fail/register/etc. to keep the transformer contract small and tamper-resistant.
Observes ProtocolEvents during a graph run and builds typed derived projections (secondary event logs, promises, etc.).
Data is surfaced to consumers through projections returned from
init(). Projections are merged into GraphRunStream.extensions for
in-process consumers. Use StreamChannel.local for local streaming
values, StreamChannel.remote for values that should also be visible
to remote clients, or Promise<T> for final values.
To make projection data available to remote clients (SDK consumers
over WebSocket / SSE), create a named channel with
StreamChannel.remote(name). The StreamMux detects named
StreamChannel instances in the init() return and auto-forwards every
push() as a ProtocolEvent on the channel's named method. Remote
clients subscribe via session.subscribe("custom:<name>").
finalize and fail are optional. When a transformer uses
StreamChannel, the mux auto-closes/fails the channels on run
completion — no manual lifecycle management needed. Implement
finalize/fail only for non-channel teardown (e.g. resolving a
Promise).
Projection returned by createSubgraphDiscoveryTransformer.
Configuration for createSubgraphDiscoveryTransformer.
Options for the task function
Stable handle for one tool call: name, arguments, and async results.
Emitted when content-block-finish delivers a finalized tool_call block.
A configurable PubSub Topic.
Initialization options for UntrackedValue.
Configuration for caching nodes.
Additional details about the checkpoint, including the source, step, writes, and parents.
Convenience type for referencing a compiled graph by named type slots.
Type for conditional edge routing functions.
Use this to type functions passed to addConditionalEdges for
full type safety on state, runtime context, and return values.
Supports two patterns:
Single schema pattern - Single schema:
ConditionalEdgeRouter<Schema, Context, Nodes>
Type bag pattern - Separate schemas for state, context:
ConditionalEdgeRouter<{ Schema; ContextSchema; Nodes }>
Options for the entrypoint function
Extract the State type from any supported schema type.
Supports:
Extract the Update type from any supported schema type.
The Update type represents what a node can return to update the state. All fields are optional since nodes only need to return the fields they modify.
Supports:
Options for getting the state of the graph.
Strongly-typed utility for authoring graph nodes outside of the StateGraph builder, supporting inference for both state (from Schema) and config context (from Context type).
This type enables you to define graph node functions with full type safety—both for the evolving state and for additional context that may be passed in at runtime. Typing the context parameter allows for better code organization and precise editor support.
Works with StateSchema, AnnotationRoot, and Zod object schemas for state, and with a user-defined object shape for context.
Supports two patterns:
Single schema usage - Single schema for both input and output:
GraphNode<Schema, Context, Nodes>
Type bag pattern - Separate schemas for input, output, context:
GraphNode<{ InputSchema; OutputSchema; ContextSchema; Nodes }>
Return value type for GraphNode functions. Nodes can return an update object, a Command, or a Promise of either.
Infers the merged extensions type from a tuple of transformer factory functions.
Given [() => StreamTransformer<{ a: number }>, () => StreamTransformer<{ b: string }>],
produces { a: number } & { b: string }.
Infer the Update type from a StateSchemaFields. This is the type for partial updates to state.
Infer the State type from a StateSchemaFields. This is the type of the full state object.
Type that represents an acceptable input for the messages state reducer.
BaseMessage or BaseMessageLike.BaseMessage or BaseMessageLike.Options for subscribing to multiple channels.
Hierarchical path identifying a position in the agent tree.
Each element is one segment; longer arrays mean deeper nesting (e.g. subgraph or multi-agent scopes).
Initialization options for ReducedValue.
Two forms are supported:
jsonSchemaExtra)—in this case, the reducer's inputs are validated using the output value schema.inputSchema field to distinguish the reducer's input type from the stored/output type.Options for subscribing to a single channel.
Options for StateGraph.addNode() method.
Initialization options for StateGraph. Accepts any combination of schema types for state/input/output.
Supports both state and stateSchema as aliases for backward compatibility.
If only input is provided (no state/stateSchema), input is used as the state schema.
Valid field types for StateSchema. Either a LangGraph state value type or a raw schema (e.g., Zod schema).
Init object for StateSchema constructor.
Uses any to allow variance in generic types (e.g., ReducedValue<string, string[]>).
Converts StateSchema fields into a strongly-typed State Definition object, where each field is mapped to its channel type.
This utility type is used internally to create the shape of the state channels for a given schema,
substituting each field with the result of StateSchemaFieldToChannel.
If you define a state schema as:
const fields = {
a: ReducedValue<number, string>(),
b: UntrackedValue<boolean>(),
c: SomeSerializableSchemaType, // SerializableSchema<in, out>
}
then StateSchemaFieldsToStateDefinition<typeof fields> yields:
{
a: BaseChannel<number, string>;
b: BaseChannel<boolean, boolean>;
c: BaseChannel<typeof schema's output type, typeof schema's input type>;
}Maps a single StateSchema field definition to its corresponding Channel type.
This utility type inspects the type of the field and returns an appropriate
BaseChannel type, parameterized with the state "value" and "input" types according to the field's shape.
Rules:
F) is a ReducedValue<V, I>, the channel will store values of type V
and accept input of type I.UntrackedValue<V>, the channel will store and accept values of type V.SerializableSchema<I, O>, the channel will store values of type O
(the schema's output/validated value) and accept input of type I.BaseChannel<unknown, unknown> is used as fallback.Selects the type of output you'll receive when streaming from the graph. See Streaming for more details.
High-level outcome of a single tool call for UI or aggregators.
Special reserved node name denoting the end of a graph.
Special channel reserved for graph interrupts
Prebuilt state annotation that combines returned messages. Can handle standard messages and special modifiers like RemoveMessage instances.
Specifically, importing and using the prebuilt MessagesAnnotation like this:
import { MessagesAnnotation, StateGraph } from "@langchain/langgraph";
const graph = new StateGraph(MessagesAnnotation)
.addNode(...)
...import { BaseMessage } from "@langchain/core/messages";
import { Annotation, StateGraph, messagesStateReducer } from "@langchain/langgraph";
export const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: messagesStateReducer,
default: () => [],
}),
});
const graph = new StateGraph(StateAnnotation)
.addNode(...)
...Prebuilt schema meta for Zod state definition.
import { z } from "zod/v4-mini";
import { MessagesZodState, StateGraph } from "@langchain/langgraph";
const AgentState = z.object({
messages: z.custom<BaseMessage[]>().register(registry, MessagesZodMeta),
});Prebuilt state object that uses Zod to combine returned messages.
This utility is synonymous with the MessagesAnnotation annotation,
but uses Zod as the way to express messages state.
You can use import and use this prebuilt schema like this:
import { MessagesZodState, StateGraph } from "@langchain/langgraph";
const graph = new StateGraph(MessagesZodState)
.addNode(...)
...import { z } from "zod";
import type { BaseMessage, BaseMessageLike } from "@langchain/core/messages";
import { StateGraph, messagesStateReducer } from "@langchain/langgraph";
import "@langchain/langgraph/zod";
const AgentState = z.object({
messages: z
.custom<BaseMessage[]>()
.default(() => [])
.langgraph.reducer(
messagesStateReducer,
z.custom<BaseMessageLike | BaseMessageLike[]>()
),
});
const graph = new StateGraph(AgentState)
.addNode(...)
...Special value that signifies the intent to remove all previous messages in the state reducer.
Used as the unique identifier for a RemoveMessage instance which, when encountered,
causes all prior messages to be discarded, leaving only those following this marker.
Special reserved node name denoting the start of a graph.
The set of stream modes requested by
streamEvents(..., { version: "v3" }) — every mode the protocol maps
to a channel.
The verbose "debug" mode is intentionally excluded: it was a thin
re-wrap of checkpoints + tasks carrying no new information.
The "checkpoints" mode is likewise excluded from the stream-mode
request because the protocol's checkpoints channel carries only a
lightweight envelope (id, parent_id, step, source) derived from
StreamChunkMeta.checkpoint on the adjacent values chunk — not
the full-state shape that Pregel's checkpoints stream mode produces.
convertToProtocolEvent emits a companion checkpoints protocol event
next to each values event when meta is present, so clients can build
branching/time-travel UIs without a full-state checkpoints subscription.
Helper that instantiates channels within a StateGraph state.
Can be used as a field in an Annotation.Root wrapper in one of two ways:
import { StateGraph, Annotation } from "@langchain/langgraph";
// Define a state with a single string key named "currentOutput"
const SimpleAnnotation = Annotation.Root({
currentOutput: Annotation<string>,
});
const graphBuilder = new StateGraph(SimpleAnnotation);
// A node in the graph that returns an object with a "currentOutput" key
// replaces the value in the state. You can get the state type as shown below:
const myNode = (state: typeof SimpleAnnotation.State) => {
return {
currentOutput: "some_new_value",
};
}
const graph = graphBuilder
.addNode("myNode", myNode)
...
.compile();import { type BaseMessage, AIMessage } from "@langchain/core/messages";
import { StateGraph, Annotation } from "@langchain/langgraph";
// Define a state with a single key named "messages" that will
// combine a returned BaseMessage or arrays of BaseMessages
const AnnotationWithReducer = Annotation.Root({
messages: Annotation<BaseMessage[]>({
// Different types are allowed for updates
reducer: (left: BaseMessage[], right: BaseMessage | BaseMessage[]) => {
if (Array.isArray(right)) {
return left.concat(right);
}
return left.concat([right]);
},
default: () => [],
}),
});
const graphBuilder = new StateGraph(AnnotationWithReducer);
// A node in the graph that returns an object with a "messages" key
// will update the state by combining the existing value with the returned one.
const myNode = (state: typeof AnnotationWithReducer.State) => {
return {
messages: [new AIMessage("Some new response")],
};
};
const graph = graphBuilder
.addNode("myNode", myNode)
...
.compile();