Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 24 additions & 47 deletions pkg/utils/queue/bonded_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,28 +127,21 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume
select {
case <-time.After(retryDelay):
retryAttemptNumber++
func() {
defer func() {
if r := recover(); r != nil {
// Panic occurred, treat as error by not clearing hasRetry
}
}()
err := consumer.Consume(retryItem)
if err != nil {
// Still failing, increase delay (exponential backoff)
retryDelay = retryDelay * 2
if retryDelay > q.retryConfig.delay {
retryDelay = q.retryConfig.delay
}
} else {
// Success! Clear retry state and decrement size
hasRetry = false
retryItem = nil
retryDelay = time.Millisecond * 100
retryAttemptNumber = 0
q.size.Add(-1)
err := consumer.Consume(retryItem)
if err != nil {
// Still failing, increase delay (exponential backoff)
retryDelay = retryDelay * 2
if retryDelay > q.retryConfig.delay {
retryDelay = q.retryConfig.delay
}
}()
} else {
// Success! Clear retry state and decrement size
hasRetry = false
retryItem = nil
retryDelay = time.Millisecond * 100
retryAttemptNumber = 0
q.size.Add(-1)
}
case <-q.stopCh:
// Queue is closing
if hasRetry {
Expand All @@ -161,24 +154,15 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume
select {
case item, ok := <-queue:
if ok {
func() {
defer func() {
if r := recover(); r != nil {
// Panic occurred, treat as error
retryItem = item
hasRetry = true
}
}()
err := consumer.Consume(item)
if err != nil {
// Failed, set as retry item
retryItem = item
hasRetry = true
} else {
// Success, decrement size
q.size.Add(-1)
}
}()
err := consumer.Consume(item)
if err != nil {
// Failed, set as retry item
retryItem = item
hasRetry = true
} else {
// Success, decrement size
q.size.Add(-1)
}
} else {
// channel closed, finish worker
return
Expand All @@ -194,14 +178,7 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume
select {
case item, ok := <-queue:
if ok {
func() {
defer func() {
if r := recover(); r != nil {
// Panic occurred, but with retry disabled we still decrement size
}
}()
_ = consumer.Consume(item)
}()
_ = consumer.Consume(item)
q.size.Add(-1)
} else {
// channel closed, finish worker
Expand Down
49 changes: 0 additions & 49 deletions pkg/utils/queue/bonded_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,55 +558,6 @@ func TestConsumerFunc(t *testing.T) {
})
}

// TestBoundedQueuePanicHandling tests panic recovery in consumers
func TestBoundedQueuePanicHandling(t *testing.T) {
tests := []struct {
name string
retryEnabled bool
expectedMin int // Minimum expected processed items
}{
{"panic with retry disabled", false, 1},
{"panic with retry enabled", true, 0}, // With retry, panic item may be retried and dropped
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var processed int32
consumer := func(item interface{}) error {
if item == "panic" {
panic("test panic")
}
atomic.AddInt32(&processed, 1)
return nil
}

var q *BoundedQueue
if tt.retryEnabled {
q = NewBoundedQueueWithRetry(10, nil, 2, 10*time.Millisecond)
} else {
q = NewBoundedQueue(10, nil)
}
defer q.Stop()

q.StartConsumers(1, consumer)
time.Sleep(10 * time.Millisecond)

// This should not crash the test
q.Produce("panic")
time.Sleep(200 * time.Millisecond) // Longer wait for retry scenarios

// Queue should still be functional
q.Produce("normal")
time.Sleep(200 * time.Millisecond)

finalProcessed := atomic.LoadInt32(&processed)
if int(finalProcessed) < tt.expectedMin {
t.Errorf("Expected at least %d processed items, got %d", tt.expectedMin, finalProcessed)
}
})
}
}

// min helper function for Go versions that don't have it built-in
func min(a, b int) int {
if a < b {
Expand Down
Loading