Runnable that runs a mapping of Runnables in parallel, and returns a mapping of their outputs.
RunnableParallel is one of the two main composition primitives,
alongside RunnableSequence. It invokes Runnables concurrently, providing the
same input to each.
A RunnableParallel can be instantiated directly or by using a dict literal
within a sequence.
Here is a simple example that uses functions to illustrate the use of
RunnableParallel:
```python
from langchain_core.runnables import RunnableLambda


def add_one(x: int) -> int:
    return x + 1


def mul_two(x: int) -> int:
    return x * 2


def mul_three(x: int) -> int:
    return x * 3


runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
runnable_3 = RunnableLambda(mul_three)

sequence = runnable_1 | {  # this dict is coerced to a RunnableParallel
    "mul_two": runnable_2,
    "mul_three": runnable_3,
}
# Or equivalently:
# sequence = runnable_1 | RunnableParallel(
#     {"mul_two": runnable_2, "mul_three": runnable_3}
# )
# Also equivalently:
# sequence = runnable_1 | RunnableParallel(
#     mul_two=runnable_2,
#     mul_three=runnable_3,
# )

sequence.invoke(1)
await sequence.ainvoke(1)

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
```
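For reference, each branch receives the output of `runnable_1` (the input plus one), so the synchronous calls above return:

```python
sequence.invoke(1)
# -> {'mul_two': 4, 'mul_three': 6}

sequence.batch([1, 2, 3])
# -> [{'mul_two': 4, 'mul_three': 6},
#     {'mul_two': 6, 'mul_three': 9},
#     {'mul_two': 8, 'mul_three': 12}]
```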
RunnableParallel makes it easy to run Runnables in parallel. In the example below, we simultaneously stream output from two different Runnable objects:
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
joke_chain = (
    ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
)
poem_chain = (
    ChatPromptTemplate.from_template("write a 2-line poem about {topic}")
    | model
)

runnable = RunnableParallel(joke=joke_chain, poem=poem_chain)

# Display stream
output = {key: "" for key, _ in runnable.output_schema()}
for chunk in runnable.stream({"topic": "bear"}):
    for key in chunk:
        output[key] = output[key] + chunk[key].content
    print(output)  # noqa: T201
```
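The same pattern also works asynchronously. A minimal sketch using `astream` (assuming the `runnable` defined above; the chunk handling mirrors the synchronous loop):

```python
import asyncio


async def main() -> None:
    output = {"joke": "", "poem": ""}
    async for chunk in runnable.astream({"topic": "bear"}):
        for key in chunk:
            output[key] += chunk[key].content
        print(output)  # cumulative output after each chunk


asyncio.run(main())
```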
```python
RunnableParallel(
    self,
    steps__: Mapping[str, Runnable[Input, Any] | Callable[[Input], Any] | Mapping[str, Runnable[Input, Any] | Callable[[Input], Any]]] | None = None,
    **kwargs: Runnable[Input, Any] | Callable[[Input], Any] | Mapping[str, Runnable[Input, Any] | Callable[[Input], Any]],
)
```

| Name | Type | Description |
|---|---|---|
| `steps__` | `Mapping[str, Runnable[Input, Any] \| Callable[[Input], Any] \| Mapping[str, Runnable[Input, Any] \| Callable[[Input], Any]]] \| None` | The steps to include. Default: `None`. |
| `**kwargs` | `Runnable[Input, Any] \| Callable[[Input], Any] \| Mapping[str, Runnable[Input, Any] \| Callable[[Input], Any]]` | Additional steps to include. Default: `{}`. |
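To illustrate the constructor, the positional `steps__` mapping and keyword steps can be mixed; both are merged into a single step mapping. A small sketch reusing `runnable_2` and `runnable_3` from the first example:

```python
from langchain_core.runnables import RunnableParallel

# The positional mapping and the keyword arguments are merged into one
# mapping of named steps.
parallel = RunnableParallel({"mul_two": runnable_2}, mul_three=runnable_3)
parallel.invoke(2)  # -> {'mul_two': 4, 'mul_three': 6}
```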
Methods available on `RunnableParallel` (most are inherited from `Runnable`):

| Method | Description |
|---|---|
| `is_lc_serializable` | Return True as this class is serializable. |
| `get_lc_namespace` | Get the namespace of the LangChain object. |
| `get_name` | Get the name of the Runnable. |
| `get_input_schema` | Get the input schema of the Runnable. |
| `get_output_schema` | Get the output schema of the Runnable. |
| `get_graph` | Get the graph representation of the Runnable. |
| `get_input_jsonschema` | Get a JSON schema that represents the input to the Runnable. |
| `get_output_jsonschema` | Get a JSON schema that represents the output of the Runnable. |
| `config_schema` | The type of config this Runnable accepts, specified as a Pydantic model. |
| `get_config_jsonschema` | Get a JSON schema that represents the config of the Runnable. |
| `get_prompts` | Return a list of prompts used by this Runnable. |
| `pipe` | Pipe Runnable objects. |
| `pick` | Pick keys from the output dict of this Runnable. |
| `assign` | Merge the dict input with the output produced by the mapping argument. |
| `batch` | Run `invoke` in parallel on a list of inputs. |
| `abatch` | Run `ainvoke` in parallel on a list of inputs. |
| `astream_log` | Stream all output from a Runnable, as reported to the callback system. |
| `astream_events` | Generate a stream of events. |
| `bind` | Bind arguments to a Runnable, returning a new Runnable. |
| `with_listeners` | Bind lifecycle listeners to a Runnable, returning a new Runnable. |
| `with_alisteners` | Bind async lifecycle listeners to a Runnable. |
| `with_types` | Bind input and output types to a Runnable, returning a new Runnable. |
| `with_retry` | Create a new Runnable that retries the original Runnable on exceptions. |
| `map` | Map a function to multiple iterables. |
| `with_fallbacks` | Add fallbacks to a Runnable, returning a new Runnable. |
| `as_tool` | Create a BaseTool from a Runnable. |
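As a quick illustration of two of these methods, the sketch below reuses `sequence` from the first example; note that `pick` with a single key is assumed here to return the bare value rather than a one-key dict:

```python
# Pick a single key out of the parallel output dict.
only_two = sequence.pick("mul_two")
only_two.invoke(1)  # -> 4

# Retry the whole parallel step on exceptions, up to three attempts.
resilient = sequence.with_retry(stop_after_attempt=3)
resilient.invoke(1)  # -> {'mul_two': 4, 'mul_three': 6}
```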