Skip to content

Commit 64d43cc

Browse files
committed
configurable max concurrent health check batch size
Signed-off-by: Keval Mahajan <mahajankeval23@gmail.com>
1 parent 8eb28cd commit 64d43cc

File tree

4 files changed

+38
-11
lines changed

4 files changed

+38
-11
lines changed

.env.example

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,26 +733,41 @@ PROMPT_CACHE_SIZE=100
733733
MAX_PROMPT_SIZE=102400
734734
PROMPT_RENDER_TIMEOUT=10
735735

736+
#####################################
737+
# MCP Server Health Check Configuration
738+
#####################################
739+
736740
# Health Check Configuration
737741
HEALTH_CHECK_INTERVAL=60
738742
# Health check timeout in seconds (default: 10, matches config.py)
739743
HEALTH_CHECK_TIMEOUT=10
740744
UNHEALTHY_THRESHOLD=5
741745
# Gateway URL validation timeout in seconds (default: 5, matches config.py)
742746
GATEWAY_VALIDATION_TIMEOUT=5
747+
# Maximum number of concurrent health checks per worker. Prevents resource exhaustion during health check operations (default: 20, matches config.py)
748+
MAX_CONCURRENT_HEALTH_CHECKS=20
743749

744750
# File lock name for gateway service leader election
745751
# Used to coordinate multiple gateway instances when running in cluster mode
746752
# Default: "gateway_service_leader.lock"
747753
FILELOCK_NAME=gateway_service_leader.lock
748754

755+
756+
#####################################
757+
# Default Root Paths
758+
#####################################
759+
749760
# Default root paths (JSON array)
750761
# List of default root paths for resource resolution
751762
# Example: ["/api/v1", "/mcp"]
752763
# Default: []
753764
DEFAULT_ROOTS=[]
754765

766+
767+
#####################################
755768
# OpenTelemetry Observability Configuration
769+
#####################################
770+
756771
# Enable distributed tracing and metrics collection
757772
# Options: true (default), false
758773
OTEL_ENABLE_OBSERVABILITY=true

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1996,6 +1996,7 @@ ENABLE_METRICS=false
19961996
| `UNHEALTHY_THRESHOLD` | Fail-count before peer deactivation, | `5` | int > 0 |
19971997
| | Set to -1 if deactivation is not needed. | | |
19981998
| `GATEWAY_VALIDATION_TIMEOUT` | Gateway URL validation timeout (secs) | `5` | int > 0 |
1999+
| `MAX_CONCURRENT_HEALTH_CHECKS` | Max concurrent health checks per worker | `20` | int > 0 |
19992000
| `FILELOCK_NAME` | File lock for leader election | `gateway_service_leader.lock` | string |
20002001
| `DEFAULT_ROOTS` | Default root paths for resources | `[]` | JSON array |
20012002

mcpgateway/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,8 @@ def parse_issuers(cls, v: Any) -> list[str]:
925925
health_check_interval: int = 60 # seconds
926926
health_check_timeout: int = 10 # seconds
927927
unhealthy_threshold: int = 5 # after this many failures, mark as Offline
928-
928+
max_concurrent_health_checks: int = 20 # maximum concurrent health checks per worker
929+
929930
# Validation Gateway URL
930931
gateway_validation_timeout: int = 5 # seconds
931932
gateway_max_redirects: int = 5

mcpgateway/services/gateway_service.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2141,22 +2141,32 @@ async def check_health_of_gateways(self, db: Session, gateways: List[DbGateway],
21412141
True
21422142
"""
21432143
start_time = time.monotonic()
2144+
concurrency_limit = min(settings.max_concurrent_health_checks, max(10, os.cpu_count() * 5)) # adaptive concurrency
2145+
semaphore = asyncio.Semaphore(concurrency_limit)
2146+
2147+
async def limited_check(gateway: DbGateway):
2148+
async with semaphore:
2149+
await self._check_single_gateway_health(db, gateway, user_email)
21442150

21452151
# Create trace span for health check batch
21462152
with create_span("gateway.health_check_batch", {"gateway.count": len(gateways), "check.type": "health"}) as batch_span:
2147-
# Create tasks for concurrent health checks
2148-
tasks = []
2149-
for gateway in gateways:
2150-
if gateway.auth_type == "one_time_auth":
2151-
continue # Skip health check for one_time auth gateways
2152-
tasks.append(self._check_single_gateway_health(db, gateway, user_email))
2153-
2154-
# Execute all health checks concurrently
2155-
if tasks:
2153+
# Chunk processing to avoid overload
2154+
if not gateways:
2155+
return True
2156+
chunk_size = concurrency_limit
2157+
for i in range(0, len(gateways), chunk_size):
2158+
# batch will be a sublist of gateways from index i to i + chunk_size
2159+
batch = gateways[i:i + chunk_size]
2160+
2161+
# Each task is a health check for a gateway in the batch, excluding those with auth_type == "one_time_auth"
2162+
tasks = [limited_check(gw) for gw in batch if gw.auth_type != "one_time_auth"]
2163+
2164+
# Execute all health checks concurrently
21562165
await asyncio.gather(*tasks, return_exceptions=True)
2166+
await asyncio.sleep(0.05) # small pause prevents network saturation
21572167

21582168
elapsed = time.monotonic() - start_time
2159-
2169+
21602170
if batch_span:
21612171
batch_span.set_attribute("check.duration_ms", int(elapsed * 1000))
21622172
batch_span.set_attribute("check.completed", True)

0 commit comments

Comments
 (0)