Function · Since v0.2

get_stream_writer

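Access the LangGraph StreamWriter from inside a graph node or entrypoint task at runtime. Data passed to the writer is emitted on the stream when the graph is run with stream_mode="custom".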

Can be called from inside any StateGraph node or functional API task.

Async with Python < 3.11

If you are using Python < 3.11 and running LangGraph asynchronously, get_stream_writer() will not work, because it relies on contextvars propagation (only available in Python >= 3.11).
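The warning above comes down to how the writer is found: it is looked up through a context variable rather than passed as an argument. The following is a minimal, stdlib-only sketch of that general pattern, not LangGraph's actual implementation. The names `_current_writer`, `get_writer`, `node`, and `run_node` are hypothetical and exist only for illustration.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical stand-in for runtime state; not LangGraph's real internals.
_current_writer = ContextVar("current_writer")


def get_writer():
    """Return the writer bound to the current context (hypothetical helper)."""
    try:
        return _current_writer.get()
    except LookupError:
        raise RuntimeError("get_writer() called outside of a run") from None


async def node():
    # The node receives no writer argument; it finds one through the context.
    get_writer()({"custom_data": "Hello!"})


async def run_node(collected):
    # The "runtime" binds a writer, then schedules the node as a task.
    # The task runs in a copy of the current context, so the lookup succeeds.
    token = _current_writer.set(collected.append)
    try:
        await asyncio.create_task(node())
    finally:
        _current_writer.reset(token)


collected = []
asyncio.run(run_node(collected))
print(collected)
```

If the context does not reach the code that performs the lookup, the lookup fails, which is the kind of failure the warning describes. For reference, `asyncio.create_task` gained its `context` keyword argument in Python 3.11.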

get_stream_writer() -> StreamWriter
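The examples on this page call the returned writer with a single payload. Assuming that is all a writer is (a callable that accepts one argument), helper code can take the writer as a parameter and stay testable without a graph. The sketch below uses only the standard library; `Writer` and `sum_with_progress` are hypothetical names, not part of LangGraph.

```python
from typing import Any, Callable, List, Sequence

# Assumed shape of a stream writer: a callable that takes one payload.
Writer = Callable[[Any], None]


def sum_with_progress(values: Sequence[int], write: Writer) -> int:
    """Hypothetical helper: sum the values, reporting progress through `write`."""
    total = 0
    for done, value in enumerate(values, start=1):
        total += value
        write({"done": done, "of": len(values)})
    return total


# Outside a graph, any one-argument callable can stand in for the writer.
events: List[Any] = []
result = sum_with_progress([1, 2, 3], events.append)
print(result)
print(events[-1])
```

Inside a node or task, the same helper could presumably be called as `sum_with_progress(values, get_stream_writer())`, which keeps the progress-reporting logic independent of how the writer is obtained.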

Using with StateGraph:

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.config import get_stream_writer

class State(TypedDict):
    foo: int

def my_node(state: State):
    # Fetch the writer for the current run and emit a custom chunk.
    my_stream_writer = get_stream_writer()
    my_stream_writer({"custom_data": "Hello!"})
    return {"foo": state["foo"] + 1}

graph = (
    StateGraph(State)
    .add_node(my_node)
    .add_edge(START, "my_node")
    .compile()
)

# stream_mode="custom" yields the data passed to the stream writer.
for chunk in graph.stream({"foo": 1}, stream_mode="custom"):
    print(chunk)

Output:

{'custom_data': 'Hello!'}

Using with functional API:

from langgraph.func import entrypoint, task
from langgraph.config import get_stream_writer

@task
def my_task(value: int):
    # Fetch the writer for the current run and emit a custom chunk.
    my_stream_writer = get_stream_writer()
    my_stream_writer({"custom_data": "Hello!"})
    return value + 1

@entrypoint()
def workflow(value: int):
    # Calling a task returns a future; .result() waits for its value.
    return my_task(value).result()

# stream_mode="custom" yields the data passed to the stream writer.
for chunk in workflow.stream(1, stream_mode="custom"):
    print(chunk)

Output:

{'custom_data': 'Hello!'}
