From db8c8ab4e36918b55393ca732f54dc8770f4971a Mon Sep 17 00:00:00 2001 From: level09 Date: Fri, 7 Mar 2025 18:11:04 +0100 Subject: [PATCH 1/2] fix(tables): Handle None events in recovery process Adds proper handling for None events in the changelog recovery process to prevent processing attempts on None events from the changelog queue. The fix silently skips None events and avoids busy loops when there are no active partitions or tables to recover. Improves stability of the recovery process by gracefully handling edge cases without adding unnecessary logging in production environments. --- faust/tables/recovery.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 11737aef0..53353849c 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -815,6 +815,19 @@ async def detect_aborted_tx(): timeout=True, timeout_count=timeout_count ) continue + + # Check if we have a None event + if event is None: + # Check if we have any active partitions or tables to recover + if not active_tps and not standby_tps and not tp_to_table: + await _maybe_signal_recovery_end(timeout=True, timeout_count=5) + await asyncio.sleep(5.0) # Sleep to avoid busy loop + continue + + # Continue to next iteration if we have a None event + await asyncio.sleep(0.1) # Small sleep to avoid busy loop + continue + now = monotonic() timeout_count = 0 message = event.message From 00b5b0e18ca156c8dcded48241b00ab2958de4ca Mon Sep 17 00:00:00 2001 From: level09 Date: Sat, 8 Mar 2025 17:39:02 +0100 Subject: [PATCH 2/2] style: Fix linting issues in recovery.py Remove trailing whitespace and fix indentation to resolve CI check failures. --- faust/tables/recovery.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 53353849c..7eb8b14b3 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -815,7 +815,7 @@ async def detect_aborted_tx(): timeout=True, timeout_count=timeout_count ) continue - + # Check if we have a None event if event is None: # Check if we have any active partitions or tables to recover @@ -823,11 +823,11 @@ async def detect_aborted_tx(): await _maybe_signal_recovery_end(timeout=True, timeout_count=5) await asyncio.sleep(5.0) # Sleep to avoid busy loop continue - + # Continue to next iteration if we have a None event await asyncio.sleep(0.1) # Small sleep to avoid busy loop continue - + now = monotonic() timeout_count = 0 message = event.message