# Batch and durability

> Workflows over a folder, background runs, scheduled jobs with retries.

A single `agent.run(files=[File(...)])` handles one document. Production workloads arrive as folders, queues, and nightly drops. Three primitives cover those shapes: concurrent runs for ad-hoc batches, background runs for long jobs, and scheduled runs for nightly intake.

## Concurrent batch over a list

The simplest batch is a folder of files. `agent.arun` is async, so a semaphore plus `asyncio.gather` is enough.

```python theme={null}
import asyncio
from pathlib import Path

from agno.agent import Agent
from agno.media import File
from agno.models.openai import OpenAIResponses

from your_schemas import Invoice  # define your output schema once


agent = Agent(
    model=OpenAIResponses(id="gpt-5.5"),
    instructions="Extract invoice fields and line items. Null for missing.",
    output_schema=Invoice,
)


async def extract_one(path: Path, sem: asyncio.Semaphore) -> Invoice:
    async with sem:
        run = await agent.arun(
            "Extract this invoice.",
            files=[File(filepath=str(path))],
        )
        return run.content


async def extract_folder(folder: Path, concurrency: int = 8) -> list[Invoice]:
    sem = asyncio.Semaphore(concurrency)
    paths = sorted(folder.glob("*.pdf"))
    return await asyncio.gather(*(extract_one(p, sem) for p in paths))


invoices = asyncio.run(extract_folder(Path("./incoming-invoices")))
# [Invoice(invoice_number='1042', ...), Invoice(invoice_number='1043', ...), ...]
```

A semaphore is the smallest useful concurrency control. It keeps you under the provider's rate limit and bounds memory. Set `concurrency` to whatever your tightest constraint allows: rate quota, DB write throughput, or memory budget.
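The sketch below shows the same semaphore-plus-`gather` pattern in a form that runs without credentials. It swaps the model call for a stand-in coroutine; `fake_extract`, `run_batch`, and the deliberately failing item are hypothetical names used only for illustration. It also passes `return_exceptions=True` to `asyncio.gather`, which the example above does not do, so that one bad document does not discard the rest of the batch.

```python
import asyncio


async def fake_extract(item: int, sem: asyncio.Semaphore, state: dict) -> int:
    """Hypothetical stand-in for extract_one: holds a semaphore slot while working."""
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        try:
            await asyncio.sleep(0.01)  # placeholder for the model call
            if item == 3:
                raise ValueError(f"item {item} could not be parsed")
            return item * 10
        finally:
            state["active"] -= 1


async def run_batch(items: list[int], concurrency: int = 2):
    sem = asyncio.Semaphore(concurrency)
    state = {"active": 0, "peak": 0}
    # return_exceptions=True returns failures as values instead of raising,
    # so the successful results are still available afterwards.
    results = await asyncio.gather(
        *(fake_extract(i, sem, state) for i in items),
        return_exceptions=True,
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return ok, failed, state["peak"]


ok, failed, peak = asyncio.run(run_batch([1, 2, 3, 4, 5]))
print(ok, len(failed), peak)
```

The recorded peak never exceeds `concurrency`, which is the property the semaphore is there to guarantee. Failed items come back as exception objects that you can log or retry separately.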

## Background runs for long jobs

Sync runs hold an HTTP connection until the model returns. For multi-page contracts or scanned PDFs that take minutes, start the run in the background and poll.

```python theme={null}
import asyncio

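from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.media import File
from agno.models.openai import OpenAIResponses
from agno.run.base import RunStatus

from your_schemas import Invoice  # same output schema as the batch example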

db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")
agent = Agent(model=OpenAIResponses(id="gpt-5.5"), db=db, output_schema=Invoice)


async def extract_long(file_url: str) -> Invoice:
    started = await agent.arun(
        "Extract this invoice.",
        files=[File(url=file_url)],
        background=True,
    )
    # started.status is RunStatus.pending; the work continues in the background.

    while True:
        await asyncio.sleep(2)
        run = await agent.aget_run_output(
            run_id=started.run_id,
            session_id=started.session_id,
        )
        if run is None:
            continue
        if run.status == RunStatus.completed:
            return run.content
        if run.status == RunStatus.error:
            raise RuntimeError(f"Run {started.run_id} failed")
```

The background run is persisted in `db`. The agent process can restart and a different process can poll the same `run_id`. That is the durability property: state lives in the database, not in the calling process.
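The polling loop above has no deadline, so a run that never reaches a terminal status would be polled forever. One way to bound it is sketched below, independent of the library. `poll_until_done` and `make_fake_fetch` are hypothetical helpers, and the run is modeled as a plain dict with a `status` key rather than a real run object.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


async def poll_until_done(
    fetch: Callable[[], Awaitable[Optional[dict]]],
    interval: float = 2.0,
    timeout: float = 600.0,
) -> dict:
    """Poll `fetch` until it reports a terminal status or the deadline passes."""
    deadline = time.monotonic() + timeout
    while True:
        run = await fetch()
        if run is not None:  # None means the run is not visible yet
            if run["status"] == "completed":
                return run
            if run["status"] == "error":
                raise RuntimeError("run failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still not finished after {timeout} seconds")
        await asyncio.sleep(interval)


def make_fake_fetch():
    """Hypothetical stand-in for a database lookup: not found, pending, completed."""
    responses = iter(
        [None, {"status": "pending"}, {"status": "completed", "content": "ok"}]
    )

    async def fetch() -> Optional[dict]:
        return next(responses)

    return fetch


result = asyncio.run(poll_until_done(make_fake_fetch(), interval=0.01, timeout=1.0))
print(result["content"])
```

In real use, `fetch` would wrap the `agent.aget_run_output(...)` call shown above and compare against `RunStatus` members instead of strings.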

## Scheduled batch with retries

For nightly intake (an SFTP drop, a Drive folder, a queue), put an AgentOS in front of your agent and let the scheduler fire the run on cron.

```python theme={null}
from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIResponses
from agno.os import AgentOS

from your_schemas import Invoice  # same output schema as the batch example

db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")

extractor = Agent(
    id="invoice-extractor",
    model=OpenAIResponses(id="gpt-5.5"),
    db=db,
    output_schema=Invoice,
)

agent_os = AgentOS(
    agents=[extractor],
    db=db,
    scheduler=True,
    scheduler_poll_interval=15,    # check for due jobs every N seconds
)
app = agent_os.get_app()
```

Then create the schedule in Python. `ScheduleManager` writes to the same `db` the AgentOS polls.

```python theme={null}
from agno.scheduler import ScheduleManager

mgr = ScheduleManager(db)

mgr.create(
    name="nightly-invoice-intake",
    cron="0 2 * * *",                 # 2am every day
    endpoint="/agents/invoice-extractor/runs",
    payload={"message": "Process the overnight invoice drop."},
    timezone="America/New_York",
    max_retries=2,
    retry_delay_seconds=300,
    if_exists="update",
)
```

`if_exists="update"` makes the call idempotent. Re-running the bootstrap script does not create duplicates. The scheduler retries on HTTP failure with the configured delay, and every fire writes a row to `agno_schedule_runs` with status and timing.
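To get a feel for the retry settings, the sketch below computes the worst-case attempt times for one fire. It rests on two assumptions that this page does not confirm: that `max_retries` counts retries after the first attempt, and that each retry starts a fixed `retry_delay_seconds` after the previous attempt started. The real scheduler also adds request duration and poll-interval latency. `attempt_times` is a hypothetical helper, not part of the library.

```python
from datetime import datetime, timedelta


def attempt_times(
    first_fire: datetime, max_retries: int, retry_delay_seconds: int
) -> list[datetime]:
    """Start time of the first attempt and each retry, assuming a fixed delay."""
    return [
        first_fire + timedelta(seconds=retry_delay_seconds * n)
        for n in range(max_retries + 1)
    ]


# A 2am fire with max_retries=2 and retry_delay_seconds=300.
times = attempt_times(datetime(2025, 1, 15, 2, 0), max_retries=2, retry_delay_seconds=300)
print([t.strftime("%H:%M") for t in times])
# ['02:00', '02:05', '02:10']
```

Under these assumptions, a job that fails every time gives up roughly ten minutes after the scheduled fire, which is worth knowing when you size downstream alerting.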

## Pattern comparison

| Pattern                                       | When to reach for it                     | Process lifetime                   |
| --------------------------------------------- | ---------------------------------------- | ---------------------------------- |
| `asyncio.gather` over `agent.arun`            | One-time backfill, a fixed list of files | One process, end-to-end            |
| `agent.arun(background=True)` + poll          | Single long document, restart-tolerant   | State in `db`, process can restart |
| `AgentOS(scheduler=True)` + `ScheduleManager` | Recurring intake (nightly, hourly)       | Long-running AgentOS process       |
| `Workflow` with `Loop` / `Parallel` steps     | Multi-step pipelines per document        | Either ad-hoc or scheduled         |

The scheduler fires endpoints, and an endpoint can be an agent, a team, or a workflow. A nightly job that ingests a folder, extracts each file, and writes to your warehouse is therefore a workflow exposed at `/workflows/<id>/runs`, scheduled with the same `ScheduleManager.create` call. See [Workflows](/workflows/overview).

## Observability

Every scheduled fire creates a row in `agno_schedule_runs` with the schedule id, attempt number, status, and the `run_id` of the underlying agent run. To list the most recent fires for one schedule:

```python theme={null}
# schedule_id is the id of the schedule created with mgr.create above.
runs = mgr.get_runs(schedule_id, limit=100)
for r in runs:
    print(r.triggered_at, r.status, r.attempt, r.error or "")
```

Failed attempts keep their error text. Retries are separate rows with the same `schedule_id` and an incrementing `attempt`. That is the audit trail you can hand to ops.
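A small sketch of turning those rows into an ops summary follows. `RunRow` is a hypothetical shape that mirrors only the fields named on this page, and the `"success"` status string is an assumption; only `"failed"` appears in this guide. Substitute the objects returned by `mgr.get_runs` and the status values your version writes.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunRow:
    """Hypothetical row shape; mirrors only the fields this page names."""

    triggered_at: str
    status: str
    attempt: int
    error: Optional[str] = None


def summarize(rows: list[RunRow]) -> tuple[Counter, list[RunRow]]:
    """Count rows per status and return failed rows in chronological order."""
    counts = Counter(r.status for r in rows)
    failures = sorted(
        (r for r in rows if r.status == "failed"),
        key=lambda r: (r.triggered_at, r.attempt),
    )
    return counts, failures


rows = [
    RunRow("2025-01-15T02:05:00", "failed", 2, "HTTP 503"),
    RunRow("2025-01-15T02:00:00", "failed", 1, "HTTP 503"),
    RunRow("2025-01-15T02:10:00", "success", 3),
    RunRow("2025-01-16T02:00:00", "success", 1),
]
counts, failures = summarize(rows)
for f in failures:
    print(f.triggered_at, f.attempt, f.error)
```

Here the first night needed two retries before succeeding, and the summary surfaces both failed attempts with their error text.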

## Production checklist

| Concern                    | What to add                                                                                                                  |
| -------------------------- | ---------------------------------------------------------------------------------------------------------------------------- |
| Idempotency per document   | Pass a deterministic `session_id` (e.g. document hash) so re-runs upsert.                                                    |
| Dead-letter queue          | After `max_retries`, the row stays in `agno_schedule_runs` with `status="failed"`. Read it and route to a manual queue.      |
| Per-provider rate limiting | The `asyncio.Semaphore` is enough for one provider. For mixed providers, run one semaphore per provider.                     |
| Storage of inputs          | `File(url=...)` keeps the URL but not the bytes. If retention matters, store the source PDF before extraction.               |
| Authoritative cost         | `RunMetrics.cost` is populated when the provider returns it. For exact reconciliation, attach a token-rate table downstream. |
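
For the idempotency row, one way to derive a deterministic `session_id` is to hash the document bytes. This is a sketch: `session_id_for_bytes`, `document_session_id`, the `invoice-` prefix, and the 32-character truncation are illustrative choices, not library conventions.

```python
import hashlib
from pathlib import Path


def session_id_for_bytes(data: bytes, prefix: str = "invoice") -> str:
    """Same bytes always give the same id; any change in content gives a new one."""
    digest = hashlib.sha256(data).hexdigest()
    return f"{prefix}-{digest[:32]}"


def document_session_id(path: Path, prefix: str = "invoice") -> str:
    """Convenience wrapper that hashes a file on disk."""
    return session_id_for_bytes(path.read_bytes(), prefix)


first = session_id_for_bytes(b"%PDF-1.7 example invoice bytes")
again = session_id_for_bytes(b"%PDF-1.7 example invoice bytes")
other = session_id_for_bytes(b"%PDF-1.7 a different invoice")
print(first == again, first == other)
```

Passing the result as `session_id` when you run the agent means a re-processed file lands in the same session instead of creating a new one. Hashing content rather than the filename keeps the id stable when a file is renamed or re-uploaded.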

## Next steps

| Task                                     | Guide                                                                           |
| ---------------------------------------- | ------------------------------------------------------------------------------- |
| Pause on low-confidence fields           | [Human routing and eval](/use-cases/document-processing/human-routing-and-eval) |
| Compose multiple agents into a pipeline  | [Workflows](/workflows/overview)                                                |
| See the workflow + scheduler integration | [Scheduling](/features/scheduling)                                              |

## Developer Resources

* [Scheduler cookbook](https://github.com/agno-agi/agno/tree/main/cookbook/05_agent_os/scheduler)
* [Background execution cookbook](https://github.com/agno-agi/agno/tree/main/cookbook/02_agents/14_advanced/background_execution.py)
* [Workflows cookbook](https://github.com/agno-agi/agno/tree/main/cookbook/04_workflows)
