RemoteGraph

langgraph.pregel.remote

RemoteGraph

Bases: PregelProtocol

The RemoteGraph class is a client implementation for calling remote APIs that implement the LangGraph Server API specification.

For example, the RemoteGraph class can be used to call APIs from deployments on LangSmith Deployment.

RemoteGraph behaves the same way as a Graph and can be used directly as a node in another Graph.

METHOD DESCRIPTION
__init__

Specify url, api_key, and/or headers to create default sync and async clients.

with_config

Bind config to a Runnable, returning a new Runnable.

get_graph

Get graph by graph name.

aget_graph

Get graph by graph name.

get_state

Get the state of a thread.

aget_state

Get the state of a thread.

get_state_history

Get the state history of a thread.

aget_state_history

Get the state history of a thread.

update_state

Update the state of a thread.

aupdate_state

Update the state of a thread.

stream

Create a run and stream the results.

astream

Create a run and stream the results.

astream_events

Generate a stream of events.

invoke

Create a run, wait until it finishes and return the final state.

ainvoke

Create a run, wait until it finishes and return the final state.

get_name

Get the name of the Runnable.

get_input_schema

Get a Pydantic model that can be used to validate input to the Runnable.

get_input_jsonschema

Get a JSON schema that represents the input to the Runnable.

get_output_schema

Get a Pydantic model that can be used to validate output to the Runnable.

get_output_jsonschema

Get a JSON schema that represents the output of the Runnable.

config_schema

The type of config this Runnable accepts specified as a Pydantic model.

get_config_jsonschema

Get a JSON schema that represents the config of the Runnable.

get_prompts

Return a list of prompts used by this Runnable.

__or__

Runnable "or" operator.

__ror__

Runnable "reverse-or" operator.

pipe

Pipe Runnable objects.

pick

Pick keys from the output dict of this Runnable.

assign

Assigns new fields to the dict output of this Runnable.

batch

Default implementation runs invoke in parallel using a thread pool executor.

batch_as_completed

Run invoke in parallel on a list of inputs.

abatch

Default implementation runs ainvoke in parallel using asyncio.gather.

abatch_as_completed

Run ainvoke in parallel on a list of inputs.

astream_log

Stream all output from a Runnable, as reported to the callback system.

transform

Transform inputs to outputs.

atransform

Transform inputs to outputs.

bind

Bind arguments to a Runnable, returning a new Runnable.

with_listeners

Bind lifecycle listeners to a Runnable, returning a new Runnable.

with_alisteners

Bind async lifecycle listeners to a Runnable.

with_types

Bind input and output types to a Runnable, returning a new Runnable.

with_retry

Create a new Runnable that retries the original Runnable on exceptions.

map

Return a new Runnable that maps a list of inputs to a list of outputs.

with_fallbacks

Add fallbacks to a Runnable, returning a new Runnable.

as_tool

Create a BaseTool from a Runnable.
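The batch and abatch entries in the table above describe default implementations that fan invoke and ainvoke out over a list of inputs. The following is a minimal sketch of that pattern using only the standard library. The functions invoke, ainvoke, batch, and abatch here are stand-ins written for illustration, not the RemoteGraph methods themselves.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def invoke(x: int) -> int:
    # Stand-in for a Runnable's invoke; hypothetical workload.
    return x + 1


async def ainvoke(x: int) -> int:
    # Stand-in for a Runnable's ainvoke.
    await asyncio.sleep(0)
    return x + 1


def batch(inputs: list[int]) -> list[int]:
    # Run invoke in a thread pool; map() returns results in input order.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(invoke, inputs))


async def abatch(inputs: list[int]) -> list[int]:
    # asyncio.gather also returns results in input order.
    return list(await asyncio.gather(*(ainvoke(x) for x in inputs)))


batch_result = batch([1, 2, 3])
abatch_result = asyncio.run(abatch([1, 2, 3]))
```

Both helpers return results in the same order as the inputs, which is what makes a list-in, list-out batch interface practical.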

name instance-attribute

name: str | None

The name of the Runnable. Used for debugging and tracing.

InputType property

InputType: type[Input]

Input type.

The type of input this Runnable accepts specified as a type annotation.

RAISES DESCRIPTION
TypeError

If the input type cannot be inferred.

OutputType property

OutputType: type[Output]

Output type.

The type of output this Runnable produces specified as a type annotation.

RAISES DESCRIPTION
TypeError

If the output type cannot be inferred.

input_schema property

input_schema: type[BaseModel]

The type of input this Runnable accepts specified as a Pydantic model.

output_schema property

output_schema: type[BaseModel]

Output schema.

The type of output this Runnable produces specified as a Pydantic model.

config_specs property

config_specs: list[ConfigurableFieldSpec]

List configurable fields for this Runnable.

__init__

__init__(
    assistant_id: str,
    /,
    *,
    url: str | None = None,
    api_key: str | None = None,
    headers: dict[str, str] | None = None,
    client: LangGraphClient | None = None,
    sync_client: SyncLangGraphClient | None = None,
    config: RunnableConfig | None = None,
    name: str | None = None,
    distributed_tracing: bool = False,
)

Specify url, api_key, and/or headers to create default sync and async clients.

If client or sync_client are provided, they will be used instead of the default clients. See LangGraphClient and SyncLangGraphClient for details on the default clients. At least one of url, client, or sync_client must be provided.
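The rule that at least one of url, client, or sync_client must be provided can be pictured as a simple guard. This is only a sketch: check_connection_args is a hypothetical helper, the URL is a placeholder, and raising ValueError is an assumption rather than documented behavior.

```python
from typing import Optional


def check_connection_args(
    url: Optional[str] = None,
    client: Optional[object] = None,
    sync_client: Optional[object] = None,
) -> None:
    """Hypothetical helper mirroring the documented rule; not part of langgraph."""
    if url is None and client is None and sync_client is None:
        # The exception type is an assumption for this sketch.
        raise ValueError("Provide at least one of url, client, or sync_client.")


check_connection_args(url="http://localhost:8123")  # placeholder URL; passes

try:
    check_connection_args()
    rejected = False
except ValueError:
    rejected = True
```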

PARAMETER DESCRIPTION
assistant_id

The assistant ID or graph name of the remote graph to use.

TYPE: str

url

The URL of the remote API.

TYPE: str | None DEFAULT: None

api_key

The API key to use for authentication. If not provided, it will be read from the environment (LANGGRAPH_API_KEY, LANGSMITH_API_KEY, or LANGCHAIN_API_KEY).

TYPE: str | None DEFAULT: None
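As an illustration of the fallback described for api_key, the sketch below resolves a key from an explicit argument first and then from the three listed environment variables. The helper name resolve_api_key is hypothetical, and checking the variables in the order they are listed is an assumption.

```python
from typing import Mapping, Optional

# Environment variable names taken from the parameter description above.
API_KEY_ENV_VARS = ("LANGGRAPH_API_KEY", "LANGSMITH_API_KEY", "LANGCHAIN_API_KEY")


def resolve_api_key(explicit: Optional[str], env: Mapping[str, str]) -> Optional[str]:
    """Hypothetical helper: explicit key first, then the first variable that is set."""
    if explicit is not None:
        return explicit
    for var in API_KEY_ENV_VARS:
        value = env.get(var)
        if value:
            return value
    return None


fake_env = {"LANGSMITH_API_KEY": "ls-key", "LANGCHAIN_API_KEY": "lc-key"}
from_env = resolve_api_key(None, fake_env)
from_arg = resolve_api_key("my-key", fake_env)
```

In real use the mapping passed as env would typically be os.environ.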

headers

Additional headers to include in the requests.

TYPE: dict[str, str] | None DEFAULT: None

client

A LangGraphClient instance to use instead of creating a default client.

TYPE: LangGraphClient | None DEFAULT: None

sync_client

A SyncLangGraphClient instance to use instead of creating a default client.

TYPE: SyncLangGraphClient | None DEFAULT: None

config

An optional RunnableConfig instance with additional configuration.

TYPE: RunnableConfig | None DEFAULT: None

name

Human-readable name to attach to the RemoteGraph instance. This is useful for adding RemoteGraph as a subgraph via graph.add_node(remote_graph). If not provided, defaults to the assistant ID.

TYPE: str | None DEFAULT: None

distributed_tracing

Whether to enable sending LangSmith distributed tracing headers.

TYPE: bool DEFAULT: False

with_config

with_config(config: RunnableConfig | None = None, **kwargs: Any) -> Self

Bind config to a Runnable, returning a new Runnable.

PARAMETER DESCRIPTION
config

The config to bind to the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable with the config bound.

get_graph

get_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Graph

Get graph by graph name.

This method calls GET /assistants/{assistant_id}/graph.

PARAMETER DESCRIPTION
config

This parameter is not used.

TYPE: RunnableConfig | None DEFAULT: None

xray

Include graph representation of subgraphs. If an integer value is provided, only subgraphs with a depth less than or equal to the value will be included.

TYPE: int | bool DEFAULT: False
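The xray rule can be read as a depth predicate. The sketch below assumes that True means every subgraph level, False means none, and that depth is counted from 1 for direct subgraphs. The function include_subgraph is hypothetical and only illustrates the comparison.

```python
from typing import Union


def include_subgraph(depth: int, xray: Union[int, bool]) -> bool:
    """Hypothetical predicate: should a subgraph at `depth` be expanded?"""
    # bool is a subclass of int in Python, so test for bool first.
    if isinstance(xray, bool):
        return xray
    return depth <= xray


all_levels = [include_subgraph(d, True) for d in (1, 2, 3)]
no_levels = [include_subgraph(d, False) for d in (1, 2, 3)]
up_to_two = [include_subgraph(d, 2) for d in (1, 2, 3)]
```

Checking for bool before int matters because True == 1 in Python; without that check, xray=True would be treated as a depth limit of 1.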

RETURNS DESCRIPTION
Graph

The graph information for the assistant in JSON format.

aget_graph async

aget_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Graph

Get graph by graph name.

This method calls GET /assistants/{assistant_id}/graph.

PARAMETER DESCRIPTION
config

This parameter is not used.

TYPE: RunnableConfig | None DEFAULT: None

xray

Include graph representation of subgraphs. If an integer value is provided, only subgraphs with a depth less than or equal to the value will be included.

TYPE: int | bool DEFAULT: False

RETURNS DESCRIPTION
Graph

The graph information for the assistant in JSON format.

get_state

get_state(
    config: RunnableConfig,
    *,
    subgraphs: bool = False,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> StateSnapshot

Get the state of a thread.

This method calls POST /threads/{thread_id}/state/checkpoint if a checkpoint is specified in the config or GET /threads/{thread_id}/state if no checkpoint is specified.
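The endpoint choice described above can be sketched as a small routing function. The helper state_request is hypothetical, and the assumption that a checkpoint is signalled by a checkpoint_id key in the configurable field is made only for this illustration.

```python
from typing import Any, Dict, Tuple


def state_request(config: Dict[str, Any]) -> Tuple[str, str]:
    """Hypothetical helper returning (HTTP method, path) for a state lookup."""
    configurable = config.get("configurable", {})
    thread_id = configurable["thread_id"]  # required by get_state
    # Assumption: a checkpoint is identified by a "checkpoint_id" key.
    if configurable.get("checkpoint_id") is not None:
        return "POST", f"/threads/{thread_id}/state/checkpoint"
    return "GET", f"/threads/{thread_id}/state"


latest = state_request({"configurable": {"thread_id": "t1"}})
pinned = state_request({"configurable": {"thread_id": "t1", "checkpoint_id": "c9"}})
```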

PARAMETER DESCRIPTION
config

A RunnableConfig that includes thread_id in the configurable field.

TYPE: RunnableConfig

subgraphs

Include subgraphs in the state.

TYPE: bool DEFAULT: False

headers

Optional custom headers to include with the request.

TYPE: dict[str, str] | None DEFAULT: None

params

Optional query parameters to include with the request.

TYPE: QueryParamTypes | None DEFAULT: None

RETURNS DESCRIPTION
StateSnapshot

The latest state of the thread.

aget_state async

aget_state(
    config: RunnableConfig,
    *,
    subgraphs: bool = False,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> StateSnapshot

Get the state of a thread.

This method calls POST /threads/{thread_id}/state/checkpoint if a checkpoint is specified in the config or GET /threads/{thread_id}/state if no checkpoint is specified.

PARAMETER DESCRIPTION
config

A RunnableConfig that includes thread_id in the configurable field.

TYPE: RunnableConfig

subgraphs

Include subgraphs in the state.

TYPE: bool DEFAULT: False

headers

Optional custom headers to include with the request.

TYPE: dict[str, str] | None DEFAULT: None

params

Optional query parameters to include with the request.

TYPE: QueryParamTypes | None DEFAULT: None

RETURNS DESCRIPTION
StateSnapshot

The latest state of the thread.

get_state_history

get_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Iterator[StateSnapshot]

Get the state history of a thread.

This method calls POST /threads/{thread_id}/history.

PARAMETER DESCRIPTION
config

A RunnableConfig that includes thread_id in the configurable field.

TYPE: RunnableConfig

filter

Metadata to filter on.

TYPE: dict[str, Any] | None DEFAULT: None

before

A RunnableConfig that includes checkpoint metadata.

TYPE: RunnableConfig | None DEFAULT: None

limit

Max number of states to return.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
Iterator[StateSnapshot]

States of the thread.

aget_state_history async

aget_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> AsyncIterator[StateSnapshot]

Get the state history of a thread.

This method calls POST /threads/{thread_id}/history.

PARAMETER DESCRIPTION
config

A RunnableConfig that includes thread_id in the configurable field.

TYPE: RunnableConfig

filter

Metadata to filter on.

TYPE: dict[str, Any] | None DEFAULT: None

before

A RunnableConfig that includes checkpoint metadata.

TYPE: RunnableConfig | None DEFAULT: None

limit

Max number of states to return.

TYPE: int | None DEFAULT: None

headers

Optional custom headers to include with the request.

TYPE: dict[str, str] | None DEFAULT: None

params

Optional query parameters to include with the request.

TYPE: QueryParamTypes | None DEFAULT: None

RETURNS DESCRIPTION
AsyncIterator[StateSnapshot]

States of the thread.

update_state

update_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
    *,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> RunnableConfig

Update the state of a thread.

This method calls POST /threads/{thread_id}/state.

PARAMETER DESCRIPTION
config

A RunnableConfig that includes thread_id in the configurable field.

TYPE: RunnableConfig

values

Values to apply to the state.

TYPE: dict[str, Any] | Any | None

as_node

Update the state as if this node had just executed.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
RunnableConfig

RunnableConfig for the updated thread.

aupdate_state async

aupdate_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
    *,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> RunnableConfig

Update the state of a thread.

This method calls POST /threads/{thread_id}/state.

PARAMETER DESCRIPTION
config

A RunnableConfig that includes thread_id in the configurable field.

TYPE: RunnableConfig

values

Values to apply to the state.

TYPE: dict[str, Any] | Any | None

as_node

Update the state as if this node had just executed.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
RunnableConfig

RunnableConfig for the updated thread.

stream

stream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode | list[StreamMode] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    subgraphs: bool = False,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
    **kwargs: Any,
) -> Iterator[dict[str, Any] | Any]

Create a run and stream the results.

This method calls POST /threads/{thread_id}/runs/stream if a thread_id is specified in the configurable field of the config or POST /runs/stream otherwise.
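A rough sketch of that path selection, assuming the config is a plain dictionary. The helper stream_path is hypothetical and is not how the library necessarily structures this internally.

```python
from typing import Any, Dict, Optional


def stream_path(config: Optional[Dict[str, Any]]) -> str:
    """Hypothetical helper returning the POST path for a streamed run."""
    thread_id = ((config or {}).get("configurable") or {}).get("thread_id")
    if thread_id is not None:
        return f"/threads/{thread_id}/runs/stream"
    return "/runs/stream"


with_thread = stream_path({"configurable": {"thread_id": "t1"}})
without_thread = stream_path(None)
```

Methods such as get_state require a thread_id in the config, so a run started without one cannot be looked up that way afterwards.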

PARAMETER DESCRIPTION
input

Input to the graph.

TYPE: dict[str, Any] | Any

config

A RunnableConfig for graph invocation.

TYPE: RunnableConfig | None DEFAULT: None

stream_mode

Stream mode(s) to use.

TYPE: StreamMode | list[StreamMode] | None DEFAULT: None

interrupt_before

Interrupt the graph before these nodes.

TYPE: All | Sequence[str] | None DEFAULT: None

interrupt_after

Interrupt the graph after these nodes.

TYPE: All | Sequence[str] | None DEFAULT: None

subgraphs

Stream from subgraphs.

TYPE: bool DEFAULT: False

headers

Additional headers to pass to the request.

TYPE: dict[str, str] | None DEFAULT: None

**kwargs

Additional params to pass to client.runs.stream.

TYPE: Any DEFAULT: {}

YIELDS DESCRIPTION
dict[str, Any] | Any

The output of the graph.

astream async

astream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode | list[StreamMode] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    subgraphs: bool = False,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
    **kwargs: Any,
) -> AsyncIterator[dict[str, Any] | Any]

Create a run and stream the results.

This method calls POST /threads/{thread_id}/runs/stream if a thread_id is specified in the configurable field of the config or POST /runs/stream otherwise.

PARAMETER DESCRIPTION
input

Input to the graph.

TYPE: dict[str, Any] | Any

config

A RunnableConfig for graph invocation.

TYPE: RunnableConfig | None DEFAULT: None

stream_mode

Stream mode(s) to use.

TYPE: StreamMode | list[StreamMode] | None DEFAULT: None

interrupt_before

Interrupt the graph before these nodes.

TYPE: All | Sequence[str] | None DEFAULT: None

interrupt_after

Interrupt the graph after these nodes.

TYPE: All | Sequence[str] | None DEFAULT: None

subgraphs

Stream from subgraphs.

TYPE: bool DEFAULT: False

headers

Additional headers to pass to the request.

TYPE: dict[str, str] | None DEFAULT: None

**kwargs

Additional params to pass to client.runs.stream.

TYPE: Any DEFAULT: {}

YIELDS DESCRIPTION
AsyncIterator[dict[str, Any] | Any]

The output of the graph.

astream_events async

astream_events(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"],
    include_names: Sequence[All] | None = None,
    include_types: Sequence[All] | None = None,
    include_tags: Sequence[All] | None = None,
    exclude_names: Sequence[All] | None = None,
    exclude_types: Sequence[All] | None = None,
    exclude_tags: Sequence[All] | None = None,
    **kwargs: Any,
) -> AsyncIterator[dict[str, Any]]

Generate a stream of events.

Use this method to create an iterator over StreamEvent objects that provide real-time information about the progress of the Runnable, including StreamEvent objects from intermediate results.

A StreamEvent is a dictionary with the following schema:

  • event: Event names are of the format: on_[runnable_type]_(start|stream|end).
  • name: The name of the Runnable that generated the event.
  • run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
  • parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
  • tags: The tags of the Runnable that generated the event.
  • metadata: The metadata of the Runnable that generated the event.
  • data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
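Because event names follow the on_[runnable_type]_(start|stream|end) format, a consumer can split them into a runnable type and a phase. The sketch below does this with a regular expression. The helper split_event_name is hypothetical, and it assumes runnable types consist of lowercase letters and underscores.

```python
import re
from typing import Optional, Tuple

# Pattern built from the documented format on_[runnable_type]_(start|stream|end).
EVENT_NAME = re.compile(r"^on_(?P<kind>[a-z_]+)_(?P<phase>start|stream|end)$")


def split_event_name(event: str) -> Optional[Tuple[str, str]]:
    """Hypothetical helper: return (runnable_type, phase), or None if no match."""
    match = EVENT_NAME.match(event)
    if match is None:
        return None
    return match.group("kind"), match.group("phase")


chat = split_event_name("on_chat_model_stream")
chain = split_event_name("on_chain_end")
other = split_event_name("progress_event")
```

Names that do not fit the pattern return None, so a caller can handle them separately.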

Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.

Note

This reference table is for the v2 version of the schema.

event name chunk input output
on_chat_model_start '[model name]' {"messages": [[SystemMessage, HumanMessage]]}
on_chat_model_stream '[model name]' AIMessageChunk(content="hello")
on_chat_model_end '[model name]' {"messages": [[SystemMessage, HumanMessage]]} AIMessageChunk(content="hello world")
on_llm_start '[model name]' {'input': 'hello'}
on_llm_stream '[model name]' 'Hello'
on_llm_end '[model name]' 'Hello human!'
on_chain_start 'format_docs'
on_chain_stream 'format_docs' 'hello world!, goodbye world!'
on_chain_end 'format_docs' [Document(...)] 'hello world!, goodbye world!'
on_tool_start 'some_tool' {"x": 1, "y": "2"}
on_tool_end 'some_tool' {"x": 1, "y": "2"}
on_retriever_start '[retriever name]' {"query": "hello"}
on_retriever_end '[retriever name]' {"query": "hello"} [Document(...), ..]
on_prompt_start '[template_name]' {"question": "hello"}
on_prompt_end '[template_name]' {"question": "hello"} ChatPromptValue(messages: [SystemMessage, ...])

In addition to the standard events, users can also dispatch custom events (see example below).

Custom events are only surfaced in the v2 version of the API.

A custom event has the following format:

  • name (str): A user-defined name for the event.
  • data (Any): The data associated with the event. This can be anything, though we suggest making it JSON serializable.
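To make the JSON-serializable suggestion concrete, the sketch below models a custom event with the two documented attributes and checks whether its payload survives json.dumps. The CustomEvent dataclass and is_json_serializable helper are hypothetical and exist only for this illustration.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class CustomEvent:
    """Hypothetical container with the two documented attributes."""

    name: str  # user-defined event name
    data: Any  # arbitrary payload


def is_json_serializable(value: Any) -> bool:
    # json.dumps raises TypeError (or ValueError for circular references).
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


good = CustomEvent("progress_event", {"message": "Finished step 1 of 3"})
bad = CustomEvent("progress_event", {"payload": object()})
good_ok = is_json_serializable(good.data)
bad_ok = is_json_serializable(bad.data)
```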

Here are declarations associated with the standard events shown above:

format_docs:

def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])


format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

prompt:

template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

For instance:

from langchain_core.runnables import RunnableLambda


async def reverse(s: str) -> str:
    return s[::-1]


chain = RunnableLambda(func=reverse)

events = [event async for event in chain.astream_events("hello", version="v2")]

# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

Example: Dispatch Custom Event

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)

PARAMETER DESCRIPTION
input

The input to the Runnable.

TYPE: Any

config

The config to use for the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

version

The version of the schema to use, either 'v2' or 'v1'. Users should use 'v2'. 'v1' is for backwards compatibility and will be deprecated in 0.4.0. No default will be assigned until the API is stabilized. Custom events will only be surfaced in 'v2'.

TYPE: Literal['v1', 'v2'] DEFAULT: 'v2'

include_names

Only include events from Runnable objects with matching names.

TYPE: Sequence[str] | None DEFAULT: None

include_types

Only include events from Runnable objects with matching types.

TYPE: Sequence[str] | None DEFAULT: None

include_tags

Only include events from Runnable objects with matching tags.

TYPE: Sequence[str] | None DEFAULT: None

exclude_names

Exclude events from Runnable objects with matching names.

TYPE: Sequence[str] | None DEFAULT: None

exclude_types

Exclude events from Runnable objects with matching types.

TYPE: Sequence[str] | None DEFAULT: None

exclude_tags

Exclude events from Runnable objects with matching tags.

TYPE: Sequence[str] | None DEFAULT: None
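One plausible reading of the include and exclude parameters is sketched below: with no include filters every event starts as kept, any include match admits an event, and any exclude match removes it. How the filters combine is an assumption here, the type filters are left out for brevity, and keep_event is a hypothetical helper.

```python
from typing import Any, Dict, Optional, Sequence


def keep_event(
    event: Dict[str, Any],
    include_names: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
) -> bool:
    """Hypothetical filter: any include match admits, any exclude match vetoes."""
    tags = event.get("tags") or []
    if include_names is None and include_tags is None:
        keep = True  # no include filters: start from "everything"
    else:
        keep = (include_names is not None and event["name"] in include_names) or (
            include_tags is not None and any(t in include_tags for t in tags)
        )
    if exclude_names is not None and event["name"] in exclude_names:
        keep = False
    if exclude_tags is not None and any(t in exclude_tags for t in tags):
        keep = False
    return keep


events = [
    {"name": "my_template", "tags": ["my_template"]},
    {"name": "format_docs", "tags": []},
    {"name": "some_tool", "tags": ["debug"]},
]
by_name = [e["name"] for e in events if keep_event(e, include_names=["format_docs"])]
no_debug = [e["name"] for e in events if keep_event(e, exclude_tags=["debug"])]
```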

**kwargs

Additional keyword arguments to pass to the Runnable. These will be passed to astream_log as this implementation of astream_events is built on top of astream_log.

TYPE: Any DEFAULT: {}

YIELDS DESCRIPTION
AsyncIterator[StreamEvent]

An async stream of StreamEvent.

RAISES DESCRIPTION
NotImplementedError

If the version is not 'v1' or 'v2'.

invoke

invoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
    **kwargs: Any,
) -> dict[str, Any] | Any

Create a run, wait until it finishes and return the final state.
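The parameter list for this method notes that extra keyword arguments are passed to RemoteGraph.stream, which suggests invoke is layered on streaming. The sketch below shows that general pattern with a stand-in generator: the stream is drained and the last state seen is returned. Both fake_stream and invoke_via_stream are hypothetical and make no network calls.

```python
from typing import Any, Dict, Iterator, Optional


def fake_stream(input: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Stand-in for a stream of full state values; not a real RemoteGraph call."""
    state = dict(input)
    for step in ("plan", "act", "answer"):
        state = {**state, "last_step": step}
        yield state


def invoke_via_stream(input: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Hypothetical invoke: drain the stream and keep the last state seen."""
    final: Optional[Dict[str, Any]] = None
    for chunk in fake_stream(input):
        final = chunk
    return final


result = invoke_via_stream({"question": "hello"})
```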

PARAMETER DESCRIPTION
input

Input to the graph.

TYPE: dict[str, Any] | Any

config

A RunnableConfig for graph invocation.

TYPE: RunnableConfig | None DEFAULT: None

interrupt_before

Interrupt the graph before these nodes.

TYPE: All | Sequence[str] | None DEFAULT: None

interrupt_after

Interrupt the graph after these nodes.

TYPE: All | Sequence[str] | None DEFAULT: None

headers

Additional headers to pass to the request.

TYPE: dict[str, str] | None DEFAULT: None

**kwargs

Additional params to pass to RemoteGraph.stream.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
dict[str, Any] | Any

The output of the graph.

ainvoke async

ainvoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    headers: dict[str, str] | None = None,
    params: QueryParamTypes | None = None,
    **kwargs: Any,
) -> dict[str, Any] | Any

Create a run, wait until it finishes and return the final state.

PARAMETER DESCRIPTION
input

Input to the graph.

TYPE: dict[str, Any] | Any

config

A RunnableConfig for graph invocation.

TYPE: RunnableConfig | None DEFAULT: None

interrupt_before

Interrupt the graph before these nodes.

TYPE: All | Sequence[str] | None DEFAULT: None

interrupt_after

Interrupt the graph after these nodes.

TYPE: All | Sequence[str] | None DEFAULT: None

headers

Additional headers to pass to the request.

TYPE: dict[str, str] | None DEFAULT: None

**kwargs

Additional params to pass to RemoteGraph.astream.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
dict[str, Any] | Any

The output of the graph.

get_name

get_name(suffix: str | None = None, *, name: str | None = None) -> str

Get the name of the Runnable.

PARAMETER DESCRIPTION
suffix

An optional suffix to append to the name.

TYPE: str | None DEFAULT: None

name

An optional name to use instead of the Runnable's name.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
str

The name of the Runnable.

get_input_schema

get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]

Get a Pydantic model that can be used to validate input to the Runnable.

Runnable objects that leverage the configurable_fields and configurable_alternatives methods will have a dynamic input schema that depends on which configuration the Runnable is invoked with.

This method returns the input schema for a specific configuration.

PARAMETER DESCRIPTION
config

A config to use when generating the schema.

TYPE: RunnableConfig | None DEFAULT: None

RETURNS DESCRIPTION
type[BaseModel]

A Pydantic model that can be used to validate input.

get_input_jsonschema

get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]

Get a JSON schema that represents the input to the Runnable.

PARAMETER DESCRIPTION
config

A config to use when generating the schema.

TYPE: RunnableConfig | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

A JSON schema that represents the input to the Runnable.

Example

from langchain_core.runnables import RunnableLambda


def add_one(x: int) -> int:
    return x + 1


runnable = RunnableLambda(add_one)

print(runnable.get_input_jsonschema())

Added in version 0.3.0

get_output_schema

get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]

Get a Pydantic model that can be used to validate output to the Runnable.

Runnable objects that leverage the configurable_fields and configurable_alternatives methods will have a dynamic output schema that depends on which configuration the Runnable is invoked with.

This method returns the output schema for a specific configuration.

PARAMETER DESCRIPTION
config

A config to use when generating the schema.

TYPE: RunnableConfig | None DEFAULT: None

RETURNS DESCRIPTION
type[BaseModel]

A Pydantic model that can be used to validate output.

get_output_jsonschema

get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]

Get a JSON schema that represents the output of the Runnable.

PARAMETER DESCRIPTION
config

A config to use when generating the schema.

TYPE: RunnableConfig | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

A JSON schema that represents the output of the Runnable.

Example

from langchain_core.runnables import RunnableLambda


def add_one(x: int) -> int:
    return x + 1


runnable = RunnableLambda(add_one)

print(runnable.get_output_jsonschema())

Added in version 0.3.0

config_schema

config_schema(*, include: Sequence[str] | None = None) -> type[BaseModel]

The type of config this Runnable accepts specified as a Pydantic model.

To mark a field as configurable, see the configurable_fields and configurable_alternatives methods.

PARAMETER DESCRIPTION
include

A list of fields to include in the config schema.

TYPE: Sequence[str] | None DEFAULT: None

RETURNS DESCRIPTION
type[BaseModel]

A Pydantic model that can be used to validate config.

get_config_jsonschema

get_config_jsonschema(*, include: Sequence[str] | None = None) -> dict[str, Any]

Get a JSON schema that represents the config of the Runnable.

PARAMETER DESCRIPTION
include

A list of fields to include in the config schema.

TYPE: Sequence[str] | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

A JSON schema that represents the config of the Runnable.

Added in version 0.3.0

get_prompts

get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]

Return a list of prompts used by this Runnable.

__or__

__or__(
    other: Runnable[Any, Other]
    | Callable[[Iterator[Any]], Iterator[Other]]
    | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
    | Callable[[Any], Other]
    | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]

Runnable "or" operator.

Compose this Runnable with another object to create a RunnableSequence.

PARAMETER DESCRIPTION
other

Another Runnable or a Runnable-like object.

TYPE: Runnable[Any, Other] | Callable[[Iterator[Any]], Iterator[Other]] | Callable[[AsyncIterator[Any]], AsyncIterator[Other]] | Callable[[Any], Other] | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any]

RETURNS DESCRIPTION
RunnableSerializable[Input, Other]

A new Runnable.

__ror__

__ror__(
    other: Runnable[Other, Any]
    | Callable[[Iterator[Other]], Iterator[Any]]
    | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
    | Callable[[Other], Any]
    | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]

Runnable "reverse-or" operator.

Compose this Runnable with another object to create a RunnableSequence.

PARAMETER DESCRIPTION
other

Another Runnable or a Runnable-like object.

TYPE: Runnable[Other, Any] | Callable[[Iterator[Other]], Iterator[Any]] | Callable[[AsyncIterator[Other]], AsyncIterator[Any]] | Callable[[Other], Any] | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any]

RETURNS DESCRIPTION
RunnableSerializable[Other, Output]

A new Runnable.

pipe

pipe(
    *others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]

Pipe Runnable objects.

Compose this Runnable with Runnable-like objects to make a RunnableSequence.

Equivalent to RunnableSequence(self, *others) or self | others[0] | ...

Example

from langchain_core.runnables import RunnableLambda


def add_one(x: int) -> int:
    return x + 1


def mul_two(x: int) -> int:
    return x * 2


runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]

PARAMETER DESCRIPTION
*others

Other Runnable or Runnable-like objects to compose.

TYPE: Runnable[Any, Other] | Callable[[Any], Other] DEFAULT: ()

name

An optional name for the resulting RunnableSequence.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
RunnableSerializable[Input, Other]

A new Runnable.

pick

pick(keys: str | list[str]) -> RunnableSerializable[Any, Any]

Pick keys from the output dict of this Runnable.

Pick a single key:

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]

Pick a list of keys:

from typing import Any

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)


def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")


chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
PARAMETER DESCRIPTION
keys

A key or list of keys to pick from the output dict.

TYPE: str | list[str]

RETURNS DESCRIPTION
RunnableSerializable[Any, Any]

A new Runnable.

assign

Assigns new fields to the dict output of this Runnable.

from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])

chain: Runnable = prompt | model | {"str": StrOutputParser()}

chain_with_assign = chain.assign(hello=itemgetter("str") | model)

print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
PARAMETER DESCRIPTION
**kwargs

A mapping of keys to Runnable or Runnable-like objects that will be invoked with the entire output dict of this Runnable.

TYPE: Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]] DEFAULT: {}

RETURNS DESCRIPTION
RunnableSerializable[Any, Any]

A new Runnable.

batch

batch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]

Default implementation runs invoke in parallel using a thread pool executor.

The default implementation of batch works well for I/O-bound runnables.

Subclasses should override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.

PARAMETER DESCRIPTION
inputs

A list of inputs to the Runnable.

TYPE: list[Input]

config

A config to use when invoking the Runnable. The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details.

TYPE: RunnableConfig | list[RunnableConfig] | None DEFAULT: None

return_exceptions

Whether to return exceptions instead of raising them.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

RETURNS DESCRIPTION
list[Output]

A list of outputs from the Runnable.

batch_as_completed

batch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]

Run invoke in parallel on a list of inputs.

Yields results as they complete.

PARAMETER DESCRIPTION
inputs

A list of inputs to the Runnable.

TYPE: Sequence[Input]

config

A config to use when invoking the Runnable. The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details.

TYPE: RunnableConfig | Sequence[RunnableConfig] | None DEFAULT: None

return_exceptions

Whether to return exceptions instead of raising them.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

YIELDS DESCRIPTION
tuple[int, Output | Exception]

Tuples of the index of the input and the output from the Runnable.

abatch async

abatch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]

Default implementation runs ainvoke in parallel using asyncio.gather.

The default implementation of abatch works well for I/O-bound runnables.

Subclasses should override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.

PARAMETER DESCRIPTION
inputs

A list of inputs to the Runnable.

TYPE: list[Input]

config

A config to use when invoking the Runnable. The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details.

TYPE: RunnableConfig | list[RunnableConfig] | None DEFAULT: None

return_exceptions

Whether to return exceptions instead of raising them.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

RETURNS DESCRIPTION
list[Output]

A list of outputs from the Runnable.

abatch_as_completed async

abatch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]

Run ainvoke in parallel on a list of inputs.

Yields results as they complete.

PARAMETER DESCRIPTION
inputs

A list of inputs to the Runnable.

TYPE: Sequence[Input]

config

A config to use when invoking the Runnable. The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details.

TYPE: RunnableConfig | Sequence[RunnableConfig] | None DEFAULT: None

return_exceptions

Whether to return exceptions instead of raising them.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

YIELDS DESCRIPTION
AsyncIterator[tuple[int, Output | Exception]]

A tuple of the index of the input and the output from the Runnable.

astream_log async

astream_log(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]

Stream all output from a Runnable, as reported to the callback system.

This includes all inner runs of LLMs, Retrievers, Tools, etc.

Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.

The Jsonpatch ops can be applied in order to construct state.

PARAMETER DESCRIPTION
input

The input to the Runnable.

TYPE: Any

config

The config to use for the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

diff

Whether to yield diffs between each step or the current state.

TYPE: bool DEFAULT: True

with_streamed_output_list

Whether to yield the streamed_output list.

TYPE: bool DEFAULT: True

include_names

Only include logs with these names.

TYPE: Sequence[str] | None DEFAULT: None

include_types

Only include logs with these types.

TYPE: Sequence[str] | None DEFAULT: None

include_tags

Only include logs with these tags.

TYPE: Sequence[str] | None DEFAULT: None

exclude_names

Exclude logs with these names.

TYPE: Sequence[str] | None DEFAULT: None

exclude_types

Exclude logs with these types.

TYPE: Sequence[str] | None DEFAULT: None

exclude_tags

Exclude logs with these tags.

TYPE: Sequence[str] | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any DEFAULT: {}

YIELDS DESCRIPTION
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]

A RunLogPatch or RunLog object.

transform

transform(
    input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]

Transform inputs to outputs.

Default implementation of transform, which buffers input and calls stream.

Subclasses should override this method if they can start producing output while input is still being generated.

PARAMETER DESCRIPTION
input

An iterator of inputs to the Runnable.

TYPE: Iterator[Input]

config

The config to use for the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

YIELDS DESCRIPTION
Output

The output of the Runnable.

atransform async

atransform(
    input: AsyncIterator[Input],
    config: RunnableConfig | None = None,
    **kwargs: Any | None,
) -> AsyncIterator[Output]

Transform inputs to outputs.

Default implementation of atransform, which buffers input and calls astream.

Subclasses should override this method if they can start producing output while input is still being generated.

PARAMETER DESCRIPTION
input

An async iterator of inputs to the Runnable.

TYPE: AsyncIterator[Input]

config

The config to use for the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

YIELDS DESCRIPTION
AsyncIterator[Output]

The output of the Runnable.

bind

bind(**kwargs: Any) -> Runnable[Input, Output]

Bind arguments to a Runnable, returning a new Runnable.

Useful when a Runnable in a chain requires an argument that is not in the output of the previous Runnable or included in the user input.

PARAMETER DESCRIPTION
**kwargs

The arguments to bind to the Runnable.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable with the arguments bound.

Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser

model = ChatOllama(model="llama3.1")

# Without bind
chain = model | StrOutputParser()

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'

with_listeners

with_listeners(
    *,
    on_start: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
    on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
    on_error: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
) -> Runnable[Input, Output]

Bind lifecycle listeners to a Runnable, returning a new Runnable.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

PARAMETER DESCRIPTION
on_start

Called before the Runnable starts running, with the Run object.

TYPE: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None DEFAULT: None

on_end

Called after the Runnable finishes running, with the Run object.

TYPE: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None DEFAULT: None

on_error

Called if the Runnable throws an error, with the Run object.

TYPE: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None DEFAULT: None

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable with the listeners bound.

Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time


def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)


def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)


def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)


chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)

with_alisteners

with_alisteners(
    *,
    on_start: AsyncListener | None = None,
    on_end: AsyncListener | None = None,
    on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]

Bind async lifecycle listeners to a Runnable.

Returns a new Runnable.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

PARAMETER DESCRIPTION
on_start

Called asynchronously before the Runnable starts running, with the Run object.

TYPE: AsyncListener | None DEFAULT: None

on_end

Called asynchronously after the Runnable finishes running, with the Run object.

TYPE: AsyncListener | None DEFAULT: None

on_error

Called asynchronously if the Runnable throws an error, with the Run object.

TYPE: AsyncListener | None DEFAULT: None

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable with the listeners bound.

Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio

def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

# Listeners receive the Run object, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00

with_types

with_types(
    *, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]

Bind input and output types to a Runnable, returning a new Runnable.

PARAMETER DESCRIPTION
input_type

The input type to bind to the Runnable.

TYPE: type[Input] | None DEFAULT: None

output_type

The output type to bind to the Runnable.

TYPE: type[Output] | None DEFAULT: None

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable with the types bound.

with_retry

with_retry(
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    exponential_jitter_params: ExponentialJitterParams | None = None,
    stop_after_attempt: int = 3,
) -> Runnable[Input, Output]

Create a new Runnable that retries the original Runnable on exceptions.

PARAMETER DESCRIPTION
retry_if_exception_type

A tuple of exception types to retry on.

TYPE: tuple[type[BaseException], ...] DEFAULT: (Exception,)

wait_exponential_jitter

Whether to add jitter to the wait time between retries.

TYPE: bool DEFAULT: True

stop_after_attempt

The maximum number of attempts to make before giving up.

TYPE: int DEFAULT: 3

exponential_jitter_params

Parameters for tenacity.wait_exponential_jitter. Namely: initial, max, exp_base, and jitter (all float values).

TYPE: ExponentialJitterParams | None DEFAULT: None

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable that retries the original Runnable on exceptions.

Example
from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2

map

map() -> Runnable[list[Input], list[Output]]

Return a new Runnable that maps a list of inputs to a list of outputs.

Calls invoke with each input.

RETURNS DESCRIPTION
Runnable[list[Input], list[Output]]

A new Runnable that maps a list of inputs to a list of outputs.

Example
from langchain_core.runnables import RunnableLambda


def _lambda(x: int) -> int:
    return x + 1


runnable = RunnableLambda(_lambda)
print(runnable.map().invoke([1, 2, 3]))  # [2, 3, 4]

with_fallbacks

with_fallbacks(
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
    exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]

Add fallbacks to a Runnable, returning a new Runnable.

The new Runnable will try the original Runnable, and then each fallback in order, upon failures.

PARAMETER DESCRIPTION
fallbacks

A sequence of runnables to try if the original Runnable fails.

TYPE: Sequence[Runnable[Input, Output]]

exceptions_to_handle

A tuple of exception types to handle.

TYPE: tuple[type[BaseException], ...] DEFAULT: (Exception,)

exception_key

If a string is specified, handled exceptions are passed to the fallbacks as part of the input under that key. If None, exceptions are not passed to the fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
RunnableWithFallbacks[Input, Output]

A new Runnable that will try the original Runnable, and then each fallback in order, upon failures.

Example
from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar

as_tool

as_tool(
    args_schema: type[BaseModel] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    arg_types: dict[str, type] | None = None,
) -> BaseTool

Create a BaseTool from a Runnable.

as_tool will instantiate a BaseTool with a name, description, and args_schema from a Runnable. Where possible, schemas are inferred from runnable.get_input_schema. Alternatively (e.g., if the Runnable takes a dict as input and the specific dict keys are not typed), the schema can be specified directly with args_schema. You can also pass arg_types to just specify the required arguments and their types.

PARAMETER DESCRIPTION
args_schema

The schema for the tool.

TYPE: type[BaseModel] | None DEFAULT: None

name

The name of the tool.

TYPE: str | None DEFAULT: None

description

The description of the tool.

TYPE: str | None DEFAULT: None

arg_types

A dictionary of argument names to types.

TYPE: dict[str, type] | None DEFAULT: None

RETURNS DESCRIPTION
BaseTool

A BaseTool instance.

Typed dict input:

from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda


class Args(TypedDict):
    a: int
    b: list[int]


def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))


runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict input, specifying schema via args_schema:

from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict input, specifying schema via arg_types:

from typing import Any
from langchain_core.runnables import RunnableLambda


def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))


runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

String input:

from langchain_core.runnables import RunnableLambda


def f(x: str) -> str:
    return x + "a"


def g(x: str) -> str:
    return x + "z"


runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")