diff --git a/README.md b/README.md index cae62da..04e27db 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,64 @@ ![](https://github.com/ISISComputingGroup/saluki/blob/main/resources/logo.png) -Serialise/deserialise flatbuffers blobs from kafka. -This currently deserialises https://github.com/ess-dmsc/python-streaming-data-types, but I am working to make it agnostic. Python bindings for the respective schema will need to be generated. +ISIS-specific Kafka tools. +Deserialises [the ESS flatbuffers blobs](https://github.com/ess-dmsc/python-streaming-data-types) from Kafka. + +Also allows replaying data in a topic. # Usage + +To run the latest version, install [uv](https://docs.astral.sh/uv/getting-started/installation/) and use `uvx saluki`. + +Alternatively, you can `pip install saluki` and run it from a `venv`. + See `saluki --help` for all options. -## Listen to a topic for updates +## `listen` - Listen to a topic for updates `saluki listen mybroker:9092/mytopic` - This will listen for updates for `mytopic` on `mybroker`. -## Consume from a topic +### Filter to specific schemas + +`saluki listen mybroker:9092/mytopic -f f144 -f f142` - This will listen for updates but only print messages whose schema ID is `f144` or `f142` + +## `consume` - Consume from a topic `saluki consume mybroker:9092/mytopic -p 1 -o 123456 -m 10` - This will print 9 messages before (and inclusively the offset specified) offset `123456` of `mytopic` on `mybroker`, in partition 1. Use the `-g` flag to go the other way, ie. in the above example to consume the 9 messages _after_ offset 123456 -# Install -`pip install saluki` +You can also filter to specific schema(s) with the `-f` flag, like the example above for `listen`. + +## `sniff` - List all topics with their high/low watermarks and number of messages +`saluki sniff mybroker:9092` + +Output looks as follows: + +``` +$ saluki sniff mybroker:9092 + +INFO:saluki:Cluster ID: redpanda.0faa4595-7298-407e-9db7-7e2758d1af1f +INFO:saluki:Brokers: +INFO:saluki: 192.168.0.111:9092/1 +INFO:saluki: 192.168.0.112:9092/2 +INFO:saluki: 192.168.0.113:9092/0 +INFO:saluki:Topics: +INFO:saluki: MERLIN_events: +INFO:saluki: 0 - low:262322729, high:302663378, num_messages:40340649 +INFO:saluki: MERLIN_runInfo: +INFO:saluki: 0 - low:335, high:2516, num_messages:2181 +INFO:saluki: MERLIN_monitorHistograms: +INFO:saluki: 0 - low:7515, high:7551, num_messages:36 +``` + +## `play` - Replay data from one topic to another + +### Between offsets + +`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -o 123 125` - This will forward messages at offsets 123, 124 and 125 in the `source_topic` to the `dest_topic` + +### Between timestamps + +`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps.
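+ +Timestamps can also be given as dateutil-parsable date strings rather than unix timestamps; for example (the times here are illustrative, quote them as needed for your shell): `saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t "2025-11-17 07:00:00" "2025-11-17 07:05:00"` - This will forward messages between the two given times.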
-## Developer setup -`pip install .[dev]` +# Developer setup +`pip install -e .[dev]` diff --git a/pyproject.toml b/pyproject.toml index be08d11..a74c110 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,8 @@ name = "saluki" dynamic = ["version"] dependencies = [ "ess-streaming-data-types", - "confluent-kafka", + "confluent-kafka>=2.12.1", # for produce_batch in play() + "python-dateutil", "tzdata" ] readme = {file = "README.md", content-type = "text/markdown"} diff --git a/src/saluki/consume.py b/src/saluki/consume.py index e1edac8..18f9aa1 100644 --- a/src/saluki/consume.py +++ b/src/saluki/consume.py @@ -1,4 +1,5 @@ import logging +import uuid from confluent_kafka import Consumer, TopicPartition @@ -14,6 +15,8 @@ def consume( num_messages: int = 1, offset: int | None = None, go_forwards: bool = False, + schemas_to_filter_to: list[str] | None = None, + timestamp: int | None = None, ) -> None: """ consume from a topic and deserialise each message @@ -24,12 +27,14 @@ def consume( :param num_messages: number of messages to consume :param offset: offset to consume from/to :param go_forwards: whether to consume forwards or backwards + :param schemas_to_filter_to: schemas in messages to filter to + :param timestamp: optionally a timestamp as a starting point :return: None """ c = Consumer( { "bootstrap.servers": broker, - "group.id": "saluki", + "group.id": f"saluki-consume-{uuid.uuid4()}", "session.timeout.ms": 6000, "auto.offset.reset": "latest", "enable.auto.offset.store": False, @@ -38,6 +43,10 @@ def consume( } ) + if timestamp is not None: + offset = c.offsets_for_times([TopicPartition(topic, partition, timestamp)])[0].offset + logger.debug(f"offset for timestamp {timestamp} is {offset}") + if go_forwards: if offset is None: raise ValueError("Can't go forwards without an offset") @@ -57,7 +66,7 @@ def consume( try: logger.info(f"Consuming {num_messages} messages") msgs = c.consume(num_messages) - deserialise_and_print_messages(msgs, partition) + deserialise_and_print_messages(msgs, partition, schemas_to_filter_to) except Exception: logger.exception("Got exception while consuming:") finally: diff --git a/src/saluki/listen.py b/src/saluki/listen.py index cf9daa4..82a205d 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -1,4 +1,5 @@ import logging +import uuid from confluent_kafka import Consumer, TopicPartition @@ -7,18 +8,24 @@ logger = logging.getLogger("saluki") -def listen(broker: str, topic: str, partition: int | None = None) -> None: +def listen( + broker: str, + topic: str, + partition: int | None = None, + schemas_to_filter_to: list[str] | None = None, +) -> None: """ Listen to a topic and deserialise each message :param broker: the broker address, including the port :param topic: the topic to use :param partition: the partition to listen to (default is all partitions in a given topic) + :param schemas_to_filter_to: schemas to filter when listening to messages :return: None """ c = Consumer( { "bootstrap.servers": broker, - "group.id": "saluki", + "group.id": f"saluki-listen-{uuid.uuid4()}", "auto.offset.reset": "latest", "enable.auto.commit": False, } @@ -30,7 +37,9 @@ def listen(broker: str, topic: str, partition: int | None = None) -> None: logger.info(f"listening to {broker}/{topic}") while True: msg = c.poll(1.0) - deserialise_and_print_messages([msg], partition) + deserialise_and_print_messages( + [msg], partition, schemas_to_filter_to=schemas_to_filter_to + ) except KeyboardInterrupt: logger.debug("finished listening") finally: diff --git 
a/src/saluki/main.py b/src/saluki/main.py index f3b8747..1bcf97b 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -4,13 +4,17 @@ from saluki.consume import consume from saluki.listen import listen -from saluki.utils import parse_kafka_uri +from saluki.play import play +from saluki.sniff import sniff +from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri logger = logging.getLogger("saluki") logging.basicConfig(level=logging.INFO) _LISTEN = "listen" _CONSUME = "consume" +_PLAY = "play" +_SNIFF = "sniff" def main() -> None: @@ -18,28 +22,39 @@ def main() -> None: prog="saluki", description="serialise/de-serialise flatbuffers and consume/produce from/to kafka", ) + common_options = argparse.ArgumentParser(add_help=False) + common_options.add_argument("-v", "--verbose", help="show DEBUG logs", action="store_true") + common_options.add_argument( + "-l", + "--log-file", + help="filename to output all data to", + required=False, + default=None, + type=argparse.FileType("a"), + ) - parent_parser = argparse.ArgumentParser(add_help=False) - parent_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic") + topic_parser = argparse.ArgumentParser(add_help=False) + topic_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic") - parent_parser.add_argument( + topic_parser.add_argument( "-X", "--kafka-config", help="kafka options to pass through to librdkafka", required=False, default=None, ) - parent_parser.add_argument( - "-l", - "--log-file", - help="filename to output all data to", - required=False, - default=None, - type=argparse.FileType("a"), - ) + topic_parser.add_argument("-p", "--partition", required=False, type=int, default=0) + topic_parser.add_argument("-f", "--filter", required=False, action="append") sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command") + sniff_parser = sub_parsers.add_parser( + _SNIFF, help="sniff - broker metadata", parents=[common_options] + ) + sniff_parser.add_argument( + "broker", type=str, help="broker, optionally suffixed with a topic name to filter to" + ) + consumer_parser = argparse.ArgumentParser(add_help=False) consumer_parser.add_argument( "-e", @@ -50,7 +65,7 @@ def main() -> None: ) consumer_mode_parser = sub_parsers.add_parser( - _CONSUME, help="consumer mode", parents=[parent_parser, consumer_parser] + _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser, common_options] ) consumer_mode_parser.add_argument( "-m", @@ -60,36 +75,70 @@ def main() -> None: required=False, default=1, ) - consumer_mode_parser.add_argument( - "-o", "--offset", help="offset to consume from", type=int, required=False - ) - consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str) + consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true") - consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0) + cg = consumer_mode_parser.add_mutually_exclusive_group(required=False) + cg.add_argument( + "-o", + "--offset", + help="offset to consume from", + type=int, + ) + cg.add_argument( + "-t", + "--timestamp", + help="timestamp to consume from", + type=dateutil_parsable_or_unix_timestamp, + ) - listen_parser = sub_parsers.add_parser( + listen_parser = sub_parsers.add_parser( # noqa: F841 _LISTEN, help="listen mode - listen until KeyboardInterrupt", - parents=[parent_parser, consumer_parser], + parents=[topic_parser, consumer_parser, 
common_options], + ) + + play_parser = sub_parsers.add_parser( + _PLAY, + help="replay mode - replay data into another topic", + parents=[common_options], + ) + play_parser.add_argument("topics", type=str, nargs=2, help="source and destination topics, in the format broker<:port>/topic") + g = play_parser.add_mutually_exclusive_group(required=True) + g.add_argument( + "-o", + "--offsets", + help="offsets to replay between (inclusive)", + type=int, + nargs=2, + ) + g.add_argument( + "-t", + "--timestamps", + help="timestamps to replay between, in ISO8601 or RFC3339 format" + ' (e.g. "2025-11-17 07:00:00") or as a unix timestamp', + type=dateutil_parsable_or_unix_timestamp, + nargs=2, ) - listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None) if len(sys.argv) == 1: parser.print_help() sys.exit(1) args = parser.parse_args() - if args.kafka_config is not None: - raise NotImplementedError("-X is not implemented yet.") - - broker, topic = parse_kafka_uri(args.topic) + if args.verbose: + logger.setLevel(logging.DEBUG) if args.log_file: logger.addHandler(logging.FileHandler(args.log_file.name)) + if "kafka_config" in args and args.kafka_config is not None: + raise NotImplementedError("-X is not implemented yet.") + if args.command == _LISTEN: - listen(broker, topic, args.partition) + broker, topic = parse_kafka_uri(args.topic) + listen(broker, topic, args.partition, args.filter) elif args.command == _CONSUME: + broker, topic = parse_kafka_uri(args.topic) consume( broker, topic, @@ -97,7 +146,29 @@ def main() -> None: args.messages, args.offset, args.go_forwards, + args.filter, + args.timestamp, + ) + elif args.command == _PLAY: + src_broker, src_topic = parse_kafka_uri(args.topics[0]) + dest_broker, dest_topic = parse_kafka_uri(args.topics[1]) + + play( + src_broker, + src_topic, + dest_broker, + dest_topic, + args.offsets, + args.timestamps, ) + elif args.command == _SNIFF: + try: + broker, topic = parse_kafka_uri(args.broker) + logger.debug(f"Sniffing single topic {topic} on broker {broker}") + sniff(broker, topic) + except RuntimeError: + logger.debug(f"Sniffing whole broker {args.broker}") + sniff(args.broker) if __name__ == "__main__": diff --git a/src/saluki/play.py b/src/saluki/play.py new file mode 100644 index 0000000..1f058e8 --- /dev/null +++ b/src/saluki/play.py @@ -0,0 +1,83 @@ +import logging +import uuid + +from confluent_kafka import Consumer, Producer, TopicPartition + +logger = logging.getLogger("saluki") + + +def play( + src_broker: str, + src_topic: str, + dest_broker: str, + dest_topic: str, + offsets: list[int] | None, + timestamps: list[int] | None, +) -> None: + """ + Replay data from src_topic to dest_topic between the offsets OR timestamps specified. + This currently assumes contiguous data in a topic (ie. no log compaction) and uses partition 0. + It also does not copy message timestamps. + + :param src_broker: The source broker, including port. + :param src_topic: The topic to replay data from. + :param dest_broker: The destination broker, including port. + :param dest_topic: The topic to replay data to. + :param offsets: The start and finish offsets to replay data from. + :param timestamps: The start and finish timestamps to replay data from. 
+ """ + + consumer = Consumer( + { + "bootstrap.servers": src_broker, + "group.id": f"saluki-play-{uuid.uuid4()}", + } + ) + producer = Producer( + { + "bootstrap.servers": dest_broker, + } + ) + src_partition = 0 + + if timestamps is not None: + logger.debug(f"getting offsets for times: {timestamps[0]} and {timestamps[1]}") + start_offset = consumer.offsets_for_times( + [ + TopicPartition(src_topic, src_partition, timestamps[0]), + ] + )[0] + # See https://github.com/confluentinc/confluent-kafka-python/issues/1178 + # as to why offsets_for_times is called twice. + stop_offset = consumer.offsets_for_times( + [TopicPartition(src_topic, src_partition, timestamps[1])] + )[0] + elif offsets is not None: + start_offset = TopicPartition(src_topic, src_partition, offsets[0]) + stop_offset = TopicPartition(src_topic, src_partition, offsets[1]) + else: + raise ValueError("offsets and timestamps cannot both be None") + + logger.debug(f"start_offset: {start_offset.offset}, stop_offset: {stop_offset.offset}") + + logger.debug(f"assigning to offset {start_offset.offset}") + consumer.assign([start_offset]) + + num_messages = stop_offset.offset - start_offset.offset + 1 + + try: + msgs = consumer.consume(num_messages) + logger.debug(f"finished consuming {num_messages} messages") + consumer.close() + producer.produce_batch( + dest_topic, [{"key": message.key(), "value": message.value()} for message in msgs] + ) + logger.debug(f"flushing producer. len(p): {len(producer)}") + producer.flush(timeout=10) + + logger.debug(f"length after flushing: {len(producer)}") + + except Exception: + logger.exception("Got exception while replaying:") + finally: + logger.debug(f"Closing consumer {consumer}") + consumer.close() diff --git a/src/saluki/sniff.py b/src/saluki/sniff.py new file mode 100644 index 0000000..795e3b0 --- /dev/null +++ b/src/saluki/sniff.py @@ -0,0 +1,40 @@ +import logging +import uuid + +from confluent_kafka import Consumer, TopicPartition +from confluent_kafka.admin import AdminClient + +logger = logging.getLogger("saluki") + + +def sniff(broker: str, topic: str | None = None) -> None: + """ + Prints the broker and topic metadata for a given broker. + If a topic is given, only this topic's partitions and watermarks will be printed. + :param broker: The broker address including port number. + :param topic: Optional topic to filter information to. 
+ """ + a = AdminClient({"bootstrap.servers": broker}) + c = Consumer({"bootstrap.servers": broker, "group.id": f"saluki-sniff-{uuid.uuid4()}"}) + t = a.list_topics(timeout=5) + if topic is not None and topic not in t.topics.keys(): + logger.warning(f"Topic {topic} not found on broker {broker}") + return + + if topic is None: + logger.info(f"Cluster ID: {t.cluster_id}") + logger.info("Brokers:") + for value in t.brokers.values(): + logger.info(f"\t{value}") + + logger.info("Topics:") + + for k, v in t.topics.items(): + if topic is not None and k != topic: + continue + partitions = v.partitions.keys() + logger.info(f"\t{k}:") + for p in partitions: + tp = TopicPartition(k, p) + low, high = c.get_watermark_offsets(tp) + logger.info(f"\t\t{tp.partition} - low:{low}, high:{high}, num_messages:{high - low}") diff --git a/src/saluki/utils.py b/src/saluki/utils.py index 2bed95c..d033a57 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -1,9 +1,11 @@ import datetime import logging +from argparse import ArgumentTypeError from typing import List, Tuple from zoneinfo import ZoneInfo from confluent_kafka import Message +from dateutil.parser import ParserError, parse from streaming_data_types import DESERIALISERS from streaming_data_types.exceptions import ShortBufferException from streaming_data_types.utils import get_schema @@ -31,7 +33,9 @@ def fallback_deserialiser(payload: bytes) -> str: return schema, ret -def deserialise_and_print_messages(msgs: List[Message], partition: int | None) -> None: +def deserialise_and_print_messages( + msgs: List[Message], partition: int | None, schemas_to_filter_to: list[str] | None = None +) -> None: for msg in msgs: try: if msg is None: @@ -42,8 +46,10 @@ def deserialise_and_print_messages(msgs: List[Message], partition: int | None) - if partition is not None and msg.partition() != partition: continue schema, deserialised = _try_to_deserialise_message(msg.value()) + if schemas_to_filter_to is not None and schema not in schemas_to_filter_to: + continue time = _parse_timestamp(msg) - logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}") + logger.info(f"(o:{msg.offset()},t:{time},s:{schema}) {deserialised}") except Exception as e: logger.exception(f"Got error while deserialising: {e}") @@ -75,8 +81,8 @@ def parse_kafka_uri(uri: str) -> Tuple[str, str]: If username is provided, a SASL mechanism must also be provided. Any other validation must be performed in the calling code. """ - broker, topic = uri.split("/") if "/" in uri else (uri, "") - if not topic: + broker, topic = uri.split("/") if "/" in uri else (uri, None) + if topic is None: raise RuntimeError( f"Unable to parse URI {uri}, topic not defined. URI should be of form" f" broker[:port]/topic" @@ -85,3 +91,22 @@ def parse_kafka_uri(uri: str) -> Tuple[str, str]: broker, topic, ) + + +def dateutil_parsable_or_unix_timestamp(inp: str) -> int: + """ + Parse a dateutil string, if this fails then try to parse a unix timestamp. + This returns a unix timestamp as an int + """ + try: + try: + return int(round(parse(inp).timestamp() * 1000)) + except (ParserError, OverflowError): + logger.debug( + f"Failed to parse {inp} as a dateutil parsable. 
Falling back to unix timestamp" + ) + return int(inp) + except ValueError: + raise ArgumentTypeError( + f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp" + ) diff --git a/tests/test_consume.py b/tests/test_consume.py index 6f49d02..d72c061 100644 --- a/tests/test_consume.py +++ b/tests/test_consume.py @@ -82,3 +82,20 @@ def test_consume_but_exception_thrown_consumer_is_closed(): c.return_value.consume.side_effect = Exception consume("somebroker", "sometopic", num_messages=1) c.return_value.close.assert_called_once() + + +@patch("saluki.consume.Consumer") +def test_consume_with_timestamp(mock_consumer): + expected_topic = "sometopic" + partition = 0 + timestamp = 1234 + offset = 2345 + + mock_consumer.return_value.offsets_for_times.return_value = [ + TopicPartition(expected_topic, partition, offset) + ] + consume("somebroker", topic=expected_topic, timestamp=timestamp, partition=partition) + + mock_consumer.return_value.assign.assert_called_with( + [TopicPartition(expected_topic, partition, offset)] + ) diff --git a/tests/test_play.py b/tests/test_play.py new file mode 100644 index 0000000..94b49af --- /dev/null +++ b/tests/test_play.py @@ -0,0 +1,103 @@ +from unittest.mock import Mock, patch + +import pytest +from confluent_kafka import Message, TopicPartition + +from saluki.play import play + + +def test_play_with_offsets(): + src_broker = "broker1" + src_topic = "topic1" + dest_broker = "broker2" + dest_topic = "topic2" + offsets = [1, 2] + + message_1 = Mock(spec=Message) + message_1_key = "msg1key" + message_1.key.return_value = message_1_key + message_1_val = "msg1" + message_1.value.return_value = message_1_val + + message_2 = Mock(spec=Message) + message_2_key = "msg2key" + message_2.key.return_value = message_2_key + message_2_val = "msg2" + message_2.value.return_value = message_2_val + + with patch("saluki.play.Consumer") as c, patch("saluki.play.Producer") as p: + consumer_obj = c() + consumer_obj.consume.return_value = [message_1, message_2] + + play(src_broker, src_topic, dest_broker, dest_topic, offsets, None) + + assert consumer_obj.assign.call_args.args[0][0].topic == src_topic + assert consumer_obj.assign.call_args.args[0][0].offset == offsets[0] + + consumer_obj.consume.assert_called_with(2) # stop - start + 1 + + p_obj = p() + produce_batch_call = p_obj.produce_batch.call_args.args + assert dest_topic == produce_batch_call[0] + assert {"key": message_1_key, "value": message_1_val} in produce_batch_call[1] + assert {"key": message_2_key, "value": message_2_val} in produce_batch_call[1] + + +def test_play_with_timestamps(): + src_broker = "broker1" + src_topic = "topic1" + dest_broker = "broker2" + dest_topic = "topic2" + timestamps = [1762444369, 1762444375] + + message_1 = Mock(spec=Message) + message_1_key = "msg1key" + message_1.key.return_value = message_1_key + message_1_val = "msg1" + message_1.value.return_value = message_1_val + + message_2 = Mock(spec=Message) + message_2_key = "msg2key" + message_2.key.return_value = message_2_key + message_2_val = "msg2" + message_2.value.return_value = message_2_val + + with patch("saluki.play.Consumer") as c, patch("saluki.play.Producer") as p: + consumer_obj = c() + consumer_obj.offsets_for_times.side_effect = [ + [TopicPartition(src_topic, partition=0, offset=2)], + [TopicPartition(src_topic, partition=0, offset=3)], + ] + consumer_obj.consume.return_value = [message_1, message_2] + + play(src_broker, src_topic, dest_broker, dest_topic, None, timestamps) + + assert 
consumer_obj.assign.call_args.args[0][0].topic == src_topic + assert consumer_obj.assign.call_args.args[0][0].offset == 2 + + consumer_obj.consume.assert_called_with(2) # stop - start + 1 + + p_obj = p() + produce_batch_call = p_obj.produce_batch.call_args.args + assert dest_topic == produce_batch_call[0] + assert {"key": message_1_key, "value": message_1_val} in produce_batch_call[1] + assert {"key": message_2_key, "value": message_2_val} in produce_batch_call[1] + + +def test_play_with_exception_when_consuming_consumer_still_closed(): + with ( + patch("saluki.play.Consumer") as mock_consumer, + patch("saluki.play.Producer"), + patch("saluki.play.logger") as mock_logger, + ): + mock_consumer().consume.side_effect = Exception("blah") + play("", "", "", "", [1, 2], None) + + mock_logger.exception.assert_called_once() + + mock_consumer().close.assert_called_once() + + +def test_play_raises_when_offsets_and_timestamps_are_none(): + with pytest.raises(ValueError): + play("", "", "", "", None, None) diff --git a/tests/test_sniff.py b/tests/test_sniff.py new file mode 100644 index 0000000..8c37d87 --- /dev/null +++ b/tests/test_sniff.py @@ -0,0 +1,79 @@ +from unittest.mock import patch + +import pytest +from confluent_kafka.admin import BrokerMetadata, ClusterMetadata, TopicMetadata + +from saluki.sniff import sniff + + +@pytest.fixture() +def fake_cluster_md(): + """ + Returns a fake cluster metadata object with two topics; + one with 1 partition and the other with 2. + """ + fake_cluster_md = ClusterMetadata() + broker1 = BrokerMetadata() + broker1.id = "id1" # type: ignore + broker1.host = "mybroker" # type: ignore + broker1.port = 9093 + fake_cluster_md.brokers = {0: broker1} + + topic1 = TopicMetadata() + topic1.partitions = {0: {}} + + topic2 = TopicMetadata() + topic2.partitions = {0: {}, 1: {}} + + fake_cluster_md.topics = {"topic1": topic1, "topic2": topic2} + return fake_cluster_md + + +def test_sniff_with_two_partitions_in_a_topic(fake_cluster_md): + with ( + patch("saluki.sniff.AdminClient") as a, + patch("saluki.sniff.Consumer") as c, + patch("saluki.sniff.logger") as logger, + ): + a().list_topics.return_value = fake_cluster_md + c().get_watermark_offsets.return_value = 1, 2 + sniff("whatever") + + brokers_call = logger.info.call_args_list[2] + + assert "mybroker:9093/id1" in brokers_call.args[0] + + topic1_call = logger.info.call_args_list[5] + assert "0 - low:1, high:2, num_messages:1" in topic1_call.args[0] + + topic2_call1 = logger.info.call_args_list[7] + assert "0 - low:1, high:2, num_messages:1" in topic2_call1.args[0] + + topic2_call2 = logger.info.call_args_list[8] + assert "1 - low:1, high:2, num_messages:1" in topic2_call2.args[0] + + +def test_sniff_with_single_topic(fake_cluster_md): + with ( + patch("saluki.sniff.AdminClient") as a, + patch("saluki.sniff.Consumer") as c, + patch("saluki.sniff.logger") as logger, + ): + a().list_topics.return_value = fake_cluster_md + c().get_watermark_offsets.return_value = 1, 2 + sniff("mybroker:9093", "topic1") + + assert "\ttopic1" in logger.info.call_args_list[0].args[0] + assert "\t\t0 - low:1, high:2, num_messages:1" in logger.info.call_args_list[1].args[0] + + +def test_sniff_with_single_nonexistent_topic(): + with ( + patch("saluki.sniff.AdminClient") as a, + patch("saluki.sniff.Consumer"), + patch("saluki.sniff.logger") as logger, + ): + # Deliberately blank cluster metadata ie. 
no topics + a().list_topics.return_value = ClusterMetadata() + sniff("somebroker:9092", "sometopic") + logger.warning.assert_called_with("Topic sometopic not found on broker somebroker:9092") diff --git a/tests/test_utils.py b/tests/test_utils.py index 8f169ca..263c280 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,15 +1,19 @@ +from argparse import ArgumentTypeError from unittest.mock import Mock, patch import pytest from confluent_kafka import Message +from streaming_data_types import serialise_f144 from streaming_data_types.forwarder_config_update_fc00 import ( ConfigurationUpdate, StreamInfo, + serialise_fc00, ) from saluki.utils import ( _parse_timestamp, _try_to_deserialise_message, + dateutil_parsable_or_unix_timestamp, deserialise_and_print_messages, parse_kafka_uri, ) @@ -74,6 +78,23 @@ def test_deserialising_message_which_raises_does_not_stop_loop(mock_message): assert logger.info.call_count == 1 +def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(mock_message): + with patch("saluki.utils.logger") as logger: + ok_message = Mock(spec=Message) + ok_message.value.return_value = serialise_fc00(config_change=1, streams=[]) # type: ignore + ok_message.error.return_value = False + ok_message.timestamp.return_value = 2, 1 + + mock_message.value.return_value = serialise_f144(source_name="test", value=123) + mock_message.error.return_value = False + mock_message.timestamp.return_value = 2, 1 + + deserialise_and_print_messages( + [mock_message, ok_message], None, schemas_to_filter_to=["fc00"] + ) + assert logger.info.call_count == 1 + + def test_message_that_has_valid_schema_but_empty_payload(): with pytest.raises(Exception): # Empty fc00 message - valid schema but not valid payload @@ -155,3 +176,27 @@ def test_uri_with_no_topic(): test_broker = "some_broker" with pytest.raises(RuntimeError): parse_kafka_uri(test_broker) + + +@pytest.mark.parametrize( + "timestamp", ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"] +) +def test_parses_datetime_properly_with_string(timestamp): + assert dateutil_parsable_or_unix_timestamp(timestamp) == 1763566031000 + + +@pytest.mark.parametrize( + "timestamp", + [ + "1763566031000", + "1763566031", + "1763566031000000", + ], +) +def test_parses_datetime_properly_and_leaves_unix_timestamp_alone(timestamp): + assert dateutil_parsable_or_unix_timestamp(timestamp) == int(timestamp) + + +def test_invalid_timestamp_raises(): + with pytest.raises(ArgumentTypeError): + dateutil_parsable_or_unix_timestamp("invalid")
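
As a quick illustration of the new `-t`/`--timestamp` handling, a minimal sketch of calling the helper added in `src/saluki/utils.py` directly (the expected values in the comments are taken from the tests above):

```python
from saluki.utils import dateutil_parsable_or_unix_timestamp

# Date strings are parsed with dateutil and converted to unix milliseconds,
# the unit that Consumer.offsets_for_times() expects.
print(dateutil_parsable_or_unix_timestamp("2025-11-19T15:27:11Z"))  # 1763566031000

# Plain unix timestamps are returned unchanged (no seconds/milliseconds conversion).
print(dateutil_parsable_or_unix_timestamp("1763566031"))  # 1763566031
```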