Language Models¶
langchain_core.language_models
¶
Language models.
LangChain has two main classes to work with language models: chat models and "old-fashioned" LLMs.
Chat models
Language models that use a sequence of messages as inputs and return chat messages as outputs (as opposed to using plain text). Chat models support the assignment of distinct roles to conversation messages, helping to distinguish messages from the AI, users, and instructions such as system messages.
The key abstraction for chat models is BaseChatModel
. Implementations
should inherit from this class.
See existing chat model integrations.
LLMs
Language models that takes a string as input and returns a string. These are traditionally older models (newer models generally are chat models).
Although the underlying models are string in, string out, the LangChain wrappers also allow these models to take messages as input. This gives them the same interface as chat models. When messages are passed in as input, they will be formatted into a string under the hood before being passed to the underlying model.
langchain_core.language_models.chat_models
¶
Chat models for conversational AI.
BaseChatModel
¶
Bases: BaseLanguageModel[AIMessage]
, ABC
Base class for chat models.
Key imperative methods
Methods that actually call the underlying model.
This table provides a brief overview of the main imperative methods. Please see the base Runnable
reference for full documentation.
Method | Input | Output | Description |
---|---|---|---|
invoke |
str | list[dict | tuple | BaseMessage] | PromptValue |
BaseMessage |
A single chat model call. |
ainvoke |
''' |
BaseMessage |
Defaults to running invoke in an async executor. |
stream |
''' |
Iterator[BaseMessageChunk] |
Defaults to yielding output of invoke . |
astream |
''' |
AsyncIterator[BaseMessageChunk] |
Defaults to yielding output of ainvoke . |
astream_events |
''' |
AsyncIterator[StreamEvent] |
Event types: on_chat_model_start , on_chat_model_stream , on_chat_model_end . |
batch |
list['''] |
list[BaseMessage] |
Defaults to running invoke in concurrent threads. |
abatch |
list['''] |
list[BaseMessage] |
Defaults to running ainvoke in concurrent threads. |
batch_as_completed |
list['''] |
Iterator[tuple[int, Union[BaseMessage, Exception]]] |
Defaults to running invoke in concurrent threads. |
abatch_as_completed |
list['''] |
AsyncIterator[tuple[int, Union[BaseMessage, Exception]]] |
Defaults to running ainvoke in concurrent threads. |
Key declarative methods
Methods for creating another Runnable
using the chat model.
This table provides a brief overview of the main declarative methods. Please see the reference for each method for full documentation.
Method | Description |
---|---|
bind_tools |
Create chat model that can call tools. |
with_structured_output |
Create wrapper that structures model output using schema. |
with_retry |
Create wrapper that retries model calls on failure. |
with_fallbacks |
Create wrapper that falls back to other models on failure. |
configurable_fields |
Specify init args of the model that can be configured at runtime via the RunnableConfig . |
configurable_alternatives |
Specify alternative models which can be swapped in at runtime via the RunnableConfig . |
Creating custom chat model
Custom chat model implementations should inherit from this class. Please reference the table below for information about which methods and properties are required or optional for implementations.
Method/Property | Description | Required |
---|---|---|
_generate |
Use to generate a chat result from a prompt | Required |
_llm_type (property) |
Used to uniquely identify the type of the model. Used for logging. | Required |
_identifying_params (property) |
Represent model parameterization for tracing purposes. | Optional |
_stream |
Use to implement streaming | Optional |
_agenerate |
Use to implement a native async method | Optional |
_astream |
Use to implement async version of _stream |
Optional |
METHOD | DESCRIPTION |
---|---|
invoke |
Transform a single input into an output. |
ainvoke |
Transform a single input into an output. |
stream |
Default implementation of |
astream |
Default implementation of |
generate |
Pass a sequence of prompts to the model and return model generations. |
agenerate |
Asynchronously pass a sequence of prompts to a model and return generations. |
generate_prompt |
Pass a sequence of prompts to the model and return model generations. |
agenerate_prompt |
Asynchronously pass a sequence of prompts and return model generations. |
dict |
Return a dictionary of the LLM. |
bind_tools |
Bind tools to the model. |
with_structured_output |
Model wrapper that returns outputs formatted to match the given schema. |
get_name |
Get the name of the |
get_input_schema |
Get a Pydantic model that can be used to validate input to the |
get_input_jsonschema |
Get a JSON schema that represents the input to the |
get_output_schema |
Get a Pydantic model that can be used to validate output to the |
get_output_jsonschema |
Get a JSON schema that represents the output of the |
config_schema |
The type of config this |
get_config_jsonschema |
Get a JSON schema that represents the config of the |
get_graph |
Return a graph representation of this |
get_prompts |
Return a list of prompts used by this |
__or__ |
Runnable "or" operator. |
__ror__ |
Runnable "reverse-or" operator. |
pipe |
Pipe |
pick |
Pick keys from the output |
assign |
Assigns new fields to the |
batch |
Default implementation runs invoke in parallel using a thread pool executor. |
batch_as_completed |
Run |
abatch |
Default implementation runs |
abatch_as_completed |
Run |
astream_log |
Stream all output from a |
astream_events |
Generate a stream of events. |
transform |
Transform inputs to outputs. |
atransform |
Transform inputs to outputs. |
bind |
Bind arguments to a |
with_config |
Bind config to a |
with_listeners |
Bind lifecycle listeners to a |
with_alisteners |
Bind async lifecycle listeners to a |
with_types |
Bind input and output types to a |
with_retry |
Create a new |
map |
Return a new |
with_fallbacks |
Add fallbacks to a |
as_tool |
Create a |
__init__ |
|
is_lc_serializable |
Is this class serializable? |
get_lc_namespace |
Get the namespace of the LangChain object. |
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the |
to_json_not_implemented |
Serialize a "not implemented" object. |
configurable_fields |
Configure particular |
configurable_alternatives |
Configure alternatives for |
set_verbose |
If verbose is |
get_token_ids |
Return the ordered ids of the tokens in a text. |
get_num_tokens |
Get the number of tokens present in the text. |
get_num_tokens_from_messages |
Get the number of tokens in the messages. |
rate_limiter
class-attribute
instance-attribute
¶
rate_limiter: BaseRateLimiter | None = Field(default=None, exclude=True)
An optional rate limiter to use for limiting the number of requests.
disable_streaming
class-attribute
instance-attribute
¶
Whether to disable streaming for this model.
If streaming is bypassed, then stream
/astream
/astream_events
will
defer to invoke
/ainvoke
.
- If
True
, will always bypass streaming case. - If
'tool_calling'
, will bypass streaming case only when the model is called with atools
keyword argument. In other words, LangChain will automatically switch to non-streaming behavior (invoke
) only when the tools argument is provided. This offers the best of both worlds. - If
False
(Default), will always use streaming case if available.
The main reason for this flag is that code might be written using stream
and
a user may want to swap out a given model for another model whose the implementation
does not properly support streaming.
output_version
class-attribute
instance-attribute
¶
Version of AIMessage
output format to store in message content.
AIMessage.content_blocks
will lazily parse the contents of content
into a
standard format. This flag can be used to additionally store the standard format
in message content, e.g., for serialization purposes.
Supported values:
'v0'
: provider-specific format in content (can lazily-parse withcontent_blocks
)'v1'
: standardized format in content (consistent withcontent_blocks
)
Partner packages (e.g.,
langchain-openai
) can also use this
field to roll out new content formats in a backward-compatible way.
Added in version 1.0
name
class-attribute
instance-attribute
¶
name: str | None = None
The name of the Runnable
. Used for debugging and tracing.
input_schema
property
¶
The type of input this Runnable
accepts specified as a Pydantic model.
output_schema
property
¶
Output schema.
The type of output this Runnable
produces specified as a Pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable
.
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
cache
class-attribute
instance-attribute
¶
Whether to cache the response.
- If
True
, will use the global cache. - If
False
, will not use a cache - If
None
, will use the global cache if it's set, otherwise no cache. - If instance of
BaseCache
, will use the provided cache.
Caching is not currently supported for streaming methods of models.
verbose
class-attribute
instance-attribute
¶
Whether to print out response text.
callbacks
class-attribute
instance-attribute
¶
callbacks: Callbacks = Field(default=None, exclude=True)
Callbacks to add to the run trace.
tags
class-attribute
instance-attribute
¶
Tags to add to the run trace.
metadata
class-attribute
instance-attribute
¶
Metadata to add to the run trace.
custom_get_token_ids
class-attribute
instance-attribute
¶
Optional encoder to use for counting tokens.
invoke
¶
invoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> AIMessage
Transform a single input into an output.
PARAMETER | DESCRIPTION |
---|---|
|
The input to the
TYPE:
|
|
A config to use when invoking the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Output
|
The output of the |
ainvoke
async
¶
ainvoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> AIMessage
Transform a single input into an output.
PARAMETER | DESCRIPTION |
---|---|
|
The input to the
TYPE:
|
|
A config to use when invoking the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Output
|
The output of the |
stream
¶
stream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> Iterator[AIMessageChunk]
Default implementation of stream
, which calls invoke
.
Subclasses must override this method if they support streaming output.
PARAMETER | DESCRIPTION |
---|---|
|
The input to the
TYPE:
|
|
The config to use for the
TYPE:
|
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
Output
|
The output of the |
astream
async
¶
astream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> AsyncIterator[AIMessageChunk]
Default implementation of astream
, which calls ainvoke
.
Subclasses must override this method if they support streaming output.
PARAMETER | DESCRIPTION |
---|---|
|
The input to the
TYPE:
|
|
The config to use for the
TYPE:
|
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Output]
|
The output of the |
generate
¶
generate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any
) -> LLMResult
Pass a sequence of prompts to the model and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
|
List of list of messages.
TYPE:
|
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
|
TYPE:
|
|
The tags to apply. |
|
The metadata to apply. |
|
The name of the run.
TYPE:
|
|
The ID of the run.
TYPE:
|
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
agenerate
async
¶
agenerate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any
) -> LLMResult
Asynchronously pass a sequence of prompts to a model and return generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
|
List of list of messages.
TYPE:
|
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
|
TYPE:
|
|
The tags to apply. |
|
The metadata to apply. |
|
The name of the run.
TYPE:
|
|
The ID of the run.
TYPE:
|
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
generate_prompt
¶
generate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any
) -> LLMResult
Pass a sequence of prompts to the model and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
|
List of
TYPE:
|
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
|
TYPE:
|
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
agenerate_prompt
async
¶
agenerate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any
) -> LLMResult
Asynchronously pass a sequence of prompts and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
|
List of
TYPE:
|
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
|
TYPE:
|
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
bind_tools
¶
bind_tools(
tools: Sequence[Dict[str, Any] | type | Callable | BaseTool],
*,
tool_choice: str | None = None,
**kwargs: Any
) -> Runnable[LanguageModelInput, AIMessage]
with_structured_output
¶
with_structured_output(
schema: Dict | type, *, include_raw: bool = False, **kwargs: Any
) -> Runnable[LanguageModelInput, Dict | BaseModel]
Model wrapper that returns outputs formatted to match the given schema.
PARAMETER | DESCRIPTION |
---|---|
|
The output schema. Can be passed in as:
If See |
|
If The final output is always a
TYPE:
|
RAISES | DESCRIPTION |
---|---|
ValueError
|
If there are any unsupported |
NotImplementedError
|
If the model does not implement
|
RETURNS | DESCRIPTION |
---|---|
Runnable[LanguageModelInput, Dict | BaseModel]
|
A If
|
Example: Pydantic schema (include_raw=False
):
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(AnswerWithJustification)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> AnswerWithJustification(
# answer='They weigh the same',
# justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'
# )
Example: Pydantic schema (include_raw=True
):
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(
AnswerWithJustification, include_raw=True
)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'raw': AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Ao02pnFYXD6GN1yzc0uXPsvF', 'function': {'arguments': '{"answer":"They weigh the same.","justification":"Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ."}', 'name': 'AnswerWithJustification'}, 'type': 'function'}]}),
# 'parsed': AnswerWithJustification(answer='They weigh the same.', justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'),
# 'parsing_error': None
# }
Example: dict
schema (include_raw=False
):
from pydantic import BaseModel
from langchain_core.utils.function_calling import convert_to_openai_tool
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
dict_schema = convert_to_openai_tool(AnswerWithJustification)
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(dict_schema)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'answer': 'They weigh the same',
# 'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume and density of the two substances differ.'
# }
Behavior changed in 0.2.26
Added support for TypedDict class.
get_name
¶
get_input_schema
¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable
.
Runnable
objects that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic input schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an input schema for a specific configuration.
PARAMETER | DESCRIPTION |
---|---|
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate input. |
get_input_jsonschema
¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable
.
PARAMETER | DESCRIPTION |
---|---|
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any]
|
A JSON schema that represents the input to the |
Example
Added in version 0.3.0
get_output_schema
¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate output to the Runnable
.
Runnable
objects that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic output schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an output schema for a specific configuration.
PARAMETER | DESCRIPTION |
---|---|
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate output. |
get_output_jsonschema
¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable
.
PARAMETER | DESCRIPTION |
---|---|
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any]
|
A JSON schema that represents the output of the |
Example
Added in version 0.3.0
config_schema
¶
The type of config this Runnable
accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives
methods.
PARAMETER | DESCRIPTION |
---|---|
|
A list of fields to include in the config schema. |
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate config. |
get_config_jsonschema
¶
get_graph
¶
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable
.
get_prompts
¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable
.
__or__
¶
__or__(
other: (
Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any]
),
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
PARAMETER | DESCRIPTION |
---|---|
|
Another
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: (
Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any]
),
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
PARAMETER | DESCRIPTION |
---|---|
|
Another
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable
objects.
Compose this Runnable
with Runnable
-like objects to make a
RunnableSequence
.
Equivalent to RunnableSequence(self, *others)
or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
PARAMETER | DESCRIPTION |
---|---|
|
Other
TYPE:
|
|
An optional name for the resulting
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict
of this Runnable
.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
PARAMETER | DESCRIPTION |
---|---|
|
A key or list of keys to pick from the output dict. |
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: (
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]
),
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict
output of this Runnable
.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
PARAMETER | DESCRIPTION |
---|---|
|
A mapping of keys to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Any, Any]
|
A new |
batch
¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
PARAMETER | DESCRIPTION |
---|---|
|
A list of inputs to the
TYPE:
|
|
A config to use when invoking the
TYPE:
|
|
Whether to return exceptions instead of raising them.
TYPE:
|
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Output]
|
A list of outputs from the |
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> Iterator[tuple[int, Output | Exception]]
Run invoke
in parallel on a list of inputs.
Yields results as they complete.
PARAMETER | DESCRIPTION |
---|---|
|
A list of inputs to the
TYPE:
|
|
A config to use when invoking the
TYPE:
|
|
Whether to return exceptions instead of raising them.
TYPE:
|
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
abatch
async
¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs ainvoke
in parallel using asyncio.gather
.
The default implementation of batch
works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
PARAMETER | DESCRIPTION |
---|---|
|
A list of inputs to the
TYPE:
|
|
A config to use when invoking the
TYPE:
|
|
Whether to return exceptions instead of raising them.
TYPE:
|
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Output]
|
A list of outputs from the |
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke
in parallel on a list of inputs.
Yields results as they complete.
PARAMETER | DESCRIPTION |
---|---|
|
A list of inputs to the
TYPE:
|
|
A config to use when invoking the
TYPE:
|
|
Whether to return exceptions instead of raising them.
TYPE:
|
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable
, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
PARAMETER | DESCRIPTION |
---|---|
|
The input to the
TYPE:
|
|
The config to use for the
TYPE:
|
|
Whether to yield diffs between each step or the current state.
TYPE:
|
|
Whether to yield the
TYPE:
|
|
Only include logs with these names. |
|
Only include logs with these types. |
|
Only include logs with these tags. |
|
Exclude logs with these names. |
|
Exclude logs with these types. |
|
Exclude logs with these tags. |
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvent
that provide real-time information
about the progress of the Runnable
, including StreamEvent
from intermediate
results.
A StreamEvent
is a dictionary with the following schema:
event
: Event names are of the format:on_[runnable_type]_(start|stream|end)
.name
: The name of theRunnable
that generated the event.run_id
: Randomly generated ID associated with the given execution of theRunnable
that emitted the event. A childRunnable
that gets invoked as part of the execution of a parentRunnable
is assigned its own unique ID.parent_ids
: The IDs of the parent runnables that generated the event. The rootRunnable
will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.tags
: The tags of theRunnable
that generated the event.metadata
: The metadata of theRunnable
that generated the event.data
: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
event | name | chunk | input | output |
---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'Hello' |
||
on_llm_end |
'[model name]' |
'Hello human!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will be only be surfaced with in the v2 version of the API!
A custom event has following format:
Attribute | Type | Description |
---|---|---|
name |
str |
A user defined name for the event. |
data |
Any |
The data associated with the event. This can be anything, though we suggest making it JSON serializable. |
Here are declarations associated with the standard events shown above:
format_docs
:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool
:
prompt
:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
PARAMETER | DESCRIPTION |
---|---|
|
The input to the
TYPE:
|
|
The config to use for the
TYPE:
|
|
The version of the schema to use either
TYPE:
|
|
Only include events from |
|
Only include events from |
|
Only include events from |
|
Exclude events from |
|
Exclude events from |
|
Exclude events from |
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
RAISES | DESCRIPTION |
---|---|
NotImplementedError
|
If the version is not |
transform
¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls astream
.
Subclasses must override this method if they can start producing output while input is still being generated.
PARAMETER | DESCRIPTION |
---|---|
|
An iterator of inputs to the
TYPE:
|
|
The config to use for the
TYPE:
|
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
Output
|
The output of the |
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream
.
Subclasses must override this method if they can start producing output while input is still being generated.
PARAMETER | DESCRIPTION |
---|---|
|
An async iterator of inputs to the
TYPE:
|
|
The config to use for the
TYPE:
|
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Output]
|
The output of the |
bind
¶
Bind arguments to a Runnable
, returning a new Runnable
.
Useful when a Runnable
in a chain requires an argument that is not
in the output of the previous Runnable
or included in the user input.
PARAMETER | DESCRIPTION |
---|---|
|
The arguments to bind to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable
, returning a new Runnable
.
PARAMETER | DESCRIPTION |
---|---|
|
The config to bind to the
TYPE:
|
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
with_listeners
¶
with_listeners(
*,
on_start: (
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
) = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: (
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
) = None
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable
, returning a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
PARAMETER | DESCRIPTION |
---|---|
|
Called before the
TYPE:
|
|
Called after the
TYPE:
|
|
Called if the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable
.
Returns a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
PARAMETER | DESCRIPTION |
---|---|
|
Called asynchronously before the
TYPE:
|
|
Called asynchronously after the
TYPE:
|
|
Called asynchronously if the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable
, returning a new Runnable
.
PARAMETER | DESCRIPTION |
---|---|
|
The input type to bind to the
TYPE:
|
|
The output type to bind to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3
) -> Runnable[Input, Output]
Create a new Runnable
that retries the original Runnable
on exceptions.
PARAMETER | DESCRIPTION |
---|---|
|
A tuple of exception types to retry on.
TYPE:
|
|
Whether to add jitter to the wait time between retries.
TYPE:
|
|
The maximum number of attempts to make before giving up.
TYPE:
|
|
Parameters for
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable
, returning a new Runnable
.
The new Runnable
will try the original Runnable
, and then each fallback
in order, upon failures.
PARAMETER | DESCRIPTION |
---|---|
|
A sequence of runnables to try if the original |
|
A tuple of exception types to handle.
TYPE:
|
|
If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
PARAMETER | DESCRIPTION |
---|---|
|
A sequence of runnables to try if the original |
|
A tuple of exception types to handle.
TYPE:
|
|
If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None
) -> BaseTool
Create a BaseTool
from a Runnable
.
as_tool
will instantiate a BaseTool
with a name, description, and
args_schema
from a Runnable
. Where possible, schemas are inferred
from runnable.get_input_schema
. Alternatively (e.g., if the
Runnable
takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema
. You can also
pass arg_types
to just specify the required arguments and their types.
PARAMETER | DESCRIPTION |
---|---|
|
The schema for the tool. |
|
The name of the tool.
TYPE:
|
|
The description of the tool.
TYPE:
|
|
A dictionary of argument names to types. |
RETURNS | DESCRIPTION |
---|---|
BaseTool
|
A |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via args_schema
:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via arg_types
:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
from langchain_core.runnables import RunnableLambda
def f(x: str) -> str:
return x + "a"
def g(x: str) -> str:
return x + "z"
runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")
Added in version 0.2.14
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable
, it is not serializable
by default. This is to prevent accidental serialization of objects that should
not be serialized.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the class is serializable. Default is |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI
, the id is
["langchain", "llms", "openai", "OpenAI"]
.
to_json
¶
Serialize the Runnable
to JSON.
RETURNS | DESCRIPTION |
---|---|
SerializedConstructor | SerializedNotImplemented
|
A JSON-serializable representation of the |
to_json_not_implemented
¶
Serialize a "not implemented" object.
RETURNS | DESCRIPTION |
---|---|
SerializedNotImplemented
|
|
configurable_fields
¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable
fields at runtime.
PARAMETER | DESCRIPTION |
---|---|
|
A dictionary of
TYPE:
|
RAISES | DESCRIPTION |
---|---|
ValueError
|
If a configuration key is not found in the |
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable
objects that can be set at runtime.
PARAMETER | DESCRIPTION |
---|---|
|
The
TYPE:
|
|
The default key to use if no alternative is selected.
TYPE:
|
|
Whether to prefix the keys with the
TYPE:
|
|
A dictionary of keys to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-3-7-sonnet-20250219"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
set_verbose
¶
get_token_ids
¶
get_num_tokens
¶
get_num_tokens_from_messages
¶
Get the number of tokens in the messages.
Useful for checking if an input fits in a model's context window.
Note
The base implementation of get_num_tokens_from_messages
ignores tool
schemas.
PARAMETER | DESCRIPTION |
---|---|
|
The message inputs to tokenize.
TYPE:
|
|
If provided, sequence of dict,
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The sum of the number of tokens across the messages. |
langchain_core.messages
¶
Messages are objects used in prompts and chat conversations.
BaseMessage
¶
Bases: Serializable
Base abstract message class.
Messages are the inputs and outputs of a chat model.
METHOD | DESCRIPTION |
---|---|
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the object to JSON. |
to_json_not_implemented |
Serialize a "not implemented" object. |
__init__ |
Initialize a |
is_lc_serializable |
|
get_lc_namespace |
Get the namespace of the LangChain object. |
__add__ |
Concatenate this message with another message. |
pretty_repr |
Get a pretty representation of the message. |
pretty_print |
Print a pretty representation of the message. |
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
additional_kwargs
class-attribute
instance-attribute
¶
Reserved for additional payload data associated with the message.
For example, for a message from an AI, this could include tool calls as encoded by the model provider.
response_metadata
class-attribute
instance-attribute
¶
Examples: response headers, logprobs, token counts, model name.
type
instance-attribute
¶
type: str
The type of the message. Must be a string that is unique to the message type.
The purpose of this field is to allow for easy identification of the message type when deserializing messages.
name
class-attribute
instance-attribute
¶
name: str | None = None
An optional name for the message.
This can be used to provide a human-readable name for the message.
Usage of this field is optional, and whether it's used or not is up to the model implementation.
id
class-attribute
instance-attribute
¶
An optional unique identifier for the message.
This should ideally be provided by the provider/model which created the message.
content_blocks
property
¶
content_blocks: list[ContentBlock]
Load content blocks from the message content.
Added in version 1.0.0
text
property
¶
Get the text content of the message as a string.
Can be used as both property (message.text
) and method (message.text()
).
Deprecated
As of langchain-core
1.0.0, calling .text()
as a method is deprecated.
Use .text
as a property instead. This method will be removed in 2.0.0.
RETURNS | DESCRIPTION |
---|---|
TextAccessor
|
The text content of the message. |
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI
, the id is
["langchain", "llms", "openai", "OpenAI"]
.
to_json
¶
Serialize the object to JSON.
RAISES | DESCRIPTION |
---|---|
ValueError
|
If the class has deprecated attributes. |
RETURNS | DESCRIPTION |
---|---|
SerializedConstructor | SerializedNotImplemented
|
A json serializable object or a |
to_json_not_implemented
¶
Serialize a "not implemented" object.
RETURNS | DESCRIPTION |
---|---|
SerializedNotImplemented
|
|
__init__
¶
__init__(
content: str | list[str | dict] | None = None,
content_blocks: list[ContentBlock] | None = None,
**kwargs: Any
) -> None
Initialize a BaseMessage
.
Specify content
as positional arg or content_blocks
for typing.
PARAMETER | DESCRIPTION |
---|---|
|
The contents of the message. |
|
Typed standard content.
TYPE:
|
|
Additional arguments to pass to the parent class.
TYPE:
|
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
BaseMessage
is serializable.
RETURNS | DESCRIPTION |
---|---|
bool
|
True |
get_lc_namespace
classmethod
¶
__add__
¶
__add__(other: Any) -> ChatPromptTemplate
Concatenate this message with another message.
PARAMETER | DESCRIPTION |
---|---|
|
Another message to concatenate with this one.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
ChatPromptTemplate
|
A ChatPromptTemplate containing both messages. |
pretty_repr
¶
BaseMessageChunk
¶
Bases: BaseMessage
Message chunk, which can be concatenated with other Message chunks.
METHOD | DESCRIPTION |
---|---|
__init__ |
Initialize a |
is_lc_serializable |
|
get_lc_namespace |
Get the namespace of the LangChain object. |
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the object to JSON. |
to_json_not_implemented |
Serialize a "not implemented" object. |
pretty_repr |
Get a pretty representation of the message. |
pretty_print |
Print a pretty representation of the message. |
__add__ |
Message chunks support concatenation with other message chunks. |
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
additional_kwargs
class-attribute
instance-attribute
¶
Reserved for additional payload data associated with the message.
For example, for a message from an AI, this could include tool calls as encoded by the model provider.
response_metadata
class-attribute
instance-attribute
¶
Examples: response headers, logprobs, token counts, model name.
type
instance-attribute
¶
type: str
The type of the message. Must be a string that is unique to the message type.
The purpose of this field is to allow for easy identification of the message type when deserializing messages.
name
class-attribute
instance-attribute
¶
name: str | None = None
An optional name for the message.
This can be used to provide a human-readable name for the message.
Usage of this field is optional, and whether it's used or not is up to the model implementation.
id
class-attribute
instance-attribute
¶
An optional unique identifier for the message.
This should ideally be provided by the provider/model which created the message.
content_blocks
property
¶
content_blocks: list[ContentBlock]
Load content blocks from the message content.
Added in version 1.0.0
text
property
¶
Get the text content of the message as a string.
Can be used as both property (message.text
) and method (message.text()
).
Deprecated
As of langchain-core
1.0.0, calling .text()
as a method is deprecated.
Use .text
as a property instead. This method will be removed in 2.0.0.
RETURNS | DESCRIPTION |
---|---|
TextAccessor
|
The text content of the message. |
__init__
¶
__init__(
content: str | list[str | dict] | None = None,
content_blocks: list[ContentBlock] | None = None,
**kwargs: Any
) -> None
Initialize a BaseMessage
.
Specify content
as positional arg or content_blocks
for typing.
PARAMETER | DESCRIPTION |
---|---|
|
The contents of the message. |
|
Typed standard content.
TYPE:
|
|
Additional arguments to pass to the parent class.
TYPE:
|
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
BaseMessage
is serializable.
RETURNS | DESCRIPTION |
---|---|
bool
|
True |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI
, the id is
["langchain", "llms", "openai", "OpenAI"]
.
to_json
¶
Serialize the object to JSON.
RAISES | DESCRIPTION |
---|---|
ValueError
|
If the class has deprecated attributes. |
RETURNS | DESCRIPTION |
---|---|
SerializedConstructor | SerializedNotImplemented
|
A json serializable object or a |
to_json_not_implemented
¶
Serialize a "not implemented" object.
RETURNS | DESCRIPTION |
---|---|
SerializedNotImplemented
|
|
pretty_repr
¶
__add__
¶
__add__(other: Any) -> BaseMessageChunk
Message chunks support concatenation with other message chunks.
This functionality is useful to combine message chunks yielded from a streaming model into a complete message.
PARAMETER | DESCRIPTION |
---|---|
|
Another message chunk to concatenate with this one.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
BaseMessageChunk
|
A new message chunk that is the concatenation of this message chunk |
BaseMessageChunk
|
and the other message chunk. |
RAISES | DESCRIPTION |
---|---|
TypeError
|
If the other object is not a message chunk. |
For example,
AIMessageChunk(content="Hello") + AIMessageChunk(content=" World")
will give AIMessageChunk(content="Hello World")
langchain_core.language_models.fake_chat_models
¶
Fake chat model for testing purposes.
GenericFakeChatModel
¶
Bases: BaseChatModel
Generic fake chat model that can be used to test the chat model interface.
- Chat model should be usable in both sync and async tests
- Invokes
on_llm_new_token
to allow for testing of callback related code for new tokens. - Includes logic to break messages into message chunk to facilitate testing of streaming.
METHOD | DESCRIPTION |
---|---|
get_name |
Get the name of the |
get_input_schema |
Get a Pydantic model that can be used to validate input to the |
get_input_jsonschema |
Get a JSON schema that represents the input to the |
get_output_schema |
Get a Pydantic model that can be used to validate output to the |
get_output_jsonschema |
Get a JSON schema that represents the output of the |
config_schema |
The type of config this |
get_config_jsonschema |
Get a JSON schema that represents the config of the |
get_graph |
Return a graph representation of this |
get_prompts |
Return a list of prompts used by this |
__or__ |
Runnable "or" operator. |
__ror__ |
Runnable "reverse-or" operator. |
pipe |
Pipe |
pick |
Pick keys from the output |
assign |
Assigns new fields to the |
invoke |
Transform a single input into an output. |
ainvoke |
Transform a single input into an output. |
batch |
Default implementation runs invoke in parallel using a thread pool executor. |
batch_as_completed |
Run |
abatch |
Default implementation runs |
abatch_as_completed |
Run |
stream |
Default implementation of |
astream |
Default implementation of |
astream_log |
Stream all output from a |
astream_events |
Generate a stream of events. |
transform |
Transform inputs to outputs. |
atransform |
Transform inputs to outputs. |
bind |
Bind arguments to a |
with_config |
Bind config to a |
with_listeners |
Bind lifecycle listeners to a |
with_alisteners |
Bind async lifecycle listeners to a |
with_types |
Bind input and output types to a |
with_retry |
Create a new |
map |
Return a new |
with_fallbacks |
Add fallbacks to a |
as_tool |
Create a |
__init__ |
|
is_lc_serializable |
Is this class serializable? |
get_lc_namespace |
Get the namespace of the LangChain object. |
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the |
to_json_not_implemented |
Serialize a "not implemented" object. |
configurable_fields |
Configure particular |
configurable_alternatives |
Configure alternatives for |
set_verbose |
If verbose is |
generate_prompt |
Pass a sequence of prompts to the model and return model generations. |
agenerate_prompt |
Asynchronously pass a sequence of prompts and return model generations. |
with_structured_output |
Model wrapper that returns outputs formatted to match the given schema. |
get_token_ids |
Return the ordered ids of the tokens in a text. |
get_num_tokens |
Get the number of tokens present in the text. |
get_num_tokens_from_messages |
Get the number of tokens in the messages. |
generate |
Pass a sequence of prompts to the model and return model generations. |
agenerate |
Asynchronously pass a sequence of prompts to a model and return generations. |
dict |
Return a dictionary of the LLM. |
bind_tools |
Bind tools to the model. |
messages
instance-attribute
¶
Get an iterator over messages.
This can be expanded to accept other types like Callables / dicts / strings to make the interface more generic if needed.
Note
if you want to pass a list, you can use iter
to convert it to an iterator.
Warning
Streaming is not implemented yet. We should try to implement it in the future by delegating to invoke and then breaking the resulting output into message chunks.
name
class-attribute
instance-attribute
¶
name: str | None = None
The name of the Runnable
. Used for debugging and tracing.
input_schema
property
¶
The type of input this Runnable
accepts specified as a Pydantic model.
output_schema
property
¶
Output schema.
The type of output this Runnable
produces specified as a Pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable
.
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
cache
class-attribute
instance-attribute
¶
Whether to cache the response.
- If
True
, will use the global cache. - If
False
, will not use a cache - If
None
, will use the global cache if it's set, otherwise no cache. - If instance of
BaseCache
, will use the provided cache.
Caching is not currently supported for streaming methods of models.
verbose
class-attribute
instance-attribute
¶
Whether to print out response text.
callbacks
class-attribute
instance-attribute
¶
callbacks: Callbacks = Field(default=None, exclude=True)
Callbacks to add to the run trace.
tags
class-attribute
instance-attribute
¶
Tags to add to the run trace.
metadata
class-attribute
instance-attribute
¶
Metadata to add to the run trace.
custom_get_token_ids
class-attribute
instance-attribute
¶
Optional encoder to use for counting tokens.
rate_limiter
class-attribute
instance-attribute
¶
rate_limiter: BaseRateLimiter | None = Field(default=None, exclude=True)
An optional rate limiter to use for limiting the number of requests.
disable_streaming
class-attribute
instance-attribute
¶
Whether to disable streaming for this model.
If streaming is bypassed, then stream
/astream
/astream_events
will
defer to invoke
/ainvoke
.
- If
True
, will always bypass streaming case. - If
'tool_calling'
, will bypass streaming case only when the model is called with atools
keyword argument. In other words, LangChain will automatically switch to non-streaming behavior (invoke
) only when the tools argument is provided. This offers the best of both worlds. - If
False
(Default), will always use streaming case if available.
The main reason for this flag is that code might be written using stream
and
a user may want to swap out a given model for another model whose the implementation
does not properly support streaming.
output_version
class-attribute
instance-attribute
¶
Version of AIMessage
output format to store in message content.
AIMessage.content_blocks
will lazily parse the contents of content
into a
standard format. This flag can be used to additionally store the standard format
in message content, e.g., for serialization purposes.
Supported values:
'v0'
: provider-specific format in content (can lazily-parse withcontent_blocks
)'v1'
: standardized format in content (consistent withcontent_blocks
)
Partner packages (e.g.,
langchain-openai
) can also use this
field to roll out new content formats in a backward-compatible way.
Added in version 1.0
get_name
¶
get_input_schema
¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable
.
Runnable
objects that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic input schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an input schema for a specific configuration.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate input. |
get_input_jsonschema
¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any]
|
A JSON schema that represents the input to the |
Example
Added in version 0.3.0
get_output_schema
¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate output to the Runnable
.
Runnable
objects that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic output schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an output schema for a specific configuration.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate output. |
get_output_jsonschema
¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any]
|
A JSON schema that represents the output of the |
Example
Added in version 0.3.0
config_schema
¶
The type of config this Runnable
accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives
methods.
PARAMETER | DESCRIPTION |
---|---|
include
|
A list of fields to include in the config schema. |
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate config. |
get_config_jsonschema
¶
get_graph
¶
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable
.
get_prompts
¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable
.
__or__
¶
__or__(
other: (
Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any]
),
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
PARAMETER | DESCRIPTION |
---|---|
other
|
Another
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: (
Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any]
),
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
PARAMETER | DESCRIPTION |
---|---|
other
|
Another
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable
objects.
Compose this Runnable
with Runnable
-like objects to make a
RunnableSequence
.
Equivalent to RunnableSequence(self, *others)
or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
PARAMETER | DESCRIPTION |
---|---|
*others
|
Other
TYPE:
|
name
|
An optional name for the resulting
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict
of this Runnable
.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
PARAMETER | DESCRIPTION |
---|---|
keys
|
A key or list of keys to pick from the output dict. |
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: (
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]
),
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict
output of this Runnable
.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
PARAMETER | DESCRIPTION |
---|---|
**kwargs
|
A mapping of keys to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Any, Any]
|
A new |
invoke
¶
invoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> AIMessage
Transform a single input into an output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Output
|
The output of the |
ainvoke
async
¶
ainvoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> AIMessage
Transform a single input into an output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Output
|
The output of the |
batch
¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Output]
|
A list of outputs from the |
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> Iterator[tuple[int, Output | Exception]]
Run invoke
in parallel on a list of inputs.
Yields results as they complete.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
abatch
async
¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs ainvoke
in parallel using asyncio.gather
.
The default implementation of batch
works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Output]
|
A list of outputs from the |
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke
in parallel on a list of inputs.
Yields results as they complete.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
stream
¶
stream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> Iterator[AIMessageChunk]
Default implementation of stream
, which calls invoke
.
Subclasses must override this method if they support streaming output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
Output
|
The output of the |
astream
async
¶
astream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> AsyncIterator[AIMessageChunk]
Default implementation of astream
, which calls ainvoke
.
Subclasses must override this method if they support streaming output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Output]
|
The output of the |
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable
, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
diff
|
Whether to yield diffs between each step or the current state.
TYPE:
|
with_streamed_output_list
|
Whether to yield the
TYPE:
|
include_names
|
Only include logs with these names. |
include_types
|
Only include logs with these types. |
include_tags
|
Only include logs with these tags. |
exclude_names
|
Exclude logs with these names. |
exclude_types
|
Exclude logs with these types. |
exclude_tags
|
Exclude logs with these tags. |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvent
that provide real-time information
about the progress of the Runnable
, including StreamEvent
from intermediate
results.
A StreamEvent
is a dictionary with the following schema:
event
: Event names are of the format:on_[runnable_type]_(start|stream|end)
.name
: The name of theRunnable
that generated the event.run_id
: Randomly generated ID associated with the given execution of theRunnable
that emitted the event. A childRunnable
that gets invoked as part of the execution of a parentRunnable
is assigned its own unique ID.parent_ids
: The IDs of the parent runnables that generated the event. The rootRunnable
will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.tags
: The tags of theRunnable
that generated the event.metadata
: The metadata of theRunnable
that generated the event.data
: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
event | name | chunk | input | output |
---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'Hello' |
||
on_llm_end |
'[model name]' |
'Hello human!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will be only be surfaced with in the v2 version of the API!
A custom event has following format:
Attribute | Type | Description |
---|---|---|
name |
str |
A user defined name for the event. |
data |
Any |
The data associated with the event. This can be anything, though we suggest making it JSON serializable. |
Here are declarations associated with the standard events shown above:
format_docs
:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool
:
prompt
:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
version
|
The version of the schema to use either
TYPE:
|
include_names
|
Only include events from |
include_types
|
Only include events from |
include_tags
|
Only include events from |
exclude_names
|
Exclude events from |
exclude_types
|
Exclude events from |
exclude_tags
|
Exclude events from |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
RAISES | DESCRIPTION |
---|---|
NotImplementedError
|
If the version is not |
transform
¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls astream
.
Subclasses must override this method if they can start producing output while input is still being generated.
PARAMETER | DESCRIPTION |
---|---|
input
|
An iterator of inputs to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
Output
|
The output of the |
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream
.
Subclasses must override this method if they can start producing output while input is still being generated.
PARAMETER | DESCRIPTION |
---|---|
input
|
An async iterator of inputs to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Output]
|
The output of the |
bind
¶
Bind arguments to a Runnable
, returning a new Runnable
.
Useful when a Runnable
in a chain requires an argument that is not
in the output of the previous Runnable
or included in the user input.
PARAMETER | DESCRIPTION |
---|---|
**kwargs
|
The arguments to bind to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable
, returning a new Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
The config to bind to the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
with_listeners
¶
with_listeners(
*,
on_start: (
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
) = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: (
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
) = None
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable
, returning a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
PARAMETER | DESCRIPTION |
---|---|
on_start
|
Called before the
TYPE:
|
on_end
|
Called after the
TYPE:
|
on_error
|
Called if the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable
.
Returns a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
PARAMETER | DESCRIPTION |
---|---|
on_start
|
Called asynchronously before the
TYPE:
|
on_end
|
Called asynchronously after the
TYPE:
|
on_error
|
Called asynchronously if the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable
, returning a new Runnable
.
PARAMETER | DESCRIPTION |
---|---|
input_type
|
The input type to bind to the
TYPE:
|
output_type
|
The output type to bind to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3
) -> Runnable[Input, Output]
Create a new Runnable
that retries the original Runnable
on exceptions.
PARAMETER | DESCRIPTION |
---|---|
retry_if_exception_type
|
A tuple of exception types to retry on.
TYPE:
|
wait_exponential_jitter
|
Whether to add jitter to the wait time between retries.
TYPE:
|
stop_after_attempt
|
The maximum number of attempts to make before giving up.
TYPE:
|
exponential_jitter_params
|
Parameters for
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable
, returning a new Runnable
.
The new Runnable
will try the original Runnable
, and then each fallback
in order, upon failures.
PARAMETER | DESCRIPTION |
---|---|
fallbacks
|
A sequence of runnables to try if the original |
exceptions_to_handle
|
A tuple of exception types to handle.
TYPE:
|
exception_key
|
If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
PARAMETER | DESCRIPTION |
---|---|
fallbacks
|
A sequence of runnables to try if the original |
exceptions_to_handle
|
A tuple of exception types to handle.
TYPE:
|
exception_key
|
If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None
) -> BaseTool
Create a BaseTool
from a Runnable
.
as_tool
will instantiate a BaseTool
with a name, description, and
args_schema
from a Runnable
. Where possible, schemas are inferred
from runnable.get_input_schema
. Alternatively (e.g., if the
Runnable
takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema
. You can also
pass arg_types
to just specify the required arguments and their types.
PARAMETER | DESCRIPTION |
---|---|
args_schema
|
The schema for the tool. |
name
|
The name of the tool.
TYPE:
|
description
|
The description of the tool.
TYPE:
|
arg_types
|
A dictionary of argument names to types. |
RETURNS | DESCRIPTION |
---|---|
BaseTool
|
A |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via args_schema
:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via arg_types
:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
from langchain_core.runnables import RunnableLambda
def f(x: str) -> str:
return x + "a"
def g(x: str) -> str:
return x + "z"
runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")
Added in version 0.2.14
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable
, it is not serializable
by default. This is to prevent accidental serialization of objects that should
not be serialized.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the class is serializable. Default is |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI
, the id is
["langchain", "llms", "openai", "OpenAI"]
.
to_json
¶
Serialize the Runnable
to JSON.
RETURNS | DESCRIPTION |
---|---|
SerializedConstructor | SerializedNotImplemented
|
A JSON-serializable representation of the |
to_json_not_implemented
¶
Serialize a "not implemented" object.
RETURNS | DESCRIPTION |
---|---|
SerializedNotImplemented
|
|
configurable_fields
¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable
fields at runtime.
PARAMETER | DESCRIPTION |
---|---|
**kwargs
|
A dictionary of
TYPE:
|
RAISES | DESCRIPTION |
---|---|
ValueError
|
If a configuration key is not found in the |
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable
objects that can be set at runtime.
PARAMETER | DESCRIPTION |
---|---|
which
|
The
TYPE:
|
default_key
|
The default key to use if no alternative is selected.
TYPE:
|
prefix_keys
|
Whether to prefix the keys with the
TYPE:
|
**kwargs
|
A dictionary of keys to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-3-7-sonnet-20250219"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
set_verbose
¶
generate_prompt
¶
generate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any
) -> LLMResult
Pass a sequence of prompts to the model and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
prompts
|
List of
TYPE:
|
stop
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
callbacks
|
TYPE:
|
**kwargs
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
agenerate_prompt
async
¶
agenerate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any
) -> LLMResult
Asynchronously pass a sequence of prompts and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
prompts
|
List of
TYPE:
|
stop
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
callbacks
|
TYPE:
|
**kwargs
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
with_structured_output
¶
with_structured_output(
schema: Dict | type, *, include_raw: bool = False, **kwargs: Any
) -> Runnable[LanguageModelInput, Dict | BaseModel]
Model wrapper that returns outputs formatted to match the given schema.
PARAMETER | DESCRIPTION |
---|---|
schema
|
The output schema. Can be passed in as:
If See |
include_raw
|
If The final output is always a
TYPE:
|
RAISES | DESCRIPTION |
---|---|
ValueError
|
If there are any unsupported |
NotImplementedError
|
If the model does not implement
|
RETURNS | DESCRIPTION |
---|---|
Runnable[LanguageModelInput, Dict | BaseModel]
|
A If
|
Example: Pydantic schema (include_raw=False
):
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(AnswerWithJustification)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> AnswerWithJustification(
# answer='They weigh the same',
# justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'
# )
Example: Pydantic schema (include_raw=True
):
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(
AnswerWithJustification, include_raw=True
)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'raw': AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Ao02pnFYXD6GN1yzc0uXPsvF', 'function': {'arguments': '{"answer":"They weigh the same.","justification":"Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ."}', 'name': 'AnswerWithJustification'}, 'type': 'function'}]}),
# 'parsed': AnswerWithJustification(answer='They weigh the same.', justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'),
# 'parsing_error': None
# }
Example: dict
schema (include_raw=False
):
from pydantic import BaseModel
from langchain_core.utils.function_calling import convert_to_openai_tool
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
dict_schema = convert_to_openai_tool(AnswerWithJustification)
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(dict_schema)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'answer': 'They weigh the same',
# 'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume and density of the two substances differ.'
# }
Behavior changed in 0.2.26
Added support for TypedDict class.
get_token_ids
¶
get_num_tokens
¶
get_num_tokens_from_messages
¶
get_num_tokens_from_messages(
messages: list[BaseMessage], tools: Sequence | None = None
) -> int
Get the number of tokens in the messages.
Useful for checking if an input fits in a model's context window.
Note
The base implementation of get_num_tokens_from_messages
ignores tool
schemas.
PARAMETER | DESCRIPTION |
---|---|
messages
|
The message inputs to tokenize.
TYPE:
|
tools
|
If provided, sequence of dict,
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The sum of the number of tokens across the messages. |
generate
¶
generate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any
) -> LLMResult
Pass a sequence of prompts to the model and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
messages
|
List of list of messages.
TYPE:
|
stop
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
callbacks
|
TYPE:
|
tags
|
The tags to apply. |
metadata
|
The metadata to apply. |
run_name
|
The name of the run.
TYPE:
|
run_id
|
The ID of the run.
TYPE:
|
**kwargs
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
agenerate
async
¶
agenerate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any
) -> LLMResult
Asynchronously pass a sequence of prompts to a model and return generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
messages
|
List of list of messages.
TYPE:
|
stop
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
callbacks
|
TYPE:
|
tags
|
The tags to apply. |
metadata
|
The metadata to apply. |
run_name
|
The name of the run.
TYPE:
|
run_id
|
The ID of the run.
TYPE:
|
**kwargs
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
bind_tools
¶
bind_tools(
tools: Sequence[Dict[str, Any] | type | Callable | BaseTool],
*,
tool_choice: str | None = None,
**kwargs: Any
) -> Runnable[LanguageModelInput, AIMessage]
Bind tools to the model.
PARAMETER | DESCRIPTION |
---|---|
tools
|
Sequence of tools to bind to the model. |
tool_choice
|
The tool to use. If "any" then any tool can be used.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[LanguageModelInput, AIMessage]
|
A Runnable that returns a message. |
ParrotFakeChatModel
¶
Bases: BaseChatModel
Generic fake chat model that can be used to test the chat model interface.
- Chat model should be usable in both sync and async tests
METHOD | DESCRIPTION |
---|---|
get_name |
Get the name of the |
get_input_schema |
Get a Pydantic model that can be used to validate input to the |
get_input_jsonschema |
Get a JSON schema that represents the input to the |
get_output_schema |
Get a Pydantic model that can be used to validate output to the |
get_output_jsonschema |
Get a JSON schema that represents the output of the |
config_schema |
The type of config this |
get_config_jsonschema |
Get a JSON schema that represents the config of the |
get_graph |
Return a graph representation of this |
get_prompts |
Return a list of prompts used by this |
__or__ |
Runnable "or" operator. |
__ror__ |
Runnable "reverse-or" operator. |
pipe |
Pipe |
pick |
Pick keys from the output |
assign |
Assigns new fields to the |
invoke |
Transform a single input into an output. |
ainvoke |
Transform a single input into an output. |
batch |
Default implementation runs invoke in parallel using a thread pool executor. |
batch_as_completed |
Run |
abatch |
Default implementation runs |
abatch_as_completed |
Run |
stream |
Default implementation of |
astream |
Default implementation of |
astream_log |
Stream all output from a |
astream_events |
Generate a stream of events. |
transform |
Transform inputs to outputs. |
atransform |
Transform inputs to outputs. |
bind |
Bind arguments to a |
with_config |
Bind config to a |
with_listeners |
Bind lifecycle listeners to a |
with_alisteners |
Bind async lifecycle listeners to a |
with_types |
Bind input and output types to a |
with_retry |
Create a new |
map |
Return a new |
with_fallbacks |
Add fallbacks to a |
as_tool |
Create a |
__init__ |
|
is_lc_serializable |
Is this class serializable? |
get_lc_namespace |
Get the namespace of the LangChain object. |
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the |
to_json_not_implemented |
Serialize a "not implemented" object. |
configurable_fields |
Configure particular |
configurable_alternatives |
Configure alternatives for |
set_verbose |
If verbose is |
generate_prompt |
Pass a sequence of prompts to the model and return model generations. |
agenerate_prompt |
Asynchronously pass a sequence of prompts and return model generations. |
with_structured_output |
Model wrapper that returns outputs formatted to match the given schema. |
get_token_ids |
Return the ordered ids of the tokens in a text. |
get_num_tokens |
Get the number of tokens present in the text. |
get_num_tokens_from_messages |
Get the number of tokens in the messages. |
generate |
Pass a sequence of prompts to the model and return model generations. |
agenerate |
Asynchronously pass a sequence of prompts to a model and return generations. |
dict |
Return a dictionary of the LLM. |
bind_tools |
Bind tools to the model. |
name
class-attribute
instance-attribute
¶
name: str | None = None
The name of the Runnable
. Used for debugging and tracing.
input_schema
property
¶
The type of input this Runnable
accepts specified as a Pydantic model.
output_schema
property
¶
Output schema.
The type of output this Runnable
produces specified as a Pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable
.
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
cache
class-attribute
instance-attribute
¶
Whether to cache the response.
- If
True
, will use the global cache. - If
False
, will not use a cache - If
None
, will use the global cache if it's set, otherwise no cache. - If instance of
BaseCache
, will use the provided cache.
Caching is not currently supported for streaming methods of models.
verbose
class-attribute
instance-attribute
¶
Whether to print out response text.
callbacks
class-attribute
instance-attribute
¶
callbacks: Callbacks = Field(default=None, exclude=True)
Callbacks to add to the run trace.
tags
class-attribute
instance-attribute
¶
Tags to add to the run trace.
metadata
class-attribute
instance-attribute
¶
Metadata to add to the run trace.
custom_get_token_ids
class-attribute
instance-attribute
¶
Optional encoder to use for counting tokens.
rate_limiter
class-attribute
instance-attribute
¶
rate_limiter: BaseRateLimiter | None = Field(default=None, exclude=True)
An optional rate limiter to use for limiting the number of requests.
disable_streaming
class-attribute
instance-attribute
¶
Whether to disable streaming for this model.
If streaming is bypassed, then stream
/astream
/astream_events
will
defer to invoke
/ainvoke
.
- If
True
, will always bypass streaming case. - If
'tool_calling'
, will bypass streaming case only when the model is called with atools
keyword argument. In other words, LangChain will automatically switch to non-streaming behavior (invoke
) only when the tools argument is provided. This offers the best of both worlds. - If
False
(Default), will always use streaming case if available.
The main reason for this flag is that code might be written using stream
and
a user may want to swap out a given model for another model whose the implementation
does not properly support streaming.
output_version
class-attribute
instance-attribute
¶
Version of AIMessage
output format to store in message content.
AIMessage.content_blocks
will lazily parse the contents of content
into a
standard format. This flag can be used to additionally store the standard format
in message content, e.g., for serialization purposes.
Supported values:
'v0'
: provider-specific format in content (can lazily-parse withcontent_blocks
)'v1'
: standardized format in content (consistent withcontent_blocks
)
Partner packages (e.g.,
langchain-openai
) can also use this
field to roll out new content formats in a backward-compatible way.
Added in version 1.0
get_name
¶
get_input_schema
¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable
.
Runnable
objects that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic input schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an input schema for a specific configuration.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate input. |
get_input_jsonschema
¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any]
|
A JSON schema that represents the input to the |
Example
Added in version 0.3.0
get_output_schema
¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate output to the Runnable
.
Runnable
objects that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic output schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an output schema for a specific configuration.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate output. |
get_output_jsonschema
¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any]
|
A JSON schema that represents the output of the |
Example
Added in version 0.3.0
config_schema
¶
The type of config this Runnable
accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives
methods.
PARAMETER | DESCRIPTION |
---|---|
include
|
A list of fields to include in the config schema. |
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate config. |
get_config_jsonschema
¶
get_graph
¶
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable
.
get_prompts
¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable
.
__or__
¶
__or__(
other: (
Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any]
),
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
PARAMETER | DESCRIPTION |
---|---|
other
|
Another
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: (
Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any]
),
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
PARAMETER | DESCRIPTION |
---|---|
other
|
Another
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable
objects.
Compose this Runnable
with Runnable
-like objects to make a
RunnableSequence
.
Equivalent to RunnableSequence(self, *others)
or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
PARAMETER | DESCRIPTION |
---|---|
*others
|
Other
TYPE:
|
name
|
An optional name for the resulting
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict
of this Runnable
.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
PARAMETER | DESCRIPTION |
---|---|
keys
|
A key or list of keys to pick from the output dict. |
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: (
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]
),
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict
output of this Runnable
.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
PARAMETER | DESCRIPTION |
---|---|
**kwargs
|
A mapping of keys to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Any, Any]
|
A new |
invoke
¶
invoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> AIMessage
Transform a single input into an output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Output
|
The output of the |
ainvoke
async
¶
ainvoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> AIMessage
Transform a single input into an output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Output
|
The output of the |
batch
¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Output]
|
A list of outputs from the |
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> Iterator[tuple[int, Output | Exception]]
Run invoke
in parallel on a list of inputs.
Yields results as they complete.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
abatch
async
¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs ainvoke
in parallel using asyncio.gather
.
The default implementation of batch
works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Output]
|
A list of outputs from the |
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke
in parallel on a list of inputs.
Yields results as they complete.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
stream
¶
stream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> Iterator[AIMessageChunk]
Default implementation of stream
, which calls invoke
.
Subclasses must override this method if they support streaming output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
Output
|
The output of the |
astream
async
¶
astream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any
) -> AsyncIterator[AIMessageChunk]
Default implementation of astream
, which calls ainvoke
.
Subclasses must override this method if they support streaming output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Output]
|
The output of the |
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable
, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
diff
|
Whether to yield diffs between each step or the current state.
TYPE:
|
with_streamed_output_list
|
Whether to yield the
TYPE:
|
include_names
|
Only include logs with these names. |
include_types
|
Only include logs with these types. |
include_tags
|
Only include logs with these tags. |
exclude_names
|
Exclude logs with these names. |
exclude_types
|
Exclude logs with these types. |
exclude_tags
|
Exclude logs with these tags. |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvent
that provide real-time information
about the progress of the Runnable
, including StreamEvent
from intermediate
results.
A StreamEvent
is a dictionary with the following schema:
event
: Event names are of the format:on_[runnable_type]_(start|stream|end)
.name
: The name of theRunnable
that generated the event.run_id
: Randomly generated ID associated with the given execution of theRunnable
that emitted the event. A childRunnable
that gets invoked as part of the execution of a parentRunnable
is assigned its own unique ID.parent_ids
: The IDs of the parent runnables that generated the event. The rootRunnable
will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.tags
: The tags of theRunnable
that generated the event.metadata
: The metadata of theRunnable
that generated the event.data
: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
event | name | chunk | input | output |
---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'Hello' |
||
on_llm_end |
'[model name]' |
'Hello human!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will be only be surfaced with in the v2 version of the API!
A custom event has following format:
Attribute | Type | Description |
---|---|---|
name |
str |
A user defined name for the event. |
data |
Any |
The data associated with the event. This can be anything, though we suggest making it JSON serializable. |
Here are declarations associated with the standard events shown above:
format_docs
:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool
:
prompt
:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
version
|
The version of the schema to use either
TYPE:
|
include_names
|
Only include events from |
include_types
|
Only include events from |
include_tags
|
Only include events from |
exclude_names
|
Exclude events from |
exclude_types
|
Exclude events from |
exclude_tags
|
Exclude events from |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
RAISES | DESCRIPTION |
---|---|
NotImplementedError
|
If the version is not |
transform
¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls astream
.
Subclasses must override this method if they can start producing output while input is still being generated.
PARAMETER | DESCRIPTION |
---|---|
input
|
An iterator of inputs to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
Output
|
The output of the |
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream
.
Subclasses must override this method if they can start producing output while input is still being generated.
PARAMETER | DESCRIPTION |
---|---|
input
|
An async iterator of inputs to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Output]
|
The output of the |
bind
¶
Bind arguments to a Runnable
, returning a new Runnable
.
Useful when a Runnable
in a chain requires an argument that is not
in the output of the previous Runnable
or included in the user input.
PARAMETER | DESCRIPTION |
---|---|
**kwargs
|
The arguments to bind to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable
, returning a new Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
The config to bind to the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
with_listeners
¶
with_listeners(
*,
on_start: (
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
) = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: (
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
) = None
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable
, returning a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
PARAMETER | DESCRIPTION |
---|---|
on_start
|
Called before the
TYPE:
|
on_end
|
Called after the
TYPE:
|
on_error
|
Called if the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable
.
Returns a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
PARAMETER | DESCRIPTION |
---|---|
on_start
|
Called asynchronously before the
TYPE:
|
on_end
|
Called asynchronously after the
TYPE:
|
on_error
|
Called asynchronously if the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable
, returning a new Runnable
.
PARAMETER | DESCRIPTION |
---|---|
input_type
|
The input type to bind to the
TYPE:
|
output_type
|
The output type to bind to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3
) -> Runnable[Input, Output]
Create a new Runnable
that retries the original Runnable
on exceptions.
PARAMETER | DESCRIPTION |
---|---|
retry_if_exception_type
|
A tuple of exception types to retry on.
TYPE:
|
wait_exponential_jitter
|
Whether to add jitter to the wait time between retries.
TYPE:
|
stop_after_attempt
|
The maximum number of attempts to make before giving up.
TYPE:
|
exponential_jitter_params
|
Parameters for
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable
, returning a new Runnable
.
The new Runnable
will try the original Runnable
, and then each fallback
in order, upon failures.
PARAMETER | DESCRIPTION |
---|---|
fallbacks
|
A sequence of runnables to try if the original |
exceptions_to_handle
|
A tuple of exception types to handle.
TYPE:
|
exception_key
|
If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
PARAMETER | DESCRIPTION |
---|---|
fallbacks
|
A sequence of runnables to try if the original |
exceptions_to_handle
|
A tuple of exception types to handle.
TYPE:
|
exception_key
|
If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None
) -> BaseTool
Create a BaseTool
from a Runnable
.
as_tool
will instantiate a BaseTool
with a name, description, and
args_schema
from a Runnable
. Where possible, schemas are inferred
from runnable.get_input_schema
. Alternatively (e.g., if the
Runnable
takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema
. You can also
pass arg_types
to just specify the required arguments and their types.
PARAMETER | DESCRIPTION |
---|---|
args_schema
|
The schema for the tool. |
name
|
The name of the tool.
TYPE:
|
description
|
The description of the tool.
TYPE:
|
arg_types
|
A dictionary of argument names to types. |
RETURNS | DESCRIPTION |
---|---|
BaseTool
|
A |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via args_schema
:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via arg_types
:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
from langchain_core.runnables import RunnableLambda
def f(x: str) -> str:
return x + "a"
def g(x: str) -> str:
return x + "z"
runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")
Added in version 0.2.14
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable
, it is not serializable
by default. This is to prevent accidental serialization of objects that should
not be serialized.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the class is serializable. Default is |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI
, the id is
["langchain", "llms", "openai", "OpenAI"]
.
to_json
¶
Serialize the Runnable
to JSON.
RETURNS | DESCRIPTION |
---|---|
SerializedConstructor | SerializedNotImplemented
|
A JSON-serializable representation of the |
to_json_not_implemented
¶
Serialize a "not implemented" object.
RETURNS | DESCRIPTION |
---|---|
SerializedNotImplemented
|
|
configurable_fields
¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable
fields at runtime.
PARAMETER | DESCRIPTION |
---|---|
**kwargs
|
A dictionary of
TYPE:
|
RAISES | DESCRIPTION |
---|---|
ValueError
|
If a configuration key is not found in the |
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable
objects that can be set at runtime.
PARAMETER | DESCRIPTION |
---|---|
which
|
The
TYPE:
|
default_key
|
The default key to use if no alternative is selected.
TYPE:
|
prefix_keys
|
Whether to prefix the keys with the
TYPE:
|
**kwargs
|
A dictionary of keys to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-3-7-sonnet-20250219"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
set_verbose
¶
generate_prompt
¶
generate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any
) -> LLMResult
Pass a sequence of prompts to the model and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
prompts
|
List of
TYPE:
|
stop
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
callbacks
|
TYPE:
|
**kwargs
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
agenerate_prompt
async
¶
agenerate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any
) -> LLMResult
Asynchronously pass a sequence of prompts and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
prompts
|
List of
TYPE:
|
stop
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
callbacks
|
TYPE:
|
**kwargs
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
with_structured_output
¶
with_structured_output(
schema: Dict | type, *, include_raw: bool = False, **kwargs: Any
) -> Runnable[LanguageModelInput, Dict | BaseModel]
Model wrapper that returns outputs formatted to match the given schema.
PARAMETER | DESCRIPTION |
---|---|
schema
|
The output schema. Can be passed in as:
If See |
include_raw
|
If The final output is always a
TYPE:
|
RAISES | DESCRIPTION |
---|---|
ValueError
|
If there are any unsupported |
NotImplementedError
|
If the model does not implement
|
RETURNS | DESCRIPTION |
---|---|
Runnable[LanguageModelInput, Dict | BaseModel]
|
A If
|
Example: Pydantic schema (include_raw=False
):
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(AnswerWithJustification)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> AnswerWithJustification(
# answer='They weigh the same',
# justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'
# )
Example: Pydantic schema (include_raw=True
):
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(
AnswerWithJustification, include_raw=True
)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'raw': AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Ao02pnFYXD6GN1yzc0uXPsvF', 'function': {'arguments': '{"answer":"They weigh the same.","justification":"Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ."}', 'name': 'AnswerWithJustification'}, 'type': 'function'}]}),
# 'parsed': AnswerWithJustification(answer='They weigh the same.', justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'),
# 'parsing_error': None
# }
Example: dict
schema (include_raw=False
):
from pydantic import BaseModel
from langchain_core.utils.function_calling import convert_to_openai_tool
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
dict_schema = convert_to_openai_tool(AnswerWithJustification)
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(dict_schema)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'answer': 'They weigh the same',
# 'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume and density of the two substances differ.'
# }
Behavior changed in 0.2.26
Added support for TypedDict class.
get_token_ids
¶
get_num_tokens
¶
get_num_tokens_from_messages
¶
get_num_tokens_from_messages(
messages: list[BaseMessage], tools: Sequence | None = None
) -> int
Get the number of tokens in the messages.
Useful for checking if an input fits in a model's context window.
Note
The base implementation of get_num_tokens_from_messages
ignores tool
schemas.
PARAMETER | DESCRIPTION |
---|---|
messages
|
The message inputs to tokenize.
TYPE:
|
tools
|
If provided, sequence of dict,
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The sum of the number of tokens across the messages. |
generate
¶
generate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any
) -> LLMResult
Pass a sequence of prompts to the model and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
messages
|
List of list of messages.
TYPE:
|
stop
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
callbacks
|
TYPE:
|
tags
|
The tags to apply. |
metadata
|
The metadata to apply. |
run_name
|
The name of the run.
TYPE:
|
run_id
|
The ID of the run.
TYPE:
|
**kwargs
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
agenerate
async
¶
agenerate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any
) -> LLMResult
Asynchronously pass a sequence of prompts to a model and return generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
messages
|
List of list of messages.
TYPE:
|
stop
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
callbacks
|
TYPE:
|
tags
|
The tags to apply. |
metadata
|
The metadata to apply. |
run_name
|
The name of the run.
TYPE:
|
run_id
|
The ID of the run.
TYPE:
|
**kwargs
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
bind_tools
¶
bind_tools(
tools: Sequence[Dict[str, Any] | type | Callable | BaseTool],
*,
tool_choice: str | None = None,
**kwargs: Any
) -> Runnable[LanguageModelInput, AIMessage]
Bind tools to the model.
PARAMETER | DESCRIPTION |
---|---|
tools
|
Sequence of tools to bind to the model. |
tool_choice
|
The tool to use. If "any" then any tool can be used.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[LanguageModelInput, AIMessage]
|
A Runnable that returns a message. |
langchain_core.language_models.base
¶
Base language models class.
LanguageModelInput
module-attribute
¶
LanguageModelInput = PromptValue | str | Sequence[MessageLikeRepresentation]
Input to a language model.
LanguageModelOutput
module-attribute
¶
LanguageModelOutput = BaseMessage | str
Output from a language model.
LanguageModelLike
module-attribute
¶
LanguageModelLike = Runnable[LanguageModelInput, LanguageModelOutput]
Input/output interface for a language model.
BaseLanguageModel
¶
Bases: RunnableSerializable[LanguageModelInput, LanguageModelOutputVar]
, ABC
Abstract base class for interfacing with language models.
All language model wrappers inherited from BaseLanguageModel
.
METHOD | DESCRIPTION |
---|---|
set_verbose |
If verbose is |
generate_prompt |
Pass a sequence of prompts to the model and return model generations. |
agenerate_prompt |
Asynchronously pass a sequence of prompts and return model generations. |
with_structured_output |
Not implemented on this class. |
get_token_ids |
Return the ordered ids of the tokens in a text. |
get_num_tokens |
Get the number of tokens present in the text. |
get_num_tokens_from_messages |
Get the number of tokens in the messages. |
get_name |
Get the name of the |
get_input_schema |
Get a Pydantic model that can be used to validate input to the |
get_input_jsonschema |
Get a JSON schema that represents the input to the |
get_output_schema |
Get a Pydantic model that can be used to validate output to the |
get_output_jsonschema |
Get a JSON schema that represents the output of the |
config_schema |
The type of config this |
get_config_jsonschema |
Get a JSON schema that represents the config of the |
get_graph |
Return a graph representation of this |
get_prompts |
Return a list of prompts used by this |
__or__ |
Runnable "or" operator. |
__ror__ |
Runnable "reverse-or" operator. |
pipe |
Pipe |
pick |
Pick keys from the output |
assign |
Assigns new fields to the |
invoke |
Transform a single input into an output. |
ainvoke |
Transform a single input into an output. |
batch |
Default implementation runs invoke in parallel using a thread pool executor. |
batch_as_completed |
Run |
abatch |
Default implementation runs |
abatch_as_completed |
Run |
stream |
Default implementation of |
astream |
Default implementation of |
astream_log |
Stream all output from a |
astream_events |
Generate a stream of events. |
transform |
Transform inputs to outputs. |
atransform |
Transform inputs to outputs. |
bind |
Bind arguments to a |
with_config |
Bind config to a |
with_listeners |
Bind lifecycle listeners to a |
with_alisteners |
Bind async lifecycle listeners to a |
with_types |
Bind input and output types to a |
with_retry |
Create a new |
map |
Return a new |
with_fallbacks |
Add fallbacks to a |
as_tool |
Create a |
__init__ |
|
is_lc_serializable |
Is this class serializable? |
get_lc_namespace |
Get the namespace of the LangChain object. |
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the |
to_json_not_implemented |
Serialize a "not implemented" object. |
configurable_fields |
Configure particular |
configurable_alternatives |
Configure alternatives for |
cache
class-attribute
instance-attribute
¶
Whether to cache the response.
- If
True
, will use the global cache. - If
False
, will not use a cache - If
None
, will use the global cache if it's set, otherwise no cache. - If instance of
BaseCache
, will use the provided cache.
Caching is not currently supported for streaming methods of models.
verbose
class-attribute
instance-attribute
¶
Whether to print out response text.
callbacks
class-attribute
instance-attribute
¶
callbacks: Callbacks = Field(default=None, exclude=True)
Callbacks to add to the run trace.
tags
class-attribute
instance-attribute
¶
Tags to add to the run trace.
metadata
class-attribute
instance-attribute
¶
Metadata to add to the run trace.
custom_get_token_ids
class-attribute
instance-attribute
¶
Optional encoder to use for counting tokens.
name
class-attribute
instance-attribute
¶
name: str | None = None
The name of the Runnable
. Used for debugging and tracing.
OutputType
property
¶
OutputType: type[Output]
Output Type.
The type of output this Runnable
produces specified as a type annotation.
RAISES | DESCRIPTION |
---|---|
TypeError
|
If the output type cannot be inferred. |
input_schema
property
¶
The type of input this Runnable
accepts specified as a Pydantic model.
output_schema
property
¶
Output schema.
The type of output this Runnable
produces specified as a Pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable
.
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
set_verbose
¶
generate_prompt
abstractmethod
¶
generate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any
) -> LLMResult
Pass a sequence of prompts to the model and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
prompts
|
List of
TYPE:
|
stop
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
callbacks
|
TYPE:
|
**kwargs
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
agenerate_prompt
abstractmethod
async
¶
agenerate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any
) -> LLMResult
Asynchronously pass a sequence of prompts and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
PARAMETER | DESCRIPTION |
---|---|
prompts
|
List of
TYPE:
|
stop
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
callbacks
|
TYPE:
|
**kwargs
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LLMResult
|
An |
with_structured_output
¶
with_structured_output(
schema: dict | type, **kwargs: Any
) -> Runnable[LanguageModelInput, dict | BaseModel]
Not implemented on this class.
get_token_ids
¶
get_num_tokens
¶
get_num_tokens_from_messages
¶
get_num_tokens_from_messages(
messages: list[BaseMessage], tools: Sequence | None = None
) -> int
Get the number of tokens in the messages.
Useful for checking if an input fits in a model's context window.
Note
The base implementation of get_num_tokens_from_messages
ignores tool
schemas.
PARAMETER | DESCRIPTION |
---|---|
messages
|
The message inputs to tokenize.
TYPE:
|
tools
|
If provided, sequence of dict,
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The sum of the number of tokens across the messages. |
get_name
¶
get_input_schema
¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable
.
Runnable
objects that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic input schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an input schema for a specific configuration.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate input. |
get_input_jsonschema
¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any]
|
A JSON schema that represents the input to the |
Example
Added in version 0.3.0
get_output_schema
¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate output to the Runnable
.
Runnable
objects that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic output schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an output schema for a specific configuration.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate output. |
get_output_jsonschema
¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
A config to use when generating the schema.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
dict[str, Any]
|
A JSON schema that represents the output of the |
Example
Added in version 0.3.0
config_schema
¶
The type of config this Runnable
accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives
methods.
PARAMETER | DESCRIPTION |
---|---|
include
|
A list of fields to include in the config schema. |
RETURNS | DESCRIPTION |
---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate config. |
get_config_jsonschema
¶
get_graph
¶
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable
.
get_prompts
¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable
.
__or__
¶
__or__(
other: (
Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any]
),
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
PARAMETER | DESCRIPTION |
---|---|
other
|
Another
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: (
Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any]
),
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
PARAMETER | DESCRIPTION |
---|---|
other
|
Another
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable
objects.
Compose this Runnable
with Runnable
-like objects to make a
RunnableSequence
.
Equivalent to RunnableSequence(self, *others)
or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
PARAMETER | DESCRIPTION |
---|---|
*others
|
Other
TYPE:
|
name
|
An optional name for the resulting
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict
of this Runnable
.
Pick a single key:
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
PARAMETER | DESCRIPTION |
---|---|
keys
|
A key or list of keys to pick from the output dict. |
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: (
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]
),
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict
output of this Runnable
.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
PARAMETER | DESCRIPTION |
---|---|
**kwargs
|
A mapping of keys to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Any, Any]
|
A new |
invoke
abstractmethod
¶
invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
Transform a single input into an output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Output
|
The output of the |
ainvoke
async
¶
ainvoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
Transform a single input into an output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Output
|
The output of the |
batch
¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Output]
|
A list of outputs from the |
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> Iterator[tuple[int, Output | Exception]]
Run invoke
in parallel on a list of inputs.
Yields results as they complete.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
abatch
async
¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs ainvoke
in parallel using asyncio.gather
.
The default implementation of batch
works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Output]
|
A list of outputs from the |
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke
in parallel on a list of inputs.
Yields results as they complete.
PARAMETER | DESCRIPTION |
---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
stream
¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream
, which calls invoke
.
Subclasses must override this method if they support streaming output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
Output
|
The output of the |
astream
async
¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream
, which calls ainvoke
.
Subclasses must override this method if they support streaming output.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Output]
|
The output of the |
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable
, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
diff
|
Whether to yield diffs between each step or the current state.
TYPE:
|
with_streamed_output_list
|
Whether to yield the
TYPE:
|
include_names
|
Only include logs with these names. |
include_types
|
Only include logs with these types. |
include_tags
|
Only include logs with these tags. |
exclude_names
|
Exclude logs with these names. |
exclude_types
|
Exclude logs with these types. |
exclude_tags
|
Exclude logs with these tags. |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvent
that provide real-time information
about the progress of the Runnable
, including StreamEvent
from intermediate
results.
A StreamEvent
is a dictionary with the following schema:
event
: Event names are of the format:on_[runnable_type]_(start|stream|end)
.name
: The name of theRunnable
that generated the event.run_id
: Randomly generated ID associated with the given execution of theRunnable
that emitted the event. A childRunnable
that gets invoked as part of the execution of a parentRunnable
is assigned its own unique ID.parent_ids
: The IDs of the parent runnables that generated the event. The rootRunnable
will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.tags
: The tags of theRunnable
that generated the event.metadata
: The metadata of theRunnable
that generated the event.data
: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
event | name | chunk | input | output |
---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'Hello' |
||
on_llm_end |
'[model name]' |
'Hello human!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will be only be surfaced with in the v2 version of the API!
A custom event has following format:
Attribute | Type | Description |
---|---|---|
name |
str |
A user defined name for the event. |
data |
Any |
The data associated with the event. This can be anything, though we suggest making it JSON serializable. |
Here are declarations associated with the standard events shown above:
format_docs
:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool
:
prompt
:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
PARAMETER | DESCRIPTION |
---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
version
|
The version of the schema to use either
TYPE:
|
include_names
|
Only include events from |
include_types
|
Only include events from |
include_tags
|
Only include events from |
exclude_names
|
Exclude events from |
exclude_types
|
Exclude events from |
exclude_tags
|
Exclude events from |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
RAISES | DESCRIPTION |
---|---|
NotImplementedError
|
If the version is not |
transform
¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls astream
.
Subclasses must override this method if they can start producing output while input is still being generated.
PARAMETER | DESCRIPTION |
---|---|
input
|
An iterator of inputs to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
Output
|
The output of the |
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream
.
Subclasses must override this method if they can start producing output while input is still being generated.
PARAMETER | DESCRIPTION |
---|---|
input
|
An async iterator of inputs to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Output]
|
The output of the |
bind
¶
Bind arguments to a Runnable
, returning a new Runnable
.
Useful when a Runnable
in a chain requires an argument that is not
in the output of the previous Runnable
or included in the user input.
PARAMETER | DESCRIPTION |
---|---|
**kwargs
|
The arguments to bind to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable
, returning a new Runnable
.
PARAMETER | DESCRIPTION |
---|---|
config
|
The config to bind to the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
with_listeners
¶
with_listeners(
*,
on_start: (
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
) = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: (
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
) = None
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable
, returning a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
PARAMETER | DESCRIPTION |
---|---|
on_start
|
Called before the
TYPE:
|
on_end
|
Called after the
TYPE:
|
on_error
|
Called if the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable
.
Returns a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
PARAMETER | DESCRIPTION |
---|---|
on_start
|
Called asynchronously before the
TYPE:
|
on_end
|
Called asynchronously after the
TYPE:
|
on_error
|
Called asynchronously if the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable
, returning a new Runnable
.
PARAMETER | DESCRIPTION |
---|---|
input_type
|
The input type to bind to the
TYPE:
|
output_type
|
The output type to bind to the
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3
) -> Runnable[Input, Output]
Create a new Runnable
that retries the original Runnable
on exceptions.
PARAMETER | DESCRIPTION |
---|---|
retry_if_exception_type
|
A tuple of exception types to retry on.
TYPE:
|
wait_exponential_jitter
|
Whether to add jitter to the wait time between retries.
TYPE:
|
stop_after_attempt
|
The maximum number of attempts to make before giving up.
TYPE:
|
exponential_jitter_params
|
Parameters for
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Runnable[Input, Output]
|
A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable
, returning a new Runnable
.
The new Runnable
will try the original Runnable
, and then each fallback
in order, upon failures.
PARAMETER | DESCRIPTION |
---|---|
fallbacks
|
A sequence of runnables to try if the original |
exceptions_to_handle
|
A tuple of exception types to handle.
TYPE:
|
exception_key
|
If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
PARAMETER | DESCRIPTION |
---|---|
fallbacks
|
A sequence of runnables to try if the original |
exceptions_to_handle
|
A tuple of exception types to handle.
TYPE:
|
exception_key
|
If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None
) -> BaseTool
Create a BaseTool
from a Runnable
.
as_tool
will instantiate a BaseTool
with a name, description, and
args_schema
from a Runnable
. Where possible, schemas are inferred
from runnable.get_input_schema
. Alternatively (e.g., if the
Runnable
takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema
. You can also
pass arg_types
to just specify the required arguments and their types.
PARAMETER | DESCRIPTION |
---|---|
args_schema
|
The schema for the tool. |
name
|
The name of the tool.
TYPE:
|
description
|
The description of the tool.
TYPE:
|
arg_types
|
A dictionary of argument names to types. |
RETURNS | DESCRIPTION |
---|---|
BaseTool
|
A |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via args_schema
:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via arg_types
:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
from langchain_core.runnables import RunnableLambda
def f(x: str) -> str:
return x + "a"
def g(x: str) -> str:
return x + "z"
runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")
Added in version 0.2.14
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable
, it is not serializable
by default. This is to prevent accidental serialization of objects that should
not be serialized.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the class is serializable. Default is |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI
, the id is
["langchain", "llms", "openai", "OpenAI"]
.
to_json
¶
Serialize the Runnable
to JSON.
RETURNS | DESCRIPTION |
---|---|
SerializedConstructor | SerializedNotImplemented
|
A JSON-serializable representation of the |
to_json_not_implemented
¶
Serialize a "not implemented" object.
RETURNS | DESCRIPTION |
---|---|
SerializedNotImplemented
|
|
configurable_fields
¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable
fields at runtime.
PARAMETER | DESCRIPTION |
---|---|
**kwargs
|
A dictionary of
TYPE:
|
RAISES | DESCRIPTION |
---|---|
ValueError
|
If a configuration key is not found in the |
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable
objects that can be set at runtime.
PARAMETER | DESCRIPTION |
---|---|
which
|
The
TYPE:
|
default_key
|
The default key to use if no alternative is selected.
TYPE:
|
prefix_keys
|
Whether to prefix the keys with the
TYPE:
|
**kwargs
|
A dictionary of keys to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-3-7-sonnet-20250219"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)