AsyncGraphRunStream(
self,
graph_aiter: AsyncIterator[Any] | None,
mux: StreamMux,
values_transformer| Name | Type | Description |
|---|---|---|
graph_aiter* | AsyncIterator[Any] | None | Async iterator over the graph's stream, or
|
mux* | StreamMux | The StreamMux owning projections and the main log. |
values_transformer* | ValuesTransformer | The built-in values transformer
providing |
wire_pump | bool | Default: True |
| Name | Type |
|---|---|
| graph_aiter | AsyncIterator[Any] | None |
| mux | StreamMux |
| values_transformer | ValuesTransformer |
| wire_pump | bool |
Async run stream with caller-driven pumping.
Async iteration on any projection drives the graph forward ā there
is no background task. Concurrent consumers share a single-flight
pump via an asyncio.Lock, so each awaiting cursor contributes one
event per acquisition. Backpressure comes from the logs: when a
subscribed log's buffer reaches maxlen, apush awaits the
subscriber to drain, which holds back the pump and paces the graph.
Projections are single-consumer ā a second aiter(run.values)
raises. Use projection.tee(n) for fan-out.
Use as an async context manager to guarantee clean shutdown on early exit:
async with await handler.astream(input) as run:
async for msg in run.messages:
...When True (default), bind _apump_next as the
mux's async pump callable. Subclasses that inherit a
parent pump via StreamMux._make_child should pass
False to preserve the parent binding.