Runnable#
- class langchain_core.runnables.base.Runnable[source]#
A unit of work that can be invoked, batched, streamed, transformed and composed.
Key Methods#
``invoke``/``ainvoke``: Transforms a single input into an output.
``batch``/``abatch``: Efficiently transforms multiple inputs into outputs.
``stream``/``astream``: Streams output from a single input as it’s produced.
``astream_log``: Streams output and selected intermediate results from an input.
Built-in optimizations:
Batch: By default, batch runs invoke() in parallel using a thread pool executor. Override to optimize batching.
Async: Methods with an ``a`` prefix (such as ``ainvoke``) are asynchronous. By default, they execute the sync counterpart using asyncio’s thread pool. Override for native async.
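The async fallback in the list above can be pictured with plain ``asyncio``. This is a conceptual sketch only, not LangChain's code: ``SlowDoubler`` and its methods are hypothetical stand-ins for a ``Runnable`` that implements only the sync method.

```python
import asyncio
import time


class SlowDoubler:
    """Hypothetical stand-in for a Runnable that only implements sync invoke."""

    def invoke(self, x: int) -> int:
        time.sleep(0.05)  # simulate blocking IO
        return x * 2

    async def ainvoke(self, x: int) -> int:
        # Default-style async: run the sync method in the event loop's
        # default thread pool so the loop itself is not blocked.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.invoke, x)


async def main() -> list[int]:
    doubler = SlowDoubler()
    # The two blocking calls overlap because each runs in a worker thread.
    return list(await asyncio.gather(doubler.ainvoke(1), doubler.ainvoke(2)))


results = asyncio.run(main())
print(results)  # [2, 4]
```

A native async override would replace the executor call with real awaitable IO, which avoids occupying a thread per call.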
All methods accept an optional config argument, which can be used to configure execution, add tags and metadata for tracing and debugging etc.
Runnables expose schematic information about their input, output and config via the ``input_schema`` property, the ``output_schema`` property and the ``config_schema`` method.
LCEL and Composition#
The LangChain Expression Language (LCEL) is a declarative way to compose ``Runnables`` into chains. Any chain constructed this way will automatically have sync, async, batch, and streaming support.
The main composition primitives are ``RunnableSequence`` and ``RunnableParallel``.
``RunnableSequence`` invokes a series of runnables sequentially, with one Runnable’s output serving as the next’s input. Construct using the ``|`` operator or by passing a list of runnables to ``RunnableSequence``.
``RunnableParallel`` invokes runnables concurrently, providing the same input to each. Construct it using a dict literal within a sequence or by passing a dict to ``RunnableParallel``.
For example,

    from langchain_core.runnables import RunnableLambda

    # A RunnableSequence constructed using the `|` operator
    sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
    sequence.invoke(1)  # 4
    sequence.batch([1, 2, 3])  # [4, 6, 8]

    # A sequence that contains a RunnableParallel constructed using a dict literal
    sequence = RunnableLambda(lambda x: x + 1) | {
        "mul_2": RunnableLambda(lambda x: x * 2),
        "mul_5": RunnableLambda(lambda x: x * 5),
    }
    sequence.invoke(1)  # {'mul_2': 4, 'mul_5': 10}

Standard Methods#
All ``Runnable``s expose additional methods that can be used to modify their behavior (e.g., add a retry policy, add lifecycle listeners, make them configurable, etc.).
These methods will work on any ``Runnable``, including ``Runnable`` chains constructed by composing other ``Runnable``s. See the individual methods for details.
For example,

    from langchain_core.runnables import RunnableLambda

    import random

    def add_one(x: int) -> int:
        return x + 1

    def buggy_double(y: int) -> int:
        """Buggy code that will fail 70% of the time"""
        if random.random() > 0.3:
            print('This code failed, and will probably be retried!')  # noqa: T201
            raise ValueError('Triggered buggy code')
        return y * 2

    sequence = (
        RunnableLambda(add_one)
        | RunnableLambda(buggy_double).with_retry(  # Retry on failure
            stop_after_attempt=10,
            wait_exponential_jitter=False
        )
    )

    print(sequence.input_schema.model_json_schema())  # Show inferred input schema
    print(sequence.output_schema.model_json_schema())  # Show inferred output schema
    print(sequence.invoke(2))  # invoke the sequence (note the retry above!!)

Debugging and tracing#
As the chains get longer, it can be useful to be able to see intermediate results to debug and trace the chain.
You can set the global debug flag to True to enable debug output for all chains:

    from langchain_core.globals import set_debug

    set_debug(True)

Alternatively, you can pass existing or custom callbacks to any given chain:

    from langchain_core.tracers import ConsoleCallbackHandler

    chain.invoke(..., config={"callbacks": [ConsoleCallbackHandler()]})

For a UI (and much more), check out LangSmith.
Attributes
``InputType``: Input type.
``OutputType``: Output type.
``config_specs``: List configurable fields for this ``Runnable``.
``input_schema``: The type of input this ``Runnable`` accepts specified as a pydantic model.
``output_schema``: Output schema.
Methods
abatch(inputs[, config, return_exceptions]): Default implementation runs ``ainvoke`` in parallel using ``asyncio.gather``.
abatch_as_completed(inputs[, config, return_exceptions]): Run ``ainvoke`` in parallel on a list of inputs.
ainvoke(input[, config]): Transform a single input into an output.
as_tool([args_schema, name, description, ...]): Create a ``BaseTool`` from a ``Runnable``.
assign(**kwargs): Assigns new fields to the dict output of this ``Runnable``.
astream(input[, config]): Default implementation of ``astream``, which calls ``ainvoke``.
astream_events(input[, config, version, ...]): Generate a stream of events.
astream_log(input[, config, diff, ...]): Stream all output from a ``Runnable``, as reported to the callback system.
atransform(input[, config]): Transform inputs to outputs.
batch(inputs[, config, return_exceptions]): Default implementation runs invoke in parallel using a thread pool executor.
batch_as_completed(inputs[, config, return_exceptions]): Run ``invoke`` in parallel on a list of inputs.
bind(**kwargs): Bind arguments to a ``Runnable``, returning a new ``Runnable``.
config_schema(*[, include]): The type of config this ``Runnable`` accepts specified as a pydantic model.
get_config_jsonschema(*[, include]): Get a JSON schema that represents the config of the ``Runnable``.
get_graph([config]): Return a graph representation of this ``Runnable``.
get_input_jsonschema([config]): Get a JSON schema that represents the input to the ``Runnable``.
get_input_schema([config]): Get a pydantic model that can be used to validate input to the Runnable.
get_name([suffix, name]): Get the name of the ``Runnable``.
get_output_jsonschema([config]): Get a JSON schema that represents the output of the ``Runnable``.
get_output_schema([config]): Get a pydantic model that can be used to validate output to the ``Runnable``.
get_prompts([config]): Return a list of prompts used by this ``Runnable``.
invoke(input[, config]): Transform a single input into an output.
map(): Return a new ``Runnable`` that maps a list of inputs to a list of outputs.
pick(keys): Pick keys from the output dict of this ``Runnable``.
pipe(*others[, name]): Pipe runnables.
stream(input[, config]): Default implementation of ``stream``, which calls ``invoke``.
transform(input[, config]): Transform inputs to outputs.
with_alisteners(*[, on_start, on_end, on_error]): Bind async lifecycle listeners to a ``Runnable``.
with_config([config]): Bind config to a ``Runnable``, returning a new ``Runnable``.
with_fallbacks(fallbacks, *[, ...]): Add fallbacks to a ``Runnable``, returning a new ``Runnable``.
with_listeners(*[, on_start, on_end, on_error]): Bind lifecycle listeners to a ``Runnable``, returning a new ``Runnable``.
with_retry(*[, retry_if_exception_type, ...]): Create a new Runnable that retries the original Runnable on exceptions.
with_types(*[, input_type, output_type]): Bind input and output types to a ``Runnable``, returning a new ``Runnable``.
- async abatch(
- inputs: list[Input],
- config: RunnableConfig | list[RunnableConfig] | None = None,
- *,
- return_exceptions: bool = False,
- **kwargs: Any | None,
Default implementation runs ``ainvoke`` in parallel using ``asyncio.gather``.
The default implementation of ``batch`` works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently; e.g., if the underlying ``Runnable`` uses an API which supports a batch mode.
- Parameters:
inputs (list[Input]) – A list of inputs to the ``Runnable``.
config (RunnableConfig | list[RunnableConfig] | None) – A config to use when invoking the ``Runnable``. The config supports standard keys like ``'tags'``, ``'metadata'`` for tracing purposes, ``'max_concurrency'`` for controlling how much work to do in parallel, and other keys. Please refer to the ``RunnableConfig`` for more details. Defaults to None.
return_exceptions (bool) – Whether to return exceptions instead of raising them. Defaults to False.
**kwargs (Any | None) – Additional keyword arguments to pass to the ``Runnable``.
- Returns:
A list of outputs from the ``Runnable``.
- Return type:
list[Output]
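The behavior described for ``abatch`` (concurrent calls, a concurrency cap, and optional returned exceptions) can be approximated with plain ``asyncio``. This is a sketch under stated assumptions, not the library's implementation: ``gather_batch`` and ``flaky_square`` are hypothetical names, and the semaphore is just one plausible way to model ``'max_concurrency'``.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def gather_batch(
    func: Callable[[Any], Awaitable[Any]],
    inputs: list[Any],
    *,
    max_concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> list[Any]:
    """Hypothetical helper: run func over inputs concurrently, keeping input order."""
    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def run_one(item: Any) -> Any:
        if semaphore is None:
            return await func(item)
        async with semaphore:  # cap the number of calls in flight
            return await func(item)

    return await asyncio.gather(
        *(run_one(item) for item in inputs),
        return_exceptions=return_exceptions,
    )


async def flaky_square(x: int) -> int:
    await asyncio.sleep(0.01)
    if x < 0:
        raise ValueError("negative input")
    return x * x


outputs = asyncio.run(
    gather_batch(flaky_square, [1, -2, 3], max_concurrency=2, return_exceptions=True)
)
print(outputs)  # [1, ValueError('negative input'), 9]
```

With ``return_exceptions=False`` the first failure would propagate to the caller instead of appearing in the result list.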
- async abatch_as_completed(
- inputs: Sequence[Input],
- config: RunnableConfig | Sequence[RunnableConfig] | None = None,
- *,
- return_exceptions: Literal[False] = False,
- **kwargs: Any | None,
- async abatch_as_completed(
- inputs: Sequence[Input],
- config: RunnableConfig | Sequence[RunnableConfig] | None = None,
- *,
- return_exceptions: Literal[True],
- **kwargs: Any | None,
Run ``ainvoke`` in parallel on a list of inputs.
Yields results as they complete.
- Parameters:
inputs – A list of inputs to the ``Runnable``.
config – A config to use when invoking the ``Runnable``. The config supports standard keys like ``'tags'``, ``'metadata'`` for tracing purposes, ``'max_concurrency'`` for controlling how much work to do in parallel, and other keys. Please refer to the ``RunnableConfig`` for more details. Defaults to None.
return_exceptions – Whether to return exceptions instead of raising them. Defaults to False.
kwargs – Additional keyword arguments to pass to the ``Runnable``.
- Yields:
A tuple of the index of the input and the output from the ``Runnable``.
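The "index plus output, in completion order" contract can be illustrated with ``asyncio.as_completed``. This is a conceptual sketch; ``as_completed_batch`` and ``sleep_then_echo`` are hypothetical names and this is not how the library is necessarily written.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable


async def as_completed_batch(
    func: Callable[[Any], Awaitable[Any]],
    inputs: list[Any],
) -> AsyncIterator[tuple[int, Any]]:
    """Hypothetical helper: yield (input index, output) as each call finishes."""

    async def run_one(index: int, item: Any) -> tuple[int, Any]:
        return index, await func(item)

    tasks = [asyncio.ensure_future(run_one(i, item)) for i, item in enumerate(inputs)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def sleep_then_echo(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay


async def main() -> list[tuple[int, float]]:
    # The shortest delay finishes first, so indices arrive out of input order.
    return [pair async for pair in as_completed_batch(sleep_then_echo, [0.3, 0.05, 0.15])]


pairs = asyncio.run(main())
print(pairs)  # [(1, 0.05), (2, 0.15), (0, 0.3)]
```

The index in each tuple is what lets a caller map a result back to its input once ordering is lost.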
- async ainvoke(
- input: Input,
- config: RunnableConfig | None = None,
- **kwargs: Any,
Transform a single input into an output.
- Parameters:
input (Input) – The input to the ``Runnable``.
config (RunnableConfig | None) – A config to use when invoking the ``Runnable``. The config supports standard keys like ``'tags'``, ``'metadata'`` for tracing purposes, ``'max_concurrency'`` for controlling how much work to do in parallel, and other keys. Please refer to the ``RunnableConfig`` for more details. Defaults to None.
kwargs (Any)
- Returns:
The output of the ``Runnable``.
- Return type:
Output
- as_tool(
- args_schema: type[BaseModel] | None = None,
- *,
- name: str | None = None,
- description: str | None = None,
- arg_types: dict[str, type] | None = None,
Beta
This API is in beta and may change in the future.
Create a ``BaseTool`` from a ``Runnable``.
``as_tool`` will instantiate a ``BaseTool`` with a name, description, and ``args_schema`` from a ``Runnable``. Where possible, schemas are inferred from ``runnable.get_input_schema``. Alternatively (e.g., if the ``Runnable`` takes a dict as input and the specific dict keys are not typed), the schema can be specified directly with ``args_schema``. You can also pass ``arg_types`` to just specify the required arguments and their types.
- Parameters:
args_schema (Optional[type[BaseModel]]) – The schema for the tool. Defaults to None.
name (Optional[str]) – The name of the tool. Defaults to None.
description (Optional[str]) – The description of the tool. Defaults to None.
arg_types (Optional[dict[str, type]]) – A dictionary of argument names to types. Defaults to None.
- Returns:
A ``BaseTool`` instance.
- Return type:
BaseTool
Typed dict input:

    from typing_extensions import TypedDict
    from langchain_core.runnables import RunnableLambda

    class Args(TypedDict):
        a: int
        b: list[int]

    def f(x: Args) -> str:
        return str(x["a"] * max(x["b"]))

    runnable = RunnableLambda(f)
    as_tool = runnable.as_tool()
    as_tool.invoke({"a": 3, "b": [1, 2]})

``dict`` input, specifying schema via ``args_schema``:

    from typing import Any
    from pydantic import BaseModel, Field
    from langchain_core.runnables import RunnableLambda

    def f(x: dict[str, Any]) -> str:
        return str(x["a"] * max(x["b"]))

    class FSchema(BaseModel):
        """Apply a function to an integer and list of integers."""

        a: int = Field(..., description="Integer")
        b: list[int] = Field(..., description="List of ints")

    runnable = RunnableLambda(f)
    as_tool = runnable.as_tool(FSchema)
    as_tool.invoke({"a": 3, "b": [1, 2]})

``dict`` input, specifying schema via ``arg_types``:

    from typing import Any
    from langchain_core.runnables import RunnableLambda

    def f(x: dict[str, Any]) -> str:
        return str(x["a"] * max(x["b"]))

    runnable = RunnableLambda(f)
    as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
    as_tool.invoke({"a": 3, "b": [1, 2]})

String input:

    from langchain_core.runnables import RunnableLambda

    def f(x: str) -> str:
        return x + "a"

    def g(x: str) -> str:
        return x + "z"

    runnable = RunnableLambda(f) | g
    as_tool = runnable.as_tool()
    as_tool.invoke("b")

Added in version 0.2.14.
- assign(
- **kwargs: Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
Assigns new fields to the dict output of this ``Runnable``.

    from langchain_community.llms.fake import FakeStreamingListLLM
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import SystemMessagePromptTemplate
    from langchain_core.runnables import Runnable
    from operator import itemgetter

    prompt = (
        SystemMessagePromptTemplate.from_template("You are a nice assistant.")
        + "{question}"
    )
    llm = FakeStreamingListLLM(responses=["foo-lish"])

    chain: Runnable = prompt | llm | {"str": StrOutputParser()}

    chain_with_assign = chain.assign(hello=itemgetter("str") | llm)

    print(chain_with_assign.input_schema.model_json_schema())
    # {'title': 'PromptInput', 'type': 'object', 'properties': {'question': {'title': 'Question', 'type': 'string'}}}
    print(chain_with_assign.output_schema.model_json_schema())
    # {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties': {'str': {'title': 'Str', 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}

- Parameters:
**kwargs (Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]) – A mapping of keys to ``Runnable`` or ``Runnable``-like objects that will be invoked with the entire output dict of this ``Runnable``.
- Returns:
A new ``Runnable``.
- Return type:
RunnableSerializable[Any, Any]
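The merge semantics described for ``assign`` (each new field is computed from the entire output dict, then added alongside the existing keys) can be modeled on a plain dict. This is a minimal sketch with a hypothetical helper, ``assign_fields``, and plain callables standing in for runnables; it is not the library's code.

```python
from typing import Any, Callable


def assign_fields(
    output: dict[str, Any],
    **new_fields: Callable[[dict[str, Any]], Any],
) -> dict[str, Any]:
    """Hypothetical helper mirroring the described behavior on a plain dict."""
    # Every callable receives the entire original dict, not the partially
    # extended one, so the new fields cannot depend on each other.
    computed = {key: func(output) for key, func in new_fields.items()}
    return {**output, **computed}


result = assign_fields(
    {"str": "foo-lish"},
    hello=lambda d: d["str"].upper(),
    length=lambda d: len(d["str"]),
)
print(result)  # {'str': 'foo-lish', 'hello': 'FOO-LISH', 'length': 8}
```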
- async astream(
- input: Input,
- config: RunnableConfig | None = None,
- **kwargs: Any | None,
Default implementation of ``astream``, which calls ``ainvoke``.
Subclasses should override this method if they support streaming output.
- Parameters:
input (Input) – The input to the ``Runnable``.
config (RunnableConfig | None) – The config to use for the ``Runnable``. Defaults to None.
kwargs (Any | None) – Additional keyword arguments to pass to the ``Runnable``.
- Yields:
The output of the ``Runnable``.
- Return type:
AsyncIterator[Output]
- async astream_events(
- input: Any,
- config: RunnableConfig | None = None,
- *,
- version: Literal['v1', 'v2'] = 'v2',
- include_names: Sequence[str] | None = None,
- include_types: Sequence[str] | None = None,
- include_tags: Sequence[str] | None = None,
- exclude_names: Sequence[str] | None = None,
- exclude_types: Sequence[str] | None = None,
- exclude_tags: Sequence[str] | None = None,
- **kwargs: Any,
Generate a stream of events.
Use to create an iterator over ``StreamEvents`` that provide real-time information about the progress of the ``Runnable``, including ``StreamEvents`` from intermediate results.
A ``StreamEvent`` is a dictionary with the following schema:
``event``: str - Event names are of the format: ``on_[runnable_type]_(start|stream|end)``.
``name``: str - The name of the ``Runnable`` that generated the event.
``run_id``: str - Randomly generated ID associated with the given execution of the ``Runnable`` that emitted the event. A child ``Runnable`` that gets invoked as part of the execution of a parent ``Runnable`` is assigned its own unique ID.
``parent_ids``: list[str] - The IDs of the parent runnables that generated the event. The root ``Runnable`` will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
``tags``: Optional[list[str]] - The tags of the ``Runnable`` that generated the event.
``metadata``: Optional[dict[str, Any]] - The metadata of the ``Runnable`` that generated the event.
``data``: dict[str, Any]
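For readers who prefer types to prose, the schema above can be written down as a ``TypedDict``. This is an illustrative sketch: ``StreamEventSketch`` and ``event_phase`` are hypothetical names, not the library's own definitions, and the example values are made up.

```python
from typing import Any, Optional, TypedDict


class StreamEventSketch(TypedDict):
    """Hypothetical mirror of the schema above; not the library's own class."""

    event: str  # e.g. "on_chain_start"
    name: str
    run_id: str
    parent_ids: list[str]
    tags: Optional[list[str]]
    metadata: Optional[dict[str, Any]]
    data: dict[str, Any]


def event_phase(event: StreamEventSketch) -> tuple[str, str]:
    """Split 'on_[runnable_type]_(start|stream|end)' into (type, phase)."""
    body = event["event"][len("on_"):]
    runnable_type, _, phase = body.rpartition("_")
    return runnable_type, phase


example: StreamEventSketch = {
    "event": "on_chat_model_stream",
    "name": "my_model",
    "run_id": "00000000-0000-0000-0000-000000000000",
    "parent_ids": [],
    "tags": [],
    "metadata": {},
    "data": {"chunk": "hello"},
}
print(event_phase(example))  # ('chat_model', 'stream')
```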
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
event | name | chunk | input | output
--- | --- | --- | --- | ---
on_chat_model_start | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} |
on_chat_model_stream | [model name] | AIMessageChunk(content="hello") | |
on_chat_model_end | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world")
on_llm_start | [model name] | | {'input': 'hello'} |
on_llm_stream | [model name] | 'Hello' | |
on_llm_end | [model name] | | | 'Hello human!'
on_chain_start | format_docs | | |
on_chain_stream | format_docs | 'hello world!, goodbye world!' | |
on_chain_end | format_docs | | [Document(...)] | 'hello world!, goodbye world!'
on_tool_start | some_tool | | {"x": 1, "y": "2"} |
on_tool_end | some_tool | | | {"x": 1, "y": "2"}
on_retriever_start | [retriever name] | | {"query": "hello"} |
on_retriever_end | [retriever name] | | {"query": "hello"} | [Document(...), ..]
on_prompt_start | [template_name] | | {"question": "hello"} |
on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...])

In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API!
A custom event has the following format:

Attribute | Type | Description
--- | --- | ---
name | str | A user defined name for the event.
data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable.

Here are declarations associated with the standard events shown above:
``format_docs``:

    def format_docs(docs: list[Document]) -> str:
        '''Format the docs.'''
        return ", ".join([doc.page_content for doc in docs])

    format_docs = RunnableLambda(format_docs)

``some_tool``:

    @tool
    def some_tool(x: int, y: str) -> dict:
        '''Some_tool.'''
        return {"x": x, "y": y}

``prompt``:

    template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are Cat Agent 007"),
            ("human", "{question}"),
        ]
    ).with_config({"run_name": "my_template", "tags": ["my_template"]})

Example:

    from langchain_core.runnables import RunnableLambda

    async def reverse(s: str) -> str:
        return s[::-1]

    chain = RunnableLambda(func=reverse)

    events = [
        event async for event in chain.astream_events("hello", version="v2")
    ]

    # will produce the following events (run_id, and parent_ids
    # has been omitted for brevity):
    [
        {
            "data": {"input": "hello"},
            "event": "on_chain_start",
            "metadata": {},
            "name": "reverse",
            "tags": [],
        },
        {
            "data": {"chunk": "olleh"},
            "event": "on_chain_stream",
            "metadata": {},
            "name": "reverse",
            "tags": [],
        },
        {
            "data": {"output": "olleh"},
            "event": "on_chain_end",
            "metadata": {},
            "name": "reverse",
            "tags": [],
        },
    ]

Example: Dispatch Custom Event

    from langchain_core.callbacks.manager import (
        adispatch_custom_event,
    )
    from langchain_core.runnables import RunnableLambda, RunnableConfig
    import asyncio

    async def slow_thing(some_input: str, config: RunnableConfig) -> str:
        """Do something that takes a long time."""
        await asyncio.sleep(1)  # Placeholder for some slow operation
        await adispatch_custom_event(
            "progress_event",
            {"message": "Finished step 1 of 3"},
            config=config  # Must be included for python < 3.10
        )
        await asyncio.sleep(1)  # Placeholder for some slow operation
        await adispatch_custom_event(
            "progress_event",
            {"message": "Finished step 2 of 3"},
            config=config  # Must be included for python < 3.10
        )
        await asyncio.sleep(1)  # Placeholder for some slow operation
        return "Done"

    slow_thing = RunnableLambda(slow_thing)

    async for event in slow_thing.astream_events("some_input", version="v2"):
        print(event)

- Parameters:
input (Any) – The input to the ``Runnable``.
config (Optional[RunnableConfig]) – The config to use for the ``Runnable``.
version (Literal['v1', 'v2']) – The version of the schema to use, either ``'v2'`` or ``'v1'``. Users should use ``'v2'``. ``'v1'`` is for backwards compatibility and will be deprecated in 0.4.0. No default will be assigned until the API is stabilized. Custom events will only be surfaced in ``'v2'``.
include_names (Optional[Sequence[str]]) – Only include events from ``Runnables`` with matching names.
include_types (Optional[Sequence[str]]) – Only include events from ``Runnables`` with matching types.
include_tags (Optional[Sequence[str]]) – Only include events from ``Runnables`` with matching tags.
exclude_names (Optional[Sequence[str]]) – Exclude events from ``Runnables`` with matching names.
exclude_types (Optional[Sequence[str]]) – Exclude events from ``Runnables`` with matching types.
exclude_tags (Optional[Sequence[str]]) – Exclude events from ``Runnables`` with matching tags.
kwargs (Any) – Additional keyword arguments to pass to the ``Runnable``. These will be passed to ``astream_log`` as this implementation of ``astream_events`` is built on top of ``astream_log``.
- Yields:
An async stream of ``StreamEvents``.
- Raises:
NotImplementedError – If the version is not ``'v1'`` or ``'v2'``.
- Return type:
AsyncIterator[StreamEvent]
- async astream_log(
- input: Any,
- config: RunnableConfig | None = None,
- *,
- diff: Literal[True] = True,
- with_streamed_output_list: bool = True,
- include_names: Sequence[str] | None = None,
- include_types: Sequence[str] | None = None,
- include_tags: Sequence[str] | None = None,
- exclude_names: Sequence[str] | None = None,
- exclude_types: Sequence[str] | None = None,
- exclude_tags: Sequence[str] | None = None,
- **kwargs: Any,
- async astream_log(
- input: Any,
- config: RunnableConfig | None = None,
- *,
- diff: Literal[False],
- with_streamed_output_list: bool = True,
- include_names: Sequence[str] | None = None,
- include_types: Sequence[str] | None = None,
- include_tags: Sequence[str] | None = None,
- exclude_names: Sequence[str] | None = None,
- exclude_types: Sequence[str] | None = None,
- exclude_tags: Sequence[str] | None = None,
- **kwargs: Any,
Stream all output from a ``Runnable``, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
- Parameters:
input – The input to the ``Runnable``.
config – The config to use for the ``Runnable``.
diff – Whether to yield diffs between each step or the current state.
with_streamed_output_list – Whether to yield the ``streamed_output`` list.
include_names – Only include logs with these names.
include_types – Only include logs with these types.
include_tags – Only include logs with these tags.
exclude_names – Exclude logs with these names.
exclude_types – Exclude logs with these types.
exclude_tags – Exclude logs with these tags.
kwargs – Additional keyword arguments to pass to the ``Runnable``.
- Yields:
A ``RunLogPatch`` or ``RunLogObject``-style result: a ``RunLogPatch`` or ``RunLog`` object.
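To make "ops can be applied in order to construct state" concrete, here is a small sketch that replays JSON Patch operations onto a dict. It is deliberately limited: ``apply_ops`` is a hypothetical helper that handles only ``add`` and ``replace`` with unescaped paths, and the op lists below are made up for illustration rather than captured from a real run.

```python
from typing import Any


def apply_ops(state: Any, ops: list[dict[str, Any]]) -> Any:
    """Apply a small subset of JSON Patch ('add' and 'replace') and return the state."""
    for op in ops:
        # "/a/b" -> ["a", "b"]; the empty path "" addresses the whole document.
        keys = op["path"].split("/")[1:]
        if not keys:
            state = op["value"]
            continue
        target: Any = state
        for key in keys[:-1]:
            target = target[int(key)] if isinstance(target, list) else target[key]
        last = keys[-1]
        if isinstance(target, list):
            if op["op"] == "add":
                # "-" means "append" in JSON Patch.
                index = len(target) if last == "-" else int(last)
                target.insert(index, op["value"])
            else:
                target[int(last)] = op["value"]
        else:
            target[last] = op["value"]
    return state


# Hypothetical patches: one list of ops per streamed log chunk.
patches = [
    [{"op": "replace", "path": "", "value": {"streamed_output": [], "final_output": None}}],
    [{"op": "add", "path": "/streamed_output/-", "value": "olleh"}],
    [{"op": "replace", "path": "/final_output", "value": "olleh"}],
]

state: Any = {}
for ops in patches:
    state = apply_ops(state, ops)
print(state)  # {'streamed_output': ['olleh'], 'final_output': 'olleh'}
```

A full JSON Patch implementation also covers ``remove``, ``move``, ``copy``, ``test``, and path escaping, all of which this sketch omits.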
- async atransform(
- input: AsyncIterator[Input],
- config: RunnableConfig | None = None,
- **kwargs: Any | None,
Transform inputs to outputs.
Default implementation of ``atransform``, which buffers input and calls ``astream``.
Subclasses should override this method if they can start producing output while input is still being generated.
- Parameters:
input (AsyncIterator[Input]) – An async iterator of inputs to the ``Runnable``.
config (RunnableConfig | None) – The config to use for the ``Runnable``. Defaults to None.
kwargs (Any | None) – Additional keyword arguments to pass to the ``Runnable``.
- Yields:
The output of the ``Runnable``.
- Return type:
AsyncIterator[Output]
- batch(
- inputs: list[Input],
- config: RunnableConfig | list[RunnableConfig] | None = None,
- *,
- return_exceptions: bool = False,
- **kwargs: Any | None,
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently; e.g., if the underlying ``Runnable`` uses an API which supports a batch mode.
- Parameters:
inputs (list[Input]) – A list of inputs to the ``Runnable``.
config (RunnableConfig | list[RunnableConfig] | None) – A config to use when invoking the ``Runnable``. The config supports standard keys like ``'tags'``, ``'metadata'`` for tracing purposes, ``'max_concurrency'`` for controlling how much work to do in parallel, and other keys. Please refer to the ``RunnableConfig`` for more details. Defaults to None.
return_exceptions (bool) – Whether to return exceptions instead of raising them. Defaults to False.
**kwargs (Any | None) – Additional keyword arguments to pass to the ``Runnable``.
- Returns:
A list of outputs from the ``Runnable``.
- Return type:
list[Output]
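The thread-pool behavior described for ``batch`` can be sketched with ``concurrent.futures``. This is an approximation under stated assumptions, not the library's implementation: ``thread_pool_batch`` and ``parse_int`` are hypothetical names, and mapping ``'max_concurrency'`` to ``max_workers`` is an assumption made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional


def thread_pool_batch(
    func: Callable[[Any], Any],
    inputs: list[Any],
    *,
    max_concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> list[Any]:
    """Hypothetical helper: call func on every input in worker threads."""

    def run_one(item: Any) -> Any:
        try:
            return func(item)
        except Exception as exc:
            if return_exceptions:
                return exc  # hand the error back in place of an output
            raise

    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # Executor.map preserves input order regardless of completion order.
        return list(pool.map(run_one, inputs))


def parse_int(text: str) -> int:
    return int(text)


batch_outputs = thread_pool_batch(parse_int, ["1", "x", "3"], return_exceptions=True)
print([type(o).__name__ for o in batch_outputs])  # ['int', 'ValueError', 'int']
```

Threads help here only when the work releases the GIL (network or disk IO), which is why the text above calls out IO bound runnables.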
- batch_as_completed(
- inputs: Sequence[Input],
- config: RunnableConfig | Sequence[RunnableConfig] | None = None,
- *,
- return_exceptions: Literal[False] = False,
- **kwargs: Any,
- batch_as_completed(
- inputs: Sequence[Input],
- config: RunnableConfig | Sequence[RunnableConfig] | None = None,
- *,
- return_exceptions: Literal[True],
- **kwargs: Any,
Run ``invoke`` in parallel on a list of inputs.
Yields results as they complete.
- Parameters:
inputs – A list of inputs to the ``Runnable``.
config – A config to use when invoking the ``Runnable``. The config supports standard keys like ``'tags'``, ``'metadata'`` for tracing purposes, ``'max_concurrency'`` for controlling how much work to do in parallel, and other keys. Please refer to the ``RunnableConfig`` for more details. Defaults to None.
return_exceptions – Whether to return exceptions instead of raising them. Defaults to False.
**kwargs – Additional keyword arguments to pass to the ``Runnable``.
- Yields:
Tuples of the index of the input and the output from the ``Runnable``.
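A thread-based analogue of this "yield as completed" behavior can be written with ``concurrent.futures.as_completed``. The helper names below (``thread_pool_as_completed``, ``sleep_then_return``) are hypothetical, and the sketch only models the ordering contract, not the library's internals.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator


def thread_pool_as_completed(
    func: Callable[[Any], Any],
    inputs: list[Any],
) -> Iterator[tuple[int, Any]]:
    """Hypothetical helper: yield (input index, output) in completion order."""
    with ThreadPoolExecutor() as pool:
        # Remember which input each future belongs to.
        future_to_index = {pool.submit(func, item): i for i, item in enumerate(inputs)}
        for future in as_completed(future_to_index):
            yield future_to_index[future], future.result()


def sleep_then_return(delay: float) -> float:
    time.sleep(delay)
    return delay


completed = list(thread_pool_as_completed(sleep_then_return, [0.3, 0.05, 0.15]))
print(completed)  # [(1, 0.05), (2, 0.15), (0, 0.3)]
```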
- bind(
- **kwargs: Any,
Bind arguments to a ``Runnable``, returning a new ``Runnable``.
Useful when a ``Runnable`` in a chain requires an argument that is not in the output of the previous ``Runnable`` or included in the user input.
- Parameters:
kwargs (Any) – The arguments to bind to the ``Runnable``.
- Returns:
A new ``Runnable`` with the arguments bound.
- Return type:
Runnable[Input, Output]
Example:

    from langchain_ollama import ChatOllama
    from langchain_core.output_parsers import StrOutputParser

    llm = ChatOllama(model="llama2")

    # Without bind.
    chain = llm | StrOutputParser()

    chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
    # Output is 'One two three four five.'

    # With bind.
    chain = llm.bind(stop=["three"]) | StrOutputParser()

    chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
    # Output is 'One two'

- config_schema(
- *,
- include: Sequence[str] | None = None,
The type of config this ``Runnable`` accepts specified as a pydantic model.
To mark a field as configurable, see the ``configurable_fields`` and ``configurable_alternatives`` methods.
- Parameters:
include (Sequence[str] | None) – A list of fields to include in the config schema.
- Returns:
A pydantic model that can be used to validate config.
- Return type:
type[BaseModel]
- get_config_jsonschema(
- *,
- include: Sequence[str] | None = None,
Get a JSON schema that represents the config of the ``Runnable``.
- Parameters:
include (Sequence[str] | None) – A list of fields to include in the config schema.
- Returns:
A JSON schema that represents the config of the ``Runnable``.
- Return type:
dict[str, Any]
Added in version 0.3.0.
- get_graph(
- config: RunnableConfig | None = None,
Return a graph representation of this ``Runnable``.
- Parameters:
config (Optional[RunnableConfig])
- Return type:
- get_input_jsonschema(
- config: RunnableConfig | None = None,
Get a JSON schema that represents the input to the ``Runnable``.
- Parameters:
config (RunnableConfig | None) – A config to use when generating the schema.
- Returns:
A JSON schema that represents the input to the ``Runnable``.
- Return type:
dict[str, Any]
Example

    from langchain_core.runnables import RunnableLambda

    def add_one(x: int) -> int:
        return x + 1

    runnable = RunnableLambda(add_one)

    print(runnable.get_input_jsonschema())

Added in version 0.3.0.
- get_input_schema(
- config: RunnableConfig | None = None,
Get a pydantic model that can be used to validate input to the Runnable.
``Runnable``s that leverage the ``configurable_fields`` and ``configurable_alternatives`` methods will have a dynamic input schema that depends on which configuration the ``Runnable`` is invoked with.
This method returns the input schema for a specific configuration.
- Parameters:
config (RunnableConfig | None) – A config to use when generating the schema.
- Returns:
A pydantic model that can be used to validate input.
- Return type:
type[BaseModel]
- get_name(
- suffix: str | None = None,
- *,
- name: str | None = None,
Get the name of the ``Runnable``.
- Parameters:
suffix (str | None) – An optional suffix to append to the name.
name (str | None) – An optional name to use instead of the ``Runnable``’s name.
- Returns:
The name of the ``Runnable``.
- Return type:
str
- get_output_jsonschema(
- config: RunnableConfig | None = None,
Get a JSON schema that represents the output of the ``Runnable``.
- Parameters:
config (RunnableConfig | None) – A config to use when generating the schema.
- Returns:
A JSON schema that represents the output of the ``Runnable``.
- Return type:
dict[str, Any]
Example

    from langchain_core.runnables import RunnableLambda

    def add_one(x: int) -> int:
        return x + 1

    runnable = RunnableLambda(add_one)

    print(runnable.get_output_jsonschema())

Added in version 0.3.0.
- get_output_schema(
- config: RunnableConfig | None = None,
Get a pydantic model that can be used to validate output to the ``Runnable``.
``Runnable``s that leverage the ``configurable_fields`` and ``configurable_alternatives`` methods will have a dynamic output schema that depends on which configuration the ``Runnable`` is invoked with.
This method returns the output schema for a specific configuration.
- Parameters:
config (RunnableConfig | None) – A config to use when generating the schema.
- Returns:
A pydantic model that can be used to validate output.
- Return type:
type[BaseModel]
- get_prompts(
- config: RunnableConfig | None = None,
Return a list of prompts used by this ``Runnable``.
- Parameters:
config (Optional[RunnableConfig])
- Return type:
list[BasePromptTemplate]
- abstractmethod invoke(
- input: Input,
- config: RunnableConfig | None = None,
- **kwargs: Any,
Transform a single input into an output.
- Parameters:
input (Input) – The input to the ``Runnable``.
config (RunnableConfig | None) – A config to use when invoking the ``Runnable``. The config supports standard keys like ``'tags'``, ``'metadata'`` for tracing purposes, ``'max_concurrency'`` for controlling how much work to do in parallel, and other keys. Please refer to the ``RunnableConfig`` for more details. Defaults to None.
kwargs (Any)
- Returns:
The output of the ``Runnable``.
- Return type:
Output
- map() Runnable[list[Input], list[Output]][source]#
Return a new ``Runnable`` that maps a list of inputs to a list of outputs.
Calls ``invoke`` with each input.
- Returns:
A new ``Runnable`` that maps a list of inputs to a list of outputs.
- Return type:
Runnable[list[Input], list[Output]]
Example

    from langchain_core.runnables import RunnableLambda

    def _lambda(x: int) -> int:
        return x + 1

    runnable = RunnableLambda(_lambda)
    print(runnable.map().invoke([1, 2, 3]))  # [2, 3, 4]

- pick(
- keys: str | list[str],
Pick keys from the output dict of this ``Runnable``.
Pick single key:

    import json

    from langchain_core.runnables import RunnableLambda, RunnableMap

    as_str = RunnableLambda(str)
    as_json = RunnableLambda(json.loads)
    chain = RunnableMap(str=as_str, json=as_json)

    chain.invoke("[1, 2, 3]")
    # -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

    json_only_chain = chain.pick("json")
    json_only_chain.invoke("[1, 2, 3]")
    # -> [1, 2, 3]

Pick list of keys:

    from typing import Any

    import json

    from langchain_core.runnables import RunnableLambda, RunnableMap

    as_str = RunnableLambda(str)
    as_json = RunnableLambda(json.loads)

    def as_bytes(x: Any) -> bytes:
        return bytes(x, "utf-8")

    chain = RunnableMap(
        str=as_str,
        json=as_json,
        bytes=RunnableLambda(as_bytes)
    )

    chain.invoke("[1, 2, 3]")
    # -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

    json_and_bytes_chain = chain.pick(["json", "bytes"])
    json_and_bytes_chain.invoke("[1, 2, 3]")
    # -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

- Parameters:
keys (str | list[str]) – A key or list of keys to pick from the output dict.
- Returns:
A new ``Runnable``.
- Return type:
RunnableSerializable[Any, Any]
- pipe(
- *others: Runnable[Any, Other] | Callable[[Any], Other],
- name: str | None = None,
Pipe runnables.
Compose this
RunnablewithRunnable-like objects to make aRunnableSequence.Equivalent to
RunnableSequence(self, *others)orself | others[0] | ...Example
from langchain_core.runnables import RunnableLambda def add_one(x: int) -> int: return x + 1 def mul_two(x: int) -> int: return x * 2 runnable_1 = RunnableLambda(add_one) runnable_2 = RunnableLambda(mul_two) sequence = runnable_1.pipe(runnable_2) # Or equivalently: # sequence = runnable_1 | runnable_2 # sequence = RunnableSequence(first=runnable_1, last=runnable_2) sequence.invoke(1) await sequence.ainvoke(1) # -> 4 sequence.batch([1, 2, 3]) await sequence.abatch([1, 2, 3]) # -> [4, 6, 8]
- Parameters:
*others (Runnable[Any, Other] | Callable[[Any], Other]) – Other ``Runnable`` or ``Runnable``-like objects to compose.
name (str | None) – An optional name for the resulting ``RunnableSequence``.
- Returns:
A new ``Runnable``.
- Return type:
RunnableSerializable[Input, Other]
- stream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) → Iterator[Output]
Default implementation of ``stream``, which calls ``invoke``.
Subclasses should override this method if they support streaming output.
- Parameters:
input (Input) – The input to the ``Runnable``.
config (RunnableConfig | None) – The config to use for the ``Runnable``. Defaults to None.
kwargs (Any | None) – Additional keyword arguments to pass to the ``Runnable``.
- Yields:
The output of the ``Runnable``.
- Return type:
Iterator[Output]
- transform(input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None) → Iterator[Output]
Transform inputs to outputs.
Default implementation of ``transform``, which buffers the input and then calls ``stream``.
Subclasses should override this method if they can start producing output while input is still being generated.
- Parameters:
input (Iterator[Input]) – An iterator of inputs to the ``Runnable``.
config (RunnableConfig | None) – The config to use for the ``Runnable``. Defaults to None.
kwargs (Any | None) – Additional keyword arguments to pass to the ``Runnable``.
- Yields:
The output of the ``Runnable``.
- Return type:
Iterator[Output]
- with_alisteners(*, on_start: AsyncListener | None = None, on_end: AsyncListener | None = None, on_error: AsyncListener | None = None) → Runnable[Input, Output]
Bind async lifecycle listeners to a ``Runnable``.
Returns a new ``Runnable``.
The ``Run`` object contains information about the run, including its ``id``, ``type``, ``input``, ``output``, ``error``, ``start_time``, ``end_time``, and any tags or metadata added to the run.
- Parameters:
on_start (Optional[AsyncListener]) – Called asynchronously before the ``Runnable`` starts running, with the ``Run`` object. Defaults to None.
on_end (Optional[AsyncListener]) – Called asynchronously after the ``Runnable`` finishes running, with the ``Run`` object. Defaults to None.
on_error (Optional[AsyncListener]) – Called asynchronously if the ``Runnable`` throws an error, with the ``Run`` object. Defaults to None.
- Returns:
A new ``Runnable`` with the listeners bound.
- Return type:
Runnable[Input, Output]
Example:
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio

def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

# Listeners receive the Run object for the invocation, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)

async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())

Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
- with_config(config: RunnableConfig | None = None, **kwargs: Any) → Runnable[Input, Output]
Bind config to a ``Runnable``, returning a new ``Runnable``.
- Parameters:
config (RunnableConfig | None) – The config to bind to the ``Runnable``.
kwargs (Any) – Additional keyword arguments to pass to the ``Runnable``.
- Returns:
A new ``Runnable`` with the config bound.
- Return type:
Runnable[Input, Output]
- with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,), exception_key: Optional[str] = None) → RunnableWithFallbacksT[Input, Output]
Add fallbacks to a ``Runnable``, returning a new ``Runnable``.
The new ``Runnable`` will try the original ``Runnable``, and then each fallback in order, upon failures.
- Parameters:
fallbacks (Sequence[Runnable[Input, Output]]) – A sequence of runnables to try if the original ``Runnable`` fails.
exceptions_to_handle (tuple[type[BaseException], ...]) – A tuple of exception types to handle. Defaults to ``(Exception,)``.
exception_key (Optional[str]) – If a string is specified, handled exceptions will be passed to fallbacks as part of the input under the specified key. If None, exceptions will not be passed to fallbacks. If used, the base ``Runnable`` and its fallbacks must accept a dictionary as input. Defaults to None.
- Returns:
A new ``Runnable`` that will try the original ``Runnable``, and then each fallback in order, upon failures.
- Return type:
RunnableWithFallbacksT[Input, Output]
Example
from typing import Iterator

from langchain_core.runnables import RunnableGenerator

def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""

def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"

runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
- with_listeners(*, on_start: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_error: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None) → Runnable[Input, Output]
Bind lifecycle listeners to a ``Runnable``, returning a new ``Runnable``.
The ``Run`` object contains information about the run, including its ``id``, ``type``, ``input``, ``output``, ``error``, ``start_time``, ``end_time``, and any tags or metadata added to the run.
- Parameters:
on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – Called before the ``Runnable`` starts running, with the ``Run`` object. Defaults to None.
on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – Called after the ``Runnable`` finishes running, with the ``Run`` object. Defaults to None.
on_error (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – Called if the ``Runnable`` throws an error, with the ``Run`` object. Defaults to None.
- Returns:
A new ``Runnable`` with the listeners bound.
- Return type:
Runnable[Input, Output]
Example:
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time

def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start,
    on_end=fn_end
)
chain.invoke(2)
- with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,), wait_exponential_jitter: bool = True, exponential_jitter_params: Optional[ExponentialJitterParams] = None, stop_after_attempt: int = 3) → Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
- Parameters:
retry_if_exception_type (tuple[type[BaseException], ...]) – A tuple of exception types to retry on. Defaults to (Exception,).
wait_exponential_jitter (bool) – Whether to add jitter to the wait time between retries. Defaults to True.
stop_after_attempt (int) – The maximum number of attempts to make before giving up. Defaults to 3.
exponential_jitter_params (Optional[ExponentialJitterParams]) – Parameters for ``tenacity.wait_exponential_jitter``. Namely: ``initial``, ``max``, ``exp_base``, and ``jitter`` (all float values).
- Returns:
A new Runnable that retries the original Runnable on exceptions.
- Return type:
Runnable[Input, Output]
Example:
from langchain_core.runnables import RunnableLambda

count = 0

def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass

runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2
- with_types(*, input_type: type[Input] | None = None, output_type: type[Output] | None = None) → Runnable[Input, Output]
Bind input and output types to a ``Runnable``, returning a new ``Runnable``.
- Parameters:
input_type (type[Input] | None) – The input type to bind to the ``Runnable``. Defaults to None.
output_type (type[Output] | None) – The output type to bind to the ``Runnable``. Defaults to None.
- Returns:
A new ``Runnable`` with the types bound.
- Return type:
Runnable[Input, Output]
Examples using Runnable