From 5c1e168e23e269e888ad25add63aa29372c1e263 Mon Sep 17 00:00:00 2001 From: James Ding Date: Mon, 17 Nov 2025 14:42:44 -0600 Subject: [PATCH 1/4] feat: add WebSocketOptions for configurable WebSocket connections --- src/fishaudio/__init__.py | 7 ++++- src/fishaudio/core/__init__.py | 2 ++ src/fishaudio/core/websocket_options.py | 42 +++++++++++++++++++++++++ src/fishaudio/resources/tts.py | 12 ++++++- 4 files changed, 61 insertions(+), 2 deletions(-) create mode 100644 src/fishaudio/core/websocket_options.py diff --git a/src/fishaudio/__init__.py b/src/fishaudio/__init__.py index dcedf83..ce383a2 100644 --- a/src/fishaudio/__init__.py +++ b/src/fishaudio/__init__.py @@ -29,6 +29,7 @@ from ._version import __version__ from .client import AsyncFishAudio, FishAudio from .core.iterators import AsyncAudioStream, AudioStream +from .core.websocket_options import WebSocketOptions from .exceptions import ( APIError, AuthenticationError, @@ -41,7 +42,7 @@ ValidationError, WebSocketError, ) -from .types import FlushEvent, TextEvent +from .types import FlushEvent, ReferenceAudio, TextEvent, TTSConfig from .utils import play, save, stream # Main exports @@ -56,8 +57,12 @@ # Audio streams "AudioStream", "AsyncAudioStream", + # Configuration + "TTSConfig", + "WebSocketOptions", # Types "FlushEvent", + "ReferenceAudio", "TextEvent", # Exceptions "APIError", diff --git a/src/fishaudio/core/__init__.py b/src/fishaudio/core/__init__.py index 633f95d..2b0cc45 100644 --- a/src/fishaudio/core/__init__.py +++ b/src/fishaudio/core/__init__.py @@ -3,10 +3,12 @@ from .client_wrapper import AsyncClientWrapper, ClientWrapper from .omit import OMIT from .request_options import RequestOptions +from .websocket_options import WebSocketOptions __all__ = [ "AsyncClientWrapper", "ClientWrapper", "OMIT", "RequestOptions", + "WebSocketOptions", ] diff --git a/src/fishaudio/core/websocket_options.py b/src/fishaudio/core/websocket_options.py new file mode 100644 index 0000000..3308aca --- /dev/null +++ 
b/src/fishaudio/core/websocket_options.py @@ -0,0 +1,42 @@ +"""WebSocket-level options for WebSocket connections.""" + +from typing import Any, Dict, Optional + + +class WebSocketOptions: + """ + Options that can be provided to configure WebSocket connections. + + Attributes: + keepalive_ping_timeout_seconds: Maximum time to wait for a pong response + to a keepalive ping before considering the connection dead (default: 20s) + keepalive_ping_interval_seconds: Interval between keepalive pings (default: 20s) + max_message_size_bytes: Maximum size for incoming messages (default: 65,536 bytes) + queue_size: Size of the message receive queue (default: 512) + """ + + def __init__( + self, + *, + keepalive_ping_timeout_seconds: Optional[float] = None, + keepalive_ping_interval_seconds: Optional[float] = None, + max_message_size_bytes: Optional[int] = None, + queue_size: Optional[int] = None, + ): + self.keepalive_ping_timeout_seconds = keepalive_ping_timeout_seconds + self.keepalive_ping_interval_seconds = keepalive_ping_interval_seconds + self.max_message_size_bytes = max_message_size_bytes + self.queue_size = queue_size + + def to_httpx_ws_kwargs(self) -> Dict[str, Any]: + """Convert to kwargs dict for httpx_ws aconnect_ws/connect_ws.""" + kwargs = {} + if self.keepalive_ping_timeout_seconds is not None: + kwargs["keepalive_ping_timeout_seconds"] = self.keepalive_ping_timeout_seconds + if self.keepalive_ping_interval_seconds is not None: + kwargs["keepalive_ping_interval_seconds"] = self.keepalive_ping_interval_seconds + if self.max_message_size_bytes is not None: + kwargs["max_message_size_bytes"] = self.max_message_size_bytes + if self.queue_size is not None: + kwargs["queue_size"] = self.queue_size + return kwargs diff --git a/src/fishaudio/resources/tts.py b/src/fishaudio/resources/tts.py index bd3ceec..b9f646e 100644 --- a/src/fishaudio/resources/tts.py +++ b/src/fishaudio/resources/tts.py @@ -8,7 +8,7 @@ from httpx_ws import AsyncWebSocketSession, WebSocketSession, 
aconnect_ws, connect_ws from .realtime import aiter_websocket_audio, iter_websocket_audio -from ..core import AsyncClientWrapper, ClientWrapper, RequestOptions +from ..core import AsyncClientWrapper, ClientWrapper, RequestOptions, WebSocketOptions from ..core.iterators import AsyncAudioStream, AudioStream from ..types import ( AudioFormat, @@ -215,6 +215,7 @@ def stream_websocket( config: TTSConfig = TTSConfig(), model: Model = "s1", max_workers: int = 10, + ws_options: Optional[WebSocketOptions] = None, ) -> Iterator[bytes]: """ Stream text and receive audio in real-time via WebSocket. @@ -305,6 +306,9 @@ def text_generator(): speed, base=config.prosody ) + # Prepare WebSocket connection kwargs + ws_kwargs = ws_options.to_httpx_ws_kwargs() if ws_options else {} + executor = ThreadPoolExecutor(max_workers=max_workers) try: @@ -316,6 +320,7 @@ def text_generator(): "model": model, "Authorization": f"Bearer {self._client.api_key}", }, + **ws_kwargs, ) as ws: def sender(): @@ -502,6 +507,7 @@ async def stream_websocket( speed: Optional[float] = None, config: TTSConfig = TTSConfig(), model: Model = "s1", + ws_options: Optional[WebSocketOptions] = None, ): """ Stream text and receive audio in real-time via WebSocket (async). 
@@ -591,11 +597,15 @@ async def text_generator(): speed, base=config.prosody ) + # Prepare WebSocket connection kwargs + ws_kwargs = ws_options.to_httpx_ws_kwargs() if ws_options else {} + ws: AsyncWebSocketSession async with aconnect_ws( "/v1/tts/live", client=self._client.client, headers={"model": model, "Authorization": f"Bearer {self._client.api_key}"}, + **ws_kwargs, ) as ws: async def sender(): From 2bf07e8c17ad86074ddd86ccef54333e1d48250c Mon Sep 17 00:00:00 2001 From: James Ding Date: Mon, 17 Nov 2025 15:30:46 -0600 Subject: [PATCH 2/4] style: improve formatting of WebSocketOptions kwargs conversion --- src/fishaudio/core/websocket_options.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/fishaudio/core/websocket_options.py b/src/fishaudio/core/websocket_options.py index 3308aca..9e05f71 100644 --- a/src/fishaudio/core/websocket_options.py +++ b/src/fishaudio/core/websocket_options.py @@ -32,9 +32,13 @@ def to_httpx_ws_kwargs(self) -> Dict[str, Any]: """Convert to kwargs dict for httpx_ws aconnect_ws/connect_ws.""" kwargs = {} if self.keepalive_ping_timeout_seconds is not None: - kwargs["keepalive_ping_timeout_seconds"] = self.keepalive_ping_timeout_seconds + kwargs["keepalive_ping_timeout_seconds"] = ( + self.keepalive_ping_timeout_seconds + ) if self.keepalive_ping_interval_seconds is not None: - kwargs["keepalive_ping_interval_seconds"] = self.keepalive_ping_interval_seconds + kwargs["keepalive_ping_interval_seconds"] = ( + self.keepalive_ping_interval_seconds + ) if self.max_message_size_bytes is not None: kwargs["max_message_size_bytes"] = self.max_message_size_bytes if self.queue_size is not None: From c31687500e15c807f87018feef36115c3dee6058 Mon Sep 17 00:00:00 2001 From: James Ding Date: Mon, 17 Nov 2025 17:56:02 -0600 Subject: [PATCH 3/4] test: add unit tests for WebSocketOptions and its integration in streaming --- .../test_tts_websocket_integration.py | 54 +++++++++++++++++ tests/unit/test_core.py | 40 ++++++++++++- 
tests/unit/test_tts_realtime.py | 58 ++++++++++++++++++- 3 files changed, 150 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_tts_websocket_integration.py b/tests/integration/test_tts_websocket_integration.py index 8cfe555..8341a18 100644 --- a/tests/integration/test_tts_websocket_integration.py +++ b/tests/integration/test_tts_websocket_integration.py @@ -2,6 +2,7 @@ import pytest +from fishaudio import WebSocketOptions from fishaudio.types import Prosody, TTSConfig, TextEvent, FlushEvent from .conftest import TEST_REFERENCE_ID @@ -118,6 +119,59 @@ def text_stream(): with pytest.raises(WebSocketError, match="WebSocket stream ended with error"): list(client.tts.stream_websocket(text_stream())) + def test_websocket_very_long_generation_with_timeout(self, client, save_audio): + """ + Test that very long text generation succeeds with increased timeout. + + This test generates a very long response that could potentially take >20 seconds + to fully generate, which would cause a WebSocketNetworkError with the default + keepalive_ping_timeout_seconds=20. By using an increased timeout of 60 seconds, + we can handle longer generation times without disconnection. + + This is the SOLUTION to issue #47. To reproduce the timeout issue, run: + python reproduce_issue_47.py --mode=both + """ + # Use significantly increased timeout to handle very long generations + ws_options = WebSocketOptions( + keepalive_ping_timeout_seconds=60.0, + keepalive_ping_interval_seconds=30.0, + ) + + def text_stream(): + # Generate a very long piece of text that will take significant time to process + long_text = [ + "This is a test of very long form text-to-speech generation. ", + "We are testing the ability to handle extended generation times without timing out. ", + "The default WebSocket keepalive timeout of 20 seconds can be insufficient for long responses. ", + "By increasing the keepalive_ping_timeout_seconds to 60 seconds, we allow for longer gaps between chunks. 
", + "This is particularly important for conversational AI applications where responses can be quite lengthy. ", + "The WebSocket connection should remain stable throughout the entire generation process. ", + "We include enough text here to ensure the generation takes a substantial amount of time. ", + "This helps verify that the increased timeout setting is working correctly. ", + "The audio streaming should continue smoothly without any network errors. ", + "Each sentence adds more content to be synthesized into speech. ", + "The system should handle this gracefully with the custom WebSocket options. ", + "This demonstrates the practical value of the WebSocketOptions feature. ", + "Users can now configure timeouts based on their specific use case requirements. ", + "Long-form content generation is now much more reliable. ", + "The implementation passes through all necessary parameters to the underlying httpx_ws library. ", + ] + for sentence in long_text: + yield sentence + + # This should succeed with increased timeout + audio_chunks = list( + client.tts.stream_websocket(text_stream(), ws_options=ws_options) + ) + + assert len(audio_chunks) > 0, "Should receive audio chunks for very long text" + complete_audio = b"".join(audio_chunks) + # Very long text should produce substantial audio + assert len(complete_audio) > 10000, ( + "Very long text should produce substantial audio data" + ) + save_audio(audio_chunks, "test_websocket_very_long_with_timeout.mp3") + class TestAsyncTTSWebSocketIntegration: """Test async TTS WebSocket streaming with real API.""" diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index f77dc04..75c6741 100644 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -4,7 +4,13 @@ from unittest.mock import patch import httpx -from fishaudio.core import OMIT, ClientWrapper, AsyncClientWrapper, RequestOptions +from fishaudio.core import ( + OMIT, + ClientWrapper, + AsyncClientWrapper, + RequestOptions, + WebSocketOptions, +) 
class TestOMIT: @@ -51,6 +57,38 @@ def test_get_timeout(self): assert timeout.connect == 30.0 +class TestWebSocketOptions: + """Test WebSocketOptions class.""" + + def test_to_httpx_ws_kwargs_all_options(self): + """Test to_httpx_ws_kwargs with all options set.""" + options = WebSocketOptions( + keepalive_ping_timeout_seconds=60.0, + keepalive_ping_interval_seconds=30.0, + max_message_size_bytes=131072, + queue_size=1024, + ) + kwargs = options.to_httpx_ws_kwargs() + assert kwargs == { + "keepalive_ping_timeout_seconds": 60.0, + "keepalive_ping_interval_seconds": 30.0, + "max_message_size_bytes": 131072, + "queue_size": 1024, + } + + def test_to_httpx_ws_kwargs_partial_options(self): + """Test to_httpx_ws_kwargs with only some options set.""" + options = WebSocketOptions(keepalive_ping_timeout_seconds=60.0) + kwargs = options.to_httpx_ws_kwargs() + assert kwargs == {"keepalive_ping_timeout_seconds": 60.0} + assert "keepalive_ping_interval_seconds" not in kwargs + + def test_to_httpx_ws_kwargs_no_options(self): + """Test to_httpx_ws_kwargs with no options set.""" + options = WebSocketOptions() + assert options.to_httpx_ws_kwargs() == {} + + class TestClientWrapper: """Test sync ClientWrapper.""" diff --git a/tests/unit/test_tts_realtime.py b/tests/unit/test_tts_realtime.py index 27874bb..9cd62e6 100644 --- a/tests/unit/test_tts_realtime.py +++ b/tests/unit/test_tts_realtime.py @@ -3,7 +3,7 @@ import pytest from unittest.mock import Mock, AsyncMock, MagicMock, patch -from fishaudio.core import ClientWrapper, AsyncClientWrapper +from fishaudio.core import ClientWrapper, AsyncClientWrapper, WebSocketOptions from fishaudio.resources.tts import TTSClient, AsyncTTSClient from fishaudio.types import Prosody, TTSConfig, TextEvent, FlushEvent, ReferenceAudio import ormsgpack @@ -345,6 +345,30 @@ def submit_side_effect(fn): assert len(start_event_payload["request"]["references"]) == 1 assert start_event_payload["request"]["references"][0]["text"] == "Param" + 
@patch("fishaudio.resources.tts.connect_ws") + @patch("fishaudio.resources.tts.ThreadPoolExecutor") + def test_stream_websocket_with_ws_options( + self, mock_executor, mock_connect_ws, tts_client, mock_client_wrapper + ): + """Test WebSocket streaming passes through WebSocketOptions.""" + mock_ws = MagicMock() + mock_ws.__enter__ = Mock(return_value=mock_ws) + mock_ws.__exit__ = Mock(return_value=None) + mock_connect_ws.return_value = mock_ws + mock_future = Mock() + mock_future.result.return_value = None + mock_executor_instance = Mock() + mock_executor_instance.submit.return_value = mock_future + mock_executor.return_value = mock_executor_instance + + with patch("fishaudio.resources.tts.iter_websocket_audio") as mock_receiver: + mock_receiver.return_value = iter([b"audio"]) + ws_options = WebSocketOptions(keepalive_ping_timeout_seconds=60.0) + list(tts_client.stream_websocket(iter(["Test"]), ws_options=ws_options)) + assert ( + mock_connect_ws.call_args[1]["keepalive_ping_timeout_seconds"] == 60.0 + ) + class TestAsyncTTSRealtimeClient: """Test asynchronous AsyncTTSClient realtime streaming.""" @@ -649,3 +673,35 @@ async def text_stream(): start_event_payload = ormsgpack.unpackb(first_call[0][0]) assert len(start_event_payload["request"]["references"]) == 1 assert start_event_payload["request"]["references"][0]["text"] == "Param" + + @pytest.mark.asyncio + @patch("fishaudio.resources.tts.aconnect_ws") + async def test_stream_websocket_with_ws_options( + self, mock_aconnect_ws, async_tts_client, async_mock_client_wrapper + ): + """Test async WebSocket streaming passes through WebSocketOptions.""" + mock_ws = MagicMock() + mock_ws.__aenter__ = AsyncMock(return_value=mock_ws) + mock_ws.__aexit__ = AsyncMock(return_value=None) + mock_ws.send_bytes = AsyncMock() + mock_aconnect_ws.return_value = mock_ws + + async def mock_audio_receiver(ws): + yield b"audio" + + with patch( + "fishaudio.resources.tts.aiter_websocket_audio", + return_value=mock_audio_receiver(mock_ws), 
+ ): + ws_options = WebSocketOptions(keepalive_ping_timeout_seconds=60.0) + + async def text_stream(): + yield "Test" + + async for _ in async_tts_client.stream_websocket( + text_stream(), ws_options=ws_options + ): + pass + assert ( + mock_aconnect_ws.call_args[1]["keepalive_ping_timeout_seconds"] == 60.0 + ) From 14fc516c746af5c60891c23fa97e9e8da80bd9ef Mon Sep 17 00:00:00 2001 From: James Ding Date: Mon, 17 Nov 2025 18:30:48 -0600 Subject: [PATCH 4/4] feat: add WebSocket options for configurable timeouts and message limits in TTS streaming --- src/fishaudio/core/websocket_options.py | 24 +++++++++++++++----- src/fishaudio/resources/tts.py | 30 +++++++++++++++++++++++-- 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/src/fishaudio/core/websocket_options.py b/src/fishaudio/core/websocket_options.py index 9e05f71..1403922 100644 --- a/src/fishaudio/core/websocket_options.py +++ b/src/fishaudio/core/websocket_options.py @@ -5,14 +5,26 @@ class WebSocketOptions: """ - Options that can be provided to configure WebSocket connections. + Options for configuring WebSocket connections. + + These options are passed directly to httpx_ws's connect_ws/aconnect_ws functions. + For complete documentation, see https://frankie567.github.io/httpx-ws/reference/httpx_ws/ Attributes: - keepalive_ping_timeout_seconds: Maximum time to wait for a pong response - to a keepalive ping before considering the connection dead (default: 20s) - keepalive_ping_interval_seconds: Interval between keepalive pings (default: 20s) - max_message_size_bytes: Maximum size for incoming messages (default: 65,536 bytes) - queue_size: Size of the message receive queue (default: 512) + keepalive_ping_timeout_seconds: Maximum delay the client will wait for an answer + to its Ping event. If the delay is exceeded, WebSocketNetworkError will be + raised and the connection closed. Default: 20 seconds. 
+ keepalive_ping_interval_seconds: Interval at which the client will automatically + send a Ping event to keep the connection alive. Default: 20 seconds. None + leaves the httpx_ws default in place; it does not disable keepalive pings. + max_message_size_bytes: Message size in bytes to receive from the server. + Default: 65536 bytes (64 KiB). + queue_size: Size of the queue where received messages will be held until they + are consumed. If the queue is full, the client will stop receiving messages + from the server until the queue has room available. Default: 512. + + Note: + Parameter descriptions adapted from httpx_ws documentation. """ def __init__( diff --git a/src/fishaudio/resources/tts.py b/src/fishaudio/resources/tts.py index b9f646e..360c892 100644 --- a/src/fishaudio/resources/tts.py +++ b/src/fishaudio/resources/tts.py @@ -232,13 +232,16 @@ def stream_websocket( config: TTS configuration (audio settings, voice, model parameters) model: TTS model to use max_workers: ThreadPoolExecutor workers for concurrent sender + ws_options: WebSocket connection options for configuring timeouts, message size limits, etc. + Useful for long-running generations that may exceed default timeout values. + See WebSocketOptions class for available parameters. 
Returns: Iterator of audio bytes Example: ```python - from fishaudio import FishAudio, TTSConfig, ReferenceAudio + from fishaudio import FishAudio, TTSConfig, ReferenceAudio, WebSocketOptions client = FishAudio(api_key="...") @@ -274,6 +277,16 @@ def text_generator(): ): f.write(audio_chunk) + # With WebSocket options for long-running generations + # Useful if you're generating very long responses that may take >20 seconds + ws_options = WebSocketOptions(keepalive_ping_timeout_seconds=60.0) + with open("output.mp3", "wb") as f: + for audio_chunk in client.tts.stream_websocket( + text_generator(), + ws_options=ws_options + ): + f.write(audio_chunk) + # Parameters override config values config = TTSConfig(format="mp3", latency="balanced") with open("output.wav", "wb") as f: @@ -523,13 +536,16 @@ async def stream_websocket( speed: Speech speed multiplier, e.g. 1.5 for 1.5x speed (overrides config.prosody.speed if provided) config: TTS configuration (audio settings, voice, model parameters) model: TTS model to use + ws_options: WebSocket connection options for configuring timeouts, message size limits, etc. + Useful for long-running generations that may exceed default timeout values. + See WebSocketOptions class for available parameters. 
Returns: Async iterator of audio bytes Example: ```python - from fishaudio import AsyncFishAudio, TTSConfig, ReferenceAudio + from fishaudio import AsyncFishAudio, TTSConfig, ReferenceAudio, WebSocketOptions client = AsyncFishAudio(api_key="...") @@ -565,6 +581,16 @@ async def text_generator(): ): await f.write(audio_chunk) + # With WebSocket options for long-running generations + # Useful if you're generating very long responses that may take >20 seconds + ws_options = WebSocketOptions(keepalive_ping_timeout_seconds=60.0) + async with aiofiles.open("output.mp3", "wb") as f: + async for audio_chunk in client.tts.stream_websocket( + text_generator(), + ws_options=ws_options + ): + await f.write(audio_chunk) + # Parameters override config values config = TTSConfig(format="mp3", latency="balanced") async with aiofiles.open("output.wav", "wb") as f: