LangChain Reference
langchain_core.runnables.base.RunnableGenerator
Class · Since v0.1

    RunnableGenerator

    Runnable that runs a generator function.

    RunnableGenerators can be instantiated directly or by using a generator within a sequence.

    RunnableGenerators can be used to implement custom behavior, such as custom output parsers, while preserving streaming capabilities. Given a generator function with a signature Iterator[A] -> Iterator[B], wrapping it in a RunnableGenerator allows it to emit output chunks as soon as they are streamed in from the previous step.

    Note

    If a generator function has a signature A -> Iterator[B], such that it requires its input from the previous step to be completed before emitting chunks (e.g., most LLMs need the entire prompt available to start generating), it can instead be wrapped in a RunnableLambda.
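The distinction between the two signatures can be illustrated in plain Python, with no LangChain required (the function names here are illustrative, not part of the API):

```python
from typing import Iterator


def upper_transform(chunks: Iterator[str]) -> Iterator[str]:
    # Iterator[str] -> Iterator[str]: each chunk is processed and
    # re-emitted as soon as it arrives, so streaming is preserved.
    for chunk in chunks:
        yield chunk.upper()


def spell_out(text: str) -> Iterator[str]:
    # str -> Iterator[str]: the complete input must exist before any
    # output can be produced (the case suited to RunnableLambda).
    for character in text:
        yield character


print(list(upper_transform(iter(["he", "llo"]))))  # ['HE', 'LLO']
print(list(spell_out("hi")))  # ['h', 'i']
```

A `RunnableGenerator` wraps functions shaped like `upper_transform`; functions shaped like `spell_out` belong in a `RunnableLambda`.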

    Here is an example to show the basic mechanics of a RunnableGenerator:

    from typing import Any, AsyncIterator, Iterator
    
    from langchain_core.runnables import RunnableGenerator
    
    def gen(input: Iterator[Any]) -> Iterator[str]:
        for token in ["Have", " a", " nice", " day"]:
            yield token
    
    runnable = RunnableGenerator(gen)
    runnable.invoke(None)  # "Have a nice day"
    list(runnable.stream(None))  # ["Have", " a", " nice", " day"]
    runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]
    
    # Async version:
    async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
        for token in ["Have", " a", " nice", " day"]:
            yield token
    
    runnable = RunnableGenerator(agen)
    await runnable.ainvoke(None)  # "Have a nice day"
    [p async for p in runnable.astream(None)]  # ["Have", " a", " nice", " day"]

    RunnableGenerator makes it easy to implement custom behavior within a streaming context. Below we show an example:

    from typing import Iterator
    
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_core.runnables import RunnableGenerator, RunnableLambda
    from langchain_openai import ChatOpenAI
    
    model = ChatOpenAI()
    chant_chain = (
        ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
        | model
        | StrOutputParser()
    )
    
    def character_generator(input: Iterator[str]) -> Iterator[str]:
        for token in input:
            if "," in token or "." in token:
                yield "👏" + token
            else:
                yield token
    
    runnable = chant_chain | character_generator
    assert type(runnable.last) is RunnableGenerator
    "".join(runnable.stream({"topic": "waste"}))  # Reduce👏, Reuse👏, Recycle👏.
    
    # Note that RunnableLambda can be used to delay streaming of one step in a
    # sequence until the previous step is finished:
    def reverse_generator(input: str) -> Iterator[str]:
        # Yield characters of input in reverse order.
        for character in input[::-1]:
            yield character
    
    runnable = chant_chain | RunnableLambda(reverse_generator)
    "".join(runnable.stream({"topic": "waste"}))  # ".elcycer ,esuer ,ecudeR"
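Conceptually, piping a `RunnableGenerator` onto a chain composes generator functions. A plain-Python sketch of the `character_generator` composition above, where the hypothetical `source` function stands in for the chain's streamed tokens:

```python
from typing import Iterator


def source(_: Iterator[None]) -> Iterator[str]:
    # Stand-in for the chant chain's streamed output (assumed tokens).
    yield from ["Reduce", ",", " Reuse", ",", " Recycle", "."]


def character_generator(tokens: Iterator[str]) -> Iterator[str]:
    # Same transform as in the example: clap before punctuation chunks.
    for token in tokens:
        if "," in token or "." in token:
            yield "👏" + token
        else:
            yield token


# Piping is, conceptually, feeding one generator's output into the next:
print("".join(character_generator(source(iter([None])))))
# Reduce👏, Reuse👏, Recycle👏.
```

Each token flows through `character_generator` the moment `source` yields it, which is why the piped chain keeps streaming.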
    RunnableGenerator(
      self,
      transform: Callable[[Iterator[Input]], Iterator[Output]] | Callable[[AsyncIterator[Input]], AsyncIterator[Output]],
      atransform: Callable[[AsyncIterator[Input]], AsyncIterator[Output]] | None = None,
      *,
      name: str | None = None
    )
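As a rough sketch of how `invoke` relates to the transform: the generator is exhausted and its chunks are combined with `+`, which is why the string chunks in the examples above concatenate into a single result. `invoke_like` below is a hypothetical helper for illustration, not part of the API:

```python
from typing import Any, Callable, Iterator


def invoke_like(
    transform: Callable[[Iterator[Any]], Iterator[str]],
    value: Any,
) -> str:
    # Hypothetical helper: wrap the single input in an iterator, exhaust
    # the transform, and fold the chunks with `+`, approximating how
    # invoke() aggregates streamed output into one final value.
    chunks = transform(iter([value]))
    final = next(chunks)
    for chunk in chunks:
        final = final + chunk
    return final


def gen(_: Iterator[Any]) -> Iterator[str]:
    yield from ["Have", " a", " nice", " day"]


print(invoke_like(gen, None))  # Have a nice day
```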

    Bases

    Runnable[Input, Output]

    Used in Docs

    • Build a voice agent with LangChain

    Parameters

    transform: Callable[[Iterator[Input]], Iterator[Output]] | Callable[[AsyncIterator[Input]], AsyncIterator[Output]] (required)
    The transform function.

    atransform: Callable[[AsyncIterator[Input]], AsyncIterator[Output]] | None (default: None)
    The async transform function.

    name: str | None (default: None)
    The name of the Runnable.

    Constructors

    constructor __init__
    transform: Callable[[Iterator[Input]], Iterator[Output]] | Callable[[AsyncIterator[Input]], AsyncIterator[Output]]
    atransform: Callable[[AsyncIterator[Input]], AsyncIterator[Output]] | None
    name: str | None

    Attributes

    name
    InputType: Any
    OutputType: Any

    Methods

    get_input_schema
    get_output_schema
    transform
    stream
    invoke
    atransform
    astream
    ainvoke

    Inherited fromRunnable

    Attributes

    input_schema: type[BaseModel]
    The type of input this Runnable accepts specified as a Pydantic model.

    output_schema: type[BaseModel]
    Output schema.

    config_specs: list[ConfigurableFieldSpec]

    Methods

    get_name
    get_input_jsonschema
      Get a JSON schema that represents the input to the Runnable.
    get_output_jsonschema
      Get a JSON schema that represents the output of the Runnable.
    config_schema
      The type of config this Runnable accepts specified as a Pydantic model.
    get_config_jsonschema
      Get a JSON schema that represents the config of the Runnable.
    get_graph
    get_prompts
      Return a list of prompts used by this Runnable.
    pipe
      Pipe Runnable objects.
    pick
      Pick keys from the output dict of this Runnable.
    assign
      Merge the Dict input with the output produced by the mapping argument.
    batch
    batch_as_completed
      Run invoke in parallel on a list of inputs.
    abatch
    abatch_as_completed
      Run ainvoke in parallel on a list of inputs.
    astream_log
      Stream all output from a Runnable, as reported to the callback system.
    astream_events
      Generate a stream of events.
    bind
      Bind arguments to a Runnable, returning a new Runnable.
    with_config
    with_listeners
      Bind lifecycle listeners to a Runnable, returning a new Runnable.
    with_alisteners
      Bind async lifecycle listeners to a Runnable.
    with_types
      Bind input and output types to a Runnable, returning a new Runnable.
    with_retry
      Create a new Runnable that retries the original Runnable on exceptions.
    map
      Map a function to multiple iterables.
    with_fallbacks
      Add fallbacks to a Runnable, returning a new Runnable.
    as_tool
      Create a BaseTool from a Runnable.
