Checkpointer that stores checkpoints in a Postgres database.
PostgresSaver(
self,
conn: _internal.Conn,
pipe: Pipeline | None = None,
serde: SerializerProtocol | None = None,
)

| Name | Type |
|---|---|
| conn | _internal.Conn |
| pipe | Pipeline \| None |
| serde | SerializerProtocol \| None |
Create a new PostgresSaver instance from a connection string.
Set up the checkpoint database.
This method creates the necessary tables in the Postgres database if they don't already exist and runs database migrations. It MUST be called directly by the user the first time the checkpointer is used.
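As a minimal usage sketch of the two steps above (create the saver from a connection string, then call setup once), assuming the `langgraph-checkpoint-postgres` package is installed and a Postgres server is reachable: the helper name `ensure_schema_and_peek`, the example URI, and the `{"configurable": {"thread_id": ...}}` config shape are illustrative assumptions rather than part of this reference.

```python
def ensure_schema_and_peek(db_uri: str, thread_id: str):
    """Run setup() and return the newest checkpoint tuple for a thread, or None.

    Hypothetical helper for illustration; it is not part of the library.
    """
    # Imported lazily so the sketch can be loaded without the package installed.
    from langgraph.checkpoint.postgres import PostgresSaver

    # from_conn_string is used as a context manager so the connection is closed on exit.
    with PostgresSaver.from_conn_string(db_uri) as checkpointer:
        # Creates the tables if they are missing and applies migrations.
        checkpointer.setup()
        # Assumed config shape: the thread ID lives under "configurable".
        config = {"configurable": {"thread_id": thread_id}}
        return checkpointer.get_tuple(config)


# Example call (placeholder URI, requires a running database):
# ensure_schema_and_peek("postgresql://user:password@localhost:5432/mydb", "thread-1")
```

Because setup only creates what is missing, calling it again on an initialized database should be harmless, though it only needs to run once per database.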
List checkpoints from the database.
This method retrieves a list of checkpoint tuples from the Postgres database based on the provided config. The checkpoints are ordered by checkpoint ID in descending order (newest first).
Get a checkpoint tuple from the database.
This method retrieves a checkpoint tuple from the Postgres database based on the
provided config. If the config contains a checkpoint_id key, the checkpoint with
the matching thread ID and checkpoint ID is retrieved. Otherwise, the latest checkpoint
for the given thread ID is retrieved.
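The lookup rules for listing and fetching can be illustrated without a database. The sketch below is a plain-Python stand-in, not the library's implementation: the `ROWS` data, the helper names, and the `{"configurable": {...}}` config shape are assumptions made for the example.

```python
# Hypothetical in-memory rows standing in for the checkpoints table.
ROWS = [
    {"thread_id": "t1", "checkpoint_id": "0001", "state": {"n": 1}},
    {"thread_id": "t1", "checkpoint_id": "0003", "state": {"n": 3}},
    {"thread_id": "t1", "checkpoint_id": "0002", "state": {"n": 2}},
    {"thread_id": "t2", "checkpoint_id": "0001", "state": {"n": 9}},
]


def list_checkpoints(rows, config):
    """Return a thread's checkpoints, newest first (descending checkpoint ID)."""
    thread_id = config["configurable"]["thread_id"]
    matches = [r for r in rows if r["thread_id"] == thread_id]
    return sorted(matches, key=lambda r: r["checkpoint_id"], reverse=True)


def get_checkpoint(rows, config):
    """Return the exact checkpoint if checkpoint_id is given, else the latest one."""
    wanted = config["configurable"].get("checkpoint_id")
    candidates = list_checkpoints(rows, config)
    if wanted is not None:
        # Exact match on thread ID and checkpoint ID.
        return next((r for r in candidates if r["checkpoint_id"] == wanted), None)
    # No checkpoint_id: fall back to the newest checkpoint for the thread.
    return candidates[0] if candidates else None
```

With this data, asking for thread "t1" without a checkpoint_id yields the row with ID "0003", while adding `"checkpoint_id": "0002"` selects that specific row.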
Save a checkpoint to the database.
This method saves a checkpoint to the Postgres database. The checkpoint is associated with the provided config and its parent config (if any).
Store intermediate writes linked to a checkpoint.
This method saves intermediate writes associated with a checkpoint to the Postgres database.
Delete all checkpoints and writes associated with a thread ID.
Fast-path override of BaseCheckpointSaver.get_delta_channel_history.
The query runs in two stages, and both stages cover all requested channels.

Stage 1 (paged): a dynamic SELECT over checkpoints with K parallel JSONB key lookups (one column pair per channel), with no subquery and no aggregation. It pages newest-first by checkpoint_id with a cursor; the page size is _DELTA_PAGE_SIZE. Paging stops when every channel has found its seed or the chain is exhausted.

Stage 2 (per-channel UNION ALL): one branch per channel reads checkpoint_writes filtered to that channel's specific chain_cids, plus one branch per channel that has a seed reads checkpoint_blobs for that channel and version. This avoids the over-fetch of a single `channel = ANY(channels)` filter when channels have different chain depths.
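The control flow of the two stages can be sketched in plain Python. This is a simulation under stated assumptions, not the actual SQL: the row shape (a `seeds` mapping from channel to version), the choice to leave the seed checkpoint out of the chain, the `PAGE_SIZE` value, and all function names are hypothetical.

```python
PAGE_SIZE = 2  # stand-in for _DELTA_PAGE_SIZE; the real value is not stated here

# Hypothetical checkpoint rows: "seeds" maps a channel to the version stored there.
CHECKPOINTS = [
    {"checkpoint_id": "05", "seeds": {}},
    {"checkpoint_id": "04", "seeds": {"a": "v4"}},
    {"checkpoint_id": "03", "seeds": {}},
    {"checkpoint_id": "02", "seeds": {}},
    {"checkpoint_id": "01", "seeds": {"b": "v1"}},
]


def fetch_page(checkpoints, cursor, page_size):
    """Return up to page_size rows older than the cursor, newest first."""
    older = [c for c in checkpoints if cursor is None or c["checkpoint_id"] < cursor]
    older.sort(key=lambda c: c["checkpoint_id"], reverse=True)
    return older[:page_size]


def stage1(checkpoints, channels, page_size=PAGE_SIZE):
    """Page newest-first until every channel has a seed or rows run out."""
    seeds = {}                          # channel -> (checkpoint_id, version)
    chains = {ch: [] for ch in channels}  # channel -> checkpoint IDs newer than its seed
    cursor = None
    while len(seeds) < len(channels):
        page = fetch_page(checkpoints, cursor, page_size)
        if not page:
            break  # chain exhausted
        for row in page:
            for ch in channels:
                if ch in seeds:
                    continue  # this channel stopped at its seed already
                version = row["seeds"].get(ch)
                if version is not None:
                    seeds[ch] = (row["checkpoint_id"], version)
                else:
                    chains[ch].append(row["checkpoint_id"])
        cursor = page[-1]["checkpoint_id"]
    return seeds, chains


def stage2_plan(seeds, chains):
    """Build one writes branch per channel, plus a blobs branch for seeded channels."""
    plan = []
    for ch, cids in chains.items():
        plan.append(("checkpoint_writes", ch, tuple(cids)))
        if ch in seeds:
            plan.append(("checkpoint_blobs", ch, seeds[ch][1]))
    return plan
```

Each writes branch carries only its own channel's chain, so a channel whose seed is recent (here "a") does not pull rows for the deeper chains of "b" and "c", which is the over-fetch the per-channel branches are meant to avoid.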