From 9a918bafcb9b51af4d7497c715bb71ed503672a2 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Tue, 18 Nov 2025 13:58:55 -0500 Subject: [PATCH 01/11] initial commit --- .../use_cases/model_endpoint_use_cases.py | 18 +++++ .../live_model_endpoint_infra_gateway.py | 21 ++++++ .../k8s_endpoint_resource_delegate.py | 70 +++++++++++++++++++ .../services/live_model_endpoint_service.py | 63 +++++++++++++++++ 4 files changed, 172 insertions(+) diff --git a/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py index ea8466430..eeb7910cb 100644 --- a/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py @@ -603,6 +603,24 @@ async def execute(self, user: User, model_endpoint_id: str) -> DeleteModelEndpoi model_endpoint_id=model_endpoint_id ) if model_endpoint is None: + # Check for orphaned K8s resources + from model_engine_server.infra.services.live_model_endpoint_service import ( + LiveModelEndpointService, + ) + + if isinstance(self.model_endpoint_service, LiveModelEndpointService): + owner = await self.model_endpoint_service._cleanup_orphaned_k8s_resources( + model_endpoint_id + ) + if owner is not None: + # Verify authorization - user must match owner (created_by from K8s labels) + # Note: For team-based auth, we'd need to look up team_id from user_id, + # but for orphan cleanup, user_id match is sufficient + if user.user_id != owner and not user.is_privileged_user: + raise ObjectNotAuthorizedException + # Resources were cleaned up successfully + return DeleteModelEndpointV1Response(deleted=True) + # No orphaned resources found raise ObjectNotFoundException if not self.authz_module.check_access_write_owned_entity(user, model_endpoint): raise ObjectNotAuthorizedException diff --git a/model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py b/model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py index bca30e10a..4637fd57e 100644 --- a/model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py @@ -258,6 +258,27 @@ async def delete_model_endpoint_infra(self, model_endpoint_record: ModelEndpoint endpoint_type=endpoint_type, ) + async def delete_model_endpoint_infra_by_id( + self, endpoint_id: str, deployment_name: str, endpoint_type: ModelEndpointType + ) -> bool: + """ + Deletes model endpoint infrastructure when DB record doesn't exist (orphaned resources). + This method accepts minimal parameters extracted from K8s resources. + + Args: + endpoint_id: The endpoint ID + deployment_name: The deployment name (from K8s resource) + endpoint_type: The endpoint type (SYNC, STREAMING, or ASYNC) + + Returns: + True if resources were successfully deleted, False otherwise + """ + return await self.resource_gateway.delete_resources( + endpoint_id=endpoint_id, + deployment_name=deployment_name, + endpoint_type=endpoint_type, + ) + async def restart_model_endpoint_infra( self, model_endpoint_record: ModelEndpointRecord ) -> None: diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 45ab0d73e..475ce8ab6 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1163,6 +1163,76 @@ async def _get_deployment(endpoint_id, deployment_name): raise return deployment_config + @staticmethod + async def _get_deployment_by_endpoint_id_label(endpoint_id: str) -> Optional[V1Deployment]: + """ + Gets a Deployment by querying K8s with endpoint_id label selector. + Used when DB record doesn't exist but K8s resources might (orphaned resources). + + Args: + endpoint_id: The endpoint_id to search for + + Returns: + The first deployment found with matching endpoint_id label, or None if not found + """ + apps_client = get_kubernetes_apps_client() + label_selector = f"endpoint_id={endpoint_id}" + try: + deployments = await apps_client.list_namespaced_deployment( + namespace=hmi_config.endpoint_namespace, + label_selector=label_selector, + ) + if deployments.items: + return deployments.items[0] + return None + except ApiException as e: + if e.status == 404: + return None + logger.exception(f"Error querying deployments by endpoint_id label {endpoint_id}") + raise + + @staticmethod + async def _determine_endpoint_type_from_k8s(endpoint_id: str) -> ModelEndpointType: + """ + Determines endpoint type by checking for HPA/KEDA (SYNC/STREAMING) vs ASYNC. + Defaults to STREAMING if unable to determine (common for MCPx endpoints). + + Args: + endpoint_id: The endpoint_id to check + + Returns: + The determined ModelEndpointType + """ + k8s_resource_group_name = _endpoint_id_to_k8s_resource_group_name(endpoint_id) + autoscaling_client = get_kubernetes_autoscaling_client() + custom_objects_client = get_kubernetes_custom_objects_client() + + # Check for HPA (indicates SYNC/STREAMING) + try: + await autoscaling_client.read_namespaced_horizontal_pod_autoscaler( + k8s_resource_group_name, hmi_config.endpoint_namespace + ) + return ModelEndpointType.STREAMING # Default to STREAMING for MCPx + except ApiException: + pass + + # Check for KEDA ScaledObject (indicates SYNC/STREAMING) + try: + await custom_objects_client.get_namespaced_custom_object( + group="keda.sh", + version="v1alpha1", + namespace=hmi_config.endpoint_namespace, + plural="scaledobjects", + name=k8s_resource_group_name, + ) + return ModelEndpointType.STREAMING # Default to STREAMING for MCPx + except ApiException: + pass + + # If no HPA/KEDA found, likely ASYNC + # But MCPx uses STREAMING, so default to that + return ModelEndpointType.STREAMING + @staticmethod async def _get_all_config_maps() -> ( List[kubernetes_asyncio.client.models.v1_config_map.V1ConfigMap] diff --git a/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py b/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py index 6c28f4990..c04e9863b 100644 --- a/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py +++ b/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py @@ -33,6 +33,9 @@ from model_engine_server.domain.services import ModelEndpointService from model_engine_server.domain.use_cases.model_endpoint_use_cases import MODEL_BUNDLE_CHANGED_KEY from model_engine_server.infra.gateways import ModelEndpointInfraGateway +from model_engine_server.infra.gateways.resources.k8s_endpoint_resource_delegate import ( + K8SEndpointResourceDelegate, +) from model_engine_server.infra.repositories import ModelEndpointCacheRepository from model_engine_server.infra.repositories.model_endpoint_record_repository import ( ModelEndpointRecordRepository, @@ -407,6 +410,66 @@ async def delete_model_endpoint(self, model_endpoint_id: str) -> None: logger.info(f"Endpoint delete released lock for {created_by}, {name}") + async def _cleanup_orphaned_k8s_resources(self, endpoint_id: str) -> Optional[str]: + """ + Cleans up orphaned K8s resources when DB record doesn't exist. + Returns the owner (created_by) from K8s labels if resources were found, None otherwise. + + Args: + endpoint_id: The endpoint_id to check for orphaned resources + + Returns: + The owner (created_by) from K8s labels if resources found, None otherwise + """ + try: + deployment = await K8SEndpointResourceDelegate._get_deployment_by_endpoint_id_label( + endpoint_id + ) + if deployment is None: + return None + + # Extract owner and deployment name from K8s labels + labels = deployment.metadata.labels or {} + owner = labels.get("created_by") or labels.get("user_id") + deployment_name = deployment.metadata.name + + if not owner: + logger.warning( + f"Found orphaned K8s resources for endpoint_id {endpoint_id} but no owner label" + ) + return None + + # Determine endpoint type + endpoint_type = await K8SEndpointResourceDelegate._determine_endpoint_type_from_k8s( + endpoint_id + ) + + # Clean up resources + logger.info( + f"Cleaning up orphaned K8s resources for endpoint_id {endpoint_id}, " + f"deployment_name {deployment_name}, owner {owner}" + ) + deleted = await self.model_endpoint_infra_gateway.delete_model_endpoint_infra_by_id( + endpoint_id=endpoint_id, + deployment_name=deployment_name, + endpoint_type=endpoint_type, + ) + if deleted: + logger.info( + f"Successfully cleaned up orphaned K8s resources for endpoint_id {endpoint_id}" + ) + else: + logger.warning( + f"Failed to clean up some orphaned K8s resources for endpoint_id {endpoint_id}" + ) + + return owner + except Exception as e: + logger.exception( + f"Error cleaning up orphaned K8s resources for endpoint_id {endpoint_id}: {e}" + ) + return None + async def restart_model_endpoint(self, model_endpoint_id: str) -> None: record = await self.model_endpoint_record_repository.get_model_endpoint_record( model_endpoint_id=model_endpoint_id From 841b6c191ac344cd574bcd73db11a464105fb162 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Thu, 18 Dec 2025 18:00:36 -0500 Subject: [PATCH 02/11] fix: Add timeout configuration for MCP servers - Add 5-minute Istio VirtualService timeout for MCP servers - Add 10-minute aiohttp ClientSession timeout for MCP passthrough forwarders - Fixes 30-second timeout errors causing MCP request failures MCP servers often have long-running operations that exceed the default 30-second Istio timeout. This change: 1. Detects MCP servers by checking if forwarder_type is 'passthrough' and routes contain '/mcp' 2. Sets Istio VirtualService timeout to 5 minutes for MCP servers 3. Sets aiohttp ClientSession timeout to 10 minutes for MCP passthrough forwarders 4. Keeps default timeouts for non-MCP endpoints --- .../service_template_config_map.yaml | 1 + .../inference/forwarding/forwarding.py | 23 +++++++++++++++++-- .../gateways/resources/k8s_resource_types.py | 22 ++++++++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml index 311553813..4a29bb371 100644 --- a/charts/model-engine/templates/service_template_config_map.yaml +++ b/charts/model-engine/templates/service_template_config_map.yaml @@ -938,6 +938,7 @@ data: host: "${RESOURCE_NAME}.${NAMESPACE}.svc.cluster.local" port: number: 80 + ${MCP_TIMEOUT} {{- end }} {{- if .Values.destinationrule.enabled }} destination-rule.yaml: |- diff --git a/model-engine/model_engine_server/inference/forwarding/forwarding.py b/model-engine/model_engine_server/inference/forwarding/forwarding.py index 5183955b9..b1753dc50 100644 --- a/model-engine/model_engine_server/inference/forwarding/forwarding.py +++ b/model-engine/model_engine_server/inference/forwarding/forwarding.py @@ -630,6 +630,23 @@ def endpoint(route: str) -> str: @dataclass class PassthroughForwarder(ModelEngineSerializationMixin): passthrough_endpoint: str + + # Default timeout: 5 minutes (matching aiohttp default) + # MCP servers need longer timeout: 10 minutes to handle long-running operations + DEFAULT_TIMEOUT_SECONDS = 5 * 60 + MCP_TIMEOUT_SECONDS = 10 * 60 + + def _is_mcp_server(self) -> bool: + """Detect if this is an MCP server by checking if endpoint contains /mcp""" + return "/mcp" in self.passthrough_endpoint.lower() + + def _get_timeout(self) -> aiohttp.ClientTimeout: + """Get appropriate timeout based on server type""" + timeout_seconds = ( + self.MCP_TIMEOUT_SECONDS if self._is_mcp_server() + else self.DEFAULT_TIMEOUT_SECONDS + ) + return aiohttp.ClientTimeout(total=timeout_seconds) async def _make_request( self, request: Any, aioclient: aiohttp.ClientSession @@ -656,7 +673,8 @@ async def _make_request( ) async def forward_stream(self, request: Any): - async with aiohttp.ClientSession() as aioclient: + timeout = self._get_timeout() + async with aiohttp.ClientSession(timeout=timeout) as aioclient: response = await self._make_request(request, aioclient) response_headers = response.headers yield (response_headers, response.status) @@ -670,7 +688,8 @@ async def forward_stream(self, request: Any): yield await response.read() async def forward_sync(self, request: Any): - async with aiohttp.ClientSession() as aioclient: + timeout = self._get_timeout() + async with aiohttp.ClientSession(timeout=timeout) as aioclient: response = await self._make_request(request, aioclient) return response diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 6f0d3133c..4bc23d3df 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -382,6 +382,7 @@ class VirtualServiceArguments(_BaseEndpointArguments): """Keyword-arguments for substituting into virtual-service templates.""" DNS_HOST_DOMAIN: str + MCP_TIMEOUT: str # Timeout for MCP servers (e.g., "300s" for 5 minutes, "" for default) class LwsServiceEntryArguments(_BaseEndpointArguments): @@ -1361,6 +1362,26 @@ def get_endpoint_resource_arguments_from_request( SERVICE_NAME_OVERRIDE=service_name_override, ) elif endpoint_resource_name == "virtual-service": + # Detect MCP servers by checking if they use passthrough forwarder and have /mcp routes + # MCP servers need longer timeout (5 minutes) to handle long-running operations + is_mcp_server = False + if isinstance(flavor, RunnableImageLike): + # Check if forwarder type is passthrough (MCP servers use passthrough) + if flavor.forwarder_type == "passthrough": + # Check if any routes contain /mcp + all_routes = [] + if flavor.predict_route: + all_routes.append(flavor.predict_route) + if flavor.routes: + all_routes.extend(flavor.routes) + if flavor.extra_routes: + all_routes.extend(flavor.extra_routes) + # MCP servers have routes containing /mcp + is_mcp_server = any("/mcp" in route.lower() for route in all_routes) + + # Format timeout as YAML: "timeout: 300s" for MCP servers, empty string for others + mcp_timeout = "timeout: 300s" if is_mcp_server else "" + return VirtualServiceArguments( # Base resource arguments RESOURCE_NAME=k8s_resource_group_name, @@ -1373,6 +1394,7 @@ def get_endpoint_resource_arguments_from_request( OWNER=owner, GIT_TAG=GIT_TAG, DNS_HOST_DOMAIN=infra_config().dns_host_domain, + MCP_TIMEOUT=mcp_timeout, ) elif endpoint_resource_name == "destination-rule": return DestinationRuleArguments( From 35b20514e65b32214388a476f1f5fab769336f57 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Fri, 19 Dec 2025 12:16:15 -0500 Subject: [PATCH 03/11] Remove orphaned K8s resource cleanup code Revert orphaned resource cleanup logic that was added for handling cases where DB records don't exist but K8s resources do. This simplifies the delete endpoint logic to just raise ObjectNotFoundException when the endpoint doesn't exist in the DB. --- .../use_cases/model_endpoint_use_cases.py | 18 ------ .../k8s_endpoint_resource_delegate.py | 28 --------- .../services/live_model_endpoint_service.py | 60 ------------------- 3 files changed, 106 deletions(-) diff --git a/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py index eeb7910cb..ea8466430 100644 --- a/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py @@ -603,24 +603,6 @@ async def execute(self, user: User, model_endpoint_id: str) -> DeleteModelEndpoi model_endpoint_id=model_endpoint_id ) if model_endpoint is None: - # Check for orphaned K8s resources - from model_engine_server.infra.services.live_model_endpoint_service import ( - LiveModelEndpointService, - ) - - if isinstance(self.model_endpoint_service, LiveModelEndpointService): - owner = await self.model_endpoint_service._cleanup_orphaned_k8s_resources( - model_endpoint_id - ) - if owner is not None: - # Verify authorization - user must match owner (created_by from K8s labels) - # Note: For team-based auth, we'd need to look up team_id from user_id, - # but for orphan cleanup, user_id match is sufficient - if user.user_id != owner and not user.is_privileged_user: - raise ObjectNotAuthorizedException - # Resources were cleaned up successfully - return DeleteModelEndpointV1Response(deleted=True) - # No orphaned resources found raise ObjectNotFoundException if not self.authz_module.check_access_write_owned_entity(user, model_endpoint): raise ObjectNotAuthorizedException diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 475ce8ab6..6cf53e612 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1163,34 +1163,6 @@ async def _get_deployment(endpoint_id, deployment_name): raise return deployment_config - @staticmethod - async def _get_deployment_by_endpoint_id_label(endpoint_id: str) -> Optional[V1Deployment]: - """ - Gets a Deployment by querying K8s with endpoint_id label selector. - Used when DB record doesn't exist but K8s resources might (orphaned resources). - - Args: - endpoint_id: The endpoint_id to search for - - Returns: - The first deployment found with matching endpoint_id label, or None if not found - """ - apps_client = get_kubernetes_apps_client() - label_selector = f"endpoint_id={endpoint_id}" - try: - deployments = await apps_client.list_namespaced_deployment( - namespace=hmi_config.endpoint_namespace, - label_selector=label_selector, - ) - if deployments.items: - return deployments.items[0] - return None - except ApiException as e: - if e.status == 404: - return None - logger.exception(f"Error querying deployments by endpoint_id label {endpoint_id}") - raise - @staticmethod async def _determine_endpoint_type_from_k8s(endpoint_id: str) -> ModelEndpointType: """ diff --git a/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py b/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py index c04e9863b..12c026738 100644 --- a/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py +++ b/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py @@ -410,66 +410,6 @@ async def delete_model_endpoint(self, model_endpoint_id: str) -> None: logger.info(f"Endpoint delete released lock for {created_by}, {name}") - async def _cleanup_orphaned_k8s_resources(self, endpoint_id: str) -> Optional[str]: - """ - Cleans up orphaned K8s resources when DB record doesn't exist. - Returns the owner (created_by) from K8s labels if resources were found, None otherwise. - - Args: - endpoint_id: The endpoint_id to check for orphaned resources - - Returns: - The owner (created_by) from K8s labels if resources found, None otherwise - """ - try: - deployment = await K8SEndpointResourceDelegate._get_deployment_by_endpoint_id_label( - endpoint_id - ) - if deployment is None: - return None - - # Extract owner and deployment name from K8s labels - labels = deployment.metadata.labels or {} - owner = labels.get("created_by") or labels.get("user_id") - deployment_name = deployment.metadata.name - - if not owner: - logger.warning( - f"Found orphaned K8s resources for endpoint_id {endpoint_id} but no owner label" - ) - return None - - # Determine endpoint type - endpoint_type = await K8SEndpointResourceDelegate._determine_endpoint_type_from_k8s( - endpoint_id - ) - - # Clean up resources - logger.info( - f"Cleaning up orphaned K8s resources for endpoint_id {endpoint_id}, " - f"deployment_name {deployment_name}, owner {owner}" - ) - deleted = await self.model_endpoint_infra_gateway.delete_model_endpoint_infra_by_id( - endpoint_id=endpoint_id, - deployment_name=deployment_name, - endpoint_type=endpoint_type, - ) - if deleted: - logger.info( - f"Successfully cleaned up orphaned K8s resources for endpoint_id {endpoint_id}" - ) - else: - logger.warning( - f"Failed to clean up some orphaned K8s resources for endpoint_id {endpoint_id}" - ) - - return owner - except Exception as e: - logger.exception( - f"Error cleaning up orphaned K8s resources for endpoint_id {endpoint_id}: {e}" - ) - return None - async def restart_model_endpoint(self, model_endpoint_id: str) -> None: record = await self.model_endpoint_record_repository.get_model_endpoint_record( model_endpoint_id=model_endpoint_id From 0ed67b8b081a0adf45313bad93e313dd5e5fd752 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Fri, 19 Dec 2025 12:19:40 -0500 Subject: [PATCH 04/11] Revert aiohttp timeout changes - only fix Istio VirtualService timeout --- .../inference/forwarding/forwarding.py | 23 ++----------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/model-engine/model_engine_server/inference/forwarding/forwarding.py b/model-engine/model_engine_server/inference/forwarding/forwarding.py index b1753dc50..5183955b9 100644 --- a/model-engine/model_engine_server/inference/forwarding/forwarding.py +++ b/model-engine/model_engine_server/inference/forwarding/forwarding.py @@ -630,23 +630,6 @@ def endpoint(route: str) -> str: @dataclass class PassthroughForwarder(ModelEngineSerializationMixin): passthrough_endpoint: str - - # Default timeout: 5 minutes (matching aiohttp default) - # MCP servers need longer timeout: 10 minutes to handle long-running operations - DEFAULT_TIMEOUT_SECONDS = 5 * 60 - MCP_TIMEOUT_SECONDS = 10 * 60 - - def _is_mcp_server(self) -> bool: - """Detect if this is an MCP server by checking if endpoint contains /mcp""" - return "/mcp" in self.passthrough_endpoint.lower() - - def _get_timeout(self) -> aiohttp.ClientTimeout: - """Get appropriate timeout based on server type""" - timeout_seconds = ( - self.MCP_TIMEOUT_SECONDS if self._is_mcp_server() - else self.DEFAULT_TIMEOUT_SECONDS - ) - return aiohttp.ClientTimeout(total=timeout_seconds) async def _make_request( self, request: Any, aioclient: aiohttp.ClientSession @@ -673,8 +656,7 @@ async def _make_request( ) async def forward_stream(self, request: Any): - timeout = self._get_timeout() - async with aiohttp.ClientSession(timeout=timeout) as aioclient: + async with aiohttp.ClientSession() as aioclient: response = await self._make_request(request, aioclient) response_headers = response.headers yield (response_headers, response.status) @@ -688,8 +670,7 @@ async def forward_stream(self, request: Any): yield await response.read() async def forward_sync(self, request: Any): - timeout = self._get_timeout() - async with aiohttp.ClientSession(timeout=timeout) as aioclient: + async with aiohttp.ClientSession() as aioclient: response = await self._make_request(request, aioclient) return response From 13739e61d5e4a17c35691cfab1a149aeb1493b43 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Fri, 19 Dec 2025 12:22:13 -0500 Subject: [PATCH 05/11] Simplify timeout logic - just check passthrough forwarder type --- .../gateways/resources/k8s_resource_types.py | 27 +++++-------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 4bc23d3df..0c8e6dca2 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -382,7 +382,7 @@ class VirtualServiceArguments(_BaseEndpointArguments): """Keyword-arguments for substituting into virtual-service templates.""" DNS_HOST_DOMAIN: str - MCP_TIMEOUT: str # Timeout for MCP servers (e.g., "300s" for 5 minutes, "" for default) + MCP_TIMEOUT: str # Timeout for passthrough forwarders (e.g., "300s" for 5 minutes, "" for default) class LwsServiceEntryArguments(_BaseEndpointArguments): @@ -1362,25 +1362,10 @@ def get_endpoint_resource_arguments_from_request( SERVICE_NAME_OVERRIDE=service_name_override, ) elif endpoint_resource_name == "virtual-service": - # Detect MCP servers by checking if they use passthrough forwarder and have /mcp routes - # MCP servers need longer timeout (5 minutes) to handle long-running operations - is_mcp_server = False - if isinstance(flavor, RunnableImageLike): - # Check if forwarder type is passthrough (MCP servers use passthrough) - if flavor.forwarder_type == "passthrough": - # Check if any routes contain /mcp - all_routes = [] - if flavor.predict_route: - all_routes.append(flavor.predict_route) - if flavor.routes: - all_routes.extend(flavor.routes) - if flavor.extra_routes: - all_routes.extend(flavor.extra_routes) - # MCP servers have routes containing /mcp - is_mcp_server = any("/mcp" in route.lower() for route in all_routes) - - # Format timeout as YAML: "timeout: 300s" for MCP servers, empty string for others - mcp_timeout = "timeout: 300s" if is_mcp_server else "" + # Set 5-minute timeout for passthrough forwarders (used by MCP servers) + # to fix 30-second default timeout issue + is_passthrough = isinstance(flavor, RunnableImageLike) and flavor.forwarder_type == "passthrough" + timeout = "timeout: 300s" if is_passthrough else "" return VirtualServiceArguments( # Base resource arguments @@ -1394,7 +1379,7 @@ def get_endpoint_resource_arguments_from_request( OWNER=owner, GIT_TAG=GIT_TAG, DNS_HOST_DOMAIN=infra_config().dns_host_domain, - MCP_TIMEOUT=mcp_timeout, + MCP_TIMEOUT=timeout, ) elif endpoint_resource_name == "destination-rule": return DestinationRuleArguments( From e28f72cdba67fe05d3e054f3bc4e1ccc477c88c2 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Fri, 19 Dec 2025 12:25:35 -0500 Subject: [PATCH 06/11] Remove unused orphaned cleanup methods - not needed for timeout fix --- .../live_model_endpoint_infra_gateway.py | 21 ---------- .../k8s_endpoint_resource_delegate.py | 42 ------------------- 2 files changed, 63 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py b/model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py index 4637fd57e..bca30e10a 100644 --- a/model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py @@ -258,27 +258,6 @@ async def delete_model_endpoint_infra(self, model_endpoint_record: ModelEndpoint endpoint_type=endpoint_type, ) - async def delete_model_endpoint_infra_by_id( - self, endpoint_id: str, deployment_name: str, endpoint_type: ModelEndpointType - ) -> bool: - """ - Deletes model endpoint infrastructure when DB record doesn't exist (orphaned resources). - This method accepts minimal parameters extracted from K8s resources. - - Args: - endpoint_id: The endpoint ID - deployment_name: The deployment name (from K8s resource) - endpoint_type: The endpoint type (SYNC, STREAMING, or ASYNC) - - Returns: - True if resources were successfully deleted, False otherwise - """ - return await self.resource_gateway.delete_resources( - endpoint_id=endpoint_id, - deployment_name=deployment_name, - endpoint_type=endpoint_type, - ) - async def restart_model_endpoint_infra( self, model_endpoint_record: ModelEndpointRecord ) -> None: diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 6cf53e612..45ab0d73e 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1163,48 +1163,6 @@ async def _get_deployment(endpoint_id, deployment_name): raise return deployment_config - @staticmethod - async def _determine_endpoint_type_from_k8s(endpoint_id: str) -> ModelEndpointType: - """ - Determines endpoint type by checking for HPA/KEDA (SYNC/STREAMING) vs ASYNC. - Defaults to STREAMING if unable to determine (common for MCPx endpoints). - - Args: - endpoint_id: The endpoint_id to check - - Returns: - The determined ModelEndpointType - """ - k8s_resource_group_name = _endpoint_id_to_k8s_resource_group_name(endpoint_id) - autoscaling_client = get_kubernetes_autoscaling_client() - custom_objects_client = get_kubernetes_custom_objects_client() - - # Check for HPA (indicates SYNC/STREAMING) - try: - await autoscaling_client.read_namespaced_horizontal_pod_autoscaler( - k8s_resource_group_name, hmi_config.endpoint_namespace - ) - return ModelEndpointType.STREAMING # Default to STREAMING for MCPx - except ApiException: - pass - - # Check for KEDA ScaledObject (indicates SYNC/STREAMING) - try: - await custom_objects_client.get_namespaced_custom_object( - group="keda.sh", - version="v1alpha1", - namespace=hmi_config.endpoint_namespace, - plural="scaledobjects", - name=k8s_resource_group_name, - ) - return ModelEndpointType.STREAMING # Default to STREAMING for MCPx - except ApiException: - pass - - # If no HPA/KEDA found, likely ASYNC - # But MCPx uses STREAMING, so default to that - return ModelEndpointType.STREAMING - @staticmethod async def _get_all_config_maps() -> ( List[kubernetes_asyncio.client.models.v1_config_map.V1ConfigMap] From 59295c73577dbd614d9140fa667241e241c87940 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Fri, 19 Dec 2025 12:26:08 -0500 Subject: [PATCH 07/11] Add back /mcp route check - passthrough alone is not sufficient --- .../gateways/resources/k8s_resource_types.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 0c8e6dca2..204b9d1a0 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -1362,10 +1362,19 @@ def get_endpoint_resource_arguments_from_request( SERVICE_NAME_OVERRIDE=service_name_override, ) elif endpoint_resource_name == "virtual-service": - # Set 5-minute timeout for passthrough forwarders (used by MCP servers) - # to fix 30-second default timeout issue - is_passthrough = isinstance(flavor, RunnableImageLike) and flavor.forwarder_type == "passthrough" - timeout = "timeout: 300s" if is_passthrough else "" + # Set 5-minute timeout for MCP servers to fix 30-second default timeout issue + # MCP servers use passthrough forwarder and have routes containing /mcp + is_mcp_server = False + if isinstance(flavor, RunnableImageLike) and flavor.forwarder_type == "passthrough": + all_routes = [] + if flavor.predict_route: + all_routes.append(flavor.predict_route) + if flavor.routes: + all_routes.extend(flavor.routes) + if flavor.extra_routes: + all_routes.extend(flavor.extra_routes) + is_mcp_server = any("/mcp" in route.lower() for route in all_routes) + timeout = "timeout: 300s" if is_mcp_server else "" return VirtualServiceArguments( # Base resource arguments From 2b39d41d378a7aea096287b992ff656f28a7e056 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Fri, 19 Dec 2025 12:33:17 -0500 Subject: [PATCH 08/11] Update comment: clarify that empty string defaults to 30 seconds --- .../infra/gateways/resources/k8s_resource_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 204b9d1a0..447a8f127 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -382,7 +382,7 @@ class VirtualServiceArguments(_BaseEndpointArguments): """Keyword-arguments for substituting into virtual-service templates.""" DNS_HOST_DOMAIN: str - MCP_TIMEOUT: str # Timeout for passthrough forwarders (e.g., "300s" for 5 minutes, "" for default) + MCP_TIMEOUT: str # Timeout for passthrough forwarders (e.g., "300s" for 5 minutes, "" defaults to 30 seconds) class LwsServiceEntryArguments(_BaseEndpointArguments): From 6e01fb994a2e3b599d9639c874285f7fd1018cf6 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Fri, 19 Dec 2025 12:34:10 -0500 Subject: [PATCH 09/11] Update comment format --- .../infra/gateways/resources/k8s_resource_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 447a8f127..354de3d20 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -382,7 +382,7 @@ class VirtualServiceArguments(_BaseEndpointArguments): """Keyword-arguments for substituting into virtual-service templates.""" DNS_HOST_DOMAIN: str - MCP_TIMEOUT: str # Timeout for passthrough forwarders (e.g., "300s" for 5 minutes, "" defaults to 30 seconds) + MCP_TIMEOUT: str # Timeout for passthrough forwarders (e.g., "300s" for 5 minutes). "" (Default) is 30 seconds class LwsServiceEntryArguments(_BaseEndpointArguments): From 7769c54b9972f37ebf59910e6b08630fc128db35 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Fri, 19 Dec 2025 12:34:36 -0500 Subject: [PATCH 10/11] Simplify comment to just state default timeout --- .../infra/gateways/resources/k8s_resource_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 354de3d20..a5fcf886d 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -382,7 +382,7 @@ class VirtualServiceArguments(_BaseEndpointArguments): """Keyword-arguments for substituting into virtual-service templates.""" DNS_HOST_DOMAIN: str - MCP_TIMEOUT: str # Timeout for passthrough forwarders (e.g., "300s" for 5 minutes). "" (Default) is 30 seconds + MCP_TIMEOUT: str # "" (Default) is 30 seconds class LwsServiceEntryArguments(_BaseEndpointArguments): From 8b436c07581ecd3095542e4482ae595d16644081 Mon Sep 17 00:00:00 2001 From: Ashwin Ranade Date: Fri, 19 Dec 2025 13:57:23 -0500 Subject: [PATCH 11/11] Fix black formatting - split long line --- .../infra/gateways/resources/k8s_resource_types.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 11e2e7077..736161ef3 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -1365,7 +1365,10 @@ def get_endpoint_resource_arguments_from_request( # Set 5-minute timeout for MCP servers to fix 30-second default timeout issue # MCP servers use passthrough forwarder and have routes containing /mcp is_mcp_server = False - if isinstance(flavor, RunnableImageLike) and flavor.forwarder_type == "passthrough": + if ( + isinstance(flavor, RunnableImageLike) + and flavor.forwarder_type == "passthrough" + ): all_routes = [] if flavor.predict_route: all_routes.append(flavor.predict_route)