Skip to content

Commit d80f9ae

Browse files
committed
feat(proxy): add pass-through deployment filtering methods
Add dedicated methods to filter and select deployments for pass-through endpoints:
- Implement get_available_deployment_for_pass_through() so that only deployments with use_in_pass_through=True are considered.
- Implement async_get_available_deployment_for_pass_through() for async operations.
- Add the _filter_pass_through_deployments() helper method to filter by the use_in_pass_through flag.
- Update the Vertex pass-through route to use the new dedicated method.
This ensures pass-through endpoints respect the use_in_pass_through configuration and apply the load-balancing strategy only to configured deployments. Add comprehensive tests to verify filtering and load-balancing behavior.
1 parent 8a4f674 commit d80f9ae

File tree

3 files changed

+508
-16
lines changed

3 files changed

+508
-16
lines changed

litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1585,12 +1585,12 @@ async def _base_vertex_proxy_route(
15851585

15861586
if llm_router:
15871587
try:
1588-
deployment = llm_router.get_available_deployment(model=model_id)
1588+
# Use the dedicated pass-through deployment selection method to automatically filter use_in_pass_through=True
1589+
deployment = llm_router.get_available_deployment_for_pass_through(model=model_id)
15891590
if deployment:
15901591
litellm_params = deployment.get("litellm_params", {})
1591-
if litellm_params.get("use_in_pass_through"):
1592-
vertex_project = litellm_params.get("vertex_project")
1593-
vertex_location = litellm_params.get("vertex_location")
1592+
vertex_project = litellm_params.get("vertex_project")
1593+
vertex_location = litellm_params.get("vertex_location")
15941594
except Exception as e:
15951595
verbose_proxy_logger.debug(
15961596
f"Error getting available deployment for model {model_id}: {e}"

litellm/router.py

Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7683,6 +7683,154 @@ async def async_get_available_deployment(
76837683
)
76847684
raise e
76857685

7686+
async def async_get_available_deployment_for_pass_through(
    self,
    model: str,
    request_kwargs: Dict,
    messages: Optional[List[Dict[str, str]]] = None,
    input: Optional[Union[str, List]] = None,
    specific_deployment: Optional[bool] = False,
):
    """
    Async version of get_available_deployment_for_pass_through.

    Only returns deployments configured with use_in_pass_through=True.

    Args:
        model: Model name (may be replaced by the pre-routing hook response)
        request_kwargs: Request parameters; also read for the parent otel span
            and for "litellm_logging_obj" when reporting a failure
        messages: Optional list of messages
        input: Optional input data
        specific_deployment: Whether to find a specific deployment

    Returns:
        The selected deployment.

    Raises:
        BadRequestError: If the specific deployment, or every healthy deployment,
            lacks use_in_pass_through=True
        Exception: Whatever async_raise_no_deployment_exception builds when the
            routing strategy selects no deployment
    """
    try:
        parent_otel_span = _get_parent_otel_span_from_kwargs(request_kwargs)

        # "usage-based-routing" has no async selector in this method; without
        # this delegation it would fall through to `deployment = None` below
        # and always raise. The sync pass-through selector handles it.
        if self.routing_strategy == "usage-based-routing":
            return self.get_available_deployment_for_pass_through(
                model=model,
                messages=messages,
                input=input,
                specific_deployment=specific_deployment,
                request_kwargs=request_kwargs,
            )

        # 1. Execute pre-routing hook
        pre_routing_hook_response = await self.async_pre_routing_hook(
            model=model,
            request_kwargs=request_kwargs,
            messages=messages,
            input=input,
            specific_deployment=specific_deployment,
        )
        if pre_routing_hook_response is not None:
            model = pre_routing_hook_response.model
            messages = pre_routing_hook_response.messages

        # 2. Get healthy deployments
        healthy_deployments = await self.async_get_healthy_deployments(
            model=model,
            request_kwargs=request_kwargs,
            messages=messages,
            input=input,
            specific_deployment=specific_deployment,
            parent_otel_span=parent_otel_span,
        )

        # 3. If specific deployment returned, verify if it supports pass-through
        if isinstance(healthy_deployments, dict):
            # `or {}` tolerates an explicit `litellm_params: None`
            litellm_params = healthy_deployments.get("litellm_params") or {}
            if litellm_params.get("use_in_pass_through"):
                return healthy_deployments
            raise litellm.BadRequestError(
                message=f"Deployment {healthy_deployments.get('model_info', {}).get('id')} does not support pass-through endpoint (use_in_pass_through=False)",
                model=model,
                llm_provider="",
            )

        # 4. Filter deployments that support pass-through
        pass_through_deployments = self._filter_pass_through_deployments(
            healthy_deployments=healthy_deployments
        )

        if not pass_through_deployments:
            raise litellm.BadRequestError(
                message=f"Model {model} has no deployments configured with use_in_pass_through=True. Please add use_in_pass_through: true to the deployment configuration",
                model=model,
                llm_provider="",
            )

        # 5. Apply load balancing strategy
        start_time = time.perf_counter()
        if (
            self.routing_strategy == "usage-based-routing-v2"
            and self.lowesttpm_logger_v2 is not None
        ):
            deployment = (
                await self.lowesttpm_logger_v2.async_get_available_deployments(
                    model_group=model,
                    healthy_deployments=pass_through_deployments,  # type: ignore
                    messages=messages,
                    input=input,
                )
            )
        elif (
            self.routing_strategy == "latency-based-routing"
            and self.lowestlatency_logger is not None
        ):
            deployment = (
                await self.lowestlatency_logger.async_get_available_deployments(
                    model_group=model,
                    healthy_deployments=pass_through_deployments,  # type: ignore
                    messages=messages,
                    input=input,
                    request_kwargs=request_kwargs,
                )
            )
        elif self.routing_strategy == "simple-shuffle":
            # Returns directly: no selection log or service hook on this path
            return simple_shuffle(
                llm_router_instance=self,
                healthy_deployments=pass_through_deployments,
                model=model,
            )
        elif (
            self.routing_strategy == "least-busy"
            and self.leastbusy_logger is not None
        ):
            deployment = (
                await self.leastbusy_logger.async_get_available_deployments(
                    model_group=model,
                    healthy_deployments=pass_through_deployments,  # type: ignore
                )
            )
        else:
            deployment = None

        if deployment is None:
            exception = await async_raise_no_deployment_exception(
                litellm_router_instance=self,
                model=model,
                parent_otel_span=parent_otel_span,
            )
            raise exception

        verbose_router_logger.info(
            f"async_get_available_deployment_for_pass_through model: {model}, selected deployment: {self.print_deployment(deployment)}"
        )

        end_time = time.perf_counter()
        _duration = end_time - start_time
        # Report the selection duration without awaiting the service logger
        asyncio.create_task(
            self.service_logger_obj.async_service_success_hook(
                service=ServiceTypes.ROUTER,
                duration=_duration,
                call_type="<routing_strategy>.async_get_available_deployments",
                parent_otel_span=parent_otel_span,
                start_time=start_time,
                end_time=end_time,
            )
        )

        return deployment
    except Exception as e:
        traceback_exception = traceback.format_exc()
        if request_kwargs is not None:
            logging_obj = request_kwargs.get("litellm_logging_obj", None)
            if logging_obj is not None:
                # Sync failure handler runs on its own thread; the async one
                # is scheduled as a task. Neither is awaited here.
                threading.Thread(
                    target=logging_obj.failure_handler,
                    args=(e, traceback_exception),
                ).start()
                asyncio.create_task(
                    logging_obj.async_failure_handler(e, traceback_exception)  # type: ignore
                )
        # Bare raise re-raises the active exception with its traceback intact
        raise
7833+
76867834
async def async_pre_routing_hook(
76877835
self,
76887836
model: str,
@@ -7835,6 +7983,169 @@ def get_available_deployment(
78357983
)
78367984
return deployment
78377985

7986+
def get_available_deployment_for_pass_through(
    self,
    model: str,
    messages: Optional[List[Dict[str, str]]] = None,
    input: Optional[Union[str, List]] = None,
    specific_deployment: Optional[bool] = False,
    request_kwargs: Optional[Dict] = None,
):
    """
    Select a deployment for a pass-through endpoint using the routing strategy.

    Works like get_available_deployment, except that only deployments with
    use_in_pass_through=True are eligible.

    Args:
        model: Model name
        messages: Optional list of messages
        input: Optional input data
        specific_deployment: Whether to find a specific deployment
        request_kwargs: Optional request parameters

    Returns:
        Dict: Selected deployment configuration

    Raises:
        BadRequestError: If no deployment is configured with use_in_pass_through=True
        RouterRateLimitError: If no pass-through deployments are available
    """

    def _rate_limit_error(span):
        # Build the "nothing available" error from the current cooldown state.
        ids_for_model = self.get_model_ids(model_name=model)
        min_cooldown = self.cooldown_cache.get_min_cooldown(
            model_ids=ids_for_model, parent_otel_span=span
        )
        cooling_down = _get_cooldown_deployments(
            litellm_router_instance=self, parent_otel_span=span
        )
        return RouterRateLimitError(
            model=model,
            cooldown_time=min_cooldown,
            enable_pre_call_checks=self.enable_pre_call_checks,
            cooldown_list=cooling_down,
        )

    # Shared checks: resolves the model name and the healthy deployments
    model, healthy = self._common_checks_available_deployment(
        model=model,
        messages=messages,
        input=input,
        specific_deployment=specific_deployment,
    )

    # A dict means one specific deployment: accept or reject it on its flag
    if isinstance(healthy, dict):
        if healthy.get("litellm_params", {}).get("use_in_pass_through"):
            return healthy
        raise litellm.BadRequestError(
            message=f"Deployment {healthy.get('model_info', {}).get('id')} does not support pass-through endpoint (use_in_pass_through=False)",
            model=model,
            llm_provider="",
        )

    # Keep only the deployments flagged for pass-through
    candidates = self._filter_pass_through_deployments(healthy_deployments=healthy)
    if not candidates:
        raise litellm.BadRequestError(
            message=f"Model {model} has no deployment configured with use_in_pass_through=True. Please add use_in_pass_through: true in the deployment configuration",
            model=model,
            llm_provider="",
        )

    # Drop deployments that are currently cooling down
    parent_otel_span: Optional[Span] = _get_parent_otel_span_from_kwargs(
        request_kwargs
    )
    candidates = self._filter_cooldown_deployments(
        healthy_deployments=candidates,
        cooldown_deployments=_get_cooldown_deployments(
            litellm_router_instance=self, parent_otel_span=parent_otel_span
        ),
    )

    # Optional pre-call checks only run when messages are supplied
    if self.enable_pre_call_checks and messages is not None:
        candidates = self._pre_call_checks(
            model=model,
            healthy_deployments=candidates,
            messages=messages,
            request_kwargs=request_kwargs,
        )

    if not candidates:
        raise _rate_limit_error(parent_otel_span)

    # Dispatch on the configured routing strategy
    strategy = self.routing_strategy
    if strategy == "simple-shuffle":
        # Returned directly, without the selection log below
        return simple_shuffle(
            llm_router_instance=self,
            healthy_deployments=candidates,
            model=model,
        )

    chosen = None
    if strategy == "least-busy" and self.leastbusy_logger is not None:
        chosen = self.leastbusy_logger.get_available_deployments(
            model_group=model, healthy_deployments=candidates  # type: ignore
        )
    elif strategy == "latency-based-routing" and self.lowestlatency_logger is not None:
        chosen = self.lowestlatency_logger.get_available_deployments(
            model_group=model,
            healthy_deployments=candidates,  # type: ignore
            request_kwargs=request_kwargs,
        )
    elif strategy == "usage-based-routing" and self.lowesttpm_logger is not None:
        chosen = self.lowesttpm_logger.get_available_deployments(
            model_group=model,
            healthy_deployments=candidates,  # type: ignore
            messages=messages,
            input=input,
        )
    elif strategy == "usage-based-routing-v2" and self.lowesttpm_logger_v2 is not None:
        chosen = self.lowesttpm_logger_v2.get_available_deployments(
            model_group=model,
            healthy_deployments=candidates,  # type: ignore
            messages=messages,
            input=input,
        )

    if chosen is None:
        verbose_router_logger.info(
            f"get_available_deployment_for_pass_through model: {model}, no available deployments"
        )
        raise _rate_limit_error(parent_otel_span)

    verbose_router_logger.info(
        f"get_available_deployment_for_pass_through model: {model}, selected deployment: {self.print_deployment(chosen)}"
    )
    return chosen
8148+
78388149
def _filter_cooldown_deployments(
78398150
self, healthy_deployments: List[Dict], cooldown_deployments: List[str]
78408151
) -> List[Dict]:
@@ -7857,6 +8168,34 @@ def _filter_cooldown_deployments(
78578168
if deployment["model_info"]["id"] not in cooldown_set
78588169
]
78598170

8171+
def _filter_pass_through_deployments(
    self, healthy_deployments: List[Dict]
) -> List[Dict]:
    """
    Keep only the deployments configured with use_in_pass_through=True.

    Args:
        healthy_deployments: List of healthy deployments

    Returns:
        List[Dict]: The deployments whose litellm_params has a truthy
            use_in_pass_through value; a missing or None litellm_params
            counts as not enabled
    """
    verbose_router_logger.debug(
        f"Filter pass-through deployments from {len(healthy_deployments)} healthy deployments"
    )

    pass_through_deployments = [
        deployment
        for deployment in healthy_deployments
        # `or {}` covers an explicit `litellm_params: None`, which the
        # `.get(..., {})` default alone would not
        if (deployment.get("litellm_params") or {}).get("use_in_pass_through", False)
    ]

    verbose_router_logger.debug(
        f"Found {len(pass_through_deployments)} deployments with pass-through enabled"
    )

    return pass_through_deployments
8198+
78608199
def _track_deployment_metrics(
78618200
self, deployment, parent_otel_span: Optional[Span], response=None
78628201
):

0 commit comments

Comments
 (0)