Skip to content

Commit 36ca5ab

Browse files
authored
flowcontrol: Support dynamic priority provisioning (#2006)
* flowcontrol: support dynamic priority provisioning This commit implements dynamic provisioning for priority bands, enabling the system to accept requests with priority levels that were not explicitly configured at startup. Key Changes: 1. Hybrid State Model: - Adopts `sync.Map` in `FlowRegistry` and `registryShard` for operational state (queues, stats). This enables lock-free reads on the hot path and safe concurrent writes when provisioning new bands. - Retains `map` + `RWMutex` for the `Config` definition to ensure a consistent source of truth for topology limits and names. 2. Dynamic Provisioning Logic: - `prepareNewFlow` implements an optimistic lock-free check for band existence. - `ensurePriorityBand` handles the atomic JIT creation of new bands, updating both the global config definitions and the active shard runtimes. 3. Configuration Extensions: - Added `DefaultPriorityBand` to `Config` to serve as the template for dynamically created bands. - Updated validation logic to support empty static band lists, defaulting instead to a purely dynamic configuration. - Removed obsolescent test cases enforcing static band requirements. * flowcontrol: add tests for dynamic provisioning This commit extends the test suite to validate the new dynamic priority provisioning capabilities in the FlowRegistry and Shard components. Key Updates: 1. Shard Tests: - Added `TestShard_DynamicProvisioning` to verify that `addPriorityBand` correctly updates internal state (`sync.Map`, sorted levels) and maintains idempotency. - Verified that accessing missing configuration triggers the expected invariant violation panic. 2. Registry Tests: - Added `TestFlowRegistry_DynamicProvisioning` to verify the end-to-end flow: creating a new priority via `WithConnection` updates the global config, stats, and propagates to all active shards. - Added concurrency tests to ensure multiple clients requesting the same new priority simultaneously do not race or duplicate work. - Verified that dynamic bands are correctly persisted across scaling events. 3. Config Tests: - Updated `TestNewConfig` to support empty static band configurations (now valid). - Added verification for `DefaultPriorityBand` template initialization and cloning behavior. * flowcontrol: adopt dynamic bootstrapping for POC Removes the explicit hardcoded configuration of the "Default" priority band (Priority 0) in the main entry point. The system now relies entirely on the newly implemented `DefaultPriorityBand` mechanism to dynamically provision Priority 0 just-in-time upon the first request. This simplifies the wiring code and validates the zero-config startup path.
1 parent a37709b commit 36ca5ab

File tree

7 files changed

+424
-154
lines changed

7 files changed

+424
-154
lines changed

cmd/epp/runner/runner.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,8 @@ func must[T any](t T, err error) T {
104104

105105
// TODO: This is hardcoded for POC only. This needs to be hooked up to our text-based config story.
106106
var flowControlConfig = flowcontrol.Config{
107-
Controller: fccontroller.Config{}, // Use all defaults.
108-
Registry: must(fcregistry.NewConfig(
109-
fcregistry.WithPriorityBand(
110-
must(fcregistry.NewPriorityBandConfig(0, "Default")),
111-
),
112-
)),
107+
Controller: fccontroller.Config{}, // Use all defaults.
108+
Registry: must(fcregistry.NewConfig()), // Use all defaults.
113109
}
114110

115111
var (

pkg/epp/flowcontrol/registry/config.go

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,20 @@ type Config struct {
112112

113113
// PriorityBands defines the set of priority band templates managed by the `FlowRegistry`.
114114
// It is a map keyed by Priority level, providing O(1) access and ensuring priority uniqueness by definition.
115-
// Required: At least one `PriorityBandConfig` must be provided for a functional registry.
116115
PriorityBands map[int]*PriorityBandConfig
117116

117+
// DefaultPriorityBand serves as a template for dynamically provisioning priority bands when a request arrives with a
118+
// priority level that was not explicitly configured.
119+
// If nil, it is automatically populated with system defaults during NewConfig.
120+
DefaultPriorityBand *PriorityBandConfig
121+
118122
// InitialShardCount specifies the number of parallel shards to create when the registry is initialized.
119123
// This value must be greater than zero.
120124
// Optional: Defaults to `defaultInitialShardCount` (1).
121125
InitialShardCount int
122126

123-
// FlowGCTimeout defines the interval at which the registry scans for and garbage collects idle flows. A flow is
124-
// collected if it has been observed to be Idle for at least one full scan interval.
127+
// FlowGCTimeout defines the interval at which the registry scans for and garbage collects idle flows.
128+
// A flow is collected if it has been observed to be Idle for at least one full scan interval.
125129
// Optional: Defaults to `defaultFlowGCTimeout` (5 minutes).
126130
FlowGCTimeout time.Duration
127131

@@ -223,6 +227,14 @@ func WithPriorityBand(band *PriorityBandConfig) ConfigOption {
223227
}
224228
}
225229

230+
// WithDefaultPriorityBand sets the template configuration used for dynamically provisioning priority bands.
231+
func WithDefaultPriorityBand(band *PriorityBandConfig) ConfigOption {
232+
return func(b *configBuilder) error {
233+
b.config.DefaultPriorityBand = band
234+
return nil
235+
}
236+
}
237+
226238
// withCapabilityChecker overrides the compatibility checker used during validation.
227239
// It is intended for use only in internal unit tests.
228240
// test-only
@@ -304,8 +316,19 @@ func NewConfig(opts ...ConfigOption) (*Config, error) {
304316
}
305317
}
306318

307-
// Apply defaults to all bands.
308-
// This covers the case where a user passed a raw struct literal via WithPriorityBand.
319+
// Initialize DefaultPriorityBand if missing.
320+
// This ensures we always have a template for dynamic provisioning.
321+
if builder.config.DefaultPriorityBand == nil {
322+
builder.config.DefaultPriorityBand = &PriorityBandConfig{}
323+
}
324+
325+
// Apply defaults to the template.
326+
builder.config.DefaultPriorityBand.applyDefaults()
327+
if builder.config.DefaultPriorityBand.PriorityName == "" {
328+
builder.config.DefaultPriorityBand.PriorityName = "Dynamic-Default"
329+
}
330+
331+
// Apply defaults to all explicitly configured bands.
309332
for _, band := range builder.config.PriorityBands {
310333
band.applyDefaults()
311334
}
@@ -317,7 +340,7 @@ func NewConfig(opts ...ConfigOption) (*Config, error) {
317340
}
318341

319342
// NewPriorityBandConfig creates a new band configuration with the required fields.
320-
// It applies system defaults first, then applies any provided options to override those defaults
343+
// It applies system defaults first, then applies any provided options to override those defaults.
321344
func NewPriorityBandConfig(priority int, name string, opts ...PriorityBandConfigOption) (*PriorityBandConfig, error) {
322345
pb := &PriorityBandConfig{
323346
Priority: priority,
@@ -387,10 +410,15 @@ func (c *Config) validate(checker capabilityChecker) error {
387410
return errors.New("eventChannelBufferSize must be greater than 0")
388411
}
389412

390-
if len(c.PriorityBands) == 0 {
391-
return errors.New("at least one priority band must be defined")
413+
// Validate the dynamic template.
414+
// We use a dummy priority since the template itself doesn't have a fixed priority.
415+
templateValidationCopy := *c.DefaultPriorityBand
416+
templateValidationCopy.Priority = 0
417+
if err := templateValidationCopy.validate(checker); err != nil {
418+
return fmt.Errorf("invalid DefaultPriorityBand configuration: %w", err)
392419
}
393420

421+
// Validate statically configured bands.
394422
names := make(map[string]struct{}, len(c.PriorityBands))
395423
for _, band := range c.PriorityBands {
396424
if _, exists := names[band.PriorityName]; exists {
@@ -462,6 +490,12 @@ func (c *Config) Clone() *Config {
462490
}
463491

464492
clone := *c
493+
494+
if c.DefaultPriorityBand != nil {
495+
val := *c.DefaultPriorityBand
496+
clone.DefaultPriorityBand = &val
497+
}
498+
465499
if c.PriorityBands != nil {
466500
clone.PriorityBands = make(map[int]*PriorityBandConfig, len(c.PriorityBands))
467501
for prio, band := range c.PriorityBands {

pkg/epp/flowcontrol/registry/config_test.go

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,40 @@ func TestNewConfig(t *testing.T) {
107107
assert.Equal(t, defaultIntraFlowDispatchPolicy, band.IntraFlowDispatchPolicy)
108108
},
109109
},
110-
111-
// --- Validation Errors (Global) ---
112110
{
113-
name: "ShouldError_WhenNoPriorityBandsDefined",
114-
opts: []ConfigOption{WithInitialShardCount(1)},
115-
expectErr: true,
116-
expectedErrIs: nil, // Generic error expected.
111+
name: "ShouldSucceed_WhenNoPriorityBandsDefined_WithDynamicDefaults",
112+
opts: []ConfigOption{
113+
// No WithPriorityBand options provided.
114+
// This relies entirely on dynamic provisioning.
115+
},
116+
assertion: func(t *testing.T, cfg *Config) {
117+
assert.Empty(t, cfg.PriorityBands, "PriorityBands map should be empty")
118+
require.NotNil(t, cfg.DefaultPriorityBand, "DefaultPriorityBand template must be initialized")
119+
assert.Equal(t, "Dynamic-Default", cfg.DefaultPriorityBand.PriorityName)
120+
assert.Equal(t, defaultQueue, cfg.DefaultPriorityBand.Queue)
121+
},
122+
},
123+
{
124+
name: "ShouldRespectCustomDefaultPriorityBand",
125+
opts: []ConfigOption{
126+
WithDefaultPriorityBand(&PriorityBandConfig{
127+
PriorityName: "My-Custom-Template",
128+
Queue: "CustomQueue",
129+
}),
130+
withCapabilityChecker(&mockCapabilityChecker{
131+
checkCompatibilityFunc: func(_ intra.RegisteredPolicyName, _ queue.RegisteredQueueName) error { return nil },
132+
}),
133+
},
134+
assertion: func(t *testing.T, cfg *Config) {
135+
require.NotNil(t, cfg.DefaultPriorityBand)
136+
assert.Equal(t, "My-Custom-Template", cfg.DefaultPriorityBand.PriorityName)
137+
assert.Equal(t, queue.RegisteredQueueName("CustomQueue"), cfg.DefaultPriorityBand.Queue)
138+
// Assert other defaults were applied to the template.
139+
assert.Equal(t, defaultIntraFlowDispatchPolicy, cfg.DefaultPriorityBand.IntraFlowDispatchPolicy)
140+
},
117141
},
142+
143+
// --- Validation Errors (Global) ---
118144
{
119145
name: "ShouldError_WhenInitialShardCountIsInvalid",
120146
opts: []ConfigOption{WithInitialShardCount(0)}, // Option itself should return error.
@@ -334,4 +360,21 @@ func TestConfig_Clone(t *testing.T) {
334360
assert.Equal(t, uint64(99999), clone.PriorityBands[1].MaxBytes)
335361
assert.Equal(t, "Modified", clone.PriorityBands[1].PriorityName)
336362
})
363+
364+
t.Run("ShouldDeepCopyDefaultPriorityBand", func(t *testing.T) {
365+
t.Parallel()
366+
original, err := NewConfig()
367+
require.NoError(t, err)
368+
369+
clone := original.Clone()
370+
371+
require.NotSame(t, original.DefaultPriorityBand, clone.DefaultPriorityBand,
372+
"Clone should have a distinct pointer for DefaultPriorityBand")
373+
assert.Equal(t, original.DefaultPriorityBand.PriorityName, clone.DefaultPriorityBand.PriorityName)
374+
375+
// Modify Clone.
376+
clone.DefaultPriorityBand.PriorityName = "Hacked"
377+
assert.Equal(t, "Dynamic-Default", original.DefaultPriorityBand.PriorityName,
378+
"Modifying clone template should not affect original")
379+
})
337380
}

pkg/epp/flowcontrol/registry/registry.go

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,14 @@ type FlowRegistry struct {
120120
flowStates sync.Map // stores `types.FlowKey` -> *flowState
121121

122122
// Globally aggregated statistics, updated atomically via lock-free propagation.
123-
totalByteSize atomic.Int64
124-
totalLen atomic.Int64
125-
perPriorityBandStats map[int]*bandStats // Keyed by priority.
123+
totalByteSize atomic.Int64
124+
totalLen atomic.Int64
125+
126+
// perPriorityBandStats tracks aggregated stats per priority.
127+
// Key: int (priority), Value: *bandStats
128+
// We use sync.Map here to allow for lock-free reads on the hot path (Stats) while allowing dynamic provisioning to
129+
// add new keys safely.
130+
perPriorityBandStats sync.Map
126131

127132
// --- Administrative state (protected by `mu`) ---
128133

@@ -152,11 +157,10 @@ func withClock(clk clock.WithTickerAndDelayedExecution) RegistryOption {
152157
func NewFlowRegistry(config *Config, logger logr.Logger, opts ...RegistryOption) (*FlowRegistry, error) {
153158
cfg := config.Clone()
154159
fr := &FlowRegistry{
155-
config: cfg,
156-
logger: logger.WithName("flow-registry"),
157-
activeShards: []*registryShard{},
158-
drainingShards: make(map[string]*registryShard),
159-
perPriorityBandStats: make(map[int]*bandStats, len(cfg.PriorityBands)),
160+
config: cfg,
161+
logger: logger.WithName("flow-registry"),
162+
activeShards: []*registryShard{},
163+
drainingShards: make(map[string]*registryShard),
160164
}
161165

162166
for _, opt := range opts {
@@ -167,7 +171,7 @@ func NewFlowRegistry(config *Config, logger logr.Logger, opts ...RegistryOption)
167171
}
168172

169173
for prio := range cfg.PriorityBands {
170-
fr.perPriorityBandStats[prio] = &bandStats{}
174+
fr.perPriorityBandStats.Store(prio, &bandStats{})
171175
}
172176

173177
if err := fr.updateShardCount(cfg.InitialShardCount); err != nil {
@@ -252,9 +256,18 @@ func (fr *FlowRegistry) WithConnection(key types.FlowKey, fn func(conn contracts
252256

253257
// prepareNewFlow creates a new `flowState` and synchronizes its queues and policies onto all existing shards.
254258
func (fr *FlowRegistry) prepareNewFlow(key types.FlowKey) (*flowState, error) {
255-
// Get a stable snapshot of the shard topology.
256-
// An RLock is sufficient because while the list of shards must be stable, the internal state of each shard is
257-
// protected by its own lock.
259+
fr.mu.RLock()
260+
_, exists := fr.config.PriorityBands[key.Priority]
261+
fr.mu.RUnlock()
262+
263+
// If the band was missing, we must acquire the Write Lock to create it.
264+
if !exists {
265+
if err := fr.ensurePriorityBand(key.Priority); err != nil {
266+
return nil, err
267+
}
268+
}
269+
270+
// Now we know the band exists (or we errored). Re-acquire Read Lock to safely read the topology and build components.
258271
fr.mu.RLock()
259272
defer fr.mu.RUnlock()
260273

@@ -272,6 +285,34 @@ func (fr *FlowRegistry) prepareNewFlow(key types.FlowKey) (*flowState, error) {
272285
return &flowState{key: key}, nil
273286
}
274287

288+
// ensurePriorityBand safely provisions a new priority band.
289+
func (fr *FlowRegistry) ensurePriorityBand(priority int) error {
290+
fr.mu.Lock()
291+
defer fr.mu.Unlock()
292+
293+
// Double-Check: Someone might have created it while we swapped locks in prepareNewFlow.
294+
if _, ok := fr.config.PriorityBands[priority]; ok {
295+
return nil
296+
}
297+
298+
fr.logger.Info("Dynamically provisioning new priority band", "priority", priority)
299+
300+
newBand := *fr.config.DefaultPriorityBand
301+
newBand.Priority = priority
302+
newBand.PriorityName = fmt.Sprintf("Dynamic-%d", priority)
303+
fr.config.PriorityBands[priority] = &newBand
304+
305+
fr.perPriorityBandStats.LoadOrStore(priority, &bandStats{})
306+
307+
fr.repartitionShardConfigsLocked()
308+
309+
for _, shard := range fr.activeShards {
310+
shard.addPriorityBand(priority)
311+
}
312+
313+
return nil
314+
}
315+
275316
// --- `contracts.FlowRegistryObserver` Implementation ---
276317

277318
// Stats returns globally aggregated statistics for the entire `FlowRegistry`.
@@ -288,16 +329,19 @@ func (fr *FlowRegistry) Stats() contracts.AggregateStats {
288329
PerPriorityBandStats: make(map[int]contracts.PriorityBandStats, len(fr.config.PriorityBands)),
289330
}
290331

291-
for p, s := range fr.perPriorityBandStats {
292-
bandCfg := fr.config.PriorityBands[p]
293-
stats.PerPriorityBandStats[p] = contracts.PriorityBandStats{
294-
Priority: p,
332+
fr.perPriorityBandStats.Range(func(key, value any) bool {
333+
priority := key.(int)
334+
bandStats := value.(*bandStats)
335+
bandCfg := fr.config.PriorityBands[priority]
336+
stats.PerPriorityBandStats[priority] = contracts.PriorityBandStats{
337+
Priority: priority,
295338
PriorityName: bandCfg.PriorityName,
296339
CapacityBytes: bandCfg.MaxBytes,
297-
ByteSize: uint64(s.byteSize.Load()),
298-
Len: uint64(s.len.Load()),
340+
ByteSize: uint64(bandStats.byteSize.Load()),
341+
Len: uint64(bandStats.len.Load()),
299342
}
300-
}
343+
return true
344+
})
301345
return stats
302346
}
303347

@@ -585,11 +629,8 @@ func (fr *FlowRegistry) updateAllShardsCacheLocked() {
585629

586630
// propagateStatsDelta is the top-level, lock-free aggregator for all statistics.
587631
func (fr *FlowRegistry) propagateStatsDelta(priority int, lenDelta, byteSizeDelta int64) {
588-
stats, ok := fr.perPriorityBandStats[priority]
589-
if !ok {
590-
panic(fmt.Sprintf("invariant violation: priority band (%d) stats missing during propagation", priority))
591-
}
592-
632+
val, _ := fr.perPriorityBandStats.Load(priority)
633+
stats := val.(*bandStats)
593634
stats.len.Add(lenDelta)
594635
stats.byteSize.Add(byteSizeDelta)
595636
fr.totalLen.Add(lenDelta)

0 commit comments

Comments
 (0)