Skip to content

Conversation

@dangusev
Copy link
Contributor

@dangusev dangusev commented Dec 9, 2025

Problem: track_published events are not always emitted for the already published tracks with the participant joins.

Without these events, the video track may never be discovered by the participants who joined later.

This PR re-emits track_published events after the WS connection is established to force the track update.

Summary by CodeRabbit

  • New Features

    • Added a public async method to re-announce already-published tracks so existing participant tracks can be propagated on demand.
  • Bug Fixes

    • Restored visibility of tracks for late-joining participants: previously-published tracks are replayed and become observable when joining an ongoing session.
    • Improved event propagation on connect: the connection now forwards session events after establishing the socket so session state is reconstructed reliably on join.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 9, 2025

Walkthrough

Adds event re-emission and a public republish API: ConnectionManager registers a wildcard listener to forward all SFU WebSocket events into itself and exposes async republish_tracks(self) -> None which iterates existing participants' published tracks and re-emits track_published events downstream. (≤50 words)

Changes

Cohort / File(s) Summary
Post-join event re-emission & track republish
getstream/video/rtc/connection_manager.py
Registers a wildcard WebSocket listener to re-emit incoming SFU events through ConnectionManager; adds public async republish_tracks(self) -> None to iterate participants' published tracks (excluding self), update subscriptions, and emit track_published events; connect flow updated to perform re-emission and invoke republishing.

Sequence Diagram

sequenceDiagram
    participant CM as ConnectionManager
    participant WS as SFU WebSocket
    participant SFU as SFU State/Store
    participant Subs as Local Subscribers

    CM->>WS: establish WebSocket connection
    activate WS
    WS-->>CM: connection established
    WS->>CM: send events (wildcard listener forwards all events)
    deactivate WS

    CM->>CM: republish_tracks()
    activate CM
    CM->>SFU: query participants & published tracks
    SFU-->>CM: participants + track metadata
    loop per participant & track (excluding self)
        CM->>CM: update subscriptions for track
        CM->>Subs: emit "track_published" event
    end
    deactivate CM

    note right of Subs: Subscribers receive reconstructed published-track events
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Review event schema and payload shape for re-emitted track_published events.
  • Verify ordering: wildcard registration vs. calling republish_tracks.
  • Check duplication avoidance (tracks already emitted by SFU).
  • Confirm error handling per-track and subscription update ordering.

Poem

🐰 I hopped back in where signals play,
I heard the calls and sent them your way,
Each track I nudged from quiet to bright,
A thump, a stitch — I set them right,
Little paws, big echoes — all okay.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately reflects the main change: re-emitting track_published events for already-published tracks after SFU connection, which directly addresses the PR's core objective.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/reemit-published-tracks

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between eaee60d and e0d8f67.

