RemoteGraph
langgraph.pregel.remote
RemoteGraph
Bases: PregelProtocol
The RemoteGraph class is a client implementation for calling remote APIs that implement the LangGraph Server API specification.
For example, the RemoteGraph class can be used to call APIs from deployments on LangSmith Deployment.
RemoteGraph behaves the same way as a Graph and can be used directly as a node in another Graph.
METHOD | DESCRIPTION |
---|---|
__init__ | Specify url, api_key, and/or headers to create default sync and async clients. |
with_config | Bind config to a Runnable, returning a new Runnable. |
get_graph | Get graph by graph name. |
aget_graph | Get graph by graph name. |
get_state | Get the state of a thread. |
aget_state | Get the state of a thread. |
get_state_history | Get the state history of a thread. |
aget_state_history | Get the state history of a thread. |
update_state | Update the state of a thread. |
aupdate_state | Update the state of a thread. |
stream | Create a run and stream the results. |
astream | Create a run and stream the results. |
astream_events | Generate a stream of events. |
invoke | Create a run, wait until it finishes and return the final state. |
ainvoke | Create a run, wait until it finishes and return the final state. |
get_name | Get the name of the Runnable. |
get_input_schema | Get a Pydantic model that can be used to validate input to the Runnable. |
get_input_jsonschema | Get a JSON schema that represents the input to the Runnable. |
get_output_schema | Get a Pydantic model that can be used to validate output to the Runnable. |
get_output_jsonschema | Get a JSON schema that represents the output of the Runnable. |
config_schema | The type of config this Runnable accepts specified as a Pydantic model. |
get_config_jsonschema | Get a JSON schema that represents the config of the Runnable. |
get_prompts | Return a list of prompts used by this Runnable. |
__or__ | Runnable "or" operator. |
__ror__ | Runnable "reverse-or" operator. |
pipe | Pipe Runnable objects. |
pick | Pick keys from the output dict of this Runnable. |
assign | Assigns new fields to the dict output of this Runnable. |
batch | Default implementation runs invoke in parallel using a thread pool executor. |
batch_as_completed | Run invoke in parallel on a list of inputs. |
abatch | Default implementation runs ainvoke in parallel using asyncio.gather. |
abatch_as_completed | Run ainvoke in parallel on a list of inputs. |
astream_log | Stream all output from a Runnable, as reported to the callback system. |
transform | Transform inputs to outputs. |
atransform | Transform inputs to outputs. |
bind | Bind arguments to a Runnable, returning a new Runnable. |
with_listeners | Bind lifecycle listeners to a Runnable, returning a new Runnable. |
with_alisteners | Bind async lifecycle listeners to a Runnable. |
with_types | Bind input and output types to a Runnable, returning a new Runnable. |
with_retry | Create a new Runnable that retries the original Runnable on exceptions. |
map | Return a new Runnable that maps a list of inputs to a list of outputs. |
with_fallbacks | Add fallbacks to a Runnable, returning a new Runnable. |
as_tool | Create a BaseTool from a Runnable. |
name
instance-attribute
¶
name: str | None
The name of the Runnable. Used for debugging and tracing.
InputType
property
¶
InputType: type[Input]
Input type.
The type of input this Runnable accepts specified as a type annotation.
RAISES | DESCRIPTION |
---|---|
TypeError | If the input type cannot be inferred. |
OutputType
property
¶
OutputType: type[Output]
Output type.
The type of output this Runnable produces specified as a type annotation.
RAISES | DESCRIPTION |
---|---|
TypeError | If the output type cannot be inferred. |
input_schema
property
¶
The type of input this Runnable accepts specified as a Pydantic model.
output_schema
property
¶
Output schema.
The type of output this Runnable produces specified as a Pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
__init__
¶
__init__(
assistant_id: str,
/,
*,
url: str | None = None,
api_key: str | None = None,
headers: dict[str, str] | None = None,
client: LangGraphClient | None = None,
sync_client: SyncLangGraphClient | None = None,
config: RunnableConfig | None = None,
name: str | None = None,
distributed_tracing: bool = False,
)
Specify url, api_key, and/or headers to create default sync and async clients.
If client or sync_client is provided, it will be used instead of the corresponding default client.
See LangGraphClient and SyncLangGraphClient for details on the default clients. At least one of url, client, or sync_client must be provided.
PARAMETER | DESCRIPTION |
---|---|
assistant_id | The assistant ID or graph name of the remote graph to use. |
url | The URL of the remote API. |
api_key | The API key to use for authentication. If not provided, it will be read from the environment. |
headers | Additional headers to include in the requests. |
client | A LangGraphClient instance to use instead of the default async client. |
sync_client | A SyncLangGraphClient instance to use instead of the default sync client. |
config | An optional RunnableConfig instance. |
name | Human-readable name to attach to the RemoteGraph instance. This is useful when adding the RemoteGraph as a node in another graph. |
distributed_tracing | Whether to enable sending LangSmith distributed tracing headers. |
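The constructor rules above can be illustrated with a minimal sketch. The helper name `connect`, the assistant name `"agent"`, and the URL in the usage comment are assumptions made for this example and are not part of the library. The import is deferred so that the argument check can run even where langgraph is not installed.

```python
from __future__ import annotations

from typing import Any


def connect(
    assistant_id: str,
    *,
    url: str | None = None,
    client: Any = None,
    sync_client: Any = None,
) -> Any:
    """Hypothetical helper: check the documented rule, then build a RemoteGraph."""
    # Documented rule: at least one of url, client, or sync_client is required.
    if url is None and client is None and sync_client is None:
        raise ValueError("Provide at least one of url, client, or sync_client.")
    # Deferred import so the check above works without langgraph installed.
    from langgraph.pregel.remote import RemoteGraph

    return RemoteGraph(assistant_id, url=url, client=client, sync_client=sync_client)


# Usage (assumes a LangGraph Server is reachable at this placeholder URL):
# graph = connect("agent", url="http://localhost:2024")
```

The input accepted by the resulting graph depends on the deployed graph's state schema, so no invocation is shown here.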
with_config
¶
with_config(config: RunnableConfig | None = None, **kwargs: Any) -> Self
Bind config to a Runnable, returning a new Runnable.
PARAMETER | DESCRIPTION |
---|---|
config | The config to bind to the Runnable. |
**kwargs | Additional keyword arguments to pass to the Runnable. |
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output] | A new Runnable with the config bound. |
get_graph
¶
get_graph(
config: RunnableConfig | None = None,
*,
xray: int | bool = False,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Graph
Get graph by graph name.
This method calls GET /assistants/{assistant_id}/graph.
PARAMETER | DESCRIPTION |
---|---|
config | This parameter is not used. |
xray | Include graph representation of subgraphs. If an integer value is provided, only subgraphs with a depth less than or equal to the value will be included. |
RETURNS | DESCRIPTION |
---|---|
Graph | The graph information for the assistant in JSON format. |
aget_graph
async
¶
aget_graph(
config: RunnableConfig | None = None,
*,
xray: int | bool = False,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Graph
Get graph by graph name.
This method calls GET /assistants/{assistant_id}/graph.
PARAMETER | DESCRIPTION |
---|---|
config | This parameter is not used. |
xray | Include graph representation of subgraphs. If an integer value is provided, only subgraphs with a depth less than or equal to the value will be included. |
RETURNS | DESCRIPTION |
---|---|
Graph | The graph information for the assistant in JSON format. |
get_state
¶
get_state(
config: RunnableConfig,
*,
subgraphs: bool = False,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> StateSnapshot
Get the state of a thread.
This method calls POST /threads/{thread_id}/state/checkpoint if a checkpoint is specified in the config, or GET /threads/{thread_id}/state if no checkpoint is specified.
PARAMETER | DESCRIPTION |
---|---|
config | A RunnableConfig that includes thread_id in the configurable field. |
subgraphs | Include subgraphs in the state. |
headers | Optional custom headers to include with the request. |
params | Optional query parameters to include with the request. |
RETURNS | DESCRIPTION |
---|---|
StateSnapshot | The latest state of the thread. |
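The choice between the two state endpoints can be sketched as a small pure function. This is an illustration only, not the library's implementation: the function name `state_request` is hypothetical, and treating a `checkpoint_id` key in `configurable` as "a checkpoint is specified" is an assumption.

```python
from typing import Any


def state_request(config: dict[str, Any]) -> tuple[str, str]:
    """Return the (HTTP method, path) pair described for get_state."""
    configurable = config.get("configurable", {})
    thread_id = configurable["thread_id"]
    # Assumption: a checkpoint counts as specified when checkpoint_id is set.
    if configurable.get("checkpoint_id") is not None:
        return "POST", f"/threads/{thread_id}/state/checkpoint"
    return "GET", f"/threads/{thread_id}/state"
```

For example, `state_request({"configurable": {"thread_id": "t1"}})` selects the GET endpoint because no checkpoint is given.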
aget_state
async
¶
aget_state(
config: RunnableConfig,
*,
subgraphs: bool = False,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> StateSnapshot
Get the state of a thread.
This method calls POST /threads/{thread_id}/state/checkpoint if a checkpoint is specified in the config, or GET /threads/{thread_id}/state if no checkpoint is specified.
PARAMETER | DESCRIPTION |
---|---|
config | A RunnableConfig that includes thread_id in the configurable field. |
subgraphs | Include subgraphs in the state. |
headers | Optional custom headers to include with the request. |
params | Optional query parameters to include with the request. |
RETURNS | DESCRIPTION |
---|---|
StateSnapshot | The latest state of the thread. |
get_state_history
¶
get_state_history(
config: RunnableConfig,
*,
filter: dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Iterator[StateSnapshot]
Get the state history of a thread.
This method calls POST /threads/{thread_id}/history.
PARAMETER | DESCRIPTION |
---|---|
config
|
A
TYPE:
|
filter
|
Metadata to filter on. |
before
|
A
TYPE:
|
limit
|
Max number of states to return.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Iterator[StateSnapshot]
|
States of the thread. |
aget_state_history
async
¶
aget_state_history(
config: RunnableConfig,
*,
filter: dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> AsyncIterator[StateSnapshot]
Get the state history of a thread.
This method calls POST /threads/{thread_id}/history.
PARAMETER | DESCRIPTION |
---|---|
config
|
A
TYPE:
|
filter
|
Metadata to filter on. |
before
|
A
TYPE:
|
limit
|
Max number of states to return.
TYPE:
|
headers
|
Optional custom headers to include with the request. |
params
|
Optional query parameters to include with the request.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
AsyncIterator[StateSnapshot]
|
States of the thread. |
update_state
¶
update_state(
config: RunnableConfig,
values: dict[str, Any] | Any | None,
as_node: str | None = None,
*,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> RunnableConfig
Update the state of a thread.
This method calls POST /threads/{thread_id}/state.
PARAMETER | DESCRIPTION |
---|---|
config
|
A
TYPE:
|
values
|
Values to update to the state. |
as_node
|
Update the state as if this node had just executed.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableConfig
|
|
aupdate_state
async
¶
aupdate_state(
config: RunnableConfig,
values: dict[str, Any] | Any | None,
as_node: str | None = None,
*,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> RunnableConfig
Update the state of a thread.
This method calls POST /threads/{thread_id}/state.
PARAMETER | DESCRIPTION |
---|---|
config
|
A
TYPE:
|
values
|
Values to update to the state. |
as_node
|
Update the state as if this node had just executed.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableConfig
|
|
stream
¶
stream(
input: dict[str, Any] | Any,
config: RunnableConfig | None = None,
*,
stream_mode: StreamMode | list[StreamMode] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
subgraphs: bool = False,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
**kwargs: Any,
) -> Iterator[dict[str, Any] | Any]
Create a run and stream the results.
This method calls POST /threads/{thread_id}/runs/stream if a thread_id is specified in the configurable field of the config, or POST /runs/stream otherwise.
PARAMETER | DESCRIPTION |
---|---|
input
|
Input to the graph. |
config
|
A
TYPE:
|
stream_mode
|
Stream mode(s) to use.
TYPE:
|
interrupt_before
|
Interrupt the graph before these nodes. |
interrupt_after
|
Interrupt the graph after these nodes. |
subgraphs
|
Stream from subgraphs.
TYPE:
|
headers
|
Additional headers to pass to the request. |
**kwargs
|
Additional params to pass to client.runs.stream.
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
dict[str, Any] | Any
|
The output of the graph. |
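The routing rule for streaming runs depends only on whether the config carries a thread_id. A minimal sketch of that rule follows; the function name `stream_path` is hypothetical, and the code illustrates the documented behavior rather than reproducing the client.

```python
from typing import Any, Optional


def stream_path(config: Optional[dict[str, Any]] = None) -> str:
    """Pick the run-streaming path based on thread_id in config['configurable']."""
    configurable = (config or {}).get("configurable", {})
    thread_id = configurable.get("thread_id")
    if thread_id is not None:
        # Run attached to an existing thread.
        return f"/threads/{thread_id}/runs/stream"
    # No thread given: threadless run.
    return "/runs/stream"
```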
astream
async
¶
astream(
input: dict[str, Any] | Any,
config: RunnableConfig | None = None,
*,
stream_mode: StreamMode | list[StreamMode] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
subgraphs: bool = False,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
**kwargs: Any,
) -> AsyncIterator[dict[str, Any] | Any]
Create a run and stream the results.
This method calls POST /threads/{thread_id}/runs/stream if a thread_id is specified in the configurable field of the config, or POST /runs/stream otherwise.
PARAMETER | DESCRIPTION |
---|---|
input
|
Input to the graph. |
config
|
A
TYPE:
|
stream_mode
|
Stream mode(s) to use.
TYPE:
|
interrupt_before
|
Interrupt the graph before these nodes. |
interrupt_after
|
Interrupt the graph after these nodes. |
subgraphs
|
Stream from subgraphs.
TYPE:
|
headers
|
Additional headers to pass to the request. |
**kwargs
|
Additional params to pass to client.runs.stream.
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[dict[str, Any] | Any]
|
The output of the graph. |
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"],
include_names: Sequence[All] | None = None,
include_types: Sequence[All] | None = None,
include_tags: Sequence[All] | None = None,
exclude_names: Sequence[All] | None = None,
exclude_types: Sequence[All] | None = None,
exclude_tags: Sequence[All] | None = None,
**kwargs: Any,
) -> AsyncIterator[dict[str, Any]]
Generate a stream of events.
Use to create an iterator over StreamEvent objects that provide real-time information about the progress of the Runnable, including StreamEvent objects from intermediate results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
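The schema above can be mirrored in a small typed sketch, which is handy when filtering events by kind. The names `StreamEventSketch` and `split_event_name` are hypothetical and exist only for this illustration; the library ships its own StreamEvent type. The regular expression encodes the documented name format and therefore rejects names that do not end in start, stream, or end.

```python
import re
from typing import Any, TypedDict


class StreamEventSketch(TypedDict):
    """Illustrative mirror of the documented fields (not the library type)."""

    event: str
    name: str
    run_id: str
    parent_ids: list[str]
    tags: list[str]
    metadata: dict[str, Any]
    data: dict[str, Any]


# Documented format: on_[runnable_type]_(start|stream|end)
EVENT_NAME = re.compile(r"^on_(?P<runnable_type>[a-z_]+)_(?P<phase>start|stream|end)$")


def split_event_name(event: str) -> tuple[str, str]:
    """Split an event name into (runnable_type, phase)."""
    match = EVENT_NAME.match(event)
    if match is None:
        raise ValueError(f"Not a standard event name: {event!r}")
    return match["runnable_type"], match["phase"]
```

A consumer could then keep only streaming chunks with a check such as `split_event_name(e["event"])[1] == "stream"`.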
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
event | name | chunk | input | output |
---|---|---|---|---|
on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
on_llm_start | '[model name]' | | {'input': 'hello'} | |
on_llm_stream | '[model name]' | 'Hello' | | |
on_llm_end | '[model name]' | | | 'Hello human!' |
on_chain_start | 'format_docs' | | | |
on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
on_prompt_start | '[template_name]' | | {"question": "hello"} | |
on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API!
A custom event has the following format:
Attribute | Type | Description |
---|---|---|
name | str | A user defined name for the event. |
data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. |
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)
some_tool
:
prompt:
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]

# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio

async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
version
|
The version of the schema to use either
TYPE:
|
include_names
|
Only include events from |
include_types
|
Only include events from |
include_tags
|
Only include events from |
exclude_names
|
Exclude events from |
exclude_types
|
Exclude events from |
exclude_tags
|
Exclude events from |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
RAISES | DESCRIPTION |
---|---|
NotImplementedError
|
If the version is not |
invoke
¶
invoke(
input: dict[str, Any] | Any,
config: RunnableConfig | None = None,
*,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
**kwargs: Any,
) -> dict[str, Any] | Any
Create a run, wait until it finishes and return the final state.
PARAMETER | DESCRIPTION |
---|---|
input
|
Input to the graph. |
config
|
A
TYPE:
|
interrupt_before
|
Interrupt the graph before these nodes. |
interrupt_after
|
Interrupt the graph after these nodes. |
headers
|
Additional headers to pass to the request. |
**kwargs
|
Additional params to pass to RemoteGraph.stream.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any] | Any
|
The output of the graph. |
ainvoke
async
¶
ainvoke(
input: dict[str, Any] | Any,
config: RunnableConfig | None = None,
*,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
headers: dict[str, str] | None = None,
params: QueryParamTypes | None = None,
**kwargs: Any,
) -> dict[str, Any] | Any
Create a run, wait until it finishes and return the final state.
PARAMETER | DESCRIPTION |
---|---|
input
|
Input to the graph. |
config
|
A
TYPE:
|
interrupt_before
|
Interrupt the graph before these nodes. |
interrupt_after
|
Interrupt the graph after these nodes. |
headers
|
Additional headers to pass to the request. |
**kwargs
|
Additional params to pass to RemoteGraph.astream.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any] | Any
|
The output of the graph. |
get_name
¶
get_input_schema
¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable.
Runnable objects that leverage the configurable_fields and configurable_alternatives methods will have a dynamic input schema that depends on which configuration the Runnable is invoked with.
This method returns the input schema for a specific configuration.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate input. |
get_input_jsonschema
¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any]
|
A JSON schema that represents the input to the |
Example
Added in version 0.3.0
get_output_schema
¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate the output of the Runnable.
Runnable objects that leverage the configurable_fields and configurable_alternatives methods will have a dynamic output schema that depends on which configuration the Runnable is invoked with.
This method returns the output schema for a specific configuration.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate output. |
get_output_jsonschema
¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any]
|
A JSON schema that represents the output of the |
Example
Added in version 0.3.0
config_schema
¶
The type of config this Runnable
accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives
methods.
PARAMETER | DESCRIPTION |
---|---|
include
|
A list of fields to include in the config schema. |
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate config. |
get_config_jsonschema
¶
get_prompts
¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable
.
__or__
¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
PARAMETER | DESCRIPTION |
---|---|
other
|
Another
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
PARAMETER | DESCRIPTION |
---|---|
other
|
Another
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
PARAMETER | DESCRIPTION |
---|---|
*others
|
Other
TYPE:
|
name
|
An optional name for the resulting
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict
of this Runnable
.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
PARAMETER | DESCRIPTION |
---|---|
keys
|
A key or list of keys to pick from the output dict. |
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict
output of this Runnable
.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
PARAMETER | DESCRIPTION |
---|---|
**kwargs
|
A mapping of keys to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Any, Any]
|
A new |
batch
¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Output]
|
A list of outputs from the |
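To make the thread pool behavior concrete, here is a standard-library sketch of the same idea. It is not the library's code: `batch_sketch` is a hypothetical name, and `invoke` stands in for any callable that processes a single input.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Sequence


def batch_sketch(
    invoke: Callable[[Any], Any],
    inputs: Sequence[Any],
    *,
    return_exceptions: bool = False,
) -> list[Any]:
    """Run invoke over inputs in worker threads, keeping the input order."""

    def call(item: Any) -> Any:
        try:
            return invoke(item)
        except Exception as exc:
            if return_exceptions:
                return exc  # Hand the exception back as a result.
            raise

    with ThreadPoolExecutor() as executor:
        # executor.map yields results in the order of the inputs.
        return list(executor.map(call, inputs))
```

Threads suit this pattern because each call mostly waits on IO, which is the case the text above describes.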
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke
in parallel on a list of inputs.
Yields results as they complete.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
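Yielding results as they complete, tagged with the index of the originating input, can be sketched with the standard library as follows. The name `batch_as_completed_sketch` is hypothetical and the code only illustrates the pattern.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence


def batch_as_completed_sketch(
    invoke: Callable[[Any], Any],
    inputs: Sequence[Any],
    *,
    return_exceptions: bool = False,
) -> Iterator[tuple[int, Any]]:
    """Yield (input index, result) pairs in completion order."""
    with ThreadPoolExecutor() as executor:
        # Remember which input each future belongs to.
        futures = {executor.submit(invoke, item): i for i, item in enumerate(inputs)}
        for future in as_completed(futures):
            index = futures[future]
            error = future.exception()
            if error is None:
                yield index, future.result()
            elif return_exceptions:
                yield index, error
            else:
                raise error
```

Because the order of the yielded pairs is not guaranteed, callers typically use the index to match each result to its input.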
abatch
async
¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Output]
|
A list of outputs from the |
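The asyncio.gather approach can be shown in a few lines. This is a sketch under the assumption that `ainvoke` is any coroutine function taking one input; `abatch_sketch` and `double` are hypothetical names used only here.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: Sequence[Any],
    *,
    return_exceptions: bool = False,
) -> list[Any]:
    """Await ainvoke for every input concurrently; results keep input order."""
    results = await asyncio.gather(
        *(ainvoke(item) for item in inputs),
        return_exceptions=return_exceptions,
    )
    return list(results)


async def double(x: int) -> int:
    """Stand-in for an IO-bound coroutine."""
    await asyncio.sleep(0)
    return x * 2
```

Calling `asyncio.run(abatch_sketch(double, [1, 2, 3]))` runs the three coroutines concurrently on one event loop.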
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke
in parallel on a list of inputs.
Yields results as they complete.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
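An async counterpart of the completion-order pattern can be sketched with asyncio.as_completed. As before, this is an illustration rather than the library's implementation, and `abatch_as_completed_sketch` is a hypothetical name.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Sequence


async def abatch_as_completed_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: Sequence[Any],
    *,
    return_exceptions: bool = False,
) -> AsyncIterator[tuple[int, Any]]:
    """Yield (input index, result) pairs as each coroutine finishes."""

    async def indexed(i: int, item: Any) -> tuple[int, Any]:
        try:
            return i, await ainvoke(item)
        except Exception as exc:
            if return_exceptions:
                return i, exc
            raise

    coros = [indexed(i, item) for i, item in enumerate(inputs)]
    for finished in asyncio.as_completed(coros):
        yield await finished
```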
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
diff
|
Whether to yield diffs between each step or the current state.
TYPE:
|
with_streamed_output_list
|
Whether to yield the
TYPE:
|
include_names
|
Only include logs with these names. |
include_types
|
Only include logs with these types. |
include_tags
|
Only include logs with these tags. |
exclude_names
|
Exclude logs with these names. |
exclude_types
|
Exclude logs with these types. |
exclude_tags
|
Exclude logs with these tags. |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
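Applying patch operations in order to rebuild state can be illustrated with a deliberately small sketch. It handles only "add" and "replace" operations on dict keys, list appends via "-", and the empty path that replaces the whole document; it does not handle list indices or JSON Pointer escapes. The function name `apply_ops` and the state keys in the usage note are assumptions for illustration.

```python
import copy
from typing import Any


def apply_ops(state: Any, ops: list[dict[str, Any]]) -> Any:
    """Apply a list of simplified JSON Patch ops and return the new state."""
    result = copy.deepcopy(state)
    for op in ops:
        if op["op"] not in ("add", "replace"):
            raise NotImplementedError(f"Unsupported op in this sketch: {op['op']}")
        if op["path"] == "":
            # An empty path addresses the whole document.
            result = copy.deepcopy(op["value"])
            continue
        *parents, last = op["path"].lstrip("/").split("/")
        target = result
        for key in parents:
            target = target[key]
        if isinstance(target, list) and last == "-":
            target.append(op["value"])  # "-" means append to the list.
        else:
            target[last] = op["value"]
    return result
```

For instance, a replace at the empty path with `{"streamed_output": [], "final_output": None}`, followed by an add at `/streamed_output/-` and a replace at `/final_output`, reconstructs the final dictionary step by step.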
transform
¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses should override this method if they can start producing output while input is still being generated.
PARAMETER | DESCRIPTION |
---|---|
`input` | An iterator of inputs to the `Runnable`. TYPE: `Iterator[Input]` |
`config` | The config to use for the `Runnable`. TYPE: `RunnableConfig \| None` |
`**kwargs` | Additional keyword arguments to pass to the `Runnable`. TYPE: `Any \| None` |

YIELDS | DESCRIPTION |
---|---|
`Output` | The output of the `Runnable`. |
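The buffering behavior of the default implementation can be sketched in plain Python. This is an illustration under stated assumptions, not the library's code: `buffered_transform`, `stream_fn`, and `shout` are hypothetical names, and input chunks are assumed to combine with `+`.

```python
from typing import Callable, Iterator, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


def buffered_transform(
    chunks: Iterator[In], stream_fn: Callable[[In], Iterator[Out]]
) -> Iterator[Out]:
    """Consume every input chunk, then stream output from the combined input."""
    buffered = None
    got_first = False
    for chunk in chunks:
        if not got_first:
            buffered = chunk
            got_first = True
        else:
            # Assumption: chunks support "+" (str, list, ...).
            buffered = buffered + chunk
    if got_first:
        # No output is produced until the input iterator is exhausted.
        yield from stream_fn(buffered)


def shout(text: str) -> Iterator[str]:
    """Stand-in for a streaming method: yields one upper-cased word at a time."""
    for word in text.split():
        yield word.upper()


result = list(buffered_transform(iter(["hello ", "wor", "ld"]), shout))
print(result)
```

Because nothing is yielded until the loop over the input finishes, an implementation that can emit output incrementally benefits from overriding the method.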
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of `atransform`, which buffers input and then calls `astream`.
Subclasses should override this method if they can start producing output while input is still being generated.
PARAMETER | DESCRIPTION |
---|---|
`input` | An async iterator of inputs to the `Runnable`. TYPE: `AsyncIterator[Input]` |
`config` | The config to use for the `Runnable`. TYPE: `RunnableConfig \| None` |
`**kwargs` | Additional keyword arguments to pass to the `Runnable`. TYPE: `Any \| None` |

YIELDS | DESCRIPTION |
---|---|
`AsyncIterator[Output]` | The output of the `Runnable`. |
bind
¶
Bind arguments to a `Runnable`, returning a new `Runnable`.
Useful when a `Runnable` in a chain requires an argument that is not in the output of the previous `Runnable` or included in the user input.
PARAMETER | DESCRIPTION |
---|---|
`**kwargs` | The arguments to bind to the `Runnable`. TYPE: `Any` |

RETURNS | DESCRIPTION |
---|---|
`Runnable[Input, Output]` | A new `Runnable` with the arguments bound. |
Example
```python
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser

model = ChatOllama(model="llama3.1")

# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
```
with_listeners
¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a `Runnable`, returning a new `Runnable`.
The `Run` object contains information about the run, including its `id`, `type`, `input`, `output`, `error`, `start_time`, `end_time`, and any tags or metadata added to the run.
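PARAMETER | DESCRIPTION |
---|---|
`on_start` | Called before the `Runnable` starts running, with the `Run` object. TYPE: `Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None` |
`on_end` | Called after the `Runnable` finishes running, with the `Run` object. TYPE: `Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None` |
`on_error` | Called if the `Runnable` throws an error, with the `Run` object. TYPE: `Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None` |

RETURNS | DESCRIPTION |
---|---|
`Runnable[Input, Output]` | A new `Runnable` with the listeners bound. |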
Example
```python
import time

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run


def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)


def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)


def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)


chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
```
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a `Runnable`, returning a new `Runnable`.
The `Run` object contains information about the run, including its `id`, `type`, `input`, `output`, `error`, `start_time`, `end_time`, and any tags or metadata added to the run.
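PARAMETER | DESCRIPTION |
---|---|
`on_start` | Called asynchronously before the `Runnable` starts running, with the `Run` object. TYPE: `AsyncListener \| None` |
`on_end` | Called asynchronously after the `Runnable` finishes running, with the `Run` object. TYPE: `AsyncListener \| None` |
`on_error` | Called asynchronously if the `Runnable` throws an error, with the `Run` object. TYPE: `AsyncListener \| None` |

RETURNS | DESCRIPTION |
---|---|
`Runnable[Input, Output]` | A new `Runnable` with the listeners bound. |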
Example
```python
import asyncio
import time
from datetime import datetime, timezone

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run


def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")


# Listeners receive the Run object for the run, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")


async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")


runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end,
)


async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))


asyncio.run(concurrent_runs())
```
Result:
```
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
```
with_types
¶
with_types(
*, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a `Runnable`, returning a new `Runnable`.
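PARAMETER | DESCRIPTION |
---|---|
`input_type` | The input type to bind to the `Runnable`. TYPE: `type[Input] \| None` |
`output_type` | The output type to bind to the `Runnable`. TYPE: `type[Output] \| None` |

RETURNS | DESCRIPTION |
---|---|
`Runnable[Input, Output]` | A new `Runnable` with the types bound. |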
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new `Runnable` that retries the original `Runnable` on exceptions.
PARAMETER | DESCRIPTION |
---|---|
`retry_if_exception_type` | A tuple of exception types to retry on. TYPE: `tuple[type[BaseException], ...]` |
`wait_exponential_jitter` | Whether to add jitter to the wait time between retries. TYPE: `bool` |
`stop_after_attempt` | The maximum number of attempts to make before giving up. TYPE: `int` |
`exponential_jitter_params` | Parameters for `tenacity.wait_exponential_jitter`. TYPE: `ExponentialJitterParams \| None` |

RETURNS | DESCRIPTION |
---|---|
`Runnable[Input, Output]` | A new `Runnable` that retries the original `Runnable` on exceptions. |
Example
```python
from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2
```
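For intuition about what `stop_after_attempt` and `wait_exponential_jitter` control, the following is a minimal pure-Python sketch of retrying with exponential backoff plus random jitter. It is not how the library implements `with_retry`: `retry_call` and its `retry_on`, `initial`, `exp_base`, `max_wait`, `jitter`, and `sleep` parameters are hypothetical names chosen for this illustration.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_call(
    fn: Callable[[], T],
    *,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
    stop_after_attempt: int = 3,
    initial: float = 1.0,
    exp_base: float = 2.0,
    max_wait: float = 60.0,
    jitter: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on the given exceptions up to stop_after_attempt times."""
    attempt = 1
    while True:
        try:
            return fn()
        except retry_on:
            if attempt >= stop_after_attempt:
                # Out of attempts: re-raise the last exception.
                raise
            # Exponential backoff plus a random jitter, capped at max_wait.
            backoff = initial * exp_base ** (attempt - 1)
            wait = min(backoff + random.uniform(0, jitter), max_wait)
            sleep(wait)
            attempt += 1


calls = 0
waits: list[float] = []


def flaky() -> str:
    """Fail twice, then succeed."""
    global calls
    calls += 1
    if calls < 3:
        raise ValueError("transient failure")
    return "ok"


# Record the waits instead of sleeping so the example runs instantly.
result = retry_call(flaky, retry_on=(ValueError,), sleep=waits.append)
print(result, calls, len(waits))
```

The jitter spreads retries from many callers over time so they do not all hit a recovering service at the same moment.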
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a `Runnable`, returning a new `Runnable`.
The new `Runnable` will try the original `Runnable`, and then each fallback in order, upon failures.
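PARAMETER | DESCRIPTION |
---|---|
`fallbacks` | A sequence of runnables to try if the original `Runnable` fails. TYPE: `Sequence[Runnable[Input, Output]]` |
`exceptions_to_handle` | A tuple of exception types to handle. TYPE: `tuple[type[BaseException], ...]` |
`exception_key` | If a string is specified, handled exceptions are passed to fallbacks as part of the input under the specified key. TYPE: `str \| None` |

RETURNS | DESCRIPTION |
---|---|
`RunnableWithFallbacks[Input, Output]` | A new `Runnable` that will try the original `Runnable`, and then each fallback in order, upon failures. |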
Example
```python
from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
```
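The fallback order and the effect of `exception_key` can also be sketched in plain Python. This is an illustration only, not the library's implementation: `call_with_fallbacks`, `primary`, and `fallback` are hypothetical names, and the sketch assumes dict inputs whenever `exception_key` is set.

```python
from typing import Any, Callable, Optional, Sequence


def call_with_fallbacks(
    primary: Callable[[Any], Any],
    fallbacks: Sequence[Callable[[Any], Any]],
    value: Any,
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
    exception_key: Optional[str] = None,
) -> Any:
    """Try primary, then each fallback in order, until one succeeds."""
    last_error: Optional[BaseException] = None
    for fn in [primary, *fallbacks]:
        current = value
        if exception_key is not None and last_error is not None:
            # Pass the handled exception along inside the (dict) input.
            current = {**value, exception_key: last_error}
        try:
            return fn(current)
        except exceptions_to_handle as err:
            # Exceptions outside exceptions_to_handle propagate immediately.
            last_error = err
    # Every candidate failed: surface the last handled exception.
    assert last_error is not None
    raise last_error


def primary(x: dict) -> str:
    raise ValueError("primary failed")


def fallback(x: dict) -> str:
    return f"recovered from: {x['error']}"


result = call_with_fallbacks(primary, [fallback], {"q": "hi"}, exception_key="error")
print(result)
```

Passing the exception under a key lets a fallback adapt its behavior to the failure it is recovering from, at the cost of requiring dict inputs.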
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a `BaseTool` from a `Runnable`.
`as_tool` will instantiate a `BaseTool` with a name, description, and `args_schema` from a `Runnable`. Where possible, schemas are inferred from `runnable.get_input_schema`. Alternatively (e.g., if the `Runnable` takes a dict as input and the specific dict keys are not typed), the schema can be specified directly with `args_schema`. You can also pass `arg_types` to just specify the required arguments and their types.
PARAMETER | DESCRIPTION |
---|---|
`args_schema` | The schema for the tool. TYPE: `type[BaseModel] \| None` |
`name` | The name of the tool. TYPE: `str \| None` |
`description` | The description of the tool. TYPE: `str \| None` |
`arg_types` | A dictionary of argument names to types. TYPE: `dict[str, type] \| None` |

RETURNS | DESCRIPTION |
---|---|
`BaseTool` | A `BaseTool` instance. |
Typed dict input:
```python
from typing_extensions import TypedDict

from langchain_core.runnables import RunnableLambda


class Args(TypedDict):
    a: int
    b: list[int]


def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))


runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
```
`dict` input, specifying schema via `args_schema`:
```python
from typing import Any

from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda


def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))


class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")


runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
```
`dict` input, specifying schema via `arg_types`:
```python
from typing import Any

from langchain_core.runnables import RunnableLambda


def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))


runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
```
String input: