From a44defff69dfe76e4292a9c01e545ea572200879 Mon Sep 17 00:00:00 2001 From: Nursultan Kudaibergenov Date: Fri, 16 May 2025 23:54:14 +0600 Subject: [PATCH] fixed StreamT annotations, TPorTopicSet can also be optional --- faust/streams.py | 6 +++--- faust/types/events.py | 2 +- faust/types/streams.py | 35 ++++++++++++++++++++++------------- faust/types/transports.py | 2 +- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/faust/streams.py b/faust/streams.py index 9f4286a3f..89ffe3931 100644 --- a/faust/streams.py +++ b/faust/streams.py @@ -594,7 +594,7 @@ def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]: async def noack_take( self, max_: int, within: Seconds - ) -> AsyncIterable[Sequence[T_co]]: + ) -> AsyncIterable[Sequence[EventT[T_co]]]: """ Buffer n values at a time and yield a list of buffered values. :param max_: Max number of messages to receive. When more than this @@ -606,7 +606,7 @@ async def noack_take( the agent is likely to stall and block buffered events for an unreasonable length of time(!). """ - buffer: List[T_co] = [] + buffer: List[EventT[T_co]] = [] events: List[EventT] = [] buffer_add = buffer.append event_add = events.append @@ -635,7 +635,7 @@ async def add_to_buffer(value: T) -> T: # We want to save events instead of values to allow for manual ack event = self.current_event - buffer_add(cast(T_co, event)) + buffer_add(event) if event is None: raise RuntimeError("Take buffer found current_event is None") diff --git a/faust/types/events.py b/faust/types/events.py index 9b1498609..0e25147d5 100644 --- a/faust/types/events.py +++ b/faust/types/events.py @@ -34,7 +34,7 @@ class _SchemaT: ... # noqa class EventT(Generic[T], AsyncContextManager): app: _AppT key: K - value: V + value: T headers: Mapping message: Message acked: bool diff --git a/faust/types/streams.py b/faust/types/streams.py index b5ab4b1e6..ee354616f 100644 --- a/faust/types/streams.py +++ b/faust/types/streams.py @@ -16,7 +16,6 @@ Tuple, TypeVar, Union, - no_type_check, ) from mode import Seconds, ServiceT @@ -149,22 +148,35 @@ def info(self) -> Mapping[str, Any]: ... def clone(self, **kwargs: Any) -> "StreamT": ... @abc.abstractmethod - @no_type_check - async def items(self) -> AsyncIterator[Tuple[K, T_co]]: ... + def noack(self) -> "StreamT": ... @abc.abstractmethod - @no_type_check - async def events(self) -> AsyncIterable[EventT]: ... + def items(self) -> AsyncIterator[Tuple[K, T_co]]: ... @abc.abstractmethod - @no_type_check - async def take( + def events(self) -> AsyncIterable[EventT]: ... + + @abc.abstractmethod + def take(self, max_: int, within: Seconds) -> AsyncIterable[Sequence[T_co]]: ... + + @abc.abstractmethod + def take_events( self, max_: int, within: Seconds + ) -> AsyncIterable[Sequence[EventT[T_co]]]: ... + + @abc.abstractmethod + def take_with_timestamp( + self, max_: int, within: Seconds, timestamp_field_name: str ) -> AsyncIterable[Sequence[T_co]]: ... @abc.abstractmethod def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]: ... + @abc.abstractmethod + def noack_take( + self, max_: int, within: Seconds + ) -> AsyncIterable[Sequence[EventT[T_co]]]: ... + @abc.abstractmethod def through(self, channel: Union[str, ChannelT]) -> "StreamT": ... @@ -180,6 +192,9 @@ def group_by( topic: Optional[TopicT] = None, ) -> "StreamT": ... + @abc.abstractmethod + def filter(self, fun: Processor[T]) -> "StreamT": ... + @abc.abstractmethod def derive_topic( self, @@ -198,14 +213,8 @@ async def throw(self, exc: BaseException) -> None: ... @abc.abstractmethod def __copy__(self) -> "StreamT": ... - @abc.abstractmethod - def __iter__(self) -> Any: ... - @abc.abstractmethod def __next__(self) -> T: ... - @abc.abstractmethod - def __aiter__(self) -> AsyncIterator[T_co]: ... - @abc.abstractmethod async def ack(self, event: EventT) -> bool: ... diff --git a/faust/types/transports.py b/faust/types/transports.py index 205c56e0f..3e8750d82 100644 --- a/faust/types/transports.py +++ b/faust/types/transports.py @@ -57,7 +57,7 @@ class _AppT: ... # noqa #: Argument to Consumer.commit to specify topics/tps to commit. TPorTopic = Union[str, TP] -TPorTopicSet = AbstractSet[TPorTopic] +TPorTopicSet = Optional[AbstractSet[TPorTopic]] #: Callback (:keyword:`async def`) called when consumer partitions are revoked. PartitionsRevokedCallback = Callable[[Set[TP]], Awaitable[None]]