Observes ProtocolEvents during a graph run and builds typed derived projections (secondary event logs, promises, etc.).
Data is surfaced to consumers through projections returned from
init(). Projections are merged into GraphRunStream.extensions for
in-process consumers. Use StreamChannel.local for local streaming
values, StreamChannel.remote for values that should also be visible
to remote clients, or Promise<T> for final values.
To make projection data available to remote clients (SDK consumers
over WebSocket / SSE), create a named channel with
StreamChannel.remote(name). The StreamMux detects named
StreamChannel instances in the init() return and auto-forwards every
push() as a ProtocolEvent on the channel's named method. Remote
clients subscribe via session.subscribe("custom:<name>").
finalize and fail are optional. When a transformer uses
StreamChannel, the mux auto-closes/fails the channels on run
completion — no manual lifecycle management needed. Implement
finalize/fail only for non-channel teardown (e.g. resolving a
Promise).
interface StreamTransformerMark the channel as failed after all buffered items are consumed.
Called once when the underlying Pregel run completes without throwing. Optional — only needed for non-channel teardown (e.g. resolving promises).
May return a PromiseLike<void> to defer the main event log close
until the async work (e.g. emitting terminal lifecycle events) has
completed. The mux awaits all returned promises before closing its
event log.
Called once before the run starts.
Optional hook invoked by StreamMux.addTransformer immediately
after the transformer is attached to the mux. Receives a limited
handle that exposes only StreamEmitter.push — enough for
the transformer to emit synthesized ProtocolEvents on any
namespace it chooses (e.g. a deepagents SubagentTransformer
fabricating lifecycle/messages/values events under a
["tools:<tool_call_id>"] namespace when a task tool starts).
Transformers that do not synthesize events can omit this hook.
The StreamEmitter handle is only safe to call from within
StreamTransformer.process. Emitting from an unrelated async
context (e.g. after process has returned, from a setTimeout,
etc.) races with the mux's close/fail cycle and may land events in
an already-closed log.
Called for each ProtocolEvent before it is appended to the main log.