langchain-exa
¶
Modules:
Name | Description |
---|---|
retrievers |
|
tools |
Tool for the Exa Search API. |
Classes:
Name | Description |
---|---|
ExaSearchRetriever |
Exa Search retriever. |
ExaFindSimilarResults |
Tool that queries the Metaphor Search API and gets back json. |
ExaSearchResults |
Exa Search tool. |
ExaSearchRetriever
¶
Bases: BaseRetriever
Exa Search retriever.
Methods:
Name | Description |
---|---|
get_name |
Get the name of the |
get_input_schema |
Get a pydantic model that can be used to validate input to the Runnable. |
get_input_jsonschema |
Get a JSON schema that represents the input to the |
get_output_schema |
Get a pydantic model that can be used to validate output to the |
get_output_jsonschema |
Get a JSON schema that represents the output of the |
config_schema |
The type of config this |
get_config_jsonschema |
Get a JSON schema that represents the config of the |
get_graph |
Return a graph representation of this |
get_prompts |
Return a list of prompts used by this |
__or__ |
Runnable "or" operator. |
__ror__ |
Runnable "reverse-or" operator. |
pipe |
Pipe runnables. |
pick |
Pick keys from the output dict of this |
assign |
Assigns new fields to the dict output of this |
invoke |
Invoke the retriever to get relevant documents. |
ainvoke |
Asynchronously invoke the retriever to get relevant documents. |
batch |
Default implementation runs invoke in parallel using a thread pool executor. |
batch_as_completed |
Run |
abatch |
Default implementation runs |
abatch_as_completed |
Run |
stream |
Default implementation of |
astream |
Default implementation of |
astream_log |
Stream all output from a |
astream_events |
Generate a stream of events. |
transform |
Transform inputs to outputs. |
atransform |
Transform inputs to outputs. |
bind |
Bind arguments to a |
with_config |
Bind config to a |
with_listeners |
Bind lifecycle listeners to a |
with_alisteners |
Bind async lifecycle listeners to a |
with_types |
Bind input and output types to a |
with_retry |
Create a new Runnable that retries the original Runnable on exceptions. |
map |
Return a new |
with_fallbacks |
Add fallbacks to a |
as_tool |
Create a |
__init__ |
|
is_lc_serializable |
Is this class serializable? |
get_lc_namespace |
Get the namespace of the langchain object. |
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the |
to_json_not_implemented |
Serialize a "not implemented" object. |
configurable_fields |
Configure particular |
configurable_alternatives |
Configure alternatives for |
get_relevant_documents |
Retrieve documents relevant to a query. |
aget_relevant_documents |
Asynchronously get documents relevant to a query. |
validate_environment |
Validate the environment. |
Attributes:
Name | Type | Description |
---|---|---|
InputType |
type[Input]
|
Input type. |
OutputType |
type[Output]
|
Output Type. |
input_schema |
type[BaseModel]
|
The type of input this |
output_schema |
type[BaseModel]
|
Output schema. |
config_specs |
list[ConfigurableFieldSpec]
|
List configurable fields for this |
lc_secrets |
dict[str, str]
|
A map of constructor argument names to secret ids. |
lc_attributes |
dict
|
List of attribute names that should be included in the serialized kwargs. |
tags |
list[str] | None
|
Optional list of tags associated with the retriever. Defaults to None. |
metadata |
dict[str, Any] | None
|
Optional metadata associated with the retriever. Defaults to None. |
k |
int
|
The number of search results to return (1 to 100). |
include_domains |
Optional[list[str]]
|
A list of domains to include in the search. |
exclude_domains |
Optional[list[str]]
|
A list of domains to exclude from the search. |
start_crawl_date |
Optional[str]
|
The start date for the crawl (in YYYY-MM-DD format). |
end_crawl_date |
Optional[str]
|
The end date for the crawl (in YYYY-MM-DD format). |
start_published_date |
Optional[str]
|
The start date for when the document was published (in YYYY-MM-DD format). |
end_published_date |
Optional[str]
|
The end date for when the document was published (in YYYY-MM-DD format). |
use_autoprompt |
Optional[bool]
|
Whether to use autoprompt for the search. |
type |
str
|
The type of search, 'keyword', 'neural', or 'auto'. Default: neural |
highlights |
Optional[Union[HighlightsContentsOptions, bool]]
|
Whether to set the page content to the highlights of the results. |
text_contents_options |
Union[TextContentsOptions, dict[str, Any], Literal[True]]
|
How to set the page content of the results. Can be True or a dict with options |
livecrawl |
Optional[Literal['always', 'fallback', 'never']]
|
Option to crawl live webpages if content is not in the index. Options: "always", |
summary |
Optional[Union[bool, dict[str, str]]]
|
Whether to include a summary of the content. Can be a boolean or a dict with a |
InputType
property
¶
InputType: type[Input]
Input type.
The type of input this Runnable
accepts specified as a type annotation.
Raises:
Type | Description |
---|---|
TypeError
|
If the input type cannot be inferred. |
OutputType
property
¶
OutputType: type[Output]
Output Type.
The type of output this Runnable
produces specified as a type annotation.
Raises:
Type | Description |
---|---|
TypeError
|
If the output type cannot be inferred. |
input_schema
property
¶
input_schema: type[BaseModel]
The type of input this Runnable
accepts specified as a pydantic model.
output_schema
property
¶
output_schema: type[BaseModel]
Output schema.
The type of output this Runnable
produces specified as a pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable
.
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example,
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor. Default is an empty dictionary.
tags
class-attribute
instance-attribute
¶
Optional list of tags associated with the retriever. Defaults to None.
These tags will be associated with each call to this retriever,
and passed as arguments to the handlers defined in callbacks
.
You can use these to eg identify a specific instance of a retriever with its
use case.
metadata
class-attribute
instance-attribute
¶
Optional metadata associated with the retriever. Defaults to None.
This metadata will be associated with each call to this retriever,
and passed as arguments to the handlers defined in callbacks
.
You can use these to eg identify a specific instance of a retriever with its
use case.
k
class-attribute
instance-attribute
¶
k: int = 10
The number of search results to return (1 to 100).
include_domains
class-attribute
instance-attribute
¶
A list of domains to include in the search.
exclude_domains
class-attribute
instance-attribute
¶
A list of domains to exclude from the search.
start_crawl_date
class-attribute
instance-attribute
¶
The start date for the crawl (in YYYY-MM-DD format).
end_crawl_date
class-attribute
instance-attribute
¶
The end date for the crawl (in YYYY-MM-DD format).
start_published_date
class-attribute
instance-attribute
¶
The start date for when the document was published (in YYYY-MM-DD format).
end_published_date
class-attribute
instance-attribute
¶
The end date for when the document was published (in YYYY-MM-DD format).
use_autoprompt
class-attribute
instance-attribute
¶
Whether to use autoprompt for the search.
type
class-attribute
instance-attribute
¶
type: str = 'neural'
The type of search, 'keyword', 'neural', or 'auto'. Default: neural
highlights
class-attribute
instance-attribute
¶
Whether to set the page content to the highlights of the results.
text_contents_options
class-attribute
instance-attribute
¶
How to set the page content of the results. Can be True or a dict with options like max_characters.
livecrawl
class-attribute
instance-attribute
¶
Option to crawl live webpages if content is not in the index. Options: "always", "fallback", "never".
summary
class-attribute
instance-attribute
¶
Whether to include a summary of the content. Can be a boolean or a dict with a custom query.
get_name
¶
get_input_schema
¶
get_input_schema(
config: RunnableConfig | None = None,
) -> type[BaseModel]
Get a pydantic model that can be used to validate input to the Runnable.
Runnable
s that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic input schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an input schema for a specific configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
Type | Description |
---|---|
type[BaseModel]
|
A pydantic model that can be used to validate input. |
get_input_jsonschema
¶
Get a JSON schema that represents the input to the Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A JSON schema that represents the input to the |
Example
Added in version 0.3.0
get_output_schema
¶
get_output_schema(
config: RunnableConfig | None = None,
) -> type[BaseModel]
Get a pydantic model that can be used to validate output to the Runnable
.
Runnable
s that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic output schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an output schema for a specific configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
Type | Description |
---|---|
type[BaseModel]
|
A pydantic model that can be used to validate output. |
get_output_jsonschema
¶
Get a JSON schema that represents the output of the Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A JSON schema that represents the output of the |
Example
Added in version 0.3.0
config_schema
¶
The type of config this Runnable
accepts specified as a pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives
methods.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include
|
Sequence[str] | None
|
A list of fields to include in the config schema. |
None
|
Returns:
Type | Description |
---|---|
type[BaseModel]
|
A pydantic model that can be used to validate config. |
get_config_jsonschema
¶
Get a JSON schema that represents the config of the Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include
|
Sequence[str] | None
|
A list of fields to include in the config schema. |
None
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A JSON schema that represents the config of the |
Added in version 0.3.0
get_graph
¶
Return a graph representation of this Runnable
.
get_prompts
¶
get_prompts(
config: RunnableConfig | None = None,
) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable
.
__or__
¶
__or__(
other: (
Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[
[AsyncIterator[Any]], AsyncIterator[Other]
]
| Callable[[Any], Other]
| Mapping[
str,
Runnable[Any, Other]
| Callable[[Any], Other]
| Any,
]
),
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other
|
Runnable[Any, Other] | Callable[[Iterator[Any]], Iterator[Other]] | Callable[[AsyncIterator[Any]], AsyncIterator[Other]] | Callable[[Any], Other] | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any]
|
Another |
required |
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: (
Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[
[AsyncIterator[Other]], AsyncIterator[Any]
]
| Callable[[Other], Any]
| Mapping[
str,
Runnable[Other, Any]
| Callable[[Other], Any]
| Any,
]
),
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other
|
Runnable[Other, Any] | Callable[[Iterator[Other]], Iterator[Any]] | Callable[[AsyncIterator[Other]], AsyncIterator[Any]] | Callable[[Other], Any] | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any]
|
Another |
required |
Returns:
Type | Description |
---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other],
name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe runnables.
Compose this Runnable
with Runnable
-like objects to make a
RunnableSequence
.
Equivalent to RunnableSequence(self, *others)
or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*others
|
Runnable[Any, Other] | Callable[[Any], Other]
|
Other |
()
|
name
|
str | None
|
An optional name for the resulting |
None
|
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict of this Runnable
.
Pick single key:
```python
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
```
Pick list of keys:
```python
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(
str=as_str, json=as_json, bytes=RunnableLambda(as_bytes)
)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
```
Parameters:
Name | Type | Description | Default |
---|---|---|---|
keys
|
str | list[str]
|
A key or list of keys to pick from the output dict. |
required |
Returns:
Type | Description |
---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: (
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[
str,
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any],
]
),
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable
.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | llm | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | llm)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs
|
Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]
|
A mapping of keys to |
{}
|
Returns:
Type | Description |
---|---|
RunnableSerializable[Any, Any]
|
A new |
invoke
¶
Invoke the retriever to get relevant documents.
Main entry point for synchronous retriever invocations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
str
|
The query string. |
required |
config
|
RunnableConfig | None
|
Configuration for the retriever. Defaults to None. |
None
|
kwargs
|
Any
|
Additional arguments to pass to the retriever. |
{}
|
Returns:
Type | Description |
---|---|
list[Document]
|
List of relevant documents. |
Examples:
.. code-block:: python
retriever.invoke("query")
ainvoke
async
¶
Asynchronously invoke the retriever to get relevant documents.
Main entry point for asynchronous retriever invocations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
str
|
The query string. |
required |
config
|
RunnableConfig | None
|
Configuration for the retriever. Defaults to None. |
None
|
kwargs
|
Any
|
Additional arguments to pass to the retriever. |
{}
|
Returns:
Type | Description |
---|---|
list[Document]
|
List of relevant documents. |
Examples:
.. code-block:: python
await retriever.ainvoke("query")
batch
¶
batch(
inputs: list[Input],
config: (
RunnableConfig | list[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
list[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | list[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
**kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Returns:
Type | Description |
---|---|
list[Output]
|
A list of outputs from the |
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: (
RunnableConfig | Sequence[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> Iterator[tuple[int, Output | Exception]]
Run invoke
in parallel on a list of inputs.
Yields results as they complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
Sequence[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | Sequence[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
**kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
abatch
async
¶
abatch(
inputs: list[Input],
config: (
RunnableConfig | list[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs ainvoke
in parallel using asyncio.gather
.
The default implementation of batch
works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
list[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | list[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
**kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Returns:
Type | Description |
---|---|
list[Output]
|
A list of outputs from the |
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: (
RunnableConfig | Sequence[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke
in parallel on a list of inputs.
Yields results as they complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
Sequence[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | Sequence[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
stream
¶
stream(
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream
, which calls invoke
.
Subclasses should override this method if they support streaming output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Input
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
Output
|
The output of the |
astream
async
¶
astream(
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream
, which calls ainvoke
.
Subclasses should override this method if they support streaming output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Input
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[Output]
|
The output of the |
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable
, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Any
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
diff
|
bool
|
Whether to yield diffs between each step or the current state. |
True
|
with_streamed_output_list
|
bool
|
Whether to yield the |
True
|
include_names
|
Sequence[str] | None
|
Only include logs with these names. |
None
|
include_types
|
Sequence[str] | None
|
Only include logs with these types. |
None
|
include_tags
|
Sequence[str] | None
|
Only include logs with these tags. |
None
|
exclude_names
|
Sequence[str] | None
|
Exclude logs with these names. |
None
|
exclude_types
|
Sequence[str] | None
|
Exclude logs with these types. |
None
|
exclude_tags
|
Sequence[str] | None
|
Exclude logs with these tags. |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvents
that provide real-time information
about the progress of the Runnable
, including StreamEvents
from intermediate
results.
A StreamEvent
is a dictionary with the following schema:
event
: str - Event names are of the format:on_[runnable_type]_(start|stream|end)
.name
: str - The name of theRunnable
that generated the event.run_id
: str - randomly generated ID associated with the given execution of theRunnable
that emitted the event. A childRunnable
that gets invoked as part of the execution of a parentRunnable
is assigned its own unique ID.parent_ids
: list[str] - The IDs of the parent runnables that generated the event. The rootRunnable
will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.tags
: Optional[list[str]] - The tags of theRunnable
that generated the event.metadata
: Optional[dict[str, Any]] - The metadata of theRunnable
that generated the event.data
: dict[str, Any]
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| event | name | chunk | input | output |
+==========================+==================+=====================================+===================================================+=====================================================+
| on_chat_model_start
| [model name] | | {"messages": [[SystemMessage, HumanMessage]]}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chat_model_stream
| [model name] | AIMessageChunk(content="hello")
| | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chat_model_end
| [model name] | | {"messages": [[SystemMessage, HumanMessage]]}
| AIMessageChunk(content="hello world")
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_start
| [model name] | | {'input': 'hello'}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_stream
| [model name] | 'Hello'
| | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_end
| [model name] | | 'Hello human!'
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_start
| format_docs | | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_stream
| format_docs | 'hello world!, goodbye world!'
| | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_end
| format_docs | | [Document(...)]
| 'hello world!, goodbye world!'
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_tool_start
| some_tool | | {"x": 1, "y": "2"}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_tool_end
| some_tool | | | {"x": 1, "y": "2"}
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_retriever_start
| [retriever name] | | {"query": "hello"}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_retriever_end
| [retriever name] | | {"query": "hello"}
| [Document(...), ..]
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_prompt_start
| [template_name] | | {"question": "hello"}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_prompt_end
| [template_name] | | {"question": "hello"}
| ChatPromptValue(messages: [SystemMessage, ...])
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will be only be surfaced with in the v2 version of the API!
A custom event has following format:
+-----------+------+-----------------------------------------------------------------------------------------------------------+ | Attribute | Type | Description | +===========+======+===========================================================================================================+ | name | str | A user defined name for the event. | +-----------+------+-----------------------------------------------------------------------------------------------------------+ | data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | +-----------+------+-----------------------------------------------------------------------------------------------------------+
Here are declarations associated with the standard events shown above:
format_docs
:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool
:
prompt
:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
Example: Dispatch Custom Event
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Any
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
version
|
Literal['v1', 'v2']
|
The version of the schema to use either |
'v2'
|
include_names
|
Sequence[str] | None
|
Only include events from |
None
|
include_types
|
Sequence[str] | None
|
Only include events from |
None
|
include_tags
|
Sequence[str] | None
|
Only include events from |
None
|
exclude_names
|
Sequence[str] | None
|
Exclude events from |
None
|
exclude_types
|
Sequence[str] | None
|
Exclude events from |
None
|
exclude_tags
|
Sequence[str] | None
|
Exclude events from |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
Raises:
Type | Description |
---|---|
NotImplementedError
|
If the version is not |
transform
¶
transform(
input: Iterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls astream
.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Iterator[Input]
|
An iterator of inputs to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
Output
|
The output of the |
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream
.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
AsyncIterator[Input]
|
An async iterator of inputs to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[Output]
|
The output of the |
bind
¶
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind arguments to a Runnable
, returning a new Runnable
.
Useful when a Runnable
in a chain requires an argument that is not
in the output of the previous Runnable
or included in the user input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs
|
Any
|
The arguments to bind to the |
{}
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
llm = ChatOllama(model="llama3.1")
# Without bind.
chain = llm | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind.
chain = llm.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable
, returning a new Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
The config to bind to the |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
with_listeners
¶
with_listeners(
*,
on_start: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None,
on_end: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None,
on_error: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable
, returning a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
on_start
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called before the |
None
|
on_end
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called after the |
None
|
on_error
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called if the |
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable
.
Returns a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
on_start
|
AsyncListener | None
|
Called asynchronously before the |
None
|
on_end
|
AsyncListener | None
|
Called asynchronously after the |
None
|
on_error
|
AsyncListener | None
|
Called asynchronously if the |
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*,
input_type: type[Input] | None = None,
output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable
, returning a new Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_type
|
type[Input] | None
|
The input type to bind to the |
None
|
output_type
|
type[Output] | None
|
The output type to bind to the |
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new Runnable with the types bound. |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[
type[BaseException], ...
] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: (
ExponentialJitterParams | None
) = None,
stop_after_attempt: int = 3
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
retry_if_exception_type
|
tuple[type[BaseException], ...]
|
A tuple of exception types to retry on. Defaults to (Exception,). |
(Exception,)
|
wait_exponential_jitter
|
bool
|
Whether to add jitter to the wait time between retries. Defaults to True. |
True
|
stop_after_attempt
|
int
|
The maximum number of attempts to make before giving up. Defaults to 3. |
3
|
exponential_jitter_params
|
ExponentialJitterParams | None
|
Parameters for
|
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[
type[BaseException], ...
] = (Exception,),
exception_key: str | None = None
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable
, returning a new Runnable
.
The new Runnable
will try the original Runnable
, and then each fallback
in order, upon failures.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fallbacks
|
Sequence[Runnable[Input, Output]]
|
A sequence of runnables to try if the original |
required |
exceptions_to_handle
|
tuple[type[BaseException], ...]
|
A tuple of exception types to handle.
Defaults to |
(Exception,)
|
exception_key
|
str | None
|
If string is specified then handled exceptions will be passed
to fallbacks as part of the input under the specified key.
If None, exceptions will not be passed to fallbacks.
If used, the base |
None
|
Returns:
Type | Description |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
RunnableWithFallbacks[Input, Output]
|
fallback in order, upon failures. |
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fallbacks
|
Sequence[Runnable[Input, Output]]
|
A sequence of runnables to try if the original |
required |
exceptions_to_handle
|
tuple[type[BaseException], ...]
|
A tuple of exception types to handle. |
(Exception,)
|
exception_key
|
str | None
|
If string is specified then handled exceptions will be passed
to fallbacks as part of the input under the specified key.
If None, exceptions will not be passed to fallbacks.
If used, the base |
None
|
Returns:
Type | Description |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
RunnableWithFallbacks[Input, Output]
|
fallback in order, upon failures. |
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None
) -> BaseTool
Create a BaseTool
from a Runnable
.
as_tool
will instantiate a BaseTool
with a name, description, and
args_schema
from a Runnable
. Where possible, schemas are inferred
from runnable.get_input_schema
. Alternatively (e.g., if the
Runnable
takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema
. You can also
pass arg_types
to just specify the required arguments and their types.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args_schema
|
type[BaseModel] | None
|
The schema for the tool. Defaults to None. |
None
|
name
|
str | None
|
The name of the tool. Defaults to None. |
None
|
description
|
str | None
|
The description of the tool. Defaults to None. |
None
|
arg_types
|
dict[str, type] | None
|
A dictionary of argument names to types. Defaults to None. |
None
|
Returns:
Type | Description |
---|---|
BaseTool
|
A |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via args_schema
:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via arg_types
:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
from langchain_core.runnables import RunnableLambda
def f(x: str) -> str:
return x + "a"
def g(x: str) -> str:
return x + "z"
runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")
Added in version 0.2.14
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable, it is not serializable by default. This is to prevent accidental serialization of objects that should not be serialized.
Returns:
Type | Description |
---|---|
bool
|
Whether the class is serializable. Default is False. |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path
to the object.
For example, for the class langchain.llms.openai.OpenAI
, the id is
["langchain", "llms", "openai", "OpenAI"].
to_json
¶
Serialize the Runnable
to JSON.
Returns:
Type | Description |
---|---|
SerializedConstructor | SerializedNotImplemented
|
A JSON-serializable representation of the |
to_json_not_implemented
¶
Serialize a "not implemented" object.
Returns:
Type | Description |
---|---|
SerializedNotImplemented
|
SerializedNotImplemented. |
configurable_fields
¶
Configure particular Runnable
fields at runtime.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs
|
AnyConfigurableField
|
A dictionary of |
{}
|
Raises:
Type | Description |
---|---|
ValueError
|
If a configuration key is not found in the |
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: (
Runnable[Input, Output]
| Callable[[], Runnable[Input, Output]]
)
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnables
that can be set at runtime.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
which
|
ConfigurableField
|
The |
required |
default_key
|
str
|
The default key to use if no alternative is selected.
Defaults to |
'default'
|
prefix_keys
|
bool
|
Whether to prefix the keys with the |
False
|
**kwargs
|
Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
|
A dictionary of keys to |
{}
|
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-3-7-sonnet-20250219"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
get_relevant_documents
¶
get_relevant_documents(
query: str,
*,
callbacks: Callbacks = None,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
**kwargs: Any
) -> list[Document]
Retrieve documents relevant to a query.
Users should favor using .invoke
or .batch
rather than
get_relevant_documents directly
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query
|
str
|
string to find relevant documents for. |
required |
callbacks
|
Callbacks
|
Callback manager or list of callbacks. Defaults to None. |
None
|
tags
|
list[str] | None
|
Optional list of tags associated with the retriever.
These tags will be associated with each call to this retriever,
and passed as arguments to the handlers defined in |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata associated with the retriever.
This metadata will be associated with each call to this retriever,
and passed as arguments to the handlers defined in |
None
|
run_name
|
str | None
|
Optional name for the run. Defaults to None. |
None
|
kwargs
|
Any
|
Additional arguments to pass to the retriever. |
{}
|
Returns:
Type | Description |
---|---|
list[Document]
|
List of relevant documents. |
aget_relevant_documents
async
¶
aget_relevant_documents(
query: str,
*,
callbacks: Callbacks = None,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
**kwargs: Any
) -> list[Document]
Asynchronously get documents relevant to a query.
Users should favor using .ainvoke
or .abatch
rather than
aget_relevant_documents directly
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query
|
str
|
string to find relevant documents for. |
required |
callbacks
|
Callbacks
|
Callback manager or list of callbacks. |
None
|
tags
|
list[str] | None
|
Optional list of tags associated with the retriever.
These tags will be associated with each call to this retriever,
and passed as arguments to the handlers defined in |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata associated with the retriever.
This metadata will be associated with each call to this retriever,
and passed as arguments to the handlers defined in |
None
|
run_name
|
str | None
|
Optional name for the run. Defaults to None. |
None
|
kwargs
|
Any
|
Additional arguments to pass to the retriever. |
{}
|
Returns:
Type | Description |
---|---|
list[Document]
|
List of relevant documents. |
ExaFindSimilarResults
¶
Bases: BaseTool
Tool that queries the Metaphor Search API and gets back json.
Methods:
Name | Description |
---|---|
get_name |
Get the name of the |
get_input_schema |
The tool's input schema. |
get_input_jsonschema |
Get a JSON schema that represents the input to the |
get_output_schema |
Get a pydantic model that can be used to validate output to the |
get_output_jsonschema |
Get a JSON schema that represents the output of the |
config_schema |
The type of config this |
get_config_jsonschema |
Get a JSON schema that represents the config of the |
get_graph |
Return a graph representation of this |
get_prompts |
Return a list of prompts used by this |
__or__ |
Runnable "or" operator. |
__ror__ |
Runnable "reverse-or" operator. |
pipe |
Pipe runnables. |
pick |
Pick keys from the output dict of this |
assign |
Assigns new fields to the dict output of this |
batch |
Default implementation runs invoke in parallel using a thread pool executor. |
batch_as_completed |
Run |
abatch |
Default implementation runs |
abatch_as_completed |
Run |
stream |
Default implementation of |
astream |
Default implementation of |
astream_log |
Stream all output from a |
astream_events |
Generate a stream of events. |
transform |
Transform inputs to outputs. |
atransform |
Transform inputs to outputs. |
bind |
Bind arguments to a |
with_config |
Bind config to a |
with_listeners |
Bind lifecycle listeners to a |
with_alisteners |
Bind async lifecycle listeners to a |
with_types |
Bind input and output types to a |
with_retry |
Create a new Runnable that retries the original Runnable on exceptions. |
map |
Return a new |
with_fallbacks |
Add fallbacks to a |
as_tool |
Create a |
__init__ |
Initialize the tool. |
is_lc_serializable |
Is this class serializable? |
get_lc_namespace |
Get the namespace of the langchain object. |
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the |
to_json_not_implemented |
Serialize a "not implemented" object. |
configurable_fields |
Configure particular |
configurable_alternatives |
Configure alternatives for |
__init_subclass__ |
Validate the tool class definition during subclass creation. |
run |
Run the tool. |
arun |
Run the tool asynchronously. |
validate_environment |
Validate the environment. |
Attributes:
Name | Type | Description |
---|---|---|
InputType |
type[Input]
|
Input type. |
OutputType |
type[Output]
|
Output Type. |
input_schema |
type[BaseModel]
|
The type of input this |
output_schema |
type[BaseModel]
|
Output schema. |
config_specs |
list[ConfigurableFieldSpec]
|
List configurable fields for this |
lc_secrets |
dict[str, str]
|
A map of constructor argument names to secret ids. |
lc_attributes |
dict
|
List of attribute names that should be included in the serialized kwargs. |
args_schema |
Annotated[ArgsSchema | None, SkipValidation()]
|
Pydantic model class to validate and parse the tool's input arguments. |
return_direct |
bool
|
Whether to return the tool's output directly. |
verbose |
bool
|
Whether to log the tool's progress. |
callbacks |
Callbacks
|
Callbacks to be called during tool execution. |
tags |
list[str] | None
|
Optional list of tags associated with the tool. Defaults to None. |
metadata |
dict[str, Any] | None
|
Optional metadata associated with the tool. Defaults to None. |
handle_tool_error |
bool | str | Callable[[ToolException], str] | None
|
Handle the content of the ToolException thrown. |
handle_validation_error |
bool | str | Callable[[ValidationError | ValidationError], str] | None
|
Handle the content of the ValidationError thrown. |
response_format |
Literal['content', 'content_and_artifact']
|
The tool response format. Defaults to 'content'. |
is_single_input |
bool
|
Check if the tool accepts only a single input argument. |
args |
dict
|
Get the tool's input arguments schema. |
tool_call_schema |
ArgsSchema
|
Get the schema for tool calls, excluding injected arguments. |
InputType
property
¶
InputType: type[Input]
Input type.
The type of input this Runnable
accepts specified as a type annotation.
Raises:
Type | Description |
---|---|
TypeError
|
If the input type cannot be inferred. |
OutputType
property
¶
OutputType: type[Output]
Output Type.
The type of output this Runnable
produces specified as a type annotation.
Raises:
Type | Description |
---|---|
TypeError
|
If the output type cannot be inferred. |
input_schema
property
¶
input_schema: type[BaseModel]
The type of input this Runnable
accepts specified as a pydantic model.
output_schema
property
¶
output_schema: type[BaseModel]
Output schema.
The type of output this Runnable
produces specified as a pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable
.
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example,
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor. Default is an empty dictionary.
args_schema
class-attribute
instance-attribute
¶
args_schema: Annotated[
ArgsSchema | None, SkipValidation()
] = Field(default=None, description="The tool schema.")
Pydantic model class to validate and parse the tool's input arguments.
Args schema should be either:
- A subclass of pydantic.BaseModel.
- A subclass of pydantic.v1.BaseModel if accessing v1 namespace in pydantic 2
- a JSON schema dict
return_direct
class-attribute
instance-attribute
¶
return_direct: bool = False
Whether to return the tool's output directly.
Setting this to True means that after the tool is called, the AgentExecutor will stop looping.
verbose
class-attribute
instance-attribute
¶
verbose: bool = False
Whether to log the tool's progress.
callbacks
class-attribute
instance-attribute
¶
Callbacks to be called during tool execution.
tags
class-attribute
instance-attribute
¶
Optional list of tags associated with the tool. Defaults to None.
These tags will be associated with each call to this tool,
and passed as arguments to the handlers defined in callbacks
.
You can use these to eg identify a specific instance of a tool with its use case.
metadata
class-attribute
instance-attribute
¶
Optional metadata associated with the tool. Defaults to None.
This metadata will be associated with each call to this tool,
and passed as arguments to the handlers defined in callbacks
.
You can use these to eg identify a specific instance of a tool with its use case.
handle_tool_error
class-attribute
instance-attribute
¶
Handle the content of the ToolException thrown.
handle_validation_error
class-attribute
instance-attribute
¶
handle_validation_error: (
bool
| str
| Callable[[ValidationError | ValidationError], str]
| None
) = False
Handle the content of the ValidationError thrown.
response_format
class-attribute
instance-attribute
¶
response_format: Literal[
"content", "content_and_artifact"
] = "content"
The tool response format. Defaults to 'content'.
If "content" then the output of the tool is interpreted as the contents of a ToolMessage. If "content_and_artifact" then the output is expected to be a two-tuple corresponding to the (content, artifact) of a ToolMessage.
is_single_input
property
¶
is_single_input: bool
Check if the tool accepts only a single input argument.
Returns:
Type | Description |
---|---|
bool
|
True if the tool has only one input argument, False otherwise. |
args
property
¶
args: dict
Get the tool's input arguments schema.
Returns:
Type | Description |
---|---|
dict
|
Dictionary containing the tool's argument properties. |
tool_call_schema
property
¶
Get the schema for tool calls, excluding injected arguments.
Returns:
Type | Description |
---|---|
ArgsSchema
|
The schema that should be used for tool calls from language models. |
get_name
¶
get_input_schema
¶
get_input_schema(
config: RunnableConfig | None = None,
) -> type[BaseModel]
The tool's input schema.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
The configuration for the tool. |
None
|
Returns:
Type | Description |
---|---|
type[BaseModel]
|
The input schema for the tool. |
get_input_jsonschema
¶
Get a JSON schema that represents the input to the Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A JSON schema that represents the input to the |
Example
Added in version 0.3.0
get_output_schema
¶
get_output_schema(
config: RunnableConfig | None = None,
) -> type[BaseModel]
Get a pydantic model that can be used to validate output to the Runnable
.
Runnable
s that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic output schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an output schema for a specific configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
Type | Description |
---|---|
type[BaseModel]
|
A pydantic model that can be used to validate output. |
get_output_jsonschema
¶
Get a JSON schema that represents the output of the Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A JSON schema that represents the output of the |
Example
Added in version 0.3.0
config_schema
¶
The type of config this Runnable
accepts specified as a pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives
methods.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include
|
Sequence[str] | None
|
A list of fields to include in the config schema. |
None
|
Returns:
Type | Description |
---|---|
type[BaseModel]
|
A pydantic model that can be used to validate config. |
get_config_jsonschema
¶
Get a JSON schema that represents the config of the Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include
|
Sequence[str] | None
|
A list of fields to include in the config schema. |
None
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A JSON schema that represents the config of the |
Added in version 0.3.0
get_graph
¶
Return a graph representation of this Runnable
.
get_prompts
¶
get_prompts(
config: RunnableConfig | None = None,
) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable
.
__or__
¶
__or__(
other: (
Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[
[AsyncIterator[Any]], AsyncIterator[Other]
]
| Callable[[Any], Other]
| Mapping[
str,
Runnable[Any, Other]
| Callable[[Any], Other]
| Any,
]
),
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other
|
Runnable[Any, Other] | Callable[[Iterator[Any]], Iterator[Other]] | Callable[[AsyncIterator[Any]], AsyncIterator[Other]] | Callable[[Any], Other] | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any]
|
Another |
required |
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: (
Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[
[AsyncIterator[Other]], AsyncIterator[Any]
]
| Callable[[Other], Any]
| Mapping[
str,
Runnable[Other, Any]
| Callable[[Other], Any]
| Any,
]
),
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other
|
Runnable[Other, Any] | Callable[[Iterator[Other]], Iterator[Any]] | Callable[[AsyncIterator[Other]], AsyncIterator[Any]] | Callable[[Other], Any] | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any]
|
Another |
required |
Returns:
Type | Description |
---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other],
name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe runnables.
Compose this Runnable
with Runnable
-like objects to make a
RunnableSequence
.
Equivalent to RunnableSequence(self, *others)
or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*others
|
Runnable[Any, Other] | Callable[[Any], Other]
|
Other |
()
|
name
|
str | None
|
An optional name for the resulting |
None
|
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict of this Runnable
.
Pick single key:
```python
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
```
Pick list of keys:
```python
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(
str=as_str, json=as_json, bytes=RunnableLambda(as_bytes)
)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
```
Parameters:
Name | Type | Description | Default |
---|---|---|---|
keys
|
str | list[str]
|
A key or list of keys to pick from the output dict. |
required |
Returns:
Type | Description |
---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: (
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[
str,
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any],
]
),
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable
.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | llm | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | llm)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs
|
Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]
|
A mapping of keys to |
{}
|
Returns:
Type | Description |
---|---|
RunnableSerializable[Any, Any]
|
A new |
batch
¶
batch(
inputs: list[Input],
config: (
RunnableConfig | list[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
list[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | list[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
**kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Returns:
Type | Description |
---|---|
list[Output]
|
A list of outputs from the |
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: (
RunnableConfig | Sequence[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> Iterator[tuple[int, Output | Exception]]
Run invoke
in parallel on a list of inputs.
Yields results as they complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
Sequence[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | Sequence[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
**kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
abatch
async
¶
abatch(
inputs: list[Input],
config: (
RunnableConfig | list[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs ainvoke
in parallel using asyncio.gather
.
The default implementation of batch
works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
list[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | list[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
**kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Returns:
Type | Description |
---|---|
list[Output]
|
A list of outputs from the |
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: (
RunnableConfig | Sequence[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke
in parallel on a list of inputs.
Yields results as they complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
Sequence[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | Sequence[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
stream
¶
stream(
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream
, which calls invoke
.
Subclasses should override this method if they support streaming output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Input
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
Output
|
The output of the |
astream
async
¶
astream(
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream
, which calls ainvoke
.
Subclasses should override this method if they support streaming output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Input
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[Output]
|
The output of the |
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable
, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Any
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
diff
|
bool
|
Whether to yield diffs between each step or the current state. |
True
|
with_streamed_output_list
|
bool
|
Whether to yield the |
True
|
include_names
|
Sequence[str] | None
|
Only include logs with these names. |
None
|
include_types
|
Sequence[str] | None
|
Only include logs with these types. |
None
|
include_tags
|
Sequence[str] | None
|
Only include logs with these tags. |
None
|
exclude_names
|
Sequence[str] | None
|
Exclude logs with these names. |
None
|
exclude_types
|
Sequence[str] | None
|
Exclude logs with these types. |
None
|
exclude_tags
|
Sequence[str] | None
|
Exclude logs with these tags. |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvents
that provide real-time information
about the progress of the Runnable
, including StreamEvents
from intermediate
results.
A StreamEvent
is a dictionary with the following schema:
event
: str - Event names are of the format:on_[runnable_type]_(start|stream|end)
.name
: str - The name of theRunnable
that generated the event.run_id
: str - randomly generated ID associated with the given execution of theRunnable
that emitted the event. A childRunnable
that gets invoked as part of the execution of a parentRunnable
is assigned its own unique ID.parent_ids
: list[str] - The IDs of the parent runnables that generated the event. The rootRunnable
will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.tags
: Optional[list[str]] - The tags of theRunnable
that generated the event.metadata
: Optional[dict[str, Any]] - The metadata of theRunnable
that generated the event.data
: dict[str, Any]
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| event | name | chunk | input | output |
+==========================+==================+=====================================+===================================================+=====================================================+
| on_chat_model_start
| [model name] | | {"messages": [[SystemMessage, HumanMessage]]}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chat_model_stream
| [model name] | AIMessageChunk(content="hello")
| | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chat_model_end
| [model name] | | {"messages": [[SystemMessage, HumanMessage]]}
| AIMessageChunk(content="hello world")
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_start
| [model name] | | {'input': 'hello'}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_stream
| [model name] | 'Hello'
| | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_end
| [model name] | | 'Hello human!'
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_start
| format_docs | | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_stream
| format_docs | 'hello world!, goodbye world!'
| | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_end
| format_docs | | [Document(...)]
| 'hello world!, goodbye world!'
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_tool_start
| some_tool | | {"x": 1, "y": "2"}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_tool_end
| some_tool | | | {"x": 1, "y": "2"}
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_retriever_start
| [retriever name] | | {"query": "hello"}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_retriever_end
| [retriever name] | | {"query": "hello"}
| [Document(...), ..]
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_prompt_start
| [template_name] | | {"question": "hello"}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_prompt_end
| [template_name] | | {"question": "hello"}
| ChatPromptValue(messages: [SystemMessage, ...])
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will be only be surfaced with in the v2 version of the API!
A custom event has following format:
+-----------+------+-----------------------------------------------------------------------------------------------------------+ | Attribute | Type | Description | +===========+======+===========================================================================================================+ | name | str | A user defined name for the event. | +-----------+------+-----------------------------------------------------------------------------------------------------------+ | data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | +-----------+------+-----------------------------------------------------------------------------------------------------------+
Here are declarations associated with the standard events shown above:
format_docs
:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool
:
prompt
:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
Example: Dispatch Custom Event
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Any
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
version
|
Literal['v1', 'v2']
|
The version of the schema to use either |
'v2'
|
include_names
|
Sequence[str] | None
|
Only include events from |
None
|
include_types
|
Sequence[str] | None
|
Only include events from |
None
|
include_tags
|
Sequence[str] | None
|
Only include events from |
None
|
exclude_names
|
Sequence[str] | None
|
Exclude events from |
None
|
exclude_types
|
Sequence[str] | None
|
Exclude events from |
None
|
exclude_tags
|
Sequence[str] | None
|
Exclude events from |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
Raises:
Type | Description |
---|---|
NotImplementedError
|
If the version is not |
transform
¶
transform(
input: Iterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls astream
.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Iterator[Input]
|
An iterator of inputs to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
Output
|
The output of the |
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream
.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
AsyncIterator[Input]
|
An async iterator of inputs to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[Output]
|
The output of the |
bind
¶
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind arguments to a Runnable
, returning a new Runnable
.
Useful when a Runnable
in a chain requires an argument that is not
in the output of the previous Runnable
or included in the user input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs
|
Any
|
The arguments to bind to the |
{}
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
llm = ChatOllama(model="llama3.1")
# Without bind.
chain = llm | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind.
chain = llm.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable
, returning a new Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
The config to bind to the |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
with_listeners
¶
with_listeners(
*,
on_start: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None,
on_end: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None,
on_error: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable
, returning a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
on_start
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called before the |
None
|
on_end
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called after the |
None
|
on_error
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called if the |
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable
.
Returns a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
on_start
|
AsyncListener | None
|
Called asynchronously before the |
None
|
on_end
|
AsyncListener | None
|
Called asynchronously after the |
None
|
on_error
|
AsyncListener | None
|
Called asynchronously if the |
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*,
input_type: type[Input] | None = None,
output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable
, returning a new Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_type
|
type[Input] | None
|
The input type to bind to the |
None
|
output_type
|
type[Output] | None
|
The output type to bind to the |
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new Runnable with the types bound. |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[
type[BaseException], ...
] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: (
ExponentialJitterParams | None
) = None,
stop_after_attempt: int = 3
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
retry_if_exception_type
|
tuple[type[BaseException], ...]
|
A tuple of exception types to retry on. Defaults to (Exception,). |
(Exception,)
|
wait_exponential_jitter
|
bool
|
Whether to add jitter to the wait time between retries. Defaults to True. |
True
|
stop_after_attempt
|
int
|
The maximum number of attempts to make before giving up. Defaults to 3. |
3
|
exponential_jitter_params
|
ExponentialJitterParams | None
|
Parameters for
|
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[
type[BaseException], ...
] = (Exception,),
exception_key: str | None = None
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable
, returning a new Runnable
.
The new Runnable
will try the original Runnable
, and then each fallback
in order, upon failures.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fallbacks
|
Sequence[Runnable[Input, Output]]
|
A sequence of runnables to try if the original |
required |
exceptions_to_handle
|
tuple[type[BaseException], ...]
|
A tuple of exception types to handle.
Defaults to |
(Exception,)
|
exception_key
|
str | None
|
If string is specified then handled exceptions will be passed
to fallbacks as part of the input under the specified key.
If None, exceptions will not be passed to fallbacks.
If used, the base |
None
|
Returns:
Type | Description |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
RunnableWithFallbacks[Input, Output]
|
fallback in order, upon failures. |
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fallbacks
|
Sequence[Runnable[Input, Output]]
|
A sequence of runnables to try if the original |
required |
exceptions_to_handle
|
tuple[type[BaseException], ...]
|
A tuple of exception types to handle. |
(Exception,)
|
exception_key
|
str | None
|
If string is specified then handled exceptions will be passed
to fallbacks as part of the input under the specified key.
If None, exceptions will not be passed to fallbacks.
If used, the base |
None
|
Returns:
Type | Description |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
RunnableWithFallbacks[Input, Output]
|
fallback in order, upon failures. |
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None
) -> BaseTool
Create a BaseTool
from a Runnable
.
as_tool
will instantiate a BaseTool
with a name, description, and
args_schema
from a Runnable
. Where possible, schemas are inferred
from runnable.get_input_schema
. Alternatively (e.g., if the
Runnable
takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema
. You can also
pass arg_types
to just specify the required arguments and their types.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args_schema
|
type[BaseModel] | None
|
The schema for the tool. Defaults to None. |
None
|
name
|
str | None
|
The name of the tool. Defaults to None. |
None
|
description
|
str | None
|
The description of the tool. Defaults to None. |
None
|
arg_types
|
dict[str, type] | None
|
A dictionary of argument names to types. Defaults to None. |
None
|
Returns:
Type | Description |
---|---|
BaseTool
|
A |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via args_schema
:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via arg_types
:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
from langchain_core.runnables import RunnableLambda
def f(x: str) -> str:
return x + "a"
def g(x: str) -> str:
return x + "z"
runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")
Added in version 0.2.14
__init__
¶
__init__(**kwargs: Any) -> None
Initialize the tool.
Raises:
Type | Description |
---|---|
TypeError
|
If |
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable, it is not serializable by default. This is to prevent accidental serialization of objects that should not be serialized.
Returns:
Type | Description |
---|---|
bool
|
Whether the class is serializable. Default is False. |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path
to the object.
For example, for the class langchain.llms.openai.OpenAI
, the id is
["langchain", "llms", "openai", "OpenAI"].
to_json
¶
Serialize the Runnable
to JSON.
Returns:
Type | Description |
---|---|
SerializedConstructor | SerializedNotImplemented
|
A JSON-serializable representation of the |
to_json_not_implemented
¶
Serialize a "not implemented" object.
Returns:
Type | Description |
---|---|
SerializedNotImplemented
|
SerializedNotImplemented. |
configurable_fields
¶
Configure particular Runnable
fields at runtime.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs
|
AnyConfigurableField
|
A dictionary of |
{}
|
Raises:
Type | Description |
---|---|
ValueError
|
If a configuration key is not found in the |
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: (
Runnable[Input, Output]
| Callable[[], Runnable[Input, Output]]
)
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnables
that can be set at runtime.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
which
|
ConfigurableField
|
The |
required |
default_key
|
str
|
The default key to use if no alternative is selected.
Defaults to |
'default'
|
prefix_keys
|
bool
|
Whether to prefix the keys with the |
False
|
**kwargs
|
Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
|
A dictionary of keys to |
{}
|
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-3-7-sonnet-20250219"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
__init_subclass__
¶
__init_subclass__(**kwargs: Any) -> None
Validate the tool class definition during subclass creation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs
|
Any
|
Additional keyword arguments passed to the parent class. |
{}
|
Raises:
Type | Description |
---|---|
SchemaAnnotationError
|
If args_schema has incorrect type annotation. |
run
¶
run(
tool_input: str | dict[str, Any],
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any
) -> Any
Run the tool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tool_input
|
str | dict[str, Any]
|
The input to the tool. |
required |
verbose
|
bool | None
|
Whether to log the tool's progress. Defaults to None. |
None
|
start_color
|
str | None
|
The color to use when starting the tool. Defaults to 'green'. |
'green'
|
color
|
str | None
|
The color to use when ending the tool. Defaults to 'green'. |
'green'
|
callbacks
|
Callbacks
|
Callbacks to be called during tool execution. Defaults to None. |
None
|
tags
|
list[str] | None
|
Optional list of tags associated with the tool. Defaults to None. |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata associated with the tool. Defaults to None. |
None
|
run_name
|
str | None
|
The name of the run. Defaults to None. |
None
|
run_id
|
UUID | None
|
The id of the run. Defaults to None. |
None
|
config
|
RunnableConfig | None
|
The configuration for the tool. Defaults to None. |
None
|
tool_call_id
|
str | None
|
The id of the tool call. Defaults to None. |
None
|
kwargs
|
Any
|
Keyword arguments to be passed to tool callbacks (event handler) |
{}
|
Returns:
Type | Description |
---|---|
Any
|
The output of the tool. |
Raises:
Type | Description |
---|---|
ToolException
|
If an error occurs during tool execution. |
arun
async
¶
arun(
tool_input: str | dict,
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any
) -> Any
Run the tool asynchronously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tool_input
|
str | dict
|
The input to the tool. |
required |
verbose
|
bool | None
|
Whether to log the tool's progress. Defaults to None. |
None
|
start_color
|
str | None
|
The color to use when starting the tool. Defaults to 'green'. |
'green'
|
color
|
str | None
|
The color to use when ending the tool. Defaults to 'green'. |
'green'
|
callbacks
|
Callbacks
|
Callbacks to be called during tool execution. Defaults to None. |
None
|
tags
|
list[str] | None
|
Optional list of tags associated with the tool. Defaults to None. |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata associated with the tool. Defaults to None. |
None
|
run_name
|
str | None
|
The name of the run. Defaults to None. |
None
|
run_id
|
UUID | None
|
The id of the run. Defaults to None. |
None
|
config
|
RunnableConfig | None
|
The configuration for the tool. Defaults to None. |
None
|
tool_call_id
|
str | None
|
The id of the tool call. Defaults to None. |
None
|
kwargs
|
Any
|
Keyword arguments to be passed to tool callbacks |
{}
|
Returns:
Type | Description |
---|---|
Any
|
The output of the tool. |
Raises:
Type | Description |
---|---|
ToolException
|
If an error occurs during tool execution. |
ExaSearchResults
¶
Bases: BaseTool
Exa Search tool.
Setup
Install langchain-exa
and set environment variable EXA_API_KEY
.
.. code-block:: bash
pip install -U langchain-exa
export EXA_API_KEY="your-api-key"
Instantiation
.. code-block:: python
from langchain-exa import ExaSearchResults
tool = ExaSearchResults()
Invocation with args
.. code-block:: python
tool.invoke({"query": "what is the weather in SF", "num_results": 1})
.. code-block:: python
SearchResponse(
results=[
Result(
url="https://www.wunderground.com/weather/37.8,-122.4",
id="https://www.wunderground.com/weather/37.8,-122.4",
title="San Francisco, CA Weather Conditionsstar_ratehome",
score=0.1843988299369812,
published_date="2023-02-23T01:17:06.594Z",
author=None,
text="The time period when the sun is no more than 6 degrees below the horizon at either sunrise or sunset. The horizon should be clearly defined and the brightest stars should be visible under good atmospheric conditions (i.e. no moonlight, or other lights). One still should be able to carry on ordinary outdoor activities. The time period when the sun is between 6 and 12 degrees below the horizon at either sunrise or sunset. The horizon is well defined and the outline of objects might be visible without artificial light. Ordinary outdoor activities are not possible at this time without extra illumination. The time period when the sun is between 12 and 18 degrees below the horizon at either sunrise or sunset. The sun does not contribute to the illumination of the sky before this time in the morning, or after this time in the evening. In the beginning of morning astronomical twilight and at the end of astronomical twilight in the evening, sky illumination is very faint, and might be undetectable. The time of Civil Sunset minus the time of Civil Sunrise. The time of Actual Sunset minus the time of Actual Sunrise. The change in length of daylight between today and tomorrow is also listed when available.",
highlights=None,
highlight_scores=None,
summary=None,
)
],
autoprompt_string=None,
)
Invocation with ToolCall:
.. code-block:: python
tool.invoke(
{
"args": {"query": "what is the weather in SF", "num_results": 1},
"id": "1",
"name": tool.name,
"type": "tool_call",
}
)
.. code-block:: python
ToolMessage(
content="Title: San Francisco, CA Weather Conditionsstar_ratehome\nURL: https://www.wunderground.com/weather/37.8,-122.4\nID: https://www.wunderground.com/weather/37.8,-122.4\nScore: 0.1843988299369812\nPublished Date: 2023-02-23T01:17:06.594Z\nAuthor: None\nText: The time period when the sun is no more than 6 degrees below the horizon at either sunrise or sunset. The horizon should be clearly defined and the brightest stars should be visible under good atmospheric conditions (i.e. no moonlight, or other lights). One still should be able to carry on ordinary outdoor activities. The time period when the sun is between 6 and 12 degrees below the horizon at either sunrise or sunset. The horizon is well defined and the outline of objects might be visible without artificial light. Ordinary outdoor activities are not possible at this time without extra illumination. The time period when the sun is between 12 and 18 degrees below the horizon at either sunrise or sunset. The sun does not contribute to the illumination of the sky before this time in the morning, or after this time in the evening. In the beginning of morning astronomical twilight and at the end of astronomical twilight in the evening, sky illumination is very faint, and might be undetectable. The time of Civil Sunset minus the time of Civil Sunrise. The time of Actual Sunset minus the time of Actual Sunrise. The change in length of daylight between today and tomorrow is also listed when available.\nHighlights: None\nHighlight Scores: None\nSummary: None\n",
name="exa_search_results_json",
tool_call_id="1",
)
Methods:
Name | Description |
---|---|
get_name |
Get the name of the |
get_input_schema |
The tool's input schema. |
get_input_jsonschema |
Get a JSON schema that represents the input to the |
get_output_schema |
Get a pydantic model that can be used to validate output to the |
get_output_jsonschema |
Get a JSON schema that represents the output of the |
config_schema |
The type of config this |
get_config_jsonschema |
Get a JSON schema that represents the config of the |
get_graph |
Return a graph representation of this |
get_prompts |
Return a list of prompts used by this |
__or__ |
Runnable "or" operator. |
__ror__ |
Runnable "reverse-or" operator. |
pipe |
Pipe runnables. |
pick |
Pick keys from the output dict of this |
assign |
Assigns new fields to the dict output of this |
batch |
Default implementation runs invoke in parallel using a thread pool executor. |
batch_as_completed |
Run |
abatch |
Default implementation runs |
abatch_as_completed |
Run |
stream |
Default implementation of |
astream |
Default implementation of |
astream_log |
Stream all output from a |
astream_events |
Generate a stream of events. |
transform |
Transform inputs to outputs. |
atransform |
Transform inputs to outputs. |
bind |
Bind arguments to a |
with_config |
Bind config to a |
with_listeners |
Bind lifecycle listeners to a |
with_alisteners |
Bind async lifecycle listeners to a |
with_types |
Bind input and output types to a |
with_retry |
Create a new Runnable that retries the original Runnable on exceptions. |
map |
Return a new |
with_fallbacks |
Add fallbacks to a |
as_tool |
Create a |
__init__ |
Initialize the tool. |
is_lc_serializable |
Is this class serializable? |
get_lc_namespace |
Get the namespace of the langchain object. |
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the |
to_json_not_implemented |
Serialize a "not implemented" object. |
configurable_fields |
Configure particular |
configurable_alternatives |
Configure alternatives for |
__init_subclass__ |
Validate the tool class definition during subclass creation. |
run |
Run the tool. |
arun |
Run the tool asynchronously. |
validate_environment |
Validate the environment. |
Attributes:
Name | Type | Description |
---|---|---|
InputType |
type[Input]
|
Input type. |
OutputType |
type[Output]
|
Output Type. |
input_schema |
type[BaseModel]
|
The type of input this |
output_schema |
type[BaseModel]
|
Output schema. |
config_specs |
list[ConfigurableFieldSpec]
|
List configurable fields for this |
lc_secrets |
dict[str, str]
|
A map of constructor argument names to secret ids. |
lc_attributes |
dict
|
List of attribute names that should be included in the serialized kwargs. |
args_schema |
Annotated[ArgsSchema | None, SkipValidation()]
|
Pydantic model class to validate and parse the tool's input arguments. |
return_direct |
bool
|
Whether to return the tool's output directly. |
verbose |
bool
|
Whether to log the tool's progress. |
callbacks |
Callbacks
|
Callbacks to be called during tool execution. |
tags |
list[str] | None
|
Optional list of tags associated with the tool. Defaults to None. |
metadata |
dict[str, Any] | None
|
Optional metadata associated with the tool. Defaults to None. |
handle_tool_error |
bool | str | Callable[[ToolException], str] | None
|
Handle the content of the ToolException thrown. |
handle_validation_error |
bool | str | Callable[[ValidationError | ValidationError], str] | None
|
Handle the content of the ValidationError thrown. |
response_format |
Literal['content', 'content_and_artifact']
|
The tool response format. Defaults to 'content'. |
is_single_input |
bool
|
Check if the tool accepts only a single input argument. |
args |
dict
|
Get the tool's input arguments schema. |
tool_call_schema |
ArgsSchema
|
Get the schema for tool calls, excluding injected arguments. |
InputType
property
¶
InputType: type[Input]
Input type.
The type of input this Runnable
accepts specified as a type annotation.
Raises:
Type | Description |
---|---|
TypeError
|
If the input type cannot be inferred. |
OutputType
property
¶
OutputType: type[Output]
Output Type.
The type of output this Runnable
produces specified as a type annotation.
Raises:
Type | Description |
---|---|
TypeError
|
If the output type cannot be inferred. |
input_schema
property
¶
input_schema: type[BaseModel]
The type of input this Runnable
accepts specified as a pydantic model.
output_schema
property
¶
output_schema: type[BaseModel]
Output schema.
The type of output this Runnable
produces specified as a pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable
.
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example,
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor. Default is an empty dictionary.
args_schema
class-attribute
instance-attribute
¶
args_schema: Annotated[
ArgsSchema | None, SkipValidation()
] = Field(default=None, description="The tool schema.")
Pydantic model class to validate and parse the tool's input arguments.
Args schema should be either:
- A subclass of pydantic.BaseModel.
- A subclass of pydantic.v1.BaseModel if accessing v1 namespace in pydantic 2
- a JSON schema dict
return_direct
class-attribute
instance-attribute
¶
return_direct: bool = False
Whether to return the tool's output directly.
Setting this to True means that after the tool is called, the AgentExecutor will stop looping.
verbose
class-attribute
instance-attribute
¶
verbose: bool = False
Whether to log the tool's progress.
callbacks
class-attribute
instance-attribute
¶
Callbacks to be called during tool execution.
tags
class-attribute
instance-attribute
¶
Optional list of tags associated with the tool. Defaults to None.
These tags will be associated with each call to this tool,
and passed as arguments to the handlers defined in callbacks
.
You can use these to eg identify a specific instance of a tool with its use case.
metadata
class-attribute
instance-attribute
¶
Optional metadata associated with the tool. Defaults to None.
This metadata will be associated with each call to this tool,
and passed as arguments to the handlers defined in callbacks
.
You can use these to eg identify a specific instance of a tool with its use case.
handle_tool_error
class-attribute
instance-attribute
¶
Handle the content of the ToolException thrown.
handle_validation_error
class-attribute
instance-attribute
¶
handle_validation_error: (
bool
| str
| Callable[[ValidationError | ValidationError], str]
| None
) = False
Handle the content of the ValidationError thrown.
response_format
class-attribute
instance-attribute
¶
response_format: Literal[
"content", "content_and_artifact"
] = "content"
The tool response format. Defaults to 'content'.
If "content" then the output of the tool is interpreted as the contents of a ToolMessage. If "content_and_artifact" then the output is expected to be a two-tuple corresponding to the (content, artifact) of a ToolMessage.
is_single_input
property
¶
is_single_input: bool
Check if the tool accepts only a single input argument.
Returns:
Type | Description |
---|---|
bool
|
True if the tool has only one input argument, False otherwise. |
args
property
¶
args: dict
Get the tool's input arguments schema.
Returns:
Type | Description |
---|---|
dict
|
Dictionary containing the tool's argument properties. |
tool_call_schema
property
¶
Get the schema for tool calls, excluding injected arguments.
Returns:
Type | Description |
---|---|
ArgsSchema
|
The schema that should be used for tool calls from language models. |
get_name
¶
get_input_schema
¶
get_input_schema(
config: RunnableConfig | None = None,
) -> type[BaseModel]
The tool's input schema.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
The configuration for the tool. |
None
|
Returns:
Type | Description |
---|---|
type[BaseModel]
|
The input schema for the tool. |
get_input_jsonschema
¶
Get a JSON schema that represents the input to the Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A JSON schema that represents the input to the |
Example
Added in version 0.3.0
get_output_schema
¶
get_output_schema(
config: RunnableConfig | None = None,
) -> type[BaseModel]
Get a pydantic model that can be used to validate output to the Runnable
.
Runnable
s that leverage the configurable_fields
and
configurable_alternatives
methods will have a dynamic output schema that
depends on which configuration the Runnable
is invoked with.
This method allows to get an output schema for a specific configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
Type | Description |
---|---|
type[BaseModel]
|
A pydantic model that can be used to validate output. |
get_output_jsonschema
¶
Get a JSON schema that represents the output of the Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A JSON schema that represents the output of the |
Example
Added in version 0.3.0
config_schema
¶
The type of config this Runnable
accepts specified as a pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives
methods.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include
|
Sequence[str] | None
|
A list of fields to include in the config schema. |
None
|
Returns:
Type | Description |
---|---|
type[BaseModel]
|
A pydantic model that can be used to validate config. |
get_config_jsonschema
¶
Get a JSON schema that represents the config of the Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include
|
Sequence[str] | None
|
A list of fields to include in the config schema. |
None
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
A JSON schema that represents the config of the |
Added in version 0.3.0
get_graph
¶
Return a graph representation of this Runnable
.
get_prompts
¶
get_prompts(
config: RunnableConfig | None = None,
) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable
.
__or__
¶
__or__(
other: (
Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[
[AsyncIterator[Any]], AsyncIterator[Other]
]
| Callable[[Any], Other]
| Mapping[
str,
Runnable[Any, Other]
| Callable[[Any], Other]
| Any,
]
),
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other
|
Runnable[Any, Other] | Callable[[Iterator[Any]], Iterator[Other]] | Callable[[AsyncIterator[Any]], AsyncIterator[Other]] | Callable[[Any], Other] | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any]
|
Another |
required |
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: (
Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[
[AsyncIterator[Other]], AsyncIterator[Any]
]
| Callable[[Other], Any]
| Mapping[
str,
Runnable[Other, Any]
| Callable[[Other], Any]
| Any,
]
),
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable
with another object to create a
RunnableSequence
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other
|
Runnable[Other, Any] | Callable[[Iterator[Other]], Iterator[Any]] | Callable[[AsyncIterator[Other]], AsyncIterator[Any]] | Callable[[Other], Any] | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any]
|
Another |
required |
Returns:
Type | Description |
---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other],
name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe runnables.
Compose this Runnable
with Runnable
-like objects to make a
RunnableSequence
.
Equivalent to RunnableSequence(self, *others)
or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*others
|
Runnable[Any, Other] | Callable[[Any], Other]
|
Other |
()
|
name
|
str | None
|
An optional name for the resulting |
None
|
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict of this Runnable
.
Pick single key:
```python
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
```
Pick list of keys:
```python
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(
str=as_str, json=as_json, bytes=RunnableLambda(as_bytes)
)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
```
Parameters:
Name | Type | Description | Default |
---|---|---|---|
keys
|
str | list[str]
|
A key or list of keys to pick from the output dict. |
required |
Returns:
Type | Description |
---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: (
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[
str,
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any],
]
),
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable
.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | llm | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | llm)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs
|
Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]
|
A mapping of keys to |
{}
|
Returns:
Type | Description |
---|---|
RunnableSerializable[Any, Any]
|
A new |
batch
¶
batch(
inputs: list[Input],
config: (
RunnableConfig | list[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
list[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | list[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
**kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Returns:
Type | Description |
---|---|
list[Output]
|
A list of outputs from the |
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: (
RunnableConfig | Sequence[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> Iterator[tuple[int, Output | Exception]]
Run invoke
in parallel on a list of inputs.
Yields results as they complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
Sequence[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | Sequence[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
**kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
abatch
async
¶
abatch(
inputs: list[Input],
config: (
RunnableConfig | list[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs ainvoke
in parallel using asyncio.gather
.
The default implementation of batch
works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable
uses an API which supports a batch mode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
list[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | list[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
**kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Returns:
Type | Description |
---|---|
list[Output]
|
A list of outputs from the |
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: (
RunnableConfig | Sequence[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke
in parallel on a list of inputs.
Yields results as they complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs
|
Sequence[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | Sequence[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
stream
¶
stream(
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream
, which calls invoke
.
Subclasses should override this method if they support streaming output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Input
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
Output
|
The output of the |
astream
async
¶
astream(
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream
, which calls ainvoke
.
Subclasses should override this method if they support streaming output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Input
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[Output]
|
The output of the |
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable
, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Any
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
diff
|
bool
|
Whether to yield diffs between each step or the current state. |
True
|
with_streamed_output_list
|
bool
|
Whether to yield the |
True
|
include_names
|
Sequence[str] | None
|
Only include logs with these names. |
None
|
include_types
|
Sequence[str] | None
|
Only include logs with these types. |
None
|
include_tags
|
Sequence[str] | None
|
Only include logs with these tags. |
None
|
exclude_names
|
Sequence[str] | None
|
Exclude logs with these names. |
None
|
exclude_types
|
Sequence[str] | None
|
Exclude logs with these types. |
None
|
exclude_tags
|
Sequence[str] | None
|
Exclude logs with these tags. |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvents
that provide real-time information
about the progress of the Runnable
, including StreamEvents
from intermediate
results.
A StreamEvent
is a dictionary with the following schema:
event
: str - Event names are of the format:on_[runnable_type]_(start|stream|end)
.name
: str - The name of theRunnable
that generated the event.run_id
: str - randomly generated ID associated with the given execution of theRunnable
that emitted the event. A childRunnable
that gets invoked as part of the execution of a parentRunnable
is assigned its own unique ID.parent_ids
: list[str] - The IDs of the parent runnables that generated the event. The rootRunnable
will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.tags
: Optional[list[str]] - The tags of theRunnable
that generated the event.metadata
: Optional[dict[str, Any]] - The metadata of theRunnable
that generated the event.data
: dict[str, Any]
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| event | name | chunk | input | output |
+==========================+==================+=====================================+===================================================+=====================================================+
| on_chat_model_start
| [model name] | | {"messages": [[SystemMessage, HumanMessage]]}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chat_model_stream
| [model name] | AIMessageChunk(content="hello")
| | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chat_model_end
| [model name] | | {"messages": [[SystemMessage, HumanMessage]]}
| AIMessageChunk(content="hello world")
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_start
| [model name] | | {'input': 'hello'}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_stream
| [model name] | 'Hello'
| | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_end
| [model name] | | 'Hello human!'
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_start
| format_docs | | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_stream
| format_docs | 'hello world!, goodbye world!'
| | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_end
| format_docs | | [Document(...)]
| 'hello world!, goodbye world!'
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_tool_start
| some_tool | | {"x": 1, "y": "2"}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_tool_end
| some_tool | | | {"x": 1, "y": "2"}
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_retriever_start
| [retriever name] | | {"query": "hello"}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_retriever_end
| [retriever name] | | {"query": "hello"}
| [Document(...), ..]
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_prompt_start
| [template_name] | | {"question": "hello"}
| |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_prompt_end
| [template_name] | | {"question": "hello"}
| ChatPromptValue(messages: [SystemMessage, ...])
|
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will be only be surfaced with in the v2 version of the API!
A custom event has following format:
+-----------+------+-----------------------------------------------------------------------------------------------------------+ | Attribute | Type | Description | +===========+======+===========================================================================================================+ | name | str | A user defined name for the event. | +-----------+------+-----------------------------------------------------------------------------------------------------------+ | data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | +-----------+------+-----------------------------------------------------------------------------------------------------------+
Here are declarations associated with the standard events shown above:
format_docs
:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool
:
prompt
:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
Example: Dispatch Custom Event
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Any
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
version
|
Literal['v1', 'v2']
|
The version of the schema to use either |
'v2'
|
include_names
|
Sequence[str] | None
|
Only include events from |
None
|
include_types
|
Sequence[str] | None
|
Only include events from |
None
|
include_tags
|
Sequence[str] | None
|
Only include events from |
None
|
exclude_names
|
Sequence[str] | None
|
Exclude events from |
None
|
exclude_types
|
Sequence[str] | None
|
Exclude events from |
None
|
exclude_tags
|
Sequence[str] | None
|
Exclude events from |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
Raises:
Type | Description |
---|---|
NotImplementedError
|
If the version is not |
transform
¶
transform(
input: Iterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls astream
.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
Iterator[Input]
|
An iterator of inputs to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
Output
|
The output of the |
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream
.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input
|
AsyncIterator[Input]
|
An async iterator of inputs to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
Type | Description |
---|---|
AsyncIterator[Output]
|
The output of the |
bind
¶
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind arguments to a Runnable
, returning a new Runnable
.
Useful when a Runnable
in a chain requires an argument that is not
in the output of the previous Runnable
or included in the user input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs
|
Any
|
The arguments to bind to the |
{}
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
llm = ChatOllama(model="llama3.1")
# Without bind.
chain = llm | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind.
chain = llm.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable
, returning a new Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
RunnableConfig | None
|
The config to bind to the |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
with_listeners
¶
with_listeners(
*,
on_start: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None,
on_end: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None,
on_error: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable
, returning a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
on_start
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called before the |
None
|
on_end
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called after the |
None
|
on_error
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called if the |
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable
.
Returns a new Runnable
.
The Run object contains information about the run, including its id
,
type
, input
, output
, error
, start_time
, end_time
, and
any tags or metadata added to the run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
on_start
|
AsyncListener | None
|
Called asynchronously before the |
None
|
on_end
|
AsyncListener | None
|
Called asynchronously after the |
None
|
on_error
|
AsyncListener | None
|
Called asynchronously if the |
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*,
input_type: type[Input] | None = None,
output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable
, returning a new Runnable
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_type
|
type[Input] | None
|
The input type to bind to the |
None
|
output_type
|
type[Output] | None
|
The output type to bind to the |
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new Runnable with the types bound. |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[
type[BaseException], ...
] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: (
ExponentialJitterParams | None
) = None,
stop_after_attempt: int = 3
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
retry_if_exception_type
|
tuple[type[BaseException], ...]
|
A tuple of exception types to retry on. Defaults to (Exception,). |
(Exception,)
|
wait_exponential_jitter
|
bool
|
Whether to add jitter to the wait time between retries. Defaults to True. |
True
|
stop_after_attempt
|
int
|
The maximum number of attempts to make before giving up. Defaults to 3. |
3
|
exponential_jitter_params
|
ExponentialJitterParams | None
|
Parameters for
|
None
|
Returns:
Type | Description |
---|---|
Runnable[Input, Output]
|
A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[
type[BaseException], ...
] = (Exception,),
exception_key: str | None = None
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable
, returning a new Runnable
.
The new Runnable
will try the original Runnable
, and then each fallback
in order, upon failures.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fallbacks
|
Sequence[Runnable[Input, Output]]
|
A sequence of runnables to try if the original |
required |
exceptions_to_handle
|
tuple[type[BaseException], ...]
|
A tuple of exception types to handle.
Defaults to |
(Exception,)
|
exception_key
|
str | None
|
If string is specified then handled exceptions will be passed
to fallbacks as part of the input under the specified key.
If None, exceptions will not be passed to fallbacks.
If used, the base |
None
|
Returns:
Type | Description |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
RunnableWithFallbacks[Input, Output]
|
fallback in order, upon failures. |
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fallbacks
|
Sequence[Runnable[Input, Output]]
|
A sequence of runnables to try if the original |
required |
exceptions_to_handle
|
tuple[type[BaseException], ...]
|
A tuple of exception types to handle. |
(Exception,)
|
exception_key
|
str | None
|
If string is specified then handled exceptions will be passed
to fallbacks as part of the input under the specified key.
If None, exceptions will not be passed to fallbacks.
If used, the base |
None
|
Returns:
Type | Description |
---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
RunnableWithFallbacks[Input, Output]
|
fallback in order, upon failures. |
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None
) -> BaseTool
Create a BaseTool
from a Runnable
.
as_tool
will instantiate a BaseTool
with a name, description, and
args_schema
from a Runnable
. Where possible, schemas are inferred
from runnable.get_input_schema
. Alternatively (e.g., if the
Runnable
takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema
. You can also
pass arg_types
to just specify the required arguments and their types.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args_schema
|
type[BaseModel] | None
|
The schema for the tool. Defaults to None. |
None
|
name
|
str | None
|
The name of the tool. Defaults to None. |
None
|
description
|
str | None
|
The description of the tool. Defaults to None. |
None
|
arg_types
|
dict[str, type] | None
|
A dictionary of argument names to types. Defaults to None. |
None
|
Returns:
Type | Description |
---|---|
BaseTool
|
A |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via args_schema
:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema via arg_types
:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
from langchain_core.runnables import RunnableLambda
def f(x: str) -> str:
return x + "a"
def g(x: str) -> str:
return x + "z"
runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")
Added in version 0.2.14
__init__
¶
__init__(**kwargs: Any) -> None
Initialize the tool.
Raises:
Type | Description |
---|---|
TypeError
|
If |
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable, it is not serializable by default. This is to prevent accidental serialization of objects that should not be serialized.
Returns:
Type | Description |
---|---|
bool
|
Whether the class is serializable. Default is False. |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path
to the object.
For example, for the class langchain.llms.openai.OpenAI
, the id is
["langchain", "llms", "openai", "OpenAI"].
to_json
¶
Serialize the Runnable
to JSON.
Returns:
Type | Description |
---|---|
SerializedConstructor | SerializedNotImplemented
|
A JSON-serializable representation of the |
to_json_not_implemented
¶
Serialize a "not implemented" object.
Returns:
Type | Description |
---|---|
SerializedNotImplemented
|
SerializedNotImplemented. |
configurable_fields
¶
Configure particular Runnable
fields at runtime.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs
|
AnyConfigurableField
|
A dictionary of |
{}
|
Raises:
Type | Description |
---|---|
ValueError
|
If a configuration key is not found in the |
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: (
Runnable[Input, Output]
| Callable[[], Runnable[Input, Output]]
)
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnables
that can be set at runtime.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
which
|
ConfigurableField
|
The |
required |
default_key
|
str
|
The default key to use if no alternative is selected.
Defaults to |
'default'
|
prefix_keys
|
bool
|
Whether to prefix the keys with the |
False
|
**kwargs
|
Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
|
A dictionary of keys to |
{}
|
Returns:
Type | Description |
---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-3-7-sonnet-20250219"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
__init_subclass__
¶
__init_subclass__(**kwargs: Any) -> None
Validate the tool class definition during subclass creation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs
|
Any
|
Additional keyword arguments passed to the parent class. |
{}
|
Raises:
Type | Description |
---|---|
SchemaAnnotationError
|
If args_schema has incorrect type annotation. |
run
¶
run(
tool_input: str | dict[str, Any],
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any
) -> Any
Run the tool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tool_input
|
str | dict[str, Any]
|
The input to the tool. |
required |
verbose
|
bool | None
|
Whether to log the tool's progress. Defaults to None. |
None
|
start_color
|
str | None
|
The color to use when starting the tool. Defaults to 'green'. |
'green'
|
color
|
str | None
|
The color to use when ending the tool. Defaults to 'green'. |
'green'
|
callbacks
|
Callbacks
|
Callbacks to be called during tool execution. Defaults to None. |
None
|
tags
|
list[str] | None
|
Optional list of tags associated with the tool. Defaults to None. |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata associated with the tool. Defaults to None. |
None
|
run_name
|
str | None
|
The name of the run. Defaults to None. |
None
|
run_id
|
UUID | None
|
The id of the run. Defaults to None. |
None
|
config
|
RunnableConfig | None
|
The configuration for the tool. Defaults to None. |
None
|
tool_call_id
|
str | None
|
The id of the tool call. Defaults to None. |
None
|
kwargs
|
Any
|
Keyword arguments to be passed to tool callbacks (event handler) |
{}
|
Returns:
Type | Description |
---|---|
Any
|
The output of the tool. |
Raises:
Type | Description |
---|---|
ToolException
|
If an error occurs during tool execution. |
arun
async
¶
arun(
tool_input: str | dict,
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any
) -> Any
Run the tool asynchronously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tool_input
|
str | dict
|
The input to the tool. |
required |
verbose
|
bool | None
|
Whether to log the tool's progress. Defaults to None. |
None
|
start_color
|
str | None
|
The color to use when starting the tool. Defaults to 'green'. |
'green'
|
color
|
str | None
|
The color to use when ending the tool. Defaults to 'green'. |
'green'
|
callbacks
|
Callbacks
|
Callbacks to be called during tool execution. Defaults to None. |
None
|
tags
|
list[str] | None
|
Optional list of tags associated with the tool. Defaults to None. |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata associated with the tool. Defaults to None. |
None
|
run_name
|
str | None
|
The name of the run. Defaults to None. |
None
|
run_id
|
UUID | None
|
The id of the run. Defaults to None. |
None
|
config
|
RunnableConfig | None
|
The configuration for the tool. Defaults to None. |
None
|
tool_call_id
|
str | None
|
The id of the tool call. Defaults to None. |
None
|
kwargs
|
Any
|
Keyword arguments to be passed to tool callbacks |
{}
|
Returns:
Type | Description |
---|---|
Any
|
The output of the tool. |
Raises:
Type | Description |
---|---|
ToolException
|
If an error occurs during tool execution. |