llm-batch-runner
================
Durable, async batched prompting for LLMs with retries, progress tracking, and resumability via a lightweight SQLite database.
- Async concurrency with backoff retries
- Durable progress stored in SQLite (resume on rerun)
- Pluggable worker: bring your own function or use a Pydantic AI + OpenRouter worker
- Optional structured outputs via a Pydantic `response_model`
- Choose the return shape: unique prompts only, or expanded to the original input length
- Return results in-memory or export to JSONL
- Python 3.12+
Installation
------------

```bash
uv add llm-batch-runner git+https://github.com/m-gaster/llm-batch-runner
```
Quickstart
----------

The simplest way to get started is to rely on environment variables (a `.env` file works too):
```env
# .env
MODEL=openai/gpt-4o-mini
OPENROUTER_API_KEY=sk-or-...
```
Then run a small script:
```python
import asyncio

from llm_batch_runner.main import prompt_map

prompts = [
    "Summarize: The quick brown fox jumps over the lazy dog.",
    "Give me 3 bullet points on why the sky appears blue.",
    "Rewrite this in pirate speak: Hello, friend!",
]

async def main():
    results = await prompt_map(prompts, concurrency=16, teardown=True)
    for row in results:
        print(row)

asyncio.run(main())
```

This will:
- Create (or reuse) a SQLite DB at `.llm_batch_cache/runs.db`
- Run prompts concurrently with retries
- Print progress and return ordered results
- Remove the progress DB on exit when `teardown=True` (and, optionally, the results DB when `teardown_results=True`)
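With the default `return_dtype="list[dict]"`, each returned row is a plain dict. The sketch below is only illustrative; the field names (`idx`, `prompt`, `result`, `status`) are the ones referenced elsewhere in this README, and the exact keys may differ in practice:

```python
# Hypothetical shape of one returned row (illustrative; verify against real output).
example_row = {
    "idx": 2,                                        # position in the input list
    "prompt": "Rewrite this in pirate speak: Hello, friend!",
    "result": "Ahoy there, matey!",                  # model output text
    "status": "done",                                # or e.g. "failed" / "missing"
}
```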
Other ways to provide a worker
------------------------------
- Direct params (OpenRouter):

  ```python
  results = await prompt_map(
      prompts,
      model_name="openai/gpt-4o-mini",
      openrouter_api_key="sk-or-...",
  )
  ```

- Custom async worker:

  ```python
  async def echo_worker(p: str) -> str:
      return p.upper()

  results = await prompt_map(prompts, worker=echo_worker)
  ```
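The worker can be any `async` callable that maps a prompt string to a result string. As a rough sketch (not part of the library, and assuming you have the `openai` package installed and `OPENROUTER_API_KEY` set), a worker that calls OpenRouter's OpenAI-compatible endpoint directly might look like this:

```python
# Illustrative custom worker: calls OpenRouter's OpenAI-compatible API with the
# `openai` client. This is not a llm-batch-runner API; any async (str) -> str
# callable works as a worker.
import os

from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ["OPENROUTER_API_KEY"],
)

async def openrouter_worker(prompt: str) -> str:
    resp = await client.chat.completions.create(
        model="openai/gpt-4o-mini",  # any OpenRouter model id
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.choices[0].message.content or ""

# Use it exactly like the echo worker above (inside an async function):
# results = await prompt_map(prompts, worker=openrouter_worker)
```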
Structured outputs
------------------

You can ask the built-in Pydantic AI worker to return structured data by passing a Pydantic model class as `response_model`. The result stored in the DB and returned from `prompt_map` will be a JSON string matching your schema.
```python
from pydantic import BaseModel

class Bullets(BaseModel):
    points: list[str]

results = await prompt_map(
    prompts,
    model_name="openai/gpt-4o-mini",
    openrouter_api_key="sk-or-...",
    response_model=Bullets,
)
# each row["result"] is a JSON string for Bullets
```
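Because each result is stored as a JSON string for your schema, you can load it back into the model with ordinary Pydantic v2 tooling (this is plain Pydantic, not a library-specific API):

```python
# Parse stored results back into the Bullets model (plain Pydantic v2).
for row in results:
    if row["result"] is None:  # failed/missing prompts carry result=None
        continue
    bullets = Bullets.model_validate_json(row["result"])
    print(bullets.points)
```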
Deduplication and output shape
------------------------------

By default, `prompt_map` deduplicates identical prompts internally. You can control the returned shape via `output_shape`:
```python
results_orig = await prompt_map(prompts, output_shape="original")  # default
results_unique = await prompt_map(prompts, output_shape="unique")
```

The results DB (`*-results.db`) mirrors the chosen `output_shape` for that call. With `"original"`, duplicate prompts are written as multiple rows (distinguished by their `idx`).
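As a schematic illustration (values made up), with a duplicated prompt the two shapes differ like this:

```python
# Schematic illustration of output_shape with a duplicate prompt.
dup_prompts = ["What is 2 + 2?", "Name a color.", "What is 2 + 2?"]

rows_original = await prompt_map(dup_prompts, output_shape="original")
# -> 3 rows, one per input, in input order (the duplicate appears twice,
#    distinguished by idx)

rows_unique = await prompt_map(dup_prompts, output_shape="unique")
# -> 2 rows, one per unique prompt, ordered by first occurrence
```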
Exporting to JSONL
------------------

If you prefer a file output, you can export after a run:
```python
import asyncio

from llm_batch_runner.utils import export_jsonl, DB_URL_DEFAULT

asyncio.run(export_jsonl(DB_URL_DEFAULT, out="results.jsonl"))
```

Options
-------

The main options accepted by `prompt_map`:

- `concurrency`: maximum simultaneous jobs (default 32)
- `rpm_limit`: optional client-side rate cap (requests/min). The runner paces request starts at roughly `60 / rpm_limit` seconds apart to stay under the cap.
- `max_attempts`: total attempts per job, with exponential backoff (default 8)
- `temperature`: sampling temperature forwarded to the underlying model/worker (default 0.0 for deterministic responses)
- `cache_db_url`: override the progress DB location, e.g. `sqlite+aiosqlite:///my_runs.db`
- `progress_update_every`: print frequency for progress updates (default 200)
- `teardown`: remove the progress/cache DB on completion (default `True`)
- `teardown_results`: also remove the separate results DB on completion (default `False`)
- `output_shape`: `"original"` (default) returns one row per input in original order; `"unique"` returns one row per unique prompt (ordered by first occurrence). Missing/failed prompts appear with `status="missing"` and `result=None` in dict/Polars forms when using `"original"`.
- `return_dtype`: one of `"list[dict]"` (default), `"list[str]"`, `"list[tuple[str,str]]"`, or `"polars"`.
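Purely as an illustration of how these fit together (parameter values here are arbitrary; the names come from the list above), a more heavily tuned call might look like:

```python
# Sketch combining several of the options above; values are illustrative.
results = await prompt_map(
    prompts,
    concurrency=64,                                 # up to 64 jobs in flight
    rpm_limit=1000,                                 # starts paced ~60/1000 = 0.06 s apart
    max_attempts=5,                                 # retries with exponential backoff
    cache_db_url="sqlite+aiosqlite:///my_runs.db",  # custom progress DB location
    output_shape="unique",                          # one row per unique prompt
    return_dtype="polars",                          # return a Polars DataFrame
)
```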
Tuning for a target request rate
--------------------------------

To approach a provider limit of ~4,000 requests/minute (~66.7 requests/second):
- Choose an appropriate concurrency. A good rule of thumb is `concurrency ≈ rps_target × p95_latency_seconds`; for example, with a 1.0 s p95 latency, start with `concurrency=80–120`.
- Enable the client-side limiter to avoid 429s while saturating throughput:

  ```python
  results = await prompt_map(
      prompts,
      # Ensure your provider/model is set, e.g. via .env:
      # MODEL=google/gemini-flash-2.5-latest
      concurrency=128,  # adjust based on observed latency
      rpm_limit=3900,   # slight headroom under 4000
      teardown=True,
  )
  ```

- Watch the progress logs. The runner prints a rolling RPM estimate alongside status counts. Increase or decrease `concurrency` to push the rolling RPM close to the target without increasing errors.
Notes:
- Very low latencies call for lower `concurrency`; higher latencies may require 200+.
- If you see many retries due to 429s, reduce `rpm_limit` slightly. If you consistently undershoot the target RPM, increase `concurrency` or raise `rpm_limit` up to just below the provider cap.
- The library uses SQLAlchemy (async) with a simple `jobs` table and stores `pending|inflight|done|failed` states.
- With `output_shape="unique"`, results are ordered by the first occurrence index of each unique prompt. With `output_shape="original"`, results are one per input, in the original order.