High-level Runnable for the structured-batch enrichment pattern.
Wraps :class:ParallelTaskGroup with a default_task_spec so callers
can pass a list of records (pydantic instances or dicts) and get back a
list of enriched output dicts. Mirrors the canonical enrichment pattern
documented at
https://docs.parallel.ai/task-api/examples/task-enrichment.
Defaults to the core processor (the docs' recommendation for
enrichment workflows).
Example:
from pydantic import BaseModel, Field
from langchain_parallel import ParallelEnrichment
class CompanyInput(BaseModel):
company: str = Field(description="Company name to enrich")
class CompanyOutput(BaseModel):
headquarters: str
founding_year: int = Field(description="Year the company was founded")
enricher = ParallelEnrichment(
input_schema=CompanyInput,
output_schema=CompanyOutput,
# Defaults to processor="core-fast"; pass "core" or "pro" for
# higher accuracy when latency is less of a concern.
)
results = enricher.invoke([
{"company": "Anthropic"},
{"company": "OpenAI"},
{"company": "Google DeepMind"},
])
for inp, out in zip(["Anthropic", "OpenAI", "Google DeepMind"], results):
content = out["output"]["content"]
print(inp, "->", content)