diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 339ceabf3..6dc037d9c 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -315,7 +315,7 @@ func (r *Runner) Run(ctx context.Context) error { return fmt.Errorf("failed to initialize Flow Controller: %w", err) } go registry.Run(ctx) - admissionController = requestcontrol.NewFlowControlAdmissionController(fc) + admissionController = requestcontrol.NewFlowControlAdmissionController(fc, opts.PoolName) } else { setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control") admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector, locator) diff --git a/pkg/epp/flowcontrol/controller/controller.go b/pkg/epp/flowcontrol/controller/controller.go index 29eda4ec5..8ab5027b3 100644 --- a/pkg/epp/flowcontrol/controller/controller.go +++ b/pkg/epp/flowcontrol/controller/controller.go @@ -212,10 +212,15 @@ func (fc *FlowController) EnqueueAndWait( req types.FlowControlRequest, ) (types.QueueOutcome, error) { flowKey := req.FlowKey() - fairnessID := flowKey.ID priority := strconv.Itoa(flowKey.Priority) - metrics.IncFlowControlQueueSize(fairnessID, priority) - defer metrics.DecFlowControlQueueSize(fairnessID, priority) + metrics.IncFlowControlQueueSize( + flowKey.ID, priority, + req.InferencePoolName(), + req.ModelName(), req.TargetModelName()) + defer metrics.DecFlowControlQueueSize( + flowKey.ID, priority, + req.InferencePoolName(), + req.ModelName(), req.TargetModelName()) // 1. Create the derived context that governs this request's lifecycle (Parent Cancellation + TTL). reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req) diff --git a/pkg/epp/flowcontrol/controller/internal/item.go b/pkg/epp/flowcontrol/controller/internal/item.go index f0d5d3286..25961abdd 100644 --- a/pkg/epp/flowcontrol/controller/internal/item.go +++ b/pkg/epp/flowcontrol/controller/internal/item.go @@ -158,7 +158,11 @@ func (fi *FlowItem) finalizeInternal(outcome types.QueueOutcome, err error) { duration := time.Since(fi.enqueueTime) flowKey := fi.originalRequest.FlowKey() - metrics.RecordFlowControlRequestQueueDuration(flowKey.ID, strconv.Itoa(flowKey.Priority), outcome.String(), duration) + metrics.RecordFlowControlRequestQueueDuration( + flowKey.ID, strconv.Itoa(flowKey.Priority), outcome.String(), + fi.originalRequest.InferencePoolName(), + fi.OriginalRequest().ModelName(), fi.OriginalRequest().TargetModelName(), + duration) fi.done <- finalState close(fi.done) diff --git a/pkg/epp/flowcontrol/types/mocks/mocks.go b/pkg/epp/flowcontrol/types/mocks/mocks.go index 2e12b0449..e3e0ac2a0 100644 --- a/pkg/epp/flowcontrol/types/mocks/mocks.go +++ b/pkg/epp/flowcontrol/types/mocks/mocks.go @@ -31,20 +31,54 @@ type MockFlowControlRequest struct { InitialEffectiveTTLV time.Duration IDV string MetadataV map[string]any + InferencePoolNameV string + ModelNameV string + TargetModelNameV string } -// NewMockFlowControlRequest creates a new MockFlowControlRequest instance. +// MockRequestOption is a functional option for configuring a MockFlowControlRequest. +type MockRequestOption func(*MockFlowControlRequest) + +// WithInferencePoolName sets the InferencePoolName for the mock request. +func WithInferencePoolName(name string) MockRequestOption { + return func(m *MockFlowControlRequest) { + m.InferencePoolNameV = name + } +} + +// WithModelName sets the ModelName for the mock request. +func WithModelName(name string) MockRequestOption { + return func(m *MockFlowControlRequest) { + m.ModelNameV = name + } +} + +// WithTargetModelName sets the TargetModelName for the mock request. +func WithTargetModelName(name string) MockRequestOption { + return func(m *MockFlowControlRequest) { + m.TargetModelNameV = name + } +} + +// NewMockFlowControlRequest creates a new MockFlowControlRequest instance with optional configuration. func NewMockFlowControlRequest( byteSize uint64, id string, key types.FlowKey, + opts ...MockRequestOption, ) *MockFlowControlRequest { - return &MockFlowControlRequest{ + m := &MockFlowControlRequest{ ByteSizeV: byteSize, IDV: id, FlowKeyV: key, MetadataV: make(map[string]any), } + + for _, opt := range opts { + opt(m) + } + + return m } func (m *MockFlowControlRequest) FlowKey() types.FlowKey { return m.FlowKeyV } @@ -52,6 +86,9 @@ func (m *MockFlowControlRequest) ByteSize() uint64 { return m. func (m *MockFlowControlRequest) InitialEffectiveTTL() time.Duration { return m.InitialEffectiveTTLV } func (m *MockFlowControlRequest) ID() string { return m.IDV } func (m *MockFlowControlRequest) GetMetadata() map[string]any { return m.MetadataV } +func (m *MockFlowControlRequest) InferencePoolName() string { return m.InferencePoolNameV } +func (m *MockFlowControlRequest) ModelName() string { return m.ModelNameV } +func (m *MockFlowControlRequest) TargetModelName() string { return m.TargetModelNameV } var _ types.FlowControlRequest = &MockFlowControlRequest{} @@ -92,13 +129,20 @@ var _ types.QueueItemAccessor = &MockQueueItemAccessor{} // NewMockQueueItemAccessor is a constructor for `MockQueueItemAccessor` that initializes the mock with a default // `MockFlowControlRequest` and `MockQueueItemHandle` to prevent nil pointer dereferences in tests. -func NewMockQueueItemAccessor(byteSize uint64, reqID string, key types.FlowKey) *MockQueueItemAccessor { +// It accepts MockRequestOptions to configure the underlying request. +func NewMockQueueItemAccessor( + byteSize uint64, + reqID string, + key types.FlowKey, + opts ...MockRequestOption, +) *MockQueueItemAccessor { return &MockQueueItemAccessor{ EnqueueTimeV: time.Now(), OriginalRequestV: NewMockFlowControlRequest( byteSize, reqID, key, + opts..., ), HandleV: &MockQueueItemHandle{}, } diff --git a/pkg/epp/flowcontrol/types/request.go b/pkg/epp/flowcontrol/types/request.go index 61ac32049..a095e5b64 100644 --- a/pkg/epp/flowcontrol/types/request.go +++ b/pkg/epp/flowcontrol/types/request.go @@ -52,6 +52,18 @@ type FlowControlRequest interface { // This data is passed transparently to components like the contracts.PodLocator to resolve resources (candidate pods) // lazily during the dispatch cycle. GetMetadata() map[string]any + + // --- Passthrough for Observability --- + + // InferencePoolName returns the name of the backend pool this request is targeting. + // This is used for observability (metrics labeling) to correlate queue depth with specific backend pools. + InferencePoolName() string + + // ModelName returns the name of the base model being requested (e.g., "llama-2-70b"). + ModelName() string + + // TargetModelName returns the name of the specific adapter or traffic target (e.g., "finance-lora-v1"). + TargetModelName() string } // QueueItemHandle is an opaque handle to an item that has been successfully added to a `framework.SafeQueue`. It acts diff --git a/pkg/epp/metrics/metrics.go b/pkg/epp/metrics/metrics.go index fdb806f93..d252f61f0 100644 --- a/pkg/epp/metrics/metrics.go +++ b/pkg/epp/metrics/metrics.go @@ -31,10 +31,12 @@ import ( ) const ( + // --- Subsystems --- InferenceObjectiveComponent = "inference_objective" InferencePoolComponent = "inference_pool" InferenceExtension = "inference_extension" + // --- Internal Keys (for Legacy/Gauge Usage) --- KVCacheUsagePercentKey = "KVCacheUsagePercent" WaitingQueueSizeKey = "WaitingQueueSize" RunningRequestsSizeKey = "RunningRequestsSize" @@ -42,17 +44,58 @@ const ( ActiveModelsKey = "ActiveModels" WaitingModelsKey = "WaitingModels" UpdateTimeKey = "UpdateTime" + + // Metric Type Values + TypeTPOT = "tpot" + TypePredictedTPOT = "predicted_tpot" + TypeTPOTPredictionDuration = "tpot_prediction_duration" + TypeTPOTSLOViolation = "tpot_slo_violation" + TypeTPOTSLOThreshold = "tpot_slo_threshold" + + TypeTTFT = "ttft" + TypePredictedTTFT = "predicted_ttft" + TypeTTFTPredictionDuration = "ttft_prediction_duration" + TypeTTFTSLOViolation = "ttft_slo_violation" + TypeTTFTSLOThreshold = "ttft_slo_threshold" +) + +var ( + // --- Common Label Sets --- + ModelLabels = []string{"model_name", "target_model_name"} + ModelTypeLabels = []string{"model_name", "target_model_name", "type"} + PoolLabels = []string{"name"} + + // --- Common Buckets --- + + // GeneralLatencyBuckets for long running inference from 5ms to 1 hour + GeneralLatencyBuckets = []float64{ + 0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3, 4, 5, 6, + 8, 10, 15, 20, 30, 45, 60, 120, 180, 240, 300, 360, 480, 600, 900, 1200, + 1800, 2700, 3600, + } + + // TPOTBuckets for time-per-output-token (usually milliseconds to seconds) + TPOTBuckets = []float64{ + 0.0005, 0.00205, 0.005, 0.01, 0.02, 0.04, 0.06, 0.08, 0.1, 0.125, 0.15, 0.2, + 0.3, 0.4, 0.5, 0.6, 0.8, 1, 1.5, 2, 3, 4.5, 6, 12, 18, 24, 30, 36, 48, 60, + 90, 120, 180, 270, 360, + } + + // PredictionLatencyBuckets for internal latency (predictions) from 100us to 5s + PredictionLatencyBuckets = []float64{ + 0.0001, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, + } ) +// --- Inference Objective Metrics --- var ( - // Inference Objective Metrics requestCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Subsystem: InferenceObjectiveComponent, Name: "request_total", Help: metricsutil.HelpMsgWithStability("Counter of inference objective requests broken out for each model and target model.", compbasemetrics.ALPHA), }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) requestErrCounter = prometheus.NewCounterVec( @@ -61,17 +104,16 @@ var ( Name: "request_error_total", Help: metricsutil.HelpMsgWithStability("Counter of inference objective requests errors broken out for each model and target model.", compbasemetrics.ALPHA), }, - []string{"model_name", "target_model_name", "error_code"}, + append(ModelLabels, "error_code"), ) - // Gauge for various inference request metrics inferenceGauges = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Subsystem: InferenceObjectiveComponent, Name: "inference_request_metric", Help: metricsutil.HelpMsgWithStability("Consolidated gauge for various inference request metrics including TTFT, TPOT, SLOs, and prediction durations.", compbasemetrics.ALPHA), }, - []string{"model_name", "target_model_name", "type"}, + ModelTypeLabels, ) requestTTFT = prometheus.NewHistogramVec( @@ -79,12 +121,9 @@ var ( Subsystem: InferenceObjectiveComponent, Name: "request_ttft_seconds", Help: metricsutil.HelpMsgWithStability("Inference model TTFT distribution in seconds for each model and target model.", compbasemetrics.ALPHA), - Buckets: []float64{ - 0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3, - 4, 5, 6, 8, 10, 15, 20, 30, 45, 60, 120, 180, 240, 300, 360, 480, 600, 900, 1200, 1800, 2700, 3600, - }, + Buckets: GeneralLatencyBuckets, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) requestPredictedTTFT = prometheus.NewHistogramVec( @@ -92,25 +131,19 @@ var ( Subsystem: InferenceObjectiveComponent, Name: "request_predicted_ttft_seconds", Help: metricsutil.HelpMsgWithStability("Inference model Predicted TTFT distribution in seconds for each model and target model.", compbasemetrics.ALPHA), - Buckets: []float64{ - 0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3, - 4, 5, 6, 8, 10, 15, 20, 30, 45, 60, 120, 180, 240, 300, 360, 480, 600, 900, 1200, 1800, 2700, 3600, - }, + Buckets: GeneralLatencyBuckets, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) - // New metrics for TTFT prediction duration requestTTFTPredictionDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Subsystem: InferenceObjectiveComponent, Name: "request_ttft_prediction_duration_seconds", Help: metricsutil.HelpMsgWithStability("Duration taken to generate TTFT predictions in seconds for each model and target model.", compbasemetrics.ALPHA), - Buckets: []float64{ - 0.0001, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, - }, + Buckets: PredictionLatencyBuckets, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) requestTPOT = prometheus.NewHistogramVec( @@ -118,12 +151,9 @@ var ( Subsystem: InferenceObjectiveComponent, Name: "request_tpot_seconds", Help: metricsutil.HelpMsgWithStability("Inference model TPOT distribution in seconds for each model and target model.", compbasemetrics.ALPHA), - Buckets: []float64{ - 0.0005, 0.00205, 0.005, 0.01, 0.02, 0.04, 0.06, 0.08, 0.1, 0.125, 0.15, 0.2, 0.3, - 0.4, 0.5, 0.6, 0.8, 1, 1.5, 2, 3, 4.5, 6, 12, 18, 24, 30, 36, 48, 60, 90, 120, 180, 270, 360, - }, + Buckets: TPOTBuckets, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) requestPredictedTPOT = prometheus.NewHistogramVec( @@ -131,35 +161,28 @@ var ( Subsystem: InferenceObjectiveComponent, Name: "request_predicted_tpot_seconds", Help: metricsutil.HelpMsgWithStability("Inference model Predicted TPOT distribution in seconds for each model and target model.", compbasemetrics.ALPHA), - Buckets: []float64{ - 0.0005, 0.00205, 0.005, 0.01, 0.02, 0.04, 0.06, 0.08, 0.1, 0.125, 0.15, 0.2, 0.3, - 0.4, 0.5, 0.6, 0.8, 1, 1.5, 2, 3, 4.5, 6, 12, 18, 24, 30, 36, 48, 60, 90, 120, 180, 270, 360, - }, + Buckets: TPOTBuckets, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) - // New metrics for TPOT prediction duration requestTPOTPredictionDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Subsystem: InferenceObjectiveComponent, Name: "request_tpot_prediction_duration_seconds", Help: metricsutil.HelpMsgWithStability("Duration taken to generate TPOT predictions in seconds for each model and target model.", compbasemetrics.ALPHA), - Buckets: []float64{ - 0.0001, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, - }, + Buckets: PredictionLatencyBuckets, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) - // Counter for SLO Violations sloViolationCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Subsystem: InferenceObjectiveComponent, Name: "request_slo_violation_total", Help: metricsutil.HelpMsgWithStability("Counter of SLO violations for each model, target model, and violation type.", compbasemetrics.ALPHA), }, - []string{"model_name", "target_model_name", "type"}, + ModelTypeLabels, ) requestLatencies = prometheus.NewHistogramVec( @@ -167,12 +190,9 @@ var ( Subsystem: InferenceObjectiveComponent, Name: "request_duration_seconds", Help: metricsutil.HelpMsgWithStability("Inference objective response latency distribution in seconds for each model and target model.", compbasemetrics.ALPHA), - Buckets: []float64{ - 0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3, - 4, 5, 6, 8, 10, 15, 20, 30, 45, 60, 120, 180, 240, 300, 360, 480, 600, 900, 1200, 1800, 2700, 3600, - }, + Buckets: GeneralLatencyBuckets, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) requestSizes = prometheus.NewHistogramVec( @@ -187,7 +207,7 @@ var ( 16777216, 33554432, 67108864, 134217728, 268435456, 536870912, 1073741824, // Exponential up to 1GB }, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) responseSizes = prometheus.NewHistogramVec( @@ -199,7 +219,7 @@ var ( // 8192 * 4 = 32768. Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536}, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) inputTokens = prometheus.NewHistogramVec( @@ -210,7 +230,7 @@ var ( // Most models have a input context window less than 1 million tokens. Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536, 131072, 262144, 524288, 1048576}, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) outputTokens = prometheus.NewHistogramVec( @@ -221,7 +241,7 @@ var ( // Most models generates output less than 8192 tokens. Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192}, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) promptCachedTokens = prometheus.NewHistogramVec( @@ -232,7 +252,7 @@ var ( // Most models have a input context window less than 1 million tokens. Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536, 131072, 262144, 524288, 1048576}, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) runningRequests = prometheus.NewGaugeVec( @@ -255,17 +275,19 @@ var ( 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, }, }, - []string{"model_name", "target_model_name"}, + ModelLabels, ) +) - // Inference Pool Metrics +// --- Inference Pool Metrics --- +var ( inferencePoolAvgKVCache = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Subsystem: InferencePoolComponent, Name: "average_kv_cache_utilization", Help: metricsutil.HelpMsgWithStability("The average kv cache utilization for an inference server pool.", compbasemetrics.ALPHA), }, - []string{"name"}, + PoolLabels, ) inferencePoolAvgQueueSize = prometheus.NewGaugeVec( @@ -274,7 +296,7 @@ var ( Name: "average_queue_size", Help: metricsutil.HelpMsgWithStability("The average number of requests pending in the model server queue.", compbasemetrics.ALPHA), }, - []string{"name"}, + PoolLabels, ) inferencePoolReadyPods = prometheus.NewGaugeVec( @@ -283,10 +305,12 @@ var ( Name: "ready_pods", Help: metricsutil.HelpMsgWithStability("The number of ready pods in the inference server pool.", compbasemetrics.ALPHA), }, - []string{"name"}, + PoolLabels, ) +) - // Scheduler Metrics +// --- Scheduling Metrics --- +var ( SchedulerE2ELatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Subsystem: InferenceExtension, @@ -295,12 +319,10 @@ var ( Buckets: []float64{ 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, }, - // StabilityLevel: prometheus.ALPHA, }, []string{}, ) - // SchedulerAttemptsTotal counts total number of scheduling attempts, labeled by status. SchedulerAttemptsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Subsystem: InferenceExtension, @@ -322,7 +344,6 @@ var ( []string{"extension_point", "plugin_type", "plugin_name"}, ) - // Prefix indexer Metrics PrefixCacheSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Subsystem: InferenceExtension, @@ -352,18 +373,20 @@ var ( }, []string{}, ) +) - // Info Metrics - InferenceExtensionInfo = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: InferenceExtension, - Name: "info", - Help: metricsutil.HelpMsgWithStability("General information of the current build of Inference Extension.", compbasemetrics.ALPHA), - }, - []string{"commit", "build_ref"}, - ) +// --- Info Metrics --- +var InferenceExtensionInfo = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferenceExtension, + Name: "info", + Help: metricsutil.HelpMsgWithStability("General information of the current build of Inference Extension.", compbasemetrics.ALPHA), + }, + []string{"commit", "build_ref"}, +) - // Flow Control Metrics +// --- Flow Control Metrics --- +var ( flowControlRequestQueueDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Subsystem: InferenceExtension, @@ -373,7 +396,7 @@ var ( 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, }, }, - []string{"fairness_id", "priority", "outcome"}, + append([]string{"fairness_id", "priority", "outcome", "inference_pool"}, ModelLabels...), ) flowControlQueueSize = prometheus.NewGaugeVec( @@ -382,18 +405,18 @@ var ( Name: "flow_control_queue_size", Help: metricsutil.HelpMsgWithStability("Current number of requests being actively managed by the EPP flow control layer, from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA), }, - []string{"fairness_id", "priority"}, + append([]string{"fairness_id", "priority", "inference_pool"}, ModelLabels...), ) +) - // Inference Model Rewrite Metrics - inferenceModelRewriteDecisionsTotal = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Subsystem: InferenceExtension, - Name: "model_rewrite_decisions_total", - Help: metricsutil.HelpMsgWithStability("Total number of inference model rewrite decisions.", compbasemetrics.ALPHA), - }, - []string{"model_rewrite_name", "model_name", "target_model"}, - ) +// --- Inference Model Rewrite Metrics --- +var inferenceModelRewriteDecisionsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: InferenceExtension, + Name: "model_rewrite_decisions_total", + Help: metricsutil.HelpMsgWithStability("Total number of inference model rewrite decisions.", compbasemetrics.ALPHA), + }, + []string{"model_rewrite_name", "model_name", "target_model"}, ) var registerMetrics sync.Once @@ -523,7 +546,7 @@ func RecordRequestTPOT(ctx context.Context, modelName, targetModelName string, t return false } requestTPOT.WithLabelValues(modelName, targetModelName).Observe(tpot) - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "tpot"}).Set(tpot) + inferenceGauges.WithLabelValues(modelName, targetModelName, TypeTPOT).Set(tpot) return true } @@ -539,12 +562,12 @@ func RecordRequestTPOTWithSLO(ctx context.Context, modelName, targetModelName st // Check for SLO violation (tpot exceeds threshold) if tpot > sloThreshold { - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "tpot_slo_violation"}).Set(1) - sloViolationCounter.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "tpot"}).Inc() + inferenceGauges.WithLabelValues(modelName, targetModelName, TypeTPOTSLOViolation).Set(1) + sloViolationCounter.WithLabelValues(modelName, targetModelName, TypeTPOT).Inc() log.FromContext(ctx).V(logutil.DEFAULT).Info("TPOT SLO violation detected", "modelName", modelName, "targetModelName", targetModelName, "tpot", tpot, "threshold", sloThreshold) } else { - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "tpot_slo_violation"}).Set(0) + inferenceGauges.WithLabelValues(modelName, targetModelName, TypeTPOTSLOViolation).Set(0) } return true @@ -558,7 +581,7 @@ func RecordRequestPredictedTPOT(ctx context.Context, modelName, targetModelName return false } requestPredictedTPOT.WithLabelValues(modelName, targetModelName).Observe(predicted_tpot) - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "predicted_tpot"}).Set(predicted_tpot) + inferenceGauges.WithLabelValues(modelName, targetModelName, TypePredictedTPOT).Set(predicted_tpot) return true } @@ -570,7 +593,7 @@ func RecordRequestTPOTPredictionDuration(ctx context.Context, modelName, targetM return false } requestTPOTPredictionDuration.WithLabelValues(modelName, targetModelName).Observe(duration) - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "tpot_prediction_duration"}).Set(duration) + inferenceGauges.WithLabelValues(modelName, targetModelName, TypeTPOTPredictionDuration).Set(duration) return true } @@ -582,7 +605,7 @@ func RecordRequestTTFT(ctx context.Context, modelName, targetModelName string, t return false } requestTTFT.WithLabelValues(modelName, targetModelName).Observe(ttft) - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "ttft"}).Set(ttft) + inferenceGauges.WithLabelValues(modelName, targetModelName, TypeTTFT).Set(ttft) return true } @@ -598,12 +621,12 @@ func RecordRequestTTFTWithSLO(ctx context.Context, modelName, targetModelName st // Check for SLO violation (ttft exceeds threshold) if ttft > sloThreshold { - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "ttft_slo_violation"}).Set(1) - sloViolationCounter.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "ttft"}).Inc() + inferenceGauges.WithLabelValues(modelName, targetModelName, TypeTTFTSLOViolation).Set(1) + sloViolationCounter.WithLabelValues(modelName, targetModelName, TypeTTFT).Inc() log.FromContext(ctx).V(logutil.DEFAULT).Info("TTFT SLO violation detected", "modelName", modelName, "targetModelName", targetModelName, "ttft", ttft, "threshold", sloThreshold) } else { - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "ttft_slo_violation"}).Set(0) + inferenceGauges.WithLabelValues(modelName, targetModelName, TypeTTFTSLOViolation).Set(0) } return true @@ -617,7 +640,7 @@ func RecordRequestPredictedTTFT(ctx context.Context, modelName, targetModelName return false } requestPredictedTTFT.WithLabelValues(modelName, targetModelName).Observe(predicted_ttft) - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "predicted_ttft"}).Set(predicted_ttft) + inferenceGauges.WithLabelValues(modelName, targetModelName, TypePredictedTTFT).Set(predicted_ttft) return true } @@ -629,7 +652,7 @@ func RecordRequestTTFTPredictionDuration(ctx context.Context, modelName, targetM return false } requestTTFTPredictionDuration.WithLabelValues(modelName, targetModelName).Observe(duration) - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "ttft_prediction_duration"}).Set(duration) + inferenceGauges.WithLabelValues(modelName, targetModelName, TypeTTFTPredictionDuration).Set(duration) return true } @@ -751,30 +774,39 @@ func RecordInferenceExtensionInfo(commitSha, buildRef string) { } // RecordFlowControlRequestQueueDuration records the duration a request spent in the Flow Control layer. -func RecordFlowControlRequestQueueDuration(fairnessID, priority, outcome string, duration time.Duration) { - flowControlRequestQueueDuration.WithLabelValues(fairnessID, priority, outcome).Observe(duration.Seconds()) +func RecordFlowControlRequestQueueDuration( + fairnessID, priority, outcome, + inferencePool, + modelName, targetModelName string, + duration time.Duration, +) { + flowControlRequestQueueDuration.WithLabelValues( + fairnessID, priority, outcome, + inferencePool, + modelName, targetModelName, + ).Observe(duration.Seconds()) } // IncFlowControlQueueSize increments the Flow Control queue size gauge. -func IncFlowControlQueueSize(fairnessID, priority string) { - flowControlQueueSize.WithLabelValues(fairnessID, priority).Inc() +func IncFlowControlQueueSize(fairnessID, priority, inferencePool, modelName, targetModelName string) { + flowControlQueueSize.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Inc() } // DecFlowControlQueueSize decrements the Flow Control queue size gauge. -func DecFlowControlQueueSize(fairnessID, priority string) { - flowControlQueueSize.WithLabelValues(fairnessID, priority).Dec() +func DecFlowControlQueueSize(fairnessID, priority, inferencePool, modelName, targetModelName string) { + flowControlQueueSize.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Dec() } // SetTTFTSLOThreshold sets the TTFT SLO threshold for a model. // This allows dynamic threshold management and makes the threshold visible in metrics. func SetTTFTSLOThreshold(modelName, targetModelName string, threshold float64) { - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "ttft_slo_threshold"}).Set(threshold) + inferenceGauges.WithLabelValues(modelName, targetModelName, TypeTTFTSLOThreshold).Set(threshold) } // SetTPOTSLOThreshold sets the TPOT SLO threshold for a model. // This allows dynamic threshold management and makes the threshold visible in metrics. func SetTPOTSLOThreshold(modelName, targetModelName string, threshold float64) { - inferenceGauges.With(prometheus.Labels{"model_name": modelName, "target_model_name": targetModelName, "type": "tpot_slo_threshold"}).Set(threshold) + inferenceGauges.WithLabelValues(modelName, targetModelName, TypeTPOTSLOThreshold).Set(threshold) } // RecordInferenceModelRewriteDecision records the routing decision for InferenceModelRewrite. diff --git a/pkg/epp/metrics/metrics_test.go b/pkg/epp/metrics/metrics_test.go index 5dac3731e..91dc3a647 100644 --- a/pkg/epp/metrics/metrics_test.go +++ b/pkg/epp/metrics/metrics_test.go @@ -848,6 +848,12 @@ func getHistogramVecLabelValues(t *testing.T, h *prometheus.HistogramVec, labelV func TestFlowControlQueueDurationMetric(t *testing.T) { Reset() + const ( + pool = "pool-1" + model = "llama-2" + target = "llama-base" + ) + records := []struct { fairnessID string priority string @@ -861,7 +867,7 @@ func TestFlowControlQueueDurationMetric(t *testing.T) { } for _, rec := range records { - RecordFlowControlRequestQueueDuration(rec.fairnessID, rec.priority, rec.outcome, rec.duration) + RecordFlowControlRequestQueueDuration(rec.fairnessID, rec.priority, rec.outcome, pool, model, target, rec.duration) } testCases := []struct { @@ -871,20 +877,41 @@ func TestFlowControlQueueDurationMetric(t *testing.T) { expectSum float64 }{ { - name: "user-a, prio 100, dispatched", - labels: prometheus.Labels{"fairness_id": "user-a", "priority": "100", "outcome": "Dispatched"}, + name: "user-a, prio 100, dispatched", + labels: prometheus.Labels{ + "fairness_id": "user-a", + "priority": "100", + "outcome": "Dispatched", + "inference_pool": pool, + "model_name": model, + "target_model_name": target, + }, expectCount: 2, expectSum: 0.03, // 0.01 + 0.02 }, { - name: "user-b, prio 100, rejected", - labels: prometheus.Labels{"fairness_id": "user-b", "priority": "100", "outcome": "RejectedCapacity"}, + name: "user-b, prio 100, rejected", + labels: prometheus.Labels{ + "fairness_id": "user-b", + "priority": "100", + "outcome": "RejectedCapacity", + "inference_pool": pool, + "model_name": model, + "target_model_name": target, + }, expectCount: 1, expectSum: 0.005, }, { - name: "user-a, prio 50, dispatched", - labels: prometheus.Labels{"fairness_id": "user-a", "priority": "50", "outcome": "Dispatched"}, + name: "user-a, prio 50, dispatched", + labels: prometheus.Labels{ + "fairness_id": "user-a", + "priority": "50", + "outcome": "Dispatched", + "inference_pool": pool, + "model_name": model, + "target_model_name": target, + }, expectCount: 1, expectSum: 0.1, }, @@ -892,7 +919,14 @@ func TestFlowControlQueueDurationMetric(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - labels := []string{tc.labels["fairness_id"], tc.labels["priority"], tc.labels["outcome"]} + labels := []string{ + tc.labels["fairness_id"], + tc.labels["priority"], + tc.labels["outcome"], + tc.labels["inference_pool"], + tc.labels["model_name"], + tc.labels["target_model_name"], + } hist, err := getHistogramVecLabelValues(t, flowControlRequestQueueDuration, labels...) require.NoError(t, err, "Failed to get histogram for labels %v", tc.labels) require.Equal(t, tc.expectCount, hist.GetSampleCount(), "Sample count mismatch for labels %v", tc.labels) @@ -904,31 +938,37 @@ func TestFlowControlQueueDurationMetric(t *testing.T) { func TestFlowControlQueueSizeMetric(t *testing.T) { Reset() + const ( + pool = "pool-1" + model = "llama-2" + target = "llama-base" + ) + // Basic Inc/Dec - IncFlowControlQueueSize("user-a", "100") - val, err := testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-a", "100")) + IncFlowControlQueueSize("user-a", "100", pool, model, target) + val, err := testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-a", "100", pool, model, target)) require.NoError(t, err, "Failed to get gauge value for user-a/100 after Inc") require.Equal(t, 1.0, val, "Gauge value should be 1 after Inc for user-a/100") - DecFlowControlQueueSize("user-a", "100") - val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-a", "100")) + DecFlowControlQueueSize("user-a", "100", pool, model, target) + val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-a", "100", pool, model, target)) require.NoError(t, err, "Failed to get gauge value for user-a/100 after Dec") require.Equal(t, 0.0, val, "Gauge value should be 0 after Dec for user-a/100") // Multiple labels - IncFlowControlQueueSize("user-b", "200") - IncFlowControlQueueSize("user-b", "200") - val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-b", "200")) + IncFlowControlQueueSize("user-b", "200", pool, model, target) + IncFlowControlQueueSize("user-b", "200", pool, model, target) + val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-b", "200", pool, model, target)) require.NoError(t, err, "Failed to get gauge value for user-b/200") require.Equal(t, 2.0, val, "Gauge value should be 2 for user-b/200") - DecFlowControlQueueSize("user-b", "200") - val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-b", "200")) + DecFlowControlQueueSize("user-b", "200", pool, model, target) + val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-b", "200", pool, model, target)) require.NoError(t, err, "Failed to get gauge value for user-b/200 after one Dec") require.Equal(t, 1.0, val, "Gauge value should be 1 for user-b/200 after one Dec") // Non-existent labels - val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-c", "100")) + val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-c", "100", pool, model, target)) require.NoError(t, err, "Failed to get gauge value for non-existent user-c/100") require.Equal(t, 0.0, val, "Gauge value for non-existent labels should be 0") } diff --git a/pkg/epp/requestcontrol/admission.go b/pkg/epp/requestcontrol/admission.go index 2a806b56d..25f8b5b12 100644 --- a/pkg/epp/requestcontrol/admission.go +++ b/pkg/epp/requestcontrol/admission.go @@ -137,12 +137,14 @@ func (lac *LegacyAdmissionController) Admit( // It uses the provided Flow Controller to enqueue the request and await an outcome. type FlowControlAdmissionController struct { flowController flowController + poolName string } // NewFlowControlAdmissionController creates a new FlowControlAdmissionController. -func NewFlowControlAdmissionController(fc flowController) *FlowControlAdmissionController { +func NewFlowControlAdmissionController(fc flowController, poolName string) *FlowControlAdmissionController { return &FlowControlAdmissionController{ flowController: fc, + poolName: poolName, } } @@ -158,11 +160,14 @@ func (fcac *FlowControlAdmissionController) Admit( "requestID", reqCtx.SchedulingRequest.RequestId, "priority", priority, "fairnessID", reqCtx.FairnessID) fcReq := &flowControlRequest{ - requestID: reqCtx.SchedulingRequest.RequestId, - fairnessID: reqCtx.FairnessID, - priority: priority, - requestByteSize: uint64(reqCtx.RequestSize), - reqMetadata: reqCtx.Request.Metadata, + requestID: reqCtx.SchedulingRequest.RequestId, + fairnessID: reqCtx.FairnessID, + priority: priority, + requestByteSize: uint64(reqCtx.RequestSize), + reqMetadata: reqCtx.Request.Metadata, + inferencePoolName: fcac.poolName, + modelName: reqCtx.IncomingModelName, + targetModelName: reqCtx.TargetModelName, } outcome, err := fcac.flowController.EnqueueAndWait(ctx, fcReq) @@ -173,11 +178,14 @@ func (fcac *FlowControlAdmissionController) Admit( // flowControlRequest is an adapter that implements the types.FlowControlRequest interface. type flowControlRequest struct { - requestID string - fairnessID string - priority int - requestByteSize uint64 - reqMetadata map[string]any + requestID string + fairnessID string + priority int + requestByteSize uint64 + reqMetadata map[string]any + inferencePoolName string + modelName string + targetModelName string } var _ types.FlowControlRequest = &flowControlRequest{} @@ -185,12 +193,13 @@ var _ types.FlowControlRequest = &flowControlRequest{} func (r *flowControlRequest) ID() string { return r.requestID } func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default. func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize } +func (r *flowControlRequest) GetMetadata() map[string]any { return r.reqMetadata } +func (r *flowControlRequest) InferencePoolName() string { return r.inferencePoolName } +func (r *flowControlRequest) ModelName() string { return r.modelName } +func (r *flowControlRequest) TargetModelName() string { return r.targetModelName } func (r *flowControlRequest) FlowKey() types.FlowKey { return types.FlowKey{ID: r.fairnessID, Priority: r.priority} } -func (r *flowControlRequest) GetMetadata() map[string]any { - return r.reqMetadata -} // translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error // contract used by the Director. diff --git a/pkg/epp/requestcontrol/admission_test.go b/pkg/epp/requestcontrol/admission_test.go index 388a5c992..51a6c90b4 100644 --- a/pkg/epp/requestcontrol/admission_test.go +++ b/pkg/epp/requestcontrol/admission_test.go @@ -259,7 +259,7 @@ func TestFlowControlAdmissionController_Admit(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() fc := &mockFlowController{outcome: tc.fcOutcome, err: tc.fcErr} - ac := NewFlowControlAdmissionController(fc) + ac := NewFlowControlAdmissionController(fc, "pool") err := ac.Admit(ctx, reqCtx, tc.priority) diff --git a/site-src/guides/metrics-and-observability.md b/site-src/guides/metrics-and-observability.md index 8364f8721..327162a39 100644 --- a/site-src/guides/metrics-and-observability.md +++ b/site-src/guides/metrics-and-observability.md @@ -59,8 +59,8 @@ These metrics provide insights into the experimental flow control layer within t | **Metric name** | **Metric Type** |
**Description**
|
**Labels**
| **Status** | |:---|:---|:---|:---|:---| -| inference_extension_flow_control_request_queue_duration_seconds | Distribution | Distribution of the total time requests spend in the flow control layer. This is measured from the moment a request enters the `EnqueueAndWait` function until it reaches a final outcome (e.g., Dispatched, Rejected, Evicted). | `fairness_id`=<flow-id>
`priority`=<flow-priority>
`outcome`=<QueueOutcome> | ALPHA | -| inference_extension_flow_control_queue_size | Gauge | The current number of requests being actively managed by the flow control layer. This counts requests from the moment they enter the `EnqueueAndWait` function until they reach a final outcome. | `fairness_id`=<flow-id>
`priority`=<flow-priority> | ALPHA | +| inference_extension_flow_control_request_queue_duration_seconds | Distribution | Distribution of the total time requests spend in the flow control layer. This is measured from the moment a request enters the `EnqueueAndWait` function until it reaches a final outcome (e.g., Dispatched, Rejected, Evicted). | `fairness_id`=<flow-id>
`priority`=<flow-priority>
`outcome`=<QueueOutcome>
`inference_pool`=<pool-name>
`model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA | +| inference_extension_flow_control_queue_size | Gauge | The current number of requests being actively managed by the flow control layer. This counts requests from the moment they enter the `EnqueueAndWait` function until they reach a final outcome. | `fairness_id`=<flow-id>
`priority`=<flow-priority>
`inference_pool`=<pool-name>
`model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA | ## Scrape Metrics & Pprof profiles