langchain-fireworks
Modules:
| Name | Description |
|---|---|
| chat_models | Fireworks chat wrapper. |
| embeddings | |
| llms | Wrapper around Fireworks AI's Completion API. |
| version | Main entrypoint into package. |
Classes:
| Name | Description |
|---|---|
| ChatFireworks | |
| FireworksEmbeddings | Fireworks embedding model integration. |
| Fireworks | LLM models from |
ChatFireworks
Bases: BaseChatModel
Fireworks Chat large language models API.
To use, set the environment variable FIREWORKS_API_KEY to your API key.
Any parameter that is valid for the fireworks.create call can be passed in, even if it is not explicitly declared on this class.
Example
.. code-block:: python

    from langchain_fireworks.chat_models import ChatFireworks

    fireworks = ChatFireworks(
        model_name="accounts/fireworks/models/llama-v3p1-8b-instruct"
    )
Methods:
| Name | Description |
|---|---|
| get_name | Get the name of the |
| get_input_schema | Get a pydantic model that can be used to validate input to the Runnable. |
| get_input_jsonschema | Get a JSON schema that represents the input to the Runnable. |
| get_output_schema | Get a pydantic model that can be used to validate output to the Runnable. |
| get_output_jsonschema | Get a JSON schema that represents the output of the Runnable. |
| config_schema | The type of config this Runnable accepts specified as a pydantic model. |
| get_config_jsonschema | Get a JSON schema that represents the config of the Runnable. |
| get_graph | Return a graph representation of this Runnable. |
| get_prompts | Return a list of prompts used by this Runnable. |
| __or__ | Runnable "or" operator. |
| __ror__ | Runnable "reverse-or" operator. |
| pipe | Pipe runnables. |
| pick | Pick keys from the output dict of this Runnable. |
| assign | Assigns new fields to the dict output of this Runnable. |
| batch | Default implementation runs invoke in parallel using a thread pool executor. |
| batch_as_completed | Run invoke in parallel on a list of inputs. |
| abatch | Default implementation runs ainvoke in parallel using asyncio.gather. |
| abatch_as_completed | Run ainvoke in parallel on a list of inputs. |
| astream_log | Stream all output from a Runnable, as reported to the callback system. |
| astream_events | Generate a stream of events. |
| transform | Transform inputs to outputs. |
| atransform | Transform inputs to outputs. |
| bind | Bind arguments to a |
| with_config | Bind config to a |
| with_listeners | Bind lifecycle listeners to a |
| with_alisteners | Bind async lifecycle listeners to a |
| with_types | Bind input and output types to a |
| with_retry | Create a new Runnable that retries the original Runnable on exceptions. |
| map | Return a new |
| with_fallbacks | Add fallbacks to a |
| as_tool | Create a |
| __init__ | |
| lc_id | Return a unique identifier for this class for serialization purposes. |
| to_json | Serialize the |
| to_json_not_implemented | Serialize a "not implemented" object. |
| configurable_fields | Configure particular |
| configurable_alternatives | Configure alternatives for |
| set_verbose | If verbose is None, set it. |
| get_token_ids | Return the ordered ids of the tokens in a text. |
| get_num_tokens | Get the number of tokens present in the text. |
| get_num_tokens_from_messages | Get the number of tokens in the messages. |
| generate | Pass a sequence of prompts to the model and return model generations. |
| agenerate | Asynchronously pass a sequence of prompts to a model and return generations. |
| dict | Return a dictionary of the LLM. |
| get_lc_namespace | Get the namespace of the LangChain object. |
| is_lc_serializable | Return whether this model can be serialized by LangChain. |
| build_extra | Build extra kwargs from additional params that were passed in. |
| validate_environment | Validate that the API key and Python package exist in the environment. |
| bind_tools | Bind tool-like objects to this chat model. |
| with_structured_output | Model wrapper that returns outputs formatted to match the given schema. |
Attributes:
| Name | Type | Description |
|---|---|---|
| InputType | TypeAlias | Get the input type for this runnable. |
| OutputType | Any | Get the output type for this runnable. |
| input_schema | type[BaseModel] | The type of input this Runnable accepts specified as a pydantic model. |
| output_schema | type[BaseModel] | Output schema. |
| config_specs | list[ConfigurableFieldSpec] | List configurable fields for this Runnable. |
| cache | BaseCache \| bool \| None | Whether to cache the response. |
| verbose | bool | Whether to print out response text. |
| callbacks | Callbacks | Callbacks to add to the run trace. |
| tags | list[str] \| None | Tags to add to the run trace. |
| metadata | dict[str, Any] \| None | Metadata to add to the run trace. |
| custom_get_token_ids | Callable[[str], list[int]] \| None | Optional encoder to use for counting tokens. |
| rate_limiter | BaseRateLimiter \| None | An optional rate limiter to use for limiting the number of requests. |
| disable_streaming | bool \| Literal['tool_calling'] | Whether to disable streaming for this model. |
| output_version | str \| None | Version of AIMessage output format to store in message content. |
| model_name | str | Model name to use. |
| temperature | Optional[float] | What sampling temperature to use. |
| stop | Optional[Union[str, list[str]]] | Default stop sequences. |
| model_kwargs | dict[str, Any] | Holds any model parameters valid for create call not explicitly specified. |
| fireworks_api_key | SecretStr | Fireworks API key. |
| fireworks_api_base | Optional[str] | Base URL path for API requests, leave blank if not using a proxy or service emulator. |
| request_timeout | Union[float, tuple[float, float], Any, None] | Timeout for requests to Fireworks completion API. Can be float, httpx.Timeout or None. |
| streaming | bool | Whether to stream the results or not. |
| n | int | Number of chat completions to generate for each prompt. |
| max_tokens | Optional[int] | Maximum number of tokens to generate. |
| max_retries | Optional[int] | Maximum number of retries to make when generating. |
input_schema
property
input_schema: type[BaseModel]
The type of input this Runnable accepts specified as a pydantic model.
output_schema
property
output_schema: type[BaseModel]
Output schema.
The type of output this Runnable produces specified as a pydantic model.
config_specs
property
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
cache
class-attribute
instance-attribute
cache: BaseCache | bool | None = Field(
default=None, exclude=True
)
Whether to cache the response.
- If true, will use the global cache.
- If false, will not use a cache.
- If None, will use the global cache if it's set, otherwise no cache.
- If instance of BaseCache, will use the provided cache.
Caching is not currently supported for streaming methods of models.
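The four cases above can be read as a small resolution rule. The following is a minimal sketch of that rule, not LangChain's implementation: `resolve_cache` and `DictCache` are hypothetical names introduced here, and raising an error when `cache=True` but no global cache exists is an assumption.

```python
from typing import Optional, Union


class DictCache:
    """Hypothetical stand-in for a BaseCache implementation."""

    def __init__(self) -> None:
        self.store: dict = {}


def resolve_cache(
    cache: Union[DictCache, bool, None],
    global_cache: Optional[DictCache],
) -> Optional[DictCache]:
    """Return the cache to use, following the four documented cases."""
    if isinstance(cache, DictCache):
        return cache  # an explicit cache instance is used as provided
    if cache is True:
        if global_cache is None:
            # Assumption: a missing global cache is treated as an error.
            raise ValueError("cache=True requires a global cache to be set")
        return global_cache
    if cache is False:
        return None  # caching explicitly disabled
    return global_cache  # cache is None: global cache if set, else no cache
```

Passing `None` with no global cache configured simply yields `None`, which matches the "otherwise no cache" case.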
verbose
class-attribute
instance-attribute
verbose: bool = Field(
default_factory=_get_verbosity, exclude=True, repr=False
)
Whether to print out response text.
callbacks
class-attribute
instance-attribute
Callbacks to add to the run trace.
tags
class-attribute
instance-attribute
Tags to add to the run trace.
metadata
class-attribute
instance-attribute
Metadata to add to the run trace.
custom_get_token_ids
class-attribute
instance-attribute
Optional encoder to use for counting tokens.
rate_limiter
class-attribute
instance-attribute
rate_limiter: BaseRateLimiter | None = Field(
default=None, exclude=True
)
An optional rate limiter to use for limiting the number of requests.
disable_streaming
class-attribute
instance-attribute
Whether to disable streaming for this model.
If streaming is bypassed, then stream()/astream()/astream_events() will
defer to invoke()/ainvoke().
- If True, will always bypass the streaming case.
- If 'tool_calling', will bypass the streaming case only when the model is called with a tools keyword argument. In other words, LangChain will automatically switch to non-streaming behavior (invoke()) only when the tools argument is provided. This offers the best of both worlds.
- If False (default), will always use the streaming case if available.
The main reason for this flag is that code might be written using stream() and
a user may want to swap out a given model for another model whose implementation
does not properly support streaming.
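The three settings reduce to a simple decision per call. Here is a minimal sketch of that decision, assuming a hypothetical helper `should_stream` that is not part of LangChain; it only illustrates the documented semantics.

```python
from typing import Literal, Union


def should_stream(
    disable_streaming: Union[bool, Literal["tool_calling"]],
    tools_provided: bool,
) -> bool:
    """Return True if the streaming path should be used for this call."""
    if disable_streaming is True:
        return False  # always bypass streaming
    if disable_streaming == "tool_calling":
        return not tools_provided  # bypass only when tools are passed
    return True  # False (default): stream whenever available
```

With `"tool_calling"`, a plain call still streams, while a call that passes tools falls back to the non-streaming path.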
output_version
class-attribute
instance-attribute
output_version: str | None = Field(
default_factory=from_env(
"LC_OUTPUT_VERSION", default=None
)
)
Version of AIMessage output format to store in message content.
AIMessage.content_blocks will lazily parse the contents of content into a
standard format. This flag can be used to additionally store the standard format
in message content, e.g., for serialization purposes.
Supported values:
- "v0": provider-specific format in content (can lazily parse with .content_blocks)
- "v1": standardized format in content (consistent with .content_blocks)
Partner packages (e.g., langchain-openai) can also use this field to roll out
new content formats in a backward-compatible way.
Added in version 1.0
model_name
class-attribute
instance-attribute
model_name: str = Field(alias='model')
Model name to use.
temperature
class-attribute
instance-attribute
What sampling temperature to use.
stop
class-attribute
instance-attribute
Default stop sequences.
model_kwargs
class-attribute
instance-attribute
Holds any model parameters valid for create call not explicitly specified.
fireworks_api_key
class-attribute
instance-attribute
fireworks_api_key: SecretStr = Field(
alias="api_key",
default_factory=secret_from_env(
"FIREWORKS_API_KEY",
error_message="You must specify an api key. You can pass it an argument as `api_key=...` or set the environment variable `FIREWORKS_API_KEY`.",
),
)
Fireworks API key.
Automatically read from env variable FIREWORKS_API_KEY if not provided.
fireworks_api_base
class-attribute
instance-attribute
fireworks_api_base: Optional[str] = Field(
alias="base_url",
default_factory=from_env(
"FIREWORKS_API_BASE", default=None
),
)
Base URL path for API requests, leave blank if not using a proxy or service emulator.
request_timeout
class-attribute
instance-attribute
request_timeout: Union[
float, tuple[float, float], Any, None
] = Field(default=None, alias="timeout")
Timeout for requests to Fireworks completion API. Can be float,
httpx.Timeout or None.
streaming
class-attribute
instance-attribute
streaming: bool = False
Whether to stream the results or not.
n
class-attribute
instance-attribute
n: int = 1
Number of chat completions to generate for each prompt.
max_tokens
class-attribute
instance-attribute
Maximum number of tokens to generate.
max_retries
class-attribute
instance-attribute
Maximum number of retries to make when generating.
get_name
get_input_schema
get_input_schema(
config: RunnableConfig | None = None,
) -> type[BaseModel]
Get a pydantic model that can be used to validate input to the Runnable.
Runnables that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic input schema that
depends on which configuration the Runnable is invoked with.
This method returns the input schema for a specific configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig \| None | A config to use when generating the schema. | None |
Returns:
| Type | Description |
|---|---|
| type[BaseModel] | A pydantic model that can be used to validate input. |
get_input_jsonschema
Get a JSON schema that represents the input to the Runnable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig \| None | A config to use when generating the schema. | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | A JSON schema that represents the input to the Runnable. |
Example
Added in version 0.3.0
get_output_schema
get_output_schema(
config: RunnableConfig | None = None,
) -> type[BaseModel]
Get a pydantic model that can be used to validate output to the Runnable.
Runnables that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic output schema that
depends on which configuration the Runnable is invoked with.
This method returns the output schema for a specific configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig \| None | A config to use when generating the schema. | None |
Returns:
| Type | Description |
|---|---|
| type[BaseModel] | A pydantic model that can be used to validate output. |
get_output_jsonschema
Get a JSON schema that represents the output of the Runnable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig \| None | A config to use when generating the schema. | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | A JSON schema that represents the output of the Runnable. |
Example
Added in version 0.3.0
config_schema
The type of config this Runnable accepts specified as a pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| include | Sequence[str] \| None | A list of fields to include in the config schema. | None |
Returns:
| Type | Description |
|---|---|
| type[BaseModel] | A pydantic model that can be used to validate config. |
get_config_jsonschema
Get a JSON schema that represents the config of the Runnable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| include | Sequence[str] \| None | A list of fields to include in the config schema. | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | A JSON schema that represents the config of the Runnable. |
Added in version 0.3.0
get_graph
Return a graph representation of this Runnable.
get_prompts
get_prompts(
config: RunnableConfig | None = None,
) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
__or__
__or__(
other: (
Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[
[AsyncIterator[Any]], AsyncIterator[Other]
]
| Callable[[Any], Other]
| Mapping[
str,
Runnable[Any, Other]
| Callable[[Any], Other]
| Any,
]
),
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | Runnable[Any, Other] \| Callable[[Iterator[Any]], Iterator[Other]] \| Callable[[AsyncIterator[Any]], AsyncIterator[Other]] \| Callable[[Any], Other] \| Mapping[str, Runnable[Any, Other] \| Callable[[Any], Other] \| Any] | Another Runnable or Runnable-like object. | required |
Returns:
| Type | Description |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
__ror__
__ror__(
other: (
Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[
[AsyncIterator[Other]], AsyncIterator[Any]
]
| Callable[[Other], Any]
| Mapping[
str,
Runnable[Other, Any]
| Callable[[Other], Any]
| Any,
]
),
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
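The "or" and "reverse-or" operators rely on Python's standard operator protocol. The sketch below shows the idea with a tiny stand-in class; `Step` is a hypothetical name, not a LangChain type, and the real `Runnable` handles more operand kinds (mappings, iterators) than this example does.

```python
from typing import Any, Callable


class Step:
    """Hypothetical minimal runnable: wraps a single-argument function."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: Any) -> "Step":
        # self | other: run self first, then other.
        nxt = other if isinstance(other, Step) else Step(other)
        return Step(lambda v: nxt.invoke(self.invoke(v)))

    def __ror__(self, other: Any) -> "Step":
        # other | self, where other does not implement "|" itself
        # (e.g. a plain function): Python falls back to this method.
        prev = other if isinstance(other, Step) else Step(other)
        return Step(lambda v: self.invoke(prev.invoke(v)))


def double(x: int) -> int:
    return x * 2


add_one = Step(lambda x: x + 1)

left = add_one | double   # dispatches to Step.__or__
right = double | add_one  # functions lack __or__, so Step.__ror__ runs
```

Here `left` applies `add_one` and then `double`, while `right` applies them in the opposite order.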
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | Runnable[Other, Any] \| Callable[[Iterator[Other]], Iterator[Any]] \| Callable[[AsyncIterator[Other]], AsyncIterator[Any]] \| Callable[[Other], Any] \| Mapping[str, Runnable[Other, Any] \| Callable[[Other], Any] \| Any] | Another Runnable or Runnable-like object. | required |
Returns:
| Type | Description |
|---|---|
| RunnableSerializable[Other, Output] | A new Runnable. |
pipe
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other],
name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe runnables.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| *others | Runnable[Any, Other] \| Callable[[Any], Other] | Other Runnable or Runnable-like objects to compose. | () |
| name | str \| None | An optional name for the resulting RunnableSequence. | None |
Returns:
| Type | Description |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
pick
Pick keys from the output dict of this Runnable.
Pick single key:
```python
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
```
Pick list of keys:
```python
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(
    str=as_str, json=as_json, bytes=RunnableLambda(as_bytes)
)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | str \| list[str] | A key or list of keys to pick from the output dict. | required |
Returns:
| Type | Description |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |
assign
assign(
**kwargs: (
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[
str,
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any],
]
),
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | llm | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | llm)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| **kwargs | Runnable[dict[str, Any], Any] \| Callable[[dict[str, Any]], Any] \| Mapping[str, Runnable[dict[str, Any], Any] \| Callable[[dict[str, Any]], Any]] | A mapping of keys to Runnable or Runnable-like objects. | {} |
Returns:
| Type | Description |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |
batch
batch(
inputs: list[Input],
config: (
RunnableConfig | list[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
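To make the thread-pool behavior concrete, here is a minimal sketch using only the standard library. It is not LangChain's code: `batch_with_threads` and its `max_workers` parameter are hypothetical, and the function only illustrates running an `invoke`-style callable over a list of inputs with optional exception capture.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def batch_with_threads(
    invoke: Callable[[Any], Any],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
    max_workers: int = 4,
) -> List[Any]:
    """Run `invoke` on every input in parallel, preserving input order."""

    def call(item: Any) -> Any:
        try:
            return invoke(item)
        except Exception as exc:
            if return_exceptions:
                return exc  # hand the exception back as a result
            raise

    if not inputs:
        return []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the order of the inputs.
        return list(pool.map(call, inputs))
```

Threads suit this pattern because each call is expected to spend most of its time waiting on I/O, such as a network request.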
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| inputs | list[Input] | A list of inputs to the Runnable. | required |
| config | RunnableConfig \| list[RunnableConfig] \| None | A config to use when invoking the Runnable. | None |
| return_exceptions | bool | Whether to return exceptions instead of raising them. Defaults to False. | False |
| **kwargs | Any \| None | Additional keyword arguments to pass to the Runnable. | {} |
Returns:
| Type | Description |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
batch_as_completed
batch_as_completed(
inputs: Sequence[Input],
config: (
RunnableConfig | Sequence[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
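The difference from `batch` is that results arrive in completion order, each tagged with the index of its input. A minimal standard-library sketch of that idea follows; `batch_as_completed_sketch` and `slow_identity` are hypothetical names, and the delays are chosen only to make the ordering visible.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence, Tuple


def batch_as_completed_sketch(
    invoke: Callable[[Any], Any],
    inputs: Sequence[Any],
    *,
    max_workers: int = 4,
) -> Iterator[Tuple[int, Any]]:
    """Yield (input index, output) pairs as each call finishes."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember which input position each future belongs to.
        futures = {pool.submit(invoke, item): i for i, item in enumerate(inputs)}
        for future in as_completed(futures):
            yield futures[future], future.result()


def slow_identity(x: int) -> int:
    time.sleep(x * 0.1)  # larger inputs take longer
    return x


# The second input (index 1) has no delay, so it is expected to arrive first.
results = list(batch_as_completed_sketch(slow_identity, [2, 0]))
```

The index in each pair lets the caller map a result back to its input even though the order is not preserved.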
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| inputs | Sequence[Input] | A list of inputs to the Runnable. | required |
| config | RunnableConfig \| Sequence[RunnableConfig] \| None | A config to use when invoking the Runnable. | None |
| return_exceptions | bool | Whether to return exceptions instead of raising them. Defaults to False. | False |
| **kwargs | Any \| None | Additional keyword arguments to pass to the Runnable. | {} |
Yields:
| Type | Description |
|---|---|
| tuple[int, Output \| Exception] | Tuples of the index of the input and the output from the Runnable. |
abatch
async
abatch(
inputs: list[Input],
config: (
RunnableConfig | list[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
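The `asyncio.gather` approach can be sketched in a few lines. This is an illustration rather than the library's implementation: `abatch_sketch` and `double` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
) -> List[Any]:
    """Await `ainvoke` for all inputs concurrently, preserving input order."""
    # gather returns results in the order its awaitables were passed in.
    return await asyncio.gather(
        *(ainvoke(item) for item in inputs),
        return_exceptions=return_exceptions,
    )


async def double(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for an awaited network call
    return x * 2


doubled = asyncio.run(abatch_sketch(double, [1, 2, 3]))
```

With `return_exceptions=True`, `asyncio.gather` places raised exceptions in the result list instead of propagating the first one.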
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| inputs | list[Input] | A list of inputs to the Runnable. | required |
| config | RunnableConfig \| list[RunnableConfig] \| None | A config to use when invoking the Runnable. | None |
| return_exceptions | bool | Whether to return exceptions instead of raising them. Defaults to False. | False |
| **kwargs | Any \| None | Additional keyword arguments to pass to the Runnable. | {} |
Returns:
| Type | Description |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
abatch_as_completed
async
abatch_as_completed(
inputs: Sequence[Input],
config: (
RunnableConfig | Sequence[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
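An async counterpart can be built on `asyncio.as_completed`. The sketch below is illustrative only; `abatch_as_completed_sketch`, `wait_then_return`, and `collect` are hypothetical names, and the delays exist solely to make the completion order observable.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List, Sequence, Tuple


async def abatch_as_completed_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: Sequence[Any],
) -> AsyncIterator[Tuple[int, Any]]:
    """Yield (input index, output) pairs as each awaited call finishes."""

    async def indexed(i: int, item: Any) -> Tuple[int, Any]:
        return i, await ainvoke(item)

    tasks = [asyncio.create_task(indexed(i, item)) for i, item in enumerate(inputs)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def wait_then_return(x: int) -> int:
    await asyncio.sleep(x * 0.1)  # larger inputs take longer
    return x


async def collect() -> List[Tuple[int, Any]]:
    return [pair async for pair in abatch_as_completed_sketch(wait_then_return, [2, 0])]


# The input with no delay (index 1) is expected to be yielded first.
pairs = asyncio.run(collect())
```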
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| inputs | Sequence[Input] | A list of inputs to the Runnable. | required |
| config | RunnableConfig \| Sequence[RunnableConfig] \| None | A config to use when invoking the Runnable. | None |
| return_exceptions | bool | Whether to return exceptions instead of raising them. Defaults to False. | False |
| **kwargs | Any \| None | Additional keyword arguments to pass to the Runnable. | {} |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[tuple[int, Output \| Exception]] | A tuple of the index of the input and the output from the Runnable. |
astream_log
async
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
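Applying the ops in order can be sketched with a deliberately small patch applier. This is an assumption-laden illustration: `apply_ops` is a hypothetical helper that supports only the `add` and `replace` operations on simple paths (no `~0`/`~1` escaping), and the `streamed_output` and `final_output` keys and their values are example data, not output captured from a real run.

```python
import copy
from typing import Any, Dict, List


def apply_ops(state: Dict[str, Any], ops: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Apply "add"/"replace" ops to a copy of `state` and return the copy."""
    new_state = copy.deepcopy(state)
    for op in ops:
        kind, value = op["op"], op["value"]
        if kind not in ("add", "replace"):
            raise ValueError(f"unsupported op: {kind}")
        # "/a/b" -> parents ["a"], last "b"
        *parents, last = op["path"].split("/")[1:]
        target: Any = new_state
        for key in parents:
            target = target[int(key)] if isinstance(target, list) else target[key]
        if isinstance(target, list):
            if last == "-":
                target.append(value)  # "-" means append to the list
            elif kind == "add":
                target.insert(int(last), value)
            else:
                target[int(last)] = value
        else:
            target[last] = value
    return new_state


# Hypothetical sequence of patches, one list of ops per streamed step.
state: Dict[str, Any] = {"streamed_output": [], "final_output": None}
patches = [
    [{"op": "add", "path": "/streamed_output/-", "value": "Hel"}],
    [{"op": "add", "path": "/streamed_output/-", "value": "lo"}],
    [{"op": "replace", "path": "/final_output", "value": "Hello"}],
]
for ops in patches:
    state = apply_ops(state, ops)
```

A production consumer would more likely rely on a complete JSON Patch implementation; the point here is only that replaying the ops in order reconstructs the run state.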
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input | Any | The input to the Runnable. | required |
| config | RunnableConfig \| None | The config to use for the Runnable. | None |
| diff | bool | Whether to yield diffs between each step or the current state. | True |
| with_streamed_output_list | bool | Whether to yield the streamed_output list. | True |
| include_names | Sequence[str] \| None | Only include logs with these names. | None |
| include_types | Sequence[str] \| None | Only include logs with these types. | None |
| include_tags | Sequence[str] \| None | Only include logs with these tags. | None |
| exclude_names | Sequence[str] \| None | Exclude logs with these names. | None |
| exclude_types | Sequence[str] \| None | Exclude logs with these types. | None |
| exclude_tags | Sequence[str] \| None | Exclude logs with these tags. | None |
| **kwargs | Any | Additional keyword arguments to pass to the Runnable. | {} |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[RunLogPatch] \| AsyncIterator[RunLog] | A RunLogPatch or RunLog object. |
astream_events
async
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvents that provide real-time information
about the progress of the Runnable, including StreamEvents from intermediate
results.
A StreamEvent is a dictionary with the following schema:
- event: str - Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: str - The name of the Runnable that generated the event.
- run_id: str - Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: list[str] - The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
- tags: Optional[list[str]] - The tags of the Runnable that generated the event.
- metadata: Optional[dict[str, Any]] - The metadata of the Runnable that generated the event.
- data: dict[str, Any]
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| event | name | chunk | input | output |
+==========================+==================+=====================================+===================================================+=====================================================+
| on_chat_model_start | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chat_model_stream | [model name] | AIMessageChunk(content="hello") | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chat_model_end | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_start | [model name] | | {'input': 'hello'} | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_stream | [model name] | 'Hello' | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_end | [model name] | | 'Hello human!' | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_start | format_docs | | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_stream | format_docs | 'hello world!, goodbye world!' | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_end | format_docs | | [Document(...)] | 'hello world!, goodbye world!' |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_retriever_end | [retriever name] | | {"query": "hello"} | [Document(...), ..] |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_prompt_start | [template_name] | | {"question": "hello"} | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API!
A custom event has the following format:
| Attribute | Type | Description |
|---|---|---|
| name | str | A user defined name for the event. |
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. |
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
from langchain_core.runnables import RunnableLambda


async def reverse(s: str) -> str:
    return s[::-1]


chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# will produce the following events (run_id and parent_ids
# have been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
Example: Dispatch Custom Event
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    return "Done"


slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
input
|
Any
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
version
|
Literal['v1', 'v2']
|
The version of the schema to use either |
'v2'
|
include_names
|
Sequence[str] | None
|
Only include events from |
None
|
include_types
|
Sequence[str] | None
|
Only include events from |
None
|
include_tags
|
Sequence[str] | None
|
Only include events from |
None
|
exclude_names
|
Sequence[str] | None
|
Exclude events from |
None
|
exclude_types
|
Sequence[str] | None
|
Exclude events from |
None
|
exclude_tags
|
Sequence[str] | None
|
Exclude events from |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
If the version is not |
transform
¶
transform(
input: Iterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and then calls stream.
Subclasses should override this method if they can start producing output while input is still being generated.
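The difference between a buffering default and a streaming override can be sketched with plain Python generators. This is an illustration only: `buffered_transform` and `streaming_transform` are hypothetical helpers, not part of LangChain, and upper-casing stands in for whatever work a real Runnable performs.

```python
from typing import Iterator, Optional


def buffered_transform(chunks: Iterator[str]) -> Iterator[str]:
    """Consume every input chunk first, then emit a single output."""
    final: Optional[str] = None
    for chunk in chunks:
        # Accumulate the chunks into one value before doing any work.
        final = chunk if final is None else final + chunk
    if final is not None:
        yield final.upper()


def streaming_transform(chunks: Iterator[str]) -> Iterator[str]:
    """Emit one output per input chunk as soon as the chunk arrives."""
    for chunk in chunks:
        yield chunk.upper()


buffered = list(buffered_transform(iter(["he", "llo"])))
streamed = list(streaming_transform(iter(["he", "llo"])))
print(buffered)
print(streamed)
```

The buffered version cannot produce anything until the input iterator is exhausted, which is why an override is worthwhile when partial input is already enough to start producing output.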
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
input
|
Iterator[Input]
|
An iterator of inputs to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
| Type | Description |
|---|---|
Output
|
The output of the |
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
input
|
AsyncIterator[Input]
|
An async iterator of inputs to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[Output]
|
The output of the |
bind
¶
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
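As a rough mental model, binding arguments resembles `functools.partial` from the standard library: the bound keyword arguments are supplied on every later call. The sketch below assumes nothing about how `bind` is implemented; `echo_model` is a hypothetical stand-in for a model call.

```python
from functools import partial
from typing import Optional, Sequence


def echo_model(prompt: str, stop: Optional[Sequence[str]] = None) -> str:
    """Hypothetical model: echoes the prompt, cut at the first stop string."""
    text = prompt
    for token in stop or ():
        index = text.find(token)
        if index != -1:
            text = text[:index]
    return text


# Fix the `stop` argument once, so callers only supply the prompt.
bound_model = partial(echo_model, stop=["three"])

unbound_output = echo_model("One two three four five.")
bound_output = bound_model("One two three four five.")
print(repr(unbound_output))
print(repr(bound_output))
```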
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
kwargs
|
Any
|
The arguments to bind to the |
{}
|
Returns:
| Type | Description |
|---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
llm = ChatOllama(model="llama3.1")
# Without bind.
chain = llm | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind.
chain = llm.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
RunnableConfig | None
|
The config to bind to the |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Returns:
| Type | Description |
|---|---|
Runnable[Input, Output]
|
A new |
with_listeners
¶
with_listeners(
*,
on_start: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None,
on_end: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None,
on_error: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
on_start
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called before the |
None
|
on_end
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called after the |
None
|
on_error
|
Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None
|
Called if the |
None
|
Returns:
| Type | Description |
|---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time


def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)


def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)


def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)


chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
on_start
|
AsyncListener | None
|
Called asynchronously before the |
None
|
on_end
|
AsyncListener | None
|
Called asynchronously after the |
None
|
on_error
|
AsyncListener | None
|
Called asynchronously if the |
None
|
Returns:
| Type | Description |
|---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio


def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")


# Listeners receive the Run object described above, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")


async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")


runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)


async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))


asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*,
input_type: type[Input] | None = None,
output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
input_type
|
type[Input] | None
|
The input type to bind to the |
None
|
output_type
|
type[Output] | None
|
The output type to bind to the |
None
|
Returns:
| Type | Description |
|---|---|
Runnable[Input, Output]
|
A new Runnable with the types bound. |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[
type[BaseException], ...
] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: (
ExponentialJitterParams | None
) = None,
stop_after_attempt: int = 3
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
retry_if_exception_type
|
tuple[type[BaseException], ...]
|
A tuple of exception types to retry on. Defaults to (Exception,). |
(Exception,)
|
wait_exponential_jitter
|
bool
|
Whether to add jitter to the wait time between retries. Defaults to True. |
True
|
stop_after_attempt
|
int
|
The maximum number of attempts to make before giving up. Defaults to 3. |
3
|
exponential_jitter_params
|
ExponentialJitterParams | None
|
Parameters for
|
None
|
Returns:
| Type | Description |
|---|---|
Runnable[Input, Output]
|
A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2
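To make the retry parameters more concrete, here is a stdlib-only sketch of retrying with exponential backoff plus random jitter. It is not the library's implementation: `call_with_retry`, `initial_delay`, and `max_jitter` are hypothetical names chosen for illustration, and the delays are kept tiny so the example runs quickly.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    max_attempts: int = 3,
    initial_delay: float = 0.01,
    max_jitter: float = 0.01,
) -> T:
    """Call `func`, retrying on `retry_on` up to `max_attempts` times in total."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error.
            # Double the base delay on each attempt and add random jitter.
            delay = initial_delay * 2 ** (attempt - 1) + random.uniform(0, max_jitter)
            time.sleep(delay)
    # Only reached when max_attempts is smaller than 1.
    raise RuntimeError("max_attempts must be at least 1")


calls = {"count": 0}


def flaky() -> str:
    """Fail twice with ValueError, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ValueError("transient failure")
    return "ok"


result = call_with_retry(flaky, retry_on=(ValueError,), max_attempts=3)
print(result, calls["count"])
```

Jitter spreads retries from many callers over time, which helps avoid them all hitting a recovering service at the same instant.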
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[
type[BaseException], ...
] = (Exception,),
exception_key: str | None = None
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
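The "try the original, then each fallback in order" behavior can be sketched with ordinary callables. This is a simplified illustration under stated assumptions: `run_with_fallbacks` is a hypothetical helper, and re-raising the first handled error when every candidate fails is a choice made for this sketch.

```python
from typing import Callable, Optional, Sequence, Tuple, Type


def run_with_fallbacks(
    primary: Callable[[str], str],
    fallbacks: Sequence[Callable[[str], str]],
    value: str,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
) -> str:
    """Try `primary`, then each fallback in order, returning the first success."""
    first_error: Optional[BaseException] = None
    for candidate in (primary, *fallbacks):
        try:
            return candidate(value)
        except exceptions_to_handle as error:
            # Remember the first failure so it can be re-raised if all fail.
            if first_error is None:
                first_error = error
    assert first_error is not None
    raise first_error


def always_fails(text: str) -> str:
    raise ValueError("primary is unavailable")


def shout(text: str) -> str:
    return text.upper()


fallback_result = run_with_fallbacks(always_fails, [shout], "foo bar")
print(fallback_result)
```

Exceptions that are not listed in `exceptions_to_handle` propagate immediately in this sketch, without trying the remaining fallbacks.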
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fallbacks
|
Sequence[Runnable[Input, Output]]
|
A sequence of runnables to try if the original |
required |
exceptions_to_handle
|
tuple[type[BaseException], ...]
|
A tuple of exception types to handle.
Defaults to |
(Exception,)
|
exception_key
|
str | None
|
If string is specified then handled exceptions will be passed
to fallbacks as part of the input under the specified key.
If None, exceptions will not be passed to fallbacks.
If used, the base |
None
|
Returns:
| Type | Description |
|---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
RunnableWithFallbacks[Input, Output]
|
fallback in order, upon failures. |
Example
from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema. Alternatively (e.g., if the
Runnable takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema. You can also
pass arg_types to just specify the required arguments and their types.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
args_schema
|
type[BaseModel] | None
|
The schema for the tool. Defaults to None. |
None
|
name
|
str | None
|
The name of the tool. Defaults to None. |
None
|
description
|
str | None
|
The description of the tool. Defaults to None. |
None
|
arg_types
|
dict[str, type] | None
|
A dictionary of argument names to types. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
BaseTool
|
A |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda


class Args(TypedDict):
    a: int
    b: list[int]


def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))


runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda


def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))


class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")


runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda


def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))


runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
from langchain_core.runnables import RunnableLambda


def f(x: str) -> str:
    return x + "a"


def g(x: str) -> str:
    return x + "z"


runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")
Added in version 0.2.14
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path
to the object.
For example, for the class langchain.llms.openai.OpenAI, the id is
["langchain", "llms", "openai", "OpenAI"].
to_json
¶
Serialize the Runnable to JSON.
Returns:
| Type | Description |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
A JSON-serializable representation of the |
to_json_not_implemented
¶
Serialize a "not implemented" object.
Returns:
| Type | Description |
|---|---|
SerializedNotImplemented
|
SerializedNotImplemented. |
configurable_fields
¶
Configure particular Runnable fields at runtime.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**kwargs
|
AnyConfigurableField
|
A dictionary of |
{}
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If a configuration key is not found in the |
Returns:
| Type | Description |
|---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)

# max_tokens = 200
print(
    "max_tokens_200: ",
    model.with_config(configurable={"output_token_number": 200})
    .invoke("tell me something about chess")
    .content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: (
Runnable[Input, Output]
| Callable[[], Runnable[Input, Output]]
)
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnables that can be set at runtime.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
which
|
ConfigurableField
|
The |
required |
default_key
|
str
|
The default key to use if no alternative is selected.
Defaults to |
'default'
|
prefix_keys
|
bool
|
Whether to prefix the keys with the |
False
|
**kwargs
|
Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
|
A dictionary of keys to |
{}
|
Returns:
| Type | Description |
|---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-7-sonnet-20250219"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI(),
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(configurable={"llm": "openai"})
    .invoke("which organization created you?")
    .content
)
set_verbose
¶
get_token_ids
¶
get_num_tokens
¶
get_num_tokens_from_messages
¶
Get the number of tokens in the messages.
Useful for checking if an input fits in a model's context window.
Note
The base implementation of get_num_tokens_from_messages ignores tool
schemas.
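A context-window check built on a token count might look like the sketch below. It is an assumption-laden illustration: `approx_token_count` is a crude word counter standing in for a real tokenizer, and `fits_in_context` and `reserved_for_output` are hypothetical names, so the numbers will not match any actual model.

```python
from typing import Sequence


def approx_token_count(messages: Sequence[str]) -> int:
    """Crude stand-in for a tokenizer: counts whitespace-separated words."""
    return sum(len(message.split()) for message in messages)


def fits_in_context(
    messages: Sequence[str],
    context_window: int,
    reserved_for_output: int = 0,
) -> bool:
    """Return True if the messages plus reserved output tokens fit the window."""
    return approx_token_count(messages) + reserved_for_output <= context_window


conversation = ["You are Cat Agent 007", "hello there"]
token_total = approx_token_count(conversation)
fits_without_reserve = fits_in_context(conversation, context_window=8)
fits_with_reserve = fits_in_context(conversation, context_window=8, reserved_for_output=2)
print(token_total, fits_without_reserve, fits_with_reserve)
```

Reserving room for the model's reply matters because the context window usually has to hold both the input and the generated output.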
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
list[BaseMessage]
|
The message inputs to tokenize. |
required |
tools
|
Sequence | None
|
If provided, sequence of dict, |
None
|
Returns:
| Type | Description |
|---|---|
int
|
The sum of the number of tokens across the messages. |
generate
¶
generate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any
) -> LLMResult
Pass a sequence of prompts to the model and return model generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
list[list[BaseMessage]]
|
List of list of messages. |
required |
stop
|
list[str] | None
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
None
|
callbacks
|
Callbacks
|
Callbacks to pass through. Used for executing additional functionality, such as logging or streaming, throughout generation. |
None
|
tags
|
list[str] | None
|
The tags to apply. |
None
|
metadata
|
dict[str, Any] | None
|
The metadata to apply. |
None
|
run_name
|
str | None
|
The name of the run. |
None
|
run_id
|
UUID | None
|
The ID of the run. |
None
|
**kwargs
|
Any
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call. |
{}
|
Returns:
| Type | Description |
|---|---|
LLMResult
|
An LLMResult, which contains a list of candidate Generations for each input prompt and additional model provider-specific output. |
agenerate
async
¶
agenerate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any
) -> LLMResult
Asynchronously pass a sequence of prompts to a model and return generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you want to:
- Take advantage of batched calls,
- Need more output from the model than just the top generated value,
- Are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
list[list[BaseMessage]]
|
List of list of messages. |
required |
stop
|
list[str] | None
|
Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. |
None
|
callbacks
|
Callbacks
|
Callbacks to pass through. Used for executing additional functionality, such as logging or streaming, throughout generation. |
None
|
tags
|
list[str] | None
|
The tags to apply. |
None
|
metadata
|
dict[str, Any] | None
|
The metadata to apply. |
None
|
run_name
|
str | None
|
The name of the run. |
None
|
run_id
|
UUID | None
|
The ID of the run. |
None
|
**kwargs
|
Any
|
Arbitrary additional keyword arguments. These are usually passed to the model provider API call. |
{}
|
Returns:
| Type | Description |
|---|---|
LLMResult
|
An LLMResult, which contains a list of candidate Generations for each input prompt and additional model provider-specific output. |
get_lc_namespace
classmethod
¶
Get the namespace of the LangChain object.
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Return whether this model can be serialized by LangChain.
build_extra
classmethod
¶
Build extra kwargs from additional params that were passed in.
validate_environment
¶
Validate that api key and python package exists in environment.
bind_tools
¶
bind_tools(
tools: Sequence[
Union[
dict[str, Any],
type[BaseModel],
Callable,
BaseTool,
]
],
*,
tool_choice: Optional[
Union[
dict, str, Literal["auto", "any", "none"], bool
]
] = None,
**kwargs: Any
) -> Runnable[LanguageModelInput, AIMessage]
Bind tool-like objects to this chat model.
Assumes model is compatible with Fireworks tool-calling API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tools
|
Sequence[Union[dict[str, Any], type[BaseModel], Callable, BaseTool]]
|
A list of tool definitions to bind to this chat model.
Supports any tool definition handled by
|
required |
tool_choice
|
Optional[Union[dict, str, Literal['auto', 'any', 'none'], bool]]
|
Which tool to require the model to call.
Must be the name of the single provided function,
|
None
|
**kwargs
|
Any
|
Any additional parameters to pass to
|
{}
|
with_structured_output
¶
with_structured_output(
schema: Optional[Union[dict, type[BaseModel]]] = None,
*,
method: Literal[
"function_calling", "json_mode", "json_schema"
] = "function_calling",
include_raw: bool = False,
**kwargs: Any
) -> Runnable[LanguageModelInput, Union[dict, BaseModel]]
Model wrapper that returns outputs formatted to match the given schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
schema
|
Optional[Union[dict, type[BaseModel]]]
|
The output schema. Can be passed in as:
If Behavior changed in 0.1.7 Added support for TypedDict class. |
None
|
method
|
Literal['function_calling', 'json_mode', 'json_schema']
|
The method for steering model generation, one of:
Behavior changed in 0.2.8 Added support for |
'function_calling'
|
include_raw
|
bool
|
If False then only the parsed structured output is returned. If
an error occurs during model output parsing it will be raised. If True
then both the raw model response (a BaseMessage) and the parsed model
response will be returned. If an error occurs during output parsing it
will be caught and returned as well. The final output is always a dict
with keys |
False
|
kwargs
|
Any
|
Any additional parameters to pass to the
|
{}
|
Returns:
| Type | Description |
|---|---|
Runnable[LanguageModelInput, Union[dict, BaseModel]]
|
A Runnable that takes same inputs as a |
Runnable[LanguageModelInput, Union[dict, BaseModel]]
|
If |
Runnable[LanguageModelInput, Union[dict, BaseModel]]
|
an instance of |
Runnable[LanguageModelInput, Union[dict, BaseModel]]
|
Otherwise, if |
Runnable[LanguageModelInput, Union[dict, BaseModel]]
|
If |
Runnable[LanguageModelInput, Union[dict, BaseModel]]
|
|
Runnable[LanguageModelInput, Union[dict, BaseModel]]
|
|
Runnable[LanguageModelInput, Union[dict, BaseModel]]
|
|
Example: schema=Pydantic class, method="function_calling", include_raw=False:
.. code-block:: python
from typing import Optional

from langchain_fireworks import ChatFireworks
from pydantic import BaseModel, Field


class AnswerWithJustification(BaseModel):
    '''An answer to the user question along with justification for the answer.'''

    answer: str
    # If we provide default values and/or descriptions for fields, these will be passed
    # to the model. This is an important part of improving a model's ability to
    # correctly return structured outputs.
    justification: Optional[str] = Field(
        default=None, description="A justification for the answer."
    )


llm = ChatFireworks(
    model="accounts/fireworks/models/firefunction-v1",
    temperature=0,
)
structured_llm = llm.with_structured_output(AnswerWithJustification)

structured_llm.invoke(
    "What weighs more a pound of bricks or a pound of feathers"
)
# -> AnswerWithJustification(
# answer='They weigh the same',
# justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'
# )
Example: schema=Pydantic class, method="function_calling", include_raw=True:
.. code-block:: python
from langchain_fireworks import ChatFireworks
from pydantic import BaseModel


class AnswerWithJustification(BaseModel):
    '''An answer to the user question along with justification for the answer.'''

    answer: str
    justification: str


llm = ChatFireworks(
    model="accounts/fireworks/models/firefunction-v1",
    temperature=0,
)
structured_llm = llm.with_structured_output(
    AnswerWithJustification, include_raw=True
)

structured_llm.invoke(
    "What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'raw': AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Ao02pnFYXD6GN1yzc0uXPsvF', 'function': {'arguments': '{"answer":"They weigh the same.","justification":"Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ."}', 'name': 'AnswerWithJustification'}, 'type': 'function'}]}),
# 'parsed': AnswerWithJustification(answer='They weigh the same.', justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'),
# 'parsing_error': None
# }
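The three-key result shown above ('raw', 'parsed', 'parsing_error') can be mimicked with the standard library to show how a caller might branch on it. This is a sketch only: `parse_with_raw` is a hypothetical helper that parses a JSON string, whereas the real method returns a message object under 'raw'.

```python
import json
from typing import Any, Dict


def parse_with_raw(raw_content: str) -> Dict[str, Any]:
    """Return a dict shaped like the include_raw=True output for a JSON string."""
    try:
        parsed = json.loads(raw_content)
        error = None
    except json.JSONDecodeError as exc:
        # Capture the parsing error instead of raising it.
        parsed, error = None, exc
    return {"raw": raw_content, "parsed": parsed, "parsing_error": error}


good = parse_with_raw('{"answer": "They weigh the same"}')
bad = parse_with_raw("not json")

for result in (good, bad):
    if result["parsing_error"] is None:
        print("parsed:", result["parsed"])
    else:
        print("could not parse, raw text was:", result["raw"])
```

Keeping the raw response alongside the parse error lets a caller log or retry with the original text instead of losing it to an exception.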
Example: schema=TypedDict class, method="function_calling", include_raw=False:
.. code-block:: python
# IMPORTANT: If you are using Python <=3.8, you need to import Annotated
# from typing_extensions, not from typing.
from typing import Optional

from typing_extensions import Annotated, TypedDict
from langchain_fireworks import ChatFireworks


class AnswerWithJustification(TypedDict):
    '''An answer to the user question along with justification for the answer.'''

    answer: str
    justification: Annotated[
        Optional[str], None, "A justification for the answer."
    ]


llm = ChatFireworks(
    model="accounts/fireworks/models/firefunction-v1",
    temperature=0,
)
structured_llm = llm.with_structured_output(AnswerWithJustification)

structured_llm.invoke(
    "What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'answer': 'They weigh the same',
# 'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume and density of the two substances differ.'
# }
Example: schema=OpenAI function schema, method="function_calling", include_raw=False:
.. code-block:: python
from langchain_fireworks import ChatFireworks

oai_schema = {
    "name": "AnswerWithJustification",
    "description": "An answer to the user question along with justification for the answer.",
    "parameters": {
        "type": "object",
        "properties": {
            "answer": {"type": "string"},
            "justification": {
                "description": "A justification for the answer.",
                "type": "string",
            },
        },
        "required": ["answer"],
    },
}

llm = ChatFireworks(
    model="accounts/fireworks/models/firefunction-v1",
    temperature=0,
)
structured_llm = llm.with_structured_output(oai_schema)

structured_llm.invoke(
    "What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'answer': 'They weigh the same',
# 'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume and density of the two substances differ.'
# }
Example: schema=Pydantic class, method="json_mode", include_raw=True:
.. code-block:: python
from langchain_fireworks import ChatFireworks
from pydantic import BaseModel


class AnswerWithJustification(BaseModel):
    answer: str
    justification: str


llm = ChatFireworks(model="accounts/fireworks/models/firefunction-v1", temperature=0)
structured_llm = llm.with_structured_output(
    AnswerWithJustification,
    method="json_mode",
    include_raw=True
)

structured_llm.invoke(
    "Answer the following question. "
    "Make sure to return a JSON blob with keys 'answer' and 'justification'. "
    "What's heavier a pound of bricks or a pound of feathers?"
)
# -> {
# 'raw': AIMessage(content='{"answer": "They are both the same weight.", "justification": "Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight."}'),
# 'parsed': AnswerWithJustification(answer='They are both the same weight.', justification='Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight.'),
# 'parsing_error': None
# }
Example: schema=None, method="json_mode", include_raw=True:
.. code-block:: python
structured_llm = llm.with_structured_output(method="json_mode", include_raw=True)

structured_llm.invoke(
    "Answer the following question. "
    "Make sure to return a JSON blob with keys 'answer' and 'justification'. "
    "What's heavier a pound of bricks or a pound of feathers?"
)
# -> {
# 'raw': AIMessage(content='{"answer": "They are both the same weight.", "justification": "Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight."}'),
# 'parsed': {
# 'answer': 'They are both the same weight.',
# 'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight.'
# },
# 'parsing_error': None
# }
FireworksEmbeddings
¶
Bases: BaseModel, Embeddings
Fireworks embedding model integration.
Setup:
Install ``langchain_fireworks`` and set environment variable
``FIREWORKS_API_KEY``.
.. code-block:: bash
pip install -U langchain_fireworks
export FIREWORKS_API_KEY="your-api-key"
Key init args — completion params: model: str Name of Fireworks model to use.
Key init args — client params: fireworks_api_key: SecretStr Fireworks API key.
See full list of supported init args and their descriptions in the params section.
Instantiate:
.. code-block:: python
from langchain_fireworks import FireworksEmbeddings
embeddings = FireworksEmbeddings(
    model="nomic-ai/nomic-embed-text-v1.5",
    # Use FIREWORKS_API_KEY env var or pass it in directly
    # fireworks_api_key="..."
)
Embed multiple texts:
.. code-block:: python
vectors = embeddings.embed_documents(["hello", "goodbye"])
# Showing only the first 3 coordinates
print(len(vectors))
print(vectors[0][:3])
.. code-block:: python
2
[-0.024603435769677162, -0.007543657906353474, 0.0039630369283258915]
Embed single text:
.. code-block:: python
input_text = "hello"
vector = embeddings.embed_query(input_text)
print(vector[:3])
.. code-block:: python
[-0.024603435769677162, -0.007543657906353474, 0.0039630369283258915]
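Embedding vectors are usually compared with a similarity measure. The sketch below is a minimal, standard-library illustration of ranking documents by cosine similarity to a query. The three-dimensional vectors are made-up placeholders, not real model output, and ``cosine_similarity`` is a helper defined here rather than part of ``langchain_fireworks``.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Placeholder vectors; in practice these would come from
# embed_query(...) and embed_documents(...).
query_vector = [0.1, 0.3, 0.5]
document_vectors = {
    "hello": [0.1, 0.3, 0.5],
    "goodbye": [-0.5, 0.2, 0.1],
}

# Rank document keys from most to least similar to the query.
ranked = sorted(
    document_vectors,
    key=lambda key: cosine_similarity(query_vector, document_vectors[key]),
    reverse=True,
)
```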
Methods:
| Name | Description |
|---|---|
aembed_documents |
Asynchronous Embed search docs. |
aembed_query |
Asynchronous Embed query text. |
validate_environment |
Validate environment variables. |
embed_documents |
Embed search docs. |
embed_query |
Embed query text. |
Attributes:
| Name | Type | Description |
|---|---|---|
fireworks_api_key |
SecretStr
|
Fireworks API key. |
fireworks_api_key
class-attribute
instance-attribute
¶
fireworks_api_key: SecretStr = Field(
alias="api_key",
default_factory=secret_from_env(
"FIREWORKS_API_KEY", default=""
),
)
Fireworks API key.
Automatically read from env variable FIREWORKS_API_KEY if not provided.
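The field above resolves its value lazily through a default factory. As a hedged illustration of that pattern only, the sketch below reads a key from the environment and falls back to a default. ``secret_from_env_sketch`` and the ``DEMO_FIREWORKS_API_KEY`` variable are hypothetical names for this example; the real ``secret_from_env`` may behave differently in detail.

```python
import os
from typing import Callable, Optional


def secret_from_env_sketch(key: str, *, default: Optional[str] = None) -> Callable[[], str]:
    """Hypothetical stand-in for a factory that reads a secret from the environment."""

    def read() -> str:
        if key in os.environ:
            return os.environ[key]
        if default is not None:
            return default
        raise ValueError(f"Set the {key} environment variable or pass the key explicitly.")

    return read


# DEMO_FIREWORKS_API_KEY is a made-up variable name used only for this demonstration.
os.environ.pop("DEMO_FIREWORKS_API_KEY", None)
read_key = secret_from_env_sketch("DEMO_FIREWORKS_API_KEY", default="")
missing = read_key()  # variable unset, so the default is returned

os.environ["DEMO_FIREWORKS_API_KEY"] = "fw-demo"
present = read_key()  # variable set, so its value is returned
```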
aembed_documents
async
¶
aembed_query
async
¶
Fireworks
¶
Bases: LLM
LLM models from Fireworks.
To use, you'll need an `API key <https://fireworks.ai>`__. This can be passed in as
init param ``fireworks_api_key`` or set as environment variable
``FIREWORKS_API_KEY``.
`Fireworks AI API reference <https://readme.fireworks.ai/>`__
Example:
.. code-block:: python
# Assumes `fireworks` is an existing Fireworks instance created with a valid model name.
response = fireworks.generate(["Tell me a joke."])
Methods:
| Name | Description |
|---|---|
get_name |
Get the name of the |
get_input_schema |
Get a pydantic model that can be used to validate input to the Runnable. |
get_input_jsonschema |
Get a JSON schema that represents the input to the |
get_output_schema |
Get a pydantic model that can be used to validate output to the |
get_output_jsonschema |
Get a JSON schema that represents the output of the |
config_schema |
The type of config this |
get_config_jsonschema |
Get a JSON schema that represents the config of the |
get_graph |
Return a graph representation of this |
get_prompts |
Return a list of prompts used by this |
__or__ |
Runnable "or" operator. |
__ror__ |
Runnable "reverse-or" operator. |
pipe |
Pipe runnables. |
pick |
Pick keys from the output dict of this |
assign |
Assigns new fields to the dict output of this |
batch_as_completed |
Run |
abatch_as_completed |
Run |
astream_log |
Stream all output from a |
astream_events |
Generate a stream of events. |
transform |
Transform inputs to outputs. |
atransform |
Transform inputs to outputs. |
bind |
Bind arguments to a |
with_config |
Bind config to a |
with_listeners |
Bind lifecycle listeners to a |
with_alisteners |
Bind async lifecycle listeners to a |
with_types |
Bind input and output types to a |
with_retry |
Create a new Runnable that retries the original Runnable on exceptions. |
map |
Return a new |
with_fallbacks |
Add fallbacks to a |
as_tool |
Create a |
__init__ |
|
is_lc_serializable |
Is this class serializable? |
get_lc_namespace |
Get the namespace of the langchain object. |
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the |
to_json_not_implemented |
Serialize a "not implemented" object. |
configurable_fields |
Configure particular |
configurable_alternatives |
Configure alternatives for |
set_verbose |
If verbose is None, set it. |
with_structured_output |
Not implemented on this class. |
get_token_ids |
Return the ordered ids of the tokens in a text. |
get_num_tokens |
Get the number of tokens present in the text. |
get_num_tokens_from_messages |
Get the number of tokens in the messages. |
generate |
Pass a sequence of prompts to a model and return generations. |
agenerate |
Asynchronously pass a sequence of prompts to a model and return generations. |
__str__ |
Return a string representation of the object for printing. |
dict |
Return a dictionary of the LLM. |
save |
Save the LLM. |
build_extra |
Build extra kwargs from additional params that were passed in. |
Attributes:
| Name | Type | Description |
|---|---|---|
InputType |
TypeAlias
|
Get the input type for this runnable. |
OutputType |
type[str]
|
Get the output type for this runnable. |
input_schema |
type[BaseModel]
|
The type of input this |
output_schema |
type[BaseModel]
|
Output schema. |
config_specs |
list[ConfigurableFieldSpec]
|
List configurable fields for this |
lc_secrets |
dict[str, str]
|
A map of constructor argument names to secret ids. |
lc_attributes |
dict
|
List of attribute names that should be included in the serialized kwargs. |
cache |
BaseCache | bool | None
|
Whether to cache the response. |
verbose |
bool
|
Whether to print out response text. |
callbacks |
Callbacks
|
Callbacks to add to the run trace. |
tags |
list[str] | None
|
Tags to add to the run trace. |
metadata |
dict[str, Any] | None
|
Metadata to add to the run trace. |
custom_get_token_ids |
Callable[[str], list[int]] | None
|
Optional encoder to use for counting tokens. |
base_url |
str
|
Base inference API URL. |
fireworks_api_key |
SecretStr
|
Fireworks API key. |
model |
str
|
Model name. |
temperature |
Optional[float]
|
Model temperature. |
top_p |
Optional[float]
|
Used to dynamically adjust the number of choices for each predicted token based |
model_kwargs |
dict[str, Any]
|
Holds any model parameters valid for |
top_k |
Optional[int]
|
Used to limit the number of choices for the next predicted word or token. It |
max_tokens |
Optional[int]
|
The maximum number of tokens to generate. |
repetition_penalty |
Optional[float]
|
A number that controls the diversity of generated text by reducing the likelihood |
logprobs |
Optional[int]
|
An integer that specifies how many top token log probabilities are included in |
timeout |
Optional[int]
|
Timeout in seconds for requests to the Fireworks API. |
input_schema
property
¶
input_schema: type[BaseModel]
The type of input this Runnable accepts specified as a pydantic model.
output_schema
property
¶
output_schema: type[BaseModel]
Output schema.
The type of output this Runnable produces specified as a pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example,
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor. Default is an empty dictionary.
cache
class-attribute
instance-attribute
¶
cache: BaseCache | bool | None = Field(
default=None, exclude=True
)
Whether to cache the response.
- If true, will use the global cache.
- If false, will not use a cache
- If None, will use the global cache if it's set, otherwise no cache.
- If instance of
BaseCache, will use the provided cache.
Caching is not currently supported for streaming methods of models.
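The four cases listed above can be read as a small decision procedure. The sketch below is an illustration of that reading, not the library's code. ``InMemoryCacheSketch`` is a hypothetical stand-in for a ``BaseCache`` instance, and raising an error when ``cache=True`` but no global cache exists is an assumption, since the text above does not cover that case.

```python
from typing import Any, Optional


class InMemoryCacheSketch:
    """Hypothetical stand-in for a BaseCache implementation."""


def resolve_cache(cache: Any, global_cache: Optional[Any]) -> Optional[Any]:
    """Return the cache object to use, or None for no caching."""
    if isinstance(cache, InMemoryCacheSketch):
        return cache  # an explicit cache instance always wins
    if cache is True:
        if global_cache is None:
            # Assumption: the behaviour here is not specified in the text above.
            raise ValueError("cache=True requires a global cache to be set")
        return global_cache
    if cache is False:
        return None
    return global_cache  # cache is None: use the global cache if one is set


shared = InMemoryCacheSketch()
own = InMemoryCacheSketch()
```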
verbose
class-attribute
instance-attribute
¶
verbose: bool = Field(
default_factory=_get_verbosity, exclude=True, repr=False
)
Whether to print out response text.
callbacks
class-attribute
instance-attribute
¶
Callbacks to add to the run trace.
tags
class-attribute
instance-attribute
¶
Tags to add to the run trace.
metadata
class-attribute
instance-attribute
¶
Metadata to add to the run trace.
custom_get_token_ids
class-attribute
instance-attribute
¶
Optional encoder to use for counting tokens.
base_url
class-attribute
instance-attribute
¶
base_url: str = (
"https://api.fireworks.ai/inference/v1/completions"
)
Base inference API URL.
fireworks_api_key
class-attribute
instance-attribute
¶
fireworks_api_key: SecretStr = Field(
alias="api_key",
default_factory=secret_from_env(
"FIREWORKS_API_KEY",
error_message="You must specify an api key. You can pass it an argument as `api_key=...` or set the environment variable `FIREWORKS_API_KEY`.",
),
)
Fireworks API key.
Automatically read from env variable FIREWORKS_API_KEY if not provided.
model
instance-attribute
¶
model: str
Model name. `(Available models) <https://readme.fireworks.ai/>`__
temperature
class-attribute
instance-attribute
¶
Model temperature.
top_p
class-attribute
instance-attribute
¶
Used to dynamically adjust the number of choices for each predicted token based
on the cumulative probabilities: only the most probable tokens whose cumulative
probability reaches top_p are considered, so a value of 1 considers all tokens.
The separate temperature parameter also affects randomness: a temperature less
than 1 favors more correctness and is appropriate for question answering or
summarization, while a value greater than 1 introduces more randomness in the output.
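To make "cumulative probabilities" concrete, here is a minimal sketch of nucleus (top-p) filtering over a toy distribution. It illustrates the general technique only. How the Fireworks API applies ``top_p`` internally is not described here, and the token probabilities are invented.

```python
def top_p_filter(probs: dict[str, float], top_p: float) -> dict[str, float]:
    """Keep the most probable tokens until their cumulative mass reaches top_p."""
    ranked = sorted(probs.items(), key=lambda item: item[1], reverse=True)
    kept = []
    cumulative = 0.0
    for token, p in ranked:
        kept.append((token, p))
        cumulative += p
        if cumulative >= top_p:
            break
    # Renormalise so the surviving probabilities sum to 1.
    total = sum(p for _, p in kept)
    return {token: p / total for token, p in kept}


toy_probs = {"cat": 0.5, "dog": 0.3, "fish": 0.15, "emu": 0.05}
nucleus = top_p_filter(toy_probs, 0.75)   # keeps "cat" and "dog"
everything = top_p_filter(toy_probs, 1.0)  # keeps all four tokens
```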
model_kwargs
class-attribute
instance-attribute
¶
Holds any model parameters valid for create call not explicitly specified.
top_k
class-attribute
instance-attribute
¶
Used to limit the number of choices for the next predicted word or token. It specifies the maximum number of tokens to consider at each step, based on their probability of occurrence. This technique helps to speed up the generation process and can improve the quality of the generated text by focusing on the most likely options.
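As an illustration of the idea, the sketch below keeps only the ``k`` most probable tokens of a toy distribution and renormalises them. This is the generic top-k technique under made-up probabilities, not a description of the Fireworks implementation.

```python
def top_k_filter(probs: dict[str, float], k: int) -> dict[str, float]:
    """Keep the k most probable tokens and renormalise their probabilities."""
    ranked = sorted(probs.items(), key=lambda item: item[1], reverse=True)[:k]
    total = sum(p for _, p in ranked)
    return {token: p / total for token, p in ranked}


toy_probs = {"cat": 0.5, "dog": 0.3, "fish": 0.15, "emu": 0.05}
top_two = top_k_filter(toy_probs, 2)
```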
max_tokens
class-attribute
instance-attribute
¶
The maximum number of tokens to generate.
repetition_penalty
class-attribute
instance-attribute
¶
A number that controls the diversity of generated text by reducing the likelihood of repeated sequences. Higher values decrease repetition.
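One common way to implement such a penalty is to scale down the scores of tokens that have already been generated. The sketch below shows that formulation on toy logits. It is an assumption for illustration; the exact formula used by the Fireworks API is not stated in this reference.

```python
def apply_repetition_penalty(
    logits: dict[str, float], generated: list[str], penalty: float
) -> dict[str, float]:
    """Lower the score of every token that already appears in the output."""
    adjusted = dict(logits)
    for token in set(generated):
        if token in adjusted:
            score = adjusted[token]
            # Dividing a positive score or multiplying a negative one both
            # make the token less likely when penalty > 1.
            adjusted[token] = score / penalty if score > 0 else score * penalty
    return adjusted


toy_logits = {"the": 2.0, "cat": 1.0, "sat": -0.5}
penalized = apply_repetition_penalty(toy_logits, ["the", "sat"], penalty=2.0)
```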
logprobs
class-attribute
instance-attribute
¶
An integer that specifies how many top token log probabilities are included in the response for each token generation step.
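For intuition, the sketch below computes the top ``n`` log probabilities for a single generation step from a toy distribution. The probabilities are invented and the helper ``top_logprobs`` is hypothetical; the response format returned by the API is not shown here.

```python
import math


def top_logprobs(probs: dict[str, float], n: int) -> list[tuple[str, float]]:
    """Return the n most probable tokens with their natural-log probabilities."""
    ranked = sorted(probs.items(), key=lambda item: item[1], reverse=True)[:n]
    return [(token, math.log(p)) for token, p in ranked]


step_probs = {"cat": 0.5, "dog": 0.25, "fish": 0.125, "emu": 0.125}
top_two_logprobs = top_logprobs(step_probs, 2)
```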
timeout
class-attribute
instance-attribute
¶
Timeout in seconds for requests to the Fireworks API.
get_name
¶
get_input_schema
¶
get_input_schema(
config: RunnableConfig | None = None,
) -> type[BaseModel]
Get a pydantic model that can be used to validate input to the Runnable.
Runnables that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic input schema that
depends on which configuration the Runnable is invoked with.
This method returns the input schema for a specific configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
| Type | Description |
|---|---|
type[BaseModel]
|
A pydantic model that can be used to validate input. |
get_input_jsonschema
¶
Get a JSON schema that represents the input to the Runnable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
A JSON schema that represents the input to the |
Added in version 0.3.0
get_output_schema
¶
get_output_schema(
config: RunnableConfig | None = None,
) -> type[BaseModel]
Get a pydantic model that can be used to validate output to the Runnable.
Runnables that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic output schema that
depends on which configuration the Runnable is invoked with.
This method returns the output schema for a specific configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
| Type | Description |
|---|---|
type[BaseModel]
|
A pydantic model that can be used to validate output. |
get_output_jsonschema
¶
Get a JSON schema that represents the output of the Runnable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
RunnableConfig | None
|
A config to use when generating the schema. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
A JSON schema that represents the output of the |
Added in version 0.3.0
config_schema
¶
The type of config this Runnable accepts specified as a pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
include
|
Sequence[str] | None
|
A list of fields to include in the config schema. |
None
|
Returns:
| Type | Description |
|---|---|
type[BaseModel]
|
A pydantic model that can be used to validate config. |
get_config_jsonschema
¶
Get a JSON schema that represents the config of the Runnable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
include
|
Sequence[str] | None
|
A list of fields to include in the config schema. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
A JSON schema that represents the config of the |
Added in version 0.3.0
get_graph
¶
Return a graph representation of this Runnable.
get_prompts
¶
get_prompts(
config: RunnableConfig | None = None,
) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
__or__
¶
__or__(
other: (
Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[
[AsyncIterator[Any]], AsyncIterator[Other]
]
| Callable[[Any], Other]
| Mapping[
str,
Runnable[Any, Other]
| Callable[[Any], Other]
| Any,
]
),
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
other
|
Runnable[Any, Other] | Callable[[Iterator[Any]], Iterator[Other]] | Callable[[AsyncIterator[Any]], AsyncIterator[Other]] | Callable[[Any], Other] | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any]
|
Another |
required |
Returns:
| Type | Description |
|---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: (
Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[
[AsyncIterator[Other]], AsyncIterator[Any]
]
| Callable[[Other], Any]
| Mapping[
str,
Runnable[Other, Any]
| Callable[[Other], Any]
| Any,
]
),
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
other
|
Runnable[Other, Any] | Callable[[Iterator[Other]], Iterator[Any]] | Callable[[AsyncIterator[Other]], AsyncIterator[Any]] | Callable[[Other], Any] | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any]
|
Another |
required |
Returns:
| Type | Description |
|---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other],
name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe runnables.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*others
|
Runnable[Any, Other] | Callable[[Any], Other]
|
Other |
()
|
name
|
str | None
|
An optional name for the resulting |
None
|
Returns:
| Type | Description |
|---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict of this Runnable.
Pick single key:
```python
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
```
Pick list of keys:
```python
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(
    str=as_str, json=as_json, bytes=RunnableLambda(as_bytes)
)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
keys
|
str | list[str]
|
A key or list of keys to pick from the output dict. |
required |
Returns:
| Type | Description |
|---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: (
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[
str,
Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any],
]
),
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | llm | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | llm)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**kwargs
|
Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]
|
A mapping of keys to |
{}
|
Returns:
| Type | Description |
|---|---|
RunnableSerializable[Any, Any]
|
A new |
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: (
RunnableConfig | Sequence[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
inputs
|
Sequence[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | Sequence[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
**kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
| Type | Description |
|---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
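The pattern described above (run calls in parallel, yield ``(index, result)`` pairs in completion order) can be sketched with the standard library alone. This is an illustration of the pattern, not the library's implementation. ``batch_as_completed_sketch`` and ``slow_square`` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def batch_as_completed_sketch(func, inputs, return_exceptions=False):
    """Yield (input index, result) pairs as each call finishes."""
    with ThreadPoolExecutor() as executor:
        # Remember which input position each future belongs to.
        futures = {executor.submit(func, item): i for i, item in enumerate(inputs)}
        for future in as_completed(futures):
            index = futures[future]
            try:
                result = future.result()
            except Exception as exc:
                if not return_exceptions:
                    raise
                result = exc  # hand the exception back instead of raising it
            yield index, result


def slow_square(x: int) -> int:
    time.sleep(x * 0.05)  # larger inputs take longer to finish
    return x * x


results = list(batch_as_completed_sketch(slow_square, [3, 1, 2]))
```

Because results arrive in completion order, the index is what lets a caller match each output back to its input.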
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: (
RunnableConfig | Sequence[RunnableConfig] | None
) = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
inputs
|
Sequence[Input]
|
A list of inputs to the |
required |
config
|
RunnableConfig | Sequence[RunnableConfig] | None
|
A config to use when invoking the |
None
|
return_exceptions
|
bool
|
Whether to return exceptions instead of raising them. Defaults to False. |
False
|
kwargs
|
Any | None
|
Additional keyword arguments to pass to the |
{}
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
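The asynchronous variant follows the same idea with tasks instead of threads. The sketch below uses ``asyncio.as_completed`` to yield ``(index, result)`` pairs as coroutines finish. It is a simplified illustration under hypothetical names and omits ``config`` and ``return_exceptions`` handling.

```python
import asyncio


async def abatch_as_completed_sketch(coro_func, inputs):
    """Yield (input index, result) pairs as each coroutine finishes."""

    async def run(index, item):
        # Carry the index along so the caller can match outputs to inputs.
        return index, await coro_func(item)

    tasks = [asyncio.ensure_future(run(i, item)) for i, item in enumerate(inputs)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def slow_double(x: int) -> int:
    await asyncio.sleep(x * 0.01)  # larger inputs take longer to finish
    return x * 2


async def collect():
    return [pair async for pair in abatch_as_completed_sketch(slow_double, [3, 1, 2])]


pairs = asyncio.run(collect())
```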
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
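To show what "applied in order to construct state" means, the sketch below applies a few JSON Patch operations to build up a state dictionary. It is a deliberately simplified applier (only ``add`` and ``replace``, no pointer escaping), and the patch contents are invented for illustration rather than captured from a real run.

```python
import copy


def apply_ops(state, ops):
    """Apply a list of simplified JSON Patch ops and return the new state."""
    state = copy.deepcopy(state)
    for op in ops:
        if op["op"] not in ("add", "replace"):
            raise NotImplementedError(f"unsupported op: {op['op']}")
        keys = [key for key in op["path"].split("/") if key]
        if not keys:
            # An empty path addresses the whole document.
            state = copy.deepcopy(op["value"])
            continue
        target = state
        for key in keys[:-1]:
            target = target[int(key)] if isinstance(target, list) else target[key]
        last = keys[-1]
        if isinstance(target, list):
            if last == "-":
                target.append(op["value"])  # "-" means end of the list
            elif op["op"] == "add":
                target.insert(int(last), op["value"])
            else:
                target[int(last)] = op["value"]
        else:
            target[last] = op["value"]
    return state


# Invented patches, one list of ops per streamed step.
patches = [
    [{"op": "replace", "path": "", "value": {"streamed_output": [], "final_output": None}}],
    [{"op": "add", "path": "/streamed_output/-", "value": "Hel"}],
    [{"op": "add", "path": "/streamed_output/-", "value": "lo"}],
    [{"op": "replace", "path": "/final_output", "value": "Hello"}],
]

state = None
for ops in patches:
    state = apply_ops(state, ops)
```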
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
input
|
Any
|
The input to the |
required |
config
|
RunnableConfig | None
|
The config to use for the |
None
|
diff
|
bool
|
Whether to yield diffs between each step or the current state. |
True
|
with_streamed_output_list
|
bool
|
Whether to yield the |
True
|
include_names
|
Sequence[str] | None
|
Only include logs with these names. |
None
|
include_types
|
Sequence[str] | None
|
Only include logs with these types. |
None
|
include_tags
|
Sequence[str] | None
|
Only include logs with these tags. |
None
|
exclude_names
|
Sequence[str] | None
|
Exclude logs with these names. |
None
|
exclude_types
|
Sequence[str] | None
|
Exclude logs with these types. |
None
|
exclude_tags
|
Sequence[str] | None
|
Exclude logs with these tags. |
None
|
kwargs
|
Any
|
Additional keyword arguments to pass to the |
{}
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvents that provide real-time information
about the progress of the Runnable, including StreamEvents from intermediate
results.
A StreamEvent is a dictionary with the following schema:
- event: str - Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: str - The name of the Runnable that generated the event.
- run_id: str - Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: list[str] - The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
- tags: Optional[list[str]] - The tags of the Runnable that generated the event.
- metadata: Optional[dict[str, Any]] - The metadata of the Runnable that generated the event.
- data: dict[str, Any]
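The schema above can be written down as a typed dictionary, which also makes the event naming convention easy to work with. The sketch below is illustrative only. ``StreamEventSketch`` and ``event_kind`` are hypothetical names, not types exported by the library.

```python
from typing import Any, TypedDict


class StreamEventSketch(TypedDict, total=False):
    """Illustrative mirror of the fields listed above."""
    event: str
    name: str
    run_id: str
    parent_ids: list[str]
    tags: list[str]
    metadata: dict[str, Any]
    data: dict[str, Any]


def event_kind(event: StreamEventSketch) -> tuple[str, str]:
    """Split 'on_[runnable_type]_(start|stream|end)' into (runnable_type, phase)."""
    _, rest = event["event"].split("_", 1)
    runnable_type, phase = rest.rsplit("_", 1)
    return runnable_type, phase


sample: StreamEventSketch = {
    "event": "on_chat_model_stream",
    "name": "my_model",
    "data": {"chunk": "hello"},
}
kind = event_kind(sample)
```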
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| event | name | chunk | input | output |
+==========================+==================+=====================================+===================================================+=====================================================+
| on_chat_model_start | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chat_model_stream | [model name] | AIMessageChunk(content="hello") | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chat_model_end | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_start | [model name] | | {'input': 'hello'} | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_stream | [model name] | 'Hello' | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_llm_end | [model name] | | 'Hello human!' | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_start | format_docs | | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_stream | format_docs | 'hello world!, goodbye world!' | | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_chain_end | format_docs | | [Document(...)] | 'hello world!, goodbye world!' |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_retriever_end | [retriever name] | | {"query": "hello"} | [Document(...), ..] |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_prompt_start | [template_name] | | {"question": "hello"} | |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
| on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
+--------------------------+------------------+-------------------------------------+---------------------------------------------------+-----------------------------------------------------+
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API!
A custom event has the following format:
+-----------+------+-----------------------------------------------------------------------------------------------------------+
| Attribute | Type | Description                                                                                               |
+===========+======+===========================================================================================================+
| name      | str  | A user defined name for the event.                                                                        |
+-----------+------+-----------------------------------------------------------------------------------------------------------+
| data      | Any  | The data associated with the event. This can be anything, though we suggest making it JSON serializable.  |
+-----------+------+-----------------------------------------------------------------------------------------------------------+
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
    return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# will produce the following events (run_id and parent_ids
# have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
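Once collected, events like the three listed above are ordinary dictionaries and can be filtered by their ``event`` field. The sketch below works on a hard-coded copy of that list, so it runs without a model. It shows one possible way to separate streamed chunks from the final output.

```python
# The three events from the listing above (run_id and parent_ids omitted).
events = [
    {"data": {"input": "hello"}, "event": "on_chain_start",
     "metadata": {}, "name": "reverse", "tags": []},
    {"data": {"chunk": "olleh"}, "event": "on_chain_stream",
     "metadata": {}, "name": "reverse", "tags": []},
    {"data": {"output": "olleh"}, "event": "on_chain_end",
     "metadata": {}, "name": "reverse", "tags": []},
]

# Streamed chunks arrive in "stream" events.
chunks = [e["data"]["chunk"] for e in events if e["event"] == "on_chain_stream"]

# The final result is carried by the "end" event.
final_output = next(e["data"]["output"] for e in events if e["event"] == "on_chain_end")
```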
Example: Dispatch Custom Event
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input | Any | The input to the Runnable. | required |
| config | RunnableConfig \| None | The config to use for the Runnable. | None |
| version | Literal['v1', 'v2'] | The version of the schema to use, either 'v1' or 'v2'. | 'v2' |
| include_names | Sequence[str] \| None | Only include events from Runnables with matching names. | None |
| include_types | Sequence[str] \| None | Only include events from Runnables with matching types. | None |
| include_tags | Sequence[str] \| None | Only include events from Runnables with matching tags. | None |
| exclude_names | Sequence[str] \| None | Exclude events from Runnables with matching names. | None |
| exclude_types | Sequence[str] \| None | Exclude events from Runnables with matching types. | None |
| exclude_tags | Sequence[str] \| None | Exclude events from Runnables with matching tags. | None |
| kwargs | Any | Additional keyword arguments to pass to the Runnable. | {} |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[StreamEvent] | An async stream of StreamEvent objects. |
Raises:
| Type | Description |
|---|---|
| NotImplementedError | If the version is not 'v1' or 'v2'. |
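The include and exclude filters can be easier to reason about as a single predicate. The following is a minimal plain-Python sketch, not the library's implementation: the helper name `include_event` is hypothetical, and it assumes that an event is kept when it matches any include filter and that exclude filters are applied afterwards.

```python
from typing import Optional, Sequence


def include_event(
    name: str,
    event_type: str,
    tags: Sequence[str],
    *,
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_types: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
) -> bool:
    """Hypothetical helper: decide whether one event passes the filters."""
    if include_names is None and include_types is None and include_tags is None:
        # With no include filter set, every event starts out included.
        keep = True
    else:
        # Assumption: matching ANY include filter is enough to keep the event.
        keep = (
            (include_names is not None and name in include_names)
            or (include_types is not None and event_type in include_types)
            or (include_tags is not None and any(t in include_tags for t in tags))
        )
    # Assumption: exclude filters are applied last and always win.
    if exclude_names is not None and name in exclude_names:
        keep = False
    if exclude_types is not None and event_type in exclude_types:
        keep = False
    if exclude_tags is not None and any(t in exclude_tags for t in tags):
        keep = False
    return keep


# No filters: everything is kept.
kept_by_default = include_event("reverse", "chain", [])
# An include filter that does not match drops the event.
dropped_by_name = include_event("reverse", "chain", [], include_names=["other"])
# An exclude filter overrides a matching include filter.
dropped_by_tag = include_event(
    "reverse", "chain", ["my_tag"], include_types=["chain"], exclude_tags=["my_tag"]
)
```

Under these assumptions, passing only `exclude_*` arguments trims a full stream, while passing any `include_*` argument switches to an allow-list.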
transform
¶
transform(
input: Iterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input | Iterator[Input] | An iterator of inputs to the Runnable. | required |
| config | RunnableConfig \| None | The config to use for the Runnable. | None |
| kwargs | Any \| None | Additional keyword arguments to pass to the Runnable. | {} |
Yields:
| Type | Description |
|---|---|
| Output | The output of the Runnable. |
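The difference between the buffering default and an override that produces output while input is still arriving can be sketched with plain generators. This is an illustration only: `buffered_transform`, `incremental_transform`, and `upper_stream` are hypothetical stand-ins and do not use any langchain_core classes.

```python
from typing import Callable, Iterator, List


def upper_stream(text: str) -> Iterator[str]:
    """Stand-in for a stream() method that yields a single output."""
    yield text.upper()


def buffered_transform(
    chunks: Iterator[str], stream: Callable[[str], Iterator[str]]
) -> Iterator[str]:
    """Default-style behaviour: consume ALL input, then stream the output."""
    buffered = "".join(chunks)  # nothing is produced until the input is exhausted
    yield from stream(buffered)


def incremental_transform(chunks: Iterator[str]) -> Iterator[str]:
    """Override-style behaviour: emit output for each chunk as it arrives."""
    for chunk in chunks:
        yield chunk.upper()


consumed: List[str] = []


def source() -> Iterator[str]:
    """Input iterator that records which chunks have been pulled so far."""
    for piece in ["he", "ll", "o"]:
        consumed.append(piece)
        yield piece


# The incremental version yields after pulling only the first input chunk.
first = next(incremental_transform(source()))
# The buffered version only yields once the whole input has been joined.
buffered_output = list(buffered_transform(iter(["he", "ll", "o"]), upper_stream))
```

After the first `next` call, `consumed` holds a single chunk, which is the property a subclass gains by overriding the method.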
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input | AsyncIterator[Input] | An async iterator of inputs to the Runnable. | required |
| config | RunnableConfig \| None | The config to use for the Runnable. | None |
| kwargs | Any \| None | Additional keyword arguments to pass to the Runnable. | {} |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
bind
¶
bind(**kwargs: Any) -> Runnable[Input, Output]
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| kwargs | Any | The arguments to bind to the Runnable. | {} |
Returns:
| Type | Description |
|---|---|
| Runnable[Input, Output] | A new Runnable with the arguments bound. |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
llm = ChatOllama(model="llama3.1")
# Without bind.
chain = llm | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind.
chain = llm.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig \| None | The config to bind to the Runnable. | None |
| kwargs | Any | Additional keyword arguments to pass to the Runnable. | {} |
Returns:
| Type | Description |
|---|---|
| Runnable[Input, Output] | A new Runnable with the config bound. |
with_listeners
¶
with_listeners(
*,
on_start: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None,
on_end: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None,
on_error: (
Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None
) = None
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| on_start | Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None | Called before the Runnable starts running, with the Run object. | None |
| on_end | Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None | Called after the Runnable finishes running, with the Run object. | None |
| on_error | Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None | Called if the Runnable throws an error, with the Run object. | None |
Returns:
| Type | Description |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
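Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| on_start | AsyncListener \| None | Called asynchronously before the Runnable starts running, with the Run object. | None |
| on_end | AsyncListener \| None | Called asynchronously after the Runnable finishes running, with the Run object. | None |
| on_error | AsyncListener \| None | Called asynchronously if the Runnable throws an error, with the Run object. | None |
Returns:
| Type | Description |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |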
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
# Listeners receive the Run object described above, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*,
input_type: type[Input] | None = None,
output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_type | type[Input] \| None | The input type to bind to the Runnable. | None |
| output_type | type[Output] \| None | The output type to bind to the Runnable. | None |
Returns:
| Type | Description |
|---|---|
| Runnable[Input, Output] | A new Runnable with the types bound. |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[
type[BaseException], ...
] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: (
ExponentialJitterParams | None
) = None,
stop_after_attempt: int = 3
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| retry_if_exception_type | tuple[type[BaseException], ...] | A tuple of exception types to retry on. | (Exception,) |
| wait_exponential_jitter | bool | Whether to add jitter to the wait time between retries. | True |
| stop_after_attempt | int | The maximum number of attempts to make before giving up. | 3 |
| exponential_jitter_params | ExponentialJitterParams \| None | Parameters for the exponential wait with jitter (tenacity.wait_exponential_jitter). | None |
Returns:
| Type | Description |
|---|---|
| Runnable[Input, Output] | A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
    global count
    count = count + 1  # Track how many times the function is attempted
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass
runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass
assert count == 2
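To give a feel for how long the waits between attempts might be, here is a plain-Python sketch of exponential backoff with jitter. It is not the library's code: the function name `backoff_delays`, the parameter names (`initial`, `max_wait`, `exp_base`, `jitter`), and the default values are assumptions chosen for illustration.

```python
import random
from typing import List, Optional


def backoff_delays(
    attempts: int,
    *,
    initial: float = 1.0,
    max_wait: float = 60.0,
    exp_base: float = 2.0,
    jitter: float = 1.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Hypothetical helper: waits (in seconds) between consecutive attempts."""
    rng = rng or random.Random()
    delays = []
    # One wait happens between two attempts, so N attempts need N - 1 waits.
    for retry_number in range(attempts - 1):
        base = initial * exp_base ** retry_number  # grows 1, 2, 4, ...
        noise = rng.uniform(0, jitter)  # random extra wait to spread retries out
        delays.append(min(base + noise, max_wait))  # never exceed the cap
    return delays


# Three attempts (the default stop_after_attempt above) mean two waits.
delays = backoff_delays(3, rng=random.Random(0))
# With jitter disabled the schedule is purely exponential.
plain = backoff_delays(4, jitter=0.0)
# The cap bounds late retries.
capped = backoff_delays(10, jitter=0.0, max_wait=5.0)
```

The jitter term is what keeps many clients that failed at the same moment from retrying in lockstep.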
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[
type[BaseException], ...
] = (Exception,),
exception_key: str | None = None
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fallbacks | Sequence[Runnable[Input, Output]] | A sequence of runnables to try if the original Runnable fails. | required |
| exceptions_to_handle | tuple[type[BaseException], ...] | A tuple of exception types to handle. | (Exception,) |
| exception_key | str \| None | If a string is specified, handled exceptions will be passed to fallbacks as part of the input under the specified key. If None, exceptions will not be passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input. | None |
Returns:
| Type | Description |
|---|---|
| RunnableWithFallbacks[Input, Output] | A new Runnable that will try the original Runnable, and then each fallback in order, upon failures. |
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""  # Unreachable, but makes this function a generator
def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
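The role of exception_key is easier to see in a small plain-Python sketch of the try-in-order behaviour. This is not the library's implementation: `run_with_fallbacks` and the handlers are hypothetical, and re-raising the first error when every handler fails is an assumption of this sketch.

```python
from typing import Any, Callable, Optional, Sequence

Handler = Callable[[dict], Any]


def run_with_fallbacks(
    primary: Handler,
    fallbacks: Sequence[Handler],
    payload: dict,
    *,
    exceptions_to_handle: tuple = (Exception,),
    exception_key: Optional[str] = None,
) -> Any:
    """Hypothetical helper: try the primary handler, then each fallback in order."""
    first_error: Optional[BaseException] = None
    last_error: Optional[BaseException] = None
    for handler in [primary, *fallbacks]:
        current = dict(payload)  # copy so the caller's dict is never mutated
        if exception_key is not None and last_error is not None:
            # Hand the previous failure to the next handler under the given key.
            current[exception_key] = last_error
        try:
            return handler(current)
        except exceptions_to_handle as err:
            if first_error is None:
                first_error = err
            last_error = err
    # Assumption: when every handler fails, surface the first error.
    raise first_error


def flaky(payload: dict) -> str:
    raise ValueError("primary failed")


def recover(payload: dict) -> str:
    # The fallback can inspect the error because the input is a dict.
    return f"recovered from: {payload['error']}"


result = run_with_fallbacks(flaky, [recover], {"q": "hi"}, exception_key="error")
```

Because the error travels inside the input, the primary handler and every fallback need to accept a dictionary whenever exception_key is set.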
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema. Alternatively (e.g., if the
Runnable takes a dict as input and the specific dict keys are not typed),
the schema can be specified directly with args_schema. You can also
pass arg_types to just specify the required arguments and their types.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args_schema | type[BaseModel] \| None | The schema for the tool. | None |
| name | str \| None | The name of the tool. | None |
| description | str \| None | The description of the tool. | None |
| arg_types | dict[str, type] \| None | A dictionary of argument names to types. | None |
Returns:
| Type | Description |
|---|---|
| BaseTool | A BaseTool instance. |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
    a: int
    b: list[int]
def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""
    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
from langchain_core.runnables import RunnableLambda
def f(x: str) -> str:
    return x + "a"
def g(x: str) -> str:
    return x + "z"
runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")
Added in version 0.2.14
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable, it is not serializable by default. This is to prevent accidental serialization of objects that should not be serialized.
Returns:
| Type | Description |
|---|---|
| bool | Whether the class is serializable. Default is False. |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path
to the object.
For example, for the class langchain.llms.openai.OpenAI, the id is
["langchain", "llms", "openai", "OpenAI"].
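The path-style identifier can be sketched in a few lines of plain Python. The classes below are hypothetical stand-ins, not the library's base class, and the assumption is that the id is simply the namespace list followed by the class name.

```python
from typing import List


class SerializableSketch:
    """Hypothetical stand-in for a serializable base class."""

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        # Assumption: by default the namespace is the module path split on dots.
        return cls.__module__.split(".")

    @classmethod
    def lc_id(cls) -> List[str]:
        # The id is the namespace followed by the class name.
        return [*cls.get_lc_namespace(), cls.__name__]


class OpenAI(SerializableSketch):
    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        # Overridden so the result matches the path quoted above.
        return ["langchain", "llms", "openai"]


identifier = OpenAI.lc_id()
```

Here `identifier` is the four-element list given in the example above, built from the three namespace parts plus the class name.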
to_json
¶
Serialize the Runnable to JSON.
Returns:
| Type | Description |
|---|---|
| SerializedConstructor \| SerializedNotImplemented | A JSON-serializable representation of the Runnable. |
to_json_not_implemented
¶
Serialize a "not implemented" object.
Returns:
| Type | Description |
|---|---|
| SerializedNotImplemented | SerializedNotImplemented. |
configurable_fields
¶
Configure particular Runnable fields at runtime.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| **kwargs | AnyConfigurableField | A dictionary of ConfigurableField instances to configure. | {} |
Raises:
| Type | Description |
|---|---|
| ValueError | If a configuration key is not found in the Runnable. |
Returns:
| Type | Description |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the fields configured. |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: (
Runnable[Input, Output]
| Callable[[], Runnable[Input, Output]]
)
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnables that can be set at runtime.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| which | ConfigurableField | The ConfigurableField instance that will be used to select the alternative. | required |
| default_key | str | The default key to use if no alternative is selected. | 'default' |
| prefix_keys | bool | Whether to prefix the keys with the ConfigurableField id. | False |
| **kwargs | Runnable[Input, Output] \| Callable[[], Runnable[Input, Output]] | A dictionary of keys to Runnable instances or callables that return Runnable instances. | {} |
Returns:
| Type | Description |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the alternatives configured. |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-3-7-sonnet-20250219"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
set_verbose
¶
with_structured_output
¶
with_structured_output(
schema: dict | type, **kwargs: Any
) -> Runnable[LanguageModelInput, dict | BaseModel]
Not implemented on this class.
get_token_ids
¶
get_num_tokens
¶
get_num_tokens_from_messages
¶
Get the number of tokens in the messages.
Useful for checking if an input fits in a model's context window.
Note
The base implementation of get_num_tokens_from_messages ignores tool
schemas.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| messages | list[BaseMessage] | The message inputs to tokenize. | required |
| tools | Sequence \| None | If provided, sequence of dict, BaseModel, function, or BaseTool objects to be converted to tool schemas. | None |
Returns:
| Type | Description |
|---|---|
| int | The sum of the number of tokens across the messages. |
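A token count is typically compared against a context-window budget before a request is sent. The sketch below shows that check in plain Python; `fits_context_window` and `whitespace_tokens` are hypothetical helpers, and the whitespace-based counter is only a placeholder for a real tokenizer, which will give different numbers.

```python
from typing import Callable, Sequence


def whitespace_tokens(text: str) -> int:
    """Placeholder tokenizer: counts whitespace-separated words."""
    return len(text.split())


def fits_context_window(
    messages: Sequence[str],
    count_tokens: Callable[[str], int],
    context_window: int,
    reserved_for_output: int = 0,
) -> bool:
    """Hypothetical helper: check the prompt leaves room for the reply."""
    total = sum(count_tokens(message) for message in messages)
    return total + reserved_for_output <= context_window


messages = ["You are a helpful assistant", "Summarize this report"]
# 5 + 3 = 8 placeholder tokens, which fits a window of 10.
fits = fits_context_window(messages, whitespace_tokens, context_window=10)
# Reserving 4 tokens for the reply pushes the total to 12, over the limit.
fits_with_reply = fits_context_window(
    messages, whitespace_tokens, context_window=10, reserved_for_output=4
)
```

Reserving part of the window for the model's reply is a design choice of this sketch, not something the method above does for you.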
generate
¶
generate(
prompts: list[str],
stop: list[str] | None = None,
callbacks: Callbacks | list[Callbacks] | None = None,
*,
tags: list[str] | list[list[str]] | None = None,
metadata: (
dict[str, Any] | list[dict[str, Any]] | None
) = None,
run_name: str | list[str] | None = None,
run_id: UUID | list[UUID | None] | None = None,
**kwargs: Any
) -> LLMResult
Pass a sequence of prompts to a model and return generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you:
- want to take advantage of batched calls,
- need more output from the model than just the top generated value,
- are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prompts | list[str] | List of string prompts. | required |
| stop | list[str] \| None | Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. | None |
| callbacks | Callbacks \| list[Callbacks] \| None | Callbacks to pass through. Used for executing additional functionality, such as logging or streaming, throughout generation. | None |
| tags | list[str] \| list[list[str]] \| None | List of tags to associate with each prompt. If provided, the length of the list must match the length of the prompts list. | None |
| metadata | dict[str, Any] \| list[dict[str, Any]] \| None | List of metadata dictionaries to associate with each prompt. If provided, the length of the list must match the length of the prompts list. | None |
| run_name | str \| list[str] \| None | List of run names to associate with each prompt. If provided, the length of the list must match the length of the prompts list. | None |
| run_id | UUID \| list[UUID \| None] \| None | List of run IDs to associate with each prompt. If provided, the length of the list must match the length of the prompts list. | None |
| **kwargs | Any | Arbitrary additional keyword arguments. These are usually passed to the model provider API call. | {} |
Raises:
| Type | Description |
|---|---|
| ValueError | If prompts is not a list. |
| ValueError | If the length of callbacks, tags, metadata, or run_name (if provided) does not match the length of prompts. |
Returns:
| Type | Description |
|---|---|
| LLMResult | An LLMResult, which contains a list of candidate Generations for each input prompt and additional model provider-specific output. |
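The effect of the stop parameter, cutting output at the first occurrence of any stop substring, can be sketched with the standard library. `cut_at_stop` is a hypothetical helper written for illustration; a provider may apply stop sequences on the server side and may trim whitespace differently.

```python
import re
from typing import Sequence


def cut_at_stop(text: str, stop: Sequence[str]) -> str:
    """Hypothetical helper: keep only the text before the earliest stop substring."""
    if not stop:
        return text
    # Escape each stop string so characters like "." or "?" match literally.
    pattern = "|".join(re.escape(s) for s in stop)
    # Split once at the leftmost match and keep what precedes it.
    return re.split(pattern, text, maxsplit=1)[0]


truncated = cut_at_stop("One two three four five.", ["three"])
earliest = cut_at_stop("a.b?c", ["?", "."])
untouched = cut_at_stop("no stop words here", [])
```

Note that the text before the stop word is kept verbatim, including the trailing space in `truncated`.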
agenerate
async
¶
agenerate(
prompts: list[str],
stop: list[str] | None = None,
callbacks: Callbacks | list[Callbacks] | None = None,
*,
tags: list[str] | list[list[str]] | None = None,
metadata: (
dict[str, Any] | list[dict[str, Any]] | None
) = None,
run_name: str | list[str] | None = None,
run_id: UUID | list[UUID | None] | None = None,
**kwargs: Any
) -> LLMResult
Asynchronously pass a sequence of prompts to a model and return generations.
This method should make use of batched calls for models that expose a batched API.
Use this method when you:
- want to take advantage of batched calls,
- need more output from the model than just the top generated value,
- are building chains that are agnostic to the underlying language model type (e.g., pure text completion models vs chat models).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prompts | list[str] | List of string prompts. | required |
| stop | list[str] \| None | Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings. | None |
| callbacks | Callbacks \| list[Callbacks] \| None | Callbacks to pass through. Used for executing additional functionality, such as logging or streaming, throughout generation. | None |
| tags | list[str] \| list[list[str]] \| None | List of tags to associate with each prompt. If provided, the length of the list must match the length of the prompts list. | None |
| metadata | dict[str, Any] \| list[dict[str, Any]] \| None | List of metadata dictionaries to associate with each prompt. If provided, the length of the list must match the length of the prompts list. | None |
| run_name | str \| list[str] \| None | List of run names to associate with each prompt. If provided, the length of the list must match the length of the prompts list. | None |
| run_id | UUID \| list[UUID \| None] \| None | List of run IDs to associate with each prompt. If provided, the length of the list must match the length of the prompts list. | None |
| **kwargs | Any | Arbitrary additional keyword arguments. These are usually passed to the model provider API call. | {} |
Raises:
| Type | Description |
|---|---|
| ValueError | If the length of callbacks, tags, metadata, or run_name (if provided) does not match the length of prompts. |
Returns:
| Type | Description |
|---|---|
| LLMResult | An LLMResult, which contains a list of candidate Generations for each input prompt and additional model provider-specific output. |
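The returned structure is nested: one list of candidates per input prompt. The dataclasses below are simplified, hypothetical stand-ins (`GenerationSketch`, `LLMResultSketch`) meant only to show how that shape is typically navigated; they are not the library's classes.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class GenerationSketch:
    """Stand-in for one candidate completion."""

    text: str


@dataclass
class LLMResultSketch:
    """Stand-in for the result: candidates grouped per input prompt."""

    generations: List[List[GenerationSketch]]
    llm_output: dict = field(default_factory=dict)


def top_texts(result: LLMResultSketch) -> List[str]:
    """Return the first candidate's text for every prompt."""
    return [candidates[0].text for candidates in result.generations]


result = LLMResultSketch(
    generations=[
        [GenerationSketch("Paris"), GenerationSketch("Paris, France")],  # prompt 0
        [GenerationSketch("4")],  # prompt 1
    ],
    llm_output={"model_name": "hypothetical-model"},
)
best = top_texts(result)
```

The outer index selects the prompt and the inner index selects the candidate, so `result.generations[0][1]` is the second candidate for the first prompt.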
save
¶
Save the LLM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_path | Path \| str | Path to file to save the LLM to. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If the file path is not a string or Path object. |
Example:
.. code-block:: python

    llm.save(file_path="path/llm.yaml")