Class · Since v0.6

Pregel

Pregel(
  self,
  *,
  nodes: dict[str, PregelNode | NodeBuilder],
  channels: dict[str, BaseChannel | ManagedValueSpec] | None,
  auto_validate: bool = True,
  stream_mode: StreamMode = 'values',
  stream_eager: bool = False,
  output_channels: str | Sequence[str],
  stream_channels: str | Sequence[str] | None = None,
  interrupt_after_nodes: All | Sequence[str] = (),
  interrupt_before_nodes: All | Sequence[str] = (),
  input_channels: str | Sequence[str],
  step_timeout: float | None = None,
  debug: bool | None = None,
  checkpointer: Checkpointer = None,
  store: BaseStore | None = None,
  cache: BaseCache | None = None,
  retry_policy: RetryPolicy | Sequence[RetryPolicy] = (),
  cache_policy: CachePolicy | None = None,
  context_schema: type[ContextT] | None = None,
  config: RunnableConfig | None = None,
  trigger_to_nodes: Mapping[str, Sequence[str]] | None = None,
  name: str = 'LangGraph',
  **deprecated_kwargs: Unpack[DeprecatedKwargs] = {}
)

Bases

PregelProtocol[StateT, ContextT, InputT, OutputT]
Generic[StateT, ContextT, InputT, OutputT]

Constructors

constructor
__init__

Name                      Type
nodes                     dict[str, PregelNode | NodeBuilder]
channels                  dict[str, BaseChannel | ManagedValueSpec] | None

Attributes

  • nodes: dict[str, PregelNode]
  • channels: dict[str, BaseChannel | ManagedValueSpec]
  • stream_mode: StreamMode
  • stream_eager: bool
  • output_channels: str | Sequence[str]
  • stream_channels: str | Sequence[str] | None
  • interrupt_after_nodes: All | Sequence[str]
  • interrupt_before_nodes: All | Sequence[str]
  • input_channels: str | Sequence[str]
  • step_timeout: float | None
  • debug: bool
  • checkpointer: Checkpointer
  • store: BaseStore | None
  • cache: BaseCache | None
  • retry_policy: Sequence[RetryPolicy]
  • cache_policy: CachePolicy | None
  • context_schema: type[ContextT] | None
  • config: RunnableConfig | None
  • name: str
  • trigger_to_nodes: Mapping[str, Sequence[str]]
  • InputType: Any
  • OutputType: Any
  • stream_channels_list: Sequence[str]
  • stream_channels_asis: str | Sequence[str]

Methods

  • get_graph
  • aget_graph
  • copy
  • with_config
  • validate
  • get_context_jsonschema
  • get_input_schema
  • get_input_jsonschema
  • get_output_schema
  • get_output_jsonschema
  • get_subgraphs
  • aget_subgraphs
  • get_state
  • aget_state
  • get_state_history
  • aget_state_history
  • bulk_update_state
  • abulk_update_state
  • update_state
  • aupdate_state
  • stream
  • astream
  • invoke
  • ainvoke
  • clear_cache
  • aclear_cache
  • config_schema (deprecated)
  • get_config_jsonschema (deprecated)

Inherited from Runnable (langchain_core)

Attributes

input_schema, output_schema, config_specs

Methods

get_name, get_prompts, pipe, pick, assign, batch, batch_as_completed, abatch, abatch_as_completed, astream_log, astream_events, transform, atransform, bind, with_listeners, with_alisteners, with_types, with_retry, map, with_fallbacks, as_tool

Pregel manages the runtime behavior for LangGraph applications.

Overview

Pregel combines actors and channels into a single application. Actors read data from channels and write data to channels. Pregel organizes the execution of the application into multiple steps, following the Pregel Algorithm/Bulk Synchronous Parallel model.

Each step consists of three phases:

  • Plan: Determine which actors to execute in this step. For example, in the first step, select the actors that subscribe to the special input channels; in subsequent steps, select the actors that subscribe to channels updated in the previous step.
  • Execution: Execute all selected actors in parallel, until all complete, or one fails, or a timeout is reached. During this phase, channel updates are invisible to actors until the next step.
  • Update: Update the channels with the values written by the actors in this step.

These three phases repeat until no actors are selected for execution or a maximum number of steps is reached.
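The plan, execution, and update phases can be pictured with a small standard-library model. This is only a sketch under stated assumptions, not how the library is implemented: `run_supersteps` and the `Actor` tuple are hypothetical names, actors run one after another rather than in parallel, there is no timeout, and the toy simply stops when `max_steps` is reached.

```python
from typing import Callable

# Hypothetical toy model: an actor is (subscribed channel, function, target channel).
Actor = tuple[str, Callable[[str], str], str]


def run_supersteps(
    actors: dict[str, Actor],
    inputs: dict[str, str],
    max_steps: int = 25,
) -> dict[str, str]:
    channels: dict[str, str] = dict(inputs)
    updated = set(inputs)  # channels written in the previous step
    for _ in range(max_steps):
        # Plan: select actors subscribed to a channel updated in the last step.
        selected = [actor for actor in actors.values() if actor[0] in updated]
        if not selected:
            break
        # Execution: every actor reads the same snapshot, and writes are
        # buffered so they stay invisible until the update phase.
        snapshot = dict(channels)
        writes = [(target, fn(snapshot[source])) for source, fn, target in selected]
        # Update: apply the buffered writes (last write wins in this toy).
        updated = set()
        for target, value in writes:
            channels[target] = value
            updated.add(target)
    return channels


actors = {
    "node1": ("a", lambda x: x + x, "b"),
    "node2": ("b", lambda x: x + x, "c"),
}
result = run_supersteps(actors, {"a": "foo"})
print(result)
```

In this toy run, node1 fires in the first step because the input channel "a" was just written, node2 fires in the second step because "b" was updated, and the loop ends in the third step because no actor subscribes to "c".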

Actors

An actor is a PregelNode. It subscribes to channels, reads data from them, and writes data to them. It can be thought of as an actor in the Pregel algorithm. PregelNodes implement LangChain's Runnable interface.

Channels

Channels are used to communicate between actors (PregelNodes). Each channel has a value type, an update type, and an update function, which takes a sequence of updates and modifies the stored value. Channels can be used to send data from one actor to another, or from an actor to itself in a future step. LangGraph provides a number of built-in channels:

Basic channels: LastValue and Topic

  • LastValue: The default channel. Stores the last value sent to the channel. Useful for input and output values, or for sending data from one step to the next.
  • Topic: A configurable PubSub topic. Useful for sending multiple values between actors, or for accumulating output. Can be configured to deduplicate values and/or to accumulate values over the course of multiple steps.

Advanced channels: Context and BinaryOperatorAggregate

  • Context: Exposes the value of a context manager and manages its lifecycle. Useful for accessing external resources that require setup and/or teardown, e.g. client = Context(httpx.Client)
  • BinaryOperatorAggregate: Stores a persistent value, updated by applying a binary operator to the current value and each update sent to the channel. Useful for computing aggregates over multiple steps, e.g. total = BinaryOperatorAggregate(int, operator.add)
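The update-function idea behind these channels can be sketched with three toy classes. The `ToyLastValue`, `ToyTopic`, and `ToyAggregate` names are hypothetical stand-ins written for illustration; the real channel classes have different constructors and stricter validation, so treat this as a model of the described semantics only.

```python
import operator
from typing import Any, Callable, Sequence


class ToyLastValue:
    """Keeps only the most recent update (this toy takes the final one)."""

    def __init__(self) -> None:
        self.value: Any = None

    def update(self, updates: Sequence[Any]) -> None:
        if updates:
            self.value = updates[-1]


class ToyTopic:
    """Collects every update; optionally keeps values across steps."""

    def __init__(self, accumulate: bool = False) -> None:
        self.accumulate = accumulate
        self.value: list[Any] = []

    def update(self, updates: Sequence[Any]) -> None:
        if not self.accumulate:
            self.value = []  # forget values from earlier steps
        self.value.extend(updates)


class ToyAggregate:
    """Folds each update into the stored value with a binary operator."""

    def __init__(self, initial: Any, op: Callable[[Any, Any], Any]) -> None:
        self.value = initial
        self.op = op

    def update(self, updates: Sequence[Any]) -> None:
        for item in updates:
            self.value = self.op(self.value, item)


last = ToyLastValue()
topic = ToyTopic(accumulate=True)
total = ToyAggregate(0, operator.add)

# Two steps: the first delivers the update 1, the second delivers 2 and 3.
for step_updates in ([1], [2, 3]):
    for channel in (last, topic, total):
        channel.update(step_updates)

print(last.value, topic.value, total.value)
```

Each class receives the same sequence of updates per step and differs only in how it folds them into the stored value, which is the distinction the channel descriptions above draw.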

Examples

Most users will interact with Pregel via a StateGraph (Graph API) or via an entrypoint (Functional API).

However, for advanced use cases, Pregel can be used directly. If you're not sure whether you need to use Pregel directly, then the answer is probably no: you should use the Graph API or Functional API instead. These are higher-level interfaces that compile down to Pregel under the hood.

Here are some examples to give you a sense of how it works:

Single node application:

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}

Using multiple nodes and multiple output channels:

from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

node2 = (
    NodeBuilder().subscribe_to("b")
    .do(lambda x: x["b"] + x["b"])
    .write_to("c")
)

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo', 'c': 'foofoofoofoo'}

Using a Topic channel:

from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": Topic(str, accumulate=True),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
{"c": ["foofoo", "foofoofoofoo"]}

Using a BinaryOperatorAggregate channel:

from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)

def reducer(current, update):
    if current:
        return current + " | " + update
    else:
        return update

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
{'c': 'foofoo | foofoofoofoo'}
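The result above can be reproduced by folding the two writes to "c" through the same reducer by hand. This is a sketch under assumptions: it assumes the channel starts from the empty string and that "foofoo" arrives in the first step and "foofoofoofoo" in the second; the `updates` list is written out manually here rather than produced by a graph.

```python
from functools import reduce


def reducer(current: str, update: str) -> str:
    # Same logic as the reducer in the example above.
    if current:
        return current + " | " + update
    return update


# Assumption: the stored value starts as "" and the updates arrive in this order.
updates = ["foofoo", "foofoofoofoo"]
final = reduce(reducer, updates, "")
print(final)
```

The first update replaces the empty starting value, and the second is joined to it with the " | " separator.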

Introducing a cycle:

This example introduces a cycle in the graph by having a node write to a channel it subscribes to.

Execution continues until the node returns None. Because the write entry sets skip_none=True, a None result is not written, so the channel is not updated and the node is not selected again.

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry

example_node = (
    NodeBuilder()
    .subscribe_only("value")
    .do(lambda x: x + x if len(x) < 10 else None)
    .write_to(ChannelWriteEntry(channel="value", skip_none=True))
)

app = Pregel(
    nodes={"example_node": example_node},
    channels={
        "value": EphemeralValue(str),
    },
    input_channels=["value"],
    output_channels=["value"],
)

app.invoke({"value": "a"})
{'value': 'aaaaaaaaaaaaaaaa'}
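The doubling behind this result can be traced with a plain loop. This is a hand-rolled model of the cycle, not the runtime itself: the `step` function copies the lambda from the example, and the `break` stands in for the assumed effect of skip_none=True, where a None result produces no write and therefore no further step.

```python
from typing import Optional


def step(value: str) -> Optional[str]:
    # Same function the example node applies.
    return value + value if len(value) < 10 else None


value = "a"
history = [value]
while True:
    result = step(value)
    if result is None:
        # Modeled on skip_none=True: nothing is written, so nothing
        # triggers the node again and the loop ends.
        break
    value = result
    history.append(value)

print(history)
```

The string doubles through lengths 1, 2, 4, 8, and 16; the 16-character value is the first one that fails the length check, so it is the last value recorded.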
auto_validate             bool
stream_mode               StreamMode
stream_eager              bool
output_channels           str | Sequence[str]
stream_channels           str | Sequence[str] | None
interrupt_after_nodes     All | Sequence[str]
interrupt_before_nodes    All | Sequence[str]
input_channels            str | Sequence[str]
step_timeout              float | None
debug                     bool | None
checkpointer              Checkpointer
store                     BaseStore | None
cache                     BaseCache | None
retry_policy              RetryPolicy | Sequence[RetryPolicy]
cache_policy              CachePolicy | None
context_schema            type[ContextT] | None
config                    RunnableConfig | None
trigger_to_nodes          Mapping[str, Sequence[str]] | None
name                      str

stream_mode: Mode to stream output, defaults to 'values'.

stream_eager: Whether to force emitting stream events eagerly. Automatically turned on for stream_mode "messages" and "custom".

stream_channels: Channels to stream, defaults to all channels not in reserved channels.

step_timeout: Maximum time to wait for a step to complete, in seconds.

debug: Whether to print debug information during execution.

checkpointer: Checkpointer used to save and load graph state.

store: Memory store to use for SharedValues.

cache: Cache to use for storing node results.

retry_policy: Retry policies to use when running tasks. Empty set disables retries.

cache_policy: Cache policy to use for all nodes. Can be overridden by individual nodes.

context_schema: Specifies the schema for the context object that will be passed to the workflow.

get_graph: Return a drawable representation of the computation graph.

aget_graph: Return a drawable representation of the computation graph.

with_config: Create a copy of the Pregel object with an updated config.

get_subgraphs: Get the subgraphs of the graph.

aget_subgraphs: Get the subgraphs of the graph.

get_state: Get the current state of the graph.

aget_state: Get the current state of the graph.

get_state_history: Get the history of the state of the graph.

aget_state_history: Asynchronously get the history of the state of the graph.

bulk_update_state: Apply updates to the graph state in bulk. Requires a checkpointer to be set.

abulk_update_state: Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.

update_state: Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

aupdate_state: Asynchronously update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

stream: Stream graph steps for a single input.

astream: Asynchronously stream graph steps for a single input.

invoke: Run the graph with a single input and config.

ainvoke: Asynchronously run the graph with a single input and config.

clear_cache: Clear the cache for the given nodes.

aclear_cache: Asynchronously clear the cache for the given nodes.