LangChain Reference home pageLangChain ReferenceLangChain Reference
  • GitHub
  • Main Docs
Deep Agents
LangChain
LangGraph
Integrations
LangSmith
  • Overview
    • Overview
    • Graphs
    • Functional API
    • Pregel
    • Checkpointing
    • Storage
    • Caching
    • Types
    • Runtime
    • Config
    • Errors
    • Constants
    • Channels
    • Agents
    LangGraph CLI
    LangGraph SDK
    LangGraph Supervisor
    LangGraph Swarm
    ⌘I

    LangChain Assistant

    Ask a question to get started

    Enter to send•Shift+Enter new line

    Menu

    OverviewGraphsFunctional APIPregelCheckpointingStorageCachingTypesRuntimeConfigErrorsConstantsChannelsAgents
    LangGraph CLI
    LangGraph SDK
    LangGraph Supervisor
    LangGraph Swarm
    Language
    Theme
    PythonlanggraphpregelmainPregel
    Class●Since v0.6

    Pregel

    Copy
    Pregel(
      self,
      *,
      nodes: dict[str, PregelNode | NodeBuilder],
      channels: dict[str, BaseChannel | ManagedValueSpec] | None,
      auto_validate: bool = True,
      stream_mode: StreamMode = 'values',
      stream_eager: bool = False,
      output_channels: str | Sequence[str],
      stream_channels: str | Sequence[str] | None = None,
      interrupt_after_nodes: All | Sequence[str] = (),
      interrupt_before_nodes: All | Sequence[str] = (),
      input_channels: str | Sequence[str],
      step_timeout: float | None = None,
      debug: bool | None = None,
      checkpointer: Checkpointer = None,
      store: BaseStore | None = None,
      cache: BaseCache | None = None,
      retry_policy: RetryPolicy | Sequence[RetryPolicy] = (),
      cache_policy: CachePolicy | None = None,
      context_schema: type[ContextT] | None = None,
      config: RunnableConfig | None = None,
      trigger_to_nodes: Mapping[str, Sequence[str]] | None = None,
      name: str = 'LangGraph',
      **deprecated_kwargs: Unpack[DeprecatedKwargs] = {}
    )

    Bases

    PregelProtocol[StateT, ContextT, InputT, OutputT]Generic[StateT, ContextT, InputT, OutputT]

    Constructors

    constructor
    __init__
    NameType
    nodesdict[str, PregelNode | NodeBuilder]
    channelsdict[str, BaseChannel | ManagedValueSpec

    Attributes

    attribute
    nodes: dict[str, PregelNode]
    attribute
    channels: dict[str, BaseChannel | ManagedValueSpec]
    attribute
    stream_mode: StreamMode
    attribute
    stream_eager: bool
    attribute
    output_channels: str | Sequence[str]
    attribute
    stream_channels: str | Sequence[str] | None
    attribute
    interrupt_after_nodes: All | Sequence[str]
    attribute
    interrupt_before_nodes: All | Sequence[str]
    attribute
    input_channels: str | Sequence[str]
    attribute
    step_timeout: float | None
    attribute
    debug: bool
    attribute
    checkpointer: Checkpointer
    attribute
    store: BaseStore | None
    attribute
    cache: BaseCache | None
    attribute
    retry_policy: Sequence[RetryPolicy]
    attribute
    cache_policy: CachePolicy | None
    attribute
    context_schema: type[ContextT] | None
    attribute
    config: RunnableConfig | None
    attribute
    name: str
    attribute
    trigger_to_nodes: Mapping[str, Sequence[str]]
    attribute
    InputType: Any
    attribute
    OutputType: Any
    attribute
    stream_channels_list: Sequence[str]
    attribute
    stream_channels_asis: str | Sequence[str]

    Methods

    method
    get_graph
    method
    aget_graph
    method
    copy
    method
    with_config
    method
    validate
    method
    get_context_jsonschema
    method
    get_input_schema
    method
    get_input_jsonschema
    method
    get_output_schema
    method
    get_output_jsonschema
    method
    get_subgraphs
    method
    aget_subgraphs
    method
    get_state
    method
    aget_state
    method
    get_state_history
    method
    aget_state_history
    method
    bulk_update_state
    method
    abulk_update_state
    method
    update_state
    method
    aupdate_state
    method
    stream
    method
    astream
    method
    invoke
    method
    ainvoke
    method
    clear_cache
    method
    aclear_cache
    deprecatedmethod
    config_schema
    deprecatedmethod
    get_config_jsonschema

    Inherited fromRunnable(langchain_core)

    Attributes

    Ainput_schemaAoutput_schemaAconfig_specs

    Methods

    Mget_nameMget_promptsMpipeMpickMassignMbatchMbatch_as_completedMabatchMabatch_as_completedMastream_logMastream_eventsMtransformMatransformMbindMwith_listenersMwith_alistenersMwith_typesMwith_retryMmapMwith_fallbacksMas_tool
    View source on GitHub

    Pregel manages the runtime behavior for LangGraph applications.

    Overview

    Pregel combines actors and channels into a single application. Actors read data from channels and write data to channels. Pregel organizes the execution of the application into multiple steps, following the Pregel Algorithm/Bulk Synchronous Parallel model.

    Each step consists of three phases:

    • Plan: Determine which actors to execute in this step. For example, in the first step, select the actors that subscribe to the special input channels; in subsequent steps, select the actors that subscribe to channels updated in the previous step.
    • Execution: Execute all selected actors in parallel, until all complete, or one fails, or a timeout is reached. During this phase, channel updates are invisible to actors until the next step.
    • Update: Update the channels with the values written by the actors in this step.

    Repeat until no actors are selected for execution, or a maximum number of steps is reached.

    Actors

    An actor is a PregelNode. It subscribes to channels, reads data from them, and writes data to them. It can be thought of as an actor in the Pregel algorithm. PregelNodes implement LangChain's Runnable interface.

    Channels

    Channels are used to communicate between actors (PregelNodes). Each channel has a value type, an update type, and an update function – which takes a sequence of updates and modifies the stored value. Channels can be used to send data from one chain to another, or to send data from a chain to itself in a future step. LangGraph provides a number of built-in channels:

    Basic channels: LastValue and Topic

    • LastValue: The default channel, stores the last value sent to the channel, useful for input and output values, or for sending data from one step to the next
    • Topic: A configurable PubSub Topic, useful for sending multiple values between actors, or for accumulating output. Can be configured to deduplicate values, and/or to accumulate values over the course of multiple steps.

    Advanced channels: Context and BinaryOperatorAggregate

    • Context: exposes the value of a context manager, managing its lifecycle. Useful for accessing external resources that require setup and/or teardown. e.g. client = Context(httpx.Client)
    • BinaryOperatorAggregate: stores a persistent value, updated by applying a binary operator to the current value and each update sent to the channel, useful for computing aggregates over multiple steps. e.g. total = BinaryOperatorAggregate(int, operator.add)

    Examples

    Most users will interact with Pregel via a StateGraph (Graph API) or via an entrypoint (Functional API).

    However, for advanced use cases, Pregel can be used directly. If you're not sure whether you need to use Pregel directly, then the answer is probably no

    • you should use the Graph API or Functional API instead. These are higher-level interfaces that will compile down to Pregel under the hood.

    Here are some examples to give you a sense of how it works:

    Single node application:

    from langgraph.channels import EphemeralValue
    from langgraph.pregel import Pregel, NodeBuilder
    
    node1 = (
        NodeBuilder().subscribe_only("a")
        .do(lambda x: x + x)
        .write_to("b")
    )
    
    app = Pregel(
        nodes={"node1": node1},
        channels={
            "a": EphemeralValue(str),
            "b": EphemeralValue(str),
        },
        input_channels=["a"],
        output_channels=["b"],
    )
    
    app.invoke({"a": "foo"})
    {'b': 'foofoo'}
    

    Using multiple nodes and multiple output channels:

    from langgraph.channels import LastValue, EphemeralValue
    from langgraph.pregel import Pregel, NodeBuilder
    
    node1 = (
        NodeBuilder().subscribe_only("a")
        .do(lambda x: x + x)
        .write_to("b")
    )
    
    node2 = (
        NodeBuilder().subscribe_to("b")
        .do(lambda x: x["b"] + x["b"])
        .write_to("c")
    )
    
    app = Pregel(
        nodes={"node1": node1, "node2": node2},
        channels={
            "a": EphemeralValue(str),
            "b": LastValue(str),
            "c": EphemeralValue(str),
        },
        input_channels=["a"],
        output_channels=["b", "c"],
    )
    
    app.invoke({"a": "foo"})
    {'b': 'foofoo', 'c': 'foofoofoofoo'}
    

    Using a Topic channel:

    from langgraph.channels import LastValue, EphemeralValue, Topic
    from langgraph.pregel import Pregel, NodeBuilder
    
    node1 = (
        NodeBuilder().subscribe_only("a")
        .do(lambda x: x + x)
        .write_to("b", "c")
    )
    
    node2 = (
        NodeBuilder().subscribe_only("b")
        .do(lambda x: x + x)
        .write_to("c")
    )
    
    app = Pregel(
        nodes={"node1": node1, "node2": node2},
        channels={
            "a": EphemeralValue(str),
            "b": EphemeralValue(str),
            "c": Topic(str, accumulate=True),
        },
        input_channels=["a"],
        output_channels=["c"],
    )
    
    app.invoke({"a": "foo"})
    {"c": ["foofoo", "foofoofoofoo"]}
    

    Using a BinaryOperatorAggregate channel:

    from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
    from langgraph.pregel import Pregel, NodeBuilder
    
    node1 = (
        NodeBuilder().subscribe_only("a")
        .do(lambda x: x + x)
        .write_to("b", "c")
    )
    
    node2 = (
        NodeBuilder().subscribe_only("b")
        .do(lambda x: x + x)
        .write_to("c")
    )
    
    def reducer(current, update):
        if current:
            return current + " | " + update
        else:
            return update
    
    app = Pregel(
        nodes={"node1": node1, "node2": node2},
        channels={
            "a": EphemeralValue(str),
            "b": EphemeralValue(str),
            "c": BinaryOperatorAggregate(str, operator=reducer),
        },
        input_channels=["a"],
        output_channels=["c"],
    )
    
    app.invoke({"a": "foo"})
    {'c': 'foofoo | foofoofoofoo'}
    

    Introducing a cycle:

    This example demonstrates how to introduce a cycle in the graph, by having a chain write to a channel it subscribes to.

    Execution will continue until a None value is written to the channel.

    from langgraph.channels import EphemeralValue
    from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry
    
    example_node = (
        NodeBuilder()
        .subscribe_only("value")
        .do(lambda x: x + x if len(x) < 10 else None)
        .write_to(ChannelWriteEntry(channel="value", skip_none=True))
    )
    
    app = Pregel(
        nodes={"example_node": example_node},
        channels={
            "value": EphemeralValue(str),
        },
        input_channels=["value"],
        output_channels=["value"],
    )
    
    app.invoke({"value": "a"})
    {'value': 'aaaaaaaaaaaaaaaa'}
    
    ] |
    None
    auto_validatebool
    stream_modeStreamMode
    stream_eagerbool
    output_channelsstr | Sequence[str]
    stream_channelsstr | Sequence[str] | None
    interrupt_after_nodesAll | Sequence[str]
    interrupt_before_nodesAll | Sequence[str]
    input_channelsstr | Sequence[str]
    step_timeoutfloat | None
    debugbool | None
    checkpointerCheckpointer
    storeBaseStore | None
    cacheBaseCache | None
    retry_policyRetryPolicy | Sequence[RetryPolicy]
    cache_policyCachePolicy | None
    context_schematype[ContextT] | None
    configRunnableConfig | None
    trigger_to_nodesMapping[str, Sequence[str]] | None
    namestr

    Mode to stream output, defaults to 'values'.

    Whether to force emitting stream events eagerly, automatically turned on for stream_mode "messages" and "custom".

    Channels to stream, defaults to all channels not in reserved channels

    Maximum time to wait for a step to complete, in seconds.

    Whether to print debug information during execution.

    Checkpointer used to save and load graph state.

    Memory store to use for SharedValues.

    Cache to use for storing node results.

    Retry policies to use when running tasks. Empty set disables retries.

    Cache policy to use for all nodes. Can be overridden by individual nodes.

    Specifies the schema for the context object that will be passed to the workflow.

    Return a drawable representation of the computation graph.

    Return a drawable representation of the computation graph.

    Create a copy of the Pregel object with an updated config.

    Get the subgraphs of the graph.

    Get the subgraphs of the graph.

    Get the current state of the graph.

    Get the current state of the graph.

    Get the history of the state of the graph.

    Asynchronously get the history of the state of the graph.

    Apply updates to the graph state in bulk. Requires a checkpointer to be set.

    Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.

    Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

    Asynchronously update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

    Stream graph steps for a single input.

    Asynchronously stream graph steps for a single input.

    Run the graph with a single input and config.

    Asynchronously run the graph with a single input and config.

    Clear the cache for the given nodes.

    Asynchronously clear the cache for the given nodes.