From 00aee6339e523773eb574a46775478c4c3d4f375 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Thu, 18 Dec 2025 00:05:13 +0000 Subject: [PATCH 1/4] test: promote lifecycle logic to shared utils Refactor the integration test utilities to provide a standardized way to start hermetic gRPC servers, and migrate the BBR harness to use it. - Add `StartExtProcServer`: A helper for background server management with strict TCP readiness checks and 127.0.0.1 binding. - Update `GetFreePort`: Return `int` instead of `*net.TCPAddr` for easier consumption. - Refactor `BBRHarness`: Remove manual server startup boilerplate in favor of the new shared helper, reducing code duplication. --- test/integration/bbr/harness_test.go | 63 ++++------------------ test/integration/util.go | 81 +++++++++++++++++++++++++--- 2 files changed, 86 insertions(+), 58 deletions(-) diff --git a/test/integration/bbr/harness_test.go b/test/integration/bbr/harness_test.go index b55d9e345..bd47b0870 100644 --- a/test/integration/bbr/harness_test.go +++ b/test/integration/bbr/harness_test.go @@ -18,16 +18,11 @@ package bbr import ( "context" - "net" - "strconv" - "strings" "testing" - "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/stretchr/testify/require" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" @@ -53,10 +48,8 @@ func NewBBRHarness(t *testing.T, ctx context.Context, streaming bool) *BBRHarnes t.Helper() // 1. Allocate Free Port - tcpAddr, err := integration.GetFreePort() + port, err := integration.GetFreePort() require.NoError(t, err, "failed to acquire free port for BBR server") - port := tcpAddr.Port - serverAddr := net.JoinHostPort(tcpAddr.IP.String(), strconv.Itoa(tcpAddr.Port)) // 2. Configure BBR Server // BBR is simpler than EPP; it doesn't need a K8s Manager. @@ -67,58 +60,24 @@ func NewBBRHarness(t *testing.T, ctx context.Context, streaming bool) *BBRHarnes // 3. Start Server in Background serverCtx, serverCancel := context.WithCancel(ctx) - // Channel to signal if the server dies immediately (e.g., port binding error) - serverErrChan := make(chan error, 1) - - go func() { - logger.Info("Starting BBR server", "address", serverAddr, "streaming", streaming) - if err := runner.AsRunnable(logger.WithName("bbr-server")).Start(serverCtx); err != nil { - if !strings.Contains(err.Error(), "context canceled") { - logger.Error(err, "BBR server stopped unexpectedly") - select { - case serverErrChan <- err: - default: - } - } - } - }() - - // 4. Wait for Server Readiness - // We must poll the port until the server successfully binds and listens. - require.Eventually(t, func() bool { - // Check for premature crash. - select { - case err := <-serverErrChan: - t.Fatalf("Server failed to start: %v", err) - default: - } - - // Check for TCP readiness. - conn, err := net.DialTimeout("tcp", serverAddr, 100*time.Millisecond) - if err != nil { - return false - } - conn.Close() - return true - }, 5*time.Second, 50*time.Millisecond, "BBR Server failed to bind port %s", serverAddr) - - // 5. Connect Client - // Blocking dial ensures the server is reachable before the test logic begins. 
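+	// StartExtProcServer below blocks until the port accepts TCP connections,
+	// so the harness needs no readiness polling of its own.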
- conn, err := grpc.NewClient(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err, "failed to create grpc connection to BBR server") - - extProcClient, err := extProcPb.NewExternalProcessorClient(conn).Process(ctx) - require.NoError(t, err, "failed to initialize ext_proc stream client") + runnable := runner.AsRunnable(logger.WithName("bbr-server")).Start + client, conn := integration.StartExtProcServer( + t, + serverCtx, + runnable, + port, + logger, + ) h := &BBRHarness{ t: t, ctx: ctx, - Client: extProcClient, + Client: client, server: runner, grpcConn: conn, } - // 6. Register Cleanup + // 4. Register Cleanup t.Cleanup(func() { logger.Info("Tearing down BBR server", "port", port) serverCancel() diff --git a/test/integration/util.go b/test/integration/util.go index 8b17f38e4..a4f170624 100644 --- a/test/integration/util.go +++ b/test/integration/util.go @@ -30,6 +30,7 @@ import ( "io" "net" "strconv" + "strings" "testing" "time" @@ -37,13 +38,16 @@ import ( extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/go-logr/logr" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/structpb" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" ) -// --- Request Builders (High-Level DSL) --- +// --- Request Builders (Protocol Level) --- // ReqLLM creates a sequence of gRPC messages representing a standard, streamed LLM inference request. // It generates: @@ -205,7 +209,7 @@ func GenerateRequestMetadata(filterMetadata []string) map[string]*structpb.Struc return requestMetadata } -// --- Response Builders --- +// --- Response Builders (Protocol Level) --- // NewRequestBufferedResponse creates a complete set of responses for the Request phase. // It simulates the EPP deciding to: @@ -433,17 +437,82 @@ func StreamedRequest( // --- System Utilities --- +// StartExtProcServer handles the lifecycle of starting a gRPC server in the background and connecting to it. +// It guarantees that the server is listening on the specified port before returning. +// +// serverRunner: A function that blocks until the server exits (e.g. Runnable.Start). +// port: The port the server is configured to listen on. +func StartExtProcServer( + t *testing.T, + ctx context.Context, + serverRunner func(context.Context) error, + port int, + logger logr.Logger, +) (extProcPb.ExternalProcessor_ProcessClient, *grpc.ClientConn) { + t.Helper() + + // Force IPv4 to match GetFreePort's binding and avoid IPv6 race conditions in CI. + serverAddr := fmt.Sprintf("127.0.0.1:%d", port) + + // Channel to signal if the server dies immediately (e.g., port binding error/panic) + serverErrChan := make(chan error, 1) + + // Start server in background. + go func() { + logger.Info("Starting ExtProc server", "address", serverAddr) + if err := serverRunner(ctx); err != nil { + // Ignore expected cancellations during teardown. + if !strings.Contains(err.Error(), "context canceled") { + logger.Error(err, "Server stopped unexpectedly") + select { + case serverErrChan <- err: + default: + } + } + } + }() + + // Wait for TCP readiness. + // We must poll the port until the server successfully binds and listens. + require.Eventually(t, func() bool { + // Fast-fail if the server crashed immediately. 
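+		// A non-blocking receive keeps this poll loop spinning while the server
+		// goroutine is healthy; the buffered channel guarantees a startup
+		// failure is never dropped before the check observes it.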
+ select { + case err := <-serverErrChan: + t.Fatalf("Server failed to start: %v", err) + default: + } + + // Check if the port is open. + conn, err := net.DialTimeout("tcp", serverAddr, 50*time.Millisecond) + if err != nil { + return false + } + conn.Close() + return true + }, 5*time.Second, 50*time.Millisecond, "Server failed to bind port %s", serverAddr) + + // Connect client. + // Blocking dial is safe because we know the port is open. + conn, err := grpc.NewClient(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err, "failed to create grpc connection") + + extProcClient, err := extProcPb.NewExternalProcessorClient(conn).Process(ctx) + require.NoError(t, err, "failed to initialize ext_proc stream client") + + return extProcClient, conn +} + // GetFreePort finds an available IPv4 TCP port on localhost. // It works by asking the OS to allocate a port by listening on port 0, capturing the assigned address, and then // immediately closing the listener. // // Note: There is a theoretical race condition where another process grabs the port between the Close() call and the // subsequent usage, but this is generally acceptable in hermetic test environments. -func GetFreePort() (*net.TCPAddr, error) { +func GetFreePort() (int, error) { // Force IPv4 to prevent flakes on dual-stack CI environments listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { - return nil, fmt.Errorf("failed to listen on a free port: %w", err) + return 0, fmt.Errorf("failed to listen on a free port: %w", err) } // Critical: Close the listener immediately so the caller can bind to it. @@ -451,9 +520,9 @@ func GetFreePort() (*net.TCPAddr, error) { addr, ok := listener.Addr().(*net.TCPAddr) if !ok { - return nil, errors.New("failed to cast listener address to TCPAddr") + return 0, errors.New("failed to cast listener address to TCPAddr") } - return addr, nil + return addr.Port, nil } // --- Internal Helpers --- From 12e658d92df123541241d1507ef37543727a54db Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Thu, 18 Dec 2025 00:06:07 +0000 Subject: [PATCH 2/4] test/epp: introduce isolated test harness and DSL Add a new testing infrastructure for the Endpoint Picker (EPP) to enable hermetic, namespace-isolated testing. - Add `TestHarness`: Manages a namespace-scoped Controller Manager and server instance per test case. This ensures resources from one test do not pollute the scheduler state of another. - Add DSL: Introduce intent-based builders (e.g., `ReqSubset`, `P`) and assertions (e.g., `ExpectRouteTo`) to decouple test logic from gRPC boilerplate. - Support deterministic synchronization by waiting for Prometheus metrics to settle before generating traffic. --- test/integration/epp/harness_test.go | 332 +++++++++++++++++++++++++++ test/integration/epp/util_test.go | 171 ++++++++++++++ 2 files changed, 503 insertions(+) create mode 100644 test/integration/epp/harness_test.go create mode 100644 test/integration/epp/util_test.go diff --git a/test/integration/epp/harness_test.go b/test/integration/epp/harness_test.go new file mode 100644 index 000000000..f91377a32 --- /dev/null +++ b/test/integration/epp/harness_test.go @@ -0,0 +1,332 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package epp + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + metricsutils "k8s.io/component-base/metrics/testutil" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + crconfig "sigs.k8s.io/controller-runtime/pkg/config" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" + epptestutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" + "sigs.k8s.io/gateway-api-inference-extension/test/integration" +) + +// TestHarness encapsulates the environment for a single isolated EPP test run. +// It manages the lifecycle of the controller manager, the EPP server, and the K8s namespace. +type TestHarness struct { + t *testing.T + ctx context.Context + Namespace string + + Mgr ctrl.Manager + ServerRunner *server.ExtProcServerRunner + Client extProcPb.ExternalProcessor_ProcessClient + Datastore datastore.Datastore + + // Internal handles for cleanup + grpcConn *grpc.ClientConn +} + +// NewTestHarness boots up a fully isolated test environment. +// It creates a unique Namespace, scopes the Manager to that Namespace, and starts the components. +// Note: EPP tests must run serially because they rely on the global Prometheus registry. +func NewTestHarness(t *testing.T, ctx context.Context) *TestHarness { + t.Helper() + + // 1. Identity & Namespace Isolation + // We use a unique UUID to ensure that resources from this test do not collide with others. 
+ uid := uuid.New().String()[:8] + nsName := "epp-test-" + uid + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}} + require.NoError(t, k8sClient.Create(ctx, ns), "failed to create test namespace") + + // 2. Free Port Allocation + grpcPort, err := integration.GetFreePort() + require.NoError(t, err, "failed to acquire free port") + + // 3. Manager Scoped to Namespace + // Critical: We restrict the Manager's cache to the test namespace to avoid processing objects from other tests or + // previous runs. + skipValidation := true + mgrOpts := ctrl.Options{ + Scheme: testScheme, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{ + nsName: {}, // Implicitly filters all watches to this NS. + }, + }, + Controller: crconfig.Controller{ + SkipNameValidation: &skipValidation, + }, + Metrics: metricsserver.Options{ + BindAddress: "0", // Disable metrics server binding or use ephemeral to avoid port conflicts. + }, + HealthProbeBindAddress: "0", + LeaderElection: false, + } + mgr, err := ctrl.NewManager(testEnv.Config, mgrOpts) + require.NoError(t, err, "failed to create manager") + + // 4. EPP Server Configuration + runner := server.NewDefaultExtProcServerRunner() + // Overwrite default fields with test-specific configuration. + runner.GKNN = common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: nsName, Name: testPoolName}, + GroupKind: schema.GroupKind{Group: v1.GroupVersion.Group, Kind: "InferencePool"}, + } + runner.GrpcPort = grpcPort + runner.SecureServing = false + runner.HealthChecking = false + runner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} + runner.RefreshPrometheusMetricsInterval = 50 * time.Millisecond + runner.MetricsStalenessThreshold = 2 * time.Second + + // 5. Dependency Injection (Scheduler, Scorers, Datastore) + pmf := backendmetrics.NewPodMetricsFactory(runner.TestPodMetricsClient, 10*time.Millisecond) + // We disable periodic resync (0) to ensure deterministic test behavior. + runner.Datastore = datastore.NewDatastore(ctx, pmf, 0) + + defaultProfile := framework.NewSchedulerProfile(). + WithScorers( + framework.NewWeightedScorer(scorer.NewKVCacheUtilizationScorer(), 1), + framework.NewWeightedScorer(scorer.NewQueueScorer(), 1), + framework.NewWeightedScorer(prefix.New(ctx, prefix.DefaultConfig), 1), + framework.NewWeightedScorer(scorer.NewLoraAffinityScorer(), 1), + ). + WithPicker(picker.NewMaxScorePicker(picker.DefaultMaxNumOfEndpoints)) + + profileHandler := profile.NewSingleProfileHandler() + schedulerConfig := scheduling.NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}) + + sdConfig := &saturationdetector.Config{ + QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold, + KVCacheUtilThreshold: saturationdetector.DefaultKVCacheUtilThreshold, + MetricsStalenessThreshold: saturationdetector.DefaultMetricsStalenessThreshold, + } + runner.SaturationDetector = saturationdetector.NewDetector(sdConfig, logger.WithName("sd")) + locator := requestcontrol.NewDatastorePodLocator(runner.Datastore) + runner.Director = requestcontrol.NewDirectorWithConfig( + runner.Datastore, + scheduling.NewSchedulerWithConfig(schedulerConfig), + requestcontrol.NewLegacyAdmissionController(runner.SaturationDetector, locator), + locator, + requestcontrol.NewConfig(), + ) + + require.NoError(t, runner.SetupWithManager(ctx, mgr), "failed to setup server runner") + + // 6. Start Background Processes + mgrCtx, mgrCancel := context.WithCancel(ctx) + + // Start Manager. 
+ go func() { + if err := mgr.Start(mgrCtx); err != nil { + // Context cancellation is expected during teardown. + if !strings.Contains(err.Error(), "context canceled") { + logger.Error(err, "manager stopped unexpectedly") + } + } + }() + + // Start ExtProc server. + serverCtx, serverCancel := context.WithCancel(ctx) + runnable := runner.AsRunnable(logger.WithName("server")).Start + + client, conn := integration.StartExtProcServer( + t, + serverCtx, + runnable, + grpcPort, + logger, + ) + + h := &TestHarness{ + t: t, + ctx: ctx, + Namespace: nsName, + Mgr: mgr, + ServerRunner: runner, + Client: client, + Datastore: runner.Datastore, + grpcConn: conn, + } + + // 7. Register Cleanup + t.Cleanup(func() { + serverCancel() + mgrCancel() + _ = h.grpcConn.Close() + // Deleting the Namespace cascades to all contained resources. + _ = k8sClient.Delete(context.Background(), ns) + // Crucial: Reset global metrics registry to prevent pollution between serial tests. + metrics.Reset() + }) + + return h +} + +// --- Fluent Builder API --- + +// WithBaseResources injects the standard pool and objective definitions into the test namespace. +// These resources are pre-parsed in TestMain to avoid I/O overhead in the loop. +func (h *TestHarness) WithBaseResources() *TestHarness { + h.t.Helper() + for _, obj := range baseResources { + copy := obj.DeepCopy() + copy.SetNamespace(h.Namespace) + require.NoError(h.t, k8sClient.Create(h.ctx, copy), "failed to create base resource: %s", obj.GetKind()) + } + return h +} + +// WithPods creates pod objects in the API server and configures the fake metrics client. +func (h *TestHarness) WithPods(pods []podState) *TestHarness { + h.t.Helper() + metricsMap := make(map[types.NamespacedName]*backendmetrics.MetricsState) + + // Pre-calculate metrics and register them with the fake client. + for _, p := range pods { + metricsKeyName := fmt.Sprintf("pod-%d-rank-0", p.index) + activeModelsMap := make(map[string]int) + for _, m := range p.activeModels { + activeModelsMap[m] = 1 + } + + metricsMap[types.NamespacedName{Namespace: h.Namespace, Name: metricsKeyName}] = &backendmetrics.MetricsState{ + WaitingQueueSize: p.queueSize, + KVCacheUsagePercent: p.kvCacheUsage, + ActiveModels: activeModelsMap, + WaitingModels: make(map[string]int), + } + } + h.ServerRunner.TestPodMetricsClient.SetRes(metricsMap) + + // Create K8s Objects. + for _, p := range pods { + name := fmt.Sprintf("pod-%d", p.index) + + // Create K8s object. + pod := epptestutil.MakePod(name). + Namespace(h.Namespace). + ReadyCondition(). // Sets Status.Conditions. + Labels(map[string]string{"app": testPoolName}). + IP(fmt.Sprintf("192.168.1.%d", p.index+1)). + Complete(). + ObjRef() + + // Snapshot the status (Create wipes it). + intendedStatus := pod.Status + + // Create the resource. + require.NoError(h.t, k8sClient.Create(h.ctx, pod), "failed to create pod %s", name) + + // Restore Status on the created K8s object which now has the correct ResourceVersion/UID. + pod.Status = intendedStatus + + // Update Status subresource. + require.NoError(h.t, k8sClient.Status().Update(h.ctx, pod), "failed to update status for pod %s", name) + } + + return h +} + +// WaitForReadyPodsMetric blocks until the prometheus metric 'inference_pool_ready_pods' matches the expected count. +// This ensures the background metric collector has fully synced. 
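+//
+// A typical call site, assuming pods is a []podState with three entries:
+//
+//	h.WithBaseResources().WithPods(pods).WaitForSync(3, "")
+//	h.WaitForReadyPodsMetric(3)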
+func (h *TestHarness) WaitForReadyPodsMetric(expectedCount int) { + h.t.Helper() + + expected := cleanMetric(metricReadyPods(expectedCount)) + require.Eventually(h.t, func() bool { + err := metricsutils.GatherAndCompare(crmetrics.Registry, strings.NewReader(expected), + "inference_pool_ready_pods") + return err == nil + }, 10*time.Second, 50*time.Millisecond, "Timed out waiting for inference_pool_ready_pods metric to settle") +} + +// WaitForSync blocks until the EPP Datastore has synced the expected number of pods and, optionally, a specific model +// objective. +func (h *TestHarness) WaitForSync(expectedPods int, checkModelObjective string) *TestHarness { + h.t.Helper() + require.Eventually(h.t, func() bool { + if !h.Datastore.PoolHasSynced() { + return false + } + if len(h.Datastore.PodList(datastore.AllPodsPredicate)) != expectedPods { + return false + } + if checkModelObjective != "" && h.Datastore.ObjectiveGet(checkModelObjective) == nil { + return false + } + return true + }, 10*time.Second, 50*time.Millisecond, + "Datastore sync timed out.\n- PoolSynced: %v\n- Pods Found: %d (Expected: %d)\n- Objective '%s' Found: %v", + h.Datastore.PoolHasSynced(), + len(h.Datastore.PodList(datastore.AllPodsPredicate)), + expectedPods, + checkModelObjective, + h.Datastore.ObjectiveGet(checkModelObjective) != nil, + ) + return h +} + +// ExpectMetrics asserts that specific metrics match the expected Prometheus output. +// It uses Eventually to allow for slight delays in metric recording (e.g. async token counting). +func (h *TestHarness) ExpectMetrics(expected map[string]string) { + h.t.Helper() + for name, value := range expected { + var err error + assert.Eventually(h.t, func() bool { + err = metricsutils.GatherAndCompare(crmetrics.Registry, strings.NewReader(value), name) + return err == nil + }, 2*time.Second, 50*time.Millisecond, "Timed out waiting for metric %s to match: %v", name) + if err != nil { + h.t.Errorf("Metric mismatch for %s: %v", name, err) + } + } +} diff --git a/test/integration/epp/util_test.go b/test/integration/epp/util_test.go new file mode 100644 index 000000000..4baee930d --- /dev/null +++ b/test/integration/epp/util_test.go @@ -0,0 +1,171 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0. +*/ + +package epp + +import ( + "encoding/json" + "fmt" + "strings" + + envoyCorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" + requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" + "sigs.k8s.io/gateway-api-inference-extension/test/integration" +) + +// --- Domain Request Builders --- + +// ReqSubset creates a request sequence with Envoy Endpoint Metadata. +// This simulates the "Subset Load Balancing" flow where EPP picks a specific pod IP. +func ReqSubset(prompt, model, target string, subsets ...string) []*extProcPb.ProcessingRequest { + // Uses the shared low-level generator which handles the metadata construction + return integration.GenerateStreamedRequestSet(logger, prompt, model, target, subsets) +} + +// ReqResponseOnly creates a sequence simulating only the response phase from Envoy. +// It skips the RequestHeaders phase entirely. +func ReqResponseOnly( + respHeaders map[string]string, + bodyChunks ...string, +) []*extProcPb.ProcessingRequest { + reqs := []*extProcPb.ProcessingRequest{} + + // 1. 
Response Headers + hListResp := []*envoyCorev3.HeaderValue{} + for k, v := range respHeaders { + hListResp = append(hListResp, &envoyCorev3.HeaderValue{Key: k, RawValue: []byte(v)}) + } + reqs = append(reqs, &extProcPb.ProcessingRequest{ + Request: &extProcPb.ProcessingRequest_ResponseHeaders{ + ResponseHeaders: &extProcPb.HttpHeaders{Headers: &envoyCorev3.HeaderMap{Headers: hListResp}}, + }, + }) + + // 2. Response Body Chunks + for i, chunk := range bodyChunks { + reqs = append(reqs, &extProcPb.ProcessingRequest{ + Request: &extProcPb.ProcessingRequest_ResponseBody{ + ResponseBody: &extProcPb.HttpBody{ + Body: []byte(chunk), + EndOfStream: i == len(bodyChunks)-1, + }, + }, + }) + } + return reqs +} + +// --- Response Expectations --- + +// ExpectRouteTo asserts that the request was successfully routed to the specified endpoint and that the body was +// rewritten to match the target model. +func ExpectRouteTo(endpoint, targetModel, prompt string) []*extProcPb.ProcessingResponse { + // Reconstruct the expected rewritten body. + j, _ := json.Marshal(map[string]any{ + "max_tokens": 100, "model": targetModel, "prompt": prompt, "temperature": 0, + }) + return integration.NewRequestBufferedResponse( + endpoint, string(j), + &envoyCorev3.HeaderValueOption{Header: &envoyCorev3.HeaderValue{Key: "hi", RawValue: []byte("mom")}}, + &envoyCorev3.HeaderValueOption{Header: &envoyCorev3.HeaderValue{ + Key: requtil.RequestIdHeaderKey, + RawValue: []byte("test-request-id"), + }}, + ) +} + +// ExpectReject asserts that the EPP immediately rejected the request with the given code and message. +func ExpectReject(code envoyTypePb.StatusCode, msg string) []*extProcPb.ProcessingResponse { + return integration.NewImmediateErrorResponse(code, msg) +} + +// ExpectBufferResp asserts that the EPP buffers the response and rewrites the body. +// This uses the shared primitive but adds EPP-specific headers we expect. +func ExpectBufferResp(body string, contentType string) []*extProcPb.ProcessingResponse { + return integration.NewResponseBufferedResponse( + body, + &envoyCorev3.HeaderValueOption{Header: &envoyCorev3.HeaderValue{ + Key: "x-went-into-resp-headers", + RawValue: []byte("true"), + }}, + &envoyCorev3.HeaderValueOption{Header: &envoyCorev3.HeaderValue{ + Key: "content-type", + RawValue: []byte(contentType), + }}, + ) +} + +// ExpectStreamResp asserts that the EPP streams the response chunks (pass-through). +// It constructs a sequence of: +// 1. ResponseHeaders (with "x-went-into-resp-headers" and "text/event-stream") +// 2. ResponseBody chunks (with EndOfStream=true on the final chunk) +func ExpectStreamResp(chunks ...string) []*extProcPb.ProcessingResponse { + // 1. The Header Response Frame + res := []*extProcPb.ProcessingResponse{ + integration.NewResponseHeaders( + &envoyCorev3.HeaderValueOption{Header: &envoyCorev3.HeaderValue{ + Key: "x-went-into-resp-headers", + RawValue: []byte("true"), + }}, + &envoyCorev3.HeaderValueOption{Header: &envoyCorev3.HeaderValue{ + Key: "content-type", + RawValue: []byte("text/event-stream"), + }}, + &envoyCorev3.HeaderValueOption{Header: &envoyCorev3.HeaderValue{Key: "status", RawValue: []byte("200")}}, + ), + } + + // 2. 
The Body Chunk Frames + for i, chunk := range chunks { + res = append(res, integration.NewResponseStreamChunk(chunk, i == len(chunks)-1)) + } + return res +} + +// --- Data Structures & Metrics Helpers --- + +type podState struct { + index int + queueSize int + kvCacheUsage float64 + activeModels []string +} + +// P constructs a Pod State: Index, Queue, KV%, Models... +// Usage: P(0, 5, 0.2, "model-a") +func P(idx int, q int, kv float64, models ...string) podState { + return podState{index: idx, queueSize: q, kvCacheUsage: kv, activeModels: models} +} + +type label struct{ name, value string } + +func labelsToString(labels []label) string { + var sb strings.Builder + for i, l := range labels { + if i > 0 { + sb.WriteString(",") + } + sb.WriteString(fmt.Sprintf("%s=%q", l.name, l.value)) + } + return sb.String() +} + +func metricReqTotal(model, target string) string { + return fmt.Sprintf(` + # HELP inference_objective_request_total [ALPHA] Counter of inference objective requests broken out for each model and target model. + # TYPE inference_objective_request_total counter + inference_objective_request_total{%s} 1 + `, labelsToString([]label{{"model_name", model}, {"target_model_name", target}})) +} + +func metricReadyPods(count int) string { + return fmt.Sprintf(` + # HELP inference_pool_ready_pods [ALPHA] The number of ready pods in the inference server pool. + # TYPE inference_pool_ready_pods gauge + inference_pool_ready_pods{%s} %d + `, labelsToString([]label{{"name", testPoolName}}), count) +} From 4f6f4d73d0cc291f5b3f832d6a3c6877c029d0fa Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Thu, 18 Dec 2025 00:07:32 +0000 Subject: [PATCH 3/4] test/epp: migrate integration suite to new harness Rewrite the hermetic integration suite to use the new isolated harness and DSL. - Migrate all existing test cases from epp_test.go to hermetic_test.go. - Enable pre-loading of base CRDs in TestMain to optimize setup time. - Remove legacy setup code and global variable reliance. --- test/integration/epp/hermetic_test.go | 1508 +++++-------------------- test/integration/epp/util_test.go | 13 +- 2 files changed, 315 insertions(+), 1206 deletions(-) diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 74e3408fd..e7a42ebe7 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -14,1351 +14,449 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package epp contains integration tests for the ext proc while faking the backend pods. +// Package epp contains integration tests for the Endpoint Picker extension. 
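+//
+// Each case boots an isolated TestHarness (see harness_test.go) and drives it
+// with the request/response DSL defined in util_test.go.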
package epp import ( - "bufio" - "bytes" "context" - "errors" "fmt" - "io" "os" "path/filepath" "strings" "testing" - "time" configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" "google.golang.org/protobuf/testing/protocmp" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - k8syaml "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/apimachinery/pkg/util/yaml" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - metricsutils "k8s.io/component-base/metrics/testutil" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" - crconfig "sigs.k8s.io/controller-runtime/pkg/config" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" - crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" - "sigs.k8s.io/controller-runtime/pkg/metrics/filters" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" - backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector/framework/plugins/utilizationdetector" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" - epptestutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" - integrationutils "sigs.k8s.io/gateway-api-inference-extension/test/integration" - "sigs.k8s.io/yaml" + "sigs.k8s.io/gateway-api-inference-extension/test/integration" ) const ( - // Test Infrastructure - testPoolName = "vllm-llama3-8b-instruct-pool" - testNamespace = "default" - testMetricsPort = 8889 - - // Model Names + testPoolName = "vllm-llama3-8b-instruct-pool" modelMyModel = "my-model" modelMyModelTarget = "my-model-12345" - modelToBeWritten 
= "model-to-be-rewritten" - modelAfterRewrite = "rewritten-model" modelSQLLora = "sql-lora" modelSQLLoraTarget = "sql-lora-1fdg2" modelSheddable = "sql-lora-sheddable" modelSheddableTarget = "sql-lora-1fdg3" modelDirect = "direct-model" + modelToBeWritten = "model-to-be-rewritten" + modelAfterRewrite = "rewritten-model" ) +// Global State (Initialized in TestMain) var ( - testGRPCAddress = fmt.Sprintf("localhost:%d", server.DefaultGrpcPort) - serverRunner *server.ExtProcServerRunner - k8sClient k8sclient.Client - testEnv *envtest.Environment - scheme = runtime.NewScheme() - logger = logutil.NewTestLogger().V(logutil.VERBOSE) + k8sClient client.Client + testEnv *envtest.Environment + testScheme = runtime.NewScheme() + logger = zap.New(zap.UseDevMode(true), zap.Level(zapcore.Level(logutil.DEFAULT))) + baseResources []*unstructured.Unstructured ) func TestMain(m *testing.M) { - cleanup := BeforeSuite() - code := m.Run() - cleanup() - os.Exit(code) -} + ctrl.SetLogger(logger) -type label struct { - name, - value string -} + // 1. EnvTest Setup (API Server + Etcd) + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, + } + cfg, err := testEnv.Start() + if err != nil { + panic(fmt.Sprintf("failed to start test environment: %v", err)) + } -func labelsToString(labels []label) string { - var sb strings.Builder - i := 0 - for _, l := range labels { - if i > 0 { - sb.WriteString(",") - } - sb.WriteString(fmt.Sprintf("%s=%q", l.name, l.value)) - i++ + // 2. Client & Scheme Registration + utilruntime.Must(clientgoscheme.AddToScheme(testScheme)) + utilruntime.Must(v1alpha2.Install(testScheme)) + utilruntime.Must(v1.Install(testScheme)) + k8sClient, err = client.New(cfg, client.Options{Scheme: testScheme}) + if err != nil { + panic(err) } - return sb.String() -} -func inferenceObjectiveRequestTotal(labels []label) string { - return fmt.Sprintf(` - # HELP inference_objective_request_total [ALPHA] Counter of inference objective requests broken out for each model and target model. - # TYPE inference_objective_request_total counter - inference_objective_request_total{%s} 1 - `, labelsToString(labels)) -} + // 3. Global Metric Registration + // Necessary because we cannot parallelize tests using the global registry. + metrics.Register() + + // 4. Pre-parse Base Resources + // We load the YAML once here to avoid unnecessary I/O in every test case. + baseResources = loadBaseResources() -func inferencePoolReadyPods(v int, labels []label) string { - return fmt.Sprintf(` - # HELP inference_pool_ready_pods [ALPHA] The number of ready pods in the inference server pool. 
- # TYPE inference_pool_ready_pods gauge - inference_pool_ready_pods{%s} %d - `, labelsToString(labels), v) + code := m.Run() + + _ = testEnv.Stop() + os.Exit(code) } func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) { tests := []struct { - name string - requests []*extProcPb.ProcessingRequest - pods map[*backend.Pod]*backendmetrics.MetricsState - wantResponses []*extProcPb.ProcessingResponse - wantMetrics map[string]string - wantErr bool - immediateResponse *extProcPb.ImmediateResponse + name string + requests []*extProcPb.ProcessingRequest + pods []podState + wantResponses []*extProcPb.ProcessingResponse + wantMetrics map[string]string + waitForModel string }{ - // Request flow tests + // --- Standard Routing Logic --- { - name: "select lower queue and kv cache, no active lora", - requests: integrationutils.GenerateStreamedRequestSet(logger, "test1", modelMyModel, modelMyModelTarget, nil), - // Pod 1 will be picked because it has relatively low queue size and low KV cache. - pods: newPodStates( - podState{index: 0, queueSize: 3, kvCacheUsage: 0.2}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.1}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.2}, - ), - wantMetrics: map[string]string{ - "inference_objective_request_total": inferenceObjectiveRequestTotal([]label{ - {"model_name", modelMyModel}, - {"target_model_name", modelMyModelTarget}, - }), - "inference_pool_ready_pods": inferencePoolReadyPods(3, []label{ - {"name", testPoolName}, - }), + name: "select lower queue and kv cache", + requests: integration.ReqLLM(logger, "test1", modelMyModel, modelMyModelTarget), + pods: []podState{ + P(0, 3, 0.2), + P(1, 0, 0.1), // Winner (Low Queue, Low KV) + P(2, 10, 0.2), }, - wantErr: false, - wantResponses: integrationutils.NewRequestBufferedResponse( - "192.168.1.2:8000", - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test1","temperature":0}`, modelMyModelTarget), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "hi", - RawValue: []byte("mom"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: requtil.RequestIdHeaderKey, - RawValue: []byte("test-request-id"), - }, - }, - ), - }, - { - name: "invalid json; return body", - requests: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_RequestHeaders{ - RequestHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: []*configPb.HeaderValue{ - { - Key: "hi", - Value: "mom", - }, - }, - }, - }, - }, - }, - { - Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{Body: []byte("no healthy upstream"), EndOfStream: true}, - }, - }, + wantResponses: ExpectRouteTo("192.168.1.2:8000", modelMyModelTarget, "test1"), + wantMetrics: map[string]string{ + "inference_objective_request_total": cleanMetric(metricReqTotal(modelMyModel, modelMyModelTarget)), + "inference_pool_ready_pods": cleanMetric(metricReadyPods(3)), }, - // Pod 1 will be picked because it has relatively low queue size, the requested model active, and low KV cache. 
- pods: newPodStates( - podState{index: 0, queueSize: 0, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.1, activeModels: []string{"foo", modelSQLLoraTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, - ), - wantErr: false, - wantResponses: integrationutils.NewImmediateErrorResponse( - envoyTypePb.StatusCode_BadRequest, - "inference gateway: BadRequest - Error unmarshaling request body", - ), }, { name: "select active lora, low queue", - requests: integrationutils.GenerateStreamedRequestSet(logger, "test2", modelSQLLora, modelSQLLoraTarget, nil), - // Pod 1 will be picked because it has relatively low queue size, the requested model active, and low KV cache. - pods: newPodStates( - podState{index: 0, queueSize: 0, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.1, activeModels: []string{"foo", modelSQLLoraTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, - ), - + requests: integration.ReqLLM(logger, "test2", modelSQLLora, modelSQLLoraTarget), + pods: []podState{ + P(0, 0, 0.2, "foo", "bar"), + P(1, 0, 0.1, "foo", modelSQLLoraTarget), // Winner (Has LoRA) + P(2, 10, 0.2, "foo", "bar"), + }, + wantResponses: ExpectRouteTo("192.168.1.2:8000", modelSQLLoraTarget, "test2"), wantMetrics: map[string]string{ - "inference_objective_request_total": inferenceObjectiveRequestTotal([]label{ - {"model_name", modelSQLLora}, - {"target_model_name", modelSQLLoraTarget}, - }), + "inference_objective_request_total": cleanMetric(metricReqTotal(modelSQLLora, modelSQLLoraTarget)), }, - wantErr: false, - wantResponses: integrationutils.NewRequestBufferedResponse( - "192.168.1.2:8000", - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test2","temperature":0}`, modelSQLLoraTarget), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "hi", - RawValue: []byte("mom"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: requtil.RequestIdHeaderKey, - RawValue: []byte("test-request-id"), - }, - }, - ), }, { - name: "select lora despite higher kv cache usage", - requests: integrationutils.GenerateStreamedRequestSet(logger, "test3", modelSQLLora, modelSQLLoraTarget, nil), - // Pod 2 will be picked despite NOT having the requested model active as it is above the affinity for queue size. - // Also it is critical, so we should still admit the request despite all queue sizes being greater than the queue - // size threshold. 
- pods: newPodStates( - podState{index: 0, queueSize: 10, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, - podState{index: 1, queueSize: 10, kvCacheUsage: 0.4, activeModels: []string{"foo", modelSQLLoraTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.3, activeModels: []string{"foo"}}, - ), + name: "select lora despite higher kv cache (affinity)", + requests: integration.ReqLLM(logger, "test3", modelSQLLora, modelSQLLoraTarget), + pods: []podState{ + P(0, 10, 0.2, "foo", "bar"), + P(1, 10, 0.4, "foo", modelSQLLoraTarget), // Winner (Affinity overrides KV) + P(2, 10, 0.3, "foo"), + }, + wantResponses: ExpectRouteTo("192.168.1.2:8000", modelSQLLoraTarget, "test3"), wantMetrics: map[string]string{ - "inference_objective_request_total": inferenceObjectiveRequestTotal([]label{ - {"model_name", modelSQLLora}, - {"target_model_name", modelSQLLoraTarget}, - }), + "inference_objective_request_total": cleanMetric(metricReqTotal(modelSQLLora, modelSQLLoraTarget)), }, - wantErr: false, - wantResponses: integrationutils.NewRequestBufferedResponse( - "192.168.1.2:8000", - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test3","temperature":0}`, modelSQLLoraTarget), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "hi", - RawValue: []byte("mom"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: requtil.RequestIdHeaderKey, - RawValue: []byte("test-request-id"), - }, - }, - ), }, { - name: "don't shed requests by default", - requests: integrationutils.GenerateStreamedRequestSet(logger, "test4", modelSQLLora, modelSQLLoraTarget, nil), - // pod 0: excluded; above queue size threshold - // pod 1: excluded; above KV cache threshold - // pod 2: excluded; above queue size threshold - pods: newPodStates( - podState{index: 0, queueSize: 6, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSQLLoraTarget}}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.85, activeModels: []string{"foo"}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo"}}, - ), + name: "do not shed requests by default", + requests: integration.ReqLLM(logger, "test4", modelSQLLora, modelSQLLoraTarget), + pods: []podState{ + P(0, 6, 0.2, "foo", "bar", modelSQLLoraTarget), // Winner (Lowest saturated) + P(1, 0, 0.85, "foo"), + P(2, 10, 0.9, "foo"), + }, + wantResponses: ExpectRouteTo("192.168.1.1:8000", modelSQLLoraTarget, "test4"), wantMetrics: map[string]string{ - "inference_objective_request_total": inferenceObjectiveRequestTotal([]label{ - {"model_name", modelSQLLora}, - {"target_model_name", modelSQLLoraTarget}, - }), + "inference_objective_request_total": cleanMetric(metricReqTotal(modelSQLLora, modelSQLLoraTarget)), }, - wantErr: false, - wantResponses: integrationutils.NewRequestBufferedResponse( - "192.168.1.1:8000", - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test4","temperature":0}`, modelSQLLoraTarget), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "hi", - RawValue: []byte("mom"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: requtil.RequestIdHeaderKey, - RawValue: []byte("test-request-id"), - }, - }, - ), }, + + // --- Error Handling & Edge Cases ---- { - name: "body sent over multiple requests, noncritical, but one server has capacity, do not shed", - requests: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_RequestHeaders{ - RequestHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: 
[]*configPb.HeaderValue{ - { - Key: "hi", - Value: "mom", - }, - { - Key: metadata.ObjectiveKey, - Value: modelSheddable, - }, - { - Key: metadata.ModelNameRewriteKey, - Value: modelSheddableTarget, - }, - { - Key: requtil.RequestIdHeaderKey, - Value: "test-request-id", - }, - }, - }, - }, - }, - }, { - Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{Body: []byte("{\"max_tokens\":100,\"model\":\"sql-lo"), EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{Body: []byte("ra-sheddable\",\"prompt\":\"test6\",\"temperature\":0}"), EndOfStream: true}, - }, - }, - }, - // Pod 1 will be picked because it has relatively low queue size and low KV cache. - pods: newPodStates( - podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}}, - podState{index: 1, queueSize: 4, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo", modelSheddableTarget}}, + name: "invalid json body", + requests: integration.ReqRaw( + map[string]string{"hi": "mom"}, + "no healthy upstream", ), - wantMetrics: map[string]string{ - "inference_objective_request_total": inferenceObjectiveRequestTotal([]label{ - {"model_name", modelSheddable}, - {"target_model_name", modelSheddableTarget}, - }), + pods: []podState{ + P(0, 0, 0.2, "foo", "bar"), }, - wantErr: false, - wantResponses: integrationutils.NewRequestBufferedResponse( - "192.168.1.1:8000", - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test6","temperature":0}`, modelSheddableTarget), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "hi", - RawValue: []byte("mom"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: requtil.RequestIdHeaderKey, - RawValue: []byte("test-request-id"), - }, - }, + wantResponses: ExpectReject( + envoyTypePb.StatusCode_BadRequest, + "inference gateway: BadRequest - Error unmarshaling request body", ), }, { - name: "inferenceobjective's modelName is not translated, passthrough", - requests: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_RequestHeaders{ - RequestHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: []*configPb.HeaderValue{ - { - Key: "hi", - Value: "mom", - }, - { - Key: metadata.ObjectiveKey, - Value: modelDirect, - }, - { - Key: metadata.ModelNameRewriteKey, - Value: modelDirect, - }, - { - Key: metadata.ModelNameRewriteKey, - Value: modelDirect, - }, - { - Key: requtil.RequestIdHeaderKey, - Value: "test-request-id", - }, - }, - }, - }, - }, - }, - { - Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{Body: []byte("{\"max_tokens\":100,\"model\":\"direct-"), EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{Body: []byte("model\",\"prompt\":\"test6\",\"temperature\":0}"), EndOfStream: true}, - }, - }, - }, - // pod 0: selected due to low queue size and kv cache usage - pods: newPodStates( - podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo", modelSheddableTarget}}, + name: "split body across chunks", + 
requests: integration.ReqRaw( + map[string]string{ + "hi": "mom", + metadata.ObjectiveKey: modelSheddable, + metadata.ModelNameRewriteKey: modelSheddableTarget, + requtil.RequestIdHeaderKey: "test-request-id", + }, + `{"max_tokens":100,"model":"sql-lo`, + `ra-sheddable","prompt":"test6","temperature":0}`, ), + pods: []podState{ + P(0, 4, 0.2, "foo", "bar", modelSheddableTarget), + P(1, 4, 0.85, "foo", modelSheddableTarget), + }, + wantResponses: ExpectRouteTo("192.168.1.1:8000", modelSheddableTarget, "test6"), wantMetrics: map[string]string{ - "inference_objective_request_total": inferenceObjectiveRequestTotal([]label{ - {"model_name", modelDirect}, - {"target_model_name", modelDirect}, - }), + "inference_objective_request_total": cleanMetric(metricReqTotal(modelSheddable, modelSheddableTarget)), }, - wantErr: false, - wantResponses: integrationutils.NewRequestBufferedResponse( - "192.168.1.1:8000", - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test6","temperature":0}`, modelDirect), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "hi", - RawValue: []byte("mom"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: requtil.RequestIdHeaderKey, - RawValue: []byte("test-request-id"), - }, - }, - ), }, - // Response flow tests { - name: "responsebody sent over multiple requests, content-type is json, buffer", - requests: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_ResponseHeaders{ - ResponseHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: []*configPb.HeaderValue{ - { - Key: "content-type", - Value: "application/json", - }, - }, - }, - }, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{Body: []byte("{\"max_tokens\":100,\"model\":\"sql-lo"), EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{Body: []byte("ra-sheddable\",\"prompt\":\"test6\",\"temperature\":0}"), EndOfStream: true}, - }, - }, - }, - // pod 0: selected - // pod 1: excluded; above KV cache threshold - // pod 2: excluded; above queue size threshold - pods: newPodStates( - podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo", modelSheddableTarget}}, - ), - wantErr: false, - wantResponses: integrationutils.NewResponseBufferedResponse( - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test6","temperature":0}`, modelSheddable), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "x-went-into-resp-headers", - RawValue: []byte("true"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "content-type", - RawValue: []uint8("application/json"), - }, - }, - ), + name: "no backend pods available", + requests: integration.ReqHeaderOnly(map[string]string{"content-type": "application/json"}), + pods: nil, + wantResponses: ExpectReject(envoyTypePb.StatusCode_InternalServerError, + "inference gateway: Internal - no pods available in datastore"), }, { - name: "Response is invalid json; return body", - requests: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_ResponseHeaders{ - ResponseHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: 
[]*configPb.HeaderValue{ - { - Key: "content-type", - Value: "application/json", - }, - }, - }, - }, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{Body: []byte("no healthy upstream"), EndOfStream: true}, - }, - }, - }, - // pod 0: selected - // pod 1: excluded; above KV cache threshold - // pod 2: excluded; above queue size threshold - pods: newPodStates( - podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo", modelSheddableTarget}}, - ), - wantErr: false, - wantResponses: integrationutils.NewResponseBufferedResponse( - "no healthy upstream", - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "x-went-into-resp-headers", - RawValue: []byte("true"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "content-type", - RawValue: []uint8("application/json"), - }, - }, + name: "request missing model field", + requests: integration.ReqRaw( + map[string]string{"content-type": "application/json"}, + `{"hello":"world"}`, ), + wantResponses: ExpectReject(envoyTypePb.StatusCode_BadRequest, + "inference gateway: BadRequest - model not found in request body"), }, + + // --- Subsetting & Metadata --- { - name: "responsebody sent over a single request, but empty body with EndOfStream in the second request(this is how envoy operates); content-type is json, buffer", - requests: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_ResponseHeaders{ - ResponseHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: []*configPb.HeaderValue{ - { - Key: "content-type", - Value: "application/json", - }, - }, - }, - }, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{Body: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-sheddable\",\"prompt\":\"test6\",\"temperature\":0}"), EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{Body: []byte(""), EndOfStream: true}, - }, - }, + name: "subsetting: select best from subset", + // Only pods in the subset list are eligible. 
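+			// WithPods assigns addresses of the form 192.168.1.(index+1):8000, so
+			// this subset covers pods 0-2 while the scorers still prefer pod 1.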
+ requests: ReqSubset("test2", modelSQLLora, modelSQLLoraTarget, + "192.168.1.1:8000", "192.168.1.2:8000", "192.168.1.3:8000"), + pods: []podState{ + P(0, 0, 0.2, "foo"), + P(1, 0, 0.1, "foo", modelSQLLoraTarget), // Winner (Low Queue + Matches Subset) + P(2, 10, 0.2, "foo"), }, - // pod 0: selected - // pod 1: excluded; above KV cache threshold - // pod 2: excluded; above queue size threshold - pods: newPodStates( - podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo", modelSheddableTarget}}, - ), - wantErr: false, - wantResponses: integrationutils.NewResponseBufferedResponse( - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test6","temperature":0}`, modelSheddable), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "x-went-into-resp-headers", - RawValue: []byte("true"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "content-type", - RawValue: []uint8("application/json"), - }, - }, - ), + wantResponses: ExpectRouteTo("192.168.1.2:8000", modelSQLLoraTarget, "test2"), }, { - name: "responsebody sent over a single request, but empty body with EndOfStream in the second request(this is how envoy operates); content-type is json, buffer", - requests: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_ResponseHeaders{ - ResponseHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: []*configPb.HeaderValue{ - { - Key: "content-type", - RawValue: []byte("text/event-stream"), - }, - { - Key: "status", - RawValue: []byte("200"), - }, - }, - }, - }, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{ - Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"NEVER","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{ - Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"GONNA","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{ - Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"GIVE","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{ - Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"YOU","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{ - Body: []byte(`data: 
{"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"UP","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`), - EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{ - Body: []byte("data: {\"id\":\"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9\",\"object\":\"text_completion\",\"created\":1741379018,\"model\":\"food-review-1\",\"choices\":[],\"usage\":{\"prompt_tokens\":7,\"total_tokens\":17,\"completion_tokens\":10}}\ndata: [DONE]"), - EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{ - Body: []byte(""), - EndOfStream: true}, - }, - }, - }, - wantMetrics: map[string]string{`inference_objective_input_tokens`: ` - # HELP inference_objective_input_tokens [ALPHA] Inference objective input token count distribution for requests in each model. - # TYPE inference_objective_input_tokens histogram - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="1"} 0 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="8"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="16"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="32"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="64"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="128"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="256"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="512"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="1024"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="2048"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="4096"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="8192"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="16384"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="32778"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="65536"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="131072"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="262144"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="524288"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="1.048576e+06"} 1 - inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="+Inf"} 1 - inference_objective_input_tokens_sum{model_name="",target_model_name=""} 7 - inference_objective_input_tokens_count{model_name="",target_model_name=""} 1 - `, - `inference_objective_normalized_time_per_output_token_seconds`: ` - # HELP inference_objective_normalized_time_per_output_token_seconds [ALPHA] Inference objective latency divided by number of output tokens in seconds for each model and target model. 
- # TYPE inference_objective_normalized_time_per_output_token_seconds histogram - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="0.001"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="0.002"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="0.005"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="0.01"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="0.02"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="0.05"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="0.1"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="0.2"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="0.5"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="1"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="2"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="5"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="10"} 0 - inference_objective_normalized_time_per_output_token_seconds_bucket{model_name="",target_model_name="",le="+Inf"} 1 - inference_objective_normalized_time_per_output_token_seconds_sum{model_name="",target_model_name=""} 9.223372036854776e+08 - inference_objective_normalized_time_per_output_token_seconds_count{model_name="",target_model_name=""} 1 - `}, - wantResponses: []*extProcPb.ProcessingResponse{ - integrationutils.NewResponseHeaders( - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "x-went-into-resp-headers", - RawValue: []byte("true"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "content-type", - RawValue: []byte("text/event-stream"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "status", - RawValue: []byte("200"), - }, - }, - ), - integrationutils.NewResponseStreamChunk(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"NEVER","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`, false), - integrationutils.NewResponseStreamChunk(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"GONNA","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`, false), - integrationutils.NewResponseStreamChunk(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"GIVE","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`, false), - integrationutils.NewResponseStreamChunk(`data: 
{"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"YOU","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`, false), - integrationutils.NewResponseStreamChunk(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"food-review-1","choices":[{"index":0,"text":"UP","logprobs":null,"finish_reason":null,"stop_reason":null}],"usage":null}`, false), - integrationutils.NewResponseStreamChunk("data: {\"id\":\"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9\",\"object\":\"text_completion\",\"created\":1741379018,\"model\":\"food-review-1\",\"choices\":[],\"usage\":{\"prompt_tokens\":7,\"total_tokens\":17,\"completion_tokens\":10}}\ndata: [DONE]", false), - integrationutils.NewResponseStreamChunk("", true), + name: "subsetting: partial match", + requests: ReqSubset("test2", modelSQLLora, modelSQLLoraTarget, "192.168.1.3:8000"), + pods: []podState{ + P(0, 0, 0.2, "foo"), + P(1, 0, 0.1, "foo", modelSQLLoraTarget), + P(2, 10, 0.2, "foo"), // Winner (Matches Subset, despite load) }, + wantResponses: ExpectRouteTo("192.168.1.3:8000", modelSQLLoraTarget, "test2"), }, - // Bodyless Request test { - name: "simple GET Request", - requests: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_RequestHeaders{ - RequestHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: []*configPb.HeaderValue{ - { - Key: "content-type", - RawValue: []byte("text/event-stream"), - }, - { - Key: "status", - RawValue: []byte("200"), - }, - }, - }, - EndOfStream: true, - }, - }, - }, + name: "subsetting: no pods match", + requests: ReqSubset("test2", modelSQLLora, modelSQLLoraTarget, "192.168.1.99:8000"), + pods: []podState{ + P(0, 0, 0.2, "foo"), + P(1, 0, 0.1, "foo", modelSQLLoraTarget), }, - wantResponses: nil, - pods: newPodStates( - podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}}, - ), - wantMetrics: map[string]string{}, + wantResponses: ExpectReject(envoyTypePb.StatusCode_ServiceUnavailable, + "inference gateway: ServiceUnavailable - failed to find candidate pods for serving the request"), }, + + // --- Request Modification (Passthrough & Rewrite) --- { - name: "select active lora with subsetting tag, all pods available", - requests: integrationutils.GenerateStreamedRequestSet( - logger, - "test2", - modelSQLLora, - modelSQLLoraTarget, - []string{"192.168.1.1:8000", "192.168.1.2:8000", "192.168.1.3:8000"}), - // Pod 1 will be picked because it has relatively low queue size, the requested model active, low KV cache, and within subset. 
- pods: newPodStates( - podState{index: 0, queueSize: 0, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.1, activeModels: []string{"foo", modelSQLLoraTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, + name: "passthrough: model not in objectives", + requests: integration.ReqRaw( + map[string]string{ + "hi": "mom", + metadata.ObjectiveKey: modelDirect, + metadata.ModelNameRewriteKey: modelDirect, + requtil.RequestIdHeaderKey: "test-request-id", + }, + `{"max_tokens":100,"model":"direct-`, + `model","prompt":"test6","temperature":0}`, ), - + pods: []podState{ + P(0, 4, 0.2, "foo", "bar", modelSheddableTarget), + }, + wantResponses: ExpectRouteTo("192.168.1.1:8000", modelDirect, "test6"), wantMetrics: map[string]string{ - "inference_objective_request_total": inferenceObjectiveRequestTotal([]label{ - {"model_name", modelSQLLora}, - {"target_model_name", modelSQLLoraTarget}, - }), + "inference_objective_request_total": cleanMetric(metricReqTotal(modelDirect, modelDirect)), }, - wantErr: false, - wantResponses: integrationutils.NewRequestBufferedResponse( - "192.168.1.2:8000", - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test2","temperature":0}`, modelSQLLoraTarget), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "hi", - RawValue: []byte("mom"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: requtil.RequestIdHeaderKey, - RawValue: []byte("test-request-id"), - }, - }, - ), }, { - name: "select active lora with subsetting tag, some pods match", - requests: integrationutils.GenerateStreamedRequestSet( - logger, - "test2", - modelSQLLora, - modelSQLLoraTarget, - []string{"192.168.1.3:8000"}), - // Pod 3 has high queue and kv cache utilization, but it will still be picked because it is the only one matching subsetting target. 
- pods: newPodStates( - podState{index: 0, queueSize: 0, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.1, activeModels: []string{"foo", modelSQLLoraTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, - ), - + name: "rewrite request model", + requests: integration.ReqLLM(logger, "test-rewrite", modelToBeWritten, modelToBeWritten), + pods: []podState{ + P(0, 0, 0.1, "foo", modelAfterRewrite), + }, + wantResponses: ExpectRouteTo("192.168.1.1:8000", modelAfterRewrite, "test-rewrite"), wantMetrics: map[string]string{ - "inference_objective_request_total": inferenceObjectiveRequestTotal([]label{ - {"model_name", modelSQLLora}, - {"target_model_name", modelSQLLoraTarget}, - }), + "inference_objective_request_total": cleanMetric(metricReqTotal(modelToBeWritten, modelAfterRewrite)), }, - wantErr: false, - wantResponses: integrationutils.NewRequestBufferedResponse( - "192.168.1.3:8000", - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test2","temperature":0}`, modelSQLLoraTarget), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "hi", - RawValue: []byte("mom"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: requtil.RequestIdHeaderKey, - RawValue: []byte("test-request-id"), - }, - }, - ), }, { - name: "select active lora with subsetting tag, no pods available", - requests: integrationutils.GenerateStreamedRequestSet( - logger, - "test2", - modelSQLLora, - modelSQLLoraTarget, - []string{"192.168.1.4:8000", "192.168.1.5:8000", "192.168.1.6:8000"}), - // No pods will be picked as none are within the subset. - pods: newPodStates( - podState{index: 0, queueSize: 0, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, - podState{index: 1, queueSize: 0, kvCacheUsage: 0.1, activeModels: []string{"foo", modelSQLLoraTarget}}, - podState{index: 2, queueSize: 10, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}}, - ), + name: "protocol: simple GET (header only)", + requests: integration.ReqHeaderOnly(map[string]string{ + "content-type": "text/event-stream", + "status": "200", + }), + pods: []podState{P(0, 0, 0, "foo")}, + wantResponses: nil, + }, - wantMetrics: map[string]string{}, - wantErr: true, - wantResponses: []*extProcPb.ProcessingResponse{ - { - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_ServiceUnavailable, - }, - Body: []byte("inference gateway: ServiceUnavailable - failed to find candidate pods for serving the request"), - }, - }, - }, - }, + // --- Response Processing (Buffering & Streaming) --- + { + name: "response buffering: multi-chunk JSON", + requests: ReqResponseOnly( + map[string]string{"content-type": "application/json"}, + `{"max_tokens":100,"model":"sql-lo`, + `ra-sheddable","prompt":"test6","temperature":0}`, + ), + pods: []podState{P(0, 4, 0.2, modelSheddableTarget)}, + wantResponses: ExpectBufferResp( + fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test6","temperature":0}`, modelSheddable), + "application/json"), }, { - name: "no backend pods are available", - requests: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_RequestHeaders{ - RequestHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: []*configPb.HeaderValue{ - { - Key: "content-type", - RawValue: []byte("text/event-stream"), - }, - { - Key: "status", - 
RawValue: []byte("200"), - }, - }, - }, - EndOfStream: true, - }, - }, - }, - }, - pods: nil, - wantMetrics: map[string]string{}, - wantErr: true, - wantResponses: []*extProcPb.ProcessingResponse{ - { - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_InternalServerError, - }, - Body: []byte("inference gateway: Internal - no pods available in datastore"), - }, - }, - }, - }, + name: "response buffering: invalid JSON", + requests: ReqResponseOnly( + map[string]string{"content-type": "application/json"}, + "no healthy upstream", + ), + pods: []podState{P(0, 4, 0.2, modelSheddableTarget)}, + wantResponses: ExpectBufferResp("no healthy upstream", "application/json"), }, { - name: "request don't contains invalid payload, model not exist", - requests: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{ - Body: []byte(`{"hello":"world"}`), - EndOfStream: true}, - }, - }, - }, - wantErr: true, - wantMetrics: map[string]string{}, - wantResponses: []*extProcPb.ProcessingResponse{ - { - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_BadRequest, - }, - Body: []byte("inference gateway: BadRequest - model not found in request body"), - }, - }, - }, - }, + name: "response buffering: empty EOS chunk (JSON)", + requests: ReqResponseOnly( + map[string]string{"content-type": "application/json"}, + `{"max_tokens":100,"model":"sql-lora-sheddable","prompt":"test6","temperature":0}`, + "", + ), + pods: []podState{P(0, 4, 0.2, modelSheddableTarget)}, + wantResponses: ExpectBufferResp( + fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test6","temperature":0}`, modelSheddable), + "application/json"), }, { - name: "rewrite request model", - requests: integrationutils.GenerateStreamedRequestSet(logger, "test-rewrite", modelToBeWritten, modelToBeWritten, nil), - // Pod 0 will be picked. - // Expected flow: - // 1. Request asks for "model-to-be-rewritten" - // 2. Rewrite rule transforms "model-to-be-rewritten" -> "rewritten-model" - // 3. EPP sends request to backend with model "rewritten-model" - pods: newPodStates( - podState{index: 0, queueSize: 0, kvCacheUsage: 0.1, activeModels: []string{"foo", "rewritten-model"}}, + name: "response streaming: SSE token counting", + requests: ReqResponseOnly( + map[string]string{"content-type": "text/event-stream", "status": "200"}, + // Chunk 1: Simulate a standard data chunk. + `data: {}`, + // Chunk 2: Usage data + DONE signal. + `data: {"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}`+"\n"+`data: [DONE]`, + "", // EndOfStream + ), + pods: []podState{P(0, 4, 0.2, modelSheddableTarget)}, + waitForModel: modelSheddable, + wantResponses: ExpectStreamResp( + `data: {}`, + `data: {"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}`+"\n"+`data: [DONE]`, + "", ), + // Labels are empty because we skipped the Request phase. wantMetrics: map[string]string{ - "inference_objective_request_total": inferenceObjectiveRequestTotal([]label{ - {"model_name", modelToBeWritten}, - {"target_model_name", modelAfterRewrite}, - }), + "inference_objective_input_tokens": cleanMetric(` + # HELP inference_objective_input_tokens [ALPHA] Inference objective input token count distribution for requests in each model. 
+ # TYPE inference_objective_input_tokens histogram + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="1"} 0 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="8"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="16"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="32"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="64"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="128"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="256"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="512"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="1024"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="2048"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="4096"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="8192"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="16384"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="32778"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="65536"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="131072"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="262144"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="524288"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="1.048576e+06"} 1 + inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="+Inf"} 1 + inference_objective_input_tokens_sum{model_name="",target_model_name=""} 7 + inference_objective_input_tokens_count{model_name="",target_model_name=""} 1 + `), }, - wantErr: false, - wantResponses: integrationutils.NewRequestBufferedResponse( - "192.168.1.1:8000", - // Note: The prompt remains "test-rewrite", but the model in the JSON body is updated to the *rewritten target* model. - fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test-rewrite","temperature":0}`, modelAfterRewrite), - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "hi", - RawValue: []byte("mom"), - }, - }, - &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: requtil.RequestIdHeaderKey, - RawValue: []byte("test-request-id"), - }, - }, - ), }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - client, cleanup := setUpHermeticServer(t, test.pods) - t.Cleanup(cleanup) - responses, err := integrationutils.StreamedRequest(t, client, test.requests, len(test.wantResponses)) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Resolve Model to Sync. + modelToSync := tc.waitForModel + if modelToSync == "" { + modelToSync = modelMyModel + } + + h := NewTestHarness(t, context.Background()). + WithBaseResources(). + WithPods(tc.pods). + WaitForSync(len(tc.pods), modelToSync) - if err != nil && !test.wantErr { - t.Errorf("In test %s, unexpected error, got: %v, want error: %v", test.name, err, test.wantErr) + // Wait for metrics to settle to avoid race conditions where Datastore has pods but Scheduler/Metrics collector + // hasn't processed them yet (causing random scheduling or missing metrics). 
+ if len(tc.pods) > 0 { + h.WaitForReadyPodsMetric(len(tc.pods)) } - if diff := cmp.Diff(test.wantResponses, responses, + + responses, err := integration.StreamedRequest(t, h.Client, tc.requests, len(tc.wantResponses)) + + require.NoError(t, err) + + if diff := cmp.Diff(tc.wantResponses, responses, protocmp.Transform(), protocmp.SortRepeated(func(a, b *configPb.HeaderValueOption) bool { return a.GetHeader().GetKey() < b.GetHeader().GetKey() }), ); diff != "" { - t.Errorf("In test %s, unexpected response, (-want +got): %v", test.name, diff) + t.Errorf("Response mismatch (-want +got): %v", diff) } - if len(test.wantMetrics) != 0 { - for metricName, value := range test.wantMetrics { - if err := metricsutils.GatherAndCompare(crmetrics.Registry, strings.NewReader(value), metricName); err != nil { - t.Error(fmt.Errorf("In test %s, %v", test.name, err)) - } - } + if len(tc.wantMetrics) > 0 { + h.ExpectMetrics(tc.wantMetrics) } - metrics.Reset() }) } } -func setUpHermeticServer(t *testing.T, podAndMetrics map[*backend.Pod]*backendmetrics.MetricsState) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { - // Reconfigure the TestPodMetricsClient. - res := map[types.NamespacedName]*backendmetrics.MetricsState{} - for pod, metrics := range podAndMetrics { - res[pod.NamespacedName] = metrics - } - serverRunner.TestPodMetricsClient.SetRes(res) - - serverCtx, stopServer := context.WithCancel(context.Background()) - - // TODO: this should be consistent with the inference pool - podLabels := map[string]string{ - "app": testPoolName, - } - - for pod := range podAndMetrics { - pod := epptestutil.MakePod(pod.PodName). - Namespace(pod.NamespacedName.Namespace). - ReadyCondition(). - Labels(podLabels). - IP(pod.GetIPAddress()). - Complete(). - ObjRef() - - copy := pod.DeepCopy() - if err := k8sClient.Create(context.Background(), copy); err != nil { - logutil.Fatal(logger, err, "Failed to create pod", "pod", pod) - } - - // since no pod controllers deployed in fake environment, we manually update pod status - copy.Status = pod.Status - if err := k8sClient.Status().Update(context.Background(), copy); err != nil { - logutil.Fatal(logger, err, "Failed to update pod status", "pod", pod) - } - } - go func() { - if err := serverRunner.AsRunnable(logger.WithName("ext-proc")).Start(serverCtx); err != nil { - logutil.Fatal(logger, err, "Failed to start ext-proc server") - } - }() - - time.Sleep(serverRunner.RefreshPrometheusMetricsInterval) // wait for metrics to get available before running tests that rely on these metrics - - // check if all pods are synced to datastore - assert.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Len(t, serverRunner.Datastore.PodList(datastore.AllPodsPredicate), len(podAndMetrics), "Datastore not synced") - }, 10*time.Second, time.Second) - - // Create a grpc connection - conn, err := grpc.NewClient(testGRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - logutil.Fatal(logger, err, "Failed to connect", "address", testGRPCAddress) - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx) - if err != nil { - logutil.Fatal(logger, err, "Failed to create client") - } - return client, func() { - cancel() - conn.Close() - stopServer() - - // clear created pods - for pod := range podAndMetrics { - pod := epptestutil.MakePod(pod.PodName). 
- Namespace(pod.NamespacedName.Namespace).Complete().ObjRef() - - if err := k8sClient.Delete(context.Background(), pod); err != nil { - logutil.Fatal(logger, err, "Failed to delete pod", "pod", fakePod) - } - } - } -} - -func fakePod(index int) *backend.Pod { - return &backend.Pod{ - NamespacedName: types.NamespacedName{Name: fmt.Sprintf("pod-%v-rank-0", index), Namespace: testNamespace}, - Address: fmt.Sprintf("192.168.1.%d", index+1), - PodName: fmt.Sprintf("pod-%v", index), - Labels: make(map[string]string, 0), - } -} - -// podState is a descriptor for a pod's simulated metrics. -type podState struct { - index int - queueSize int - kvCacheUsage float64 - activeModels []string -} - -// newPodStates generates the backend metrics map required by the test setup. -func newPodStates(states ...podState) map[*backend.Pod]*backendmetrics.MetricsState { - res := make(map[*backend.Pod]*backendmetrics.MetricsState) - for _, s := range states { - pod := fakePod(s.index) - activeModelsMap := make(map[string]int) - for _, model := range s.activeModels { - activeModelsMap[model] = 1 - } - res[pod] = &backendmetrics.MetricsState{ - WaitingQueueSize: s.queueSize, - KVCacheUsagePercent: s.kvCacheUsage, - ActiveModels: activeModelsMap, - WaitingModels: make(map[string]int), - } - } - return res -} - -// Sets up a test environment and returns the runner struct -func BeforeSuite() func() { - // Set up mock k8s API Client - testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, - ErrorIfCRDPathMissing: true, - } - cfg, err := testEnv.Start() +// loadBaseResources parses the YAML manifest once at startup. +func loadBaseResources() []*unstructured.Unstructured { + path := filepath.Join("..", "..", "testdata", "inferencepool-with-model-hermetic.yaml") + data, err := os.ReadFile(path) if err != nil { - logutil.Fatal(logger, err, "Failed to start test environment", "config", cfg) + panic(fmt.Sprintf("failed to read manifest %s: %v", path, err)) } - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(v1alpha2.Install(scheme)) - utilruntime.Must(v1.Install(scheme)) - - k8sClient, err = k8sclient.New(cfg, k8sclient.Options{Scheme: scheme}) - if err != nil { - logutil.Fatal(logger, err, "Failed to start k8s Client") - } else if k8sClient == nil { - logutil.Fatal(logger, nil, "No error, but returned kubernetes client is nil", "config", cfg) - } - - // Init runtime. - ctrl.SetLogger(logger) - - metrics.Register() - // Register metrics handler. - // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. 
- // More info: - // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server - // - https://book.kubebuilder.io/reference/metrics.html - metricsServerOptions := metricsserver.Options{ - BindAddress: fmt.Sprintf(":%d", testMetricsPort), - FilterProvider: filters.WithAuthenticationAndAuthorization, - } - mgr, err := server.NewManagerWithOptions(cfg, managerTestOptions(testNamespace, testPoolName, metricsServerOptions)) - if err != nil { - logutil.Fatal(logger, err, "Failed to create controller manager") - } - - serverRunner = server.NewDefaultExtProcServerRunner() - serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} - pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond) - // Adjust from defaults - serverRunner.GKNN = common.GKNN{ - NamespacedName: types.NamespacedName{Namespace: testNamespace, Name: testPoolName}, - GroupKind: schema.GroupKind{Group: v1.GroupVersion.Group, Kind: "InferencePool"}, - } - - serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf, 0) - - kvCacheUtilizationScorer := scorer.NewKVCacheUtilizationScorer() - queueingScorer := scorer.NewQueueScorer() - prefixCacheScorer := prefix.New(context.Background(), prefix.DefaultConfig) - loraAffinityScorer := scorer.NewLoraAffinityScorer() - - defaultProfile := framework.NewSchedulerProfile(). - WithScorers(framework.NewWeightedScorer(kvCacheUtilizationScorer, 1), - framework.NewWeightedScorer(queueingScorer, 1), - framework.NewWeightedScorer(prefixCacheScorer, 1), - framework.NewWeightedScorer(loraAffinityScorer, 1), - ). - WithPicker(picker.NewMaxScorePicker(picker.DefaultMaxNumOfEndpoints)) - - profileHandler := profile.NewSingleProfileHandler() - - schedulerConfig := scheduling.NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}) - scheduler := scheduling.NewSchedulerWithConfig(schedulerConfig) - - sdConfig := &utilizationdetector.Config{ - QueueDepthThreshold: utilizationdetector.DefaultQueueDepthThreshold, - KVCacheUtilThreshold: utilizationdetector.DefaultKVCacheUtilThreshold, - MetricsStalenessThreshold: utilizationdetector.DefaultMetricsStalenessThreshold, - } - detector := utilizationdetector.NewDetector(sdConfig, logger.WithName("saturation-detector")) - serverRunner.SaturationDetector = detector - locator := requestcontrol.NewDatastorePodLocator(serverRunner.Datastore) - admissionController := requestcontrol.NewLegacyAdmissionController(detector, locator) - serverRunner.Director = requestcontrol.NewDirectorWithConfig( - serverRunner.Datastore, - scheduler, - admissionController, - locator, - requestcontrol.NewConfig(), - ) - serverRunner.SecureServing = false - - if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil { - logutil.Fatal(logger, err, "Failed to setup server runner") - } - - // Start the controller manager in a go routine, not blocking - go func() { - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - logutil.Fatal(logger, err, "Failed to start manager") - } - }() - - logger.Info("Setting up hermetic ExtProc server") - - // Unmarshal CRDs from file into structs - manifestsPath := filepath.Join("..", "..", "testdata", "inferencepool-with-model-hermetic.yaml") - docs, err := readDocuments(manifestsPath) - if err != nil { - logutil.Fatal(logger, err, "Can't read object manifests", "path", manifestsPath) - } - - for _, doc := range docs { - obj := &unstructured.Unstructured{} - if err = yaml.Unmarshal(doc, obj); 
err != nil { - logutil.Fatal(logger, err, "Can't unmarshal object", "document", doc) - } - logger.Info("Creating object", "kind", obj.GetKind(), "object", obj) - if err := k8sClient.Create(context.Background(), obj); err != nil { - logutil.Fatal(logger, err, "Unable to create object", "object", obj.GetName()) - } - } - - assert.Eventually(nil, func() bool { - modelExist := serverRunner.Datastore.ObjectiveGet(modelMyModel) - synced := serverRunner.Datastore.PoolHasSynced() && modelExist != nil - return synced - }, 10*time.Second, 10*time.Millisecond) - - return func() { - _ = testEnv.Stop() - _ = k8sClient.DeleteAllOf(context.Background(), &v1.InferencePool{}) - _ = k8sClient.DeleteAllOf(context.Background(), &v1alpha2.InferenceObjective{}) - _ = k8sClient.DeleteAllOf(context.Background(), &v1alpha2.InferenceModelRewrite{}) - } -} - -// readDocuments reads documents from file. -func readDocuments(fp string) ([][]byte, error) { - b, err := os.ReadFile(fp) - if err != nil { - return nil, err - } - - docs := [][]byte{} - reader := k8syaml.NewYAMLReader(bufio.NewReader(bytes.NewReader(b))) + var objs []*unstructured.Unstructured + decoder := yaml.NewYAMLOrJSONDecoder(strings.NewReader(string(data)), 4096) for { - // Read document - doc, err := reader.Read() - if err != nil { - if errors.Is(err, io.EOF) { + u := &unstructured.Unstructured{} + if err := decoder.Decode(u); err != nil { + if err.Error() == "EOF" { break } - return nil, err + panic(fmt.Sprintf("failed to decode YAML: %v", err)) } - docs = append(docs, doc) + objs = append(objs, u) } - return docs, nil + return objs } -// inject options that allow multiple test runs to run -// https://github.com/kubernetes-sigs/controller-runtime/issues/2937 -func managerTestOptions(namespace, name string, metricsServerOptions metricsserver.Options) ctrl.Options { - return ctrl.Options{ - Scheme: scheme, - Cache: cache.Options{ - ByObject: map[k8sclient.Object]cache.ByObject{ - &corev1.Pod{}: { - Namespaces: map[string]cache.Config{ - namespace: {}, - }, - }, - &v1.InferencePool{}: { - Namespaces: map[string]cache.Config{ - namespace: { - FieldSelector: fields.SelectorFromSet(fields.Set{ - "metadata.name": name, - }), - }, - }, - }, - &v1alpha2.InferenceObjective{}: { - Namespaces: map[string]cache.Config{ - namespace: {}, - }, - }, - &v1alpha2.InferenceModelRewrite{}: { - Namespaces: map[string]cache.Config{ - namespace: {}, - }, - }, - }, - }, - Controller: crconfig.Controller{ - SkipNameValidation: boolPointer(true), - }, - Metrics: metricsServerOptions, +// cleanMetric removes indentation from multiline metric strings and ensures a trailing newline exists, which is +// required by the Prometheus text parser. +func cleanMetric(s string) string { + lines := strings.Split(s, "\n") + var cleaned []string + for _, l := range lines { + trimmed := strings.TrimSpace(l) + if trimmed != "" { + cleaned = append(cleaned, trimmed) + } } -} - -func boolPointer(b bool) *bool { - return &b + return strings.Join(cleaned, "\n") + "\n" } diff --git a/test/integration/epp/util_test.go b/test/integration/epp/util_test.go index 4baee930d..e22ade483 100644 --- a/test/integration/epp/util_test.go +++ b/test/integration/epp/util_test.go @@ -1,6 +1,17 @@ /* Copyright 2025 The Kubernetes Authors. -Licensed under the Apache License, Version 2.0. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 */

 package epp

From 6e1742eff738214bcae637747296710189ea7250 Mon Sep 17 00:00:00 2001
From: Luke Van Drie
Date: Sun, 21 Dec 2025 22:23:35 +0000
Subject: [PATCH 4/4] test: resolve merge conflicts from #1976

Update the EPP harness for the saturation detector relocation from
#1976: the detector is now consumed via the
`saturationdetector/framework/plugins/utilizationdetector` package.

---
 test/integration/epp/harness_test.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/test/integration/epp/harness_test.go b/test/integration/epp/harness_test.go
index f91377a32..9888ed35a 100644
--- a/test/integration/epp/harness_test.go
+++ b/test/integration/epp/harness_test.go
@@ -45,7 +45,7 @@ import (
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
-	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector/framework/plugins/utilizationdetector"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
@@ -144,12 +144,12 @@ func NewTestHarness(t *testing.T, ctx context.Context) *TestHarness {
 	profileHandler := profile.NewSingleProfileHandler()
 	schedulerConfig := scheduling.NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile})

-	sdConfig := &saturationdetector.Config{
-		QueueDepthThreshold:       saturationdetector.DefaultQueueDepthThreshold,
-		KVCacheUtilThreshold:      saturationdetector.DefaultKVCacheUtilThreshold,
-		MetricsStalenessThreshold: saturationdetector.DefaultMetricsStalenessThreshold,
+	sdConfig := &utilizationdetector.Config{
+		QueueDepthThreshold:       utilizationdetector.DefaultQueueDepthThreshold,
+		KVCacheUtilThreshold:      utilizationdetector.DefaultKVCacheUtilThreshold,
+		MetricsStalenessThreshold: utilizationdetector.DefaultMetricsStalenessThreshold,
 	}
-	runner.SaturationDetector = saturationdetector.NewDetector(sdConfig, logger.WithName("sd"))
+	runner.SaturationDetector = utilizationdetector.NewDetector(sdConfig, logger.WithName("sd"))
 	locator := requestcontrol.NewDatastorePodLocator(runner.Datastore)
 	runner.Director = requestcontrol.NewDirectorWithConfig(
 		runner.Datastore,
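
The table-driven cases in this series lean on a compact `P` helper whose
definition lives in the new EPP harness, outside the hunks above. For
orientation, a minimal sketch of what those cases assume, mirroring the
podState fields that the removed newPodStates helper consumed; the actual
harness definition may differ:

	// podState describes a pod's simulated backend metrics, matching the
	// fields the removed newPodStates helper consumed.
	type podState struct {
		index        int
		queueSize    int
		kvCacheUsage float64
		activeModels []string
	}

	// P is a compact podState constructor. For example,
	// P(1, 0, 0.1, "foo", modelSQLLoraTarget) describes pod index 1 with an
	// empty queue, 10% KV-cache usage, and two active models.
	func P(index, queueSize int, kvCacheUsage float64, activeModels ...string) podState {
		return podState{
			index:        index,
			queueSize:    queueSize,
			kvCacheUsage: kvCacheUsage,
			activeModels: activeModels,
		}
	}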