Raised when a subgraph is interrupted; suppressed by the root graph. Never raised directly, and never surfaced to the user.
A protocol event emitted by the streaming infrastructure.
Wraps a raw stream part (values, messages, custom, etc.) in a uniform
envelope with a monotonic sequence number assigned by the root StreamMux.
Consumers that need a total order across root events should use seq, not
params.timestamp (which is wall-clock and not monotonic).
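A small sketch of why this matters. The event dicts below are illustrative stand-ins for real protocol events; only the `seq` / `params.timestamp` distinction comes from the text:

```python
# Sketch: order by `seq`, not wall-clock timestamps. Event shapes here are
# illustrative, not the library's actual ProtocolEvent type.
events = [
    {"seq": 0, "method": "values", "params": {"timestamp": 1700000002.5}},
    {"seq": 1, "method": "messages", "params": {"timestamp": 1700000001.9}},  # clock skew
    {"seq": 2, "method": "custom", "params": {"timestamp": 1700000002.1}},
]

by_seq = sorted(events, key=lambda e: e["seq"])
by_time = sorted(events, key=lambda e: e["params"]["timestamp"])

assert [e["seq"] for e in by_seq] == [0, 1, 2]
# Wall-clock order disagrees with emission order:
assert [e["seq"] for e in by_time] == [1, 2, 0]
```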
Extension point for custom stream projections.
Transformers observe protocol events flowing through the StreamMux and build typed derived projections (StreamChannels, promises, etc.).
Set _native = True on a transformer to have its projection keys
exposed as direct attributes on the run stream (in addition to
appearing in run.extensions).
Subclasses must implement init and override at least one of
process / aprocess. The finalize / afinalize and fail /
afail hooks are optional — the default implementations are no-ops.
StreamChannel instances in the projection dict are auto-closed /
auto-failed by the mux, so most transformers don't need finalize
or fail at all.
Transformers that need async work pick the async lane by:
- defining aprocess (and optionally afinalize / afail), or
- calling self.schedule(coro) from inside a sync process, or
- setting requires_async = True explicitly.
The mux detects these cases at registration and raises if they're
used under sync stream() — they only work under astream().
Use aprocess when the pump must wait for async work before the
next transformer sees the event (e.g. PII redaction that mutates
event in place). Use schedule() for decoupled async work whose
result lands on an independent projection (e.g. async moderation
scoring, cost lookup, external tracing).
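A minimal sketch of the sync lane, assuming the hook names from the text (init / process / finalize); the class below is a stand-in, not the library's real transformer base class, and the event shape is hypothetical:

```python
# Hypothetical transformer that counts events per method and exposes the
# counts as a projection. Hook names (init / process / finalize) follow the
# text above; everything else is an illustrative assumption.
class CountingTransformer:
    _native = False  # keys would stay under run.extensions only

    def init(self):
        self.counts = {}                 # projection value, built incrementally
        return {"counts": self.counts}   # projection dict keyed by name

    def process(self, event):
        method = event["method"]
        self.counts[method] = self.counts.get(method, 0) + 1

    def finalize(self):                  # optional hook; default is a no-op
        pass

t = CountingTransformer()
projection = t.init()
for ev in [{"method": "values"}, {"method": "messages"}, {"method": "values"}]:
    t.process(ev)
t.finalize()

assert projection["counts"] == {"values": 2, "messages": 1}
```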
Async handle for a discovered subgraph (extends AsyncGraphRunStream).
Sync handle for a discovered subgraph (extends GraphRunStream).
Single-consumer drainable queue for streaming events, with optional protocol auto-forwarding.
When constructed with a name, the StreamMux auto-wires every
push() to also inject a ProtocolEvent into the main event stream
using the channel's name as the method. When constructed without a
name, the channel is local-only — items are only visible to
in-process consumers that iterate the channel directly.
Items are popped off the front as the consumer advances — there is
no retention beyond what's currently queued. A channel accepts
exactly one subscriber; a second __iter__ / __aiter__ call
raises. Use tee(n) / atee(n) for fan-out.
Starts unbound — neither __iter__ nor __aiter__ is available
until the StreamMux calls _bind(is_async). After binding, only
the matching iteration protocol works; the other raises TypeError.
Pump wiring (set by the run stream, not by _bind):
- _request_more: sync pump callable, returns True if a new
event was produced.
- _arequest_more: async pump coroutine factory, same contract.
Memory is bounded by caller pace: both sync and async use caller-driven pumps, so each cursor advance produces at most one event.
Lazy-subscribe: push appends to the local buffer only when a
subscriber has registered. Auto-forward via _wire_fn always fires
regardless of subscription state.
Lifecycle (close / fail) is managed by the mux — transformers
don't need to close their channels manually.
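The channel semantics above can be sketched with a toy class. This is illustrative only, assuming the behaviors named in the text (single subscriber, drain-as-you-go, lazy-subscribe, always-on auto-forward); the real class additionally handles sync/async binding and pump wiring:

```python
from collections import deque

# Toy sketch of the described StreamChannel semantics. Names mirror the
# text; this is not the real implementation.
class SketchChannel:
    def __init__(self, name=None, wire_fn=None):
        self.name = name
        self._wire_fn = wire_fn      # auto-forward hook set by the mux
        self._buffer = deque()
        self._subscribed = False

    def push(self, item):
        if self._wire_fn is not None:    # auto-forward always fires
            self._wire_fn(self.name, item)
        if self._subscribed:             # lazy-subscribe: buffer only then
            self._buffer.append(item)

    def __iter__(self):
        if self._subscribed:
            raise RuntimeError("channel accepts exactly one subscriber")
        self._subscribed = True
        return self._drain()

    def _drain(self):
        while self._buffer:              # pop off the front; no retention
            yield self._buffer.popleft()

forwarded = []
ch = SketchChannel(name="custom", wire_fn=lambda n, i: forwarded.append((n, i)))
ch.push("dropped")     # no subscriber yet: not buffered, but still forwarded
it = iter(ch)          # registers the single subscriber
ch.push("seen")
assert list(it) == ["seen"]
assert forwarded == [("custom", "dropped"), ("custom", "seen")]
```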
Central event dispatcher for the streaming infrastructure.
Owns the main event log and routes events through a transformer
pipeline. StreamChannels with a name discovered in transformer
projections are auto-wired so that every push() also injects a
ProtocolEvent into the main log. StreamChannels without a name
are local-only.
Pass is_async=True when the mux will be consumed via async
iteration (handler.astream()). All StreamChannel instances
discovered during registration are automatically bound to the
matching mode.
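A rough sketch of the auto-wiring behavior, assuming a monotonic sequence counter and a forwarding hook handed to named channels; `SketchMux` is illustrative and omits the transformer pipeline and binding logic:

```python
import itertools

# Illustrative mux: assigns monotonic seq numbers and, for named channels,
# makes every push() also land in the main log under the channel's name.
class SketchMux:
    def __init__(self):
        self.log = []
        self._seq = itertools.count()

    def inject(self, method, payload):
        self.log.append({"seq": next(self._seq), "method": method, "params": payload})

    def wire(self, channel_name):
        # The forwarding hook a named channel would be given at registration.
        return lambda payload: self.inject(channel_name, payload)

mux = SketchMux()
forward = mux.wire("lifecycle")
forward({"phase": "start"})
forward({"phase": "end"})

assert [e["method"] for e in mux.log] == ["lifecycle", "lifecycle"]
assert [e["seq"] for e in mux.log] == [0, 1]
```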
Capture values events as a drainable stream of state snapshots.
Keeps _latest / _interrupted / _interrupts as scalar state
regardless of whether the log has a subscriber — so run.output()
and run.interrupted work without forcing the caller to iterate
run.values. Log pushes are silent no-ops when unsubscribed.
Native transformer — projection keys are exposed as direct
attributes on the run stream (e.g. run.values).
Only values events at the run's own level are captured; snapshots
from deeper subgraphs are left in the main event log but excluded
from the projection. "Own level" is defined by scope, which
stream_v2 / astream_v2 populate from the caller's checkpoint
namespace so that a nested stream_v2 call still sees its own
root snapshots.
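The two rules above (scalar latest-wins state, own-level filtering by scope) can be sketched as a standalone function; field and event names here are illustrative assumptions:

```python
# Sketch: keep only the latest own-level values snapshot, skipping snapshots
# from deeper namespaces. Event shape is an illustrative stand-in.
def capture_values(events, scope=()):
    latest = None
    for ev in events:
        if ev["method"] != "values":
            continue
        if tuple(ev["namespace"]) != tuple(scope):  # deeper subgraph: excluded
            continue
        latest = ev["params"]                       # overwrite: scalar state
    return latest

events = [
    {"method": "values", "namespace": (), "params": {"x": 1}},
    {"method": "values", "namespace": ("child",), "params": {"x": 99}},  # subgraph
    {"method": "values", "namespace": (), "params": {"x": 2}},
]
assert capture_values(events) == {"x": 2}                  # run.output() analogue
assert capture_values(events, scope=("child",)) == {"x": 99}
```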
Capture messages events as ChatModelStream objects.
The messages projection yields one ChatModelStream (or
AsyncChatModelStream) per LLM call. Consumers iterate
run.messages to get stream handles, then use each handle's typed
projections (.text, .reasoning, .tool_calls, .usage,
.output) for per-message content.
Two input shapes are handled (via params["data"] = (payload, metadata) from StreamMessagesHandler):
- Payloads with an "event" key — emitted by stream_v2() / astream_v2() via the on_stream_event callback. Routed to an existing ChatModelStream by metadata["run_id"]. A message-start event creates a new stream; message-finish closes it.
- AIMessage — emitted from on_chain_end when a node returns a finalized message. Replayed as a synthetic protocol event lifecycle via message_to_events, then the already-complete stream is pushed to the log.
V1 AIMessageChunk tuples (from on_llm_new_token) are not streamed into this projection: chat models that want to populate run.messages with content-block streaming must use stream_v2() / astream_v2(). Models called via the legacy stream() method still surface their final AIMessage via on_chain_end when a node returns it as state.
Only events at the run's own level are projected; tokens from
deeper subgraphs are left in the main event log but excluded from
.messages. "Own level" is defined by scope, which
stream_v2 / astream_v2 populate from the caller's checkpoint
namespace so that a stream_v2 call inside a node still sees its
own root chat model streams on .messages. Consumers that need
subgraph tokens should iterate the raw event stream or register a
custom transformer.
Native transformer — the messages projection is exposed as a
direct attribute on the run stream.
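The run_id routing rule can be sketched as a small grouping function. Event names (`message-start` / `message-delta` / `message-finish`) and the dict-based handles below are illustrative assumptions, not the real ChatModelStream API:

```python
# Sketch: one stream handle per LLM call, keyed by metadata["run_id"];
# message-start opens a handle, message-finish closes it. Shapes are
# illustrative stand-ins for the real protocol events.
def project_messages(events):
    streams, order = {}, []
    for payload, metadata in events:
        run_id = metadata["run_id"]
        if payload["event"] == "message-start":
            streams[run_id] = {"text": [], "closed": False}
            order.append(run_id)
        elif payload["event"] == "message-delta":
            streams[run_id]["text"].append(payload["text"])
        elif payload["event"] == "message-finish":
            streams[run_id]["closed"] = True
    return [streams[r] for r in order]

events = [
    ({"event": "message-start"}, {"run_id": "a"}),
    ({"event": "message-delta", "text": "Hel"}, {"run_id": "a"}),
    ({"event": "message-delta", "text": "lo"}, {"run_id": "a"}),
    ({"event": "message-finish"}, {"run_id": "a"}),
]
handles = project_messages(events)
assert len(handles) == 1
assert "".join(handles[0]["text"]) == "Hello"
assert handles[0]["closed"]
```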
Payload of a lifecycle event surfaced on the lifecycle channel.
Auto-forwarded as lifecycle protocol events (no custom: prefix,
because LifecycleTransformer is a native transformer), so remote
SDK clients receive the same data that in-process consumers see via
run.lifecycle.
Surface subgraph lifecycle as lifecycle protocol events.
Pushes LifecyclePayload to a StreamChannel named lifecycle.
The channel is auto-forwarded by the mux so payloads land in the
main event log under method = "lifecycle" (native transformer —
no custom: prefix) — visible to remote SDK clients over the
wire and to in-process consumers via run.lifecycle.
Tracks subgraphs at every depth strictly below the transformer's scope, so a graph → subgraph → subgraph chain produces lifecycle events for both nested levels in a flat stream.
Native transformer — projection key lifecycle is exposed as
run.lifecycle.
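The depth rule above (every namespace strictly below the scope, flattened into one stream) can be sketched as a filter; event shapes are illustrative:

```python
# Sketch: namespaces strictly below the transformer's scope all produce
# lifecycle entries in one flat stream, so nested subgraphs are not
# collapsed into their parents.
def lifecycle_entries(events, scope=()):
    out = []
    for ev in events:
        ns = tuple(ev["namespace"])
        # "strictly below": a proper extension of the scope prefix
        if len(ns) > len(scope) and ns[:len(scope)] == tuple(scope):
            out.append({"method": "lifecycle", "namespace": ns, "phase": ev["phase"]})
    return out

events = [
    {"namespace": (), "phase": "start"},                 # root: not below scope
    {"namespace": ("sub",), "phase": "start"},           # depth 1
    {"namespace": ("sub", "subsub"), "phase": "start"},  # depth 2
]
flat = lifecycle_entries(events)
assert [e["namespace"] for e in flat] == [("sub",), ("sub", "subsub")]
```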
Discover subgraph invocations as in-process navigation handles.
Per discovered direct-child subgraph, builds a SubgraphRunStream
(or AsyncSubgraphRunStream) wrapping a child mini-mux scoped to
the subgraph's namespace. Consumers iterate run.subgraphs to
receive handles, then drill into handle.values / handle.messages
/ handle.subgraphs (recursive grandchildren) / handle.lifecycle.
Each mini-mux owns its own scope and uses its own
SubgraphTransformer to discover its direct children, so
grandchildren live on the child handle — never on the root's
subgraphs log. Forwarding events into the matching child mini-mux
is what keeps the child's projections populated.
Native transformer — subgraphs is exposed as run.subgraphs.
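The forwarding rule above can be sketched as recursive routing by namespace segment: the root only discovers direct children, and a grandchild's events are forwarded into the matching child bucket, which repeats the same discovery one level down. The dict-based buckets are illustrative stand-ins for the real mini-mux and handle classes:

```python
# Sketch: route events to direct-child buckets by the first namespace
# segment below the current scope; grandchildren land inside the child's
# bucket, never at the root.
def route_subgraphs(events, scope=()):
    children = {}
    for ev in events:
        ns = tuple(ev["namespace"])
        if len(ns) <= len(scope) or ns[:len(scope)] != tuple(scope):
            continue                       # own level or out of scope
        child = ns[len(scope)]             # direct-child segment
        children.setdefault(child, []).append(ev)
    return children

events = [
    {"namespace": ("a",), "data": 1},
    {"namespace": ("a", "aa"), "data": 2},  # grandchild: forwarded to child "a"
    {"namespace": ("b",), "data": 3},
]
root = route_subgraphs(events)
assert set(root) == {"a", "b"}              # only direct children at the root
grand = route_subgraphs(root["a"], scope=("a",))
assert set(grand) == {"aa"}                 # grandchild lives on the child handle
```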