Runnable that runs a generator function.
RunnableGenerators can be instantiated directly or by using a generator within
a sequence.
RunnableGenerators can be used to implement custom behavior, such as custom
output parsers, while preserving streaming capabilities. Given a generator function
with a signature Iterator[A] -> Iterator[B], wrapping it in a
RunnableGenerator allows it to emit output chunks as soon as they are streamed
in from the previous step.
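As a rough illustration of the kind of custom output parser described above, the following plain-Python sketch has the `Iterator[str] -> Iterator[str]` shape and yields each comma-separated item as soon as its closing comma arrives. It uses no LangChain imports, and `split_on_commas` is a hypothetical name chosen for this example; a function of this shape is the sort of thing one would wrap in a RunnableGenerator.

```python
from typing import Iterator


def split_on_commas(chunks: Iterator[str]) -> Iterator[str]:
    """Yield comma-separated items as soon as each one is complete."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete item currently held in the buffer.
        while "," in buffer:
            item, buffer = buffer.split(",", 1)
            yield item.strip()
    # Flush whatever remains once the input is exhausted.
    if buffer.strip():
        yield buffer.strip()


# Chunk boundaries deliberately do not line up with item boundaries.
tokens = iter(["Red", "uce, Re", "use, ", "Recycle"])
items = list(split_on_commas(tokens))
print(items)
```

Because the function only buffers up to the next comma, the first item can be emitted long before the upstream iterator is finished.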
If a generator function instead has a signature A -> Iterator[B], meaning it
needs the complete input from the previous step before it can emit any chunks
(e.g., most LLMs need the entire prompt available to start generating), it can
be wrapped in a RunnableLambda instead.
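The difference between the two signatures can be sketched in plain Python, without LangChain. All names below (`upstream`, `streaming_upper`, `buffered_upper`) are hypothetical stand-ins: `upstream` plays the role of the previous step and records every chunk it produces, so the logs show how much input each style had to consume before its first output.

```python
from typing import Iterator, List


def upstream(log: List[str]) -> Iterator[str]:
    """Stand-in for a previous step; records each chunk it produces."""
    for chunk in ["Have", " a", " nice", " day"]:
        log.append(chunk)
        yield chunk


def streaming_upper(chunks: Iterator[str]) -> Iterator[str]:
    """Iterator[A] -> Iterator[B]: can emit after each input chunk."""
    for chunk in chunks:
        yield chunk.upper()


def buffered_upper(text: str) -> Iterator[str]:
    """A -> Iterator[B]: needs the complete input before emitting."""
    for word in text.upper().split(" "):
        yield word


# Streaming style: the first output needs only the first upstream chunk.
streaming_log: List[str] = []
first_streamed = next(streaming_upper(upstream(streaming_log)))

# Buffered style: the whole upstream must be drained before any output.
buffered_log: List[str] = []
full_text = "".join(upstream(buffered_log))
first_buffered = next(buffered_upper(full_text))

print(len(streaming_log), len(buffered_log))
```

In this sketch the streaming function has seen one chunk when it emits its first output, while the buffered function has forced all four chunks to be produced first.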
Here is an example to show the basic mechanics of a RunnableGenerator:
from typing import Any, AsyncIterator, Iterator

from langchain_core.runnables import RunnableGenerator


def gen(input: Iterator[Any]) -> Iterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token


runnable = RunnableGenerator(gen)
runnable.invoke(None)  # "Have a nice day"
list(runnable.stream(None))  # ["Have", " a", " nice", " day"]
runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]


# Async version:
async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token


runnable = RunnableGenerator(agen)
await runnable.ainvoke(None)  # "Have a nice day"
[p async for p in runnable.astream(None)]  # ["Have", " a", " nice", " day"]
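The outputs above are consistent with `invoke` returning the streamed chunks combined with `+`. The plain-Python sketch below mimics that aggregation. `aggregate` is a hypothetical helper, not a LangChain function, and the description of how chunks are combined is an assumption inferred from the results shown rather than a statement about the library's internals.

```python
from typing import Iterator, Optional


def gen() -> Iterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token


def aggregate(chunks: Iterator[str]) -> Optional[str]:
    """Fold chunks together with +, returning None for an empty stream."""
    final: Optional[str] = None
    for chunk in chunks:
        final = chunk if final is None else final + chunk
    return final


streamed = list(gen())     # what a stream-style call would yield
invoked = aggregate(gen())  # what an invoke-style call would return
print(streamed, invoked)
```

The same folding works for any chunk type that supports `+`, which is why the chunks in the example are chosen so that concatenation reproduces the full sentence.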
RunnableGenerator makes it easy to implement custom behavior within a streaming
context. Below we show an example:
from typing import Iterator

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableGenerator, RunnableLambda
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
chant_chain = (
    ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
    | model
    | StrOutputParser()
)


def character_generator(input: Iterator[str]) -> Iterator[str]:
    # Prefix any token containing punctuation, passing all others through.
    for token in input:
        if "," in token or "." in token:
            yield "π" + token
        else:
            yield token


# Piping a generator function into a sequence wraps it in a RunnableGenerator.
runnable = chant_chain | character_generator
assert type(runnable.last) is RunnableGenerator
"".join(runnable.stream({"topic": "waste"}))  # "Reduceπ, Reuseπ, Recycleπ."


# Note that RunnableLambda can be used to delay streaming of one step in a
# sequence until the previous step is finished:
def reverse_generator(input: str) -> Iterator[str]:
    # Yield characters of input in reverse order.
    for character in input[::-1]:
        yield character
runnable = chant_chain | RunnableLambda(reverse_generator)
"".join(runnable.stream({"topic": "waste"}))  # ".elcyceR ,esueR ,ecudeR"

RunnableGenerator(
    self,
    transform: Callable[[Iterator[Input]], Iterator[Output]] | Callable[[AsyncIterator[Input]], AsyncIterator[Output]],
    atransform: Callable[[AsyncIterator[Input]], AsyncIterator[Output]] | None = None,
    *,
    name: str | None = None
)

| Name | Type | Description |
|---|---|---|
| `transform`* | `Callable[[Iterator[Input]], Iterator[Output]] \| Callable[[AsyncIterator[Input]], AsyncIterator[Output]]` | The transform function. |
| `atransform` | `Callable[[AsyncIterator[Input]], AsyncIterator[Output]] \| None` | Default: `None`. The async transform function. |
| `name` | `str \| None` | Default: `None`. The name of the |
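Since `transform` accepts either a sync or an async generator function, a constructor with this signature needs some way to tell the two apart. The sketch below shows one possible approach using the standard-library `inspect` module. `classify_transform` is a hypothetical helper written for this example and is an assumption about how such dispatch could work, not LangChain's actual implementation.

```python
import inspect
from typing import Any, Callable, Optional, Tuple

Fn = Callable[..., Any]


def classify_transform(
    transform: Fn, atransform: Optional[Fn] = None
) -> Tuple[Optional[Fn], Optional[Fn]]:
    """Return (sync_fn, async_fn) for the given arguments (hypothetical)."""
    if atransform is not None and not inspect.isasyncgenfunction(atransform):
        raise TypeError("atransform must be an async generator function")
    if inspect.isasyncgenfunction(transform):
        # An async generator passed as `transform` fills the async slot.
        return None, transform
    if inspect.isgeneratorfunction(transform):
        return transform, atransform
    raise TypeError("transform must be a generator or async generator function")


def gen(chunks):
    yield from chunks


async def agen(chunks):
    async for chunk in chunks:
        yield chunk


sync_only = classify_transform(gen)
async_only = classify_transform(agen)
both = classify_transform(gen, agen)
```

Rejecting plain (non-generator) callables early, as this sketch does, surfaces a wrong argument at construction time rather than at the first call.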

| Name | Type |
|---|---|
| `transform` | `Callable[[Iterator[Input]], Iterator[Output]] \| Callable[[AsyncIterator[Input]], AsyncIterator[Output]]` |
| `atransform` | `Callable[[AsyncIterator[Input]], AsyncIterator[Output]] \| None` |
| `name` | `str \| None` |
Get the name of the Runnable.
Get a JSON schema that represents the input to the Runnable.
Get a JSON schema that represents the output of the Runnable.
The type of config this Runnable accepts specified as a Pydantic model.
Get a JSON schema that represents the config of the Runnable.
Return a graph representation of this Runnable.
Return a list of prompts used by this Runnable.
Pipe Runnable objects.
Pick keys from the output dict of this Runnable.
Assigns new fields to the dict output of this Runnable.
Default implementation runs invoke in parallel using a thread pool executor.
Run invoke in parallel on a list of inputs.
Default implementation runs ainvoke in parallel using asyncio.gather.
Run ainvoke in parallel on a list of inputs.
Stream all output from a Runnable, as reported to the callback system.
Generate a stream of events.
Bind arguments to a Runnable, returning a new Runnable.
Bind config to a Runnable, returning a new Runnable.
Bind lifecycle listeners to a Runnable, returning a new Runnable.
Bind async lifecycle listeners to a Runnable.
Bind input and output types to a Runnable, returning a new Runnable.
Create a new Runnable that retries the original Runnable on exceptions.
Return a new Runnable that maps a list of inputs to a list of outputs.
Add fallbacks to a Runnable, returning a new Runnable.
Create a BaseTool from a Runnable.
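
The batch descriptions above mention a thread pool executor for `invoke` and `asyncio.gather` for `ainvoke`. A minimal standard-library sketch of those two patterns follows. The functions `batch`, `abatch`, `shout`, and `ashout` are hypothetical stand-ins written for this example and do not reproduce LangChain's implementation, which also handles config and callbacks.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Awaitable, Callable, List, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


def batch(invoke: Callable[[In], Out], inputs: List[In]) -> List[Out]:
    """Run invoke on each input in a thread pool, preserving input order."""
    if not inputs:
        return []
    with ThreadPoolExecutor(max_workers=len(inputs)) as executor:
        return list(executor.map(invoke, inputs))


async def abatch(
    ainvoke: Callable[[In], Awaitable[Out]], inputs: List[In]
) -> List[Out]:
    """Run ainvoke on each input concurrently with asyncio.gather."""
    return list(await asyncio.gather(*(ainvoke(item) for item in inputs)))


def shout(text: str) -> str:
    return text.upper()


async def ashout(text: str) -> str:
    await asyncio.sleep(0)  # yield control, as real I/O would
    return text.upper()


sync_results = batch(shout, ["a", "b", "c"])
async_results = asyncio.run(abatch(ashout, ["a", "b", "c"]))
print(sync_results, async_results)
```

Both `executor.map` and `asyncio.gather` return results in input order, so callers can pair each output with its input by position.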