Base classes and utilities for Runnable objects.
Async call function that may optionally accept a run_manager and/or config.
Call function that may optionally accept a run_manager and/or config.
Ensure that a config is a dict with all keys present.
Get an async callback manager for a config.
Get a callback manager for a config.
Get a list of configs from a single config or a list of configs.
This is useful for subclasses overriding batch() or abatch().
Get an executor for a config.
Merge multiple configs into one.
Patch a config with new values.
Run a function in an executor.
Set the child Runnable config + tracing context.
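A minimal sketch of how these config helpers compose (assuming the public functions in langchain_core.runnables.config; the tag values are illustrative):
    from langchain_core.runnables.config import ensure_config, merge_configs, patch_config

    config = ensure_config({"tags": ["parent"]})         # fill in any missing keys with defaults
    child = patch_config(config, run_name="child-step")  # copy with selected overrides
    merged = merge_configs(config, {"tags": ["extra"]})  # list-valued keys like tags are merged
    print(merged["tags"])  # contains both 'parent' and 'extra'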
Check if a callable accepts a config argument.
Check if a callable accepts a run_manager argument.
Await a coroutine with a context.
Run a coroutine with a semaphore.
Gather coroutines with a limit on the number of concurrent coroutines.
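These coroutine helpers are internal; as an illustration, concurrency-limited gathering can be sketched with a plain asyncio.Semaphore (gather_with_limit is a hypothetical name, not the library's):
    import asyncio

    async def gather_with_limit(limit: int, *coros):
        # At most `limit` coroutines run concurrently; the rest wait on the semaphore.
        sem = asyncio.Semaphore(limit)

        async def run(coro):
            async with sem:
                return await coro

        return await asyncio.gather(*(run(c) for c in coros))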
Get the keys of the first argument of a function if it is a dict.
Get the nonlocal variables accessed by a function.
Get the source code of a lambda function.
Get the unique config specs from a sequence of config specs.
Indent all lines of text after the first line.
Check if a function is async.
Check if a function is an async generator.
Create a Pydantic model with the given field definitions.
Do not use outside of langchain packages. This API is subject to change at any time.
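For illustration, the field-definition format mirrors pydantic's own create_model (an assumption about the wrapper; prefer pydantic directly outside langchain packages):
    from pydantic import create_model

    # Each field is (type, default); ... marks a required field.
    Point = create_model("Point", x=(int, ...), y=(int, 0))
    Point(x=1)  # Point(x=1, y=0)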
Coerce a Runnable-like object into a Runnable.
Decorate a function to make it a Runnable.
Sets the name of the Runnable to the name of the function.
Any runnables called by the function will be traced as dependencies.
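A short sketch of the decorator (exported as chain from langchain_core.runnables):
    from langchain_core.runnables import chain

    @chain
    def add_exclamation(text: str) -> str:
        return text + "!"

    add_exclamation.invoke("hello")    # 'hello!' — the function is now a Runnable
    add_exclamation.batch(["a", "b"])  # ['a!', 'b!']
    print(add_exclamation.name)        # 'add_exclamation'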
Async callback manager that handles callbacks from LangChain.
Callback manager for LangChain.
Serializable base class.
This class is used to serialize objects to JSON.
It relies on the following methods and properties:
is_lc_serializable: Is this class serializable?
By design, even if a class inherits from Serializable, it is not serializable
by default. This is to prevent accidental serialization of objects that should
not be serialized.
get_lc_namespace: Get the namespace of the LangChain object.
During deserialization, this namespace is used to identify the correct class to instantiate.
Please see the Reviver class in langchain_core.load.load for more details.
During deserialization, an additional mapping is used to handle classes that have moved or been renamed across package versions.
lc_secrets: A map of constructor argument names to secret ids.
lc_attributes: List of additional attribute names that should be included
as part of the serialized representation.
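A hedged sketch of opting a class into serialization (the class and field names here are illustrative):
    from langchain_core.load.serializable import Serializable

    class MyComponent(Serializable):
        api_key: str
        temperature: float = 0.7

        @classmethod
        def is_lc_serializable(cls) -> bool:
            return True  # not serializable by default; opt in explicitly

        @classmethod
        def get_lc_namespace(cls) -> list[str]:
            return ["my_package", "components"]  # used to locate the class on load

        @property
        def lc_secrets(self) -> dict[str, str]:
            return {"api_key": "MY_API_KEY"}  # secret ids replace raw values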
Serialized constructor.
Serialized not implemented.
Configuration for a Runnable.
The TypedDict has total=False set intentionally, so every key is optional.
This supports merging via merge_configs and the propagation behavior of
var_child_runnable_config (a ContextVar that automatically passes
config down the call stack without explicit parameter passing), where
configs are merged rather than replaced. For example:
    # Parent sets tags
    chain.invoke(input, config={"tags": ["parent"]})
    # Child automatically inherits and can add:
    # ensure_config({"tags": ["child"]}) -> {"tags": ["parent", "child"]}
Dictionary that can be added to another dictionary.
Field that can be configured by the user.
Field that can be configured by the user. It is a specification of a field.
Tracer that streams run logs to a stream.
Async tracer that calls listeners on run start, end, and error.
Tracer that calls listeners on run start, end, and error.
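A minimal sketch of attaching such listeners via with_listeners, which exists on every Runnable (the callbacks receive Run objects; the printing is illustrative):
    from langchain_core.runnables import RunnableLambda

    runnable = RunnableLambda(lambda x: x * 2).with_listeners(
        on_start=lambda run: print("started:", run.name),
        on_end=lambda run: print("ended:", run.name),
        on_error=lambda run: print("failed:", run.error),
    )
    runnable.invoke(3)  # prints the start/end lines around the call; returns 6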
Async context manager to wrap an AsyncGenerator that has an aclose() method.
Code like this:
async with aclosing(<module>.fetch(<arguments>)) as agen:
<block>
...is equivalent to this:
agen = <module>.fetch(<arguments>)
try:
<block>
finally:
await agen.aclose()
Async callback manager for chain run.
Callback manager for chain run.
Base class for all prompt templates, returning a prompt.
Runnable that can fallback to other Runnable objects if it fails.
External APIs (e.g., APIs for a language model) may at times experience degraded performance or even downtime.
In these cases, it can be useful to have a fallback Runnable that can be
used in place of the original Runnable (e.g., fallback to another LLM provider).
Fallbacks can be defined at the level of a single Runnable, or at the level
of a chain of Runnables. Fallbacks are tried in order until one succeeds or
all fail.
While you can instantiate a RunnableWithFallbacks directly, it is usually
more convenient to use the with_fallbacks method on a Runnable.
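A minimal sketch of with_fallbacks using plain functions in place of LLMs (flaky and backup are illustrative stand-ins for model calls):
    from langchain_core.runnables import RunnableLambda

    def flaky(x: str) -> str:
        raise ValueError("primary provider is down")

    primary = RunnableLambda(flaky)
    backup = RunnableLambda(lambda x: f"backup handled: {x}")

    safe = primary.with_fallbacks([backup])
    safe.invoke("hello")  # 'backup handled: hello' — the fallback ran after primary failed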
Graph of nodes and edges.
Parameters for tenacity.wait_exponential_jitter.
Base class for all LangChain tools.
This abstract class defines the interface that all LangChain tools must implement.
Tools are components that can be called by agents to perform specific actions.
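A hedged sketch of one common way to produce a BaseTool subclass, the @tool decorator from langchain_core.tools:
    from langchain_core.tools import tool

    @tool
    def multiply(a: int, b: int) -> int:
        """Multiply two integers."""
        return a * b

    multiply.invoke({"a": 2, "b": 3})  # 6
    print(multiply.name, "-", multiply.description)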
Run log.
Patch to the run log.
A unit of work that can be invoked, batched, streamed, transformed and composed.
invoke/ainvoke: Transforms a single input into an output.
batch/abatch: Efficiently transforms multiple inputs into outputs.
stream/astream: Streams output from a single input as it's produced.
astream_log: Streams output and selected intermediate results from an input.
Built-in optimizations:
Batch: By default, batch runs invoke() in parallel using a thread pool executor. Override to optimize batching.
Async: Methods with 'a' prefix are asynchronous. By default, they execute
the sync counterpart using asyncio's thread pool.
Override for native async.
All methods accept an optional config argument, which can be used to configure execution, add tags and metadata for tracing and debugging, etc.
Runnables expose schematic information about their input, output and config via
the input_schema property, the output_schema property and config_schema
method.
Runnable objects can be composed together to create chains in a declarative way.
Any chain constructed this way will automatically have sync, async, batch, and streaming support.
The main composition primitives are RunnableSequence and RunnableParallel.
RunnableSequence invokes a series of runnables sequentially, with
one Runnable's output serving as the next's input. Construct using
the | operator or by passing a list of runnables to RunnableSequence.
RunnableParallel invokes runnables concurrently, providing the same input
to each. Construct it using a dict literal within a sequence or by passing a
dict to RunnableParallel.
For example,
from langchain_core.runnables import RunnableLambda

# A RunnableSequence constructed using the `|` operator
sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
sequence.invoke(1)  # 4
sequence.batch([1, 2, 3])  # [4, 6, 8]

# A sequence that contains a RunnableParallel constructed using a dict literal
sequence = RunnableLambda(lambda x: x + 1) | {
    "mul_2": RunnableLambda(lambda x: x * 2),
    "mul_5": RunnableLambda(lambda x: x * 5),
}
sequence.invoke(1)  # {'mul_2': 4, 'mul_5': 10}
All Runnables expose additional methods that can be used to modify their
behavior (e.g., add a retry policy, add lifecycle listeners, make them
configurable, etc.).
These methods will work on any Runnable, including Runnable chains
constructed by composing other Runnables.
See the individual methods for details.
For example,
from langchain_core.runnables import RunnableLambda
import random
def add_one(x: int) -> int:
    return x + 1

def buggy_double(y: int) -> int:
    """Buggy code that will fail 70% of the time"""
    if random.random() > 0.3:
        print('This code failed, and will probably be retried!')  # noqa: T201
        raise ValueError('Triggered buggy code')
    return y * 2

sequence = (
    RunnableLambda(add_one) |
    RunnableLambda(buggy_double).with_retry(  # Retry on failure
        stop_after_attempt=10,
        wait_exponential_jitter=False
    )
)
print(sequence.input_schema.model_json_schema()) # Show inferred input schema
print(sequence.output_schema.model_json_schema()) # Show inferred output schema
print(sequence.invoke(2)) # invoke the sequence (note the retry above!!)
As chains get longer, it can be useful to see intermediate results in order to debug and trace the chain.
You can set the global debug flag to True to enable debug output for all chains:
from langchain_core.globals import set_debug
set_debug(True)
Alternatively, you can pass existing or custom callbacks to any given chain:
from langchain_core.tracers import ConsoleCallbackHandler
chain.invoke(..., config={"callbacks": [ConsoleCallbackHandler()]})
For a UI (and much more) check out LangSmith.
Runnable that can be serialized to JSON.
Sequence of Runnable objects, where the output of one is the input of the next.
RunnableSequence is the most important composition operator in LangChain
as it is used in virtually every chain.
A RunnableSequence can be instantiated directly or more commonly by using the
| operator where either the left or right operands (or both) must be a
Runnable.
Any RunnableSequence automatically supports sync, async, and batch execution.
The default implementations of batch and abatch utilize thread pools and
asyncio.gather, and will be faster than naive invocation of invoke or ainvoke
for IO-bound Runnables.
Batching is implemented by invoking the batch method on each component of the
RunnableSequence in order.
A RunnableSequence preserves the streaming properties of its components, so if
all components of the sequence implement a transform method -- which
is the method that implements the logic to map a streaming input to a streaming
output -- then the sequence will be able to stream input to output!
If any component of the sequence does not implement transform, streaming will only begin after that component has run. If there are multiple such blocking components, streaming begins after the last one.
RunnableLambda does not support transform by default! So if you need to
use a RunnableLambda, be careful about where you place it in a
RunnableSequence (if you need to use the stream/astream methods).
If you need arbitrary logic and need streaming, you can subclass
Runnable, and implement transform for whatever logic you need.
Here is a simple example that uses simple functions to illustrate the use of
RunnableSequence:
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1 | runnable_2
# Or equivalently:
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
Here's an example that streams JSON output generated by an LLM:
from langchain_core.output_parsers.json import SimpleJsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI

prompt = PromptTemplate.from_template(
    "In JSON format, give me a list of {topic} and their "
    "corresponding names in French, Spanish and in a "
    "Cat Language."
)
model = ChatOpenAI()
chain = prompt | model | SimpleJsonOutputParser()

async for chunk in chain.astream({"topic": "colors"}):
    print("-")  # noqa: T201
    print(chunk, sep="", flush=True)  # noqa: T201
Runnable that runs a mapping of Runnables in parallel.
Returns a mapping of their outputs.
RunnableParallel is one of the two main composition primitives,
alongside RunnableSequence. It invokes Runnables concurrently, providing the
same input to each.
A RunnableParallel can be instantiated directly or by using a dict literal
within a sequence.
Here is a simple example that uses functions to illustrate the use of
RunnableParallel:
from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

def mul_three(x: int) -> int:
    return x * 3

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
runnable_3 = RunnableLambda(mul_three)

sequence = runnable_1 | {  # this dict is coerced to a RunnableParallel
    "mul_two": runnable_2,
    "mul_three": runnable_3,
}
# Or equivalently:
# sequence = runnable_1 | RunnableParallel(
#     {"mul_two": runnable_2, "mul_three": runnable_3}
# )
# Also equivalently:
# sequence = runnable_1 | RunnableParallel(
#     mul_two=runnable_2,
#     mul_three=runnable_3,
# )

sequence.invoke(1)
await sequence.ainvoke(1)
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
RunnableParallel makes it easy to run Runnables in parallel. In the below
example, we simultaneously stream output from two different Runnable objects:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
joke_chain = (
    ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
)
poem_chain = (
    ChatPromptTemplate.from_template("write a 2-line poem about {topic}")
    | model
)
runnable = RunnableParallel(joke=joke_chain, poem=poem_chain)

# Display stream
output = {key: "" for key in runnable.output_schema.model_fields}
for chunk in runnable.stream({"topic": "bear"}):
    for key in chunk:
        output[key] = output[key] + chunk[key].content
    print(output)  # noqa: T201
Runnable that runs a generator function.
RunnableGenerators can be instantiated directly or by using a generator within
a sequence.
RunnableGenerators can be used to implement custom behavior, such as custom
output parsers, while preserving streaming capabilities. Given a generator function
with a signature Iterator[A] -> Iterator[B], wrapping it in a
RunnableGenerator allows it to emit output chunks as soon as they are streamed
in from the previous step.
If a generator function has a signature A -> Iterator[B], such that it
requires its input from the previous step to be completed before emitting chunks
(e.g., most LLMs need the entire prompt available to start generating), it can
instead be wrapped in a RunnableLambda.
Here is an example to show the basic mechanics of a RunnableGenerator:
from typing import Any, AsyncIterator, Iterator

from langchain_core.runnables import RunnableGenerator

def gen(input: Iterator[Any]) -> Iterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token

runnable = RunnableGenerator(gen)
runnable.invoke(None)         # "Have a nice day"
list(runnable.stream(None))   # ["Have", " a", " nice", " day"]
runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]

# Async version:
async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token

runnable = RunnableGenerator(agen)
await runnable.ainvoke(None)  # "Have a nice day"
[p async for p in runnable.astream(None)]  # ["Have", " a", " nice", " day"]
RunnableGenerator makes it easy to implement custom behavior within a streaming
context. Below we show an example:
from typing import Iterator

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableGenerator, RunnableLambda
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
chant_chain = (
    ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
    | model
    | StrOutputParser()
)

def character_generator(input: Iterator[str]) -> Iterator[str]:
    for token in input:
        if "," in token or "." in token:
            yield "👏" + token
        else:
            yield token

runnable = chant_chain | character_generator
assert type(runnable.last) is RunnableGenerator
"".join(runnable.stream({"topic": "waste"}))  # Reduce👏, Reuse👏, Recycle👏.

# Note that RunnableLambda can be used to delay streaming of one step in a
# sequence until the previous step is finished:
def reverse_generator(input: str) -> Iterator[str]:
    # Yield characters of input in reverse order.
    for character in input[::-1]:
        yield character

runnable = chant_chain | RunnableLambda(reverse_generator)
"".join(runnable.stream({"topic": "waste"}))  # ".elcycer ,esuer ,ecudeR"
RunnableLambda converts a Python callable into a Runnable.
Wrapping a callable in a RunnableLambda makes the callable usable
within either a sync or async context.
RunnableLambda can be composed as any other Runnable and provides
seamless integration with LangChain tracing.
RunnableLambda is best suited for code that does not need to support
streaming. If you need to support streaming (i.e., be able to operate
on chunks of inputs and yield chunks of outputs), use RunnableGenerator
instead.
Note that if a RunnableLambda returns an instance of Runnable, that
instance is invoked (or streamed) during execution.
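A short sketch of RunnableLambda basics, including the Runnable-returning case noted above:
    from langchain_core.runnables import RunnableLambda

    add_one = RunnableLambda(lambda x: x + 1)
    add_one.invoke(1)  # 2

    # If the wrapped callable returns a Runnable, that Runnable is itself invoked:
    router = RunnableLambda(
        lambda x: add_one if isinstance(x, int) else RunnableLambda(str.upper)
    )
    router.invoke(1)     # 2
    router.invoke("hi")  # 'HI'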
RunnableEachBase class.
Runnable that calls another Runnable for each element of the input sequence.
Use only if creating a new RunnableEach subclass with different __init__
args.
See documentation for RunnableEach for more details.
RunnableEach class.
Runnable that calls another Runnable for each element of the input sequence.
It allows you to call multiple inputs with the bound Runnable.
RunnableEach makes it easy to run multiple inputs for the Runnable.
In the below example, we associate and run three inputs
with a Runnable:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.base import RunnableEach
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Tell me a short joke about {topic}")
model = ChatOpenAI()
output_parser = StrOutputParser()
runnable = prompt | model | output_parser
runnable_each = RunnableEach(bound=runnable)
output = runnable_each.invoke([{'topic': 'Computer Science'},
                               {'topic': 'Art'},
                               {'topic': 'Biology'}])
print(output)  # noqa: T201
Runnable that delegates calls to another Runnable with a set of **kwargs.
Use only if creating a new RunnableBinding subclass with different __init__
args.
See documentation for RunnableBinding for more details.
Wrap a Runnable with additional functionality.
A RunnableBinding can be thought of as a "runnable decorator" that
preserves the essential features of Runnable (i.e., batching, streaming,
and async support) while adding additional functionality.
Any class that inherits from Runnable can be bound to a RunnableBinding.
Runnables expose a standard set of methods for creating RunnableBindings
or sub-classes of RunnableBindings (e.g., RunnableRetry,
RunnableWithFallbacks) that add additional functionality.
These methods include:
bind: Bind kwargs to pass to the underlying Runnable when running it.
with_config: Bind config to pass to the underlying Runnable when running it.
with_listeners: Bind lifecycle listeners to the underlying Runnable.
with_types: Override the input and output types of the underlying Runnable.
with_retry: Bind a retry policy to the underlying Runnable.
with_fallbacks: Bind a fallback policy to the underlying Runnable.
Example:
bind: Bind kwargs to pass to the underlying Runnable when running it.
# Create a Runnable binding that invokes the chat model with the
# additional kwarg `stop=['-']` when running it.
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
model.invoke('Say "Parrot-MAGIC"', stop=["-"]) # Should return `Parrot`
# Using it the easy way via the `bind` method, which returns a new
# RunnableBinding
runnable_binding = model.bind(stop=["-"])
runnable_binding.invoke('Say "Parrot-MAGIC"') # Should return `Parrot`
Can also be done by instantiating a RunnableBinding directly (not
recommended):
from langchain_core.runnables import RunnableBinding
runnable_binding = RunnableBinding(
    bound=model,
    kwargs={"stop": ["-"]},  # <-- Note the additional kwargs
)
runnable_binding.invoke('Say "Parrot-MAGIC"') # Should return `Parrot`
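A companion sketch for with_config, which also returns a RunnableBinding (this reuses the model object from the example above; the tag and run_name values are illustrative):
    configured = model.with_config({"tags": ["production"], "run_name": "parrot-demo"})
    configured.invoke('Say "Parrot-MAGIC"')  # same call, now tagged for tracing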