-
Notifications
You must be signed in to change notification settings - Fork 14
Re-emit track_published events for already published tracks after connecting to SFU #203
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds event re-emission and a public republish API: ConnectionManager registers a wildcard listener to forward all SFU WebSocket events into itself and exposes Changes
Sequence DiagramsequenceDiagram
participant CM as ConnectionManager
participant WS as SFU WebSocket
participant SFU as SFU State/Store
participant Subs as Local Subscribers
CM->>WS: establish WebSocket connection
activate WS
WS-->>CM: connection established
WS->>CM: send events (wildcard listener forwards all events)
deactivate WS
CM->>CM: republish_tracks()
activate CM
CM->>SFU: query participants & published tracks
SFU-->>CM: participants + track metadata
loop per participant & track (excluding self)
CM->>CM: update subscriptions for track
CM->>Subs: emit "track_published" event
end
deactivate CM
note right of Subs: Subscribers receive reconstructed published-track events
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
getstream/video/rtc/connection_manager.py(3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
Plugins that work with audio, video, or WebRTC functionality should depend on
getstream[webrtc]instead of justgetstreamto access WebRTC-related dependencies likeaiortc,numpy,torch,torchaudio,soundfile,scipy,deepgram-sdk, andelevenlabs
Files:
getstream/video/rtc/connection_manager.py
🧬 Code graph analysis (1)
getstream/video/rtc/connection_manager.py (4)
getstream/utils/event_emitter.py (2)
on_wildcard(87-92)emit(21-85)getstream/video/rtc/pb/stream/video/sfu/models/models_pb2.pyi (2)
participants(520-530)published_tracks(651-656)getstream/video/rtc/participants.py (1)
get_participants(58-60)getstream/video/rtc/pb/stream/video/sfu/event/events_pb2.pyi (1)
TrackPublished(590-631)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Tests (3.10)
- GitHub Check: Tests (3.12)
- GitHub Check: Tests (3.11)
- GitHub Check: Tests (3.13)
🔇 Additional comments (1)
getstream/video/rtc/connection_manager.py (1)
325-326: LGTM! Wildcard re-emission enables external event subscription.The placement is correct—specific handlers are registered first (lines 306-323), then the wildcard re-emits all events through ConnectionManager's emit method. This allows external code to subscribe to SFU events via ConnectionManager without direct access to the WebSocket client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
getstream/video/rtc/connection_manager.py (1)
539-576: Make_republish_existing_tracksfully best‑effort soconnect()can’t end in an inconsistent stateRight now only
self._ws_client.emit(...)is wrapped intry/except. If anything before that raises (e.g.,participants_state.get_participants(), iteratingparticipant.published_tracks, or constructingevents_pb2.TrackPublished), the exception will bubble out of_republish_existing_tracks(). Sinceconnect()has already marked the connection asJOINEDandrunning=Truebefore this call, such an exception would causeconnect()to fail while leaving a live WS connection and a “joined” state.To keep
connect()’s contract sane and address the earlier concern about inconsistent state,_republish_existing_tracks()should never raise and should treat re‑emission as best‑effort.You can do this by broadening the error handling to cover event construction and iteration, and logging/continuing on failure instead of propagating:
async def _republish_existing_tracks(self) -> None: @@ - if not self._ws_client: - return None - - participants = self.participants_state.get_participants() - - for participant in participants: - # Skip the tracks belonging to this connection - if participant.session_id == self.session_id: - continue - - for track_type_int in participant.published_tracks: - event = events_pb2.TrackPublished( - user_id=participant.user_id, - session_id=participant.session_id, - participant=participant, - type=track_type_int, - ) - try: - self._ws_client.emit("track_published", event) - except Exception: - logger.exception( - f"Failed to emit track_published event " - f"for the already published " - f"track {participant.user_id}:{participant.session_id}:{track_type_int}" - ) - - return None + if not self._ws_client: + # No signaling client available; nothing to republish. + return None + + try: + participants = self.participants_state.get_participants() + except Exception: + logger.exception("Failed to get participants for republishing tracks") + return None + + for participant in participants: + # Skip the tracks belonging to this connection + if participant.session_id == self.session_id: + continue + + for track_type_int in getattr(participant, "published_tracks", []): + try: + event = events_pb2.TrackPublished( + user_id=participant.user_id, + session_id=participant.session_id, + participant=participant, + type=track_type_int, + ) + self._ws_client.emit("track_published", event) + except Exception: + logger.exception( + "Failed to emit track_published event for " + "track %s:%s:%s", + participant.user_id, + participant.session_id, + track_type_int, + ) + + return NoneThis keeps the connection usable even if re‑emission encounters bad data and aligns with the goal of not letting this helper break the overall connect flow.
🧹 Nitpick comments (1)
getstream/video/rtc/connection_manager.py (1)
384-386: Re‑publish call placement after successful connect is appropriateCalling
await self._republish_existing_tracks()after_connect_internal()ensures:
- WS connection and participants state are established first.
- Track re‑emission happens before
connect()returns to user code.Given
_republish_existing_tracks()is designed as a best‑effort helper, this placement makes sense. If you want more observability later, consider a debug log or telemetry span around the republish call, but it’s not required for correctness.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
getstream/video/rtc/connection_manager.py(3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
Plugins that work with audio, video, or WebRTC functionality should depend on
getstream[webrtc]instead of justgetstreamto access WebRTC-related dependencies likeaiortc,numpy,torch,torchaudio,soundfile,scipy,deepgram-sdk, andelevenlabs
Files:
getstream/video/rtc/connection_manager.py
🧬 Code graph analysis (1)
getstream/video/rtc/connection_manager.py (3)
getstream/utils/event_emitter.py (2)
on_wildcard(87-92)emit(21-85)getstream/video/rtc/pb/stream/video/sfu/models/models_pb2.pyi (2)
participants(520-530)published_tracks(651-656)getstream/video/rtc/participants.py (1)
get_participants(58-60)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Tests (3.12)
- GitHub Check: Tests (3.11)
- GitHub Check: Tests (3.10)
- GitHub Check: Tests (3.13)
🔇 Additional comments (1)
getstream/video/rtc/connection_manager.py (1)
325-327: Wildcard re‑emission from WS client to ConnectionManager looks goodForwarding all SFU WS events via
self._ws_client.on_wildcard("*", self.emit)cleanly exposes them throughConnectionManager’s own emitter while keeping the existing per‑event handlers intact. This is a reasonable, low‑friction way to let callers subscribe at theConnectionManagerlevel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
getstream/video/rtc/connection_manager.py (2)
384-386: Consider top-level error handling for robustness.While the
_republish_existing_tracks()implementation now includes per-track error handling, a catastrophic failure (e.g., unexpected exception before entering the participant loop) would leave the connection state asJOINEDwithout republishing tracks. According to the PR description, missing these events prevents track discovery for late joiners.Although the internal defensive checks (e.g., early return if
_ws_clientis None) reduce this risk, wrapping the call in a try/except would prevent inconsistent state and provide clearer diagnostics:# Re-publish the already published tracks because # SFU doesn't send events for them. -await self._republish_existing_tracks() +try: + await self._republish_existing_tracks() +except Exception as e: + logger.error(f"Failed to republish existing tracks: {e}", exc_info=True) + self.emit("error", {"message": "Failed to republish existing tracks", "error": e})
579-579: Optional: Remove redundant return statement.The explicit
return Noneis unnecessary since Python functions implicitly returnNone. While harmless, removing it would make the code slightly cleaner.Apply this diff:
f"track {participant.user_id}:{participant.session_id}:{track_type_int}" ) - - return None
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
getstream/video/rtc/connection_manager.py(3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
Plugins that work with audio, video, or WebRTC functionality should depend on
getstream[webrtc]instead of justgetstreamto access WebRTC-related dependencies likeaiortc,numpy,torch,torchaudio,soundfile,scipy,deepgram-sdk, andelevenlabs
Files:
getstream/video/rtc/connection_manager.py
🧬 Code graph analysis (1)
getstream/video/rtc/connection_manager.py (5)
getstream/utils/event_emitter.py (2)
on_wildcard(87-92)emit(21-85)getstream/video/rtc/pb/stream/video/sfu/models/models_pb2.pyi (2)
participants(520-530)published_tracks(651-656)getstream/video/rtc/participants.py (1)
get_participants(58-60)getstream/video/rtc/pb/stream/video/sfu/event/events_pb2.pyi (1)
TrackPublished(590-631)getstream/video/rtc/tracks.py (1)
handle_track_published(199-266)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Tests (3.11)
- GitHub Check: Tests (3.12)
- GitHub Check: Tests (3.13)
- GitHub Check: Tests (3.10)
🔇 Additional comments (1)
getstream/video/rtc/connection_manager.py (1)
325-327: LGTM! Event forwarding enables external listeners.The wildcard listener correctly forwards all WebSocket events to the ConnectionManager's event emitter, allowing consumers to subscribe to events on the ConnectionManager itself rather than the internal
_ws_client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
getstream/video/rtc/connection_manager.py(2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
Plugins that work with audio, video, or WebRTC functionality should depend on
getstream[webrtc]instead of justgetstreamto access WebRTC-related dependencies likeaiortc,numpy,torch,torchaudio,soundfile,scipy,deepgram-sdk, andelevenlabs
Files:
getstream/video/rtc/connection_manager.py
🧬 Code graph analysis (1)
getstream/video/rtc/connection_manager.py (3)
getstream/utils/event_emitter.py (2)
on_wildcard(87-92)emit(21-85)getstream/video/rtc/participants.py (1)
get_participants(58-60)getstream/video/rtc/pb/stream/video/sfu/event/events_pb2.pyi (1)
TrackPublished(590-631)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Tests (3.13)
- GitHub Check: Tests (3.12)
- GitHub Check: Tests (3.10)
- GitHub Check: Tests (3.11)
🔇 Additional comments (1)
getstream/video/rtc/connection_manager.py (1)
325-327: Wildcard bridge from SFU WebSocket into ConnectionManager looks goodHooking
self._ws_client.on_wildcard("*", self.emit)matches theemit(event, *args, **kwargs)signature and cleanly forwards all SFU events throughConnectionManager, enabling clients to subscribe at this higher level without changing existing handlers.
Problem:
track_publishedevents are not always emitted for the already published tracks with the participant joins.Without these events, the video track may never be discovered by the participants who joined later.
This PR re-emits
track_publishedevents after the WS connection is established to force the track update.Summary by CodeRabbit
New Features
Bug Fixes
✏️ Tip: You can customize this high-level summary in your review settings.