diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 11737aef0..7eb8b14b3 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -815,6 +815,19 @@ async def detect_aborted_tx(): timeout=True, timeout_count=timeout_count ) continue + + # Check if we have a None event + if event is None: + # Check if we have any active partitions or tables to recover + if not active_tps and not standby_tps and not tp_to_table: + await _maybe_signal_recovery_end(timeout=True, timeout_count=5) + await asyncio.sleep(5.0) # Sleep to avoid busy loop + continue + + # Continue to next iteration if we have a None event + await asyncio.sleep(0.1) # Small sleep to avoid busy loop + continue + now = monotonic() timeout_count = 0 message = event.message