diff --git a/deploy/docker/output_control.py b/deploy/docker/output_control.py new file mode 100644 index 000000000..020fa4164 --- /dev/null +++ b/deploy/docker/output_control.py @@ -0,0 +1,326 @@ +""" +Output Control Module - Pagination and field control for API responses. + +This module provides the `apply_output_control()` function that applies pagination, +limiting, and field exclusion to CrawlResult dictionaries before returning them +to clients. + +The function is designed to be: +- Non-destructive: Makes copies, doesn't modify the original data +- Backward compatible: Returns unchanged data when control is None +- Comprehensive: Handles all field types (text, collections, nested) +- Informative: Returns metadata about what was truncated +""" + +import copy +from typing import Any, Dict, List, Optional, Tuple + +from schemas import ( + CollectionStats, + ContentFieldStats, + OutputControl, + OutputMeta, +) + + +def _get_nested_value(data: Dict[str, Any], path: str) -> Optional[Any]: + """Get a value from a nested dict using dot notation. + + Args: + data: The dictionary to traverse + path: Dot-separated path (e.g., 'markdown.raw_markdown') + + Returns: + The value at the path, or None if not found + """ + parts = path.split(".") + current = data + for part in parts: + if not isinstance(current, dict) or part not in current: + return None + current = current[part] + return current + + +def _set_nested_value(data: Dict[str, Any], path: str, value: Any) -> None: + """Set a value in a nested dict using dot notation. + + Args: + data: The dictionary to modify + path: Dot-separated path (e.g., 'markdown.raw_markdown') + value: The value to set + """ + parts = path.split(".") + current = data + for part in parts[:-1]: + if part not in current or not isinstance(current[part], dict): + current[part] = {} + current = current[part] + current[parts[-1]] = value + + +def _delete_nested_value(data: Dict[str, Any], path: str) -> bool: + """Delete a value from a nested dict using dot notation. + + Args: + data: The dictionary to modify + path: Dot-separated path (e.g., 'markdown.raw_markdown') + + Returns: + True if the value was deleted, False if not found + """ + parts = path.split(".") + current = data + for part in parts[:-1]: + if not isinstance(current, dict) or part not in current: + return False + current = current[part] + + if isinstance(current, dict) and parts[-1] in current: + del current[parts[-1]] + return True + return False + + +def apply_output_control( + data: Dict[str, Any], control: Optional[OutputControl] +) -> Tuple[Dict[str, Any], Optional[OutputMeta]]: + """ + Apply pagination, limiting, and field exclusion to a CrawlResult dict. + + This function is the core of the output control system. It processes a + CrawlResult dictionary and applies the requested pagination, collection + limiting, and field exclusion operations. + + Args: + data: The result of CrawlResult.model_dump() - a dictionary representation + of the crawl result. This should contain fields like: + - html: str + - cleaned_html: str + - markdown: dict with raw_markdown, fit_markdown, etc. + - links: dict with internal, external lists + - media: dict with images, videos, audios lists + - tables: list + - etc. + control: Output control configuration. If None, returns the original + data unchanged with no metadata. 
+ + Returns: + Tuple of: + - Modified data dictionary (deep copy if any modifications were made) + - OutputMeta if any truncation occurred, None otherwise + + Notes: + - The function makes a deep copy of the data before modifications + - Metadata is added to the result under the '_output_meta' key + - All parameters in OutputControl are optional; only specified + operations are applied + - The function handles missing fields gracefully + + Example: + >>> control = OutputControl(content_limit=5000, max_links=10) + >>> result, meta = apply_output_control(crawl_result, control) + >>> if meta and meta.truncated: + ... print(f"Content was truncated: {meta.content_stats}") + """ + # Fast path: no control specified means no changes + if control is None: + return data, None + + # Check if any control options are actually set + has_pagination = ( + control.content_offset is not None or control.content_limit is not None + ) + has_collection_limits = ( + control.max_links is not None + or control.max_media is not None + or control.max_tables is not None + ) + has_exclusions = ( + control.exclude_fields is not None and len(control.exclude_fields) > 0 + ) + + # If no options are set, return unchanged + if not (has_pagination or has_collection_limits or has_exclusions): + return data, None + + # Make a deep copy to avoid modifying the original + data = copy.deepcopy(data) + + # Initialize metadata + meta = OutputMeta(truncated=False) + + # 1. Apply field exclusion first (before other processing) + if has_exclusions: + excluded = [] + for field in control.exclude_fields: + if "." in field: + # Nested field exclusion (e.g., 'markdown.references_markdown') + if _delete_nested_value(data, field): + excluded.append(field) + elif field in data: + # Top-level field exclusion + del data[field] + excluded.append(field) + + if excluded: + meta.excluded_fields = excluded + meta.truncated = True + + # 2. 
Apply content pagination to text fields + if has_pagination: + content_stats: Dict[str, ContentFieldStats] = {} + + # Define text fields to paginate + # Top-level text fields + top_level_text_fields = [ + "html", + "cleaned_html", + "fit_html", + "extracted_content", + ] + + # Nested markdown fields + markdown_fields = [ + "markdown.raw_markdown", + "markdown.fit_markdown", + "markdown.markdown_with_citations", + "markdown.references_markdown", + ] + + offset = control.content_offset or 0 + limit = control.content_limit + + # Process top-level text fields + for field in top_level_text_fields: + if field in data and isinstance(data[field], str): + content = data[field] + total = len(content) + + if limit is not None or offset > 0: + sliced = ( + content[offset : offset + limit] if limit else content[offset:] + ) + returned = len(sliced) + data[field] = sliced + + # Record stats if truncation occurred + if returned < total or offset > 0: + content_stats[field] = ContentFieldStats( + total_chars=total, + returned_chars=returned, + offset=offset, + has_more=(offset + returned) < total, + ) + + # Process nested markdown fields + for field_path in markdown_fields: + content = _get_nested_value(data, field_path) + if content is not None and isinstance(content, str): + total = len(content) + + if limit is not None or offset > 0: + sliced = ( + content[offset : offset + limit] if limit else content[offset:] + ) + returned = len(sliced) + _set_nested_value(data, field_path, sliced) + + # Record stats if truncation occurred + if returned < total or offset > 0: + content_stats[field_path] = ContentFieldStats( + total_chars=total, + returned_chars=returned, + offset=offset, + has_more=(offset + returned) < total, + ) + + if content_stats: + meta.content_stats = content_stats + meta.truncated = True + + # 3. 
Apply collection limiting + collection_stats: Dict[str, CollectionStats] = {} + + # Limit links (applies to both internal and external) + if ( + control.max_links is not None + and "links" in data + and isinstance(data["links"], dict) + ): + for link_type in ["internal", "external"]: + if link_type in data["links"] and isinstance( + data["links"][link_type], list + ): + items = data["links"][link_type] + total = len(items) + if total > control.max_links: + data["links"][link_type] = items[: control.max_links] + collection_stats[f"links.{link_type}"] = CollectionStats( + total_count=total, returned_count=control.max_links + ) + + # Limit media (applies to images, videos, audios) + if ( + control.max_media is not None + and "media" in data + and isinstance(data["media"], dict) + ): + for media_type in ["images", "videos", "audios"]: + if media_type in data["media"] and isinstance( + data["media"][media_type], list + ): + items = data["media"][media_type] + total = len(items) + if total > control.max_media: + data["media"][media_type] = items[: control.max_media] + collection_stats[f"media.{media_type}"] = CollectionStats( + total_count=total, returned_count=control.max_media + ) + + # Limit tables + if ( + control.max_tables is not None + and "tables" in data + and isinstance(data["tables"], list) + ): + items = data["tables"] + total = len(items) + if total > control.max_tables: + data["tables"] = items[: control.max_tables] + collection_stats["tables"] = CollectionStats( + total_count=total, returned_count=control.max_tables + ) + + if collection_stats: + meta.collection_stats = collection_stats + meta.truncated = True + + # Only add metadata to response if something was actually modified + if meta.truncated: + data["_output_meta"] = meta.model_dump(exclude_none=True) + return data, meta + + return data, None + + +def apply_output_control_to_batch( + results: List[Dict[str, Any]], control: Optional[OutputControl] +) -> List[Dict[str, Any]]: + """ + Apply output control to a batch of results. + + Convenience function for applying output control to multiple CrawlResult + dictionaries, such as when processing batch crawl results. + + Args: + results: List of CrawlResult dictionaries + control: Output control configuration (applied to each result) + + Returns: + List of modified result dictionaries + """ + if control is None: + return results + + return [apply_output_control(result, control)[0] for result in results] diff --git a/deploy/docker/schemas.py b/deploy/docker/schemas.py index 21d47fc44..cefc0e051 100644 --- a/deploy/docker/schemas.py +++ b/deploy/docker/schemas.py @@ -4,25 +4,132 @@ from utils import FilterType +# ============================================================================= +# Output Control Schemas - Pagination and field control for API responses +# ============================================================================= + + +class OutputControl(BaseModel): + """Controls pagination and field inclusion in responses. + + All parameters are optional. When not specified, full output is returned + (backward compatible default behavior). 
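+
+    Illustrative example (values are arbitrary; the same options can be sent
+    as the JSON "output" object on a request):
+
+        OutputControl(content_limit=2000, max_links=20, exclude_fields=["html"])
+        # equivalent JSON: {"content_limit": 2000, "max_links": 20, "exclude_fields": ["html"]}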
+ """ + + # Content pagination (applies to: markdown, html, cleaned_html, extracted_content) + content_offset: Optional[int] = Field( + None, + ge=0, + description="Character offset for text content fields (markdown, html, cleaned_html, extracted_content)", + ) + content_limit: Optional[int] = Field( + None, ge=1, description="Maximum characters to return for text content fields" + ) + + # Collection limiting (applies to: links, media, tables) + max_links: Optional[int] = Field( + None, + ge=0, + description="Maximum number of links to include (applies to both internal and external)", + ) + max_media: Optional[int] = Field( + None, + ge=0, + description="Maximum number of media items to include per category (images, videos, audios)", + ) + max_tables: Optional[int] = Field( + None, ge=0, description="Maximum number of tables to include" + ) + + # Field exclusion + exclude_fields: Optional[List[str]] = Field( + None, + description=( + "Fields to completely omit from response. Valid values: " + "html, cleaned_html, fit_html, markdown, links, media, tables, metadata, " + "response_headers, network_requests, console_messages, screenshot, pdf. " + "Supports dot-notation for nested exclusion (e.g., 'markdown.references_markdown')" + ), + ) + + +class ContentFieldStats(BaseModel): + """Statistics for a paginated text field.""" + + total_chars: int = Field( + ..., description="Total character count of the original content" + ) + returned_chars: int = Field( + ..., description="Number of characters returned after pagination" + ) + offset: int = Field(..., description="Character offset applied") + has_more: bool = Field( + ..., description="True if more content exists beyond the returned portion" + ) + + +class CollectionStats(BaseModel): + """Statistics for a limited collection field.""" + + total_count: int = Field( + ..., description="Total item count in the original collection" + ) + returned_count: int = Field( + ..., description="Number of items returned after limiting" + ) + + +class OutputMeta(BaseModel): + """Metadata about output pagination and truncation. + + Included in responses (as '_output_meta') when any pagination or limiting is applied. 
+ """ + + truncated: bool = Field( + ..., description="True if any content was truncated or limited" + ) + + # Per-field truncation info (only present for affected fields) + content_stats: Optional[Dict[str, ContentFieldStats]] = Field( + None, + description="Statistics for text content fields that were paginated (e.g., 'markdown.raw_markdown')", + ) + collection_stats: Optional[Dict[str, CollectionStats]] = Field( + None, + description="Statistics for collection fields that were limited (e.g., 'links.internal')", + ) + excluded_fields: Optional[List[str]] = Field( + None, description="List of fields that were excluded from response" + ) + + +# ============================================================================= +# Request Schemas +# ============================================================================= + + class CrawlRequest(BaseModel): urls: List[str] = Field(min_length=1, max_length=100) browser_config: Optional[Dict] = Field(default_factory=dict) crawler_config: Optional[Dict] = Field(default_factory=dict) + output: Optional[OutputControl] = Field( + None, description="Output control for pagination and field exclusion" + ) class HookConfig(BaseModel): """Configuration for user-provided hooks""" + code: Dict[str, str] = Field( - default_factory=dict, - description="Map of hook points to Python code strings" + default_factory=dict, description="Map of hook points to Python code strings" ) timeout: int = Field( default=30, ge=1, le=120, - description="Timeout in seconds for each hook execution" + description="Timeout in seconds for each hook execution", ) - + class Config: schema_extra = { "example": { @@ -39,57 +146,83 @@ async def hook(page, context, **kwargs): await page.evaluate("window.scrollTo(0, document.body.scrollHeight)") await page.wait_for_timeout(2000) return page -""" +""", }, - "timeout": 30 + "timeout": 30, } } class CrawlRequestWithHooks(CrawlRequest): """Extended crawl request with hooks support""" + hooks: Optional[HookConfig] = Field( - default=None, - description="Optional user-provided hook functions" + default=None, description="Optional user-provided hook functions" ) + class MarkdownRequest(BaseModel): """Request body for the /md endpoint.""" - url: str = Field(..., description="Absolute http/https URL to fetch") - f: FilterType = Field(FilterType.FIT, description="Content‑filter strategy: fit, raw, bm25, or llm") - q: Optional[str] = Field(None, description="Query string used by BM25/LLM filters") - c: Optional[str] = Field("0", description="Cache‑bust / revision counter") - provider: Optional[str] = Field(None, description="LLM provider override (e.g., 'anthropic/claude-3-opus')") - temperature: Optional[float] = Field(None, description="LLM temperature override (0.0-2.0)") + + url: str = Field(..., description="Absolute http/https URL to fetch") + f: FilterType = Field( + FilterType.FIT, description="Content‑filter strategy: fit, raw, bm25, or llm" + ) + q: Optional[str] = Field(None, description="Query string used by BM25/LLM filters") + c: Optional[str] = Field("0", description="Cache‑bust / revision counter") + provider: Optional[str] = Field( + None, description="LLM provider override (e.g., 'anthropic/claude-3-opus')" + ) + temperature: Optional[float] = Field( + None, description="LLM temperature override (0.0-2.0)" + ) base_url: Optional[str] = Field(None, description="LLM API base URL override") + output: Optional[OutputControl] = Field( + None, description="Output control for pagination and field exclusion" + ) class RawCode(BaseModel): 
code: str + class HTMLRequest(BaseModel): url: str - + output: Optional[OutputControl] = Field( + None, description="Output control for pagination and field exclusion" + ) + + class ScreenshotRequest(BaseModel): url: str screenshot_wait_for: Optional[float] = 2 output_path: Optional[str] = None + output: Optional[OutputControl] = Field( + None, description="Output control for pagination and field exclusion" + ) + class PDFRequest(BaseModel): url: str output_path: Optional[str] = None + output: Optional[OutputControl] = Field( + None, description="Output control for pagination and field exclusion" + ) class JSEndpointRequest(BaseModel): url: str scripts: List[str] = Field( - ..., - description="List of separated JavaScript snippets to execute" + ..., description="List of separated JavaScript snippets to execute" + ) + output: Optional[OutputControl] = Field( + None, description="Output control for pagination and field exclusion" ) class WebhookConfig(BaseModel): """Configuration for webhook notifications.""" + webhook_url: HttpUrl webhook_data_in_payload: bool = False webhook_headers: Optional[Dict[str, str]] = None @@ -97,10 +230,11 @@ class WebhookConfig(BaseModel): class WebhookPayload(BaseModel): """Payload sent to webhook endpoints.""" + task_id: str task_type: str # "crawl", "llm_extraction", etc. status: str # "completed" or "failed" timestamp: str # ISO 8601 format urls: List[str] error: Optional[str] = None - data: Optional[Dict] = None # Included only if webhook_data_in_payload=True \ No newline at end of file + data: Optional[Dict] = None # Included only if webhook_data_in_payload=True diff --git a/deploy/docker/server.py b/deploy/docker/server.py index 62e4e4413..6aa1a4aec 100644 --- a/deploy/docker/server.py +++ b/deploy/docker/server.py @@ -19,9 +19,11 @@ import logging from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig from api import ( - handle_markdown_request, handle_llm_qa, - handle_stream_crawl_request, handle_crawl_request, - stream_results + handle_markdown_request, + handle_llm_qa, + handle_stream_crawl_request, + handle_crawl_request, + stream_results, ) from schemas import ( CrawlRequestWithHooks, @@ -32,10 +34,9 @@ PDFRequest, JSEndpointRequest, ) +from output_control import apply_output_control, apply_output_control_to_batch -from utils import ( - FilterType, load_config, setup_logging, verify_email_domain -) +from utils import FilterType, load_config, setup_logging, verify_email_domain import os import sys import time @@ -44,12 +45,13 @@ from contextlib import asynccontextmanager import pathlib -from fastapi import ( - FastAPI, HTTPException, Request, Path, Query, Depends -) +from fastapi import FastAPI, HTTPException, Request, Path, Query, Depends from rank_bm25 import BM25Okapi from fastapi.responses import ( - StreamingResponse, RedirectResponse, PlainTextResponse, JSONResponse + StreamingResponse, + RedirectResponse, + PlainTextResponse, + JSONResponse, ) from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware @@ -79,6 +81,7 @@ MAX_PAGES = config["crawler"]["pool"].get("max_pages", 30) GLOBAL_SEM = asyncio.Semaphore(MAX_PAGES) + # ── default browser config helper ───────────────────────────── def get_default_browser_config() -> BrowserConfig: """Get default BrowserConfig from config.yml.""" @@ -87,6 +90,7 @@ def get_default_browser_config() -> BrowserConfig: **config["crawler"]["browser"].get("kwargs", {}), ) + # import logging # page_log = 
logging.getLogger("page_cap") # orig_arun = AsyncWebCrawler.arun @@ -105,6 +109,8 @@ def get_default_browser_config() -> BrowserConfig: async def capped_arun(self, *a, **kw): async with GLOBAL_SEM: return await orig_arun(self, *a, **kw) + + AsyncWebCrawler.arun = capped_arun # ───────────────────── FastAPI lifespan ────────────────────── @@ -122,10 +128,12 @@ async def lifespan(_: FastAPI): monitor_module.monitor_stats.start_persistence_worker() # Initialize browser pool - await init_permanent(BrowserConfig( - extra_args=config["crawler"]["browser"].get("extra_args", []), - **config["crawler"]["browser"].get("kwargs", {}), - )) + await init_permanent( + BrowserConfig( + extra_args=config["crawler"]["browser"].get("extra_args", []), + **config["crawler"]["browser"].get("kwargs", {}), + ) + ) # Start background tasks app.state.janitor = asyncio.create_task(janitor()) @@ -139,6 +147,7 @@ async def lifespan(_: FastAPI): # Monitor cleanup (persist stats and stop workers) from monitor import get_monitor + try: await get_monitor().cleanup() except Exception as e: @@ -146,9 +155,11 @@ async def lifespan(_: FastAPI): await close_all() + async def _timeline_updater(): """Update timeline data every 5 seconds.""" from monitor import get_monitor + while True: await asyncio.sleep(5) try: @@ -158,6 +169,7 @@ async def _timeline_updater(): except Exception as e: logger.warning(f"Timeline update error: {e}") + # ───────────────────── FastAPI instance ────────────────────── app = FastAPI( title=config["app"]["title"], @@ -199,6 +211,7 @@ async def _timeline_updater(): async def root(): return RedirectResponse("/playground") + # ─────────────────── infra / middleware ───────────────────── redis = aioredis.from_url(config["redis"].get("uri", "redis://localhost")) @@ -216,9 +229,7 @@ def _setup_security(app_: FastAPI): if sec.get("https_redirect"): app_.add_middleware(HTTPSRedirectMiddleware) if sec.get("trusted_hosts", []) != ["*"]: - app_.add_middleware( - TrustedHostMiddleware, allowed_hosts=sec["trusted_hosts"] - ) + app_.add_middleware(TrustedHostMiddleware, allowed_hosts=sec["trusted_hosts"]) _setup_security(app) @@ -236,6 +247,7 @@ async def add_security_headers(request: Request, call_next): resp.headers.update(config["security"]["headers"]) return resp + # ───────────────── safe config‑dump helper ───────────────── ALLOWED_TYPES = { "CrawlerRunConfig": CrawlerRunConfig, @@ -257,9 +269,11 @@ def _safe_eval_config(expr: str) -> dict: raise ValueError("Expression must be a single constructor call") call = tree.body - if not (isinstance(call.func, ast.Name) and call.func.id in {"CrawlerRunConfig", "BrowserConfig"}): - raise ValueError( - "Only CrawlerRunConfig(...) or BrowserConfig(...) are allowed") + if not ( + isinstance(call.func, ast.Name) + and call.func.id in {"CrawlerRunConfig", "BrowserConfig"} + ): + raise ValueError("Only CrawlerRunConfig(...) or BrowserConfig(...) 
are allowed") # forbid nested calls to keep the surface tiny for node in ast.walk(call): @@ -267,10 +281,10 @@ def _safe_eval_config(expr: str) -> dict: raise ValueError("Nested function calls are not permitted") # expose everything that crawl4ai exports, nothing else - safe_env = {name: getattr(_c4, name) - for name in dir(_c4) if not name.startswith("_")} - obj = eval(compile(tree, "", "eval"), - {"__builtins__": {}}, safe_env) + safe_env = { + name: getattr(_c4, name) for name in dir(_c4) if not name.startswith("_") + } + obj = eval(compile(tree, "", "eval"), {"__builtins__": {}}, safe_env) return obj.dump() @@ -279,10 +293,12 @@ def _safe_eval_config(expr: str) -> dict: # ── monitor router ────────────────────────────────────────── from monitor_routes import router as monitor_router + app.include_router(monitor_router) logger = logging.getLogger(__name__) + # ──────────────────────── Endpoints ────────────────────────── @app.post("/token") async def get_token(req: TokenRequest): @@ -308,21 +324,37 @@ async def get_markdown( body: MarkdownRequest, _td: Dict = Depends(token_dep), ): - if not body.url.startswith(("http://", "https://")) and not body.url.startswith(("raw:", "raw://")): + if not body.url.startswith(("http://", "https://")) and not body.url.startswith( + ("raw:", "raw://") + ): raise HTTPException( - 400, "Invalid URL format. Must start with http://, https://, or for raw HTML (raw:, raw://)") + 400, + "Invalid URL format. Must start with http://, https://, or for raw HTML (raw:, raw://)", + ) markdown = await handle_markdown_request( - body.url, body.f, body.q, body.c, config, body.provider, - body.temperature, body.base_url + body.url, + body.f, + body.q, + body.c, + config, + body.provider, + body.temperature, + body.base_url, ) - return JSONResponse({ + response_data = { "url": body.url, "filter": body.f, "query": body.q, "cache": body.c, "markdown": markdown, - "success": True - }) + "success": True, + } + + # Apply output control if specified + if body.output: + response_data, _ = apply_output_control(response_data, body.output) + + return JSONResponse(response_data) @app.post("/html") @@ -338,6 +370,7 @@ async def generate_html( Use when you need sanitized HTML structures for building schemas or further processing. """ from crawler_pool import get_crawler + cfg = CrawlerRunConfig() try: crawler = await get_crawler(get_default_browser_config()) @@ -347,11 +380,19 @@ async def generate_html( raw_html = results[0].html from crawl4ai.utils import preprocess_html_for_schema + processed_html = preprocess_html_for_schema(raw_html) - return JSONResponse({"html": processed_html, "url": body.url, "success": True}) + response_data = {"html": processed_html, "url": body.url, "success": True} + + # Apply output control if specified + if body.output: + response_data, _ = apply_output_control(response_data, body.output) + + return JSONResponse(response_data) except Exception as e: raise HTTPException(500, detail=str(e)) + # Screenshot endpoint @@ -369,8 +410,11 @@ async def generate_screenshot( Then in result instead of the screenshot you will get a path to the saved file. 
""" from crawler_pool import get_crawler + try: - cfg = CrawlerRunConfig(screenshot=True, screenshot_wait_for=body.screenshot_wait_for) + cfg = CrawlerRunConfig( + screenshot=True, screenshot_wait_for=body.screenshot_wait_for + ) crawler = await get_crawler(get_default_browser_config()) results = await crawler.arun(url=body.url, config=cfg) if not results[0].success: @@ -381,11 +425,19 @@ async def generate_screenshot( os.makedirs(os.path.dirname(abs_path), exist_ok=True) with open(abs_path, "wb") as f: f.write(base64.b64decode(screenshot_data)) - return {"success": True, "path": abs_path} - return {"success": True, "screenshot": screenshot_data} + response_data = {"success": True, "path": abs_path} + else: + response_data = {"success": True, "screenshot": screenshot_data} + + # Apply output control if specified + if body.output: + response_data, _ = apply_output_control(response_data, body.output) + + return response_data except Exception as e: raise HTTPException(500, detail=str(e)) + # PDF endpoint @@ -403,6 +455,7 @@ async def generate_pdf( Then in result instead of the PDF you will get a path to the saved file. """ from crawler_pool import get_crawler + try: cfg = CrawlerRunConfig(pdf=True) crawler = await get_crawler(get_default_browser_config()) @@ -415,8 +468,18 @@ async def generate_pdf( os.makedirs(os.path.dirname(abs_path), exist_ok=True) with open(abs_path, "wb") as f: f.write(pdf_data) - return {"success": True, "path": abs_path} - return {"success": True, "pdf": base64.b64encode(pdf_data).decode()} + response_data = {"success": True, "path": abs_path} + else: + response_data = { + "success": True, + "pdf": base64.b64encode(pdf_data).decode(), + } + + # Apply output control if specified + if body.output: + response_data, _ = apply_output_control(response_data, body.output) + + return response_data except Exception as e: raise HTTPException(500, detail=str(e)) @@ -475,6 +538,7 @@ class MarkdownGenerationResult(BaseModel): """ from crawler_pool import get_crawler + try: cfg = CrawlerRunConfig(js_code=body.scripts) crawler = await get_crawler(get_default_browser_config()) @@ -482,6 +546,11 @@ class MarkdownGenerationResult(BaseModel): if not results[0].success: raise HTTPException(500, detail=results[0].error_message or "Crawl failed") data = results[0].model_dump() + + # Apply output control if specified + if body.output: + data, _ = apply_output_control(data, body.output) + return JSONResponse(data) except Exception as e: raise HTTPException(500, detail=str(e)) @@ -496,7 +565,9 @@ async def llm_endpoint( ): if not q: raise HTTPException(400, "Query parameter 'q' is required") - if not url.startswith(("http://", "https://")) and not url.startswith(("raw:", "raw://")): + if not url.startswith(("http://", "https://")) and not url.startswith( + ("raw:", "raw://") + ): url = "https://" + url answer = await handle_llm_qa(url, q, config) return JSONResponse({"answer": answer}) @@ -505,31 +576,29 @@ async def llm_endpoint( @app.get("/schema") async def get_schema(): from crawl4ai import BrowserConfig, CrawlerRunConfig - return {"browser": BrowserConfig().dump(), - "crawler": CrawlerRunConfig().dump()} + + return {"browser": BrowserConfig().dump(), "crawler": CrawlerRunConfig().dump()} @app.get("/hooks/info") async def get_hooks_info(): """Get information about available hook points and their signatures""" from hook_manager import UserHookManager - + hook_info = {} for hook_point, params in UserHookManager.HOOK_SIGNATURES.items(): hook_info[hook_point] = { "parameters": params, 
"description": get_hook_description(hook_point), - "example": get_hook_example(hook_point) + "example": get_hook_example(hook_point), } - - return JSONResponse({ - "available_hooks": hook_info, - "timeout_limits": { - "min": 1, - "max": 120, - "default": 30 + + return JSONResponse( + { + "available_hooks": hook_info, + "timeout_limits": {"min": 1, "max": 120, "default": 30}, } - }) + ) def get_hook_description(hook_point: str) -> str: @@ -542,7 +611,7 @@ def get_hook_description(hook_point: str) -> str: "on_user_agent_updated": "Called when user agent is updated", "on_execution_started": "Called when custom JavaScript execution begins", "before_retrieve_html": "Called before retrieving the final HTML - ideal for scrolling", - "before_return_html": "Called just before returning the HTML content" + "before_return_html": "Called just before returning the HTML content", } return descriptions.get(hook_point, "") @@ -558,19 +627,17 @@ def get_hook_example(hook_point: str) -> str: 'domain': '.example.com' }]) return page""", - "before_retrieve_html": """async def hook(page, context, **kwargs): # Scroll to load lazy content await page.evaluate("window.scrollTo(0, document.body.scrollHeight)") await page.wait_for_timeout(2000) return page""", - "before_goto": """async def hook(page, context, url, **kwargs): # Set custom headers await page.set_extra_http_headers({ 'X-Custom-Header': 'value' }) - return page""" + return page""", } return examples.get(hook_point, "# Implement your hook logic here\nreturn page") @@ -604,25 +671,34 @@ async def crawl( crawler_config = CrawlerRunConfig.load(crawl_request.crawler_config) if crawler_config.stream: return await stream_process(crawl_request=crawl_request) - + # Prepare hooks config if provided hooks_config = None if crawl_request.hooks: hooks_config = { - 'code': crawl_request.hooks.code, - 'timeout': crawl_request.hooks.timeout + "code": crawl_request.hooks.code, + "timeout": crawl_request.hooks.timeout, } - + results = await handle_crawl_request( urls=crawl_request.urls, browser_config=crawl_request.browser_config, crawler_config=crawl_request.crawler_config, config=config, - hooks_config=hooks_config + hooks_config=hooks_config, ) # check if all of the results are not successful if all(not result["success"] for result in results["results"]): - raise HTTPException(500, f"Crawl request failed: {results['results'][0]['error_message']}") + raise HTTPException( + 500, f"Crawl request failed: {results['results'][0]['error_message']}" + ) + + # Apply output control to each result if specified + if crawl_request.output: + results["results"] = apply_output_control_to_batch( + results["results"], crawl_request.output + ) + return JSONResponse(results) @@ -638,24 +714,24 @@ async def crawl_stream( return await stream_process(crawl_request=crawl_request) + async def stream_process(crawl_request: CrawlRequestWithHooks): - # Prepare hooks config if provided# Prepare hooks config if provided hooks_config = None if crawl_request.hooks: hooks_config = { - 'code': crawl_request.hooks.code, - 'timeout': crawl_request.hooks.timeout + "code": crawl_request.hooks.code, + "timeout": crawl_request.hooks.timeout, } - + crawler, gen, hooks_info = await handle_stream_crawl_request( urls=crawl_request.urls, browser_config=crawl_request.browser_config, crawler_config=crawl_request.crawler_config, config=config, - hooks_config=hooks_config + hooks_config=hooks_config, ) - + # Add hooks info to response headers if available headers = { "Cache-Control": "no-cache", @@ -664,8 +740,17 @@ 
async def stream_process(crawl_request: CrawlRequestWithHooks): } if hooks_info: import json - headers["X-Hooks-Status"] = json.dumps(hooks_info['status']['status']) - + + headers["X-Hooks-Status"] = json.dumps(hooks_info["status"]["status"]) + + # Wrap stream_results with output control if specified + if crawl_request.output: + return StreamingResponse( + stream_results_with_output_control(crawler, gen, crawl_request.output), + media_type="application/x-ndjson", + headers=headers, + ) + return StreamingResponse( stream_results(crawler, gen), media_type="application/x-ndjson", @@ -673,14 +758,56 @@ async def stream_process(crawl_request: CrawlRequestWithHooks): ) +async def stream_results_with_output_control(crawler, results_gen, output_control): + """Wrapper around stream_results that applies output control to each result.""" + import json + from utils import datetime_handler + from base64 import b64encode + + try: + async for result in results_gen: + try: + result_dict = result.model_dump() + # Ensure fit_html is JSON-serializable + if "fit_html" in result_dict and not ( + result_dict["fit_html"] is None + or isinstance(result_dict["fit_html"], str) + ): + result_dict["fit_html"] = None + # If PDF exists, encode it to base64 + if result_dict.get("pdf") is not None: + result_dict["pdf"] = b64encode(result_dict["pdf"]).decode("utf-8") + + # Apply output control to this result + result_dict, _ = apply_output_control(result_dict, output_control) + + logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}") + data = json.dumps(result_dict, default=datetime_handler) + "\n" + yield data.encode("utf-8") + except Exception as e: + logger.error(f"Serialization error: {e}") + error_response = { + "error": str(e), + "url": getattr(result, "url", "unknown"), + } + yield (json.dumps(error_response) + "\n").encode("utf-8") + + yield json.dumps({"status": "completed"}).encode("utf-8") + + except asyncio.CancelledError: + logger.warning("Client disconnected during streaming") + finally: + pass + + def chunk_code_functions(code_md: str) -> List[str]: """Extract each function/class from markdown code blocks per file.""" pattern = re.compile( # match "## File: " then a ```py fence, then capture until the closing ``` - r'##\s*File:\s*(?P.+?)\s*?\r?\n' # file header - r'```py\s*?\r?\n' # opening fence - r'(?P.*?)(?=\r?\n```)', # code block - re.DOTALL + r"##\s*File:\s*(?P.+?)\s*?\r?\n" # file header + r"```py\s*?\r?\n" # opening fence + r"(?P.*?)(?=\r?\n```)", # code block + re.DOTALL, ) chunks: List[str] = [] for m in pattern.finditer(code_md): @@ -720,15 +847,14 @@ async def get_context( request: Request, _td: Dict = Depends(token_dep), context_type: str = Query("all", regex="^(code|doc|all)$"), - query: Optional[str] = Query( - None, description="search query to filter chunks"), + query: Optional[str] = Query(None, description="search query to filter chunks"), score_ratio: float = Query( - 0.5, ge=0.0, le=1.0, description="min score as fraction of max_score"), - max_results: int = Query( - 20, ge=1, description="absolute cap on returned chunks"), + 0.5, ge=0.0, le=1.0, description="min score as fraction of max_score" + ), + max_results: int = Query(20, ge=1, description="absolute cap on returned chunks"), ): """ - This end point is design for any questions about Crawl4ai library. It returns a plain text markdown with extensive information about Crawl4ai. + This end point is design for any questions about Crawl4ai library. 
It returns a plain text markdown with extensive information about Crawl4ai. You can use this as a context for any AI assistant. Use this endpoint for AI assistants to retrieve library context for decision making or code generation tasks. Alway is BEST practice you provide a query to filter the context. Otherwise the lenght of the response will be very long. @@ -762,10 +888,12 @@ async def get_context( return JSONResponse({"code_context": code_content}) if context_type == "doc": return JSONResponse({"doc_context": doc_content}) - return JSONResponse({ - "code_context": code_content, - "doc_context": doc_content, - }) + return JSONResponse( + { + "code_context": code_content, + "doc_context": doc_content, + } + ) tokens = query.split() results: Dict[str, List[Dict[str, float]]] = {} @@ -789,7 +917,7 @@ async def get_context( max_sd = float(scores_d.max()) if scores_d.size > 0 else 0.0 cutoff_d = max_sd * score_ratio idxs = [i for i, s in enumerate(scores_d) if s >= cutoff_d] - neighbors = set(i for idx in idxs for i in (idx-1, idx, idx+1)) + neighbors = set(i for idx in idxs for i in (idx - 1, idx, idx + 1)) valid = [i for i in sorted(neighbors) if 0 <= i < len(sections)] valid = valid[:max_results] results["doc_results"] = [ @@ -801,14 +929,12 @@ async def get_context( # attach MCP layer (adds /mcp/ws, /mcp/sse, /mcp/schema) print(f"MCP server running on {config['app']['host']}:{config['app']['port']}") -attach_mcp( - app, - base_url=f"http://{config['app']['host']}:{config['app']['port']}" -) +attach_mcp(app, base_url=f"http://{config['app']['host']}:{config['app']['port']}") # ────────────────────────── cli ────────────────────────────── if __name__ == "__main__": import uvicorn + uvicorn.run( "server:app", host=config["app"]["host"], diff --git a/deploy/docker/tests/test_output_control.py b/deploy/docker/tests/test_output_control.py new file mode 100644 index 000000000..eb1e12907 --- /dev/null +++ b/deploy/docker/tests/test_output_control.py @@ -0,0 +1,452 @@ +#!/usr/bin/env python3 +""" +Test Suite: Output Control (Pagination) Feature + +Tests the apply_output_control() function and endpoint integration. +Requires the crawl4ai container to be running on port 11235. 
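+Only the integration tests (TestOutputControlIntegration) need the live server;
+the unit and edge-case tests run standalone. The target host can be overridden
+via the CRAWL4AI_URL environment variable.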
+ +Run with: pytest test_output_control.py -v +""" + +import asyncio +import pytest +import httpx +import json +import sys +import os + +# Add parent directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from output_control import apply_output_control, apply_output_control_to_batch +from schemas import OutputControl, OutputMeta, ContentFieldStats, CollectionStats + +# Config +BASE_URL = os.environ.get("CRAWL4AI_URL", "http://localhost:11235") +TEST_URL = "https://httpbin.org/html" # Simple, stable test page + + +# ============================================================================= +# Unit Tests for apply_output_control() +# ============================================================================= + + +class TestApplyOutputControlUnit: + """Unit tests for the apply_output_control function.""" + + def test_no_control_returns_unchanged(self): + """When control is None, data should be returned unchanged.""" + data = { + "url": "https://example.com", + "html": "Test content", + "success": True, + "links": {"internal": [{"href": "/page1"}], "external": []}, + } + original_data = data.copy() + + result, meta = apply_output_control(data, None) + + assert result == original_data + assert meta is None + assert "_output_meta" not in result + + def test_empty_control_returns_unchanged(self): + """When control has no options set, data should be unchanged.""" + data = {"html": "test content", "url": "https://example.com"} + control = OutputControl() # All None defaults + + result, meta = apply_output_control(data, control) + + assert result == data + assert meta is None + assert "_output_meta" not in result + + def test_content_limit_truncates_text_fields(self): + """content_limit should truncate text fields to specified length.""" + data = { + "html": "A" * 10000, + "cleaned_html": "B" * 5000, + "extracted_content": "C" * 3000, + } + control = OutputControl(content_limit=1000) + + result, meta = apply_output_control(data, control) + + assert len(result["html"]) == 1000 + assert len(result["cleaned_html"]) == 1000 + assert len(result["extracted_content"]) == 1000 + assert meta is not None + assert meta.truncated is True + assert "_output_meta" in result + + def test_content_offset_skips_characters(self): + """content_offset should skip initial characters.""" + data = {"html": "0123456789ABCDEF"} + control = OutputControl(content_offset=10) + + result, meta = apply_output_control(data, control) + + assert result["html"] == "ABCDEF" + assert meta.truncated is True + assert meta.content_stats["html"].offset == 10 + + def test_content_offset_and_limit_combined(self): + """content_offset + content_limit should work together.""" + data = {"html": "0123456789ABCDEFGHIJ"} + control = OutputControl(content_offset=5, content_limit=5) + + result, meta = apply_output_control(data, control) + + assert result["html"] == "56789" + assert meta.content_stats["html"].total_chars == 20 + assert meta.content_stats["html"].returned_chars == 5 + assert meta.content_stats["html"].offset == 5 + assert meta.content_stats["html"].has_more is True + + def test_nested_markdown_fields_truncated(self): + """Nested markdown fields should be paginated.""" + data = { + "markdown": { + "raw_markdown": "X" * 5000, + "fit_markdown": "Y" * 3000, + "references_markdown": "Z" * 1000, + } + } + control = OutputControl(content_limit=500) + + result, meta = apply_output_control(data, control) + + assert len(result["markdown"]["raw_markdown"]) == 500 + assert 
len(result["markdown"]["fit_markdown"]) == 500 + assert len(result["markdown"]["references_markdown"]) == 500 + assert "markdown.raw_markdown" in meta.content_stats + assert "markdown.fit_markdown" in meta.content_stats + + def test_max_links_limits_collections(self): + """max_links should limit both internal and external links.""" + data = { + "links": { + "internal": [{"href": f"/page{i}"} for i in range(100)], + "external": [{"href": f"https://ext{i}.com"} for i in range(50)], + } + } + control = OutputControl(max_links=10) + + result, meta = apply_output_control(data, control) + + assert len(result["links"]["internal"]) == 10 + assert len(result["links"]["external"]) == 10 + assert meta.collection_stats["links.internal"].total_count == 100 + assert meta.collection_stats["links.internal"].returned_count == 10 + assert meta.collection_stats["links.external"].total_count == 50 + assert meta.collection_stats["links.external"].returned_count == 10 + + def test_max_media_limits_all_media_types(self): + """max_media should limit images, videos, and audios.""" + data = { + "media": { + "images": [{"src": f"img{i}.jpg"} for i in range(30)], + "videos": [{"src": f"vid{i}.mp4"} for i in range(20)], + "audios": [{"src": f"aud{i}.mp3"} for i in range(10)], + } + } + control = OutputControl(max_media=5) + + result, meta = apply_output_control(data, control) + + assert len(result["media"]["images"]) == 5 + assert len(result["media"]["videos"]) == 5 + assert len(result["media"]["audios"]) == 5 + assert meta.collection_stats["media.images"].total_count == 30 + + def test_max_tables_limits_tables(self): + """max_tables should limit tables array.""" + data = {"tables": [f"" for i in range(25)]} + control = OutputControl(max_tables=3) + + result, meta = apply_output_control(data, control) + + assert len(result["tables"]) == 3 + assert meta.collection_stats["tables"].total_count == 25 + assert meta.collection_stats["tables"].returned_count == 3 + + def test_exclude_fields_removes_top_level(self): + """exclude_fields should remove specified top-level fields.""" + data = { + "html": "content", + "cleaned_html": "content", + "links": {"internal": []}, + "metadata": {"title": "Test"}, + } + control = OutputControl(exclude_fields=["html", "cleaned_html", "metadata"]) + + result, meta = apply_output_control(data, control) + + assert "html" not in result + assert "cleaned_html" not in result + assert "metadata" not in result + assert "links" in result # Not excluded + assert meta.excluded_fields == ["html", "cleaned_html", "metadata"] + + def test_exclude_fields_nested_dot_notation(self): + """exclude_fields should support dot notation for nested fields.""" + data = { + "markdown": { + "raw_markdown": "raw content", + "fit_markdown": "fit content", + "references_markdown": "refs", + } + } + control = OutputControl(exclude_fields=["markdown.references_markdown"]) + + result, meta = apply_output_control(data, control) + + assert "raw_markdown" in result["markdown"] + assert "fit_markdown" in result["markdown"] + assert "references_markdown" not in result["markdown"] + assert "markdown.references_markdown" in meta.excluded_fields + + def test_missing_fields_handled_gracefully(self): + """Function should handle missing fields without errors.""" + data = {"url": "https://example.com", "success": True} + control = OutputControl( + content_limit=100, max_links=10, exclude_fields=["nonexistent_field"] + ) + + result, meta = apply_output_control(data, control) + + # Should not raise, should return data unchanged except 
meta + assert result["url"] == "https://example.com" + # Meta should still be None since nothing was actually truncated + # (the fields don't exist, so nothing to truncate) + + def test_output_meta_added_to_result(self): + """_output_meta should be added when truncation occurs.""" + data = {"html": "X" * 1000} + control = OutputControl(content_limit=100) + + result, meta = apply_output_control(data, control) + + assert "_output_meta" in result + assert result["_output_meta"]["truncated"] is True + assert "content_stats" in result["_output_meta"] + + def test_deep_copy_preserves_original(self): + """Original data should not be modified.""" + data = { + "html": "X" * 1000, + "links": {"internal": [{"href": "/a"}, {"href": "/b"}]}, + } + original_html_len = len(data["html"]) + original_links_count = len(data["links"]["internal"]) + + control = OutputControl(content_limit=100, max_links=1) + result, _ = apply_output_control(data, control) + + # Original unchanged + assert len(data["html"]) == original_html_len + assert len(data["links"]["internal"]) == original_links_count + # Result truncated + assert len(result["html"]) == 100 + assert len(result["links"]["internal"]) == 1 + + +class TestApplyOutputControlBatch: + """Tests for apply_output_control_to_batch function.""" + + def test_batch_applies_to_all_results(self): + """Output control should apply to every result in batch.""" + results = [ + {"html": "A" * 1000, "url": "https://a.com"}, + {"html": "B" * 1000, "url": "https://b.com"}, + {"html": "C" * 1000, "url": "https://c.com"}, + ] + control = OutputControl(content_limit=100) + + processed = apply_output_control_to_batch(results, control) + + assert len(processed) == 3 + for r in processed: + assert len(r["html"]) == 100 + assert "_output_meta" in r + + def test_batch_none_control_returns_unchanged(self): + """Batch with None control should return original list.""" + results = [{"html": "test"}, {"html": "test2"}] + + processed = apply_output_control_to_batch(results, None) + + assert processed == results + + +# ============================================================================= +# Integration Tests (require running container) +# ============================================================================= + + +@pytest.fixture +def http_client(): + """Async HTTP client for integration tests.""" + return httpx.AsyncClient(base_url=BASE_URL, timeout=60.0) + + +class TestOutputControlIntegration: + """Integration tests against running crawl4ai server.""" + + @pytest.mark.asyncio + async def test_md_endpoint_with_content_limit(self, http_client): + """Test /md endpoint respects content_limit.""" + payload = {"url": TEST_URL, "f": "fit", "output": {"content_limit": 500}} + + response = await http_client.post("/md", json=payload) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + # Check that markdown was truncated if longer than 500 + if "_output_meta" in data: + assert data["_output_meta"]["truncated"] is True + + @pytest.mark.asyncio + async def test_md_endpoint_without_output_unchanged(self, http_client): + """Test /md endpoint without output param returns full response.""" + payload = {"url": TEST_URL, "f": "fit"} + + response = await http_client.post("/md", json=payload) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert "_output_meta" not in data + + @pytest.mark.asyncio + async def test_crawl_endpoint_with_output_control(self, http_client): + """Test /crawl endpoint with 
output control.""" + payload = { + "urls": [TEST_URL], + "browser_config": {}, + "crawler_config": {}, + "output": { + "content_limit": 1000, + "max_links": 5, + "exclude_fields": ["response_headers", "network_requests"], + }, + } + + response = await http_client.post("/crawl", json=payload) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + + # Check results have output control applied + for result in data["results"]: + if result.get("success"): + assert "response_headers" not in result + assert "network_requests" not in result + + @pytest.mark.asyncio + async def test_execute_js_endpoint_with_output(self, http_client): + """Test /execute_js endpoint with output control.""" + payload = { + "url": TEST_URL, + "scripts": ["document.title"], + "output": {"content_limit": 500, "exclude_fields": ["html"]}, + } + + response = await http_client.post("/execute_js", json=payload) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert "html" not in data + + @pytest.mark.asyncio + async def test_mcp_schema_includes_output_control(self, http_client): + """Test that MCP schema correctly exposes OutputControl parameters.""" + response = await http_client.get("/mcp/schema") + + assert response.status_code == 200 + schema = response.json() + + # Find the 'md' tool + md_tool = next((t for t in schema["tools"] if t["name"] == "md"), None) + assert md_tool is not None, "md tool not found in MCP schema" + + # Check that output parameter is in the schema + input_schema = md_tool["inputSchema"] + + # The schema should have $defs or definitions for OutputControl + assert "$defs" in input_schema or "definitions" in input_schema + defs = input_schema.get("$defs", input_schema.get("definitions", {})) + + assert "OutputControl" in defs, "OutputControl not found in schema definitions" + + # Verify OutputControl has expected properties + output_control_schema = defs["OutputControl"] + props = output_control_schema.get("properties", {}) + assert "content_limit" in props + assert "content_offset" in props + assert "max_links" in props + assert "exclude_fields" in props + + +# ============================================================================= +# Edge Case Tests +# ============================================================================= + + +class TestOutputControlEdgeCases: + """Test edge cases and error conditions.""" + + def test_offset_beyond_content_length(self): + """Offset greater than content length should return empty string.""" + data = {"html": "short"} + control = OutputControl(content_offset=100) + + result, meta = apply_output_control(data, control) + + assert result["html"] == "" + assert meta.content_stats["html"].returned_chars == 0 + assert meta.content_stats["html"].has_more is False + + def test_limit_zero_not_allowed(self): + """content_limit=0 should be rejected by Pydantic validation.""" + with pytest.raises(Exception): # Pydantic ValidationError + OutputControl(content_limit=0) + + def test_negative_offset_not_allowed(self): + """Negative offset should be rejected by Pydantic validation.""" + with pytest.raises(Exception): + OutputControl(content_offset=-1) + + def test_empty_links_dict_handled(self): + """Empty links dict should not cause errors.""" + data = {"links": {}} + control = OutputControl(max_links=10) + + result, meta = apply_output_control(data, control) + + assert result["links"] == {} + # No truncation occurred + assert meta is None or not meta.truncated + + def 
test_non_string_field_ignored(self): + """Non-string fields should not be truncated.""" + data = { + "html": "content", + "status_code": 200, + "success": True, + } + control = OutputControl(content_limit=3) + + result, meta = apply_output_control(data, control) + + assert result["html"] == "con" + assert result["status_code"] == 200 + assert result["success"] is True + + +if __name__ == "__main__": + # Run unit tests directly + pytest.main([__file__, "-v", "-x"])
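For reference, a minimal client-side sketch of how the new "output" object is meant to be used against the /crawl endpoint. It assumes a server reachable at http://localhost:11235 (the default used by the test suite) with no auth token required; the payload shape follows CrawlRequest and OutputControl above, and the values are illustrative.

import httpx

payload = {
    "urls": ["https://httpbin.org/html"],
    "browser_config": {},
    "crawler_config": {},
    "output": {
        "content_limit": 2000,  # trim each text field (html, markdown.*, ...) to 2000 chars
        "max_links": 20,        # keep at most 20 internal and 20 external links
        "exclude_fields": ["screenshot", "pdf", "network_requests"],
    },
}

resp = httpx.post("http://localhost:11235/crawl", json=payload, timeout=60.0)
resp.raise_for_status()
for result in resp.json()["results"]:
    meta = result.get("_output_meta")
    if meta and meta.get("truncated"):
        # content_stats / collection_stats describe exactly what was trimmed
        print(result["url"], meta.get("content_stats"), meta.get("collection_stats"))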