Pregel(
self,
*,
nodes: dict[str, PregelNode | NodeBuilder],
channels: dict[str, BaseChannel | ManagedValueSpec] | None,
auto_validate: bool = True,
stream_mode: StreamMode = 'values',
stream_eager: bool = False,
output_channels: str | Sequence[str],
stream_channels: str | Sequence[str] | None = None,
interrupt_after_nodes: All | Sequence[str] = (),
interrupt_before_nodes: All | Sequence[str] = (),
input_channels: str | Sequence[str],
step_timeout: float | None = None,
debug: bool | None = None,
checkpointer: Checkpointer = None,
store: BaseStore | None = None,
cache: BaseCache | None = None,
retry_policy: RetryPolicy | Sequence[RetryPolicy] = (),
cache_policy: CachePolicy | None = None,
context_schema: type[ContextT] | None = None,
config: RunnableConfig | None = None,
trigger_to_nodes: Mapping[str, Sequence[str]] | None = None,
name: str = 'LangGraph',
**deprecated_kwargs: Unpack[DeprecatedKwargs] = {}
)

| Name | Type |
|---|---|
| nodes | dict[str, PregelNode \| NodeBuilder] |
| channels | dict[str, BaseChannel \| ManagedValueSpec] \| None |

Pregel manages the runtime behavior for LangGraph applications.
Pregel combines actors and channels into a single application. Actors read data from channels and write data to channels. Pregel organizes the execution of the application into multiple steps, following the Pregel Algorithm/Bulk Synchronous Parallel model.
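Each step consists of three phases:
- Plan: Determine which actors to execute in this step. For example, in the first step, select the actors that subscribe to the special input channels; in subsequent steps, select the actors that subscribe to channels updated in the previous step.
- Execution: Execute all selected actors in parallel, until all complete, or one fails, or a timeout is reached. During this phase, channel updates are invisible to actors until the next step.
- Update: Update the channels with the values written by the actors in this step.

Repeat until no actors are selected for execution, or a maximum number of steps is reached.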
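The step loop can be modeled in a few lines of plain Python. This is only a sketch of the Bulk Synchronous Parallel idea (plan, execute, update), not LangGraph's implementation: `run_supersteps` and the `(source, function, target)` node tuples are hypothetical names invented for illustration, and every channel here simply keeps the last value written.

```python
from typing import Callable

# Hypothetical toy node: (subscribed channel, function, target channel).
Node = tuple[str, Callable[[str], str], str]


def run_supersteps(
    nodes: dict[str, Node], inputs: dict[str, str], max_steps: int = 25
) -> dict[str, str]:
    channels = dict(inputs)  # current channel values
    updated = set(inputs)    # channels written during the previous step
    for _ in range(max_steps):
        # Plan: select nodes subscribed to a channel updated in the last step.
        selected = [node for node in nodes.values() if node[0] in updated]
        if not selected:
            break
        # Execute: every selected node reads the same snapshot, so writes
        # made in this step stay invisible until the update phase.
        snapshot = dict(channels)
        writes = [(target, fn(snapshot[source])) for source, fn, target in selected]
        # Update: apply all writes together (last write wins in this toy model).
        updated = set()
        for target, value in writes:
            channels[target] = value
            updated.add(target)
    return channels


nodes = {
    "node1": ("a", lambda x: x + x, "b"),
    "node2": ("b", lambda x: x + x, "c"),
}
result = run_supersteps(nodes, {"a": "foo"})
print(result)
```

In this toy run, `node1` fires in the first step, `node2` in the second, and the loop stops in the third step because no node subscribes to `c`.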
An actor is a PregelNode. It subscribes to channels, reads data from them, and writes data to them. It can be thought of as an actor in the Pregel algorithm. PregelNodes implement LangChain's Runnable interface.
Channels are used to communicate between actors (PregelNodes). Each channel has a value type, an update type, and an update function, which takes a sequence of updates and modifies the stored value. Channels can be used to send data from one chain to another, or to send data from a chain to itself in a future step. LangGraph provides a number of built-in channels:
- LastValue: The default channel. Stores the last value sent to the channel; useful for input and output values, or for sending data from one step to the next.
- Topic: A configurable PubSub topic, useful for sending multiple values between actors, or for accumulating output. Can be configured to deduplicate values and/or to accumulate values over the course of multiple steps.
- Context: Exposes the value of a context manager, managing its lifecycle. Useful for accessing external resources that require setup and/or teardown, e.g. client = Context(httpx.Client)
- BinaryOperatorAggregate: Stores a persistent value, updated by applying a binary operator to the current value and each update sent to the channel. Useful for computing aggregates over multiple steps, e.g. total = BinaryOperatorAggregate(int, operator.add)

Most users will interact with Pregel via a StateGraph (Graph API) or via an entrypoint (Functional API).
However, for advanced use cases, Pregel can be used directly. If you're not sure whether you need to use Pregel directly, then the answer is probably no.
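The built-in channels described above differ mainly in their update function. The sketch below models three of them with plain Python classes to make that difference concrete. The `Toy*` classes are hypothetical stand-ins written for this illustration; they are not the LangGraph classes and skip any validation the real channels may perform.

```python
import operator
from typing import Any, Callable, Sequence


class ToyLastValue:
    """Toy stand-in: keeps only the most recent update."""

    def __init__(self) -> None:
        self.value: Any = None

    def update(self, updates: Sequence[Any]) -> None:
        if updates:
            self.value = updates[-1]


class ToyTopic:
    """Toy stand-in: collects updates, optionally across steps."""

    def __init__(self, accumulate: bool = False) -> None:
        self.accumulate = accumulate
        self.value: list = []

    def update(self, updates: Sequence[Any]) -> None:
        if not self.accumulate:
            self.value = []  # forget values from earlier steps
        self.value.extend(updates)


class ToyAggregate:
    """Toy stand-in: folds each update into a persistent value."""

    def __init__(self, initial: Any, op: Callable[[Any, Any], Any]) -> None:
        self.value = initial
        self.op = op

    def update(self, updates: Sequence[Any]) -> None:
        for item in updates:
            self.value = self.op(self.value, item)


# Each update() call stands for the update phase of one step.
last = ToyLastValue()
last.update(["x"])
last.update(["y"])

topic = ToyTopic(accumulate=True)
topic.update(["foofoo"])
topic.update(["foofoofoofoo"])

total = ToyAggregate(0, operator.add)
total.update([1, 2])
total.update([3])

print(last.value, topic.value, total.value)
```

The same two writes leave a single value in the last-value model, a list in the accumulating topic model, and a folded result in the aggregate model.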
Here are some examples to give you a sense of how it works:
Single node application:
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}
Using multiple nodes and multiple output channels:
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

node2 = (
    NodeBuilder().subscribe_to("b")
    .do(lambda x: x["b"] + x["b"])
    .write_to("c")
)

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo', 'c': 'foofoofoofoo'}
Using a Topic channel:
from langgraph.channels import LastValue, EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": Topic(str, accumulate=True),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
{"c": ["foofoo", "foofoofoofoo"]}
Using a BinaryOperatorAggregate channel:
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)

# Join each new update onto the current value with " | ".
def reducer(current, update):
    if current:
        return current + " | " + update
    else:
        return update

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
{'c': 'foofoo | foofoofoofoo'}
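The result above can be read as a left fold of the reducer over the values written to "c". The standalone sketch below reproduces that fold with `functools.reduce`, under two assumptions that are not stated in the example: the writes arrive in the order node1 then node2, and the aggregate starts from an empty string.

```python
from functools import reduce


def reducer(current: str, update: str) -> str:
    # Same reducer as in the example: join with " | " once a value exists.
    if current:
        return current + " | " + update
    return update


# Assumed write order: node1 writes in the first step, node2 in the second.
writes = ["foofoo", "foofoofoofoo"]

# Assumption: the fold starts from "", which is falsy, so the first
# write is stored as-is rather than prefixed with " | ".
folded = reduce(reducer, writes, "")
print(folded)
```

Because the reducer is not commutative, a different write order would produce a different string.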
Introducing a cycle:
This example demonstrates how to introduce a cycle in the graph, by having a chain write to a channel it subscribes to.
Execution will continue until a None value is written to the channel.
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry

example_node = (
    NodeBuilder()
    .subscribe_only("value")
    .do(lambda x: x + x if len(x) < 10 else None)
    .write_to(ChannelWriteEntry(channel="value", skip_none=True))
)

app = Pregel(
    nodes={"example_node": example_node},
    channels={
        "value": EphemeralValue(str),
    },
    input_channels=["value"],
    output_channels=["value"],
)

app.invoke({"value": "a"})
{'value': 'aaaaaaaaaaaaaaaa'}
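To see why the cycle stops at 16 characters, the loop can be traced without LangGraph. This is a plain-Python approximation of the example's control flow, assuming that a skipped None write leaves the channel untouched and therefore does not trigger the node again; `step` and `steps` are names introduced here for illustration.

```python
from typing import Optional


def step(x: str) -> Optional[str]:
    # Same function as the node: double the string until it reaches 10 characters.
    return x + x if len(x) < 10 else None


value = "a"
steps = 0
while True:
    result = step(value)
    if result is None:
        # Mirrors skip_none=True: nothing is written, so the cycle ends.
        break
    value = result
    steps += 1

print(value, steps)
```

The string doubles four times (2, 4, 8, 16 characters); the fifth call sees a length of 16, returns None, and the loop ends with the last written value intact.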

| Name | Type |
|---|---|
| auto_validate | bool |
| stream_mode | StreamMode |
| stream_eager | bool |
| output_channels | str \| Sequence[str] |
| stream_channels | str \| Sequence[str] \| None |
| interrupt_after_nodes | All \| Sequence[str] |
| interrupt_before_nodes | All \| Sequence[str] |
| input_channels | str \| Sequence[str] |
| step_timeout | float \| None |
| debug | bool \| None |
| checkpointer | Checkpointer |
| store | BaseStore \| None |
| cache | BaseCache \| None |
| retry_policy | RetryPolicy \| Sequence[RetryPolicy] |
| cache_policy | CachePolicy \| None |
| context_schema | type[ContextT] \| None |
| config | RunnableConfig \| None |
| trigger_to_nodes | Mapping[str, Sequence[str]] \| None |
| name | str |


| Name | Description |
|---|---|
| stream_mode | Mode to stream output, defaults to 'values'. |
| stream_eager | Whether to force emitting stream events eagerly. Automatically turned on for stream_mode "messages" and "custom". |
| stream_channels | Channels to stream, defaults to all channels not in reserved channels. |
| step_timeout | Maximum time to wait for a step to complete, in seconds. |
| debug | Whether to print debug information during execution. |
| checkpointer | Checkpointer used to save and load graph state. |
| store | Memory store to use for SharedValues. |
| cache | Cache to use for storing node results. |
| retry_policy | Retry policies to use when running tasks. Empty set disables retries. |
| cache_policy | Cache policy to use for all nodes. Can be overridden by individual nodes. |
| context_schema | Specifies the schema for the context object that will be passed to the workflow. |


| Method | Description |
|---|---|
| get_graph | Return a drawable representation of the computation graph. |
| aget_graph | Return a drawable representation of the computation graph. |
| with_config | Create a copy of the Pregel object with an updated config. |
| get_subgraphs | Get the subgraphs of the graph. |
| aget_subgraphs | Get the subgraphs of the graph. |
| get_state | Get the current state of the graph. |
| aget_state | Get the current state of the graph. |
| get_state_history | Get the history of the state of the graph. |
| aget_state_history | Asynchronously get the history of the state of the graph. |
| bulk_update_state | Apply updates to the graph state in bulk. Requires a checkpointer to be set. |
| abulk_update_state | Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set. |
| update_state | Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous. |
| aupdate_state | Asynchronously update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous. |
| stream | Stream graph steps for a single input. |
| astream | Asynchronously stream graph steps for a single input. |
| invoke | Run the graph with a single input and config. |
| ainvoke | Asynchronously run the graph with a single input and config. |
| clear_cache | Clear the cache for the given nodes. |
| aclear_cache | Asynchronously clear the cache for the given nodes. |