# Code-generation template rendered by `deepagents deploy` via `str.format`.
# The value below is NOT executable in this module: it is the full source of
# the generated deploy entry-point file, stored as one escaped single-quoted
# string.  Literal braces in the generated code are doubled (`{{` / `}}`);
# single-brace slots are filled at render time.
#
# Placeholders consumed by `.format(...)` (from the visible slots):
#   sandbox_template!r, sandbox_image!r, sandbox_scope!r,
#   default_assistant_id!r, model!r       -> repr-quoted config values
#   sandbox_block, mcp_tools_block,
#   mcp_tools_load_call                   -> raw code fragments spliced in
#     (sandbox_block is expected to define `_get_or_create_sandbox`, which the
#     generated `_build_backend_factory` calls — defined by the renderer,
#     not visible here; confirm against the deploy command.)
#
# NOTE(review): do not reflow, re-indent, or "fix" the embedded code text —
# every byte of this string is emitted verbatim into the generated file.
DEPLOY_GRAPH_TEMPLATE: str = '"""Auto-generated deepagents deploy entry point.\n\nCreated by `deepagents deploy`. Do not edit manually — changes will be\noverwritten on the next deploy.\n"""\n\nimport json\nimport logging\nimport os\nfrom pathlib import Path\nfrom typing import TYPE_CHECKING\n\nfrom deepagents import create_deep_agent\nfrom deepagents.backends.composite import CompositeBackend\nfrom deepagents.backends.protocol import EditResult, SandboxBackendProtocol, WriteResult\nfrom deepagents.backends.store import StoreBackend\nfrom langchain.agents.middleware.types import (
\n AgentMiddleware,
\n AgentState,
\n ModelRequest,
\n ModelResponse,
\n PrivateStateAttr,
\n)\nfrom langchain_core.runnables import RunnableConfig\nfrom langgraph.prebuilt import ToolRuntime\n\nif TYPE_CHECKING:\n from langgraph.runtime import Runtime\n from langgraph_sdk.runtime import ServerRuntime\n\nlogger = logging.getLogger(__name__)\n\nSANDBOX_TEMPLATE = {sandbox_template!r}\nSANDBOX_IMAGE = {sandbox_image!r}\n\n# Mount points inside the composite backend.\nMEMORIES_PREFIX = "/memories/"\nSKILLS_PREFIX = "/skills/"\n\n# What to seed into the store on first run.\nSEED_PATH = Path(__file__).parent / "_seed.json"\n\n\nclass SandboxSyncMiddleware(AgentMiddleware):\n """Sync skill files from the store into the sandbox filesystem.\n\n Downloads all files under the configured skill sources from the composite\n backend (which routes /skills/ to the store) and uploads them directly\n into the sandbox so scripts can be executed.\n """\n\n def __init__(self,
*,
backend,
sources):\n self._backend = backend\n self._sources = sources\n self._synced_keys: set = set()\n\n def _get_backend(self,
state,
runtime,
config):\n if callable(self._backend):\n tool_runtime = ToolRuntime(\n state=state,
\n context=runtime.context,
\n stream_writer=runtime.stream_writer,
\n store=runtime.store,
\n config=config,
\n tool_call_id=None,
\n )\n return self._backend(tool_runtime)\n return self._backend\n\n async def _collect_files(self,
backend,
path):\n """Recursively list all files under *path* via ls (not glob)."""\n result = await backend.als(path)\n files = []\n for entry in result.entries or []:\n if entry.get("is_dir"):\n files.extend(await self._collect_files(backend, entry["path"]))\n else:\n files.append(entry["path"])\n return files\n\n async def abefore_agent(self,
state,
runtime,
config):\n backend = self._get_backend(state,
runtime,
config)\n if not isinstance(backend,
CompositeBackend):\n return None\n sandbox = backend.default\n if not isinstance(sandbox,
SandboxBackendProtocol):\n return None\n\n # Only sync once per sandbox instance\n cache_key = id(sandbox)\n if cache_key in self._synced_keys:\n return None\n self._synced_keys.add(cache_key)\n\n files_to_upload = []\n for source in self._sources:\n paths = await self._collect_files(backend,
source)\n if not paths:\n continue\n responses = await backend.adownload_files(paths)\n for resp in responses:\n if resp.content is not None:\n files_to_upload.append((resp.path, resp.content))\n\n if files_to_upload:\n results = await sandbox.aupload_files(files_to_upload)\n uploaded = sum(1 for r in results if r.error is None)\n logger.info(\n "Synced %d/%d skill files into sandbox",
\n uploaded,
\n len(files_to_upload),
\n )\n\n return None\n\n def wrap_model_call(self,
request,
handler):\n return handler(request)\n\n async def awrap_model_call(self,
request,
handler):\n return await handler(request)\n\n\nclass ReadOnlyStoreBackend(StoreBackend):\n """StoreBackend that rejects all writes and edits."""\n\n _READ_ONLY_MSG = (\n "This path is read-only. /memories/ and /skills/ are managed by "\n "the deployment config — they cannot be edited at runtime."\n )\n\n def write(self,
file_path,
content): # noqa: ARG002\n return WriteResult(error=self._READ_ONLY_MSG)\n\n async def awrite(self,
file_path,
content): # noqa: ARG002\n return WriteResult(error=self._READ_ONLY_MSG)\n\n def edit( # noqa: ARG002,
FBT002\n self,
file_path,
old_string,
new_string,
replace_all=False,
\n ):\n return EditResult(error=self._READ_ONLY_MSG)\n\n async def aedit( # noqa: ARG002,
FBT002\n self,
file_path,
old_string,
new_string,
replace_all=False,
\n ):\n return EditResult(error=self._READ_ONLY_MSG)\n\n\n_SEED_CACHE: dict | None = None\n\n\ndef _load_seed() -> dict:\n """Load and cache the bundled seed payload."""\n global _SEED_CACHE\n if _SEED_CACHE is not None:\n return _SEED_CACHE\n if not SEED_PATH.exists():\n _SEED_CACHE = {{"memories": {{}},
"skills": {{}}}}\n return _SEED_CACHE\n try:\n _SEED_CACHE = json.loads(SEED_PATH.read_text(encoding="utf-8"))\n except Exception as exc: # noqa: BLE001\n logger.warning("Failed to parse _seed.json: %s", exc)\n _SEED_CACHE = {{"memories": {{}},
"skills": {{}}}}\n return _SEED_CACHE\n\n\n# Per-(process, assistant_id) gate.\n_SEEDED_ASSISTANTS: set[str] = set()\n\n\nasync def _seed_store_if_needed(store, assistant_id: str) -> None:\n """Seed memories + skills under ``assistant_id`` once per process."""\n if assistant_id in _SEEDED_ASSISTANTS:\n return\n _SEEDED_ASSISTANTS.add(assistant_id)\n\n seed = _load_seed()\n\n memories_ns = (assistant_id, "memories")\n for path, content in seed.get("memories", {{}}).items():\n if await store.aget(memories_ns, path) is None:\n await store.aput(\n memories_ns,\n path,\n {{"content": content,
"encoding": "utf-8"}},\n )\n\n skills_ns = (assistant_id, "skills")\n for path, content in seed.get("skills", {{}}).items():\n if await store.aget(skills_ns, path) is None:\n await store.aput(\n skills_ns,\n path,\n {{"content": content,
"encoding": "utf-8"}},\n )\n\n\n{sandbox_block}\n\n{mcp_tools_block}\n\n\ndef _make_namespace_factory(assistant_id: str, section: str):\n """Return a namespace factory closed over an assistant id + section."""\n def _factory(ctx): # noqa: ARG001\n return (assistant_id, section)\n return _factory\n\n\nSANDBOX_SCOPE = {sandbox_scope!r}\n\n\ndef _build_backend_factory(assistant_id: str):\n """Return a backend factory that builds the composite per invocation."""\n def _factory(ctx): # noqa: ARG001\n from langgraph.config import get_config\n\n if SANDBOX_SCOPE == "assistant":\n cache_key = f"assistant:{{assistant_id}}"\n else:\n thread_id = get_config().get("configurable", {{}}).get("thread_id", "local")\n cache_key = f"thread:{{thread_id}}"\n sandbox_backend = _get_or_create_sandbox(cache_key)\n return CompositeBackend(\n default=sandbox_backend,\n routes={{\n MEMORIES_PREFIX: ReadOnlyStoreBackend(\n namespace=_make_namespace_factory(assistant_id, "memories"),\n ),
\n SKILLS_PREFIX: ReadOnlyStoreBackend(\n namespace=_make_namespace_factory(assistant_id, "skills"),\n ),
\n }},\n )\n return _factory\n\n\nasync def make_graph(config: RunnableConfig, runtime: "ServerRuntime"):\n """Async graph factory.\n\n Accepts the invocation\'s ``RunnableConfig`` so we can pull the\n ``assistant_id`` out of ``configurable`` and scope all store reads\n and writes under it. Seeds the memories + skills namespaces once per\n (process, assistant_id), then assembles the deep agent graph.\n """\n configurable = (config or {{}}).get("configurable", {{}}) or {{}}\n assistant_id = str(configurable.get("assistant_id") or {default_assistant_id!r})\n\n store = getattr(runtime, "store", None)\n if store is not None:\n await _seed_store_if_needed(store, assistant_id)\n\n tools: list = []\n {mcp_tools_load_call}\n\n backend_factory = _build_backend_factory(assistant_id)\n\n return create_deep_agent(\n model={model!r},\n memory=[f"{{MEMORIES_PREFIX}}AGENTS.md"],\n skills=[SKILLS_PREFIX],\n tools=tools,\n backend=backend_factory,\n middleware=[\n SandboxSyncMiddleware(backend=backend_factory,
sources=[SKILLS_PREFIX]),\n ],\n
)\n\n\ngraph = make_graph\n'