📒 Files selected for processing (1)
  • getstream/video/rtc/connection_manager.py (3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

Plugins that work with audio, video, or WebRTC functionality should depend on getstream[webrtc] instead of just getstream to access WebRTC-related dependencies like aiortc, numpy, torch, torchaudio, soundfile, scipy, deepgram-sdk, and elevenlabs

Files:

  • getstream/video/rtc/connection_manager.py
🧬 Code graph analysis (1)
getstream/video/rtc/connection_manager.py (4)
getstream/utils/event_emitter.py (2)
  • on_wildcard (87-92)
  • emit (21-85)
getstream/video/rtc/pb/stream/video/sfu/models/models_pb2.pyi (2)
  • participants (520-530)
  • published_tracks (651-656)
getstream/video/rtc/participants.py (1)
  • get_participants (58-60)
getstream/video/rtc/pb/stream/video/sfu/event/events_pb2.pyi (1)
  • TrackPublished (590-631)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Tests (3.10)
  • GitHub Check: Tests (3.12)
  • GitHub Check: Tests (3.11)
  • GitHub Check: Tests (3.13)
🔇 Additional comments (1)
getstream/video/rtc/connection_manager.py (1)

325-326: LGTM! Wildcard re-emission enables external event subscription.

The placement is correct—specific handlers are registered first (lines 306-323), then the wildcard re-emits all events through ConnectionManager's emit method. This allows external code to subscribe to SFU events via ConnectionManager without direct access to the WebSocket client.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
getstream/video/rtc/connection_manager.py (1)

539-576: Make _republish_existing_tracks fully best‑effort so connect() can’t end in an inconsistent state

Right now only self._ws_client.emit(...) is wrapped in try/except. If anything before that raises (e.g., participants_state.get_participants(), iterating participant.published_tracks, or constructing events_pb2.TrackPublished), the exception will bubble out of _republish_existing_tracks(). Since connect() has already marked the connection as JOINED and running=True before this call, such an exception would cause connect() to fail while leaving a live WS connection and a “joined” state.

To keep connect()’s contract sane and address the earlier concern about inconsistent state, _republish_existing_tracks() should never raise and should treat re‑emission as best‑effort.

You can do this by broadening the error handling to cover event construction and iteration, and logging/continuing on failure instead of propagating:

     async def _republish_existing_tracks(self) -> None:
@@
-        if not self._ws_client:
-            return None
-
-        participants = self.participants_state.get_participants()
-
-        for participant in participants:
-            # Skip the tracks belonging to this connection
-            if participant.session_id == self.session_id:
-                continue
-
-            for track_type_int in participant.published_tracks:
-                event = events_pb2.TrackPublished(
-                    user_id=participant.user_id,
-                    session_id=participant.session_id,
-                    participant=participant,
-                    type=track_type_int,
-                )
-                try:
-                    self._ws_client.emit("track_published", event)
-                except Exception:
-                    logger.exception(
-                        f"Failed to emit track_published event "
-                        f"for the already published "
-                        f"track {participant.user_id}:{participant.session_id}:{track_type_int}"
-                    )
-
-        return None
+        if not self._ws_client:
+            # No signaling client available; nothing to republish.
+            return None
+
+        try:
+            participants = self.participants_state.get_participants()
+        except Exception:
+            logger.exception("Failed to get participants for republishing tracks")
+            return None
+
+        for participant in participants:
+            # Skip the tracks belonging to this connection
+            if participant.session_id == self.session_id:
+                continue
+
+            for track_type_int in getattr(participant, "published_tracks", []):
+                try:
+                    event = events_pb2.TrackPublished(
+                        user_id=participant.user_id,
+                        session_id=participant.session_id,
+                        participant=participant,
+                        type=track_type_int,
+                    )
+                    self._ws_client.emit("track_published", event)
+                except Exception:
+                    logger.exception(
+                        "Failed to emit track_published event for "
+                        "track %s:%s:%s",
+                        participant.user_id,
+                        participant.session_id,
+                        track_type_int,
+                    )
+
+        return None

This keeps the connection usable even if re‑emission encounters bad data and aligns with the goal of not letting this helper break the overall connect flow.

🧹 Nitpick comments (1)
getstream/video/rtc/connection_manager.py (1)

384-386: Re‑publish call placement after successful connect is appropriate

Calling await self._republish_existing_tracks() after _connect_internal() ensures:

  • WS connection and participants state are established first.
  • Track re‑emission happens before connect() returns to user code.

Given _republish_existing_tracks() is designed as a best‑effort helper, this placement makes sense. If you want more observability later, consider a debug log or telemetry span around the republish call, but it’s not required for correctness.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between e0d8f67 and d0ad244.

📒 Files selected for processing (1)
  • getstream/video/rtc/connection_manager.py (3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

Plugins that work with audio, video, or WebRTC functionality should depend on getstream[webrtc] instead of just getstream to access WebRTC-related dependencies like aiortc, numpy, torch, torchaudio, soundfile, scipy, deepgram-sdk, and elevenlabs

Files:

  • getstream/video/rtc/connection_manager.py
🧬 Code graph analysis (1)
getstream/video/rtc/connection_manager.py (3)
getstream/utils/event_emitter.py (2)
  • on_wildcard (87-92)
  • emit (21-85)
getstream/video/rtc/pb/stream/video/sfu/models/models_pb2.pyi (2)
  • participants (520-530)
  • published_tracks (651-656)
getstream/video/rtc/participants.py (1)
  • get_participants (58-60)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Tests (3.12)
  • GitHub Check: Tests (3.11)
  • GitHub Check: Tests (3.10)
  • GitHub Check: Tests (3.13)
🔇 Additional comments (1)
getstream/video/rtc/connection_manager.py (1)

325-327: Wildcard re‑emission from WS client to ConnectionManager looks good

Forwarding all SFU WS events via self._ws_client.on_wildcard("*", self.emit) cleanly exposes them through ConnectionManager’s own emitter while keeping the existing per‑event handlers intact. This is a reasonable, low‑friction way to let callers subscribe at the ConnectionManager level.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
getstream/video/rtc/connection_manager.py (2)

384-386: Consider top-level error handling for robustness.

While the _republish_existing_tracks() implementation now includes per-track error handling, a catastrophic failure (e.g., unexpected exception before entering the participant loop) would leave the connection state as JOINED without republishing tracks. According to the PR description, missing these events prevents track discovery for late joiners.

Although the internal defensive checks (e.g., early return if _ws_client is None) reduce this risk, wrapping the call in a try/except would prevent inconsistent state and provide clearer diagnostics:

 # Re-publish the already published tracks because
 # SFU doesn't send events for them.
-await self._republish_existing_tracks()
+try:
+    await self._republish_existing_tracks()
+except Exception as e:
+    logger.error(f"Failed to republish existing tracks: {e}", exc_info=True)
+    self.emit("error", {"message": "Failed to republish existing tracks", "error": e})

579-579: Optional: Remove redundant return statement.

The explicit return None is unnecessary since Python functions implicitly return None. While harmless, removing it would make the code slightly cleaner.

Apply this diff:

                     f"track {participant.user_id}:{participant.session_id}:{track_type_int}"
                 )
-
-        return None
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between d0ad244 and 2111c61.

📒 Files selected for processing (1)
  • getstream/video/rtc/connection_manager.py (3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

Plugins that work with audio, video, or WebRTC functionality should depend on getstream[webrtc] instead of just getstream to access WebRTC-related dependencies like aiortc, numpy, torch, torchaudio, soundfile, scipy, deepgram-sdk, and elevenlabs

Files:

  • getstream/video/rtc/connection_manager.py
🧬 Code graph analysis (1)
getstream/video/rtc/connection_manager.py (5)
getstream/utils/event_emitter.py (2)
  • on_wildcard (87-92)
  • emit (21-85)
getstream/video/rtc/pb/stream/video/sfu/models/models_pb2.pyi (2)
  • participants (520-530)
  • published_tracks (651-656)
getstream/video/rtc/participants.py (1)
  • get_participants (58-60)
getstream/video/rtc/pb/stream/video/sfu/event/events_pb2.pyi (1)
  • TrackPublished (590-631)
getstream/video/rtc/tracks.py (1)
  • handle_track_published (199-266)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Tests (3.11)
  • GitHub Check: Tests (3.12)
  • GitHub Check: Tests (3.13)
  • GitHub Check: Tests (3.10)
🔇 Additional comments (1)
getstream/video/rtc/connection_manager.py (1)

325-327: LGTM! Event forwarding enables external listeners.

The wildcard listener correctly forwards all WebSocket events to the ConnectionManager's event emitter, allowing consumers to subscribe to events on the ConnectionManager itself rather than the internal _ws_client.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between d0439bb and e4c5bcc.

📒 Files selected for processing (1)
  • getstream/video/rtc/connection_manager.py (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

Plugins that work with audio, video, or WebRTC functionality should depend on getstream[webrtc] instead of just getstream to access WebRTC-related dependencies like aiortc, numpy, torch, torchaudio, soundfile, scipy, deepgram-sdk, and elevenlabs

Files:

  • getstream/video/rtc/connection_manager.py
🧬 Code graph analysis (1)
getstream/video/rtc/connection_manager.py (3)
getstream/utils/event_emitter.py (2)
  • on_wildcard (87-92)
  • emit (21-85)
getstream/video/rtc/participants.py (1)
  • get_participants (58-60)
getstream/video/rtc/pb/stream/video/sfu/event/events_pb2.pyi (1)
  • TrackPublished (590-631)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Tests (3.13)
  • GitHub Check: Tests (3.12)
  • GitHub Check: Tests (3.10)
  • GitHub Check: Tests (3.11)
🔇 Additional comments (1)
getstream/video/rtc/connection_manager.py (1)

325-327: Wildcard bridge from SFU WebSocket into ConnectionManager looks good

Hooking self._ws_client.on_wildcard("*", self.emit) matches the emit(event, *args, **kwargs) signature and cleanly forwards all SFU events through ConnectionManager, enabling clients to subscribe at this higher level without changing existing handlers.

@tbarbugli tbarbugli merged commit ba5d58b into main Dec 12, 2025
5 checks passed
@dangusev dangusev deleted the fix/reemit-published-tracks branch December 12, 2025 13:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants