2727
2828# Standard
2929import asyncio
30- from contextlib import asynccontextmanager
30+ from contextlib import asynccontextmanager , suppress
3131from datetime import datetime
3232import json
3333import os as _os # local alias to avoid collisions
114114from mcpgateway .services .import_service import ImportError as ImportServiceError
115115from mcpgateway .services .import_service import ImportService , ImportValidationError
116116from mcpgateway .services .logging_service import LoggingService
117+ from mcpgateway .services .log_aggregator import get_log_aggregator
117118from mcpgateway .services .metrics import setup_metrics
118119from mcpgateway .services .prompt_service import PromptError , PromptNameConflictError , PromptNotFoundError , PromptService
119120from mcpgateway .services .resource_service import ResourceError , ResourceNotFoundError , ResourceService , ResourceURIConflictError
@@ -392,6 +393,10 @@ async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
392393 Exception: Any unhandled error that occurs during service
393394 initialisation or shutdown is re-raised to the caller.
394395 """
396+ aggregation_stop_event : Optional [asyncio .Event ] = None
397+ aggregation_loop_task : Optional [asyncio .Task ] = None
398+ aggregation_backfill_task : Optional [asyncio .Task ] = None
399+
395400 # Initialize logging service FIRST to ensure all logging goes to dual output
396401 await logging_service .initialize ()
397402 logger .info ("Starting MCP Gateway services" )
@@ -447,6 +452,46 @@ async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
447452 # Reconfigure uvicorn loggers after startup to capture access logs in dual output
448453 logging_service .configure_uvicorn_after_startup ()
449454
455+ if settings .metrics_aggregation_enabled :
456+ aggregation_stop_event = asyncio .Event ()
457+ log_aggregator = get_log_aggregator ()
458+
459+ async def run_log_backfill () -> None :
460+ hours = getattr (settings , "metrics_aggregation_backfill_hours" , 0 )
461+ if hours <= 0 :
462+ return
463+ try :
464+ await asyncio .to_thread (log_aggregator .backfill , hours )
465+ logger .info ("Log aggregation backfill completed for last %s hour(s)" , hours )
466+ except Exception as backfill_error : # pragma: no cover - defensive logging
467+ logger .warning ("Log aggregation backfill failed: %s" , backfill_error )
468+
469+ async def run_log_aggregation_loop () -> None :
470+ interval_seconds = max (1 , int (settings .metrics_aggregation_window_minutes )) * 60
471+ logger .info (
472+ "Starting log aggregation loop (window=%s min)" ,
473+ log_aggregator .aggregation_window_minutes ,
474+ )
475+ try :
476+ while not aggregation_stop_event .is_set ():
477+ try :
478+ await asyncio .to_thread (log_aggregator .aggregate_all_components )
479+ except Exception as agg_error : # pragma: no cover - defensive logging
480+ logger .warning ("Log aggregation loop iteration failed: %s" , agg_error )
481+
482+ try :
483+ await asyncio .wait_for (aggregation_stop_event .wait (), timeout = interval_seconds )
484+ except asyncio .TimeoutError :
485+ continue
486+ except asyncio .CancelledError :
487+ logger .debug ("Log aggregation loop cancelled" )
488+ raise
489+ finally :
490+ logger .info ("Log aggregation loop stopped" )
491+
492+ aggregation_backfill_task = asyncio .create_task (run_log_backfill ())
493+ aggregation_loop_task = asyncio .create_task (run_log_aggregation_loop ())
494+
450495 yield
451496 except Exception as e :
452497 logger .error (f"Error during startup: { str (e )} " )
@@ -460,6 +505,14 @@ async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
460505 raise SystemExit (1 )
461506 raise
462507 finally :
508+ if aggregation_stop_event is not None :
509+ aggregation_stop_event .set ()
510+ for task in (aggregation_backfill_task , aggregation_loop_task ):
511+ if task :
512+ task .cancel ()
513+ with suppress (asyncio .CancelledError ):
514+ await task
515+
463516 # Shutdown plugin manager
464517 if plugin_manager :
465518 try :
0 commit comments