/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package epp

import (
	"context"
	"fmt"
	"strings"
	"testing"
	"time"

	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
	"github.com/google/uuid"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	metricsutils "k8s.io/component-base/metrics/testutil"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	crconfig "sigs.k8s.io/controller-runtime/pkg/config"
	crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
	epptestutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
	"sigs.k8s.io/gateway-api-inference-extension/test/integration"
)

// TestHarness encapsulates the environment for a single isolated EPP test run.
// It manages the lifecycle of the controller manager, the EPP server, and the K8s namespace.
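//
// A typical (illustrative) call chain, assuming a podState value type whose
// fields match how WithPods reads them (index, queueSize, kvCacheUsage,
// activeModels) and a placeholder model name:
//
//	h := NewTestHarness(t, ctx).
//		WithBaseResources().
//		WithPods([]podState{{index: 0, queueSize: 0, kvCacheUsage: 0.2, activeModels: []string{"my-model"}}}).
//		WaitForSync(1, "")
//	h.WaitForReadyPodsMetric(1)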
type TestHarness struct {
	t         *testing.T
	ctx       context.Context
	Namespace string

	Mgr          ctrl.Manager
	ServerRunner *server.ExtProcServerRunner
	Client       extProcPb.ExternalProcessor_ProcessClient
	Datastore    datastore.Datastore

	// Internal handles for cleanup.
	grpcConn *grpc.ClientConn
}

// NewTestHarness boots up a fully isolated test environment.
// It creates a unique Namespace, scopes the Manager to that Namespace, and starts the components.
// Note: EPP tests must run serially because they rely on the global Prometheus registry.
func NewTestHarness(t *testing.T, ctx context.Context) *TestHarness {
	t.Helper()

	// 1. Identity & Namespace Isolation
	// We use a unique UUID to ensure that resources from this test do not collide with others.
	uid := uuid.New().String()[:8]
	nsName := "epp-test-" + uid
	ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}
	require.NoError(t, k8sClient.Create(ctx, ns), "failed to create test namespace")

	// 2. Free Port Allocation
	grpcPort, err := integration.GetFreePort()
	require.NoError(t, err, "failed to acquire free port")
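	// Assumption: GetFreePort presumably probes by binding and releasing an
	// ephemeral listener, which leaves a small window before the ext-proc
	// server rebinds the port; a rare collision is tolerable in tests.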

	// 3. Manager Scoped to Namespace
	// Critical: We restrict the Manager's cache to the test namespace to avoid processing objects from other tests or
	// previous runs.
	skipValidation := true
	mgrOpts := ctrl.Options{
		Scheme: testScheme,
		Cache: cache.Options{
			DefaultNamespaces: map[string]cache.Config{
				nsName: {}, // Implicitly filters all watches to this NS.
			},
		},
		Controller: crconfig.Controller{
			SkipNameValidation: &skipValidation,
		},
		Metrics: metricsserver.Options{
			BindAddress: "0", // "0" disables the metrics endpoint, avoiding port conflicts between tests.
		},
		HealthProbeBindAddress: "0",
		LeaderElection:         false,
	}
	mgr, err := ctrl.NewManager(testEnv.Config, mgrOpts)
	require.NoError(t, err, "failed to create manager")

	// 4. EPP Server Configuration
	runner := server.NewDefaultExtProcServerRunner()
	// Override the defaults with test-specific configuration.
	runner.GKNN = common.GKNN{
		NamespacedName: types.NamespacedName{Namespace: nsName, Name: testPoolName},
		GroupKind:      schema.GroupKind{Group: v1.GroupVersion.Group, Kind: "InferencePool"},
	}
	runner.GrpcPort = grpcPort
	runner.SecureServing = false
	runner.HealthChecking = false
	runner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{}
	runner.RefreshPrometheusMetricsInterval = 50 * time.Millisecond
	runner.MetricsStalenessThreshold = 2 * time.Second
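	// The 2s staleness threshold sits well above the refresh intervals used in
	// this harness, so fake-client metrics should never be treated as stale
	// mid-test (an assumption about how downstream consumers use the threshold).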

	// 5. Dependency Injection (Scheduler, Scorers, Datastore)
	pmf := backendmetrics.NewPodMetricsFactory(runner.TestPodMetricsClient, 10*time.Millisecond)
	// We disable periodic resync (0) to ensure deterministic test behavior.
	runner.Datastore = datastore.NewDatastore(ctx, pmf, 0)

	defaultProfile := framework.NewSchedulerProfile().
		WithScorers(
			framework.NewWeightedScorer(scorer.NewKVCacheUtilizationScorer(), 1),
			framework.NewWeightedScorer(scorer.NewQueueScorer(), 1),
			framework.NewWeightedScorer(prefix.New(ctx, prefix.DefaultConfig), 1),
			framework.NewWeightedScorer(scorer.NewLoraAffinityScorer(), 1),
		).
		WithPicker(picker.NewMaxScorePicker(picker.DefaultMaxNumOfEndpoints))

	profileHandler := profile.NewSingleProfileHandler()
	schedulerConfig := scheduling.NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile})

	sdConfig := &saturationdetector.Config{
		QueueDepthThreshold:       saturationdetector.DefaultQueueDepthThreshold,
		KVCacheUtilThreshold:      saturationdetector.DefaultKVCacheUtilThreshold,
		MetricsStalenessThreshold: saturationdetector.DefaultMetricsStalenessThreshold,
	}
	runner.SaturationDetector = saturationdetector.NewDetector(sdConfig, logger.WithName("sd"))
	locator := requestcontrol.NewDatastorePodLocator(runner.Datastore)
	runner.Director = requestcontrol.NewDirectorWithConfig(
		runner.Datastore,
		scheduling.NewSchedulerWithConfig(schedulerConfig),
		requestcontrol.NewLegacyAdmissionController(runner.SaturationDetector, locator),
		locator,
		requestcontrol.NewConfig(),
	)
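	// The Director is thus wired with an admission controller backed by the
	// saturation detector and pod locator, plus a scheduler whose single
	// "default" profile scores candidates on KV-cache utilization, queue
	// depth, prefix-cache affinity, and LoRA affinity, then picks the
	// highest-scoring endpoints.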

	require.NoError(t, runner.SetupWithManager(ctx, mgr), "failed to setup server runner")

	// 6. Start Background Processes
	mgrCtx, mgrCancel := context.WithCancel(ctx)

	// Start Manager.
	go func() {
		if err := mgr.Start(mgrCtx); err != nil {
			// Context cancellation is expected during teardown.
			if !strings.Contains(err.Error(), "context canceled") {
				logger.Error(err, "manager stopped unexpectedly")
			}
		}
	}()

	// Start ExtProc server.
	serverCtx, serverCancel := context.WithCancel(ctx)
	runnable := runner.AsRunnable(logger.WithName("server")).Start

	client, conn := integration.StartExtProcServer(
		t,
		serverCtx,
		runnable,
		grpcPort,
		logger,
	)

	h := &TestHarness{
		t:            t,
		ctx:          ctx,
		Namespace:    nsName,
		Mgr:          mgr,
		ServerRunner: runner,
		Client:       client,
		Datastore:    runner.Datastore,
		grpcConn:     conn,
	}

	// 7. Register Cleanup
	t.Cleanup(func() {
		serverCancel()
		mgrCancel()
		_ = h.grpcConn.Close()
		// Deleting the Namespace cascades to all contained resources.
		_ = k8sClient.Delete(context.Background(), ns)
		// Crucial: Reset the global metrics registry to prevent pollution between serial tests.
		metrics.Reset()
	})

	return h
}

// --- Fluent Builder API ---

// WithBaseResources injects the standard pool and objective definitions into the test namespace.
// These resources are pre-parsed in TestMain to avoid I/O overhead in the loop.
func (h *TestHarness) WithBaseResources() *TestHarness {
	h.t.Helper()
	for _, obj := range baseResources {
		objCopy := obj.DeepCopy()
		objCopy.SetNamespace(h.Namespace)
		require.NoError(h.t, k8sClient.Create(h.ctx, objCopy), "failed to create base resource: %s", obj.GetKind())
	}
	return h
}

// WithPods creates pod objects in the API server and configures the fake metrics client.
func (h *TestHarness) WithPods(pods []podState) *TestHarness {
	h.t.Helper()
	metricsMap := make(map[types.NamespacedName]*backendmetrics.MetricsState)

	// Pre-calculate metrics and register them with the fake client.
	for _, p := range pods {
		metricsKeyName := fmt.Sprintf("pod-%d-rank-0", p.index)
		activeModelsMap := make(map[string]int)
		for _, m := range p.activeModels {
			activeModelsMap[m] = 1
		}

		metricsMap[types.NamespacedName{Namespace: h.Namespace, Name: metricsKeyName}] = &backendmetrics.MetricsState{
			WaitingQueueSize:    p.queueSize,
			KVCacheUsagePercent: p.kvCacheUsage,
			ActiveModels:        activeModelsMap,
			WaitingModels:       make(map[string]int),
		}
	}
	h.ServerRunner.TestPodMetricsClient.SetRes(metricsMap)

	// Create the Kubernetes Pod objects.
	for _, p := range pods {
		name := fmt.Sprintf("pod-%d", p.index)

		// Build the pod with a Ready condition and a deterministic IP.
		pod := epptestutil.MakePod(name).
			Namespace(h.Namespace).
			ReadyCondition(). // Sets Status.Conditions.
			Labels(map[string]string{"app": testPoolName}).
			IP(fmt.Sprintf("192.168.1.%d", p.index+1)).
			Complete().
			ObjRef()

		// Snapshot the status, since the Create call wipes it.
		intendedStatus := pod.Status

		// Create the resource.
		require.NoError(h.t, k8sClient.Create(h.ctx, pod), "failed to create pod %s", name)

		// Restore the status on the created object, which now carries the correct ResourceVersion/UID.
		pod.Status = intendedStatus

		// Update the Status subresource.
		require.NoError(h.t, k8sClient.Status().Update(h.ctx, pod), "failed to update status for pod %s", name)
	}

	return h
}

// WaitForReadyPodsMetric blocks until the Prometheus metric 'inference_pool_ready_pods' matches the expected count.
// This ensures the background metric collector has fully synced.
func (h *TestHarness) WaitForReadyPodsMetric(expectedCount int) {
	h.t.Helper()

	expected := cleanMetric(metricReadyPods(expectedCount))
	require.Eventually(h.t, func() bool {
		err := metricsutils.GatherAndCompare(crmetrics.Registry, strings.NewReader(expected),
			"inference_pool_ready_pods")
		return err == nil
	}, 10*time.Second, 50*time.Millisecond, "Timed out waiting for inference_pool_ready_pods metric to settle")
}
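
// Note: cleanMetric and metricReadyPods are helpers defined elsewhere in this
// package; the expected text they produce is standard Prometheus exposition
// format for the gauge, shaped roughly like the following hypothetical line
// (actual HELP/label content comes from those helpers):
//
//	inference_pool_ready_pods{name="<pool-name>"} 3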

// WaitForSync blocks until the EPP Datastore has synced the expected number of pods and, optionally, a specific model
// objective.
func (h *TestHarness) WaitForSync(expectedPods int, checkModelObjective string) *TestHarness {
	h.t.Helper()
	synced := assert.Eventually(h.t, func() bool {
		if !h.Datastore.PoolHasSynced() {
			return false
		}
		if len(h.Datastore.PodList(datastore.AllPodsPredicate)) != expectedPods {
			return false
		}
		if checkModelObjective != "" && h.Datastore.ObjectiveGet(checkModelObjective) == nil {
			return false
		}
		return true
	}, 10*time.Second, 50*time.Millisecond)
	if !synced {
		// Gather diagnostics only after the wait; message args passed to
		// Eventually are evaluated up front and would report pre-wait state.
		h.t.Fatalf("Datastore sync timed out.\n- PoolSynced: %v\n- Pods Found: %d (Expected: %d)\n- Objective %q Found: %v",
			h.Datastore.PoolHasSynced(),
			len(h.Datastore.PodList(datastore.AllPodsPredicate)),
			expectedPods,
			checkModelObjective,
			h.Datastore.ObjectiveGet(checkModelObjective) != nil,
		)
	}
	return h
}

// ExpectMetrics asserts that specific metrics match the expected Prometheus output.
// It uses Eventually to allow for slight delays in metric recording (e.g. async token counting).
func (h *TestHarness) ExpectMetrics(expected map[string]string) {
	h.t.Helper()
	for name, value := range expected {
		var err error
		assert.Eventually(h.t, func() bool {
			err = metricsutils.GatherAndCompare(crmetrics.Registry, strings.NewReader(value), name)
			return err == nil
		}, 2*time.Second, 50*time.Millisecond, "Timed out waiting for metric %s to match", name)
		// Report the detailed diff from the last comparison on failure.
		if err != nil {
			h.t.Errorf("Metric mismatch for %s: %v", name, err)
		}
	}
}