diff --git a/pkg/utils/queue/bonded_queue.go b/pkg/utils/queue/bonded_queue.go index e006acb..f31c3aa 100644 --- a/pkg/utils/queue/bonded_queue.go +++ b/pkg/utils/queue/bonded_queue.go @@ -127,28 +127,21 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume select { case <-time.After(retryDelay): retryAttemptNumber++ - func() { - defer func() { - if r := recover(); r != nil { - // Panic occurred, treat as error by not clearing hasRetry - } - }() - err := consumer.Consume(retryItem) - if err != nil { - // Still failing, increase delay (exponential backoff) - retryDelay = retryDelay * 2 - if retryDelay > q.retryConfig.delay { - retryDelay = q.retryConfig.delay - } - } else { - // Success! Clear retry state and decrement size - hasRetry = false - retryItem = nil - retryDelay = time.Millisecond * 100 - retryAttemptNumber = 0 - q.size.Add(-1) + err := consumer.Consume(retryItem) + if err != nil { + // Still failing, increase delay (exponential backoff) + retryDelay = retryDelay * 2 + if retryDelay > q.retryConfig.delay { + retryDelay = q.retryConfig.delay } - }() + } else { + // Success! Clear retry state and decrement size + hasRetry = false + retryItem = nil + retryDelay = time.Millisecond * 100 + retryAttemptNumber = 0 + q.size.Add(-1) + } case <-q.stopCh: // Queue is closing if hasRetry { @@ -161,24 +154,15 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume select { case item, ok := <-queue: if ok { - func() { - defer func() { - if r := recover(); r != nil { - // Panic occurred, treat as error - retryItem = item - hasRetry = true - } - }() - err := consumer.Consume(item) - if err != nil { - // Failed, set as retry item - retryItem = item - hasRetry = true - } else { - // Success, decrement size - q.size.Add(-1) - } - }() + err := consumer.Consume(item) + if err != nil { + // Failed, set as retry item + retryItem = item + hasRetry = true + } else { + // Success, decrement size + q.size.Add(-1) + } } else { // channel closed, finish worker return @@ -194,14 +178,7 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume select { case item, ok := <-queue: if ok { - func() { - defer func() { - if r := recover(); r != nil { - // Panic occurred, but with retry disabled we still decrement size - } - }() - _ = consumer.Consume(item) - }() + _ = consumer.Consume(item) q.size.Add(-1) } else { // channel closed, finish worker diff --git a/pkg/utils/queue/bonded_queue_test.go b/pkg/utils/queue/bonded_queue_test.go index 31032b2..d2a97e4 100644 --- a/pkg/utils/queue/bonded_queue_test.go +++ b/pkg/utils/queue/bonded_queue_test.go @@ -558,55 +558,6 @@ func TestConsumerFunc(t *testing.T) { }) } -// TestBoundedQueuePanicHandling tests panic recovery in consumers -func TestBoundedQueuePanicHandling(t *testing.T) { - tests := []struct { - name string - retryEnabled bool - expectedMin int // Minimum expected processed items - }{ - {"panic with retry disabled", false, 1}, - {"panic with retry enabled", true, 0}, // With retry, panic item may be retried and dropped - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var processed int32 - consumer := func(item interface{}) error { - if item == "panic" { - panic("test panic") - } - atomic.AddInt32(&processed, 1) - return nil - } - - var q *BoundedQueue - if tt.retryEnabled { - q = NewBoundedQueueWithRetry(10, nil, 2, 10*time.Millisecond) - } else { - q = NewBoundedQueue(10, nil) - } - defer q.Stop() - - q.StartConsumers(1, consumer) - time.Sleep(10 * time.Millisecond) - - // This should not crash the test - q.Produce("panic") - time.Sleep(200 * time.Millisecond) // Longer wait for retry scenarios - - // Queue should still be functional - q.Produce("normal") - time.Sleep(200 * time.Millisecond) - - finalProcessed := atomic.LoadInt32(&processed) - if int(finalProcessed) < tt.expectedMin { - t.Errorf("Expected at least %d processed items, got %d", tt.expectedMin, finalProcessed) - } - }) - } -} - // min helper function for Go versions that don't have it built-in func min(a, b int) int { if a < b {