Runnables
    
              Bases: ABC, Generic[Input, Output]
A unit of work that can be invoked, batched, streamed, transformed and composed.
Key Methods
- invoke/ainvoke: Transforms a single input into an output.
- batch/abatch: Efficiently transforms multiple inputs into outputs.
- stream/astream: Streams output from a single input as it's produced.
- astream_log: Streams output and selected intermediate results from an input.
Built-in optimizations:
- Batch: By default, batch runs invoke() in parallel using a thread pool executor. Override to optimize batching.
- Async: Methods with an 'a' prefix are asynchronous. By default, they execute the sync counterpart using asyncio's thread pool. Override for native async.
All methods accept an optional config argument, which can be used to configure execution, add tags and metadata for tracing and debugging etc.
Runnables expose schematic information about their input, output and config via
the input_schema property, the output_schema property and the config_schema
method.
Composition
Runnable objects can be composed together to create chains in a declarative way.
Any chain constructed this way will automatically have sync, async, batch, and streaming support.
The main composition primitives are RunnableSequence and RunnableParallel.
RunnableSequence invokes a series of runnables sequentially, with
one Runnable's output serving as the next's input. Construct using
the | operator or by passing a list of runnables to RunnableSequence.
RunnableParallel invokes runnables concurrently, providing the same input
to each. Construct it using a dict literal within a sequence or by passing a
dict to RunnableParallel.
For example,
from langchain_core.runnables import RunnableLambda
# A RunnableSequence constructed using the `|` operator
sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
sequence.invoke(1)  # 4
sequence.batch([1, 2, 3])  # [4, 6, 8]
# A sequence that contains a RunnableParallel constructed using a dict literal
sequence = RunnableLambda(lambda x: x + 1) | {
    "mul_2": RunnableLambda(lambda x: x * 2),
    "mul_5": RunnableLambda(lambda x: x * 5),
}
sequence.invoke(1)  # {'mul_2': 4, 'mul_5': 10}
Standard Methods
All Runnables expose additional methods that can be used to modify their
behavior (e.g., add a retry policy, add lifecycle listeners, make them
configurable, etc.).
These methods will work on any Runnable, including Runnable chains
constructed by composing other Runnables.
See the individual methods for details.
For example,
from langchain_core.runnables import RunnableLambda
import random
def add_one(x: int) -> int:
    return x + 1
def buggy_double(y: int) -> int:
    """Buggy code that will fail 70% of the time"""
    if random.random() > 0.3:
        print('This code failed, and will probably be retried!')  # noqa: T201
        raise ValueError('Triggered buggy code')
    return y * 2
sequence = (
    RunnableLambda(add_one) |
    RunnableLambda(buggy_double).with_retry( # Retry on failure
        stop_after_attempt=10,
        wait_exponential_jitter=False
    )
)
print(sequence.input_schema.model_json_schema()) # Show inferred input schema
print(sequence.output_schema.model_json_schema()) # Show inferred output schema
print(sequence.invoke(2)) # invoke the sequence (note the retry above!!)
Debugging and tracing
As the chains get longer, it can be useful to be able to see intermediate results to debug and trace the chain.
You can set the global debug flag to True to enable debug output for all chains:
Alternatively, you can pass existing or custom callbacks to any given chain:
from langchain_core.tracers import ConsoleCallbackHandler
chain.invoke(..., config={"callbacks": [ConsoleCallbackHandler()]})
For a UI (and much more), check out LangSmith.
| METHOD | DESCRIPTION |
|---|---|
| get_name | Get the name of the Runnable. |
| get_input_schema | Get a Pydantic model that can be used to validate input to the Runnable. |
| get_input_jsonschema | Get a JSON schema that represents the input to the Runnable. |
| get_output_schema | Get a Pydantic model that can be used to validate output to the Runnable. |
| get_output_jsonschema | Get a JSON schema that represents the output of the Runnable. |
| config_schema | The type of config this Runnable accepts specified as a Pydantic model. |
| get_config_jsonschema | Get a JSON schema that represents the config of the Runnable. |
| get_graph | Return a graph representation of this Runnable. |
| get_prompts | Return a list of prompts used by this Runnable. |
| __or__ | Runnable "or" operator. |
| __ror__ | Runnable "reverse-or" operator. |
| pipe | Pipe Runnable objects. |
| pick | Pick keys from the output dict of this Runnable. |
| assign | Assigns new fields to the dict output of this Runnable. |
| invoke | Transform a single input into an output. |
| ainvoke | Transform a single input into an output. |
| batch | Default implementation runs invoke in parallel using a thread pool executor. |
| batch_as_completed | Run invoke in parallel on a list of inputs. |
| abatch | Default implementation runs ainvoke in parallel using asyncio.gather. |
| abatch_as_completed | Run ainvoke in parallel on a list of inputs. |
| stream | Default implementation of stream, which calls invoke. |
| astream | Default implementation of astream, which calls ainvoke. |
| astream_log | Stream all output from a Runnable, as reported to the callback system. |
| astream_events | Generate a stream of events. |
| transform | Transform inputs to outputs. |
| atransform | Transform inputs to outputs. |
| bind | Bind arguments to a Runnable, returning a new Runnable. |
| with_config | Bind config to a Runnable, returning a new Runnable. |
| with_listeners | Bind lifecycle listeners to a Runnable, returning a new Runnable. |
| with_alisteners | Bind async lifecycle listeners to a Runnable. |
| with_types | Bind input and output types to a Runnable, returning a new Runnable. |
| with_retry | Create a new Runnable that retries the original Runnable on exceptions. |
| map | Return a new Runnable that maps a list of inputs to a list of outputs. |
| with_fallbacks | Add fallbacks to a Runnable, returning a new Runnable. |
| as_tool | Create a BaseTool from a Runnable. |
instance-attribute
name: str | None
The name of the Runnable. Used for debugging and tracing.
property
InputType: type[Input]
Input type.
The type of input this Runnable accepts specified as a type annotation.
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the input type cannot be inferred. | 
property
OutputType: type[Output]
Output Type.
The type of output this Runnable produces specified as a type annotation.
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the output type cannot be inferred. | 
property
input_schema: type[BaseModel]
The type of input this Runnable accepts specified as a Pydantic model.
property
output_schema: type[BaseModel]
Output schema.
The type of output this Runnable produces specified as a Pydantic model.
property
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
      
    
      
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic input schema that
depends on which configuration the Runnable is invoked with.
Use this method to get the input schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate input. |
      
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the input to the Runnable. |
Example
Added in version 0.3.0
      
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate output to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic output schema that
depends on which configuration the Runnable is invoked with.
Use this method to get the output schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate output. |
      
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the output of the Runnable. |
Example
Added in version 0.3.0
      
config_schema(*, include: Sequence[str] | None = None) -> type[BaseModel]
The type of config this Runnable accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
| PARAMETER | DESCRIPTION | 
|---|---|
| include | A list of fields to include in the config schema. | 
| RETURNS | DESCRIPTION | 
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate config. | 
      
    
      
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable.
      
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
      
__or__(
    other: Runnable[Any, Other]
    | Callable[[Iterator[Any]], Iterator[Other]]
    | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
    | Callable[[Any], Other]
    | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| other | Another Runnable or a Runnable-like object. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
__ror__(
    other: Runnable[Other, Any]
    | Callable[[Iterator[Other]], Iterator[Any]]
    | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
    | Callable[[Other], Any]
    | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| other | Another Runnable or a Runnable-like object. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Other, Output] | A new Runnable. |
      
pipe(
    *others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| PARAMETER | DESCRIPTION |
|---|---|
| *others | Other Runnable or Runnable-like objects to compose. |
| name | An optional name for the resulting RunnableSequence. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
pick(keys: str | list[str]) -> RunnableSerializable[Any, Any]
Pick keys from the output dict of this Runnable.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| PARAMETER | DESCRIPTION | 
|---|---|
| keys | A key or list of keys to pick from the output dict. | 
| RETURNS | DESCRIPTION | 
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |
      
assign(
    **kwargs: Runnable[dict[str, Any], Any]
    | Callable[[dict[str, Any]], Any]
    | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str', 'type': 'string'},
# 'hello': {'title': 'Hello', 'type': 'string'}}}
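| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | A mapping of keys to Runnable or Runnable-like objects that will be invoked with the entire output dict of this Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |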
      
abstractmethod
invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
Transform a single input into an output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | A config to use when invoking the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
async
ainvoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
Transform a single input into an output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | A config to use when invoking the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
batch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
      
batch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| tuple[int, Output \| Exception] | Tuples of the index of the input and the output from the Runnable. |
      
async
abatch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of abatch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
      
async
abatch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[tuple[int, Output \| Exception]] | A tuple of the index of the input and the output from the Runnable. |
      
stream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream, which calls invoke.
Subclasses should override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
async
astream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream, which calls ainvoke.
Subclasses should override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
async
astream_log(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| diff | Whether to yield diffs between each step or the current state. |
| with_streamed_output_list | Whether to yield the streamed_output list. |
| include_names | Only include logs with these names. |
| include_types | Only include logs with these types. |
| include_tags | Only include logs with these tags. |
| exclude_names | Exclude logs with these names. |
| exclude_types | Exclude logs with these types. |
| exclude_tags | Exclude logs with these tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[RunLogPatch] \| AsyncIterator[RunLog] | A RunLogPatch or RunLog object. |
      
async
astream_events(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"] = "v2",
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvent objects that provide real-time
information about the progress of the Runnable, including StreamEvent objects
from intermediate results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | | 'Hello human!' |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API!
A custom event has the following format:
| Attribute | Type | Description | 
|---|---|---|
| name | str | A user defined name for the event. | 
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | 
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
    return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
Example of dispatching a custom event:
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for Python <= 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for Python <= 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| version | The version of the schema to use, either 'v1' or 'v2'. |
| include_names | Only include events from Runnable objects with matching names. |
| include_types | Only include events from Runnable objects with matching types. |
| include_tags | Only include events from Runnable objects with matching tags. |
| exclude_names | Exclude events from Runnable objects with matching names. |
| exclude_types | Exclude events from Runnable objects with matching types. |
| exclude_tags | Exclude events from Runnable objects with matching tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[StreamEvent] | An async stream of StreamEvent. |
| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | If the version is not 'v1' or 'v2'. |
      
transform(
    input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses should override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
| input | An iterator of inputs to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
async
atransform(
    input: AsyncIterator[Input],
    config: RunnableConfig | None = None,
    **kwargs: Any | None,
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses should override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
| input | An async iterator of inputs to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | The arguments to bind to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable. |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
      
with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | The config to bind to the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the config bound. |
      
with_listeners(
    *,
    on_start: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
    on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
    on_error: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
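| PARAMETER | DESCRIPTION |
|---|---|
| on_start | Called before the Runnable starts running, with the Run object. |
| on_end | Called after the Runnable finishes running, with the Run object. |
| on_error | Called if the Runnable throws an error, with the Run object. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |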
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
      
with_alisteners(
    *,
    on_start: AsyncListener | None = None,
    on_end: AsyncListener | None = None,
    on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
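| PARAMETER | DESCRIPTION |
|---|---|
| on_start | Called asynchronously before the Runnable starts running, with the Run object. |
| on_end | Called asynchronously after the Runnable finishes running, with the Run object. |
| on_error | Called asynchronously if the Runnable throws an error, with the Run object. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |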
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
# Listeners receive the Run object, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
      
with_types(
    *, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| input_type | The input type to bind to the Runnable. |
| output_type | The output type to bind to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the types bound. |
      
with_retry(
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    exponential_jitter_params: ExponentialJitterParams | None = None,
    stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
| PARAMETER | DESCRIPTION |
|---|---|
| retry_if_exception_type | A tuple of exception types to retry on. |
| wait_exponential_jitter | Whether to add jitter to the wait time between retries. |
| stop_after_attempt | The maximum number of attempts to make before giving up. |
| exponential_jitter_params | Parameters for tenacity.wait_exponential_jitter. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass
runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass
assert count == 2
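To make the retry parameters more concrete, here is a minimal sketch of retrying with exponential backoff plus jitter. It is not the library's implementation: `backoff_wait` and `call_with_retry` are hypothetical helpers, and the parameter names `initial`, `exp_base`, `jitter` and `max_wait` as well as their defaults are assumptions chosen for illustration.

```python
import random


def backoff_wait(
    attempt: int,
    initial: float = 1.0,
    exp_base: float = 2.0,
    jitter: float = 1.0,
    max_wait: float = 60.0,
) -> float:
    """Seconds to wait after failed attempt number `attempt` (1-based)."""
    exponential = initial * exp_base ** (attempt - 1)
    # Random jitter spreads out retries coming from many callers at once.
    noise = random.uniform(0, jitter)
    return max(0.0, min(exponential + noise, max_wait))


def call_with_retry(fn, arg, stop_after_attempt=3, retry_on=(Exception,),
                    sleep=lambda seconds: None):
    # `sleep` is a no-op here so the sketch runs instantly;
    # pass `time.sleep` to actually wait between attempts.
    for attempt in range(1, stop_after_attempt + 1):
        try:
            return fn(arg)
        except retry_on:
            if attempt == stop_after_attempt:
                raise  # out of attempts: surface the last error
            sleep(backoff_wait(attempt))


calls = []


def flaky(x: int) -> int:
    # Fails on the first two calls, succeeds on the third.
    calls.append(x)
    if len(calls) < 3:
        raise ValueError("transient failure")
    return x * 2


result = call_with_retry(flaky, 5, retry_on=(ValueError,))
waits_without_jitter = [backoff_wait(n, jitter=0.0) for n in (1, 2, 3)]
```

With jitter disabled the waits double on each attempt; with jitter enabled a random offset is added to each one.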
      
    
      
with_fallbacks(
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
    exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
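| PARAMETER | DESCRIPTION |
|---|---|
| fallbacks | A sequence of runnables to try if the original Runnable fails. |
| exceptions_to_handle | A tuple of exception types to handle. |
| exception_key | If a string is specified, handled exceptions are passed to fallbacks as part of the input under that key. If None, exceptions are not passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableWithFallbacks[Input, Output] | A new Runnable that will try the original Runnable, and then each fallback in order, upon failures. |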
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""
def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
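The "try the original, then each fallback in order" behaviour can be sketched without any library code. This is an illustrative sketch only: `run_with_fallbacks` and `lenient` are hypothetical names, and re-raising the first error once every candidate has failed is an assumption of this sketch.

```python
from typing import Any, Callable, Sequence


def run_with_fallbacks(
    primary: Callable[[Any], Any],
    fallbacks: Sequence[Callable[[Any], Any]],
    value: Any,
    exceptions_to_handle: tuple = (Exception,),
) -> Any:
    first_error = None
    for candidate in [primary, *fallbacks]:
        try:
            return candidate(value)
        except exceptions_to_handle as err:
            # Remember the first failure and move on to the next candidate.
            if first_error is None:
                first_error = err
    # Every candidate failed: surface the first error that was seen.
    raise first_error


def lenient(text: str) -> int:
    # Hypothetical fallback that also accepts decimal strings.
    return int(float(text))


value = run_with_fallbacks(int, [lenient], "3.7")

# Errors outside `exceptions_to_handle` are not caught, so no fallback runs.
try:
    run_with_fallbacks(int, [lenient], "3.7", exceptions_to_handle=(KeyError,))
    propagated = False
except ValueError:
    propagated = True
```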
      
as_tool(
    args_schema: type[BaseModel] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema. Alternatively (e.g., if the
Runnable takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema. You can also
pass arg_types to just specify the required arguments and their types.
| PARAMETER | DESCRIPTION |
|---|---|
| args_schema | The schema for the tool. |
| name | The name of the tool. |
| description | The description of the tool. |
| arg_types | A dictionary of argument names to types. |
| RETURNS | DESCRIPTION |
|---|---|
| BaseTool | A BaseTool instance. |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
    a: int
    b: list[int]
def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""
    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
    
              Bases: RunnableBindingBase[Input, Output]
Wrap a Runnable with additional functionality.
A RunnableBinding can be thought of as a "runnable decorator" that
preserves the essential features of Runnable; i.e., batching, streaming,
and async support, while adding additional functionality.
Any class that inherits from Runnable can be bound to a RunnableBinding.
Runnables expose a standard set of methods for creating RunnableBindings
or sub-classes of RunnableBindings (e.g., RunnableRetry,
RunnableWithFallbacks) that add additional functionality.
These methods include:
- bind: Bind kwargs to pass to the underlying Runnable when running it.
- with_config: Bind config to pass to the underlying Runnable when running it.
- with_listeners: Bind lifecycle listeners to the underlying Runnable.
- with_types: Override the input and output types of the underlying Runnable.
- with_retry: Bind a retry policy to the underlying Runnable.
- with_fallbacks: Bind a fallback policy to the underlying Runnable.
Example:
bind: Bind kwargs to pass to the underlying Runnable when running it.
```python
# Create a Runnable binding that invokes the chat model with the
# additional kwarg `stop=['-']` when running it.
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
model.invoke('Say "Parrot-MAGIC"', stop=["-"])  # Should return `Parrot`
# Using it the easy way via `bind` method which returns a new
# RunnableBinding
runnable_binding = model.bind(stop=["-"])
runnable_binding.invoke('Say "Parrot-MAGIC"')  # Should return `Parrot`
```
Can also be done by instantiating a `RunnableBinding` directly (not
recommended):
```python
from langchain_core.runnables import RunnableBinding
runnable_binding = RunnableBinding(
    bound=model,
    kwargs={"stop": ["-"]},  # <-- Note the additional kwargs
)
runnable_binding.invoke('Say "Parrot-MAGIC"')  # Should return `Parrot`
```
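The "runnable decorator" idea can also be illustrated without a model. The sketch below is a simplified stand-in, not the library class: `Binding` and `fake_model` are hypothetical, and letting call-time kwargs override bound kwargs is an assumption of this sketch.

```python
from typing import Any, Callable, Dict, List, Optional


class Binding:
    """Hypothetical wrapper that stores kwargs and forwards them on each call."""

    def __init__(self, bound: Callable[..., Any], kwargs: Dict[str, Any]):
        self.bound = bound
        self.kwargs = kwargs

    def invoke(self, value: Any, **call_kwargs: Any) -> Any:
        # Bound kwargs are applied first; call-time kwargs win on conflict.
        return self.bound(value, **{**self.kwargs, **call_kwargs})


def fake_model(prompt: str, stop: Optional[List[str]] = None) -> str:
    # Echoes the prompt, cut at the first occurrence of any stop token.
    text = prompt
    for token in stop or []:
        text = text.split(token)[0]
    return text


unbound = fake_model("Parrot-MAGIC")
bound = Binding(fake_model, {"stop": ["-"]}).invoke("Parrot-MAGIC")
overridden = Binding(fake_model, {"stop": ["-"]}).invoke("Parrot-MAGIC", stop=["G"])
```

The caller of `invoke` does not need to know about `stop` at all, which is what makes bound runnables convenient inside a chain.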
| METHOD | DESCRIPTION |
|---|---|
| bind | Bind additional kwargs to a Runnable, returning a new Runnable. |
| with_config | Bind config to a Runnable, returning a new Runnable. |
| with_listeners | Bind lifecycle listeners to a Runnable, returning a new Runnable. |
| with_types | Bind input and output types to a Runnable, returning a new Runnable. |
| with_retry | Create a new Runnable that retries the original Runnable on exceptions. |
| get_name | Get the name of the Runnable. |
| get_input_schema | Get a Pydantic model that can be used to validate input to the Runnable. |
| get_input_jsonschema | Get a JSON schema that represents the input to the Runnable. |
| get_output_schema | Get a Pydantic model that can be used to validate the output of the Runnable. |
| get_output_jsonschema | Get a JSON schema that represents the output of the Runnable. |
| config_schema | The type of config this Runnable accepts specified as a Pydantic model. |
| get_config_jsonschema | Get a JSON schema that represents the config of the Runnable. |
| get_graph | Return a graph representation of this Runnable. |
| get_prompts | Return a list of prompts used by this Runnable. |
| __or__ | Runnable "or" operator. |
| __ror__ | Runnable "reverse-or" operator. |
| pipe | Pipe Runnable objects. |
| pick | Pick keys from the output dict of this Runnable. |
| assign | Assigns new fields to the dict output of this Runnable. |
| invoke | Transform a single input into an output. |
| ainvoke | Transform a single input into an output. |
| batch | Default implementation runs invoke in parallel using a thread pool executor. |
| batch_as_completed | Run invoke in parallel on a list of inputs. |
| abatch | Default implementation runs ainvoke in parallel using asyncio.gather. |
| abatch_as_completed | Run ainvoke in parallel on a list of inputs. |
| stream | Default implementation of stream, which calls invoke. |
| astream | Default implementation of astream, which calls ainvoke. |
| astream_log | Stream all output from a Runnable, as reported to the callback system. |
| astream_events | Generate a stream of events. |
| transform | Transform inputs to outputs. |
| atransform | Transform inputs to outputs. |
| with_alisteners | Bind async lifecycle listeners to a Runnable. |
| map | Return a new Runnable that maps a list of inputs to a list of outputs. |
| with_fallbacks | Add fallbacks to a Runnable, returning a new Runnable. |
| as_tool | Create a BaseTool from a Runnable. |
| __init__ | Create a RunnableBinding from a Runnable and kwargs. |
| is_lc_serializable | Return True as this class is serializable. |
| get_lc_namespace | Get the namespace of the LangChain object. |
| lc_id | Return a unique identifier for this class for serialization purposes. |
| to_json | Serialize the Runnable to JSON. |
| to_json_not_implemented | Serialize a "not implemented" object. |
| configurable_fields | Configure particular Runnable fields at runtime. |
| configurable_alternatives | Configure alternatives for Runnable objects that can be set at runtime. |
class-attribute
      instance-attribute
  
¶
name: str | None = None
The name of the Runnable. Used for debugging and tracing.
property
  
¶
InputType: type[Input]
Input type.
The type of input this Runnable accepts specified as a type annotation.
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the input type cannot be inferred. | 
property
  
¶
OutputType: type[Output]
Output Type.
The type of output this Runnable produces specified as a type annotation.
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the output type cannot be inferred. | 
property
  
¶
    The type of input this Runnable accepts specified as a Pydantic model.
property
  
¶
    Output schema.
The type of output this Runnable produces specified as a Pydantic model.
property
  
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
property
  
¶
    A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
property
  
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
instance-attribute
  
¶
bound: Runnable[Input, Output]
The underlying Runnable that this Runnable delegates to.
class-attribute
      instance-attribute
  
¶
    kwargs to pass to the underlying Runnable when running.
For example, when the Runnable binding is invoked the underlying
Runnable will be invoked with the same input but with these additional
kwargs.
class-attribute
      instance-attribute
  
¶
config: RunnableConfig = config or {}
The config to bind to the underlying Runnable.
class-attribute
      instance-attribute
  
¶
config_factories: list[Callable[[RunnableConfig], RunnableConfig]] = Field(
    default_factory=list
)
The config factories to bind to the underlying Runnable.
class-attribute
      instance-attribute
  
¶
custom_input_type: Any | None = None
Override the input type of the underlying Runnable with a custom type.
The type can be a Pydantic model, or a type annotation (e.g., list[str]).
class-attribute
      instance-attribute
  
¶
custom_output_type: Any | None = None
Override the output type of the underlying Runnable with a custom type.
The type can be a Pydantic model, or a type annotation (e.g., list[str]).
      
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind additional kwargs to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | The kwargs to bind to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the same type and config as the original, but with the additional kwargs bound. |
      
with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | The config to bind to the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the config bound. |
      
with_listeners(
    *,
    on_start: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
    on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
    on_error: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
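| PARAMETER | DESCRIPTION |
|---|---|
| on_start | Called before the Runnable starts running, with the Run object. |
| on_end | Called after the Runnable finishes running, with the Run object. |
| on_error | Called if the Runnable throws an error, with the Run object. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |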
      
with_types(
    input_type: type[Input] | BaseModel | None = None,
    output_type: type[Output] | BaseModel | None = None,
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| input_type | The input type to bind to the Runnable. |
| output_type | The output type to bind to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the types bound. |
      
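with_retry(
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    exponential_jitter_params: ExponentialJitterParams | None = None,
    stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.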
| PARAMETER | DESCRIPTION |
|---|---|
| retry_if_exception_type | A tuple of exception types to retry on. |
| wait_exponential_jitter | Whether to add jitter to the wait time between retries. |
| stop_after_attempt | The maximum number of attempts to make before giving up. |
| exponential_jitter_params | Parameters for tenacity.wait_exponential_jitter. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass
runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass
assert count == 2
      
    
      
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic input schema that
depends on which configuration the Runnable is invoked with.
This method returns the input schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate input. |
      
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the input to the Runnable. |
Example
Added in version 0.3.0
      
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate the output of the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic output schema that
depends on which configuration the Runnable is invoked with.
This method returns the output schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate output. |
      
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the output of the Runnable. |
Example
Added in version 0.3.0
      
config_schema(*, include: Sequence[str] | None = None) -> type[BaseModel]
The type of config this Runnable accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
| PARAMETER | DESCRIPTION | 
|---|---|
| include | A list of fields to include in the config schema. | 
| RETURNS | DESCRIPTION | 
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate config. | 
      
    
      
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable.
      
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
      
__or__(
    other: Runnable[Any, Other]
    | Callable[[Iterator[Any]], Iterator[Other]]
    | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
    | Callable[[Any], Other]
    | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| other | Another Runnable or a Runnable-like object. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
__ror__(
    other: Runnable[Other, Any]
    | Callable[[Iterator[Other]], Iterator[Any]]
    | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
    | Callable[[Other], Any]
    | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| other | Another Runnable or a Runnable-like object. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Other, Output] | A new Runnable. |
      
pipe(
    *others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| PARAMETER | DESCRIPTION |
|---|---|
| *others | Other Runnable or Runnable-like objects to compose. |
| name | An optional name for the resulting RunnableSequence. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
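How `|`, the reverse `|`, and `pipe` relate to each other can be shown with a small stand-in class. This is a sketch of the operator mechanics only, assuming a hypothetical `Step` class; it is not how the library builds a RunnableSequence.

```python
from typing import Any, Callable


class Step:
    """Hypothetical composable unit wrapping a single-argument function."""

    def __init__(self, fn: Callable[[Any], Any]):
        self.fn = fn

    def invoke(self, value: Any) -> Any:
        return self.fn(value)

    def __or__(self, other: Any) -> "Step":
        # Handles `step | other`: run self first, then other.
        nxt = other if isinstance(other, Step) else Step(other)
        return Step(lambda v: nxt.invoke(self.invoke(v)))

    def __ror__(self, other: Any) -> "Step":
        # Handles `other | step` when `other` (e.g. a plain function)
        # does not know how to compose itself.
        prev = other if isinstance(other, Step) else Step(other)
        return Step(lambda v: self.invoke(prev.invoke(v)))

    def pipe(self, *others: Any) -> "Step":
        # Equivalent to chaining `|` left to right.
        result = self
        for other in others:
            result = result | other
        return result


add_one = Step(lambda x: x + 1)
mul_two = lambda x: x * 2  # plain callable, coerced on composition

forward = (add_one | mul_two).invoke(1)   # (1 + 1) * 2
reverse = (mul_two | add_one).invoke(1)   # 1 * 2 + 1, via __ror__
piped = add_one.pipe(mul_two, str).invoke(1)
```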
      
pick(keys: str | list[str]) -> RunnableSerializable[Any, Any]
Pick keys from the output dict of this Runnable.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| PARAMETER | DESCRIPTION |
|---|---|
| keys | A key or list of keys to pick from the output dict. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |
      
assign(
    **kwargs: Runnable[dict[str, Any], Any]
    | Callable[[dict[str, Any]], Any]
    | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
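| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | A mapping of keys to Runnable or Runnable-like objects that will be invoked with the entire output dict of this Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |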
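The dict semantics of `pick` and `assign` can be summarised with two plain functions. This is a sketch on ordinary dicts, assuming hypothetical helpers named `pick` and `assign`; the real methods return new Runnables rather than transforming a dict directly.

```python
from typing import Any, Callable, Dict, List, Union


def pick(output: Dict[str, Any], keys: Union[str, List[str]]) -> Any:
    # A single key yields the bare value; a list of keys yields a smaller dict.
    if isinstance(keys, str):
        return output[keys]
    return {key: output[key] for key in keys}


def assign(output: Dict[str, Any], **fns: Callable[[Dict[str, Any]], Any]) -> Dict[str, Any]:
    # Each function receives the whole original dict; its result is stored
    # under the keyword name, alongside the existing keys.
    return {**output, **{name: fn(output) for name, fn in fns.items()}}


data = {"str": "[1, 2, 3]", "json": [1, 2, 3]}

single = pick(data, "json")
subset = pick(data, ["json"])
extended = assign(data, total=lambda d: sum(d["json"]))
```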
      
invoke(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Output
Transform a single input into an output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | A config to use when invoking the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
async
  
¶
ainvoke(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Output
Transform a single input into an output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | A config to use when invoking the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
batch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
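The thread-pool strategy described for the default `batch` can be sketched with `concurrent.futures`. This is a simplified illustration, not the library's code: the free function `batch` and its `max_workers` parameter are assumptions made for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional


def batch(
    invoke: Callable[[Any], Any],
    inputs: List[Any],
    return_exceptions: bool = False,
    max_workers: Optional[int] = None,
) -> List[Any]:
    def safe_invoke(value: Any) -> Any:
        try:
            return invoke(value)
        except Exception as err:
            if return_exceptions:
                return err  # hand the exception back as a result
            raise

    if not inputs:
        return []
    # `map` runs calls concurrently but returns results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(safe_invoke, inputs))


def add_one_then_double(x: int) -> int:
    return (x + 1) * 2


def fails_on_two(x: int) -> int:
    if x == 2:
        raise ValueError("x is 2")
    return x


outputs = batch(add_one_then_double, [1, 2, 3])
mixed = batch(fails_on_two, [1, 2, 3], return_exceptions=True)
```

Threads help most when each call spends its time waiting on I/O, which matches the note above about IO bound runnables.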
      
batch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| tuple[int, Output \| Exception] | Tuples of the index of the input and the output from the Runnable. |
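Yielding `(index, output)` pairs in completion order can be sketched with `concurrent.futures.as_completed`. The function below is a hypothetical stand-in written for illustration, and `slow_double` is a made-up workload.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence, Tuple


def batch_as_completed(
    invoke: Callable[[Any], Any], inputs: Sequence[Any]
) -> Iterator[Tuple[int, Any]]:
    with ThreadPoolExecutor() as executor:
        # Remember which input position each future belongs to.
        index_of = {executor.submit(invoke, value): i for i, value in enumerate(inputs)}
        for future in as_completed(index_of):
            yield index_of[future], future.result()


def slow_double(delay: float) -> float:
    # Made-up workload: sleep for `delay` seconds, then return twice the delay.
    time.sleep(delay)
    return delay * 2


pairs = list(batch_as_completed(slow_double, [0.2, 0.05]))
```

Because results arrive in completion order rather than input order, the index in each tuple is what lets the caller match an output back to its input.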
      
async
  
¶
abatch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of abatch works well for I/O-bound runnables.
Subclasses should override this method if they can batch more efficiently,
e.g., if the underlying Runnable uses an API that supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. TYPE: list[Input] |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig | list[RunnableConfig] | None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any | None |

| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
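The `asyncio.gather` approach mentioned above can be sketched as follows. `abatch_sketch` and `parse_int` are hypothetical names used only for illustration; the real method also handles configs and callbacks, which this sketch omits.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
) -> List[Any]:
    """Await `ainvoke` for all inputs concurrently; results keep input order."""
    coros = [ainvoke(item) for item in inputs]
    return await asyncio.gather(*coros, return_exceptions=return_exceptions)


async def parse_int(text: str) -> int:
    await asyncio.sleep(0)  # stand-in for real async I/O
    return int(text)


outputs = asyncio.run(
    abatch_sketch(parse_int, ["1", "x", "3"], return_exceptions=True)
)
```

Here the second input cannot be parsed, so its slot in `outputs` holds a `ValueError` while the other two slots hold integers.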
      
async
abatch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. TYPE: Sequence[Input] |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig | Sequence[RunnableConfig] | None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any | None |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[tuple[int, Output | Exception]] | A tuple of the index of the input and the output from the Runnable. |
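An async counterpart of the index-tagged, completion-ordered iteration can be sketched with `asyncio.as_completed`. The names `abatch_as_completed_sketch`, `shout`, and `collect` are hypothetical, and the sketch makes no claim about how the library schedules its tasks.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List, Tuple


async def abatch_as_completed_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]], inputs: List[Any]
) -> AsyncIterator[Tuple[int, Any]]:
    """Yield (input_index, output) pairs as each call finishes."""

    async def indexed(i: int, item: Any) -> Tuple[int, Any]:
        # Carry the input position along with the result.
        return i, await ainvoke(item)

    pending = [indexed(i, item) for i, item in enumerate(inputs)]
    for next_done in asyncio.as_completed(pending):
        yield await next_done


async def shout(word: str) -> str:
    await asyncio.sleep(0.01 * len(word))  # longer words finish later
    return word.upper()


async def collect() -> List[Tuple[int, str]]:
    return [pair async for pair in abatch_as_completed_sketch(shout, ["ccc", "a", "bb"])]


apairs = asyncio.run(collect())
```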
      
stream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream, which calls invoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. TYPE: Input |
| config | The config to use for the Runnable. TYPE: RunnableConfig | None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any | None |

| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
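In other words, a Runnable that only implements invoke still supports stream, but the whole output arrives as a single chunk. A minimal sketch of that fallback, using a hypothetical stand-in class rather than the library's base class:

```python
from typing import Iterator


class UppercaseSketch:
    """Hypothetical stand-in for a runnable that only implements `invoke`."""

    def invoke(self, text: str) -> str:
        return text.upper()

    def stream(self, text: str) -> Iterator[str]:
        # Fallback behavior: the complete output is emitted as one chunk.
        yield self.invoke(text)


chunks = list(UppercaseSketch().stream("hello"))
```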
      
async
astream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream, which calls ainvoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. TYPE: Input |
| config | The config to use for the Runnable. TYPE: RunnableConfig | None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any | None |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |

async
astream_log(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of JSON Patch ops that describe how the state of the run has changed in each step, and the final state of the run.
The JSON Patch ops can be applied in order to construct the state.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. TYPE: Any |
| config | The config to use for the Runnable. TYPE: RunnableConfig | None |
| diff | Whether to yield diffs between each step or the current state. TYPE: bool |
| with_streamed_output_list | Whether to yield the streamed_output list. TYPE: bool |
| include_names | Only include logs with these names. |
| include_types | Only include logs with these types. |
| include_tags | Only include logs with these tags. |
| exclude_names | Exclude logs with these names. |
| exclude_types | Exclude logs with these types. |
| exclude_tags | Exclude logs with these tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[RunLogPatch] | AsyncIterator[RunLog] | A RunLogPatch or RunLog object. |
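To illustrate what "applying the ops in order" means, here is a minimal sketch of a JSON Patch applier that supports only the add and replace operations. The state layout and the patches are hypothetical examples, not output captured from the library, and JSON Pointer escaping (`~0`, `~1`) and the root path are deliberately not handled.

```python
from typing import Any, Dict, List


def apply_ops(state: Dict[str, Any], ops: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Apply a small subset of JSON Patch ("add" and "replace") in place."""
    for op in ops:
        if op["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op: {op['op']}")
        *parents, last = op["path"].lstrip("/").split("/")
        target: Any = state
        for key in parents:
            # Walk down to the container that holds the final path segment.
            target = target[int(key)] if isinstance(target, list) else target[key]
        if isinstance(target, list):
            if last == "-":  # "-" means "append to the end of the array"
                target.append(op["value"])
            elif op["op"] == "add":
                target.insert(int(last), op["value"])
            else:
                target[int(last)] = op["value"]
        else:
            target[last] = op["value"]
    return state


# Hypothetical patches, in the order a stream might deliver them.
state: Dict[str, Any] = {"streamed_output": [], "final_output": None}
patches = [
    [{"op": "add", "path": "/streamed_output/-", "value": "Hel"}],
    [{"op": "add", "path": "/streamed_output/-", "value": "lo"}],
    [{"op": "replace", "path": "/final_output", "value": "Hello"}],
]
for ops in patches:
    apply_ops(state, ops)
```

After the loop, `state` reflects every patch: the chunks have been appended in order and the final output has been set.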
      
async
astream_events(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use this method to create an iterator over StreamEvent objects that provide real-time information
about the progress of the Runnable, including events from intermediate
results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: A randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available in the v2 version of the API; the v1 version returns an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | | 'Hello human!' |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
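The naming convention used in the event column can be handled with a small parser when consuming the stream. The sketch below relies only on the on_[runnable_type]_(start|stream|end) format stated earlier; `EVENT_NAME` and `split_event_name` are hypothetical helpers, not part of the library.

```python
import re
from typing import Optional, Tuple

# Matches names such as "on_chat_model_stream" or "on_tool_end".
EVENT_NAME = re.compile(r"^on_(?P<runnable_type>[a-z_]+)_(?P<phase>start|stream|end)$")


def split_event_name(event: str) -> Optional[Tuple[str, str]]:
    """Return (runnable_type, phase), or None for names outside the pattern."""
    match = EVENT_NAME.match(event)
    if match is None:
        return None
    return match["runnable_type"], match["phase"]


parsed = split_event_name("on_chat_model_stream")
```

A consumer could use the returned phase to, for example, print only stream chunks and ignore start and end events.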
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API.
A custom event has the following format:
| Attribute | Type | Description | 
|---|---|---|
| name | str | A user defined name for the event. | 
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | 
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
    return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. TYPE: Input |
| config | The config to use for the Runnable. TYPE: RunnableConfig | None |
| version | The version of the schema to use, either 'v1' or 'v2'. |
| include_names | Only include events from Runnable objects with matching names. |
| include_types | Only include events from Runnable objects with matching types. |
| include_tags | Only include events from Runnable objects with matching tags. |
| exclude_names | Exclude events from Runnable objects with matching names. |
| exclude_types | Exclude events from Runnable objects with matching types. |
| exclude_tags | Exclude events from Runnable objects with matching tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any | None |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[StreamEvent] | An async stream of StreamEvent. |

| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | If the version is not 'v1' or 'v2'. |
      
transform(
    input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses should override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
| input | An iterator of inputs to the Runnable. TYPE: Iterator[Input] |
| config | The config to use for the Runnable. TYPE: RunnableConfig | None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any |

| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
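The buffering behavior means no output is produced until the input iterator is exhausted. A minimal sketch of that idea, assuming input chunks can be combined with `+`; `transform_sketch` and `stream_words` are hypothetical names and this is not the library's code.

```python
from typing import Callable, Iterator, Optional


def transform_sketch(
    chunks: Iterator[str], stream: Callable[[str], Iterator[str]]
) -> Iterator[str]:
    """Buffer every input chunk, then stream output for the combined input."""
    buffered: Optional[str] = None
    for chunk in chunks:
        # Nothing is emitted while input is still arriving.
        buffered = chunk if buffered is None else buffered + chunk
    if buffered is not None:
        yield from stream(buffered)


def stream_words(text: str) -> Iterator[str]:
    yield from text.split()


words = list(transform_sketch(iter(["Have a ", "nice", " day"]), stream_words))
```

A subclass that can work on partial input would instead yield inside the loop, which is the case the override note above describes.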
      
async
atransform(
    input: AsyncIterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
| input | An async iterator of inputs to the Runnable. TYPE: AsyncIterator[Input] |
| config | The config to use for the Runnable. TYPE: RunnableConfig | None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
with_alisteners(
    *,
    on_start: AsyncListener | None = None,
    on_end: AsyncListener | None = None,
    on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
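| PARAMETER | DESCRIPTION |
|---|---|
| on_start | Called asynchronously before the Runnable starts running. TYPE: AsyncListener | None |
| on_end | Called asynchronously after the Runnable finishes running. TYPE: AsyncListener | None |
| on_error | Called asynchronously if the Runnable throws an error. TYPE: AsyncListener | None |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |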
Example
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
      
    
      
with_fallbacks(
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
    exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
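| PARAMETER | DESCRIPTION |
|---|---|
| fallbacks | A sequence of runnables to try if the original Runnable fails. |
| exceptions_to_handle | A tuple of exception types to handle. TYPE: tuple[type[BaseException], ...] |
| exception_key | If a string is specified, handled exceptions are passed to fallbacks as part of the input under that key. If None, exceptions are not passed to fallbacks. TYPE: str | None |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableWithFallbacks[Input, Output] | A new Runnable that will try the original Runnable, and then each fallback in order, upon failures. |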
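The try-in-order behavior, including the role of exceptions_to_handle, can be sketched with plain callables. `invoke_with_fallbacks`, `flaky`, and `backup` are hypothetical names; re-raising the first error when every candidate fails is an assumption of this sketch.

```python
from typing import Any, Callable, Optional, Sequence, Tuple, Type


def invoke_with_fallbacks(
    primary: Callable[[Any], Any],
    fallbacks: Sequence[Callable[[Any], Any]],
    value: Any,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
) -> Any:
    """Try `primary`, then each fallback in order, on handled exceptions."""
    first_error: Optional[BaseException] = None
    for candidate in (primary, *fallbacks):
        try:
            return candidate(value)
        except exceptions_to_handle as exc:
            # Only handled exception types move on to the next candidate;
            # anything else propagates immediately.
            if first_error is None:
                first_error = exc
    assert first_error is not None  # the loop always runs at least once
    raise first_error  # assumption: surface the first failure


def flaky(x: int) -> int:
    raise ValueError("primary failed")


def backup(x: int) -> int:
    return x + 1


result = invoke_with_fallbacks(flaky, [backup], 1)
```

Narrowing `exceptions_to_handle` (for example to `(TypeError,)`) would let the `ValueError` from `flaky` propagate without trying `backup`.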
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""
def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
      
as_tool(
    args_schema: type[BaseModel] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema. Alternatively (e.g., if the
Runnable takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema. You can also
pass arg_types to just specify the required arguments and their types.
| PARAMETER | DESCRIPTION |
|---|---|
| args_schema | The schema for the tool. |
| name | The name of the tool. TYPE: str | None |
| description | The description of the tool. TYPE: str | None |
| arg_types | A dictionary of argument names to types. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTool | A BaseTool instance. |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
    a: int
    b: list[int]
def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""
    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
      
__init__(
    *,
    bound: Runnable[Input, Output],
    kwargs: Mapping[str, Any] | None = None,
    config: RunnableConfig | None = None,
    config_factories: list[Callable[[RunnableConfig], RunnableConfig]] | None = None,
    custom_input_type: type[Input] | BaseModel | None = None,
    custom_output_type: type[Output] | BaseModel | None = None,
    **other_kwargs: Any,
) -> None
Create a RunnableBinding from a Runnable and kwargs.
| PARAMETER | DESCRIPTION |
|---|---|
| bound | The underlying Runnable. TYPE: Runnable[Input, Output] |
| kwargs | Optional kwargs to pass to the underlying Runnable. |
| config | Optional config to bind to the underlying Runnable. TYPE: RunnableConfig | None |
| config_factories | Optional list of config factories to apply to the config before binding to the underlying Runnable. TYPE: list[Callable[[RunnableConfig], RunnableConfig]] | None |
| custom_input_type | Specify to override the input type of the underlying Runnable. |
| custom_output_type | Specify to override the output type of the underlying Runnable. |
| **other_kwargs | Unpacked into the base class. TYPE: Any |
      
classmethod
is_lc_serializable() -> bool
Return True as this class is serializable.
      
classmethod
    Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI, the id is
["langchain", "llms", "openai", "OpenAI"].
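The shape of such an identifier can be illustrated by splitting a class's dotted module path and appending the class name. This is only a sketch of the resulting list format: `class_path_id` is a hypothetical helper, and the library may derive the namespace part differently from the defining module.

```python
from email.message import Message
from typing import List


def class_path_id(cls: type) -> List[str]:
    """Split the defining module path and append the class name."""
    return [*cls.__module__.split("."), cls.__name__]


# A standard-library class is used here purely as a stand-in.
message_id = class_path_id(Message)
```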
      
Serialize the Runnable to JSON.
| RETURNS | DESCRIPTION |
|---|---|
| SerializedConstructor | SerializedNotImplemented | A JSON-serializable representation of the Runnable. |

Serialize a "not implemented" object.
| RETURNS | DESCRIPTION |
|---|---|
| SerializedNotImplemented | |
      
configurable_fields(
    **kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable fields at runtime.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | A dictionary of ConfigurableField instances to configure. TYPE: AnyConfigurableField |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If a configuration key is not found in the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the fields configured. |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
    "max_tokens_200: ",
    model.with_config(configurable={"output_token_number": 200})
    .invoke("tell me something about chess")
    .content,
)
      
configurable_alternatives(
    which: ConfigurableField,
    *,
    default_key: str = "default",
    prefix_keys: bool = False,
    **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable objects that can be set at runtime.
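| PARAMETER | DESCRIPTION |
|---|---|
| which | The ConfigurableField instance that will be used to select the alternative. TYPE: ConfigurableField |
| default_key | The default key to use if no alternative is selected. TYPE: str |
| prefix_keys | Whether to prefix the keys with the ConfigurableField id. TYPE: bool |
| **kwargs | A dictionary of keys to Runnable instances or callables that return Runnable instances. TYPE: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]] |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the alternatives configured. |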
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
    model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
    model.with_config(configurable={"llm": "openai"})
    .invoke("which organization created you?")
    .content
)
    
              Bases: Runnable[Input, Output]
Runnable that runs a generator function.
RunnableGenerators can be instantiated directly or by using a generator within
a sequence.
RunnableGenerators can be used to implement custom behavior, such as custom
output parsers, while preserving streaming capabilities. Given a generator function
with a signature Iterator[A] -> Iterator[B], wrapping it in a
RunnableGenerator allows it to emit output chunks as soon as they are streamed
in from the previous step.
Note
If a generator function has a signature A -> Iterator[B], such that it
requires its input from the previous step to be completed before emitting chunks
(e.g., most LLMs need the entire prompt available to start generating), it can
instead be wrapped in a RunnableLambda.
Here is an example to show the basic mechanics of a RunnableGenerator:
```python
from typing import Any, AsyncIterator, Iterator
from langchain_core.runnables import RunnableGenerator
def gen(input: Iterator[Any]) -> Iterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token
runnable = RunnableGenerator(gen)
runnable.invoke(None)  # "Have a nice day"
list(runnable.stream(None))  # ["Have", " a", " nice", " day"]
runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]
# Async version:
async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token
runnable = RunnableGenerator(agen)
await runnable.ainvoke(None)  # "Have a nice day"
[p async for p in runnable.astream(None)]  # ["Have", " a", " nice", " day"]
```
RunnableGenerator makes it easy to implement custom behavior within a streaming
context. Below we show an example:
```python
from typing import Iterator
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableGenerator, RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI()
chant_chain = (
    ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
    | model
    | StrOutputParser()
)
def character_generator(input: Iterator[str]) -> Iterator[str]:
    for token in input:
        if "," in token or "." in token:
            yield "👏" + token
        else:
            yield token
runnable = chant_chain | character_generator
assert type(runnable.last) is RunnableGenerator
"".join(runnable.stream({"topic": "waste"}))  # Reduce👏, Reuse👏, Recycle👏.
# Note that RunnableLambda can be used to delay streaming of one step in a
# sequence until the previous step is finished:
def reverse_generator(input: str) -> Iterator[str]:
    # Yield characters of input in reverse order.
    for character in input[::-1]:
        yield character
runnable = chant_chain | RunnableLambda(reverse_generator)
"".join(runnable.stream({"topic": "waste"}))  # ".elcycer ,esuer ,ecudeR"
```
| METHOD | DESCRIPTION |
|---|---|
| __init__ | Initialize a RunnableGenerator. |
| get_input_schema | Get a Pydantic model that can be used to validate input to the Runnable. |
| get_output_schema | Get a Pydantic model that can be used to validate output of the Runnable. |
| transform | Transform inputs to outputs. |
| stream | Default implementation of stream, which calls invoke. |
| invoke | Transform a single input into an output. |
| atransform | Transform inputs to outputs. |
| astream | Default implementation of astream, which calls ainvoke. |
| ainvoke | Transform a single input into an output. |
| get_name | Get the name of the Runnable. |
| get_input_jsonschema | Get a JSON schema that represents the input to the Runnable. |
| get_output_jsonschema | Get a JSON schema that represents the output of the Runnable. |
| config_schema | The type of config this Runnable accepts specified as a Pydantic model. |
| get_config_jsonschema | Get a JSON schema that represents the config of the Runnable. |
| get_graph | Return a graph representation of this Runnable. |
| get_prompts | Return a list of prompts used by this Runnable. |
| __or__ | Runnable "or" operator. |
| __ror__ | Runnable "reverse-or" operator. |
| pipe | Pipe Runnable objects. |
| pick | Pick keys from the output dict of this Runnable. |
| assign | Assigns new fields to the dict output of this Runnable. |
| batch | Default implementation runs invoke in parallel using a thread pool executor. |
| batch_as_completed | Run invoke in parallel on a list of inputs. |
| abatch | Default implementation runs ainvoke in parallel using asyncio.gather. |
| abatch_as_completed | Run ainvoke in parallel on a list of inputs. |
| astream_log | Stream all output from a Runnable, as reported to the callback system. |
| astream_events | Generate a stream of events. |
| bind | Bind arguments to a Runnable, returning a new Runnable. |
| with_config | Bind config to a Runnable, returning a new Runnable. |
| with_listeners | Bind lifecycle listeners to a Runnable, returning a new Runnable. |
| with_alisteners | Bind async lifecycle listeners to a Runnable. |
| with_types | Bind input and output types to a Runnable, returning a new Runnable. |
| with_retry | Create a new Runnable that retries the original Runnable on exceptions. |
| map | Return a new Runnable that maps a list of inputs to a list of outputs. |
| with_fallbacks | Add fallbacks to a Runnable, returning a new Runnable. |
| as_tool | Create a BaseTool from a Runnable. |
instance-attribute
The name of the Runnable. Used for debugging and tracing.
property
InputType: Any
Input type.
The type of input this Runnable accepts specified as a type annotation.
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the input type cannot be inferred. | 
property
OutputType: Any
Output Type.
The type of output this Runnable produces specified as a type annotation.
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the output type cannot be inferred. | 
property
The type of input this Runnable accepts specified as a Pydantic model.
property
Output schema.
The type of output this Runnable produces specified as a Pydantic model.
property
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
      
__init__(
    transform: Callable[[Iterator[Input]], Iterator[Output]]
    | Callable[[AsyncIterator[Input]], AsyncIterator[Output]],
    atransform: Callable[[AsyncIterator[Input]], AsyncIterator[Output]] | None = None,
    *,
    name: str | None = None,
) -> None
Initialize a RunnableGenerator.
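| PARAMETER | DESCRIPTION |
|---|---|
| transform | The transform function. TYPE: Callable[[Iterator[Input]], Iterator[Output]] | Callable[[AsyncIterator[Input]], AsyncIterator[Output]] |
| atransform | The async transform function. TYPE: Callable[[AsyncIterator[Input]], AsyncIterator[Output]] | None |
| name | The name of the Runnable. TYPE: str | None |

| RAISES | DESCRIPTION |
|---|---|
| TypeError | If the transform is not a generator function. |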
      
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic input schema that
depends on which configuration the Runnable is invoked with.
This method returns the input schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig | None |

| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate input. |
      
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate output of the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic output schema that
depends on which configuration the Runnable is invoked with.
This method returns the output schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig | None |

| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate output. |
      
transform(
    input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | An iterator of inputs to the Runnable. TYPE: Iterator[Input] |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None DEFAULT: None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any DEFAULT: {} |
| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
stream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any
) -> Iterator[Output]
Default implementation of stream, which calls invoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. TYPE: Input |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None DEFAULT: None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any DEFAULT: {} |
| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
Transform a single input into an output.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. TYPE: Input |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig \| None DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
atransform(
    input: AsyncIterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | An async iterator of inputs to the Runnable. TYPE: AsyncIterator[Input] |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None DEFAULT: None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any DEFAULT: {} |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
astream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any
) -> AsyncIterator[Output]
Default implementation of astream, which calls ainvoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. TYPE: Input |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None DEFAULT: None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any DEFAULT: {} |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
async ainvoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
Transform a single input into an output.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. TYPE: Input |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig \| None DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
    
      
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig \| None DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the input to the Runnable. |
Example
Added in version 0.3.0
      
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig \| None DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the output of the Runnable. |
Example
Added in version 0.3.0
      
config_schema(*, include: Sequence[str] | None = None) -> type[BaseModel]
The type of config this Runnable accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
| PARAMETER | DESCRIPTION | 
|---|---|
| include | A list of fields to include in the config schema. | 
| RETURNS | DESCRIPTION | 
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate config. | 
      
    
      
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable.
      
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
      
__or__(
    other: Runnable[Any, Other]
    | Callable[[Iterator[Any]], Iterator[Other]]
    | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
    | Callable[[Any], Other]
    | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION | 
|---|---|
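| other | Another Runnable or a Runnable-like object. TYPE: Runnable[Any, Other] \| Callable[[Iterator[Any]], Iterator[Other]] \| Callable[[AsyncIterator[Any]], AsyncIterator[Other]] \| Callable[[Any], Other] \| Mapping[str, Runnable[Any, Other] \| Callable[[Any], Other] \| Any] |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |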
      
__ror__(
    other: Runnable[Other, Any]
    | Callable[[Iterator[Other]], Iterator[Any]]
    | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
    | Callable[[Other], Any]
    | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION | 
|---|---|
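| other | Another Runnable or a Runnable-like object. TYPE: Runnable[Other, Any] \| Callable[[Iterator[Other]], Iterator[Any]] \| Callable[[AsyncIterator[Other]], AsyncIterator[Any]] \| Callable[[Other], Any] \| Mapping[str, Runnable[Other, Any] \| Callable[[Other], Any] \| Any] |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Other, Output] | A new Runnable. |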
      
pipe(
    *others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| PARAMETER | DESCRIPTION | 
|---|---|
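| *others | Other Runnable or Runnable-like objects to compose. TYPE: Runnable[Any, Other] \| Callable[[Any], Other] DEFAULT: () |
| name | An optional name for the resulting RunnableSequence. TYPE: str \| None DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |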
      
pick(keys: str | list[str]) -> RunnableSerializable[Any, Any]
Pick keys from the output dict of this Runnable.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| PARAMETER | DESCRIPTION | 
|---|---|
| keys | A key or list of keys to pick from the output dict. | 
| RETURNS | DESCRIPTION | 
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |
      
assign(
    **kwargs: Runnable[dict[str, Any], Any]
    | Callable[[dict[str, Any]], Any]
    | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| PARAMETER | DESCRIPTION | 
|---|---|
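| **kwargs | A mapping of keys to Runnable or Runnable-like objects that are invoked with the entire output dict of this Runnable. TYPE: Runnable[dict[str, Any], Any] \| Callable[[dict[str, Any]], Any] \| Mapping[str, Runnable[dict[str, Any], Any] \| Callable[[dict[str, Any]], Any]] DEFAULT: {} |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |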
      
batch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. TYPE: list[Input] |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig \| list[RunnableConfig] \| None DEFAULT: None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool DEFAULT: False |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None DEFAULT: {} |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
      
batch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. TYPE: Sequence[Input] |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig \| Sequence[RunnableConfig] \| None DEFAULT: None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool DEFAULT: False |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None DEFAULT: {} |
| YIELDS | DESCRIPTION |
|---|---|
| tuple[int, Output \| Exception] | Tuples of the index of the input and the output from the Runnable. |
      
async abatch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. TYPE: list[Input] |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig \| list[RunnableConfig] \| None DEFAULT: None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool DEFAULT: False |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None DEFAULT: {} |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
      
async abatch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. TYPE: Sequence[Input] |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig \| Sequence[RunnableConfig] \| None DEFAULT: None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool DEFAULT: False |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None DEFAULT: {} |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[tuple[int, Output \| Exception]] | A tuple of the index of the input and the output from the Runnable. |
      
async astream_log(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
| PARAMETER | DESCRIPTION | 
|---|---|
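| input | The input to the Runnable. TYPE: Any |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None DEFAULT: None |
| diff | Whether to yield diffs between each step or the current state. TYPE: bool DEFAULT: True |
| with_streamed_output_list | Whether to yield the streamed_output list. TYPE: bool DEFAULT: True |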
| include_names | Only include logs with these names. | 
| include_types | Only include logs with these types. | 
| include_tags | Only include logs with these tags. | 
| exclude_names | Exclude logs with these names. | 
| exclude_types | Exclude logs with these types. | 
| exclude_tags | Exclude logs with these tags. | 
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any DEFAULT: {} |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[RunLogPatch] \| AsyncIterator[RunLog] | A RunLogPatch or RunLog object. |
      
async astream_events(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"] = "v2",
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use this to create an iterator over StreamEvent objects that provide real-time
information about the progress of the Runnable, including StreamEvent objects
from intermediate results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for the v2 version of the API. The v1 version of the API will return an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | 'Hello human!' | |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API!
A custom event has the following format:
| Attribute | Type | Description | 
|---|---|---|
| name | str | A user defined name for the event. | 
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | 
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
    return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
Example of dispatching a custom event:
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
| PARAMETER | DESCRIPTION | 
|---|---|
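| input | The input to the Runnable. TYPE: Any |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None DEFAULT: None |
| version | The version of the schema to use, either 'v1' or 'v2'. TYPE: Literal['v1', 'v2'] DEFAULT: 'v2' |
| include_names | Only include events from Runnable objects with matching names. |
| include_types | Only include events from Runnable objects with matching types. |
| include_tags | Only include events from Runnable objects with matching tags. |
| exclude_names | Exclude events from Runnable objects with matching names. |
| exclude_types | Exclude events from Runnable objects with matching types. |
| exclude_tags | Exclude events from Runnable objects with matching tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any DEFAULT: {} |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[StreamEvent] | An async stream of StreamEvent. |
| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | If the version is not 'v1' or 'v2'. |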
      
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
| PARAMETER | DESCRIPTION | 
|---|---|
| **kwargs | The arguments to bind to the Runnable. TYPE: Any DEFAULT: {} |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the arguments bound. |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
      
with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
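| config | The config to bind to the Runnable. TYPE: RunnableConfig \| None DEFAULT: None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any DEFAULT: {} |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the config bound. |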
      
with_listeners(
    *,
    on_start: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
    on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
    on_error: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
| PARAMETER | DESCRIPTION | 
|---|---|
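| on_start | Called before the Runnable starts running, with the Run object. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None DEFAULT: None |
| on_end | Called after the Runnable finishes running, with the Run object. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None DEFAULT: None |
| on_error | Called if the Runnable throws an error, with the Run object. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |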
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
      
with_alisteners(
    *,
    on_start: AsyncListener | None = None,
    on_end: AsyncListener | None = None,
    on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
| PARAMETER | DESCRIPTION | 
|---|---|
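| on_start | Called asynchronously before the Runnable starts running, with the Run object. TYPE: AsyncListener \| None DEFAULT: None |
| on_end | Called asynchronously after the Runnable finishes running, with the Run object. TYPE: AsyncListener \| None DEFAULT: None |
| on_error | Called asynchronously if the Runnable throws an error, with the Run object. TYPE: AsyncListener \| None DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |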
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
# Listeners receive the Run object for the execution, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
      
with_types(
    *, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
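| input_type | The input type to bind to the Runnable. TYPE: type[Input] \| None DEFAULT: None |
| output_type | The output type to bind to the Runnable. TYPE: type[Output] \| None DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the types bound. |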
      
with_retry(
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    exponential_jitter_params: ExponentialJitterParams | None = None,
    stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
| PARAMETER | DESCRIPTION | 
|---|---|
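| retry_if_exception_type | A tuple of exception types to retry on. TYPE: tuple[type[BaseException], ...] DEFAULT: (Exception,) |
| wait_exponential_jitter | Whether to add jitter to the wait time between retries. TYPE: bool DEFAULT: True |
| stop_after_attempt | The maximum number of attempts to make before giving up. TYPE: int DEFAULT: 3 |
| exponential_jitter_params | Parameters for tenacity.wait_exponential_jitter, namely initial, max, exp_base, and jitter (all float values). TYPE: ExponentialJitterParams \| None DEFAULT: None |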
| RETURNS | DESCRIPTION | 
|---|---|
| Runnable[Input, Output] | A new Runnable that retries the original Runnable on exceptions. | 
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass
runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass
assert count == 2
      
    
      
with_fallbacks(
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
    exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
| PARAMETER | DESCRIPTION | 
|---|---|
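| fallbacks | A sequence of runnables to try if the original Runnable fails. |
| exceptions_to_handle | A tuple of exception types to handle. TYPE: tuple[type[BaseException], ...] DEFAULT: (Exception,) |
| exception_key | If a string is specified, handled exceptions are passed to the fallbacks as part of the input under the specified key. If None, exceptions are not passed to the fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input. TYPE: str \| None DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableWithFallbacks[Input, Output] | A new Runnable that will try the original Runnable, and then each fallback in order, upon failures. |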
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""
def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
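The "try the original, then each fallback in order" behaviour can be sketched without LangChain. The helper `call_with_fallbacks` below is a hypothetical plain-function analogue, not the library's code; re-raising the first error when every candidate fails is a design choice of this sketch.

```python
from typing import Any, Callable, Optional, Sequence, Tuple, Type


def call_with_fallbacks(
    primary: Callable[[Any], Any],
    fallbacks: Sequence[Callable[[Any], Any]],
    value: Any,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
) -> Any:
    """Try primary, then each fallback in order; re-raise the first error if all fail."""
    first_error: Optional[BaseException] = None
    for candidate in (primary, *fallbacks):
        try:
            return candidate(value)
        except exceptions_to_handle as err:
            if first_error is None:
                first_error = err  # remember the original failure
    assert first_error is not None  # at least the primary was tried
    raise first_error


def always_fails(text: str) -> str:
    raise ValueError("primary failed")


def shout(text: str) -> str:
    return text.upper()


answer = call_with_fallbacks(always_fails, [shout], "foo bar")
```

An exception type that is not listed in `exceptions_to_handle` is not caught, so it skips the fallbacks entirely.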
      
as_tool(
    args_schema: type[BaseModel] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema. Alternatively (e.g., if the
Runnable takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema. You can also
pass arg_types to just specify the required arguments and their types.
| PARAMETER | DESCRIPTION | 
|---|---|
| args_schema | The schema for the tool. | 
| name | The name of the tool. |
| description | The description of the tool. |
| arg_types | A dictionary of argument names to types. | 
| RETURNS | DESCRIPTION | 
|---|---|
| BaseTool | A  | 
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
    a: int
    b: list[int]
def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""
    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
    
              Bases: Runnable[Input, Output]
RunnableLambda converts a python callable into a Runnable.
Wrapping a callable in a RunnableLambda makes the callable usable
within either a sync or async context.
RunnableLambda can be composed as any other Runnable and provides
seamless integration with LangChain tracing.
RunnableLambda is best suited for code that does not need to support
streaming. If you need to support streaming (i.e., be able to operate
on chunks of inputs and yield chunks of outputs), use RunnableGenerator
instead.
Note that if a RunnableLambda returns an instance of Runnable, that
instance is invoked (or streamed) during execution.
Examples:
# This is a RunnableLambda
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
runnable = RunnableLambda(add_one)
runnable.invoke(1)  # returns 2
runnable.batch([1, 2, 3])  # returns [2, 3, 4]
# Async is supported by default by delegating to the sync implementation
await runnable.ainvoke(1)  # returns 2
await runnable.abatch([1, 2, 3])  # returns [2, 3, 4]
# Alternatively, you can provide both sync and async implementations
async def add_one_async(x: int) -> int:
    return x + 1
runnable = RunnableLambda(add_one, afunc=add_one_async)
runnable.invoke(1)  # Uses add_one
await runnable.ainvoke(1)  # Uses add_one_async
| METHOD | DESCRIPTION | 
|---|---|
| __init__ | Create a  | 
| get_input_schema | The Pydantic schema for the input to this  | 
| get_output_schema | Get a Pydantic model that can be used to validate output to the  | 
| get_graph | Return a graph representation of this  | 
| __repr__ | Return a string representation of this  | 
| invoke | Invoke this  | 
| ainvoke | Invoke this  | 
| transform | Transform inputs to outputs. | 
| stream | Default implementation of  | 
| atransform | Transform inputs to outputs. | 
| astream | Default implementation of  | 
| get_name | Get the name of the  | 
| get_input_jsonschema | Get a JSON schema that represents the input to the  | 
| get_output_jsonschema | Get a JSON schema that represents the output of the  | 
| config_schema | The type of config this  | 
| get_config_jsonschema | Get a JSON schema that represents the config of the  | 
| get_prompts | Return a list of prompts used by this  | 
| __or__ | Runnable "or" operator. | 
| __ror__ | Runnable "reverse-or" operator. | 
| pipe | Pipe  | 
| pick | Pick keys from the output  | 
| assign | Assigns new fields to the  | 
| batch | Default implementation runs invoke in parallel using a thread pool executor. | 
| batch_as_completed | Run  | 
| abatch | Default implementation runs  | 
| abatch_as_completed | Run  | 
| astream_log | Stream all output from a  | 
| astream_events | Generate a stream of events. | 
| bind | Bind arguments to a  | 
| with_config | Bind config to a  | 
| with_listeners | Bind lifecycle listeners to a  | 
| with_alisteners | Bind async lifecycle listeners to a  | 
| with_types | Bind input and output types to a  | 
| with_retry | Create a new  | 
| map | Return a new  | 
| with_fallbacks | Add fallbacks to a  | 
| as_tool | Create a  | 
property
OutputType: Any
The type of the output of this Runnable as a type annotation.
| RETURNS | DESCRIPTION | 
|---|---|
| Any | The type of the output of this  | 
property
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
property
The type of input this Runnable accepts specified as a Pydantic model.
property
Output schema.
The type of output this Runnable produces specified as a Pydantic model.
      
__init__(
    func: Callable[[Input], Iterator[Output]]
    | Callable[[Input], Runnable[Input, Output]]
    | Callable[[Input], Output]
    | Callable[[Input, RunnableConfig], Output]
    | Callable[[Input, CallbackManagerForChainRun], Output]
    | Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]
    | Callable[[Input], Awaitable[Output]]
    | Callable[[Input], AsyncIterator[Output]]
    | Callable[[Input, RunnableConfig], Awaitable[Output]]
    | Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]]
    | Callable[
        [Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]
    ],
    afunc: Callable[[Input], Awaitable[Output]]
    | Callable[[Input], AsyncIterator[Output]]
    | Callable[[Input, RunnableConfig], Awaitable[Output]]
    | Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]]
    | Callable[
        [Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]
    ]
    | None = None,
    name: str | None = None,
) -> None
Create a RunnableLambda from a callable, an async callable, or both.
Accepts both sync and async variants to allow providing efficient implementations for sync and async execution.
| PARAMETER | DESCRIPTION | 
|---|---|
| func | Either sync or async callable. |
| afunc | An async callable that takes an input and returns an output. |
| name | The name of the Runnable. |
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the  | 
| TypeError | If both  | 
      
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
The Pydantic schema for the input to this Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | The config to use. |
| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | The input schema for this Runnable. |
      
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate output to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic output schema that
depends on which configuration the Runnable is invoked with.
This method returns the output schema for a specific configuration.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION | 
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate output. | 
      
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable.
      
invoke(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Output
Invoke this Runnable synchronously.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to this Runnable. |
| config | The config to use. |
| **kwargs | Additional keyword arguments. |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of this Runnable. |
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the  | 
      
async
ainvoke(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Output
Invoke this Runnable asynchronously.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to this Runnable. |
| config | The config to use. |
| **kwargs | Additional keyword arguments. |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of this Runnable. |
      
transform(
    input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | An iterator of inputs to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
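The buffering behaviour of the default transform can be illustrated with a small stdlib-only sketch: all input chunks are merged into one value, and only then is that value handed to a streaming function. `buffered_transform` and `upper_stream` are hypothetical names, and merging with `+` (falling back to the latest chunk) is an assumption of this sketch.

```python
from typing import Any, Callable, Iterator


def buffered_transform(
    chunks: Iterator[Any],
    stream: Callable[[Any], Iterator[Any]],
) -> Iterator[Any]:
    """Merge all input chunks into one value, then stream that value once."""
    have_input = False
    merged: Any = None
    for chunk in chunks:
        if not have_input:
            merged, have_input = chunk, True
        else:
            try:
                merged = merged + chunk  # concatenate chunks that support "+"
            except TypeError:
                merged = chunk  # otherwise keep only the latest chunk
    if have_input:
        yield from stream(merged)


def upper_stream(text: str) -> Iterator[str]:
    """A stand-in streaming function that yields a single chunk."""
    yield text.upper()


outputs = list(buffered_transform(iter(["he", "llo"]), upper_stream))
```

Because nothing is yielded until the input iterator is exhausted, a component that can work chunk by chunk benefits from overriding this behaviour.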
      
stream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream, which calls invoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
async
atransform(
    input: AsyncIterator[Input],
    config: RunnableConfig | None = None,
    **kwargs: Any | None,
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | An async iterator of inputs to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
async
astream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream, which calls ainvoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
    
      
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the input to the Runnable. |
Example
Added in version 0.3.0
      
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the output of the Runnable. |
Example
Added in version 0.3.0
      
    The type of config this Runnable accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
| PARAMETER | DESCRIPTION | 
|---|---|
| include | A list of fields to include in the config schema. | 
| RETURNS | DESCRIPTION | 
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate config. | 
      
    
      
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
      
__or__(
    other: Runnable[Any, Other]
    | Callable[[Iterator[Any]], Iterator[Other]]
    | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
    | Callable[[Any], Other]
    | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION | 
|---|---|
| other | Another Runnable or a Runnable-like object. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
__ror__(
    other: Runnable[Other, Any]
    | Callable[[Iterator[Other]], Iterator[Any]]
    | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
    | Callable[[Other], Any]
    | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION | 
|---|---|
| other | Another Runnable or a Runnable-like object. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Other, Output] | A new Runnable. |
      
pipe(
    *others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| PARAMETER | DESCRIPTION | 
|---|---|
| *others | Other Runnable or Runnable-like objects to compose. |
| name | An optional name for the resulting RunnableSequence. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
    Pick keys from the output dict of this Runnable.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| PARAMETER | DESCRIPTION | 
|---|---|
| keys | A key or list of keys to pick from the output dict. | 
| RETURNS | DESCRIPTION | 
|---|---|
| RunnableSerializable[Any, Any] | a new  | 
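The two shapes of result shown in the examples (a bare value for a single key, a dict for a list of keys) can be mirrored by a plain function. `pick_keys` is a hypothetical helper for illustration only; in this sketch a missing key raises `KeyError`, which may differ from the library's handling.

```python
from typing import Any, Dict, List, Union


def pick_keys(output: Dict[str, Any], keys: Union[str, List[str]]) -> Any:
    """Return one value for a single key, or a sub-dict for a list of keys."""
    if isinstance(keys, str):
        return output[keys]
    return {key: output[key] for key in keys}


output = {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
single = pick_keys(output, "json")
several = pick_keys(output, ["json", "bytes"])
```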
      
assign(
    **kwargs: Runnable[dict[str, Any], Any]
    | Callable[[dict[str, Any]], Any]
    | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| PARAMETER | DESCRIPTION | 
|---|---|
| **kwargs | A mapping of keys to Runnable or Runnable-like objects. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |
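As a rough mental model, assigning fields keeps the existing dict and adds one new key per keyword argument, where each value is computed from the existing dict. The helper `assign_fields` below is a hypothetical stdlib-only analogue, not the library's implementation.

```python
from typing import Any, Callable, Dict


def assign_fields(
    output: Dict[str, Any],
    **steps: Callable[[Dict[str, Any]], Any],
) -> Dict[str, Any]:
    """Return a copy of output extended with one new key per step."""
    # Every step sees the original dict, not the fields added by other steps.
    new_fields = {name: step(output) for name, step in steps.items()}
    return {**output, **new_fields}


original = {"str": "foo-lish"}
result = assign_fields(original, hello=lambda d: d["str"].upper())
```

The input dict is left untouched, so the same upstream output can be reused by other branches.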
      
batch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
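The thread-pool strategy described above can be sketched with `concurrent.futures`. This is an illustrative analogue rather than the library's code; `batch_in_threads` and its `max_workers` argument are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional


def batch_in_threads(
    invoke: Callable[[Any], Any],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
    max_workers: Optional[int] = None,  # hypothetical knob for this sketch
) -> List[Any]:
    """Run invoke on every input in a thread pool, preserving input order."""
    if not inputs:
        return []

    def run_one(item: Any) -> Any:
        if not return_exceptions:
            return invoke(item)
        try:
            return invoke(item)
        except Exception as err:  # hand the error back instead of raising
            return err

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_one, inputs))


def reciprocal(x: float) -> float:
    return 1 / x


results = batch_in_threads(reciprocal, [1, 2, 0], return_exceptions=True)
```

Threads help most when each call waits on I/O; a CPU-bound `invoke` gains little from this approach.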
      
batch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| tuple[int, Output | Exception] | Tuples of the index of the input and the output from the Runnable. |
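Yielding `(index, output)` pairs in completion order can be sketched with `concurrent.futures.as_completed`. The function name `batch_as_completed_sketch` is hypothetical, and the code is an analogue of the idea rather than the library's implementation.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence, Tuple


def batch_as_completed_sketch(
    invoke: Callable[[Any], Any],
    inputs: Sequence[Any],
    *,
    return_exceptions: bool = False,
) -> Iterator[Tuple[int, Any]]:
    """Yield (input index, result) pairs as each call finishes."""
    with ThreadPoolExecutor() as pool:
        # Map each future back to the position of its input.
        futures = {pool.submit(invoke, item): i for i, item in enumerate(inputs)}
        for future in as_completed(futures):
            index = futures[future]
            try:
                outcome = future.result()
            except Exception as err:
                if not return_exceptions:
                    raise
                outcome = err
            yield index, outcome


def slow_square(x: int) -> int:
    time.sleep(0.05 * (3 - x))  # larger inputs sleep for less time
    return x * x


pairs = list(batch_as_completed_sketch(slow_square, [1, 2, 3]))
```

The index in each pair lets the caller restore input order afterwards, for example with `sorted(pairs)`.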
      
async
abatch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
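The `asyncio.gather` strategy mentioned above can be sketched in a few lines. `abatch_sketch` is a hypothetical name and this is only an analogue of the idea, not the library's code.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
) -> List[Any]:
    """Await ainvoke for every input concurrently, preserving input order."""
    if not inputs:
        return []
    coroutines = [ainvoke(item) for item in inputs]
    return await asyncio.gather(*coroutines, return_exceptions=return_exceptions)


async def add_one(x: int) -> int:
    await asyncio.sleep(0)  # yield control, as a real I/O call would
    return x + 1


results = asyncio.run(abatch_sketch(add_one, [1, 2, 3]))
```

`asyncio.gather` returns results in the order the awaitables were passed, regardless of which one finishes first.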
      
async
abatch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[tuple[int, Output | Exception]] | A tuple of the index of the input and the output from the Runnable. |
      
async
astream_log(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| diff | Whether to yield diffs between each step or the current state. |
| with_streamed_output_list | Whether to yield the streamed_output list. |
| include_names | Only include logs with these names. |
| include_types | Only include logs with these types. |
| include_tags | Only include logs with these tags. |
| exclude_names | Exclude logs with these names. |
| exclude_types | Exclude logs with these types. |
| exclude_tags | Exclude logs with these tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION | 
|---|---|
| AsyncIterator[RunLogPatch] | AsyncIterator[RunLog] | A  | 
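To make "apply the ops in order to construct state" concrete, here is a deliberately simplified stand-in for a JSON Patch library. It handles only `add` and `replace`, ignores JSON Pointer escaping, and the `apply_ops` name plus the `streamed_output` / `final_output` keys in the sample ops are assumptions made for illustration.

```python
import copy
from typing import Any, Dict, List


def apply_ops(state: Any, ops: List[Dict[str, Any]]) -> Any:
    """Apply simplified JSON Patch 'add'/'replace' ops and return the new state."""
    state = copy.deepcopy(state)
    for op in ops:
        path = op["path"]
        if path == "":
            state = copy.deepcopy(op["value"])  # empty path targets the whole document
            continue
        keys = path.lstrip("/").split("/")
        target = state
        for key in keys[:-1]:  # walk down to the parent container
            target = target[int(key)] if isinstance(target, list) else target[key]
        last = keys[-1]
        if isinstance(target, list):
            if last == "-":
                target.append(op["value"])  # "-" means end of the list
            elif op["op"] == "add":
                target.insert(int(last), op["value"])
            else:
                target[int(last)] = op["value"]
        else:
            target[last] = op["value"]
    return state


ops = [
    {"op": "replace", "path": "", "value": {"streamed_output": [], "final_output": None}},
    {"op": "add", "path": "/streamed_output/-", "value": "he"},
    {"op": "add", "path": "/streamed_output/-", "value": "llo"},
    {"op": "replace", "path": "/final_output", "value": "hello"},
]
final_state = apply_ops({}, ops)
```

Replaying the same ops from an empty state always reproduces the same final state, which is what makes a stream of diffs sufficient to reconstruct a run.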
      
async
astream_events(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"] = "v2",
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use this method to create an iterator over StreamEvent objects that provide real-time
information about the progress of the Runnable, including StreamEvent objects from
intermediate results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
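Since event names follow the `on_[runnable_type]_(start|stream|end)` pattern, a consumer can split them with a regular expression and filter on the parts. The helpers `split_event_name` and `stream_chunks` below are hypothetical, and the sample events are plain dicts written by hand rather than output captured from a real run.

```python
import re
from typing import Any, Dict, List, Optional, Tuple

# Matches on_<runnable_type>_<phase>, where phase is start, stream, or end.
_EVENT_NAME = re.compile(r"^on_(?P<kind>[a-z_]+)_(?P<phase>start|stream|end)$")


def split_event_name(event: str) -> Optional[Tuple[str, str]]:
    """Return (runnable_type, phase), or None if the name does not fit the pattern."""
    match = _EVENT_NAME.match(event)
    if match is None:
        return None
    return match.group("kind"), match.group("phase")


def stream_chunks(events: List[Dict[str, Any]], kind: str) -> List[Any]:
    """Collect data['chunk'] from every stream event of the given runnable type."""
    return [
        event["data"]["chunk"]
        for event in events
        if split_event_name(event["event"]) == (kind, "stream")
    ]


events = [
    {"event": "on_chain_start", "name": "reverse", "data": {"input": "hello"}},
    {"event": "on_chain_stream", "name": "reverse", "data": {"chunk": "olleh"}},
    {"event": "on_chain_end", "name": "reverse", "data": {"output": "olleh"}},
]
chunks = stream_chunks(events, "chain")
```

Names that do not fit the pattern return `None`, so they are simply skipped by the filter.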
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | | 'Hello human!' |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events are only surfaced in the v2 version of the API.
A custom event has the following format:
| Attribute | Type | Description | 
|---|---|---|
| name | str | A user defined name for the event. | 
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | 
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
    return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| version | The version of the schema to use, either v1 or v2. |
| include_names | Only include events from Runnable objects with matching names. |
| include_types | Only include events from Runnable objects with matching types. |
| include_tags | Only include events from Runnable objects with matching tags. |
| exclude_names | Exclude events from Runnable objects with matching names. |
| exclude_types | Exclude events from Runnable objects with matching types. |
| exclude_tags | Exclude events from Runnable objects with matching tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[StreamEvent] | An async stream of StreamEvent. |

| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | If the version is not v1 or v2. |
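
The include and exclude filters can be pictured as a predicate applied to each event. The sketch below is a plain-Python model, not LangChain's implementation. `keep_event` is a hypothetical helper, and the rule it encodes is an assumption about the intended semantics: an event passes if it matches any include filter (or if no include filter is given) and matches no exclude filter. Only name and tag filters are modeled; type filters would follow the same pattern.

```python
from typing import Any, Optional, Sequence


def keep_event(
    event: dict[str, Any],
    include_names: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
) -> bool:
    """Hypothetical filter: decide whether an event dict should be emitted."""
    name = event.get("name", "")
    tags = event.get("tags", [])

    # With no include filters, every event is a candidate.
    if include_names is None and include_tags is None:
        keep = True
    else:
        keep = (include_names is not None and name in include_names) or (
            include_tags is not None and any(t in include_tags for t in tags)
        )

    # Exclude filters veto a candidate.
    if exclude_names is not None and name in exclude_names:
        keep = False
    if exclude_tags is not None and any(t in exclude_tags for t in tags):
        keep = False
    return keep


events = [
    {"event": "on_chain_start", "name": "reverse", "tags": []},
    {"event": "on_chain_start", "name": "format_docs", "tags": ["my_chain"]},
]
only_reverse = [e["name"] for e in events if keep_event(e, include_names=["reverse"])]
not_tagged = [e["name"] for e in events if keep_event(e, exclude_tags=["my_chain"])]
print(only_reverse, not_tagged)  # ['reverse'] ['reverse']
```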
      
    Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | The arguments to bind to the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the arguments bound. |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
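
Conceptually, bind resembles functools.partial: it fixes keyword arguments ahead of time and returns a new callable, leaving the original untouched. The plain-Python analogy below is only an illustration. `fake_model` and its handling of `stop` are hypothetical stand-ins and do not reproduce ChatOllama's behavior.

```python
from functools import partial
from typing import Optional


def fake_model(prompt: str, stop: Optional[list[str]] = None) -> str:
    """Hypothetical model: echoes the prompt, cutting at the first stop word."""
    text = prompt
    for word in stop or []:
        index = text.find(word)
        if index != -1:
            text = text[:index]
    return text.strip()


# "Binding" stop returns a new callable; fake_model itself is unchanged.
bound_model = partial(fake_model, stop=["three"])

print(fake_model("One two three four five."))   # One two three four five.
print(bound_model("One two three four five."))  # One two
```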
      
with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | The config to bind to the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the config bound. |
      
with_listeners(
    *,
    on_start: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
    on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
    on_error: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
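| PARAMETER | DESCRIPTION |
|---|---|
| on_start | Called before the Runnable starts running, with the Run object. |
| on_end | Called after the Runnable finishes running, with the Run object. |
| on_error | Called if the Runnable throws an error, with the Run object. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |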
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
      
with_alisteners(
    *,
    on_start: AsyncListener | None = None,
    on_end: AsyncListener | None = None,
    on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
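| PARAMETER | DESCRIPTION |
|---|---|
| on_start | Called asynchronously before the Runnable starts running, with the Run object. |
| on_end | Called asynchronously after the Runnable finishes running, with the Run object. |
| on_error | Called asynchronously if the Runnable throws an error, with the Run object. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |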
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
# Listeners receive the Run object for the run, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
      
with_types(
    *, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| input_type | The input type to bind to the Runnable. |
| output_type | The output type to bind to the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the types bound. |
      
with_retry(
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    exponential_jitter_params: ExponentialJitterParams | None = None,
    stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
| PARAMETER | DESCRIPTION |
|---|---|
| retry_if_exception_type | A tuple of exception types to retry on. |
| wait_exponential_jitter | Whether to add jitter to the wait time between retries. |
| stop_after_attempt | The maximum number of attempts to make before giving up. |
| exponential_jitter_params | Parameters for tenacity.wait_exponential_jitter, namely initial, max, exp_base, and jitter (all float values). |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass
runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass
assert count == 2
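
The retry behavior described by these parameters can be modeled in plain Python. This is a sketch under stated assumptions, not the library's implementation: `call_with_retry` is a hypothetical helper, and the backoff parameters (`initial`, `exp_base`, `max_wait`, `jitter`) and the wait formula are assumed for illustration. The `sleep` argument is injectable so the example runs without actually waiting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    stop_after_attempt: int = 3,
    initial: float = 1.0,  # assumed backoff parameters, for illustration only
    exp_base: float = 2.0,
    max_wait: float = 60.0,
    jitter: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: retry fn with exponential backoff plus jitter."""
    attempt = 1
    while True:
        try:
            return fn()
        except retry_if_exception_type:
            if attempt >= stop_after_attempt:
                raise  # attempts exhausted: surface the last error
            backoff = initial * exp_base ** (attempt - 1)
            sleep(min(backoff + random.uniform(0, jitter), max_wait))
            attempt += 1


seen: list[int] = []   # one entry per call to flaky()
waits: list[float] = []  # recorded instead of sleeping


def flaky() -> str:
    seen.append(1)
    if len(seen) < 3:
        raise ValueError("transient failure")
    return "ok"


result = call_with_retry(
    flaky, retry_if_exception_type=(ValueError,), sleep=waits.append
)
print(result, len(seen), len(waits))  # ok 3 2
```

Exceptions outside `retry_if_exception_type` propagate immediately, which mirrors the role of that parameter in the table above.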
      
    
      
with_fallbacks(
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
    exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
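| PARAMETER | DESCRIPTION |
|---|---|
| fallbacks | A sequence of runnables to try if the original Runnable fails. |
| exceptions_to_handle | A tuple of exception types to handle. |
| exception_key | If a string is specified, handled exceptions are passed to fallbacks as part of the input under the specified key. If None, exceptions are not passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableWithFallbacks[Input, Output] | A new Runnable that will try the original Runnable, and then each fallback in order, upon failures. |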
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""
def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
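
The "try the original, then each fallback in order" rule can be sketched without LangChain. `call_with_fallbacks` below is a hypothetical plain-Python helper that only models the control flow; it does not cover streaming or exception_key.

```python
from typing import Any, Callable, Sequence


def call_with_fallbacks(
    primary: Callable[[Any], Any],
    fallbacks: Sequence[Callable[[Any], Any]],
    value: Any,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
) -> Any:
    """Hypothetical helper: try primary, then each fallback in order."""
    last_error = None
    for candidate in [primary, *fallbacks]:
        try:
            return candidate(value)
        except exceptions_to_handle as error:
            last_error = error  # remember the failure and move on
    # Every candidate failed: re-raise the most recent handled error.
    raise last_error


def broken(_: str) -> str:
    raise ValueError("primary failed")


def backup(text: str) -> str:
    return text.upper()


print(call_with_fallbacks(broken, [backup], "foo bar"))  # FOO BAR
```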
      
as_tool(
    args_schema: type[BaseModel] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema. Alternatively (e.g., if the
Runnable takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema. You can also
pass arg_types to just specify the required arguments and their types.
| PARAMETER | DESCRIPTION |
|---|---|
| args_schema | The schema for the tool. |
| name | The name of the tool. |
| description | The description of the tool. |
| arg_types | A dictionary of argument names to types. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTool | A BaseTool instance. |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
    a: int
    b: list[int]
def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""
    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
    
RunnableParallel
              Bases: RunnableSerializable[Input, dict[str, Any]]
Runnable that runs a mapping of Runnables in parallel.
Returns a mapping of their outputs.
RunnableParallel is one of the two main composition primitives,
alongside RunnableSequence. It invokes Runnables concurrently, providing the
same input to each.
A RunnableParallel can be instantiated directly or by using a dict literal
within a sequence.
Here is a simple example that uses functions to illustrate the use of
RunnableParallel:
```python
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
def mul_three(x: int) -> int:
    return x * 3
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
runnable_3 = RunnableLambda(mul_three)
sequence = runnable_1 | {  # this dict is coerced to a RunnableParallel
    "mul_two": runnable_2,
    "mul_three": runnable_3,
}
# Or equivalently:
# sequence = runnable_1 | RunnableParallel(
#     {"mul_two": runnable_2, "mul_three": runnable_3}
# )
# Also equivalently:
# sequence = runnable_1 | RunnableParallel(
#     mul_two=runnable_2,
#     mul_three=runnable_3,
# )
sequence.invoke(1)
await sequence.ainvoke(1)
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
```
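The core contract (same input to every step, outputs collected under the step keys) can be modeled in a few lines of standard-library Python. This is an illustrative sketch only; `run_parallel` is a hypothetical helper and not how RunnableParallel is implemented.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Mapping


def run_parallel(
    steps: Mapping[str, Callable[[Any], Any]], value: Any
) -> dict[str, Any]:
    """Hypothetical helper: call every step with the same input, concurrently."""
    with ThreadPoolExecutor() as pool:
        futures = {key: pool.submit(step, value) for key, step in steps.items()}
        # Collect results under the same keys as the steps.
        return {key: future.result() for key, future in futures.items()}


steps = {"mul_two": lambda x: x * 2, "mul_three": lambda x: x * 3}
parallel_output = run_parallel(steps, 2)
print(parallel_output)  # {'mul_two': 4, 'mul_three': 6}
```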
RunnableParallel makes it easy to run Runnables in parallel. In the below
example, we simultaneously stream output from two different Runnable objects:
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
joke_chain = (
    ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
)
poem_chain = (
    ChatPromptTemplate.from_template("write a 2-line poem about {topic}")
    | model
)
runnable = RunnableParallel(joke=joke_chain, poem=poem_chain)
# Display stream
output = {key: "" for key, _ in runnable.output_schema()}
for chunk in runnable.stream({"topic": "bear"}):
    for key in chunk:
        output[key] = output[key] + chunk[key].content
    print(output)  # noqa: T201
```
| METHOD | DESCRIPTION |
|---|---|
| __init__ | Create a RunnableParallel. |
| is_lc_serializable | Return True as this class is serializable. |
| get_lc_namespace | Get the namespace of the LangChain object. |
| get_name | Get the name of the Runnable. |
| get_input_schema | Get the input schema of the Runnable. |
| get_output_schema | Get the output schema of the Runnable. |
| get_graph | Get the graph representation of the Runnable. |
| invoke | Transform a single input into an output. |
| ainvoke | Transform a single input into an output. |
| transform | Transform inputs to outputs. |
| stream | Default implementation of stream, which calls invoke. |
| atransform | Transform inputs to outputs. |
| astream | Default implementation of astream, which calls ainvoke. |
| get_input_jsonschema | Get a JSON schema that represents the input to the Runnable. |
| get_output_jsonschema | Get a JSON schema that represents the output of the Runnable. |
| config_schema | The type of config this Runnable accepts specified as a Pydantic model. |
| get_config_jsonschema | Get a JSON schema that represents the config of the Runnable. |
| get_prompts | Return a list of prompts used by this Runnable. |
| __or__ | Runnable "or" operator. |
| __ror__ | Runnable "reverse-or" operator. |
| pipe | Pipe Runnable objects. |
| pick | Pick keys from the output dict of this Runnable. |
| assign | Assigns new fields to the dict output of this Runnable. |
| batch | Default implementation runs invoke in parallel using a thread pool executor. |
| batch_as_completed | Run invoke in parallel on a list of inputs. |
| abatch | Default implementation runs ainvoke in parallel using asyncio.gather. |
| abatch_as_completed | Run ainvoke in parallel on a list of inputs. |
| astream_log | Stream all output from a Runnable, as reported to the callback system. |
| astream_events | Generate a stream of events. |
| bind | Bind arguments to a Runnable, returning a new Runnable. |
| with_config | Bind config to a Runnable, returning a new Runnable. |
| with_listeners | Bind lifecycle listeners to a Runnable, returning a new Runnable. |
| with_alisteners | Bind async lifecycle listeners to a Runnable. |
| with_types | Bind input and output types to a Runnable, returning a new Runnable. |
| with_retry | Create a new Runnable that retries the original Runnable on exceptions. |
| map | Return a new Runnable that maps a list of inputs to a list of outputs. |
| with_fallbacks | Add fallbacks to a Runnable, returning a new Runnable. |
| as_tool | Create a BaseTool from a Runnable. |
| lc_id | Return a unique identifier for this class for serialization purposes. |
| to_json | Serialize the Runnable to JSON. |
| to_json_not_implemented | Serialize a "not implemented" object. |
| configurable_fields | Configure particular Runnable fields at runtime. |
| configurable_alternatives | Configure alternatives for Runnable objects that can be set at runtime. |
property
  
¶
config_specs: list[ConfigurableFieldSpec]
Get the config specs of the Runnable.
| RETURNS | DESCRIPTION |
|---|---|
| list[ConfigurableFieldSpec] | The config specs of the Runnable. |
class-attribute
      instance-attribute
  
¶
name: str | None = None
The name of the Runnable. Used for debugging and tracing.
property
  
¶
OutputType: type[Output]
Output Type.
The type of output this Runnable produces specified as a type annotation.
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the output type cannot be inferred. | 
property
  
¶
input_schema: type[BaseModel]
The type of input this Runnable accepts specified as a Pydantic model.
property
  
¶
output_schema: type[BaseModel]
Output schema.
The type of output this Runnable produces specified as a Pydantic model.
property
  
¶
lc_secrets: dict[str, str]
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
property
  
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
      
__init__(
    steps__: Mapping[
        str,
        Runnable[Input, Any]
        | Callable[[Input], Any]
        | Mapping[str, Runnable[Input, Any] | Callable[[Input], Any]],
    ]
    | None = None,
    **kwargs: Runnable[Input, Any]
    | Callable[[Input], Any]
    | Mapping[str, Runnable[Input, Any] | Callable[[Input], Any]],
) -> None
Create a RunnableParallel.
| PARAMETER | DESCRIPTION |
|---|---|
| steps__ | The steps to include. |
| **kwargs | Additional steps to include. |
      
classmethod
  
¶
is_lc_serializable() -> bool
Return True as this class is serializable.
      
classmethod
  
¶
    
      
    
      
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get the input schema of the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | The config to use. |

| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | The input schema of the Runnable. |
      
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get the output schema of the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | The config to use. |

| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | The output schema of the Runnable. |
      
get_graph(config: RunnableConfig | None = None) -> Graph
Get the graph representation of the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | The config to use. |

| RETURNS | DESCRIPTION |
|---|---|
| Graph | The graph representation of the Runnable. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If a Runnable has no first or last node. |
      
invoke(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any
) -> dict[str, Any]
Transform a single input into an output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | A config to use when invoking the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
async
  
¶
ainvoke(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> dict[str, Any]
Transform a single input into an output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | A config to use when invoking the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
transform(
    input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> Iterator[dict[str, Any]]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
| input | An iterator of inputs to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |

| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
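
The "buffer the input, then stream once" default described for transform can be sketched as follows. `default_transform` and `shout` are hypothetical names, and the sketch assumes input chunks can be combined with `+`; it is a plain-Python model rather than the library's code.

```python
from typing import Callable, Iterable, Iterator


def default_transform(
    chunks: Iterable[str], stream: Callable[[str], Iterator[str]]
) -> Iterator[str]:
    """Hypothetical helper: buffer all input chunks, then stream once."""
    buffered = None
    for chunk in chunks:
        # Accumulate with `+`; no output is produced while input is arriving.
        buffered = chunk if buffered is None else buffered + chunk
    if buffered is not None:
        yield from stream(buffered)


def shout(text: str) -> Iterator[str]:
    # Stand-in for a stream method that yields a single chunk.
    yield text.upper()


transformed = list(default_transform(iter(["he", "llo"]), shout))
print(transformed)  # ['HELLO']
```

Because nothing is yielded until the input iterator is exhausted, a subclass that can emit partial output earlier has a reason to override this default.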
      
stream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[dict[str, Any]]
Default implementation of stream, which calls invoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |

| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
async
  
¶
atransform(
    input: AsyncIterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> AsyncIterator[dict[str, Any]]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
| input | An async iterator of inputs to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
async
  
¶
astream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[dict[str, Any]]
Default implementation of astream, which calls ainvoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the input to the Runnable. |
Example
Added in version 0.3.0
      
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the output of the Runnable. |
Example
Added in version 0.3.0
      
    The type of config this Runnable accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
| PARAMETER | DESCRIPTION | 
|---|---|
| include | A list of fields to include in the config schema. | 
| RETURNS | DESCRIPTION | 
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate config. | 
      
    
      
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
      
__or__(
    other: Runnable[Any, Other]
    | Callable[[Iterator[Any]], Iterator[Other]]
    | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
    | Callable[[Any], Other]
    | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| other | Another Runnable or a Runnable-like object. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
__ror__(
    other: Runnable[Other, Any]
    | Callable[[Iterator[Other]], Iterator[Any]]
    | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
    | Callable[[Other], Any]
    | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| other | Another Runnable or a Runnable-like object. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Other, Output] | A new Runnable. |
      
pipe(
    *others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| PARAMETER | DESCRIPTION |
|---|---|
| *others | Other Runnable or Runnable-like objects to compose. |
| name | An optional name for the resulting RunnableSequence. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
    Pick keys from the output dict of this Runnable.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| PARAMETER | DESCRIPTION | 
|---|---|
| keys | A key or list of keys to pick from the output dict. | 
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |
      
assign(
    **kwargs: Runnable[dict[str, Any], Any]
    | Callable[[dict[str, Any]], Any]
    | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
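| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | A mapping of keys to Runnable or Runnable-like objects that will be invoked with the entire output dict of this Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |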
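
In data terms, assign keeps the existing dict output and adds computed fields alongside it. The following plain-Python model is a sketch of that idea only; `assign_fields` is a hypothetical helper, and it assumes each new field is computed from the original dict rather than from a partially updated one.

```python
from typing import Any, Callable


def assign_fields(
    output: dict[str, Any], **steps: Callable[[dict[str, Any]], Any]
) -> dict[str, Any]:
    """Hypothetical helper: add computed fields to a dict, keeping existing ones."""
    # Each step receives the original dict, not the partially updated one.
    return {**output, **{key: step(output) for key, step in steps.items()}}


original = {"str": "foo-lish"}
assigned = assign_fields(original, hello=lambda d: d["str"].upper())
print(assigned)  # {'str': 'foo-lish', 'hello': 'FOO-LISH'}
```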
      
batch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
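
The default described above (invoke run in parallel on a thread pool, with results returned in input order) can be modeled with the standard library. `batch_sketch` and `risky` are hypothetical names; this is an illustration of the idea, not the library's implementation.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def batch_sketch(
    invoke: Callable[[Any], Any],
    inputs: list[Any],
    *,
    return_exceptions: bool = False,
) -> list[Any]:
    """Hypothetical helper: run invoke over inputs in a thread pool, in order."""

    def safe_invoke(value: Any) -> Any:
        try:
            return invoke(value)
        except Exception as error:
            if return_exceptions:
                return error  # hand the exception back as a result
            raise

    if not inputs:
        return []
    with ThreadPoolExecutor() as pool:
        # map() yields results in input order, regardless of completion order.
        return list(pool.map(safe_invoke, inputs))


def risky(x: int) -> int:
    if x == 2:
        raise ValueError("x is 2")
    return (x + 1) * 2


results = batch_sketch(risky, [1, 2, 3], return_exceptions=True)
print(results)  # [4, ValueError('x is 2'), 8]
```

Threads help most when invoke spends its time waiting on I/O, which matches the note that the default works well for IO bound runnables.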
      
batch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. TYPE: Sequence[Input] |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig \| Sequence[RunnableConfig] \| None. DEFAULT: None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool. DEFAULT: False |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |
| YIELDS | DESCRIPTION |
|---|---|
| tuple[int, Output \| Exception] | Tuples of the index of the input and the output from the Runnable. |
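The difference from batch is that results arrive in completion order and each one carries the index of its input. A minimal standard-library sketch of that idea follows; batch_as_completed_sketch and sleep_then_echo are hypothetical names, and this is an illustration of the pattern, not the library's code.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence, Tuple


def batch_as_completed_sketch(
    invoke: Callable[[Any], Any],
    inputs: Sequence[Any],
    *,
    return_exceptions: bool = False,
) -> Iterator[Tuple[int, Any]]:
    """Yield (input index, output) pairs in the order the calls finish."""
    if not inputs:
        return
    with ThreadPoolExecutor(max_workers=len(inputs)) as pool:
        # Remember which input position each future belongs to.
        index_of = {pool.submit(invoke, item): i for i, item in enumerate(inputs)}
        for future in as_completed(index_of):
            error = future.exception()
            if error is None:
                yield index_of[future], future.result()
            elif return_exceptions:
                yield index_of[future], error
            else:
                raise error


def sleep_then_echo(seconds: float) -> float:
    time.sleep(seconds)
    return seconds


completed = list(batch_as_completed_sketch(sleep_then_echo, [0.3, 0.0, 0.15]))
print(completed)  # shorter sleeps are expected to appear first
```

Because the index travels with each result, a caller can restore input order afterwards with sorted(completed).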
      
async
abatch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of abatch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. TYPE: list[Input] |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig \| list[RunnableConfig] \| None. DEFAULT: None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool. DEFAULT: False |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
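The asyncio.gather pattern named in the description can be sketched in a few lines. This is an illustration under stated assumptions, not the library's implementation: abatch_sketch and slow_square are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
) -> List[Any]:
    """Await `ainvoke` for every input concurrently, preserving input order."""
    coroutines = [ainvoke(item) for item in inputs]
    # gather returns results in the order the awaitables were passed in.
    return list(await asyncio.gather(*coroutines, return_exceptions=return_exceptions))


async def slow_square(x: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for an IO bound call
    if x < 0:
        raise ValueError("negative input")
    return x * x


outputs = asyncio.run(abatch_sketch(slow_square, [1, -2, 3], return_exceptions=True))
print(outputs)  # [1, ValueError('negative input'), 9]
```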
      
async
abatch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. TYPE: Sequence[Input] |
| config | A config to use when invoking the Runnable. TYPE: RunnableConfig \| Sequence[RunnableConfig] \| None. DEFAULT: None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool. DEFAULT: False |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[tuple[int, Output \| Exception]] | A tuple of the index of the input and the output from the Runnable. |
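An async counterpart of the completion-order pattern can be sketched with asyncio.as_completed. The names abatch_as_completed_sketch, echo_after, and collect are hypothetical, and the code only illustrates the idea of pairing each result with its input index.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List, Sequence, Tuple


async def abatch_as_completed_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: Sequence[Any],
    *,
    return_exceptions: bool = False,
) -> AsyncIterator[Tuple[int, Any]]:
    """Yield (input index, output) pairs as each awaitable finishes."""

    async def run_one(index: int, item: Any) -> Tuple[int, Any]:
        try:
            return index, await ainvoke(item)
        except Exception as exc:
            if return_exceptions:
                return index, exc
            raise

    tasks = [asyncio.ensure_future(run_one(i, item)) for i, item in enumerate(inputs)]
    for next_finished in asyncio.as_completed(tasks):
        yield await next_finished


async def echo_after(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay


async def collect() -> List[Tuple[int, Any]]:
    return [pair async for pair in abatch_as_completed_sketch(echo_after, [0.2, 0.0, 0.1])]


completed = asyncio.run(collect())
print(completed)  # shorter delays are expected to appear first
```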
      
async
astream_log(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of JSON Patch ops that describe how the state of the run has changed in each step, and the final state of the run.
The JSON Patch ops can be applied in order to reconstruct the state.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. TYPE: Any |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None. DEFAULT: None |
| diff | Whether to yield diffs between each step or the current state. TYPE: bool. DEFAULT: True |
| with_streamed_output_list | Whether to yield the streamed_output list. TYPE: bool. DEFAULT: True |
| include_names | Only include logs with these names. |
| include_types | Only include logs with these types. |
| include_tags | Only include logs with these tags. |
| exclude_names | Exclude logs with these names. |
| exclude_types | Exclude logs with these types. |
| exclude_tags | Exclude logs with these tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[RunLogPatch] \| AsyncIterator[RunLog] | A RunLogPatch or RunLog object. |
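To make "applying ops in order to construct state" concrete, here is a small standard-library sketch that folds a list of JSON Patch operations into a state object. It handles only the add and replace operations, and the op list and the streamed_output/final_output keys are made up for illustration; they are assumptions, not captured astream_log output.

```python
import copy
from typing import Any, Dict, List


def apply_op(state: Any, op: Dict[str, Any]) -> Any:
    """Apply one JSON Patch `add` or `replace` op and return the new state."""
    if op["op"] not in ("add", "replace"):
        raise ValueError(f"unsupported op: {op['op']}")
    value = copy.deepcopy(op["value"])
    if op["path"] == "":
        return value  # an empty path targets the whole document
    # Split the JSON Pointer and undo its escaping (~1 is "/", ~0 is "~").
    keys = [k.replace("~1", "/").replace("~0", "~") for k in op["path"].split("/")[1:]]
    target = state
    for key in keys[:-1]:
        target = target[int(key)] if isinstance(target, list) else target[key]
    last = keys[-1]
    if isinstance(target, list):
        if last == "-":
            target.append(value)  # "-" means "append to the end of the list"
        elif op["op"] == "add":
            target.insert(int(last), value)
        else:
            target[int(last)] = value
    else:
        target[last] = value
    return state


# Hypothetical ops, shaped like a run that streams two chunks and then finishes.
ops: List[Dict[str, Any]] = [
    {"op": "replace", "path": "", "value": {"streamed_output": [], "final_output": None}},
    {"op": "add", "path": "/streamed_output/-", "value": "ol"},
    {"op": "add", "path": "/streamed_output/-", "value": "leh"},
    {"op": "replace", "path": "/final_output", "value": "olleh"},
]

state: Any = None
for op in ops:
    state = apply_op(state, op)
print(state)  # {'streamed_output': ['ol', 'leh'], 'final_output': 'olleh'}
```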
      
async
astream_events(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"] = "v2",
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use this method to create an iterator over StreamEvent objects that provide
real-time information about the progress of the Runnable, including
StreamEvent objects from intermediate results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for the v2 version of the API. The v1 version of the API will return an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
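The event name format in the schema above lends itself to simple parsing when routing events. The sketch below is a standard-library illustration; EVENT_NAME and parse_event_name are hypothetical helpers, not part of the LangChain API.

```python
import re
from typing import Optional, Tuple

# Mirrors the documented format: on_[runnable_type]_(start|stream|end)
EVENT_NAME = re.compile(r"^on_(?P<runnable_type>[a-z_]+)_(?P<phase>start|stream|end)$")


def parse_event_name(event: str) -> Optional[Tuple[str, str]]:
    """Split an event name into (runnable_type, phase), or return None."""
    match = EVENT_NAME.match(event)
    if match is None:
        return None
    return match["runnable_type"], match["phase"]


print(parse_event_name("on_chat_model_stream"))  # ('chat_model', 'stream')
print(parse_event_name("on_chain_end"))  # ('chain', 'end')
print(parse_event_name("progress_event"))  # None
```

A consumer could use the returned pair to, for example, collect only the stream phase of chat model events while ignoring everything else.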
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | | 'Hello human!' |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events are only surfaced in the v2 version of the API!
A custom event has the following format:
| Attribute | Type | Description | 
|---|---|---|
| name | str | A user defined name for the event. | 
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | 
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
    return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
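| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. TYPE: Any |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None. DEFAULT: None |
| version | The version of the schema to use, either 'v2' or 'v1'. TYPE: Literal["v1", "v2"]. DEFAULT: "v2" |
| include_names | Only include events from Runnable objects with matching names. |
| include_types | Only include events from Runnable objects with matching types. |
| include_tags | Only include events from Runnable objects with matching tags. |
| exclude_names | Exclude events from Runnable objects with matching names. |
| exclude_types | Exclude events from Runnable objects with matching types. |
| exclude_tags | Exclude events from Runnable objects with matching tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[StreamEvent] | An async stream of StreamEvent. |
| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | If the version is not 'v1' or 'v2'. |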
      
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | The arguments to bind to the Runnable. TYPE: Any |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the arguments bound. |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
      
with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | The config to bind to the Runnable. TYPE: RunnableConfig \| None. DEFAULT: None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the config bound. |
      
with_listeners(
    *,
    on_start: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
    on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
    on_error: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
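| PARAMETER | DESCRIPTION |
|---|---|
| on_start | Called before the Runnable starts running. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None. DEFAULT: None |
| on_end | Called after the Runnable finishes running. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None. DEFAULT: None |
| on_error | Called if the Runnable throws an error. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None. DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |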
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
      
with_alisteners(
    *,
    on_start: AsyncListener | None = None,
    on_end: AsyncListener | None = None,
    on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
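| PARAMETER | DESCRIPTION |
|---|---|
| on_start | Called asynchronously before the Runnable starts running. TYPE: AsyncListener \| None. DEFAULT: None |
| on_end | Called asynchronously after the Runnable finishes running. TYPE: AsyncListener \| None. DEFAULT: None |
| on_error | Called asynchronously if the Runnable throws an error. TYPE: AsyncListener \| None. DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |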
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
# Listeners receive the Run object for the execution, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
      
with_types(
    *, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
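| PARAMETER | DESCRIPTION |
|---|---|
| input_type | The input type to bind to the Runnable. TYPE: type[Input] \| None. DEFAULT: None |
| output_type | The output type to bind to the Runnable. TYPE: type[Output] \| None. DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the types bound. |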
      
with_retry(
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    exponential_jitter_params: ExponentialJitterParams | None = None,
    stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
| PARAMETER | DESCRIPTION |
|---|---|
| retry_if_exception_type | A tuple of exception types to retry on. TYPE: tuple[type[BaseException], ...]. DEFAULT: (Exception,) |
| wait_exponential_jitter | Whether to add jitter to the wait time between retries. TYPE: bool. DEFAULT: True |
| stop_after_attempt | The maximum number of attempts to make before giving up. TYPE: int. DEFAULT: 3 |
| exponential_jitter_params | Parameters for tenacity.wait_exponential_jitter. TYPE: ExponentialJitterParams \| None. DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass
runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass
assert count == 2
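The retry semantics can also be pictured as a plain loop. The sketch below is a standard-library illustration, not the library's implementation: retry_call, the backoff settings (initial, exp_base, max_wait, jitter), and the injectable sleep argument are all assumptions made for this example.

```python
import random
import time
from typing import Any, Callable, Dict, List, Tuple, Type


def retry_call(
    fn: Callable[[Any], Any],
    arg: Any,
    *,
    retry_if_exception_type: Tuple[Type[BaseException], ...] = (Exception,),
    stop_after_attempt: int = 3,
    wait_exponential_jitter: bool = True,
    initial: float = 1.0,  # hypothetical backoff settings for this sketch
    exp_base: float = 2.0,
    max_wait: float = 60.0,
    jitter: float = 1.0,
    sleep: Callable[[float], Any] = time.sleep,
) -> Any:
    """Call `fn(arg)`, retrying on the listed exception types."""
    if stop_after_attempt < 1:
        raise ValueError("stop_after_attempt must be at least 1")
    for attempt in range(1, stop_after_attempt + 1):
        try:
            return fn(arg)
        except retry_if_exception_type:
            if attempt == stop_after_attempt:
                raise  # attempts exhausted: surface the last error
            if wait_exponential_jitter:
                backoff = initial * exp_base ** (attempt - 1)
                sleep(min(backoff + random.uniform(0, jitter), max_wait))


attempts: Dict[str, int] = {"count": 0}


def flaky_double(x: int) -> int:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ValueError("transient failure")
    return x * 2


waits: List[float] = []  # record the delays instead of actually sleeping
result = retry_call(
    flaky_double,
    21,
    retry_if_exception_type=(ValueError,),
    stop_after_attempt=3,
    sleep=waits.append,
)
print(result, attempts["count"], len(waits))  # 42 3 2
```

Exceptions outside retry_if_exception_type propagate immediately, without consuming further attempts.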
      
    
      
with_fallbacks(
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
    exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
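| PARAMETER | DESCRIPTION |
|---|---|
| fallbacks | A sequence of runnables to try if the original Runnable fails. |
| exceptions_to_handle | A tuple of exception types to handle. TYPE: tuple[type[BaseException], ...]. DEFAULT: (Exception,) |
| exception_key | If a string is specified, handled exceptions are passed to fallbacks as part of the input under the specified key. If None, exceptions are not passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input. TYPE: str \| None. DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableWithFallbacks[Input, Output] | A new Runnable that will try the original Runnable, and then each fallback in order, upon failures. |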
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""
def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
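The "try the original, then each fallback in order" behavior can be sketched with ordinary callables. This is a simplified standard-library illustration, not the library's implementation; invoke_with_fallbacks and the three example functions are hypothetical, and re-raising the first error when everything fails is an assumption of this sketch.

```python
from typing import Any, Callable, Optional, Sequence, Tuple, Type


def invoke_with_fallbacks(
    primary: Callable[[Any], Any],
    fallbacks: Sequence[Callable[[Any], Any]],
    value: Any,
    *,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
) -> Any:
    """Try `primary`, then each fallback in order, returning the first success."""
    first_error: Optional[BaseException] = None
    for candidate in [primary, *fallbacks]:
        try:
            return candidate(value)
        except exceptions_to_handle as exc:
            if first_error is None:
                first_error = exc  # keep the original failure for reporting
    assert first_error is not None
    raise first_error


def unreachable_service(text: str) -> str:
    raise ConnectionError("primary is down")


def slow_service(text: str) -> str:
    raise TimeoutError("first fallback timed out")


def local_backup(text: str) -> str:
    return f"backup: {text}"


answer = invoke_with_fallbacks(unreachable_service, [slow_service, local_backup], "hi")
print(answer)  # backup: hi
```

An exception type that is not listed in exceptions_to_handle is not caught, so it stops the chain immediately instead of moving on to the next fallback.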
      
as_tool(
    args_schema: type[BaseModel] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema. Alternatively (e.g., if the
Runnable takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema. You can also
pass arg_types to just specify the required arguments and their types.
| PARAMETER | DESCRIPTION |
|---|---|
| args_schema | The schema for the tool. |
| name | The name of the tool. TYPE: str \| None. DEFAULT: None |
| description | The description of the tool. TYPE: str \| None. DEFAULT: None |
| arg_types | A dictionary of argument names to types. |
| RETURNS | DESCRIPTION |
|---|---|
| BaseTool | A BaseTool instance. |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
    a: int
    b: list[int]
def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""
    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
      
classmethod
lc_id() -> list[str]
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI, the id is
["langchain", "llms", "openai", "OpenAI"].
      
to_json() -> SerializedConstructor | SerializedNotImplemented
Serialize the Runnable to JSON.
| RETURNS | DESCRIPTION |
|---|---|
| SerializedConstructor \| SerializedNotImplemented | A JSON-serializable representation of the Runnable. |
      
to_json_not_implemented() -> SerializedNotImplemented
Serialize a "not implemented" object.
| RETURNS | DESCRIPTION |
|---|---|
| SerializedNotImplemented | The serialized "not implemented" object. |
      
configurable_fields(
    **kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable fields at runtime.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | A dictionary of ConfigurableField instances to configure. TYPE: AnyConfigurableField |
| RAISES | DESCRIPTION |
|---|---|
| ValueError | If a configuration key is not found in the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the fields configured. |
Example
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
    "max_tokens_200: ",
    model.with_config(configurable={"output_token_number": 200})
    .invoke("tell me something about chess")
    .content,
)
      
configurable_alternatives(
    which: ConfigurableField,
    *,
    default_key: str = "default",
    prefix_keys: bool = False,
    **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable objects that can be set at runtime.
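| PARAMETER | DESCRIPTION |
|---|---|
| which | The ConfigurableField instance that will be used to select the alternative. TYPE: ConfigurableField |
| default_key | The default key to use if no alternative is selected. TYPE: str. DEFAULT: 'default' |
| prefix_keys | Whether to prefix the keys with the ConfigurableField id. TYPE: bool. DEFAULT: False |
| **kwargs | A dictionary of keys to Runnable instances or callables that return Runnable instances. TYPE: Runnable[Input, Output] \| Callable[[], Runnable[Input, Output]] |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the alternatives configured. |
Example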
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
    model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
    model.with_config(configurable={"llm": "openai"})
    .invoke("which organization created you?")
    .content
)
    
RunnableSequence
              Bases: RunnableSerializable[Input, Output]
Sequence of Runnable objects, where the output of one is the input of the next.
RunnableSequence is the most important composition operator in LangChain
as it is used in virtually every chain.
A RunnableSequence can be instantiated directly or more commonly by using the
| operator where either the left or right operands (or both) must be a
Runnable.
Any RunnableSequence automatically supports sync, async, and batch execution.
The default implementations of batch and abatch use thread pools and
asyncio.gather, and will be faster than naively calling invoke or ainvoke
in a loop for IO bound Runnables.
Batching is implemented by invoking the batch method on each component of the
RunnableSequence in order.
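The step-by-step batching described above can be pictured as follows. This is a simplified sketch with hypothetical stand-in classes (Step and sequence_batch are not LangChain names): the whole list of inputs passes through the first step's batch, and the resulting list is handed to the next step.

```python
from typing import Any, Callable, List


class Step:
    """Hypothetical stand-in for a component that exposes a `batch` method."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def batch(self, inputs: List[Any]) -> List[Any]:
        # A real component could call a batch-capable API here instead.
        return [self.fn(item) for item in inputs]


def sequence_batch(steps: List[Step], inputs: List[Any]) -> List[Any]:
    """Feed the full batch through each step in order."""
    for step in steps:
        inputs = step.batch(inputs)
    return inputs


steps = [Step(lambda x: x + 1), Step(lambda x: x * 2)]
print(sequence_batch(steps, [1, 2, 3]))  # [4, 6, 8]
```

Processing step by step, and not input by input, lets a component with an efficient batch implementation see all of its inputs at once.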
A RunnableSequence preserves the streaming properties of its components, so if
all components of the sequence implement a transform method -- which
is the method that implements the logic to map a streaming input to a streaming
output -- then the sequence will be able to stream input to output!
If any component of the sequence does not implement transform then the streaming will only begin after this component is run. If there are multiple blocking components, streaming begins after the last one.
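The effect of a blocking component on streaming can be demonstrated with plain Python generators standing in for transform. The sketch below is only an analogy, and upper_transform, blocking_reverse, and run are hypothetical names: one stage emits a chunk for every chunk it receives, while the other must consume its entire input before emitting anything.

```python
from typing import Callable, Iterator, List


def upper_transform(chunks: Iterator[str]) -> Iterator[str]:
    """Streaming stage: emits one output chunk per input chunk."""
    for chunk in chunks:
        yield chunk.upper()


def blocking_reverse(chunks: Iterator[str]) -> Iterator[str]:
    """Blocking stage: needs the whole input before it can emit anything."""
    text = "".join(chunks)
    yield text[::-1]


def run(stage: Callable[[Iterator[str]], Iterator[str]]) -> List[str]:
    """Record the order in which chunks are produced and received."""
    log: List[str] = []

    def source() -> Iterator[str]:
        for word in ["a", "b", "c"]:
            log.append(f"produced {word}")
            yield word

    for output in stage(source()):
        log.append(f"received {output}")
    return log


streaming_log = run(upper_transform)
blocking_log = run(blocking_reverse)
print(streaming_log)
# ['produced a', 'received A', 'produced b', 'received B', 'produced c', 'received C']
print(blocking_log)
# ['produced a', 'produced b', 'produced c', 'received cba']
```

In the first log, outputs interleave with inputs; in the second, nothing is received until the source is exhausted, which mirrors how streaming only begins after a blocking component has run.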
Note
RunnableLambdas do not support transform by default! So if you need to
use a RunnableLambda, be careful about where you place it in a
RunnableSequence (if you need to use the stream/astream methods).
If you need arbitrary logic and need streaming, you can subclass
Runnable, and implement transform for whatever logic you need.
Here is a simple example that uses simple functions to illustrate the use of
RunnableSequence:
```python
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1 | runnable_2
# Or equivalently:
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
```
Here's an example that streams JSON output generated by an LLM:
```python
from langchain_core.output_parsers.json import SimpleJsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
prompt = PromptTemplate.from_template(
    "In JSON format, give me a list of {topic} and their "
    "corresponding names in French, Spanish and in a "
    "Cat Language."
)
model = ChatOpenAI()
chain = prompt | model | SimpleJsonOutputParser()
async for chunk in chain.astream({"topic": "colors"}):
    print("-")  # noqa: T201
    print(chunk, sep="", flush=True)  # noqa: T201
```
| METHOD | DESCRIPTION |
|---|---|
| __init__ | Create a new RunnableSequence. |
| get_lc_namespace | Get the namespace of the LangChain object. |
| is_lc_serializable | Return True as this class is serializable. |
| get_input_schema | Get the input schema of the Runnable. |
| get_output_schema | Get the output schema of the Runnable. |
| get_graph | Get the graph representation of the Runnable. |
| __or__ | Runnable "or" operator. |
| __ror__ | Runnable "reverse-or" operator. |
| invoke | Transform a single input into an output. |
| ainvoke | Transform a single input into an output. |
| batch | Default implementation runs invoke in parallel using a thread pool executor. |
| abatch | Default implementation runs ainvoke in parallel using asyncio.gather. |
| transform | Transform inputs to outputs. |
| stream | Default implementation of stream, which calls invoke. |
| atransform | Transform inputs to outputs. |
| astream | Default implementation of astream, which calls ainvoke. |
| get_name | Get the name of the Runnable. |
| get_input_jsonschema | Get a JSON schema that represents the input to the Runnable. |
| get_output_jsonschema | Get a JSON schema that represents the output of the Runnable. |
| config_schema | The type of config this Runnable accepts specified as a Pydantic model. |
| get_config_jsonschema | Get a JSON schema that represents the config of the Runnable. |
| get_prompts | Return a list of prompts used by this Runnable. |
| pipe | Pipe Runnable objects. |
| pick | Pick keys from the output dict of this Runnable. |
| assign | Assigns new fields to the dict output of this Runnable. |
| batch_as_completed | Run invoke in parallel on a list of inputs. |
| abatch_as_completed | Run ainvoke in parallel on a list of inputs. |
| astream_log | Stream all output from a Runnable, as reported to the callback system. |
| astream_events | Generate a stream of events. |
| bind | Bind arguments to a Runnable, returning a new Runnable. |
| with_config | Bind config to a Runnable, returning a new Runnable. |
| with_listeners | Bind lifecycle listeners to a Runnable, returning a new Runnable. |
| with_alisteners | Bind async lifecycle listeners to a Runnable. |
| with_types | Bind input and output types to a Runnable, returning a new Runnable. |
| with_retry | Create a new Runnable that retries the original Runnable on exceptions. |
| map | Return a new Runnable that maps a list of inputs to a list of outputs. |
| with_fallbacks | Add fallbacks to a Runnable, returning a new Runnable. |
| as_tool | Create a BaseTool from a Runnable. |
| lc_id | Return a unique identifier for this class for serialization purposes. |
| to_json | Serialize the Runnable to JSON. |
| to_json_not_implemented | Serialize a "not implemented" object. |
| configurable_fields | Configure particular Runnable fields at runtime. |
| configurable_alternatives | Configure alternatives for Runnable objects that can be set at runtime. |
class-attribute instance-attribute
middle: list[Runnable[Any, Any]]
The middle Runnable in the sequence.
property
  
¶
    
property
config_specs: list[ConfigurableFieldSpec]
Get the config specs of the Runnable.
| RETURNS | DESCRIPTION | 
|---|---|
| list[ConfigurableFieldSpec] | The config specs of the  | 
class-attribute instance-attribute
name: str | None = None
The name of the Runnable. Used for debugging and tracing.
property
input_schema: type[BaseModel]
The type of input this Runnable accepts, specified as a Pydantic model.
property
output_schema: type[BaseModel]
Output schema.
The type of output this Runnable produces, specified as a Pydantic model.
property
  
¶
    A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
property
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
      
__init__(
    *steps: RunnableLike,
    name: str | None = None,
    first: Runnable[Any, Any] | None = None,
    middle: list[Runnable[Any, Any]] | None = None,
    last: Runnable[Any, Any] | None = None,
) -> None
Create a new RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| steps | The steps to include in the sequence. TYPE: RunnableLike. DEFAULT: () |
| name | The name of the Runnable. TYPE: str \| None. DEFAULT: None |
| first | The first Runnable in the sequence. |
| middle | The middle Runnable objects in the sequence. |
| last | The last Runnable in the sequence. |
| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the sequence has fewer than 2 steps. |
      
      
classmethod
is_lc_serializable() -> bool
Return True as this class is serializable.
      
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get the input schema of the Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | The config to use. |
| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | The input schema of the Runnable. |
      
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get the output schema of the Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | The config to use. |
| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | The output schema of the Runnable. |
      
get_graph(config: RunnableConfig | None = None) -> Graph
Get the graph representation of the Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | The config to use. |
| RETURNS | DESCRIPTION |
|---|---|
| Graph | The graph representation of the Runnable. |
| RAISES | DESCRIPTION |
|---|---|
| ValueError | If a Runnable has no first or last node. |
      
__or__(
    other: Runnable[Any, Other]
    | Callable[[Iterator[Any]], Iterator[Other]]
    | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
    | Callable[[Any], Other]
    | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION | 
|---|---|
| other | Another Runnable or a Runnable-like object. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
__ror__(
    other: Runnable[Other, Any]
    | Callable[[Iterator[Other]], Iterator[Any]]
    | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
    | Callable[[Other], Any]
    | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION | 
|---|---|
| other | Another Runnable or a Runnable-like object. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Other, Output] | A new Runnable. |
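The two operator overloads above are what let `a | b` work even when the left operand is a plain function. The stdlib-only sketch below shows how Python dispatches to `__or__` and falls back to `__ror__`. The `Step` class and the helper functions are hypothetical stand-ins for illustration; they are not the langchain_core classes.

```python
from typing import Any, Callable


class Step:
    """Hypothetical stand-in for a Runnable; not the langchain_core class."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: "Step | Callable[[Any], Any]") -> "Step":
        # self | other: run self first, then other.
        right = other if isinstance(other, Step) else Step(other)
        return Step(lambda value: right.invoke(self.invoke(value)))

    def __ror__(self, other: Callable[[Any], Any]) -> "Step":
        # other | self: Python calls this when `other` has no usable __or__.
        left = Step(other)
        return Step(lambda value: self.invoke(left.invoke(value)))


def add_one(x: int) -> int:
    return x + 1


double = Step(lambda x: x * 2)
left_composed = double | add_one   # dispatches to Step.__or__
right_composed = add_one | double  # functions lack __or__, so Step.__ror__ runs
result_left = left_composed.invoke(3)    # add_one(double(3))
result_right = right_composed.invoke(3)  # double(add_one(3))
```

In both cases the left operand runs first, which matches the description of a sequence where one step's output becomes the next step's input.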
      
invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
Transform a single input into an output.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. |
| config | A config to use when invoking the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
async
ainvoke(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Output
Transform a single input into an output.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. |
| config | A config to use when invoking the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
batch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
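As a rough illustration of the default behaviour described above, the stdlib-only sketch below runs a callable over several inputs in a thread pool. `batch_sketch`, `reciprocal` and the `max_workers` parameter are hypothetical names for this example; this is not langchain_core's implementation.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional


def batch_sketch(
    invoke: Callable[[Any], Any],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
    max_workers: Optional[int] = None,  # hypothetical knob for this sketch
) -> List[Any]:
    """Run `invoke` on every input in a thread pool, preserving input order."""

    def call(item: Any) -> Any:
        try:
            return invoke(item)
        except Exception as exc:
            if return_exceptions:
                return exc  # hand the exception back in place of an output
            raise

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order, not completion order.
        return list(pool.map(call, inputs))


def reciprocal(x: float) -> float:
    return 1 / x


outputs = batch_sketch(reciprocal, [1, 2, 0, 4], return_exceptions=True)
```

With `return_exceptions=True` the failing input contributes its exception object, so `outputs` still has one entry per input.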
      
async
abatch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
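The async default can be pictured the same way with `asyncio.gather`. The sketch below uses only the standard library; `abatch_sketch` and `slow_square` are hypothetical names and the code is an illustration of the idea, not the library's own implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
) -> List[Any]:
    """Await `ainvoke` for every input concurrently, preserving input order."""
    coros = [ainvoke(item) for item in inputs]
    # gather returns results in the order the awaitables were passed in.
    return await asyncio.gather(*coros, return_exceptions=return_exceptions)


async def slow_square(x: int) -> int:
    await asyncio.sleep(0.01 * (3 - x))  # later inputs finish first
    if x == 2:
        raise ValueError("x is 2")
    return x * x


squares = asyncio.run(
    abatch_sketch(slow_square, [1, 2, 3], return_exceptions=True)
)
```

Even though the last input finishes first, the results come back in input order, with the `ValueError` in the middle slot.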
      
transform(
    input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | An iterator of inputs to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
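The buffering behaviour described for the default transform can be sketched with plain iterators. This is a simplified illustration that assumes input chunks can be combined with `+`; `transform_sketch` and `shout` are hypothetical names, not library functions.

```python
from typing import Any, Callable, Iterator


def transform_sketch(
    chunks: Iterator[Any], stream: Callable[[Any], Iterator[Any]]
) -> Iterator[Any]:
    """Buffer every input chunk, then stream output for the combined input."""
    buffered: Any = None
    got_first = False
    for chunk in chunks:
        if not got_first:
            buffered, got_first = chunk, True
        else:
            buffered = buffered + chunk  # assumes chunks support `+`
    if got_first:
        # No output is produced until the input iterator is exhausted.
        yield from stream(buffered)


def shout(text: str) -> Iterator[str]:
    for word in text.split():
        yield word.upper()


words = list(transform_sketch(iter(["hel", "lo wor", "ld"]), shout))
```

Because the whole input is buffered first, a step that can work chunk by chunk gains nothing from this default, which is the case the override note above is about.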
      
stream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream, which calls invoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
async
atransform(
    input: AsyncIterator[Input],
    config: RunnableConfig | None = None,
    **kwargs: Any | None,
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | An async iterator of inputs to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
async
astream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream, which calls ainvoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
    
      
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the input to the Runnable. |
Example
Added in version 0.3.0
      
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the output of the Runnable. |
Example
Added in version 0.3.0
      
config_schema(*, include: Sequence[str] | None = None) -> type[BaseModel]
The type of config this Runnable accepts, specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
| PARAMETER | DESCRIPTION | 
|---|---|
| include | A list of fields to include in the config schema. | 
| RETURNS | DESCRIPTION | 
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate config. | 
      
    
      
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
      
pipe(
    *others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| PARAMETER | DESCRIPTION | 
|---|---|
| *others | Other Runnable or Runnable-like objects to compose. |
| name | An optional name for the resulting RunnableSequence. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
pick(keys: str | list[str]) -> RunnableSerializable[Any, Any]
Pick keys from the output dict of this Runnable.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| PARAMETER | DESCRIPTION | 
|---|---|
| keys | A key or list of keys to pick from the output dict. | 
| RETURNS | DESCRIPTION | 
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |
      
assign(
    **kwargs: Runnable[dict[str, Any], Any]
    | Callable[[dict[str, Any]], Any]
    | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| PARAMETER | DESCRIPTION | 
|---|---|
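| **kwargs | A mapping of keys to Runnable or Runnable-like objects that will be invoked with the entire output dict of this Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |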
      
batch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| tuple[int, Output \| Exception] | Tuples of the index of the input and the output from the Runnable. |
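Because results arrive in completion order, the index in each yielded tuple is what lets a caller match an output back to its input. The stdlib-only sketch below illustrates that pattern; `batch_as_completed_sketch` and `sleep_then_echo` are hypothetical names and this is not the library's implementation.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence, Tuple


def batch_as_completed_sketch(
    invoke: Callable[[Any], Any], inputs: Sequence[Any]
) -> Iterator[Tuple[int, Any]]:
    """Yield (input index, output) pairs in completion order."""
    with ThreadPoolExecutor() as pool:
        futures = {pool.submit(invoke, item): i for i, item in enumerate(inputs)}
        for future in as_completed(futures):
            yield futures[future], future.result()


def sleep_then_echo(seconds: float) -> float:
    time.sleep(seconds)
    return seconds


pairs = list(batch_as_completed_sketch(sleep_then_echo, [0.3, 0.0, 0.1]))
# Sorting by index restores input order when that is needed.
ordered = [output for _, output in sorted(pairs)]
```

The order of `pairs` depends on timing, so code that consumes it should rely on the index rather than on position.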
      
async
abatch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION | 
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[tuple[int, Output \| Exception]] | A tuple of the index of the input and the output from the Runnable. |
      
async
astream_log(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| diff | Whether to yield diffs between each step or the current state. |
| with_streamed_output_list | Whether to yield the streamed_output list. |
| include_names | Only include logs with these names. |
| include_types | Only include logs with these types. |
| include_tags | Only include logs with these tags. |
| exclude_names | Exclude logs with these names. |
| exclude_types | Exclude logs with these types. |
| exclude_tags | Exclude logs with these tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[RunLogPatch] \| AsyncIterator[RunLog] | A RunLogPatch or RunLog object. |
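To make "the Jsonpatch ops can be applied in order to construct state" concrete, the sketch below applies a tiny subset of JSON Patch (`add` and `replace` only) to a dict. The `apply_ops` helper and the sample patches are hypothetical; they only mimic the general shape of an ops list, they are not real astream_log output, and JSON Pointer escapes are not handled.

```python
from typing import Any, Dict, List


def apply_ops(state: Dict[str, Any], ops: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Apply `add` and `replace` JSON Patch ops to `state` in place."""
    for op in ops:
        *parents, last = op["path"].lstrip("/").split("/")
        target: Any = state
        for key in parents:
            target = target[int(key)] if isinstance(target, list) else target[key]
        if isinstance(target, list):
            if last == "-":  # "-" means "append" in JSON Patch
                target.append(op["value"])
            elif op["op"] == "add":
                target.insert(int(last), op["value"])
            else:
                target[int(last)] = op["value"]
        else:
            target[last] = op["value"]
    return state


# Hypothetical patches, one ops list per streamed step.
patches = [
    [{"op": "add", "path": "/streamed_output/-", "value": "ol"}],
    [{"op": "add", "path": "/streamed_output/-", "value": "leh"}],
    [{"op": "replace", "path": "/final_output", "value": "olleh"}],
]

state: Dict[str, Any] = {"streamed_output": [], "final_output": None}
for ops in patches:
    apply_ops(state, ops)
```

Applying the ops lists in the order they were received is what makes the accumulated `state` match the run's final state.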
      
async
astream_events(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"] = "v2",
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvent objects that provide real-time information
about the progress of the Runnable, including StreamEvent objects from intermediate
results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
| event | name | chunk | input | output | 
|---|---|---|---|---|
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | 'Hello human!' | |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API.
A custom event has the following format:
| Attribute | Type | Description | 
|---|---|---|
| name | str | A user defined name for the event. | 
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | 
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
    return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| version | The version of the schema to use, either 'v1' or 'v2'. |
| include_names | Only include events from Runnable objects with matching names. |
| include_types | Only include events from Runnable objects with matching types. |
| include_tags | Only include events from Runnable objects with matching tags. |
| exclude_names | Exclude events from Runnable objects with matching names. |
| exclude_types | Exclude events from Runnable objects with matching types. |
| exclude_tags | Exclude events from Runnable objects with matching tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[StreamEvent] | An async stream of StreamEvent. |
| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | If the version is not 'v1' or 'v2'. |
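Event names follow the on_[runnable_type]_(start|stream|end) format described above, so a consumer can split them to decide how to handle each event. The sketch below does that for the three events from the `reverse` example, using only the standard library. `split_event_name` is a hypothetical helper, and names that do not fit the standard pattern (custom events, for instance) are rejected here by assumption.

```python
import re
from typing import Tuple

EVENT_NAME = re.compile(r"^on_(?P<runnable_type>\w+)_(?P<phase>start|stream|end)$")

# The events listed for the `reverse` example (run_id and parent_ids omitted).
events = [
    {"data": {"input": "hello"}, "event": "on_chain_start",
     "metadata": {}, "name": "reverse", "tags": []},
    {"data": {"chunk": "olleh"}, "event": "on_chain_stream",
     "metadata": {}, "name": "reverse", "tags": []},
    {"data": {"output": "olleh"}, "event": "on_chain_end",
     "metadata": {}, "name": "reverse", "tags": []},
]


def split_event_name(event_name: str) -> Tuple[str, str]:
    """Split a standard event name into (runnable_type, phase)."""
    match = EVENT_NAME.match(event_name)
    if match is None:
        raise ValueError(f"not a standard event name: {event_name!r}")
    return match["runnable_type"], match["phase"]


parsed = [split_event_name(event["event"]) for event in events]
# Concatenate the chunks carried by the stream events.
streamed = "".join(
    event["data"]["chunk"]
    for event in events
    if split_event_name(event["event"])[1] == "stream"
)
```

The same split works for multi-word types such as `on_chat_model_start`, which yields `("chat_model", "start")`.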
      
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
| PARAMETER | DESCRIPTION | 
|---|---|
| **kwargs | The arguments to bind to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the arguments bound. |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
      
with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | The config to bind to the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the config bound. |
      
with_listeners(
    *,
    on_start: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
    on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
    on_error: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
| PARAMETER | DESCRIPTION | 
|---|---|
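| on_start | Called before the Runnable starts running, with the Run object. |
| on_end | Called after the Runnable finishes running, with the Run object. |
| on_error | Called if the Runnable throws an error, with the Run object. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |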
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
      
with_alisteners(
    *,
    on_start: AsyncListener | None = None,
    on_end: AsyncListener | None = None,
    on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
| PARAMETER | DESCRIPTION | 
|---|---|
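| on_start | Called asynchronously before the Runnable starts running, with the Run object. |
| on_end | Called asynchronously after the Runnable finishes running, with the Run object. |
| on_error | Called asynchronously if the Runnable throws an error, with the Run object. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |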
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
# Listeners receive the Run object for the execution, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
      
with_types(
    *, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| input_type | The input type to bind to the Runnable. |
| output_type | The output type to bind to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the types bound. |
      
with_retry(
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    exponential_jitter_params: ExponentialJitterParams | None = None,
    stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
| PARAMETER | DESCRIPTION | 
|---|---|
| retry_if_exception_type | A tuple of exception types to retry on. |
| wait_exponential_jitter | Whether to add jitter to the wait time between retries. |
| stop_after_attempt | The maximum number of attempts to make before giving up. |
| exponential_jitter_params | Parameters for tenacity.wait_exponential_jitter, namely initial, max, exp_base, and jitter (all float values). |
| RETURNS | DESCRIPTION | 
|---|---|
| Runnable[Input, Output] | A new Runnable that retries the original Runnable on exceptions. | 
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass
runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass
assert count == 2
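The retry behaviour these parameters describe can be sketched with the standard library alone. This is a minimal illustration, not LangChain's implementation: `call_with_retry`, `initial_wait`, `max_wait`, `jitter`, and `sleep` are hypothetical names, and the backoff formula is an assumption made for the sketch.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    *,
    retry_if_exception_type: Tuple[Type[BaseException], ...] = (Exception,),
    stop_after_attempt: int = 3,
    initial_wait: float = 0.01,  # hypothetical: wait before the second attempt
    max_wait: float = 0.1,  # hypothetical: upper bound on the backoff
    jitter: float = 0.01,  # hypothetical: maximum random extra wait
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on the listed exception types (illustrative only)."""
    if stop_after_attempt < 1:
        raise ValueError("stop_after_attempt must be at least 1")
    attempt = 1
    while True:
        try:
            return func()
        except retry_if_exception_type:
            # Out of attempts: re-raise the last error unchanged.
            if attempt >= stop_after_attempt:
                raise
        # Exponential backoff capped at max_wait, plus random jitter.
        wait = min(initial_wait * 2 ** (attempt - 1), max_wait)
        sleep(wait + random.uniform(0, jitter))
        attempt += 1


calls = {"n": 0}


def flaky() -> str:
    """Fail twice with ValueError, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient failure")
    return "ok"


result = call_with_retry(
    flaky,
    retry_if_exception_type=(ValueError,),
    stop_after_attempt=3,
    sleep=lambda seconds: None,  # skip real sleeping in this demo
)
```

Exceptions outside `retry_if_exception_type` propagate immediately, which mirrors the intent of restricting retries to specific error types.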
      
    
      
with_fallbacks(
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
    exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
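| PARAMETER | DESCRIPTION |
|---|---|
| fallbacks | A sequence of runnables to try if the original Runnable fails. |
| exceptions_to_handle | A tuple of exception types to handle. |
| exception_key | If a string is specified, handled exceptions are passed to fallbacks as part of the input under that key. If None, exceptions are not passed to fallbacks. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableWithFallbacks[Input, Output] | A new Runnable that tries the original Runnable, and then each fallback in order, upon failures. |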
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""
def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
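The "try the original, then each fallback in order" behaviour can be sketched in plain Python. This is an illustration under stated assumptions, not the library's code: `run_with_fallbacks` is a hypothetical helper, and re-raising the first error when every candidate fails is a choice made for this sketch.

```python
from typing import Any, Callable, Optional, Sequence, Tuple, Type


def run_with_fallbacks(
    primary: Callable[[Any], Any],
    fallbacks: Sequence[Callable[[Any], Any]],
    value: Any,
    *,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
) -> Any:
    """Try primary, then each fallback in order, on handled exceptions."""
    first_error: Optional[BaseException] = None
    for candidate in [primary, *fallbacks]:
        try:
            return candidate(value)
        except exceptions_to_handle as exc:
            # Remember only the first failure; later ones are dropped.
            if first_error is None:
                first_error = exc
    assert first_error is not None  # primary is always attempted
    raise first_error


def always_fails(text: str) -> str:
    raise ValueError("primary failed")


def shout(text: str) -> str:
    return text.upper()


output = run_with_fallbacks(always_fails, [shout], "foo bar")
```

An exception type that is not listed in `exceptions_to_handle` is not caught, so it stops the chain instead of triggering the next fallback.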
      
as_tool(
    args_schema: type[BaseModel] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema. Alternatively (e.g., if the
Runnable takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema. You can also
pass arg_types to just specify the required arguments and their types.
| PARAMETER | DESCRIPTION |
|---|---|
| args_schema | The schema for the tool. |
| name | The name of the tool. |
| description | The description of the tool. |
| arg_types | A dictionary of argument names to types. |
| RETURNS | DESCRIPTION |
|---|---|
| BaseTool | A BaseTool instance. |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
    a: int
    b: list[int]
def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""
    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
      
classmethod
    Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI, the id is
["langchain", "llms", "openai", "OpenAI"].
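The shape of such an identifier can be illustrated by splitting a dotted module path and appending the class name. This sketch only reproduces the example above; `class_path_id` is a hypothetical helper, and how the library derives the path internally is not shown here.

```python
from typing import List


def class_path_id(module_path: str, class_name: str) -> List[str]:
    """Build a path-style identifier from a dotted module path and a class name."""
    return [*module_path.split("."), class_name]


identifier = class_path_id("langchain.llms.openai", "OpenAI")
print(identifier)
```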
      
    Serialize the Runnable to JSON.
| RETURNS | DESCRIPTION | 
|---|---|
| SerializedConstructor \| SerializedNotImplemented | A JSON-serializable representation of the Runnable. |
      
    Serialize a "not implemented" object.
| RETURNS | DESCRIPTION | 
|---|---|
| SerializedNotImplemented | A SerializedNotImplemented object. |
      
configurable_fields(
    **kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable fields at runtime.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | A dictionary of ConfigurableField instances to configure. |
| RAISES | DESCRIPTION |
|---|---|
| ValueError | If a configuration key is not found in the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the fields configured. |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
    "max_tokens_200: ",
    model.with_config(configurable={"output_token_number": 200})
    .invoke("tell me something about chess")
    .content,
)
      
configurable_alternatives(
    which: ConfigurableField,
    *,
    default_key: str = "default",
    prefix_keys: bool = False,
    **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable objects that can be set at runtime.
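| PARAMETER | DESCRIPTION |
|---|---|
| which | The ConfigurableField instance that will be used to select the alternative. |
| default_key | The default key to use if no alternative is selected. |
| prefix_keys | Whether to prefix the keys with the ConfigurableField id. |
| **kwargs | A dictionary of keys to Runnable instances or callables that return Runnable instances. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the alternatives configured. |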
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
    model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
    model.with_config(configurable={"llm": "openai"})
    .invoke("which organization created you?")
    .content
)
    
              Bases: Serializable, Runnable[Input, Output]
Runnable that can be serialized to JSON.
| METHOD | DESCRIPTION |
|---|---|
| to_json | Serialize the Runnable to JSON. |
| configurable_fields | Configure particular Runnable fields at runtime. |
| configurable_alternatives | Configure alternatives for Runnable objects that can be set at runtime. |
| get_name | Get the name of the Runnable. |
| get_input_schema | Get a Pydantic model that can be used to validate input to the Runnable. |
| get_input_jsonschema | Get a JSON schema that represents the input to the Runnable. |
| get_output_schema | Get a Pydantic model that can be used to validate the output of the Runnable. |
| get_output_jsonschema | Get a JSON schema that represents the output of the Runnable. |
| config_schema | The type of config this Runnable accepts specified as a Pydantic model. |
| get_config_jsonschema | Get a JSON schema that represents the config of the Runnable. |
| get_graph | Return a graph representation of this Runnable. |
| get_prompts | Return a list of prompts used by this Runnable. |
| __or__ | Runnable "or" operator. |
| __ror__ | Runnable "reverse-or" operator. |
| pipe | Pipe Runnable objects. |
| pick | Pick keys from the output dict of this Runnable. |
| assign | Assigns new fields to the dict output of this Runnable. |
| invoke | Transform a single input into an output. |
| ainvoke | Transform a single input into an output. |
| batch | Default implementation runs invoke in parallel using a thread pool executor. |
| batch_as_completed | Run invoke in parallel on a list of inputs. |
| abatch | Default implementation runs ainvoke in parallel using asyncio.gather. |
| abatch_as_completed | Run ainvoke in parallel on a list of inputs. |
| stream | Default implementation of stream, which calls invoke. |
| astream | Default implementation of astream, which calls ainvoke. |
| astream_log | Stream all output from a Runnable, as reported to the callback system. |
| astream_events | Generate a stream of events. |
| transform | Transform inputs to outputs. |
| atransform | Transform inputs to outputs. |
| bind | Bind arguments to a Runnable, returning a new Runnable. |
| with_config | Bind config to a Runnable, returning a new Runnable. |
| with_listeners | Bind lifecycle listeners to a Runnable, returning a new Runnable. |
| with_alisteners | Bind async lifecycle listeners to a Runnable. |
| with_types | Bind input and output types to a Runnable, returning a new Runnable. |
| with_retry | Create a new Runnable that retries the original Runnable on exceptions. |
| map | Return a new Runnable that maps a list of inputs to a list of outputs. |
| with_fallbacks | Add fallbacks to a Runnable, returning a new Runnable. |
| as_tool | Create a BaseTool from a Runnable. |
| __init__ |  |
| is_lc_serializable | Is this class serializable? |
| get_lc_namespace | Get the namespace of the LangChain object. |
| lc_id | Return a unique identifier for this class for serialization purposes. |
| to_json_not_implemented | Serialize a "not implemented" object. |
class-attribute instance-attribute
name: str | None = None
The name of the Runnable. Used for debugging and tracing.
property
InputType: type[Input]
Input type.
The type of input this Runnable accepts specified as a type annotation.
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the input type cannot be inferred. | 
property
OutputType: type[Output]
Output Type.
The type of output this Runnable produces specified as a type annotation.
| RAISES | DESCRIPTION | 
|---|---|
| TypeError | If the output type cannot be inferred. | 
property
    The type of input this Runnable accepts specified as a Pydantic model.
property
    Output schema.
The type of output this Runnable produces specified as a Pydantic model.
property
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
property
    A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
property
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
      
    Serialize the Runnable to JSON.
| RETURNS | DESCRIPTION | 
|---|---|
| SerializedConstructor \| SerializedNotImplemented | A JSON-serializable representation of the Runnable. |
      
configurable_fields(
    **kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable fields at runtime.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | A dictionary of ConfigurableField instances to configure. |
| RAISES | DESCRIPTION |
|---|---|
| ValueError | If a configuration key is not found in the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the fields configured. |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
    "max_tokens_200: ",
    model.with_config(configurable={"output_token_number": 200})
    .invoke("tell me something about chess")
    .content,
)
      
configurable_alternatives(
    which: ConfigurableField,
    *,
    default_key: str = "default",
    prefix_keys: bool = False,
    **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable objects that can be set at runtime.
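| PARAMETER | DESCRIPTION |
|---|---|
| which | The ConfigurableField instance that will be used to select the alternative. |
| default_key | The default key to use if no alternative is selected. |
| prefix_keys | Whether to prefix the keys with the ConfigurableField id. |
| **kwargs | A dictionary of keys to Runnable instances or callables that return Runnable instances. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the alternatives configured. |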
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
    model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
    model.with_config(configurable={"llm": "openai"})
    .invoke("which organization created you?")
    .content
)
      
    
      
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic input schema that
depends on which configuration the Runnable is invoked with.
This method returns the input schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate input. |
      
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the input to the Runnable. |
Example
Added in version 0.3.0
      
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate the output of the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic output schema that
depends on which configuration the Runnable is invoked with.
This method returns the output schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate output. |
      
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. |
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the output of the Runnable. |
Example
Added in version 0.3.0
      
    The type of config this Runnable accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
| PARAMETER | DESCRIPTION | 
|---|---|
| include | A list of fields to include in the config schema. | 
| RETURNS | DESCRIPTION | 
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate config. | 
      
    
      
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable.
      
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
      
__or__(
    other: Runnable[Any, Other]
    | Callable[[Iterator[Any]], Iterator[Other]]
    | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
    | Callable[[Any], Other]
    | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| other | Another Runnable or a Runnable-like object. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
__ror__(
    other: Runnable[Other, Any]
    | Callable[[Iterator[Other]], Iterator[Any]]
    | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
    | Callable[[Other], Any]
    | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| other | Another Runnable or a Runnable-like object. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Other, Output] | A new Runnable. |
      
pipe(
    *others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| PARAMETER | DESCRIPTION |
|---|---|
| *others | Other Runnable or Runnable-like objects to compose. |
| name | An optional name for the resulting RunnableSequence. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
      
    Pick keys from the output dict of this Runnable.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| PARAMETER | DESCRIPTION | 
|---|---|
| keys | A key or list of keys to pick from the output dict. | 
| RETURNS | DESCRIPTION | 
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |
      
assign(
    **kwargs: Runnable[dict[str, Any], Any]
    | Callable[[dict[str, Any]], Any]
    | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
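| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | A mapping of keys to Runnable or Runnable-like objects that will be invoked with the entire output dict of this Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |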
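The effect of assigning new fields to a dict output can be sketched in plain Python. This is only an illustration of the idea, assuming each new field is computed from the original dict and merged into a copy of it; `assign_fields` is a hypothetical helper and not part of the library.

```python
from typing import Any, Callable, Dict


def assign_fields(
    output: Dict[str, Any], **computed: Callable[[Dict[str, Any]], Any]
) -> Dict[str, Any]:
    """Return a copy of output with extra keys computed from the original dict."""
    # Every callable sees the same original dict, not the partially updated one.
    new_fields = {key: func(output) for key, func in computed.items()}
    return {**output, **new_fields}


base = {"str": "foo-lish"}
extended = assign_fields(base, hello=lambda d: d["str"].upper())
print(extended)
```

The original dict is left unchanged, so the existing keys pass through untouched and only the new keys are added.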
      
abstractmethod
invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
Transform a single input into an output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | A config to use when invoking the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
async
ainvoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
Transform a single input into an output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | A config to use when invoking the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
      
batch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
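The idea of running invoke over many inputs with a thread pool can be sketched with `concurrent.futures`. This is a simplified stand-in for the default behaviour described above, not the library's code: `batch_in_threads` and its `max_workers` parameter are hypothetical names used only for this example.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional


def batch_in_threads(
    invoke: Callable[[Any], Any],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
    max_workers: Optional[int] = None,  # hypothetical concurrency limit
) -> List[Any]:
    """Apply invoke to every input in a thread pool, preserving input order."""

    def call(item: Any) -> Any:
        if not return_exceptions:
            return invoke(item)
        try:
            return invoke(item)
        except Exception as exc:
            # Hand the exception back as the result for this input.
            return exc

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the order of the inputs.
        return list(executor.map(call, inputs))


def reciprocal(x: float) -> float:
    return 1 / x


doubled = batch_in_threads(lambda x: (x + 1) * 2, [1, 2, 3])
mixed = batch_in_threads(reciprocal, [1, 0, 4], return_exceptions=True)
```

Threads help when each call spends its time waiting on I/O; a backend with a native batch endpoint would usually be better served by a dedicated override.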
      
batch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| tuple[int, Output \| Exception] | Tuples of the index of the input and the output from the Runnable. |
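Yielding `(index, output)` pairs as work finishes can be sketched with `concurrent.futures.as_completed`. The helper below is a hypothetical illustration of the pattern, not the library's implementation; the index is what lets a caller match each result back to its input when results arrive out of order.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence, Tuple


def batch_as_completed_sketch(
    invoke: Callable[[Any], Any],
    inputs: Sequence[Any],
    *,
    return_exceptions: bool = False,
) -> Iterator[Tuple[int, Any]]:
    """Yield (input index, result) pairs in completion order."""
    with ThreadPoolExecutor() as executor:
        # Map each future back to the position of its input.
        futures = {
            executor.submit(invoke, item): index
            for index, item in enumerate(inputs)
        }
        for future in as_completed(futures):
            index = futures[future]
            try:
                result = future.result()
            except Exception as exc:
                if not return_exceptions:
                    raise
                result = exc
            yield index, result


def slow_identity(x: int) -> int:
    time.sleep(x * 0.01)  # larger inputs take longer
    return x


pairs = list(batch_as_completed_sketch(slow_identity, [3, 1, 2]))
by_index = dict(pairs)
```

Completion order depends on scheduling, so callers should rely on the index rather than on the position of a pair in the stream.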
      
async
abatch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
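Running an async call over many inputs with `asyncio.gather` can be sketched as follows. This is a minimal illustration of the pattern named above, with `abatch_sketch` and `double` as hypothetical names; it omits config handling and concurrency limits.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
) -> List[Any]:
    """Await ainvoke for every input concurrently, preserving input order."""
    coroutines = [ainvoke(item) for item in inputs]
    # gather returns results in the order the awaitables were passed in.
    return await asyncio.gather(*coroutines, return_exceptions=return_exceptions)


async def double(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real asynchronous I/O
    return x * 2


outputs = asyncio.run(abatch_sketch(double, [1, 2, 3]))
```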
      
async
abatch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. |
| config | A config to use when invoking the Runnable. |
| return_exceptions | Whether to return exceptions instead of raising them. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[tuple[int, Output \| Exception]] | A tuple of the index of the input and the output from the Runnable. |
      
stream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream, which calls invoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
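The difference between the default stream and an overriding one can be shown with two small classes. These are stand-ins written for this example (`Upper` and `WordStreamer` are hypothetical and do not subclass Runnable): the default yields the whole invoke result as a single chunk, while the override yields pieces as they become available.

```python
from typing import Iterator


class Upper:
    """Toy component whose stream falls back to invoke."""

    def invoke(self, text: str) -> str:
        return text.upper()

    def stream(self, text: str) -> Iterator[str]:
        # Default behaviour: one chunk containing the full result.
        yield self.invoke(text)


class WordStreamer(Upper):
    """Toy component that overrides stream to emit one word at a time."""

    def stream(self, text: str) -> Iterator[str]:
        for word in text.split():
            yield word.upper()


single_chunk = list(Upper().stream("hello world"))
word_chunks = list(WordStreamer().stream("hello world"))
```

Callers can iterate over either version the same way; only the granularity of the chunks differs.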
      
async
astream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream, which calls ainvoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
      
async
astream_log(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. |
| config | The config to use for the Runnable. |
| diff | Whether to yield diffs between each step or the current state. |
| with_streamed_output_list | Whether to yield the streamed_output list. |
| include_names | Only include logs with these names. |
| include_types | Only include logs with these types. |
| include_tags | Only include logs with these tags. |
| exclude_names | Exclude logs with these names. |
| exclude_types | Exclude logs with these types. |
| exclude_tags | Exclude logs with these tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[RunLogPatch] \| AsyncIterator[RunLog] | A RunLogPatch or RunLog object. |
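Applying Jsonpatch ops in order to rebuild state can be sketched with a tiny patch applier. This is an illustration only: `apply_ops` is a hypothetical helper that handles just `add` and `replace` on dicts plus appending to a list with `-`, and the sample ops and state keys below are made up for the example rather than captured from a real run.

```python
import copy
from typing import Any, Dict, List


def apply_ops(state: Any, ops: List[Dict[str, Any]]) -> Any:
    """Apply a small subset of JSON Patch ops and return the new state."""
    for op in ops:
        if op["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op: {op['op']}")
        keys = [part for part in op["path"].split("/") if part]
        if not keys:
            # An empty path targets the whole document.
            state = copy.deepcopy(op["value"])
            continue
        target = state
        for key in keys[:-1]:
            target = target[key]
        last = keys[-1]
        if isinstance(target, list) and last == "-":
            target.append(op["value"])  # "-" means append to the list
        else:
            target[last] = op["value"]
    return state


# Illustrative patches: start a run, stream two chunks, record the result.
patches = [
    [{"op": "replace", "path": "", "value": {"streamed_output": [], "final_output": None}}],
    [{"op": "add", "path": "/streamed_output/-", "value": "Hello"}],
    [{"op": "add", "path": "/streamed_output/-", "value": " world"}],
    [{"op": "replace", "path": "/final_output", "value": "Hello world"}],
]

state: Any = {}
for ops in patches:
    state = apply_ops(state, ops)
```

A full implementation would also need `remove`, list indices, and JSON Pointer escaping, which this sketch leaves out.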
      
async
astream_events(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"] = "v2",
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use this to create an iterator over StreamEvent objects that provide real-time information
about the progress of the Runnable, including StreamEvent objects from intermediate
results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
| event | name | chunk | input | output | 
|---|---|---|---|---|
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | | 
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | | 
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") | 
| on_llm_start | '[model name]' | | {'input': 'hello'} | | 
| on_llm_stream | '[model name]' | 'Hello' | | | 
| on_llm_end | '[model name]' | | | 'Hello human!' | 
| on_chain_start | 'format_docs' | | | | 
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | | 
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' | 
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | | 
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} | 
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | | 
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] | 
| on_prompt_start | '[template_name]' | | {"question": "hello"} | | 
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) | 
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events are only surfaced in the v2 version of the API.
A custom event has the following format:
| Attribute | Type | Description | 
|---|---|---|
| name | str | A user defined name for the event. | 
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | 
Here are declarations associated with the standard events shown above:
format_docs:
from langchain_core.documents import Document
from langchain_core.runnables import RunnableLambda
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
from langchain_core.prompts import ChatPromptTemplate
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
    return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Produces the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
The following example dispatches custom events:
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config,  # Must be included for Python < 3.11
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config,  # Must be included for Python < 3.11
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    return "Done"
slow_thing = RunnableLambda(slow_thing)
async def main() -> None:
    # `async for` must run inside a coroutine when executed as a script.
    async for event in slow_thing.astream_events("some_input", version="v2"):
        print(event)
asyncio.run(main())
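| PARAMETER | DESCRIPTION | 
|---|---|
| input | The input to the Runnable. TYPE: Any | 
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None | 
| version | The version of the schema to use, either v1 or v2. TYPE: Literal["v1", "v2"] | 
| include_names | Only include events from Runnable objects with matching names. | 
| include_types | Only include events from Runnable objects with matching types. | 
| include_tags | Only include events from Runnable objects with matching tags. | 
| exclude_names | Exclude events from Runnable objects with matching names. | 
| exclude_types | Exclude events from Runnable objects with matching types. | 
| exclude_tags | Exclude events from Runnable objects with matching tags. | 
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any | 
| YIELDS | DESCRIPTION | 
|---|---|
| AsyncIterator[StreamEvent] | An async stream of StreamEvent objects. | 
| RAISES | DESCRIPTION | 
|---|---|
| NotImplementedError | If the version is not v1 or v2. | 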
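To make the include_* and exclude_* parameters more concrete, here is a minimal sketch of one plausible filtering rule. It assumes that the include filters are OR-ed together and that exclusions are applied afterwards; the library's exact precedence may differ. The function `keep_event` is hypothetical and is not part of the API.

```python
from typing import Optional, Sequence


def keep_event(
    name: str,
    run_type: str,
    tags: Sequence[str],
    *,
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_types: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
) -> bool:
    """Decide whether an event passes the filters (assumed semantics)."""
    if include_names is None and include_types is None and include_tags is None:
        keep = True  # No include filter given: everything starts out included.
    else:
        keep = (
            name in (include_names or ())
            or run_type in (include_types or ())
            or any(tag in (include_tags or ()) for tag in tags)
        )
    # Exclusions are checked last, so in this sketch they win over inclusions.
    if name in (exclude_names or ()):
        keep = False
    if run_type in (exclude_types or ()):
        keep = False
    if any(tag in (exclude_tags or ()) for tag in tags):
        keep = False
    return keep


kept = keep_event("reverse", "chain", ["demo"], include_tags=["demo"])
dropped = keep_event("reverse", "chain", ["demo"], include_tags=["demo"], exclude_types=["chain"])
```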
      
transform(
    input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | An iterator of inputs to the Runnable. TYPE: Iterator[Input] | 
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None | 
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None | 
| YIELDS | DESCRIPTION | 
|---|---|
| Output | The output of the Runnable. | 
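The buffering behaviour described for the default transform can be sketched in plain Python. This is an illustration only, not the library's implementation: it assumes input chunks can be combined with `+`, and the names `buffered_transform` and `upper_words` are hypothetical.

```python
from typing import Callable, Iterator, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


def buffered_transform(
    chunks: Iterator[In], stream: Callable[[In], Iterator[Out]]
) -> Iterator[Out]:
    """Consume every input chunk first, then stream output for the combined input."""
    combined = None
    got_first = False
    for chunk in chunks:
        if not got_first:
            combined, got_first = chunk, True
        else:
            combined = combined + chunk  # Assumes chunks support `+`.
    if got_first:
        # Output only starts once the input iterator is exhausted.
        yield from stream(combined)


def upper_words(text: str) -> Iterator[str]:
    """Stand-in for a streaming step: yield one upper-cased word at a time."""
    for word in text.split():
        yield word.upper()


result = list(buffered_transform(iter(["hello ", "wor", "ld"]), upper_words))
```

Because nothing is yielded until all input has arrived, a subclass that can work on partial input gains latency by overriding the method.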
      
async
atransform(
    input: AsyncIterator[Input],
    config: RunnableConfig | None = None,
    **kwargs: Any | None,
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION | 
|---|---|
| input | An async iterator of inputs to the Runnable. TYPE: AsyncIterator[Input] | 
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None | 
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None | 
| YIELDS | DESCRIPTION | 
|---|---|
| AsyncIterator[Output] | The output of the Runnable. | 
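An async counterpart of the same buffering idea might look like the sketch below. It is a simplified illustration under the assumption that chunks support `+`; `buffered_atransform`, `numbers`, and `doubled` are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, Callable, List, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


async def buffered_atransform(
    chunks: AsyncIterator[In], astream: Callable[[In], AsyncIterator[Out]]
) -> AsyncIterator[Out]:
    """Await every input chunk first, then stream output for the combined input."""
    combined = None
    got_first = False
    async for chunk in chunks:
        if not got_first:
            combined, got_first = chunk, True
        else:
            combined = combined + chunk  # Assumes chunks support `+`.
    if got_first:
        async for output in astream(combined):
            yield output


async def numbers() -> AsyncIterator[List[int]]:
    """Stand-in input stream that arrives in two parts."""
    for part in ([1], [2, 3]):
        yield part


async def doubled(items: List[int]) -> AsyncIterator[int]:
    """Stand-in streaming step: yield each item doubled."""
    for item in items:
        yield item * 2


async def collect() -> List[int]:
    return [output async for output in buffered_atransform(numbers(), doubled)]


result = asyncio.run(collect())
```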
      
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
| PARAMETER | DESCRIPTION | 
|---|---|
| **kwargs | The arguments to bind to the Runnable. TYPE: Any | 
| RETURNS | DESCRIPTION | 
|---|---|
| Runnable[Input, Output] | A new Runnable with the arguments bound. | 
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
      
with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| config | The config to bind to the Runnable. TYPE: RunnableConfig \| None | 
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any | 
| RETURNS | DESCRIPTION | 
|---|---|
| Runnable[Input, Output] | A new Runnable with the config bound. | 
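The idea of binding config and getting a new object back can be illustrated with a small stand-in class. This is a sketch, not the library's implementation: `ConfiguredCallable` is hypothetical, and the rule that call-time keys override bound keys is an assumption made for the example (the library may merge fields such as tags and metadata differently).

```python
from typing import Any, Callable, Dict, Optional

Config = Dict[str, Any]


class ConfiguredCallable:
    """Hypothetical stand-in for a runnable that remembers a bound config."""

    def __init__(
        self, func: Callable[[Any, Config], Any], config: Optional[Config] = None
    ) -> None:
        self._func = func
        self._config: Config = dict(config or {})

    def with_config(
        self, config: Optional[Config] = None, **kwargs: Any
    ) -> "ConfiguredCallable":
        # Return a new object; the original keeps its own config untouched.
        merged = {**self._config, **(config or {}), **kwargs}
        return ConfiguredCallable(self._func, merged)

    def invoke(self, value: Any, config: Optional[Config] = None) -> Any:
        # Assumption for this sketch: call-time keys override bound keys.
        effective = {**self._config, **(config or {})}
        return self._func(value, effective)


def describe(value: Any, config: Config) -> str:
    """Report which run name the call was made under."""
    return f"{config.get('run_name', 'unnamed')}:{value}"


base = ConfiguredCallable(describe)
named = base.with_config({"run_name": "my_step"}, tags=["demo"])
```

Here `base.invoke(1)` still reports the unnamed run, while `named.invoke(1)` carries the bound run name.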
      
with_listeners(
    *,
    on_start: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
    on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
    on_error: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
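| PARAMETER | DESCRIPTION | 
|---|---|
| on_start | Called before the Runnable starts running. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None | 
| on_end | Called after the Runnable finishes running. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None | 
| on_error | Called if the Runnable throws an error. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None | 
| RETURNS | DESCRIPTION | 
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. | 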
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
      
with_alisteners(
    *,
    on_start: AsyncListener | None = None,
    on_end: AsyncListener | None = None,
    on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
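| PARAMETER | DESCRIPTION | 
|---|---|
| on_start | Called asynchronously before the Runnable starts running. TYPE: AsyncListener \| None | 
| on_end | Called asynchronously after the Runnable finishes running. TYPE: AsyncListener \| None | 
| on_error | Called asynchronously if the Runnable throws an error. TYPE: AsyncListener \| None | 
| RETURNS | DESCRIPTION | 
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. | 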
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
# Listeners receive the Run object for the execution, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
      
with_types(
    *, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION | 
|---|---|
| input_type | The input type to bind to the Runnable. TYPE: type[Input] \| None | 
| output_type | The output type to bind to the Runnable. TYPE: type[Output] \| None | 
| RETURNS | DESCRIPTION | 
|---|---|
| Runnable[Input, Output] | A new Runnable with the types bound. | 
      
with_retry(
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    exponential_jitter_params: ExponentialJitterParams | None = None,
    stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
| PARAMETER | DESCRIPTION | 
|---|---|
| retry_if_exception_type | A tuple of exception types to retry on. TYPE: tuple[type[BaseException], ...] | 
| wait_exponential_jitter | Whether to add jitter to the wait time between retries. TYPE: bool | 
| stop_after_attempt | The maximum number of attempts to make before giving up. TYPE: int | 
| exponential_jitter_params | Parameters for tenacity.wait_exponential_jitter, namely initial, max, exp_base, and jitter (all float values). TYPE: ExponentialJitterParams \| None | 
| RETURNS | DESCRIPTION | 
|---|---|
| Runnable[Input, Output] | A new Runnable that retries the original Runnable on exceptions. | 
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass
runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass
assert count == 2
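For intuition about what retrying with exponential backoff and jitter involves, the following standalone sketch reimplements the idea with the standard library. It is not how with_retry is implemented; `call_with_retry` and its timing parameters (`initial`, `exp_base`, `max_wait`, `jitter`, `sleep`) are hypothetical names chosen for the example.

```python
import random
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    *,
    retry_if_exception_type: Tuple[Type[BaseException], ...] = (Exception,),
    stop_after_attempt: int = 3,
    initial: float = 0.01,
    exp_base: float = 2.0,
    max_wait: float = 1.0,
    jitter: float = 0.01,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on the given exception types with growing waits."""
    for attempt in range(1, stop_after_attempt + 1):
        try:
            return func()
        except retry_if_exception_type:
            if attempt == stop_after_attempt:
                raise  # Out of attempts: surface the last error.
            # Exponential backoff capped at max_wait, plus random jitter.
            wait = min(initial * exp_base ** (attempt - 1), max_wait)
            sleep(wait + random.uniform(0, jitter))
    raise RuntimeError("stop_after_attempt must be at least 1")


calls = {"count": 0}


def flaky() -> str:
    """Fail twice, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ValueError("transient failure")
    return "ok"


waits: List[float] = []  # Record waits instead of sleeping, to keep the demo fast.
result = call_with_retry(flaky, retry_if_exception_type=(ValueError,), sleep=waits.append)
```

The jitter spreads retries from many concurrent callers so they do not all hit a recovering service at the same instant.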
      
    
      
with_fallbacks(
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
    exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
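| PARAMETER | DESCRIPTION | 
|---|---|
| fallbacks | A sequence of runnables to try if the original Runnable fails. | 
| exceptions_to_handle | A tuple of exception types to handle. TYPE: tuple[type[BaseException], ...] | 
| exception_key | If a string is specified, handled exceptions are passed to fallbacks as part of the input under the specified key. If None, exceptions are not passed to fallbacks. TYPE: str \| None | 
| RETURNS | DESCRIPTION | 
|---|---|
| RunnableWithFallbacks[Input, Output] | A new Runnable that tries the original Runnable, and then each fallback in order, upon failures. | 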
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""
def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
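The "try the original, then each fallback in order" rule can also be sketched without the library. The helper `invoke_with_fallbacks` below is hypothetical and only illustrates the control flow; it treats each candidate as a plain callable.

```python
from typing import Any, Callable, Optional, Sequence, Tuple, Type


def invoke_with_fallbacks(
    primary: Callable[[Any], Any],
    fallbacks: Sequence[Callable[[Any], Any]],
    value: Any,
    *,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
) -> Any:
    """Return the first successful result, trying primary then each fallback."""
    last_error: Optional[BaseException] = None
    for candidate in [primary, *fallbacks]:
        try:
            return candidate(value)
        except exceptions_to_handle as error:
            last_error = error  # Handled: move on to the next candidate.
    # Every candidate failed with a handled exception; re-raise the last one.
    assert last_error is not None
    raise last_error


def broken(text: str) -> str:
    raise ValueError("primary failed")


def shout(text: str) -> str:
    return text.upper()


result = invoke_with_fallbacks(broken, [shout], "foo bar")
```

Exceptions outside `exceptions_to_handle` are not caught, so they propagate immediately without trying any fallback.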
      
as_tool(
    args_schema: type[BaseModel] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema. Alternatively (e.g., if the
Runnable takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema. You can also
pass arg_types to just specify the required arguments and their types.
| PARAMETER | DESCRIPTION | 
|---|---|
| args_schema | The schema for the tool. | 
| name | The name of the tool. TYPE: str \| None | 
| description | The description of the tool. TYPE: str \| None | 
| arg_types | A dictionary of argument names to types. | 
| RETURNS | DESCRIPTION | 
|---|---|
| BaseTool | A BaseTool instance. | 
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
    a: int
    b: list[int]
def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""
    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
      
classmethod
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable, it is not serializable
by default. This is to prevent accidental serialization of objects that should
not be serialized.
| RETURNS | DESCRIPTION | 
|---|---|
| bool | Whether the class is serializable. Default is False. | 
      
classmethod
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI, the id is
["langchain", "llms", "openai", "OpenAI"].
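The shape of such an identifier can be reproduced with a one-line helper. The function `id_from_path` is hypothetical and only illustrates how a dotted module path plus a class name maps to the list form shown above; it does not claim to be how the library computes the value.

```python
from typing import List


def id_from_path(module_path: str, class_name: str) -> List[str]:
    """Split a dotted module path and append the class name."""
    return [*module_path.split("."), class_name]


example_id = id_from_path("langchain.llms.openai", "OpenAI")
```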
      
Serialize a "not implemented" object.
| RETURNS | DESCRIPTION | 
|---|---|
| SerializedNotImplemented | The serialized "not implemented" object. | 
    
RunnableConfig
    
              Bases: TypedDict
Configuration for a Runnable.
instance-attribute
tags: list[str]
Tags for this call and any sub-calls (e.g., a Chain calling an LLM). You can use these to filter calls.
instance-attribute
metadata: dict[str, Any]
Metadata for this call and any sub-calls (e.g., a Chain calling an LLM). Keys should be strings, values should be JSON-serializable.
instance-attribute
callbacks: Callbacks
Callbacks for this call and any sub-calls (e.g., a Chain calling an LLM). Tags are passed to all callbacks; metadata is passed to on_*_start callbacks.
instance-attribute
run_name: str
Name for the tracer run for this call. Defaults to the name of the class.
instance-attribute
max_concurrency: int | None
Maximum number of parallel calls to make. If not provided, defaults to
ThreadPoolExecutor's default.
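The effect of a concurrency cap can be demonstrated with the standard library's thread pool. This is a sketch of the general idea, not the library's batching code; `run_batch` is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Sequence, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


def run_batch(
    func: Callable[[In], Out],
    inputs: Sequence[In],
    max_concurrency: Optional[int] = None,
) -> List[Out]:
    """Apply func to every input using at most max_concurrency worker threads."""
    # max_workers=None lets ThreadPoolExecutor choose its own default size.
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # map() returns results in input order, regardless of completion order.
        return list(pool.map(func, inputs))


results = run_batch(lambda x: (x + 1) * 2, [1, 2, 3], max_concurrency=2)
```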
instance-attribute
recursion_limit: int
Maximum number of times a call can recurse. If not provided, defaults to 25.
instance-attribute
configurable: dict[str, Any]
Runtime values for attributes previously made configurable on this Runnable,
or sub-Runnables, through configurable_fields or configurable_alternatives.
Check config_schema for a description of the attributes that have been made
configurable.