Asynchronous Postgres-backed store with optional vector search using pgvector.
Basic setup and usage:

```python
from langgraph.store.postgres import AsyncPostgresStore

conn_string = "postgresql://user:pass@localhost:5432/dbname"

async with AsyncPostgresStore.from_conn_string(conn_string) as store:
    await store.setup()  # Run migrations. Done once.

    # Store and retrieve data
    await store.aput(("users", "123"), "prefs", {"theme": "dark"})
    item = await store.aget(("users", "123"), "prefs")
```
Vector search using LangChain embeddings:

```python
from langchain.embeddings import init_embeddings
from langgraph.store.postgres import AsyncPostgresStore

conn_string = "postgresql://user:pass@localhost:5432/dbname"

async with AsyncPostgresStore.from_conn_string(
    conn_string,
    index={
        "dims": 1536,
        "embed": init_embeddings("openai:text-embedding-3-small"),
        "fields": ["text"],  # specify which fields to embed. Default is the whole serialized value
    },
) as store:
    await store.setup()  # Run migrations. Done once.

    # Store documents
    await store.aput(("docs",), "doc1", {"text": "Python tutorial"})
    await store.aput(("docs",), "doc2", {"text": "TypeScript guide"})
    await store.aput(("docs",), "doc3", {"text": "Other guide"}, index=False)  # don't index

    # Search by similarity
    results = await store.asearch(("docs",), query="programming guides", limit=2)
```
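Each search result carries both the stored value and a similarity score. A minimal post-processing sketch, assuming each result item exposes `key` and `score` attributes; the `summarize_results` helper is hypothetical, not part of the store API:

```python
def summarize_results(results):
    """Return (key, score) pairs ordered by descending similarity score."""
    return sorted(
        ((item.key, item.score) for item in results),
        # Treat a missing score (non-semantic match) as 0.0 when ordering
        key=lambda pair: pair[1] if pair[1] is not None else 0.0,
        reverse=True,
    )
```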
Using connection pooling for better performance:

```python
from langgraph.store.postgres import AsyncPostgresStore, PoolConfig

conn_string = "postgresql://user:pass@localhost:5432/dbname"

async with AsyncPostgresStore.from_conn_string(
    conn_string,
    pool_config=PoolConfig(
        min_size=5,
        max_size=20,
    ),
) as store:
    await store.setup()  # Run migrations. Done once.
    # Use store with connection pooling...
```

Constructor signature:

```python
AsyncPostgresStore(
    conn: _ainternal.Conn,
    *,
    pipe: AsyncPipeline | None = None,
    deserializer: Callable[[bytes | orjson.Fragment], dict[str, Any]] | None = None,
    index: PostgresIndexConfig | None = None,
    ttl: TTLConfig | None = None,
)
```

Bases: `AsyncBatchedBaseStore`, `BasePostgresStore[_ainternal.Conn]`

Warning:
Make sure to call `setup()` before first use to create the necessary tables and indexes.

Note:
Semantic search is disabled by default. You can enable it by providing an index configuration
when creating the store. Without this configuration, all index arguments passed to
put or aput will have no effect.
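When an index configuration is present, the per-call `index` argument controls what gets embedded: a list of fields to embed, or `False` to skip embedding entirely (as in the vector search example above). A minimal sketch of using this per item; the `store_docs` helper is hypothetical, and `store` is assumed to be an `AsyncPostgresStore` created with an index configuration:

```python
async def store_docs(store, docs):
    """Write documents, embedding only those flagged as searchable.

    `docs` is an iterable of (key, value, searchable) triples.
    """
    for key, value, searchable in docs:
        if searchable:
            # Embed the "text" field so the item is found by asearch(query=...)
            await store.aput(("docs",), key, value, index=["text"])
        else:
            # Store without embedding; still retrievable by key via aget
            await store.aput(("docs",), key, value, index=False)
```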
Note:
If you provide a TTL configuration, you must explicitly call start_ttl_sweeper() to begin
the background task that removes expired items. Call stop_ttl_sweeper() to properly
clean up resources when you're done with the store.
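The sweeper lifecycle described in the note can be sketched as follows; `run_with_ttl_sweeper` is a hypothetical helper, and `store` is assumed to be an `AsyncPostgresStore` created with a `ttl` configuration:

```python
async def run_with_ttl_sweeper(store):
    """Run work against the store while the TTL sweeper removes expired items."""
    await store.setup()  # Run migrations. Done once.
    await store.start_ttl_sweeper()  # begin the background expiry task
    try:
        # Items written while the sweeper runs are removed once their TTL expires
        await store.aput(("cache",), "greeting", {"text": "hello"})
    finally:
        await store.stop_ttl_sweeper()  # cancel the background task and clean up
```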
Methods:

- `from_conn_string()`: Create a new AsyncPostgresStore instance from a connection string.
- `setup()`: Set up the store database asynchronously. This method creates the necessary tables in the Postgres database if they don't already exist and runs database migrations. It MUST be called directly by the user the first time the store is used.
- `sweep_ttl()`: Delete expired store items based on TTL.
- `start_ttl_sweeper()`: Periodically delete expired store items based on TTL.
- `stop_ttl_sweeper()`: Stop the TTL sweeper task if it's running.