From 27ca5609e2d0daf71f6ef744cf4a095d8aecedd5 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Fri, 31 Oct 2025 21:34:17 +0000 Subject: [PATCH 01/23] backing up --- src/saluki/main.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/saluki/main.py b/src/saluki/main.py index f3b8747..a5363db 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -11,6 +11,7 @@ _LISTEN = "listen" _CONSUME = "consume" +_REPLAY = "replay" def main() -> None: @@ -66,6 +67,8 @@ def main() -> None: consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str) consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true") consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0) + # TODO make this allow multiple comma-split args + consumer_mode_parser.add_argument("-f", "--filter", required=False, type=str, nargs='+') listen_parser = sub_parsers.add_parser( _LISTEN, @@ -73,6 +76,29 @@ def main() -> None: parents=[parent_parser, consumer_parser], ) listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None) + # TODO make filtering work for this as well + + #### NEW FEATURES HERE PLZ + # replay from, to offset + # saluki replay -o FROMOFFSET TOOFFSET srcbroker/srctopic destbroker/desttopic + + # replay from, to timestamp + # saluki replay -t FROMTIMESTAMP TOTIMESTAMP srcbroker/srctopic destbroker/desttopic + + # saluki consume x messages of y schema + # saluki consume -f pl72 mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts + + # saluki consume x messages of y or z schema + # saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t run stops + + replay_parser = sub_parsers.add_parser( + _REPLAY, + help="replay mode - replay data into another topic", + parents=[parent_parser], + ) + replay_parser.add_argument("-o", "--offset", help="replay between offsets", type=bool, required=False, default="store_false") + replay_parser.add_argument("-t", "--timestamp", help="replay between timestamps", type=bool, required=False) + if len(sys.argv) == 1: parser.print_help() @@ -98,7 +124,8 @@ def main() -> None: args.offset, args.go_forwards, ) - + elif args.command == _REPLAY: + pass if __name__ == "__main__": main() From 4b1c8b52a72d81ae3c85e0641acb466acc836efb Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Fri, 31 Oct 2025 22:42:15 +0000 Subject: [PATCH 02/23] backing up --- README.md | 27 ++++++++++++++++++++++-- src/saluki/listen.py | 2 +- src/saluki/main.py | 50 +++++++++++++++++++++++++++++++------------- src/saluki/utils.py | 5 ++++- 4 files changed, 66 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index cae62da..780ded7 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ ![](https://github.com/ISISComputingGroup/saluki/blob/main/resources/logo.png) -Serialise/deserialise flatbuffers blobs from kafka. -This currently deserialises https://github.com/ess-dmsc/python-streaming-data-types, but I am working to make it agnostic. Python bindings for the respective schema will need to be generated. +ISIS-specific Kafka tools. +Deserialises [the ESS flatbuffers blobs](https://github.com/ess-dmsc/python-streaming-data-types) from Kafka. + +Also allows replaying data in a topic. # Usage See `saluki --help` for all options. @@ -9,11 +11,32 @@ See `saluki --help` for all options. 
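Under the hood saluki is a thin wrapper around `confluent_kafka` plus the deserialisers published by python-streaming-data-types. The sketch below shows roughly what `saluki listen` automates; it is illustrative only and assumes a reachable broker at `localhost:9092`, a topic called `mytopic`, and the `DESERIALISERS` mapping (4-character schema id to deserialise function) exposed by the streaming-data-types package — the real tool adds partition/offset handling, schema filtering and error handling on top.

```python
from confluent_kafka import Consumer
from streaming_data_types import DESERIALISERS  # assumed mapping: schema id -> deserialise function

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # assumed local broker
        "group.id": "saluki-readme-example",
        "auto.offset.reset": "latest",
    }
)
consumer.subscribe(["mytopic"])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        payload = msg.value()
        # A flatbuffer's 4-character file identifier sits at bytes 4..8;
        # streaming-data-types uses it as the schema id (e.g. "f144", "pl72").
        schema = payload[4:8].decode(errors="replace")
        deserialise = DESERIALISERS.get(schema)
        print(schema, deserialise(payload) if deserialise else payload)
finally:
    consumer.close()
```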
## Listen to a topic for updates `saluki listen mybroker:9092/mytopic` - This will listen for updates for `mytopic` on `mybroker`. +### Filter to specific schemas + +TODO + ## Consume from a topic `saluki consume mybroker:9092/mytopic -p 1 -o 123456 -m 10` - This will print 9 messages before (and inclusively the offset specified) offset `123456` of `mytopic` on `mybroker`, in partition 1. Use the `-g` flag to go the other way, ie. in the above example to consume the 9 messages _after_ offset 123456 +### Consume X of a certain schema(s) + +TODO + +## List all topics and their high, low watermarks and number of messages +TODO + +## Replay data from one topic to another + +### Between offsets + +TODO + +### Between timestamps + +TODO + # Install `pip install saluki` diff --git a/src/saluki/listen.py b/src/saluki/listen.py index cf9daa4..817ab86 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -7,7 +7,7 @@ logger = logging.getLogger("saluki") -def listen(broker: str, topic: str, partition: int | None = None) -> None: +def listen(broker: str, topic: str, partition: int | None = None, filter: list[str] | None = None) -> None: """ Listen to a topic and deserialise each message :param broker: the broker address, including the port diff --git a/src/saluki/main.py b/src/saluki/main.py index a5363db..a41b869 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -11,8 +11,10 @@ _LISTEN = "listen" _CONSUME = "consume" -_REPLAY = "replay" - +_PLAY = "play" +_SNIFF = "sniff" +_BURY = "bury" +_DIG = "dig" def main() -> None: parser = argparse.ArgumentParser( @@ -21,6 +23,8 @@ def main() -> None: ) parent_parser = argparse.ArgumentParser(add_help=False) + #TODO this needs restructuring. consider having a topic_parser with partition options etc. + # because -m and -g does not make sense for saluki listen. parent_parser.add_argument("topic", type=str, help="Kafka topic. 
format is broker<:port>/topic") parent_parser.add_argument( @@ -41,6 +45,9 @@ def main() -> None: sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command") + sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata") + sniff_parser.add_argument("broker", type=str) + consumer_parser = argparse.ArgumentParser(add_help=False) consumer_parser.add_argument( "-e", @@ -67,8 +74,7 @@ def main() -> None: consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str) consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true") consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0) - # TODO make this allow multiple comma-split args - consumer_mode_parser.add_argument("-f", "--filter", required=False, type=str, nargs='+') + consumer_mode_parser.add_argument("-f", "--filter", required=False, action="append") listen_parser = sub_parsers.add_parser( _LISTEN, @@ -76,28 +82,36 @@ def main() -> None: parents=[parent_parser, consumer_parser], ) listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None) - # TODO make filtering work for this as well #### NEW FEATURES HERE PLZ # replay from, to offset - # saluki replay -o FROMOFFSET TOOFFSET srcbroker/srctopic destbroker/desttopic + # saluki play -o FROMOFFSET TOOFFSET srcbroker/srctopic destbroker/desttopic # replay from, to timestamp - # saluki replay -t FROMTIMESTAMP TOTIMESTAMP srcbroker/srctopic destbroker/desttopic + # saluki play -t FROMTIMESTAMP TOTIMESTAMP srcbroker/srctopic destbroker/desttopic # saluki consume x messages of y schema # saluki consume -f pl72 mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts # saluki consume x messages of y or z schema - # saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t run stops + # saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t run stops# + + # saluki bury - dump data on topic to file + # saluki bury mybroker:9092/topicname -p 0 -f offsetortimestamp -t offsetortimestamp outputfile + + # saluki dig - push data from dump generated by saluki bury to topic + # saluki dig mybroker:9092/topicname -p 0 outputfile - replay_parser = sub_parsers.add_parser( - _REPLAY, + # saluki sniff - broker metadata ie. topic watermarks and num_messages. 
+ # saluki sniff mybroker:9092 + + play_parser = sub_parsers.add_parser( + _PLAY, help="replay mode - replay data into another topic", parents=[parent_parser], ) - replay_parser.add_argument("-o", "--offset", help="replay between offsets", type=bool, required=False, default="store_false") - replay_parser.add_argument("-t", "--timestamp", help="replay between timestamps", type=bool, required=False) + play_parser.add_argument("-o", "--offset", help="offsets to replay between (inclusive)", type=int, nargs=2) + play_parser.add_argument("-t", "--timestamp", help="timestamps to replay between", type=str, nargs=2) if len(sys.argv) == 1: @@ -114,7 +128,7 @@ def main() -> None: logger.addHandler(logging.FileHandler(args.log_file.name)) if args.command == _LISTEN: - listen(broker, topic, args.partition) + listen(broker, topic, args.partition, args.filter) elif args.command == _CONSUME: consume( broker, @@ -124,7 +138,15 @@ def main() -> None: args.offset, args.go_forwards, ) - elif args.command == _REPLAY: + elif args.command == _PLAY: + pass + #play(src_broker, src_topic, dest_broker, dest_topic, args.offset, args.timestamp) + elif args.command == _SNIFF: + print(args.broker) + pass + elif args.command == _BURY: + pass + elif args.command == _DIG: pass if __name__ == "__main__": diff --git a/src/saluki/utils.py b/src/saluki/utils.py index 2bed95c..3a6beb8 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -31,7 +31,7 @@ def fallback_deserialiser(payload: bytes) -> str: return schema, ret -def deserialise_and_print_messages(msgs: List[Message], partition: int | None) -> None: +def deserialise_and_print_messages(msgs: List[Message], partition: int | None, filter: list[str] | None) -> None: for msg in msgs: try: if msg is None: @@ -42,6 +42,9 @@ def deserialise_and_print_messages(msgs: List[Message], partition: int | None) - if partition is not None and msg.partition() != partition: continue schema, deserialised = _try_to_deserialise_message(msg.value()) + if filter is not None and schema not in filter: + # ignore + break time = _parse_timestamp(msg) logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}") except Exception as e: From f0f8901383407645c6dd4337669b1dbb359869b8 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Sat, 1 Nov 2025 13:37:05 +0000 Subject: [PATCH 03/23] add sniff, restructure args a bit, remove log-file in favour of just piping --- src/saluki/main.py | 39 +++++++++++++-------------------------- src/saluki/sniff.py | 20 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 26 deletions(-) create mode 100644 src/saluki/sniff.py diff --git a/src/saluki/main.py b/src/saluki/main.py index a41b869..fb8698f 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -4,6 +4,7 @@ from saluki.consume import consume from saluki.listen import listen +from saluki.sniff import sniff from saluki.utils import parse_kafka_uri logger = logging.getLogger("saluki") @@ -22,26 +23,19 @@ def main() -> None: description="serialise/de-serialise flatbuffers and consume/produce from/to kafka", ) - parent_parser = argparse.ArgumentParser(add_help=False) + topic_parser = argparse.ArgumentParser(add_help=False) #TODO this needs restructuring. consider having a topic_parser with partition options etc. # because -m and -g does not make sense for saluki listen. - parent_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic") + topic_parser.add_argument("topic", type=str, help="Kafka topic. 
format is broker<:port>/topic") - parent_parser.add_argument( + topic_parser.add_argument( "-X", "--kafka-config", help="kafka options to pass through to librdkafka", required=False, default=None, ) - parent_parser.add_argument( - "-l", - "--log-file", - help="filename to output all data to", - required=False, - default=None, - type=argparse.FileType("a"), - ) + topic_parser.add_argument("-p", "--partition", required=False, type=int, default=0) sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command") @@ -58,7 +52,7 @@ def main() -> None: ) consumer_mode_parser = sub_parsers.add_parser( - _CONSUME, help="consumer mode", parents=[parent_parser, consumer_parser] + _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser] ) consumer_mode_parser.add_argument( "-m", @@ -71,17 +65,14 @@ def main() -> None: consumer_mode_parser.add_argument( "-o", "--offset", help="offset to consume from", type=int, required=False ) - consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str) consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true") - consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0) consumer_mode_parser.add_argument("-f", "--filter", required=False, action="append") - listen_parser = sub_parsers.add_parser( + listen_parser = sub_parsers.add_parser( # noqa: F841 _LISTEN, help="listen mode - listen until KeyboardInterrupt", - parents=[parent_parser, consumer_parser], + parents=[topic_parser, consumer_parser], ) - listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None) #### NEW FEATURES HERE PLZ # replay from, to offset @@ -108,7 +99,7 @@ def main() -> None: play_parser = sub_parsers.add_parser( _PLAY, help="replay mode - replay data into another topic", - parents=[parent_parser], + parents=[topic_parser], ) play_parser.add_argument("-o", "--offset", help="offsets to replay between (inclusive)", type=int, nargs=2) play_parser.add_argument("-t", "--timestamp", help="timestamps to replay between", type=str, nargs=2) @@ -119,17 +110,14 @@ def main() -> None: sys.exit(1) args = parser.parse_args() - if args.kafka_config is not None: + if 'kafka_config' in args and args.kafka_config is not None: raise NotImplementedError("-X is not implemented yet.") - broker, topic = parse_kafka_uri(args.topic) - - if args.log_file: - logger.addHandler(logging.FileHandler(args.log_file.name)) - if args.command == _LISTEN: + broker, topic = parse_kafka_uri(args.topic) listen(broker, topic, args.partition, args.filter) elif args.command == _CONSUME: + broker, topic = parse_kafka_uri(args.topic) consume( broker, topic, @@ -142,8 +130,7 @@ def main() -> None: pass #play(src_broker, src_topic, dest_broker, dest_topic, args.offset, args.timestamp) elif args.command == _SNIFF: - print(args.broker) - pass + sniff(args.broker) elif args.command == _BURY: pass elif args.command == _DIG: diff --git a/src/saluki/sniff.py b/src/saluki/sniff.py new file mode 100644 index 0000000..ad7b498 --- /dev/null +++ b/src/saluki/sniff.py @@ -0,0 +1,20 @@ +from confluent_kafka import Consumer, TopicPartition +from confluent_kafka.admin import AdminClient + +def sniff(broker: str): + a = AdminClient({'bootstrap.servers': broker}) + c = Consumer({'bootstrap.servers': broker, 'group.id': 'saluki-sniff'}) + t = a.list_topics(timeout=5) + print(f"Cluster ID: {t.cluster_id}") + print(f"Brokers:") + [print(f"\t{value}") for value in t.brokers.values()] + + 
print(f"Topics:") + + for k,v in t.topics.items(): + partitions = v.partitions.keys() + print(f"\t{k}:") + for p in partitions: + tp = TopicPartition(k, p) + low, high = c.get_watermark_offsets(tp) + print(f"\t\tlow:{low}, high:{high}, num_messages:{high-low}") From 7470ef5989ade1d4e5a9e1957cd9e8ec2ca80f22 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Sat, 1 Nov 2025 13:57:06 +0000 Subject: [PATCH 04/23] sort out args for play --- src/saluki/main.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/saluki/main.py b/src/saluki/main.py index fb8698f..b3334d7 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -93,17 +93,15 @@ def main() -> None: # saluki dig - push data from dump generated by saluki bury to topic # saluki dig mybroker:9092/topicname -p 0 outputfile - # saluki sniff - broker metadata ie. topic watermarks and num_messages. - # saluki sniff mybroker:9092 - play_parser = sub_parsers.add_parser( _PLAY, help="replay mode - replay data into another topic", - parents=[topic_parser], + parents=[], ) - play_parser.add_argument("-o", "--offset", help="offsets to replay between (inclusive)", type=int, nargs=2) - play_parser.add_argument("-t", "--timestamp", help="timestamps to replay between", type=str, nargs=2) - + play_parser.add_argument("topics", type=str, nargs=2, help="SRC topic DEST topic") + g = play_parser.add_mutually_exclusive_group(required=True) + g.add_argument("-o", "--offsets", help="offsets to replay between (inclusive)", type=int, nargs=2) + g.add_argument("-t", "--timestamps", help="timestamps to replay between", type=str, nargs=2) if len(sys.argv) == 1: parser.print_help() @@ -127,8 +125,18 @@ def main() -> None: args.go_forwards, ) elif args.command == _PLAY: - pass - #play(src_broker, src_topic, dest_broker, dest_topic, args.offset, args.timestamp) + src_broker, src_topic = parse_kafka_uri(args.topics[0]) + dest_broker, dest_topic = parse_kafka_uri(args.topics[1]) + + print(f"SOURCE BROKER: {src_broker}, SOURCE TOPIC: {src_topic}, DEST BROKER: {dest_broker}, DEST TOPIC: {dest_topic} ") + if args.offsets is not None: + print(f"Replaying {src_broker}/{src_topic} between offsets {args.offsets[0]} and {args.offsets[1]} to {dest_broker}/{dest_topic}") + elif args.timestamps is not None: + print(f"Replaying {src_broker}/{src_topic} between timestamps {args.timestamps[0]} and {args.timestamps[1]} to {dest_broker}/{dest_topic}") + + if input("OK? 
(y/n)").lower() == 'y': + #play(src_broker, src_topic, dest_broker, dest_topic, args.offset, args.timestamp) + print("replayed") elif args.command == _SNIFF: sniff(args.broker) elif args.command == _BURY: From 2f14433de8f1a31098f73db9c810c289c7aa8304 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Sat, 1 Nov 2025 15:33:18 +0000 Subject: [PATCH 05/23] add skeleton for play() --- src/saluki/consume.py | 2 +- src/saluki/main.py | 7 +++++-- src/saluki/play.py | 12 ++++++++++++ 3 files changed, 18 insertions(+), 3 deletions(-) create mode 100644 src/saluki/play.py diff --git a/src/saluki/consume.py b/src/saluki/consume.py index e1edac8..02ff1cc 100644 --- a/src/saluki/consume.py +++ b/src/saluki/consume.py @@ -29,7 +29,7 @@ def consume( c = Consumer( { "bootstrap.servers": broker, - "group.id": "saluki", + "group.id": "saluki-consume", "session.timeout.ms": 6000, "auto.offset.reset": "latest", "enable.auto.offset.store": False, diff --git a/src/saluki/main.py b/src/saluki/main.py index b3334d7..c355c38 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -4,6 +4,7 @@ from saluki.consume import consume from saluki.listen import listen +from saluki.play import play from saluki.sniff import sniff from saluki.utils import parse_kafka_uri @@ -88,7 +89,8 @@ def main() -> None: # saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t run stops# # saluki bury - dump data on topic to file - # saluki bury mybroker:9092/topicname -p 0 -f offsetortimestamp -t offsetortimestamp outputfile + # saluki bury mybroker:9092/topicname -p 0 -o startoffset finishoffset outputfile + # saluki bury mybroker:9092/topicname -p 0 -t starttimestamp finishtimestamp outputfile # saluki dig - push data from dump generated by saluki bury to topic # saluki dig mybroker:9092/topicname -p 0 outputfile @@ -102,6 +104,7 @@ def main() -> None: g = play_parser.add_mutually_exclusive_group(required=True) g.add_argument("-o", "--offsets", help="offsets to replay between (inclusive)", type=int, nargs=2) g.add_argument("-t", "--timestamps", help="timestamps to replay between", type=str, nargs=2) + g.add_argument("-c", "--chunk", help="forward in chunks. ie to avoid storing a huge list in memory", default=0, type=int, required=False) if len(sys.argv) == 1: parser.print_help() @@ -135,7 +138,7 @@ def main() -> None: print(f"Replaying {src_broker}/{src_topic} between timestamps {args.timestamps[0]} and {args.timestamps[1]} to {dest_broker}/{dest_topic}") if input("OK? 
(y/n)").lower() == 'y': - #play(src_broker, src_topic, dest_broker, dest_topic, args.offset, args.timestamp) + play(src_broker, src_topic, dest_broker, dest_topic, args.offsets, args.timestamps, args.chunks) print("replayed") elif args.command == _SNIFF: sniff(args.broker) diff --git a/src/saluki/play.py b/src/saluki/play.py new file mode 100644 index 0000000..fc4f243 --- /dev/null +++ b/src/saluki/play.py @@ -0,0 +1,12 @@ +from confluent_kafka import Consumer, Producer + + +def play(src_broker: str, src_topic: str, dest_broker: str, dest_topic: str, offsets:list[int]|None, timestamps:list[int]|None, chunks:int) -> None: + consumer = Consumer( { + "bootstrap.servers": src_broker, + "group.id": "saluki-play", + }) + producer = Producer({ + "bootstrap.servers": dest_broker, + }) + pass From 1394513ba9299df6c8615d3fc204960bc64251fb Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Mon, 3 Nov 2025 14:02:11 +0000 Subject: [PATCH 06/23] docs, formatting, fix listen -f --- README.md | 12 ++++---- src/saluki/consume.py | 4 ++- src/saluki/listen.py | 11 +++++-- src/saluki/main.py | 71 ++++++++++++++++++++++++++++++++----------- src/saluki/play.py | 24 +++++++++++---- src/saluki/sniff.py | 9 +++--- src/saluki/utils.py | 11 ++++--- tests/test_listen.py | 4 ++- 8 files changed, 104 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 780ded7..ceb6d82 100644 --- a/README.md +++ b/README.md @@ -8,14 +8,14 @@ Also allows replaying data in a topic. # Usage See `saluki --help` for all options. -## Listen to a topic for updates +## `listen` - Listen to a topic for updates `saluki listen mybroker:9092/mytopic` - This will listen for updates for `mytopic` on `mybroker`. ### Filter to specific schemas -TODO +`saluki listen mybroker:9092/mytopic -f f144 -f f142` - This will listen for updates but ignore messages with schema IDs of `f142` or `f144` -## Consume from a topic +## `consume`- Consume from a topic `saluki consume mybroker:9092/mytopic -p 1 -o 123456 -m 10` - This will print 9 messages before (and inclusively the offset specified) offset `123456` of `mytopic` on `mybroker`, in partition 1. Use the `-g` flag to go the other way, ie. in the above example to consume the 9 messages _after_ offset 123456 @@ -24,10 +24,10 @@ Use the `-g` flag to go the other way, ie. 
in the above example to consume the 9 TODO -## List all topics and their high, low watermarks and number of messages -TODO +## `sniff` - List all topics and their high, low watermarks and number of messages +`saluki sniff mybroker:9092` -## Replay data from one topic to another +## `play` - Replay data from one topic to another ### Between offsets diff --git a/src/saluki/consume.py b/src/saluki/consume.py index 02ff1cc..38c710f 100644 --- a/src/saluki/consume.py +++ b/src/saluki/consume.py @@ -47,7 +47,9 @@ def consume( start = offset - num_messages + 1 else: start = ( - c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[1] + c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[ + 1 + ] - num_messages ) diff --git a/src/saluki/listen.py b/src/saluki/listen.py index 817ab86..3ec7911 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -7,7 +7,12 @@ logger = logging.getLogger("saluki") -def listen(broker: str, topic: str, partition: int | None = None, filter: list[str] | None = None) -> None: +def listen( + broker: str, + topic: str, + partition: int | None = None, + schemas_to_filter_out: list[str] | None = None, +) -> None: """ Listen to a topic and deserialise each message :param broker: the broker address, including the port @@ -30,7 +35,9 @@ def listen(broker: str, topic: str, partition: int | None = None, filter: list[s logger.info(f"listening to {broker}/{topic}") while True: msg = c.poll(1.0) - deserialise_and_print_messages([msg], partition) + deserialise_and_print_messages( + [msg], partition, schemas_to_filter_out=schemas_to_filter_out + ) except KeyboardInterrupt: logger.debug("finished listening") finally: diff --git a/src/saluki/main.py b/src/saluki/main.py index c355c38..6eeeb87 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -18,6 +18,7 @@ _BURY = "bury" _DIG = "dig" + def main() -> None: parser = argparse.ArgumentParser( prog="saluki", @@ -25,9 +26,9 @@ def main() -> None: ) topic_parser = argparse.ArgumentParser(add_help=False) - #TODO this needs restructuring. consider having a topic_parser with partition options etc. - # because -m and -g does not make sense for saluki listen. - topic_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic") + topic_parser.add_argument( + "topic", type=str, help="Kafka topic. 
format is broker<:port>/topic" + ) topic_parser.add_argument( "-X", @@ -37,8 +38,11 @@ def main() -> None: default=None, ) topic_parser.add_argument("-p", "--partition", required=False, type=int, default=0) + topic_parser.add_argument("-f", "--filter", required=False, action="append") - sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command") + sub_parsers = parser.add_subparsers( + help="sub-command help", required=True, dest="command" + ) sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata") sniff_parser.add_argument("broker", type=str) @@ -66,10 +70,11 @@ def main() -> None: consumer_mode_parser.add_argument( "-o", "--offset", help="offset to consume from", type=int, required=False ) - consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true") - consumer_mode_parser.add_argument("-f", "--filter", required=False, action="append") + consumer_mode_parser.add_argument( + "-g", "--go-forwards", required=False, action="store_true" + ) - listen_parser = sub_parsers.add_parser( # noqa: F841 + listen_parser = sub_parsers.add_parser( # noqa: F841 _LISTEN, help="listen mode - listen until KeyboardInterrupt", parents=[topic_parser, consumer_parser], @@ -92,7 +97,7 @@ def main() -> None: # saluki bury mybroker:9092/topicname -p 0 -o startoffset finishoffset outputfile # saluki bury mybroker:9092/topicname -p 0 -t starttimestamp finishtimestamp outputfile - # saluki dig - push data from dump generated by saluki bury to topic + # saluki dig - push data from dump generated by saluki bury to topic # saluki dig mybroker:9092/topicname -p 0 outputfile play_parser = sub_parsers.add_parser( @@ -102,16 +107,31 @@ def main() -> None: ) play_parser.add_argument("topics", type=str, nargs=2, help="SRC topic DEST topic") g = play_parser.add_mutually_exclusive_group(required=True) - g.add_argument("-o", "--offsets", help="offsets to replay between (inclusive)", type=int, nargs=2) - g.add_argument("-t", "--timestamps", help="timestamps to replay between", type=str, nargs=2) - g.add_argument("-c", "--chunk", help="forward in chunks. ie to avoid storing a huge list in memory", default=0, type=int, required=False) + g.add_argument( + "-o", + "--offsets", + help="offsets to replay between (inclusive)", + type=int, + nargs=2, + ) + g.add_argument( + "-t", "--timestamps", help="timestamps to replay between", type=str, nargs=2 + ) + g.add_argument( + "-c", + "--chunk", + help="forward in chunks. 
ie to avoid storing a huge list in memory", + default=0, + type=int, + required=False, + ) if len(sys.argv) == 1: parser.print_help() sys.exit(1) args = parser.parse_args() - if 'kafka_config' in args and args.kafka_config is not None: + if "kafka_config" in args and args.kafka_config is not None: raise NotImplementedError("-X is not implemented yet.") if args.command == _LISTEN: @@ -131,14 +151,28 @@ def main() -> None: src_broker, src_topic = parse_kafka_uri(args.topics[0]) dest_broker, dest_topic = parse_kafka_uri(args.topics[1]) - print(f"SOURCE BROKER: {src_broker}, SOURCE TOPIC: {src_topic}, DEST BROKER: {dest_broker}, DEST TOPIC: {dest_topic} ") + print( + f"SOURCE BROKER: {src_broker}, SOURCE TOPIC: {src_topic}, DEST BROKER: {dest_broker}, DEST TOPIC: {dest_topic} " + ) if args.offsets is not None: - print(f"Replaying {src_broker}/{src_topic} between offsets {args.offsets[0]} and {args.offsets[1]} to {dest_broker}/{dest_topic}") + print( + f"Replaying {src_broker}/{src_topic} between offsets {args.offsets[0]} and {args.offsets[1]} to {dest_broker}/{dest_topic}" + ) elif args.timestamps is not None: - print(f"Replaying {src_broker}/{src_topic} between timestamps {args.timestamps[0]} and {args.timestamps[1]} to {dest_broker}/{dest_topic}") - - if input("OK? (y/n)").lower() == 'y': - play(src_broker, src_topic, dest_broker, dest_topic, args.offsets, args.timestamps, args.chunks) + print( + f"Replaying {src_broker}/{src_topic} between timestamps {args.timestamps[0]} and {args.timestamps[1]} to {dest_broker}/{dest_topic}" + ) + + if input("OK? (y/n)").lower() == "y": + play( + src_broker, + src_topic, + dest_broker, + dest_topic, + args.offsets, + args.timestamps, + args.chunks, + ) print("replayed") elif args.command == _SNIFF: sniff(args.broker) @@ -147,5 +181,6 @@ def main() -> None: elif args.command == _DIG: pass + if __name__ == "__main__": main() diff --git a/src/saluki/play.py b/src/saluki/play.py index fc4f243..4245b5c 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -1,12 +1,24 @@ from confluent_kafka import Consumer, Producer -def play(src_broker: str, src_topic: str, dest_broker: str, dest_topic: str, offsets:list[int]|None, timestamps:list[int]|None, chunks:int) -> None: - consumer = Consumer( { +def play( + src_broker: str, + src_topic: str, + dest_broker: str, + dest_topic: str, + offsets: list[int] | None, + timestamps: list[int] | None, + chunks: int, +) -> None: + consumer = Consumer( + { "bootstrap.servers": src_broker, "group.id": "saluki-play", - }) - producer = Producer({ - "bootstrap.servers": dest_broker, - }) + } + ) + producer = Producer( + { + "bootstrap.servers": dest_broker, + } + ) pass diff --git a/src/saluki/sniff.py b/src/saluki/sniff.py index ad7b498..87bf955 100644 --- a/src/saluki/sniff.py +++ b/src/saluki/sniff.py @@ -1,9 +1,10 @@ from confluent_kafka import Consumer, TopicPartition from confluent_kafka.admin import AdminClient + def sniff(broker: str): - a = AdminClient({'bootstrap.servers': broker}) - c = Consumer({'bootstrap.servers': broker, 'group.id': 'saluki-sniff'}) + a = AdminClient({"bootstrap.servers": broker}) + c = Consumer({"bootstrap.servers": broker, "group.id": "saluki-sniff"}) t = a.list_topics(timeout=5) print(f"Cluster ID: {t.cluster_id}") print(f"Brokers:") @@ -11,10 +12,10 @@ def sniff(broker: str): print(f"Topics:") - for k,v in t.topics.items(): + for k, v in t.topics.items(): partitions = v.partitions.keys() print(f"\t{k}:") for p in partitions: tp = TopicPartition(k, p) low, high = c.get_watermark_offsets(tp) - 
print(f"\t\tlow:{low}, high:{high}, num_messages:{high-low}") + print(f"\t\tlow:{low}, high:{high}, num_messages:{high - low}") diff --git a/src/saluki/utils.py b/src/saluki/utils.py index 3a6beb8..fa33d79 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -23,7 +23,9 @@ def _try_to_deserialise_message(payload: bytes) -> Tuple[str | None, str | None] def fallback_deserialiser(payload: bytes) -> str: return payload.decode() - deserialiser = DESERIALISERS.get(schema if schema is not None else "", fallback_deserialiser) + deserialiser = DESERIALISERS.get( + schema if schema is not None else "", fallback_deserialiser + ) logger.debug(f"Deserialiser: {deserialiser}") ret = deserialiser(payload) @@ -31,7 +33,9 @@ def fallback_deserialiser(payload: bytes) -> str: return schema, ret -def deserialise_and_print_messages(msgs: List[Message], partition: int | None, filter: list[str] | None) -> None: +def deserialise_and_print_messages( + msgs: List[Message], partition: int | None, schemas_to_filter_out: list[str] | None +) -> None: for msg in msgs: try: if msg is None: @@ -42,8 +46,7 @@ def deserialise_and_print_messages(msgs: List[Message], partition: int | None, f if partition is not None and msg.partition() != partition: continue schema, deserialised = _try_to_deserialise_message(msg.value()) - if filter is not None and schema not in filter: - # ignore + if schemas_to_filter_out is not None and schema in schemas_to_filter_out: break time = _parse_timestamp(msg) logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}") diff --git a/tests/test_listen.py b/tests/test_listen.py index 4ff4c2e..4801cb3 100644 --- a/tests/test_listen.py +++ b/tests/test_listen.py @@ -16,7 +16,9 @@ def test_listen_with_partition_assigns_to_partition(): mock.patch("saluki.listen.Consumer") as c, ): listen("somebroker", "sometopic", partition=expected_partition) - c.return_value.assign.assert_called_with([TopicPartition(topic, expected_partition)]) + c.return_value.assign.assert_called_with( + [TopicPartition(topic, expected_partition)] + ) def test_keyboard_interrupt_causes_consumer_to_close(): From 01299d86677580997234ad28ca777381e9493aaa Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Mon, 3 Nov 2025 16:18:13 +0000 Subject: [PATCH 07/23] play working for offsets --- src/saluki/listen.py | 2 +- src/saluki/main.py | 16 ---------------- src/saluki/play.py | 45 +++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/src/saluki/listen.py b/src/saluki/listen.py index 3ec7911..d78eb8b 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -23,7 +23,7 @@ def listen( c = Consumer( { "bootstrap.servers": broker, - "group.id": "saluki", + "group.id": "saluki-listen", "auto.offset.reset": "latest", "enable.auto.commit": False, } diff --git a/src/saluki/main.py b/src/saluki/main.py index 6eeeb87..883d4d9 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -93,13 +93,6 @@ def main() -> None: # saluki consume x messages of y or z schema # saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t run stops# - # saluki bury - dump data on topic to file - # saluki bury mybroker:9092/topicname -p 0 -o startoffset finishoffset outputfile - # saluki bury mybroker:9092/topicname -p 0 -t starttimestamp finishtimestamp outputfile - - # saluki dig - push data from dump generated by saluki bury to topic - # saluki dig mybroker:9092/topicname -p 0 outputfile - play_parser = sub_parsers.add_parser( _PLAY, help="replay 
mode - replay data into another topic", @@ -117,14 +110,6 @@ def main() -> None: g.add_argument( "-t", "--timestamps", help="timestamps to replay between", type=str, nargs=2 ) - g.add_argument( - "-c", - "--chunk", - help="forward in chunks. ie to avoid storing a huge list in memory", - default=0, - type=int, - required=False, - ) if len(sys.argv) == 1: parser.print_help() @@ -171,7 +156,6 @@ def main() -> None: dest_topic, args.offsets, args.timestamps, - args.chunks, ) print("replayed") elif args.command == _SNIFF: diff --git a/src/saluki/play.py b/src/saluki/play.py index 4245b5c..c19d70d 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -1,4 +1,7 @@ -from confluent_kafka import Consumer, Producer +import logging +from confluent_kafka import Consumer, Producer, TopicPartition + +logger = logging.getLogger("saluki") def play( @@ -8,8 +11,21 @@ def play( dest_topic: str, offsets: list[int] | None, timestamps: list[int] | None, - chunks: int, ) -> None: + """ + Replay data from src_topic to dest_topic between the offsets OR timestamps specified. + This currently assumes contiguous data in a topic (ie. no log compaction) and only uses partition 0. + + :param src_broker: The source broker, including port. + :param src_topic: The topic to replay data from. + :param dest_broker: The destination broker, including port. + :param dest_topic: The topic to replay data to. + :param offsets: The start and finish offsets to replay data from. + :param timestamps: The start and finish timestamps to replay data from. + """ + + print(f"ARGS: {src_broker}, {src_topic}, {dest_broker}, {dest_topic}, {offsets}, {timestamps}") + consumer = Consumer( { "bootstrap.servers": src_broker, @@ -21,4 +37,27 @@ def play( "bootstrap.servers": dest_broker, } ) - pass + src_partition = 0 + + if timestamps is not None: + start_offset, stop_offset = consumer.offsets_for_times([ + TopicPartition(src_topic, src_partition, timestamps[0]), + TopicPartition(src_topic, src_partition, timestamps[1]), + ]) + else: + start_offset = TopicPartition(src_topic, src_partition, offsets[0]) + stop_offset = TopicPartition(src_topic, src_partition, offsets[1]) + consumer.assign([start_offset]) + + num_messages = stop_offset.offset - start_offset.offset + 1 + + try: + msgs = consumer.consume(num_messages) + [producer.produce(dest_topic, message.value(), message.key()) for message in msgs] + producer.flush() + except Exception: + logger.exception("Got exception while consuming:") + finally: + logger.debug(f"Closing consumer {consumer}") + consumer.close() + From 6a90c6e65f667d5bcc1a3f8336217c432a3ad777 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Tue, 4 Nov 2025 09:52:11 +0000 Subject: [PATCH 08/23] updating readme with new commands --- README.md | 6 ++--- src/saluki/consume.py | 4 +--- src/saluki/main.py | 54 +++++++++---------------------------------- src/saluki/play.py | 16 ++++++------- src/saluki/sniff.py | 6 ++--- src/saluki/utils.py | 4 +--- tests/test_listen.py | 4 +--- 7 files changed, 28 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index ceb6d82..a521773 100644 --- a/README.md +++ b/README.md @@ -31,15 +31,15 @@ TODO ### Between offsets -TODO +`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -o 123 125` - This will forward messages at offset 123, 124 and 125 in the `source_topic` to the `dest_topic` ### Between timestamps -TODO +`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps. 
# Install `pip install saluki` ## Developer setup -`pip install .[dev]` +`pip install -e .[dev]` diff --git a/src/saluki/consume.py b/src/saluki/consume.py index 38c710f..02ff1cc 100644 --- a/src/saluki/consume.py +++ b/src/saluki/consume.py @@ -47,9 +47,7 @@ def consume( start = offset - num_messages + 1 else: start = ( - c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[ - 1 - ] + c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[1] - num_messages ) diff --git a/src/saluki/main.py b/src/saluki/main.py index 883d4d9..0591848 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -15,8 +15,6 @@ _CONSUME = "consume" _PLAY = "play" _SNIFF = "sniff" -_BURY = "bury" -_DIG = "dig" def main() -> None: @@ -26,9 +24,7 @@ def main() -> None: ) topic_parser = argparse.ArgumentParser(add_help=False) - topic_parser.add_argument( - "topic", type=str, help="Kafka topic. format is broker<:port>/topic" - ) + topic_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic") topic_parser.add_argument( "-X", @@ -40,9 +36,7 @@ def main() -> None: topic_parser.add_argument("-p", "--partition", required=False, type=int, default=0) topic_parser.add_argument("-f", "--filter", required=False, action="append") - sub_parsers = parser.add_subparsers( - help="sub-command help", required=True, dest="command" - ) + sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command") sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata") sniff_parser.add_argument("broker", type=str) @@ -70,9 +64,7 @@ def main() -> None: consumer_mode_parser.add_argument( "-o", "--offset", help="offset to consume from", type=int, required=False ) - consumer_mode_parser.add_argument( - "-g", "--go-forwards", required=False, action="store_true" - ) + consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true") listen_parser = sub_parsers.add_parser( # noqa: F841 _LISTEN, @@ -81,12 +73,6 @@ def main() -> None: ) #### NEW FEATURES HERE PLZ - # replay from, to offset - # saluki play -o FROMOFFSET TOOFFSET srcbroker/srctopic destbroker/desttopic - - # replay from, to timestamp - # saluki play -t FROMTIMESTAMP TOTIMESTAMP srcbroker/srctopic destbroker/desttopic - # saluki consume x messages of y schema # saluki consume -f pl72 mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts @@ -108,7 +94,7 @@ def main() -> None: nargs=2, ) g.add_argument( - "-t", "--timestamps", help="timestamps to replay between", type=str, nargs=2 + "-t", "--timestamps", help="unix timestamps to replay between", type=str, nargs=2 ) if len(sys.argv) == 1: @@ -136,34 +122,16 @@ def main() -> None: src_broker, src_topic = parse_kafka_uri(args.topics[0]) dest_broker, dest_topic = parse_kafka_uri(args.topics[1]) - print( - f"SOURCE BROKER: {src_broker}, SOURCE TOPIC: {src_topic}, DEST BROKER: {dest_broker}, DEST TOPIC: {dest_topic} " + play( + src_broker, + src_topic, + dest_broker, + dest_topic, + args.offsets, + args.timestamps, ) - if args.offsets is not None: - print( - f"Replaying {src_broker}/{src_topic} between offsets {args.offsets[0]} and {args.offsets[1]} to {dest_broker}/{dest_topic}" - ) - elif args.timestamps is not None: - print( - f"Replaying {src_broker}/{src_topic} between timestamps {args.timestamps[0]} and {args.timestamps[1]} to {dest_broker}/{dest_topic}" - ) - - if input("OK? 
(y/n)").lower() == "y": - play( - src_broker, - src_topic, - dest_broker, - dest_topic, - args.offsets, - args.timestamps, - ) - print("replayed") elif args.command == _SNIFF: sniff(args.broker) - elif args.command == _BURY: - pass - elif args.command == _DIG: - pass if __name__ == "__main__": diff --git a/src/saluki/play.py b/src/saluki/play.py index c19d70d..ef25712 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -1,4 +1,5 @@ import logging + from confluent_kafka import Consumer, Producer, TopicPartition logger = logging.getLogger("saluki") @@ -14,7 +15,7 @@ def play( ) -> None: """ Replay data from src_topic to dest_topic between the offsets OR timestamps specified. - This currently assumes contiguous data in a topic (ie. no log compaction) and only uses partition 0. + This currently assumes contiguous data in a topic (ie. no log compaction) and uses partition 0. :param src_broker: The source broker, including port. :param src_topic: The topic to replay data from. @@ -24,8 +25,6 @@ def play( :param timestamps: The start and finish timestamps to replay data from. """ - print(f"ARGS: {src_broker}, {src_topic}, {dest_broker}, {dest_topic}, {offsets}, {timestamps}") - consumer = Consumer( { "bootstrap.servers": src_broker, @@ -40,10 +39,12 @@ def play( src_partition = 0 if timestamps is not None: - start_offset, stop_offset = consumer.offsets_for_times([ - TopicPartition(src_topic, src_partition, timestamps[0]), - TopicPartition(src_topic, src_partition, timestamps[1]), - ]) + start_offset, stop_offset = consumer.offsets_for_times( + [ + TopicPartition(src_topic, src_partition, timestamps[0]), + TopicPartition(src_topic, src_partition, timestamps[1]), + ] + ) else: start_offset = TopicPartition(src_topic, src_partition, offsets[0]) stop_offset = TopicPartition(src_topic, src_partition, offsets[1]) @@ -60,4 +61,3 @@ def play( finally: logger.debug(f"Closing consumer {consumer}") consumer.close() - diff --git a/src/saluki/sniff.py b/src/saluki/sniff.py index 87bf955..fac6ccb 100644 --- a/src/saluki/sniff.py +++ b/src/saluki/sniff.py @@ -2,15 +2,15 @@ from confluent_kafka.admin import AdminClient -def sniff(broker: str): +def sniff(broker: str) -> None: a = AdminClient({"bootstrap.servers": broker}) c = Consumer({"bootstrap.servers": broker, "group.id": "saluki-sniff"}) t = a.list_topics(timeout=5) print(f"Cluster ID: {t.cluster_id}") - print(f"Brokers:") + print("Brokers:") [print(f"\t{value}") for value in t.brokers.values()] - print(f"Topics:") + print("Topics:") for k, v in t.topics.items(): partitions = v.partitions.keys() diff --git a/src/saluki/utils.py b/src/saluki/utils.py index fa33d79..b41e187 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -23,9 +23,7 @@ def _try_to_deserialise_message(payload: bytes) -> Tuple[str | None, str | None] def fallback_deserialiser(payload: bytes) -> str: return payload.decode() - deserialiser = DESERIALISERS.get( - schema if schema is not None else "", fallback_deserialiser - ) + deserialiser = DESERIALISERS.get(schema if schema is not None else "", fallback_deserialiser) logger.debug(f"Deserialiser: {deserialiser}") ret = deserialiser(payload) diff --git a/tests/test_listen.py b/tests/test_listen.py index 4801cb3..4ff4c2e 100644 --- a/tests/test_listen.py +++ b/tests/test_listen.py @@ -16,9 +16,7 @@ def test_listen_with_partition_assigns_to_partition(): mock.patch("saluki.listen.Consumer") as c, ): listen("somebroker", "sometopic", partition=expected_partition) - c.return_value.assign.assert_called_with( - 
[TopicPartition(topic, expected_partition)] - ) + c.return_value.assign.assert_called_with([TopicPartition(topic, expected_partition)]) def test_keyboard_interrupt_causes_consumer_to_close(): From b34c9c466cb9c9c4308a678f8f3ed80d5d4f7e62 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Tue, 4 Nov 2025 16:29:09 +0000 Subject: [PATCH 09/23] allow filtering on consume --- README.md | 4 +--- src/saluki/consume.py | 4 +++- src/saluki/listen.py | 5 +++-- src/saluki/main.py | 14 +------------- src/saluki/utils.py | 4 ++-- 5 files changed, 10 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index a521773..cf97ef7 100644 --- a/README.md +++ b/README.md @@ -20,9 +20,7 @@ See `saluki --help` for all options. Use the `-g` flag to go the other way, ie. in the above example to consume the 9 messages _after_ offset 123456 -### Consume X of a certain schema(s) - -TODO +You can also filter out messages to specific schema(s) with the `-f` flag, like the example above for `listen`. ## `sniff` - List all topics and their high, low watermarks and number of messages `saluki sniff mybroker:9092` diff --git a/src/saluki/consume.py b/src/saluki/consume.py index 02ff1cc..875fa31 100644 --- a/src/saluki/consume.py +++ b/src/saluki/consume.py @@ -14,6 +14,7 @@ def consume( num_messages: int = 1, offset: int | None = None, go_forwards: bool = False, + schemas_to_filter_to: list[str] | None = None, ) -> None: """ consume from a topic and deserialise each message @@ -24,6 +25,7 @@ def consume( :param num_messages: number of messages to consume :param offset: offset to consume from/to :param go_forwards: whether to consume forwards or backwards + :param schemas_to_filter_to: schemas in messages to filter to :return: None """ c = Consumer( @@ -57,7 +59,7 @@ def consume( try: logger.info(f"Consuming {num_messages} messages") msgs = c.consume(num_messages) - deserialise_and_print_messages(msgs, partition) + deserialise_and_print_messages(msgs, partition, schemas_to_filter_to) except Exception: logger.exception("Got exception while consuming:") finally: diff --git a/src/saluki/listen.py b/src/saluki/listen.py index d78eb8b..7fccb14 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -11,13 +11,14 @@ def listen( broker: str, topic: str, partition: int | None = None, - schemas_to_filter_out: list[str] | None = None, + schemas_to_filter_to: list[str] | None = None, ) -> None: """ Listen to a topic and deserialise each message :param broker: the broker address, including the port :param topic: the topic to use :param partition: the partition to listen to (default is all partitions in a given topic) + :param schemas_to_filter_to: schemas to filter when listening to messages :return: None """ c = Consumer( @@ -36,7 +37,7 @@ def listen( while True: msg = c.poll(1.0) deserialise_and_print_messages( - [msg], partition, schemas_to_filter_out=schemas_to_filter_out + [msg], partition, schemas_to_filter_to=schemas_to_filter_to ) except KeyboardInterrupt: logger.debug("finished listening") diff --git a/src/saluki/main.py b/src/saluki/main.py index 0591848..46a026c 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -72,13 +72,6 @@ def main() -> None: parents=[topic_parser, consumer_parser], ) - #### NEW FEATURES HERE PLZ - # saluki consume x messages of y schema - # saluki consume -f pl72 mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts - - # saluki consume x messages of y or z schema - # saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t 
run stops# - play_parser = sub_parsers.add_parser( _PLAY, help="replay mode - replay data into another topic", @@ -111,12 +104,7 @@ def main() -> None: elif args.command == _CONSUME: broker, topic = parse_kafka_uri(args.topic) consume( - broker, - topic, - args.partition, - args.messages, - args.offset, - args.go_forwards, + broker, topic, args.partition, args.messages, args.offset, args.go_forwards, args.filter ) elif args.command == _PLAY: src_broker, src_topic = parse_kafka_uri(args.topics[0]) diff --git a/src/saluki/utils.py b/src/saluki/utils.py index b41e187..4e7e638 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -32,7 +32,7 @@ def fallback_deserialiser(payload: bytes) -> str: def deserialise_and_print_messages( - msgs: List[Message], partition: int | None, schemas_to_filter_out: list[str] | None + msgs: List[Message], partition: int | None, schemas_to_filter_to: list[str] | None = None ) -> None: for msg in msgs: try: @@ -44,7 +44,7 @@ def deserialise_and_print_messages( if partition is not None and msg.partition() != partition: continue schema, deserialised = _try_to_deserialise_message(msg.value()) - if schemas_to_filter_out is not None and schema in schemas_to_filter_out: + if schemas_to_filter_to is not None and schema in schemas_to_filter_to: break time = _parse_timestamp(msg) logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}") From 18d81cf7617baae3c82f17912027a7bb74d14ddb Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Tue, 4 Nov 2025 16:44:02 +0000 Subject: [PATCH 10/23] add log file arg back, use logger for sniff --- src/saluki/main.py | 11 +++++++++++ src/saluki/sniff.py | 16 ++++++++++------ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/saluki/main.py b/src/saluki/main.py index 46a026c..d7beda4 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -22,6 +22,14 @@ def main() -> None: prog="saluki", description="serialise/de-serialise flatbuffers and consume/produce from/to kafka", ) + parser.add_argument( + "-l", + "--log-file", + help="filename to output all data to", + required=False, + default=None, + type=argparse.FileType("a"), + ) topic_parser = argparse.ArgumentParser(add_help=False) topic_parser.add_argument("topic", type=str, help="Kafka topic. 
format is broker<:port>/topic") @@ -95,6 +103,9 @@ def main() -> None: sys.exit(1) args = parser.parse_args() + if args.log_file: + logger.addHandler(logging.FileHandler(args.log_file.name)) + if "kafka_config" in args and args.kafka_config is not None: raise NotImplementedError("-X is not implemented yet.") diff --git a/src/saluki/sniff.py b/src/saluki/sniff.py index fac6ccb..3f2bf43 100644 --- a/src/saluki/sniff.py +++ b/src/saluki/sniff.py @@ -1,21 +1,25 @@ +import logging + from confluent_kafka import Consumer, TopicPartition from confluent_kafka.admin import AdminClient +logger = logging.getLogger("saluki") + def sniff(broker: str) -> None: a = AdminClient({"bootstrap.servers": broker}) c = Consumer({"bootstrap.servers": broker, "group.id": "saluki-sniff"}) t = a.list_topics(timeout=5) - print(f"Cluster ID: {t.cluster_id}") - print("Brokers:") - [print(f"\t{value}") for value in t.brokers.values()] + logger.info(f"Cluster ID: {t.cluster_id}") + logger.info("Brokers:") + [logger.info(f"\t{value}") for value in t.brokers.values()] - print("Topics:") + logger.info("Topics:") for k, v in t.topics.items(): partitions = v.partitions.keys() - print(f"\t{k}:") + logger.info(f"\t{k}:") for p in partitions: tp = TopicPartition(k, p) low, high = c.get_watermark_offsets(tp) - print(f"\t\tlow:{low}, high:{high}, num_messages:{high - low}") + logger.info(f"\t\tlow:{low}, high:{high}, num_messages:{high - low}") From 54579a45a8a6aed182326a7766936e26139ea768 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Wed, 5 Nov 2025 15:40:21 +0000 Subject: [PATCH 11/23] add tests for utils and start for sniff --- src/saluki/utils.py | 4 ++-- tests/test_sniff.py | 24 ++++++++++++++++++++++++ tests/test_utils.py | 17 +++++++++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) create mode 100644 tests/test_sniff.py diff --git a/src/saluki/utils.py b/src/saluki/utils.py index 4e7e638..790d94d 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -44,8 +44,8 @@ def deserialise_and_print_messages( if partition is not None and msg.partition() != partition: continue schema, deserialised = _try_to_deserialise_message(msg.value()) - if schemas_to_filter_to is not None and schema in schemas_to_filter_to: - break + if schemas_to_filter_to is not None and schema not in schemas_to_filter_to: + continue time = _parse_timestamp(msg) logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}") except Exception as e: diff --git a/tests/test_sniff.py b/tests/test_sniff.py new file mode 100644 index 0000000..63976f5 --- /dev/null +++ b/tests/test_sniff.py @@ -0,0 +1,24 @@ +from unittest.mock import patch + +from confluent_kafka.admin import ClusterMetadata, BrokerMetadata, TopicMetadata + +from saluki.sniff import sniff + +def test_sniff_with_two_partitions_in_a_topic(): + with patch("saluki.sniff.AdminClient") as a, patch("saluki.sniff.Consumer") as c, patch("saluki.sniff.logger") as logger: + fake_cluster_md = ClusterMetadata() + broker1 = BrokerMetadata() + broker2 = BrokerMetadata() + fake_cluster_md.brokers = {0: broker1, 1: broker2} + + topic1 = TopicMetadata() + topic2 = TopicMetadata() + + fake_cluster_md.topics = { + "topic1": topic1, + "topic2": topic2 + } + a.list_topics.return_value = fake_cluster_md + sniff("whatever") + + # TODO \ No newline at end of file diff --git a/tests/test_utils.py b/tests/test_utils.py index 8f169ca..4f2e484 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,9 +2,11 @@ import pytest from confluent_kafka import Message +from streaming_data_types import 
serialise_f144 from streaming_data_types.forwarder_config_update_fc00 import ( ConfigurationUpdate, StreamInfo, + serialise_fc00 ) from saluki.utils import ( @@ -74,6 +76,21 @@ def test_deserialising_message_which_raises_does_not_stop_loop(mock_message): assert logger.info.call_count == 1 +def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(mock_message): + with patch("saluki.utils.logger") as logger: + ok_message = Mock(spec=Message) + ok_message.value.return_value = serialise_fc00(config_change=1, streams=[]) + ok_message.error.return_value = False + ok_message.timestamp.return_value = 2, 1 + + mock_message.value.return_value = serialise_f144(source_name="test", value=123) + mock_message.error.return_value = False + mock_message.timestamp.return_value = 2, 1 + + deserialise_and_print_messages([mock_message, ok_message], None, schemas_to_filter_to=["fc00"]) + assert logger.info.call_count == 1 + + def test_message_that_has_valid_schema_but_empty_payload(): with pytest.raises(Exception): # Empty fc00 message - valid schema but not valid payload From 11356a1bdb9529526f111b43e5ce3d1c5dd24e96 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Wed, 5 Nov 2025 21:47:50 +0000 Subject: [PATCH 12/23] finish test for sniff --- src/saluki/sniff.py | 2 +- tests/test_sniff.py | 25 +++++++++++++++++++++---- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/saluki/sniff.py b/src/saluki/sniff.py index 3f2bf43..1f4572a 100644 --- a/src/saluki/sniff.py +++ b/src/saluki/sniff.py @@ -22,4 +22,4 @@ def sniff(broker: str) -> None: for p in partitions: tp = TopicPartition(k, p) low, high = c.get_watermark_offsets(tp) - logger.info(f"\t\tlow:{low}, high:{high}, num_messages:{high - low}") + logger.info(f"\t\t{tp.partition} - low:{low}, high:{high}, num_messages:{high - low}") diff --git a/tests/test_sniff.py b/tests/test_sniff.py index 63976f5..ce209ef 100644 --- a/tests/test_sniff.py +++ b/tests/test_sniff.py @@ -8,17 +8,34 @@ def test_sniff_with_two_partitions_in_a_topic(): with patch("saluki.sniff.AdminClient") as a, patch("saluki.sniff.Consumer") as c, patch("saluki.sniff.logger") as logger: fake_cluster_md = ClusterMetadata() broker1 = BrokerMetadata() - broker2 = BrokerMetadata() - fake_cluster_md.brokers = {0: broker1, 1: broker2} + broker1.id = "id1" + broker1.host = "mybroker" + broker1.port = 9093 + fake_cluster_md.brokers = {0: broker1} topic1 = TopicMetadata() + topic1.partitions = {0:{}} topic2 = TopicMetadata() + topic2.partitions = {0:{}, 1:{}} fake_cluster_md.topics = { "topic1": topic1, "topic2": topic2 } - a.list_topics.return_value = fake_cluster_md + a().list_topics.return_value = fake_cluster_md + c().get_watermark_offsets.return_value = 1,2 sniff("whatever") - # TODO \ No newline at end of file + brokers_call = logger.info.call_args_list[2] + + assert "mybroker:9093/id1" in brokers_call.args[0] + + topic1_call = logger.info.call_args_list[5] + assert "0 - low:1, high:2, num_messages:1" in topic1_call.args[0] + + topic2_call1 = logger.info.call_args_list[7] + assert "0 - low:1, high:2, num_messages:1" in topic2_call1.args[0] + + topic2_call2 = logger.info.call_args_list[8] + assert "1 - low:1, high:2, num_messages:1" in topic2_call2.args[0] + From 143a67396d22bb8a26d963a180dbed784fc86cd2 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Wed, 5 Nov 2025 21:48:26 +0000 Subject: [PATCH 13/23] ruff --- tests/test_sniff.py | 21 +++++++++++---------- tests/test_utils.py | 6 ++++-- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git 
a/tests/test_sniff.py b/tests/test_sniff.py index ce209ef..241cd96 100644 --- a/tests/test_sniff.py +++ b/tests/test_sniff.py @@ -1,11 +1,16 @@ from unittest.mock import patch -from confluent_kafka.admin import ClusterMetadata, BrokerMetadata, TopicMetadata +from confluent_kafka.admin import BrokerMetadata, ClusterMetadata, TopicMetadata from saluki.sniff import sniff + def test_sniff_with_two_partitions_in_a_topic(): - with patch("saluki.sniff.AdminClient") as a, patch("saluki.sniff.Consumer") as c, patch("saluki.sniff.logger") as logger: + with ( + patch("saluki.sniff.AdminClient") as a, + patch("saluki.sniff.Consumer") as c, + patch("saluki.sniff.logger") as logger, + ): fake_cluster_md = ClusterMetadata() broker1 = BrokerMetadata() broker1.id = "id1" @@ -14,16 +19,13 @@ def test_sniff_with_two_partitions_in_a_topic(): fake_cluster_md.brokers = {0: broker1} topic1 = TopicMetadata() - topic1.partitions = {0:{}} + topic1.partitions = {0: {}} topic2 = TopicMetadata() - topic2.partitions = {0:{}, 1:{}} + topic2.partitions = {0: {}, 1: {}} - fake_cluster_md.topics = { - "topic1": topic1, - "topic2": topic2 - } + fake_cluster_md.topics = {"topic1": topic1, "topic2": topic2} a().list_topics.return_value = fake_cluster_md - c().get_watermark_offsets.return_value = 1,2 + c().get_watermark_offsets.return_value = 1, 2 sniff("whatever") brokers_call = logger.info.call_args_list[2] @@ -38,4 +40,3 @@ def test_sniff_with_two_partitions_in_a_topic(): topic2_call2 = logger.info.call_args_list[8] assert "1 - low:1, high:2, num_messages:1" in topic2_call2.args[0] - diff --git a/tests/test_utils.py b/tests/test_utils.py index 4f2e484..372d2ab 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -6,7 +6,7 @@ from streaming_data_types.forwarder_config_update_fc00 import ( ConfigurationUpdate, StreamInfo, - serialise_fc00 + serialise_fc00, ) from saluki.utils import ( @@ -87,7 +87,9 @@ def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list mock_message.error.return_value = False mock_message.timestamp.return_value = 2, 1 - deserialise_and_print_messages([mock_message, ok_message], None, schemas_to_filter_to=["fc00"]) + deserialise_and_print_messages( + [mock_message, ok_message], None, schemas_to_filter_to=["fc00"] + ) assert logger.info.call_count == 1 From be1360797f82ec3cf262d254177ffa0e184e2e46 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Fri, 7 Nov 2025 09:49:56 +0000 Subject: [PATCH 14/23] add tests for play --- src/saluki/play.py | 3 +- src/saluki/sniff.py | 4 ++ tests/test_play.py | 97 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 tests/test_play.py diff --git a/src/saluki/play.py b/src/saluki/play.py index ef25712..aaf99a5 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -51,13 +51,14 @@ def play( consumer.assign([start_offset]) num_messages = stop_offset.offset - start_offset.offset + 1 + msgs = [] try: msgs = consumer.consume(num_messages) [producer.produce(dest_topic, message.value(), message.key()) for message in msgs] producer.flush() except Exception: - logger.exception("Got exception while consuming:") + logger.exception("Got exception while replaying:") finally: logger.debug(f"Closing consumer {consumer}") consumer.close() diff --git a/src/saluki/sniff.py b/src/saluki/sniff.py index 1f4572a..562f3ae 100644 --- a/src/saluki/sniff.py +++ b/src/saluki/sniff.py @@ -7,6 +7,10 @@ def sniff(broker: str) -> None: + """ + Prints the broker and topic metadata for a given broker. 
+    :param broker: The broker address including port number.
+    """
     a = AdminClient({"bootstrap.servers": broker})
     c = Consumer({"bootstrap.servers": broker, "group.id": "saluki-sniff"})
     t = a.list_topics(timeout=5)
diff --git a/tests/test_play.py b/tests/test_play.py
new file mode 100644
index 0000000..5ba507e
--- /dev/null
+++ b/tests/test_play.py
@@ -0,0 +1,97 @@
+from unittest.mock import Mock, patch
+
+from confluent_kafka import Message, TopicPartition
+
+from saluki.play import play
+
+
+def test_play_with_offsets():
+    src_broker = "broker1"
+    src_topic = "topic1"
+    dest_broker = "broker2"
+    dest_topic = "topic2"
+    offsets = [1, 2]
+
+    message_1 = Mock(spec=Message)
+    message_1_key = "msg1key"
+    message_1.key.return_value = message_1_key
+    message_1_val = "msg1"
+    message_1.value.return_value = message_1_val
+
+    message_2 = Mock(spec=Message)
+    message_2_key = "msg2key"
+    message_2.key.return_value = message_2_key
+    message_2_val = "msg2"
+    message_2.value.return_value = message_2_val
+
+    with patch("saluki.play.Consumer") as c, patch("saluki.play.Producer") as p:
+        consumer_obj = c()
+        consumer_obj.consume.return_value = [message_1, message_2]
+
+        play(src_broker, src_topic, dest_broker, dest_topic, offsets, None)
+
+        assert consumer_obj.assign.call_args.args[0][0].topic == src_topic
+        assert consumer_obj.assign.call_args.args[0][0].offset == offsets[0]
+
+        consumer_obj.consume.assert_called_with(2)  # stop - start + 1
+
+        p_obj = p()
+        call_1 = p_obj.produce.call_args_list[0]
+        assert call_1.args == (dest_topic, message_1_val, message_1_key)
+        call_2 = p_obj.produce.call_args_list[1]
+        assert call_2.args == (dest_topic, message_2_val, message_2_key)
+
+
+def test_play_with_timestamps():
+    src_broker = "broker1"
+    src_topic = "topic1"
+    dest_broker = "broker2"
+    dest_topic = "topic2"
+    timestamps = [1762444369, 1762444375]
+
+    message_1 = Mock(spec=Message)
+    message_1_key = "msg1key"
+    message_1.key.return_value = message_1_key
+    message_1_val = "msg1"
+    message_1.value.return_value = message_1_val
+
+    message_2 = Mock(spec=Message)
+    message_2_key = "msg2key"
+    message_2.key.return_value = message_2_key
+    message_2_val = "msg2"
+    message_2.value.return_value = message_2_val
+
+    with patch("saluki.play.Consumer") as c, patch("saluki.play.Producer") as p:
+        consumer_obj = c()
+        consumer_obj.offsets_for_times.return_value = [
+            TopicPartition(src_topic, partition=0, offset=2),
+            TopicPartition(src_topic, partition=0, offset=3),
+        ]
+        consumer_obj.consume.return_value = [message_1, message_2]
+
+        play(src_broker, src_topic, dest_broker, dest_topic, None, timestamps)
+
+        assert consumer_obj.assign.call_args.args[0][0].topic == src_topic
+        assert consumer_obj.assign.call_args.args[0][0].offset == 2
+
+        consumer_obj.consume.assert_called_with(2)  # stop - start + 1
+
+        p_obj = p()
+        call_1 = p_obj.produce.call_args_list[0]
+        assert call_1.args == (dest_topic, message_1_val, message_1_key)
+        call_2 = p_obj.produce.call_args_list[1]
+        assert call_2.args == (dest_topic, message_2_val, message_2_key)
+
+
+def test_play_with_exception_when_consuming_consumer_still_closed():
+    with (
+        patch("saluki.play.Consumer") as mock_consumer,
+        patch("saluki.play.Producer"),
+        patch("saluki.play.logger") as mock_logger,
+    ):
+        mock_consumer().consume.side_effect = Exception("blah")
+        play("", "", "", "", [1, 2], None)
+
+        mock_logger.exception.assert_called_once()
+
+        mock_consumer().close.assert_called_once()
From 9d5566d00fad626b5c8f7f82ce8c3eb2c23a8555 Mon Sep 17 00:00:00 2001
From: Jack Harper
Date:
Fri, 7 Nov 2025 10:04:28 +0000 Subject: [PATCH 15/23] pyright fixes --- src/saluki/play.py | 6 ++++-- tests/test_play.py | 6 ++++++ tests/test_sniff.py | 4 ++-- tests/test_utils.py | 2 +- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/saluki/play.py b/src/saluki/play.py index aaf99a5..056f434 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -45,13 +45,15 @@ def play( TopicPartition(src_topic, src_partition, timestamps[1]), ] ) - else: + elif offsets is not None: start_offset = TopicPartition(src_topic, src_partition, offsets[0]) stop_offset = TopicPartition(src_topic, src_partition, offsets[1]) + else: + raise ValueError("offsets and timestamps cannot both be None") + consumer.assign([start_offset]) num_messages = stop_offset.offset - start_offset.offset + 1 - msgs = [] try: msgs = consumer.consume(num_messages) diff --git a/tests/test_play.py b/tests/test_play.py index 5ba507e..f13b7a0 100644 --- a/tests/test_play.py +++ b/tests/test_play.py @@ -1,5 +1,6 @@ from unittest.mock import Mock, patch +import pytest from confluent_kafka import Message, TopicPartition from saluki.play import play @@ -95,3 +96,8 @@ def test_play_with_exception_when_consuming_consumer_still_closed(): mock_logger.exception.assert_called_once() mock_consumer().close.assert_called_once() + + +def test_play_raises_when_offsets_and_timestamps_are_none(): + with pytest.raises(ValueError): + play("", "", "", "", None, None) diff --git a/tests/test_sniff.py b/tests/test_sniff.py index 241cd96..fe50cc1 100644 --- a/tests/test_sniff.py +++ b/tests/test_sniff.py @@ -13,8 +13,8 @@ def test_sniff_with_two_partitions_in_a_topic(): ): fake_cluster_md = ClusterMetadata() broker1 = BrokerMetadata() - broker1.id = "id1" - broker1.host = "mybroker" + broker1.id = "id1" # type: ignore + broker1.host = "mybroker" # type: ignore broker1.port = 9093 fake_cluster_md.brokers = {0: broker1} diff --git a/tests/test_utils.py b/tests/test_utils.py index 372d2ab..9c6a98a 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -79,7 +79,7 @@ def test_deserialising_message_which_raises_does_not_stop_loop(mock_message): def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(mock_message): with patch("saluki.utils.logger") as logger: ok_message = Mock(spec=Message) - ok_message.value.return_value = serialise_fc00(config_change=1, streams=[]) + ok_message.value.return_value = serialise_fc00(config_change=1, streams=[]) # type: ignore ok_message.error.return_value = False ok_message.timestamp.return_value = 2, 1 From 25e60b5d91d23731dcd44a9c1b629dced0f93d26 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Fri, 7 Nov 2025 10:16:51 +0000 Subject: [PATCH 16/23] add note on uvx --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index cf97ef7..dd8d776 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,9 @@ Deserialises [the ESS flatbuffers blobs](https://github.com/ess-dmsc/python-stre Also allows replaying data in a topic. # Usage + +To run the latest version, use `uvx saluki `. + See `saluki --help` for all options. ## `listen` - Listen to a topic for updates @@ -35,9 +38,6 @@ You can also filter out messages to specific schema(s) with the `-f` flag, like `saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps. 
-# Install -`pip install saluki` - -## Developer setup +# Developer setup `pip install -e .[dev]` From 62f581003bca268bc5435b7b40afbdd978b012ec Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Mon, 10 Nov 2025 15:31:28 +0000 Subject: [PATCH 17/23] review comments --- README.md | 23 ++++++++++++++++++++++- src/saluki/consume.py | 3 ++- src/saluki/listen.py | 3 ++- src/saluki/play.py | 6 ++++-- src/saluki/sniff.py | 6 ++++-- 5 files changed, 34 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index dd8d776..04e27db 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,9 @@ Also allows replaying data in a topic. # Usage -To run the latest version, use `uvx saluki `. +To run the latest version, install [uv](https://docs.astral.sh/uv/getting-started/installation/) and use `uvx saluki `. + +alternatively you can `pip install saluki` and run it from a `venv`. See `saluki --help` for all options. @@ -28,6 +30,25 @@ You can also filter out messages to specific schema(s) with the `-f` flag, like ## `sniff` - List all topics and their high, low watermarks and number of messages `saluki sniff mybroker:9092` +Output looks as follows: + +``` +$ saluki sniff mybroker:9092 + +INFO:saluki:Cluster ID: redpanda.0faa4595-7298-407e-9db7-7e2758d1af1f +INFO:saluki:Brokers: +INFO:saluki: 192.168.0.111:9092/1 +INFO:saluki: 192.168.0.112:9092/2 +INFO:saluki: 192.168.0.113:9092/0 +INFO:saluki:Topics: +INFO:saluki: MERLIN_events: +INFO:saluki: 0 - low:262322729, high:302663378, num_messages:40340649 +INFO:saluki: MERLIN_runInfo: +INFO:saluki: 0 - low:335, high:2516, num_messages:2181 +INFO:saluki: MERLIN_monitorHistograms: +INFO:saluki: 0 - low:7515, high:7551, num_messages:36 +``` + ## `play` - Replay data from one topic to another ### Between offsets diff --git a/src/saluki/consume.py b/src/saluki/consume.py index 875fa31..a6f0369 100644 --- a/src/saluki/consume.py +++ b/src/saluki/consume.py @@ -1,4 +1,5 @@ import logging +import uuid from confluent_kafka import Consumer, TopicPartition @@ -31,7 +32,7 @@ def consume( c = Consumer( { "bootstrap.servers": broker, - "group.id": "saluki-consume", + "group.id": f"saluki-consume-{uuid.uuid4()}", "session.timeout.ms": 6000, "auto.offset.reset": "latest", "enable.auto.offset.store": False, diff --git a/src/saluki/listen.py b/src/saluki/listen.py index 7fccb14..82a205d 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -1,4 +1,5 @@ import logging +import uuid from confluent_kafka import Consumer, TopicPartition @@ -24,7 +25,7 @@ def listen( c = Consumer( { "bootstrap.servers": broker, - "group.id": "saluki-listen", + "group.id": f"saluki-listen-{uuid.uuid4()}", "auto.offset.reset": "latest", "enable.auto.commit": False, } diff --git a/src/saluki/play.py b/src/saluki/play.py index 056f434..02a107d 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -1,4 +1,5 @@ import logging +import uuid from confluent_kafka import Consumer, Producer, TopicPartition @@ -28,7 +29,7 @@ def play( consumer = Consumer( { "bootstrap.servers": src_broker, - "group.id": "saluki-play", + "group.id": f"saluki-play-{uuid.uuid4()}", } ) producer = Producer( @@ -57,7 +58,8 @@ def play( try: msgs = consumer.consume(num_messages) - [producer.produce(dest_topic, message.value(), message.key()) for message in msgs] + for message in msgs: + producer.produce(dest_topic, message.value(), message.key()) producer.flush() except Exception: logger.exception("Got exception while replaying:") diff --git a/src/saluki/sniff.py b/src/saluki/sniff.py index 562f3ae..cd6fd59 100644 --- 
a/src/saluki/sniff.py +++ b/src/saluki/sniff.py @@ -1,4 +1,5 @@ import logging +import uuid from confluent_kafka import Consumer, TopicPartition from confluent_kafka.admin import AdminClient @@ -12,11 +13,12 @@ def sniff(broker: str) -> None: :param broker: The broker address including port number. """ a = AdminClient({"bootstrap.servers": broker}) - c = Consumer({"bootstrap.servers": broker, "group.id": "saluki-sniff"}) + c = Consumer({"bootstrap.servers": broker, "group.id": f"saluki-sniff-{uuid.uuid4()}"}) t = a.list_topics(timeout=5) logger.info(f"Cluster ID: {t.cluster_id}") logger.info("Brokers:") - [logger.info(f"\t{value}") for value in t.brokers.values()] + for value in t.brokers.values(): + logger.info(f"\t{value}") logger.info("Topics:") From 000df6d514cd1af2a186565815fe2a1d1f2df247 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Tue, 18 Nov 2025 14:37:48 +0000 Subject: [PATCH 18/23] backing up - add parser for timestamps --- pyproject.toml | 1 + src/saluki/main.py | 23 +++++++++++++---- src/saluki/sniff.py | 21 ++++++++++----- src/saluki/utils.py | 4 +-- tests/test_sniff.py | 63 +++++++++++++++++++++++++++++++++++---------- 5 files changed, 85 insertions(+), 27 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index be08d11..f36f0ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ dynamic = ["version"] dependencies = [ "ess-streaming-data-types", "confluent-kafka", + "python-dateutil", "tzdata" ] readme = {file = "README.md", content-type = "text/markdown"} diff --git a/src/saluki/main.py b/src/saluki/main.py index d7beda4..da5ad28 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -2,6 +2,7 @@ import logging import sys +from dateutil.parser import parse, ParserError from saluki.consume import consume from saluki.listen import listen from saluki.play import play @@ -17,6 +18,16 @@ _SNIFF = "sniff" +def _dateutil_parsable_or_unix_timestamp(inp: str) -> float: + try: + try: + return parse(inp).timestamp() + except ParserError: + return float(inp) + except ValueError: + raise argparse.ArgumentTypeError(f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp") + + def main() -> None: parser = argparse.ArgumentParser( prog="saluki", @@ -47,7 +58,7 @@ def main() -> None: sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command") sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata") - sniff_parser.add_argument("broker", type=str) + sniff_parser.add_argument("broker", type=str, help="broker, optionally suffixed with a topic name") consumer_parser = argparse.ArgumentParser(add_help=False) consumer_parser.add_argument( @@ -94,9 +105,7 @@ def main() -> None: type=int, nargs=2, ) - g.add_argument( - "-t", "--timestamps", help="unix timestamps to replay between", type=str, nargs=2 - ) + g.add_argument("-t", "--timestamps", help='timestamps to replay between in ISO8601 or RFC3339 format ie. 
"2025-11-17 07:00:00" ', type=_dateutil_parsable_or_unix_timestamp, nargs=2) if len(sys.argv) == 1: parser.print_help() @@ -130,7 +139,11 @@ def main() -> None: args.timestamps, ) elif args.command == _SNIFF: - sniff(args.broker) + try: + broker, topic = parse_kafka_uri(args.broker) + sniff(broker, topic) + except RuntimeError: + sniff(args.broker) if __name__ == "__main__": diff --git a/src/saluki/sniff.py b/src/saluki/sniff.py index cd6fd59..795e3b0 100644 --- a/src/saluki/sniff.py +++ b/src/saluki/sniff.py @@ -7,22 +7,31 @@ logger = logging.getLogger("saluki") -def sniff(broker: str) -> None: +def sniff(broker: str, topic: str | None = None) -> None: """ Prints the broker and topic metadata for a given broker. + If a topic is given, only this topic's partitions and watermarks will be printed. :param broker: The broker address including port number. + :param topic: Optional topic to filter information to. """ a = AdminClient({"bootstrap.servers": broker}) c = Consumer({"bootstrap.servers": broker, "group.id": f"saluki-sniff-{uuid.uuid4()}"}) t = a.list_topics(timeout=5) - logger.info(f"Cluster ID: {t.cluster_id}") - logger.info("Brokers:") - for value in t.brokers.values(): - logger.info(f"\t{value}") + if topic is not None and topic not in t.topics.keys(): + logger.warning(f"Topic {topic} not found on broker {broker}") + return - logger.info("Topics:") + if topic is None: + logger.info(f"Cluster ID: {t.cluster_id}") + logger.info("Brokers:") + for value in t.brokers.values(): + logger.info(f"\t{value}") + + logger.info("Topics:") for k, v in t.topics.items(): + if topic is not None and k != topic: + continue partitions = v.partitions.keys() logger.info(f"\t{k}:") for p in partitions: diff --git a/src/saluki/utils.py b/src/saluki/utils.py index 790d94d..80ee255 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -79,8 +79,8 @@ def parse_kafka_uri(uri: str) -> Tuple[str, str]: If username is provided, a SASL mechanism must also be provided. Any other validation must be performed in the calling code. """ - broker, topic = uri.split("/") if "/" in uri else (uri, "") - if not topic: + broker, topic = uri.split("/") if "/" in uri else (uri, None) + if topic is None: raise RuntimeError( f"Unable to parse URI {uri}, topic not defined. URI should be of form" f" broker[:port]/topic" diff --git a/tests/test_sniff.py b/tests/test_sniff.py index fe50cc1..d3a5b74 100644 --- a/tests/test_sniff.py +++ b/tests/test_sniff.py @@ -1,29 +1,38 @@ +import pytest from unittest.mock import patch from confluent_kafka.admin import BrokerMetadata, ClusterMetadata, TopicMetadata from saluki.sniff import sniff +@pytest.fixture() +def fake_cluster_md(): + """ + Returns a fake cluster metadata object with two topics; + one with 1 partition and the other with 2. 
+ """ + fake_cluster_md = ClusterMetadata() + broker1 = BrokerMetadata() + broker1.id = "id1" # type: ignore + broker1.host = "mybroker" # type: ignore + broker1.port = 9093 + fake_cluster_md.brokers = {0: broker1} -def test_sniff_with_two_partitions_in_a_topic(): + topic1 = TopicMetadata() + topic1.partitions = {0: {}} + + topic2 = TopicMetadata() + topic2.partitions = {0: {}, 1: {}} + + fake_cluster_md.topics = {"topic1": topic1, "topic2": topic2} + return fake_cluster_md + +def test_sniff_with_two_partitions_in_a_topic(fake_cluster_md): with ( patch("saluki.sniff.AdminClient") as a, patch("saluki.sniff.Consumer") as c, patch("saluki.sniff.logger") as logger, ): - fake_cluster_md = ClusterMetadata() - broker1 = BrokerMetadata() - broker1.id = "id1" # type: ignore - broker1.host = "mybroker" # type: ignore - broker1.port = 9093 - fake_cluster_md.brokers = {0: broker1} - - topic1 = TopicMetadata() - topic1.partitions = {0: {}} - topic2 = TopicMetadata() - topic2.partitions = {0: {}, 1: {}} - - fake_cluster_md.topics = {"topic1": topic1, "topic2": topic2} a().list_topics.return_value = fake_cluster_md c().get_watermark_offsets.return_value = 1, 2 sniff("whatever") @@ -40,3 +49,29 @@ def test_sniff_with_two_partitions_in_a_topic(): topic2_call2 = logger.info.call_args_list[8] assert "1 - low:1, high:2, num_messages:1" in topic2_call2.args[0] + +def test_sniff_with_single_topic(fake_cluster_md): + with ( + patch("saluki.sniff.AdminClient") as a, + patch("saluki.sniff.Consumer") as c, + patch("saluki.sniff.logger") as logger, + ): + + a().list_topics.return_value = fake_cluster_md + c().get_watermark_offsets.return_value = 1, 2 + sniff("mybroker:9093", "topic1") + + assert "\ttopic1" in logger.info.call_args_list[0].args[0] + assert "\t\t0 - low:1, high:2, num_messages:1" in logger.info.call_args_list[1].args[0] + + +def test_sniff_with_single_nonexistent_topic(): + with ( + patch("saluki.sniff.AdminClient") as a, + patch("saluki.sniff.Consumer"), + patch("saluki.sniff.logger") as logger, + ): + # Deliberately blank cluster metadata ie. 
no topics + a().list_topics.return_value = ClusterMetadata() + sniff("somebroker:9092", "sometopic") + logger.warning.assert_called_with("Topic sometopic not found on broker somebroker:9092") From a72ad7dc1739fc5f93bf6ffbf605b8ef4eae1644 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Wed, 19 Nov 2025 13:11:50 +0000 Subject: [PATCH 19/23] backing up again --- src/saluki/main.py | 34 +++++++++++++++------------------- src/saluki/play.py | 38 ++++++++++++++++++++++++++++++++------ src/saluki/utils.py | 19 ++++++++++++++++++- 3 files changed, 65 insertions(+), 26 deletions(-) diff --git a/src/saluki/main.py b/src/saluki/main.py index da5ad28..c77b828 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -2,12 +2,11 @@ import logging import sys -from dateutil.parser import parse, ParserError from saluki.consume import consume from saluki.listen import listen from saluki.play import play from saluki.sniff import sniff -from saluki.utils import parse_kafka_uri +from saluki.utils import parse_kafka_uri, dateutil_parsable_or_unix_timestamp logger = logging.getLogger("saluki") logging.basicConfig(level=logging.INFO) @@ -18,22 +17,14 @@ _SNIFF = "sniff" -def _dateutil_parsable_or_unix_timestamp(inp: str) -> float: - try: - try: - return parse(inp).timestamp() - except ParserError: - return float(inp) - except ValueError: - raise argparse.ArgumentTypeError(f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp") - - def main() -> None: parser = argparse.ArgumentParser( prog="saluki", description="serialise/de-serialise flatbuffers and consume/produce from/to kafka", ) - parser.add_argument( + common_options = argparse.ArgumentParser(add_help=False) + common_options.add_argument("-v", "--verbose", help="show DEBUG logs", action='store_true') + common_options.add_argument( "-l", "--log-file", help="filename to output all data to", @@ -57,8 +48,8 @@ def main() -> None: sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command") - sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata") - sniff_parser.add_argument("broker", type=str, help="broker, optionally suffixed with a topic name") + sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata", parents=[common_options]) + sniff_parser.add_argument("broker", type=str, help="broker, optionally suffixed with a topic name to filter to") consumer_parser = argparse.ArgumentParser(add_help=False) consumer_parser.add_argument( @@ -70,7 +61,7 @@ def main() -> None: ) consumer_mode_parser = sub_parsers.add_parser( - _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser] + _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser, common_options] ) consumer_mode_parser.add_argument( "-m", @@ -88,13 +79,13 @@ def main() -> None: listen_parser = sub_parsers.add_parser( # noqa: F841 _LISTEN, help="listen mode - listen until KeyboardInterrupt", - parents=[topic_parser, consumer_parser], + parents=[topic_parser, consumer_parser, common_options], ) play_parser = sub_parsers.add_parser( _PLAY, help="replay mode - replay data into another topic", - parents=[], + parents=[common_options], ) play_parser.add_argument("topics", type=str, nargs=2, help="SRC topic DEST topic") g = play_parser.add_mutually_exclusive_group(required=True) @@ -105,13 +96,16 @@ def main() -> None: type=int, nargs=2, ) - g.add_argument("-t", "--timestamps", help='timestamps to replay between in ISO8601 or RFC3339 format ie. 
"2025-11-17 07:00:00" ', type=_dateutil_parsable_or_unix_timestamp, nargs=2) + g.add_argument("-t", "--timestamps", help='timestamps to replay between in ISO8601 or RFC3339 format ie. "2025-11-17 07:00:00 or as a unix timestamp" ', type=dateutil_parsable_or_unix_timestamp, nargs=2) if len(sys.argv) == 1: parser.print_help() sys.exit(1) args = parser.parse_args() + if args.verbose: + logger.setLevel(logging.DEBUG) + if args.log_file: logger.addHandler(logging.FileHandler(args.log_file.name)) @@ -141,8 +135,10 @@ def main() -> None: elif args.command == _SNIFF: try: broker, topic = parse_kafka_uri(args.broker) + logger.debug(f"Sniffing single topic {topic} on broker {broker}") sniff(broker, topic) except RuntimeError: + logger.debug(f"Sniffing whole broker {args.broker}") sniff(args.broker) diff --git a/src/saluki/play.py b/src/saluki/play.py index 02a107d..1c43435 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -1,6 +1,6 @@ import logging import uuid - +from time import sleep from confluent_kafka import Consumer, Producer, TopicPartition logger = logging.getLogger("saluki") @@ -40,27 +40,53 @@ def play( src_partition = 0 if timestamps is not None: - start_offset, stop_offset = consumer.offsets_for_times( + logger.debug(f"getting offsets for times: {timestamps[0]} and {timestamps[1]}") + start_offset = consumer.offsets_for_times( [ TopicPartition(src_topic, src_partition, timestamps[0]), - TopicPartition(src_topic, src_partition, timestamps[1]), + ] - ) + )[0] + # See https://github.com/confluentinc/confluent-kafka-python/issues/1178 as to why offsets_for_times is called twice. + stop_offset = consumer.offsets_for_times([TopicPartition(src_topic, src_partition, timestamps[1])])[0] elif offsets is not None: start_offset = TopicPartition(src_topic, src_partition, offsets[0]) stop_offset = TopicPartition(src_topic, src_partition, offsets[1]) else: raise ValueError("offsets and timestamps cannot both be None") + logger.debug(f"start_offset: {start_offset.offset}, stop_offset: {stop_offset.offset}") + + logger.debug(f"assigning to offset {start_offset.offset}") consumer.assign([start_offset]) num_messages = stop_offset.offset - start_offset.offset + 1 + def delivery_report(err, msg): + """ Called once for each message produced to indicate delivery result. + Triggered by poll() or flush().""" + if err is not None: + logger.error('Message delivery failed: {}'.format(err)) + else: + logger.debug('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) + try: msgs = consumer.consume(num_messages) + logger.debug(f"finished consuming {num_messages} messages") + consumer.close() + # logger.debug(f"{msgs}") for message in msgs: - producer.produce(dest_topic, message.value(), message.key()) - producer.flush() + producer.poll(0) + producer.produce(dest_topic, message.value(), message.key(), callback=delivery_report) + # producer.produce_batch(dest_topic, [{'key': message.key(), 'value': message.value()} for message in msgs]) + # producer.poll() + logger.debug(f"flushing producer. 
len(p): {len(producer)}") + # while len(producer): producer.flush() + + producer.flush(timeout=10) + + logger.debug(f"length after flushing: {len(producer)}") + except Exception: logger.exception("Got exception while replaying:") finally: diff --git a/src/saluki/utils.py b/src/saluki/utils.py index 80ee255..b2074b2 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -1,9 +1,11 @@ import datetime import logging +from argparse import ArgumentTypeError from typing import List, Tuple from zoneinfo import ZoneInfo from confluent_kafka import Message +from dateutil.parser import parse, ParserError from streaming_data_types import DESERIALISERS from streaming_data_types.exceptions import ShortBufferException from streaming_data_types.utils import get_schema @@ -47,7 +49,7 @@ def deserialise_and_print_messages( if schemas_to_filter_to is not None and schema not in schemas_to_filter_to: continue time = _parse_timestamp(msg) - logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}") + logger.info(f"(o:{msg.offset()},t:{time},s:{schema}) {deserialised}") except Exception as e: logger.exception(f"Got error while deserialising: {e}") @@ -89,3 +91,18 @@ def parse_kafka_uri(uri: str) -> Tuple[str, str]: broker, topic, ) + + +def dateutil_parsable_or_unix_timestamp(inp: str) -> int: + """ + Parse a dateutil string, if this fails then try to parse a unix timestamp. + This returns a unix timestamp as an int + """ + try: + try: + return int(round(parse(inp).timestamp() * 1000)) + except ParserError: + logger.debug(f"Failed to parse {inp} as a dateutil parsable. Falling back to unix timestamp") + return int(inp) + except ValueError: + raise ArgumentTypeError(f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp") From 7f3193791299347352677d3c1389c77b093a1ed0 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Wed, 19 Nov 2025 14:55:08 +0000 Subject: [PATCH 20/23] get timestamps working for replay and consume --- pyproject.toml | 2 +- src/saluki/consume.py | 8 ++++++++ src/saluki/main.py | 11 +++++++---- src/saluki/play.py | 19 ++----------------- src/saluki/utils.py | 2 +- 5 files changed, 19 insertions(+), 23 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f36f0ac..a74c110 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "saluki" dynamic = ["version"] dependencies = [ "ess-streaming-data-types", - "confluent-kafka", + "confluent-kafka>=2.12.1", # for produce_batch in play() "python-dateutil", "tzdata" ] diff --git a/src/saluki/consume.py b/src/saluki/consume.py index a6f0369..3eca7f3 100644 --- a/src/saluki/consume.py +++ b/src/saluki/consume.py @@ -16,6 +16,7 @@ def consume( offset: int | None = None, go_forwards: bool = False, schemas_to_filter_to: list[str] | None = None, + timestamp: int|None = None, ) -> None: """ consume from a topic and deserialise each message @@ -27,6 +28,7 @@ def consume( :param offset: offset to consume from/to :param go_forwards: whether to consume forwards or backwards :param schemas_to_filter_to: schemas in messages to filter to + :param timestamp: optionally a timestamp as a starting point :return: None """ c = Consumer( @@ -41,6 +43,12 @@ def consume( } ) + if timestamp is not None: + offset = c.offsets_for_times( + [TopicPartition(topic, partition, timestamp)] + )[0].offset + logger.debug(f"offset for timestamp {timestamp} is {offset}") + if go_forwards: if offset is None: raise ValueError("Can't go forwards without an offset") diff --git a/src/saluki/main.py b/src/saluki/main.py index 
c77b828..248b5f7 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -71,10 +71,13 @@ def main() -> None: required=False, default=1, ) - consumer_mode_parser.add_argument( - "-o", "--offset", help="offset to consume from", type=int, required=False - ) + consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true") + cg = consumer_mode_parser.add_mutually_exclusive_group(required=False) + cg.add_argument( + "-o", "--offset", help="offset to consume from", type=int, + ) + cg.add_argument("-t", "--timestamp", help="timestamp to consume from", type=dateutil_parsable_or_unix_timestamp) listen_parser = sub_parsers.add_parser( # noqa: F841 _LISTEN, @@ -118,7 +121,7 @@ def main() -> None: elif args.command == _CONSUME: broker, topic = parse_kafka_uri(args.topic) consume( - broker, topic, args.partition, args.messages, args.offset, args.go_forwards, args.filter + broker, topic, args.partition, args.messages, args.offset, args.go_forwards, args.filter, args.timestamp ) elif args.command == _PLAY: src_broker, src_topic = parse_kafka_uri(args.topics[0]) diff --git a/src/saluki/play.py b/src/saluki/play.py index 1c43435..8c15440 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -1,6 +1,5 @@ import logging import uuid -from time import sleep from confluent_kafka import Consumer, Producer, TopicPartition logger = logging.getLogger("saluki") @@ -17,6 +16,7 @@ def play( """ Replay data from src_topic to dest_topic between the offsets OR timestamps specified. This currently assumes contiguous data in a topic (ie. no log compaction) and uses partition 0. + It also does not copy message timestamps. :param src_broker: The source broker, including port. :param src_topic: The topic to replay data from. @@ -62,27 +62,12 @@ def play( num_messages = stop_offset.offset - start_offset.offset + 1 - def delivery_report(err, msg): - """ Called once for each message produced to indicate delivery result. - Triggered by poll() or flush().""" - if err is not None: - logger.error('Message delivery failed: {}'.format(err)) - else: - logger.debug('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) - try: msgs = consumer.consume(num_messages) logger.debug(f"finished consuming {num_messages} messages") consumer.close() - # logger.debug(f"{msgs}") - for message in msgs: - producer.poll(0) - producer.produce(dest_topic, message.value(), message.key(), callback=delivery_report) - # producer.produce_batch(dest_topic, [{'key': message.key(), 'value': message.value()} for message in msgs]) - # producer.poll() + producer.produce_batch(dest_topic, [{'key': message.key(), 'value': message.value()} for message in msgs]) logger.debug(f"flushing producer. len(p): {len(producer)}") - # while len(producer): producer.flush() - producer.flush(timeout=10) logger.debug(f"length after flushing: {len(producer)}") diff --git a/src/saluki/utils.py b/src/saluki/utils.py index b2074b2..fb16b58 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -101,7 +101,7 @@ def dateutil_parsable_or_unix_timestamp(inp: str) -> int: try: try: return int(round(parse(inp).timestamp() * 1000)) - except ParserError: + except (ParserError, OverflowError): logger.debug(f"Failed to parse {inp} as a dateutil parsable. 
Falling back to unix timestamp") return int(inp) except ValueError: From a0f8c2d73149f953d1f46000501011eb8a3bb542 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Wed, 19 Nov 2025 14:56:56 +0000 Subject: [PATCH 21/23] ruff --- src/saluki/consume.py | 6 ++---- src/saluki/main.py | 42 ++++++++++++++++++++++++++++++++++-------- src/saluki/play.py | 13 +++++++++---- src/saluki/utils.py | 10 +++++++--- tests/test_sniff.py | 6 ++++-- 5 files changed, 56 insertions(+), 21 deletions(-) diff --git a/src/saluki/consume.py b/src/saluki/consume.py index 3eca7f3..18f9aa1 100644 --- a/src/saluki/consume.py +++ b/src/saluki/consume.py @@ -16,7 +16,7 @@ def consume( offset: int | None = None, go_forwards: bool = False, schemas_to_filter_to: list[str] | None = None, - timestamp: int|None = None, + timestamp: int | None = None, ) -> None: """ consume from a topic and deserialise each message @@ -44,9 +44,7 @@ def consume( ) if timestamp is not None: - offset = c.offsets_for_times( - [TopicPartition(topic, partition, timestamp)] - )[0].offset + offset = c.offsets_for_times([TopicPartition(topic, partition, timestamp)])[0].offset logger.debug(f"offset for timestamp {timestamp} is {offset}") if go_forwards: diff --git a/src/saluki/main.py b/src/saluki/main.py index 248b5f7..1bcf97b 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -6,7 +6,7 @@ from saluki.listen import listen from saluki.play import play from saluki.sniff import sniff -from saluki.utils import parse_kafka_uri, dateutil_parsable_or_unix_timestamp +from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri logger = logging.getLogger("saluki") logging.basicConfig(level=logging.INFO) @@ -23,7 +23,7 @@ def main() -> None: description="serialise/de-serialise flatbuffers and consume/produce from/to kafka", ) common_options = argparse.ArgumentParser(add_help=False) - common_options.add_argument("-v", "--verbose", help="show DEBUG logs", action='store_true') + common_options.add_argument("-v", "--verbose", help="show DEBUG logs", action="store_true") common_options.add_argument( "-l", "--log-file", @@ -48,8 +48,12 @@ def main() -> None: sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command") - sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata", parents=[common_options]) - sniff_parser.add_argument("broker", type=str, help="broker, optionally suffixed with a topic name to filter to") + sniff_parser = sub_parsers.add_parser( + _SNIFF, help="sniff - broker metadata", parents=[common_options] + ) + sniff_parser.add_argument( + "broker", type=str, help="broker, optionally suffixed with a topic name to filter to" + ) consumer_parser = argparse.ArgumentParser(add_help=False) consumer_parser.add_argument( @@ -75,9 +79,17 @@ def main() -> None: consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true") cg = consumer_mode_parser.add_mutually_exclusive_group(required=False) cg.add_argument( - "-o", "--offset", help="offset to consume from", type=int, + "-o", + "--offset", + help="offset to consume from", + type=int, + ) + cg.add_argument( + "-t", + "--timestamp", + help="timestamp to consume from", + type=dateutil_parsable_or_unix_timestamp, ) - cg.add_argument("-t", "--timestamp", help="timestamp to consume from", type=dateutil_parsable_or_unix_timestamp) listen_parser = sub_parsers.add_parser( # noqa: F841 _LISTEN, @@ -99,7 +111,14 @@ def main() -> None: type=int, nargs=2, ) - g.add_argument("-t", "--timestamps", help='timestamps to 
replay between in ISO8601 or RFC3339 format ie. "2025-11-17 07:00:00 or as a unix timestamp" ', type=dateutil_parsable_or_unix_timestamp, nargs=2) + g.add_argument( + "-t", + "--timestamps", + help="timestamps to replay between in ISO8601 or RFC3339 format ie." + ' "2025-11-17 07:00:00 or as a unix timestamp" ', + type=dateutil_parsable_or_unix_timestamp, + nargs=2, + ) if len(sys.argv) == 1: parser.print_help() @@ -121,7 +140,14 @@ def main() -> None: elif args.command == _CONSUME: broker, topic = parse_kafka_uri(args.topic) consume( - broker, topic, args.partition, args.messages, args.offset, args.go_forwards, args.filter, args.timestamp + broker, + topic, + args.partition, + args.messages, + args.offset, + args.go_forwards, + args.filter, + args.timestamp, ) elif args.command == _PLAY: src_broker, src_topic = parse_kafka_uri(args.topics[0]) diff --git a/src/saluki/play.py b/src/saluki/play.py index 8c15440..1f058e8 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -1,5 +1,6 @@ import logging import uuid + from confluent_kafka import Consumer, Producer, TopicPartition logger = logging.getLogger("saluki") @@ -44,11 +45,13 @@ def play( start_offset = consumer.offsets_for_times( [ TopicPartition(src_topic, src_partition, timestamps[0]), - ] )[0] - # See https://github.com/confluentinc/confluent-kafka-python/issues/1178 as to why offsets_for_times is called twice. - stop_offset = consumer.offsets_for_times([TopicPartition(src_topic, src_partition, timestamps[1])])[0] + # See https://github.com/confluentinc/confluent-kafka-python/issues/1178 + # as to why offsets_for_times is called twice. + stop_offset = consumer.offsets_for_times( + [TopicPartition(src_topic, src_partition, timestamps[1])] + )[0] elif offsets is not None: start_offset = TopicPartition(src_topic, src_partition, offsets[0]) stop_offset = TopicPartition(src_topic, src_partition, offsets[1]) @@ -66,7 +69,9 @@ def play( msgs = consumer.consume(num_messages) logger.debug(f"finished consuming {num_messages} messages") consumer.close() - producer.produce_batch(dest_topic, [{'key': message.key(), 'value': message.value()} for message in msgs]) + producer.produce_batch( + dest_topic, [{"key": message.key(), "value": message.value()} for message in msgs] + ) logger.debug(f"flushing producer. len(p): {len(producer)}") producer.flush(timeout=10) diff --git a/src/saluki/utils.py b/src/saluki/utils.py index fb16b58..d033a57 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -5,7 +5,7 @@ from zoneinfo import ZoneInfo from confluent_kafka import Message -from dateutil.parser import parse, ParserError +from dateutil.parser import ParserError, parse from streaming_data_types import DESERIALISERS from streaming_data_types.exceptions import ShortBufferException from streaming_data_types.utils import get_schema @@ -102,7 +102,11 @@ def dateutil_parsable_or_unix_timestamp(inp: str) -> int: try: return int(round(parse(inp).timestamp() * 1000)) except (ParserError, OverflowError): - logger.debug(f"Failed to parse {inp} as a dateutil parsable. Falling back to unix timestamp") + logger.debug( + f"Failed to parse {inp} as a dateutil parsable. 
Falling back to unix timestamp" + ) return int(inp) except ValueError: - raise ArgumentTypeError(f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp") + raise ArgumentTypeError( + f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp" + ) diff --git a/tests/test_sniff.py b/tests/test_sniff.py index d3a5b74..8c37d87 100644 --- a/tests/test_sniff.py +++ b/tests/test_sniff.py @@ -1,10 +1,11 @@ -import pytest from unittest.mock import patch +import pytest from confluent_kafka.admin import BrokerMetadata, ClusterMetadata, TopicMetadata from saluki.sniff import sniff + @pytest.fixture() def fake_cluster_md(): """ @@ -27,6 +28,7 @@ def fake_cluster_md(): fake_cluster_md.topics = {"topic1": topic1, "topic2": topic2} return fake_cluster_md + def test_sniff_with_two_partitions_in_a_topic(fake_cluster_md): with ( patch("saluki.sniff.AdminClient") as a, @@ -50,13 +52,13 @@ def test_sniff_with_two_partitions_in_a_topic(fake_cluster_md): topic2_call2 = logger.info.call_args_list[8] assert "1 - low:1, high:2, num_messages:1" in topic2_call2.args[0] + def test_sniff_with_single_topic(fake_cluster_md): with ( patch("saluki.sniff.AdminClient") as a, patch("saluki.sniff.Consumer") as c, patch("saluki.sniff.logger") as logger, ): - a().list_topics.return_value = fake_cluster_md c().get_watermark_offsets.return_value = 1, 2 sniff("mybroker:9093", "topic1") From e910159e389f3c457ff1e88f04996b8a2a5d7295 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Wed, 19 Nov 2025 15:44:03 +0000 Subject: [PATCH 22/23] finish tests --- tests/test_consume.py | 12 ++++++++++++ tests/test_play.py | 22 +++++++++++----------- tests/test_utils.py | 27 ++++++++++++++++++++++++++- 3 files changed, 49 insertions(+), 12 deletions(-) diff --git a/tests/test_consume.py b/tests/test_consume.py index 6f49d02..49e3d0d 100644 --- a/tests/test_consume.py +++ b/tests/test_consume.py @@ -82,3 +82,15 @@ def test_consume_but_exception_thrown_consumer_is_closed(): c.return_value.consume.side_effect = Exception consume("somebroker", "sometopic", num_messages=1) c.return_value.close.assert_called_once() + +@patch("saluki.consume.Consumer") +def test_consume_with_timestamp(mock_consumer): + expected_topic = "sometopic" + partition = 0 + timestamp = 1234 + offset = 2345 + + mock_consumer.offsets_for_times.return_value = [TopicPartition(expected_topic, partition, offset)] + consume("somebroker", topic=expected_topic, timestamp=timestamp, partition=partition) + + mock_consumer.return_value.assign.assert_called_with([TopicPartition(expected_topic, partition, offset)]) diff --git a/tests/test_play.py b/tests/test_play.py index f13b7a0..e1e595a 100644 --- a/tests/test_play.py +++ b/tests/test_play.py @@ -37,10 +37,10 @@ def test_play_with_offsets(): consumer_obj.consume.assert_called_with(2) # stop - start + 1 p_obj = p() - call_1 = p_obj.produce.call_args_list[0] - assert call_1.args == (dest_topic, message_1_val, message_1_key) - call_2 = p_obj.produce.call_args_list[1] - assert call_2.args == (dest_topic, message_2_val, message_2_key) + produce_batch_call = p_obj.produce_batch.call_args.args + assert dest_topic == produce_batch_call[0] + assert {'key': message_1_key, 'value': message_1_val} in produce_batch_call[1] + assert {'key': message_2_key, 'value': message_2_val} in produce_batch_call[1] def test_play_with_timestamps(): @@ -64,9 +64,9 @@ def test_play_with_timestamps(): with patch("saluki.play.Consumer") as c, patch("saluki.play.Producer") as p: consumer_obj = c() - 
consumer_obj.offsets_for_times.return_value = [ - TopicPartition(src_topic, partition=0, offset=2), - TopicPartition(src_topic, partition=0, offset=3), + consumer_obj.offsets_for_times.side_effect = [ + [TopicPartition(src_topic, partition=0, offset=2)], + [TopicPartition(src_topic, partition=0, offset=3)] ] consumer_obj.consume.return_value = [message_1, message_2] @@ -78,10 +78,10 @@ def test_play_with_timestamps(): consumer_obj.consume.assert_called_with(2) # stop - start + 1 p_obj = p() - call_1 = p_obj.produce.call_args_list[0] - assert call_1.args == (dest_topic, message_1_val, message_1_key) - call_2 = p_obj.produce.call_args_list[1] - assert call_2.args == (dest_topic, message_2_val, message_2_key) + produce_batch_call = p_obj.produce_batch.call_args.args + assert dest_topic == produce_batch_call[0] + assert {'key': message_1_key, 'value': message_1_val} in produce_batch_call[1] + assert {'key': message_2_key, 'value': message_2_val} in produce_batch_call[1] def test_play_with_exception_when_consuming_consumer_still_closed(): diff --git a/tests/test_utils.py b/tests/test_utils.py index 9c6a98a..a1651fd 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,3 +1,4 @@ +from argparse import ArgumentTypeError from unittest.mock import Mock, patch import pytest @@ -13,7 +14,7 @@ _parse_timestamp, _try_to_deserialise_message, deserialise_and_print_messages, - parse_kafka_uri, + parse_kafka_uri, dateutil_parsable_or_unix_timestamp, ) @@ -174,3 +175,27 @@ def test_uri_with_no_topic(): test_broker = "some_broker" with pytest.raises(RuntimeError): parse_kafka_uri(test_broker) + +@pytest.mark.parametrize("timestamp", +["2025-11-19T15:27:11", + "2025-11-19T15:27:11Z", +"2025-11-19T15:27:11+00:00" + ] + +) +def test_parses_datetime_properly_with_string(timestamp): + assert dateutil_parsable_or_unix_timestamp(timestamp) == 1763566031000 + +@pytest.mark.parametrize("timestamp", +["1763566031000", + "1763566031", + "1763566031000000", + ] +) +def test_parses_datetime_properly_and_leaves_unix_timestamp_alone(timestamp): + assert dateutil_parsable_or_unix_timestamp(timestamp) == int(timestamp) + + +def test_invalid_timestamp_raises(): + with pytest.raises(ArgumentTypeError): + dateutil_parsable_or_unix_timestamp("invalid") \ No newline at end of file From c083469330d6ab3d176c1eaefc8754b1a52be002 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Wed, 19 Nov 2025 15:45:34 +0000 Subject: [PATCH 23/23] ruff --- tests/test_consume.py | 9 +++++++-- tests/test_play.py | 10 +++++----- tests/test_utils.py | 25 +++++++++++++------------ 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/tests/test_consume.py b/tests/test_consume.py index 49e3d0d..d72c061 100644 --- a/tests/test_consume.py +++ b/tests/test_consume.py @@ -83,6 +83,7 @@ def test_consume_but_exception_thrown_consumer_is_closed(): consume("somebroker", "sometopic", num_messages=1) c.return_value.close.assert_called_once() + @patch("saluki.consume.Consumer") def test_consume_with_timestamp(mock_consumer): expected_topic = "sometopic" @@ -90,7 +91,11 @@ def test_consume_with_timestamp(mock_consumer): timestamp = 1234 offset = 2345 - mock_consumer.offsets_for_times.return_value = [TopicPartition(expected_topic, partition, offset)] + mock_consumer.offsets_for_times.return_value = [ + TopicPartition(expected_topic, partition, offset) + ] consume("somebroker", topic=expected_topic, timestamp=timestamp, partition=partition) - mock_consumer.return_value.assign.assert_called_with([TopicPartition(expected_topic, partition, offset)]) + 
mock_consumer.return_value.assign.assert_called_with( + [TopicPartition(expected_topic, partition, offset)] + ) diff --git a/tests/test_play.py b/tests/test_play.py index e1e595a..94b49af 100644 --- a/tests/test_play.py +++ b/tests/test_play.py @@ -39,8 +39,8 @@ def test_play_with_offsets(): p_obj = p() produce_batch_call = p_obj.produce_batch.call_args.args assert dest_topic == produce_batch_call[0] - assert {'key': message_1_key, 'value': message_1_val} in produce_batch_call[1] - assert {'key': message_2_key, 'value': message_2_val} in produce_batch_call[1] + assert {"key": message_1_key, "value": message_1_val} in produce_batch_call[1] + assert {"key": message_2_key, "value": message_2_val} in produce_batch_call[1] def test_play_with_timestamps(): @@ -66,7 +66,7 @@ def test_play_with_timestamps(): consumer_obj = c() consumer_obj.offsets_for_times.side_effect = [ [TopicPartition(src_topic, partition=0, offset=2)], - [TopicPartition(src_topic, partition=0, offset=3)] + [TopicPartition(src_topic, partition=0, offset=3)], ] consumer_obj.consume.return_value = [message_1, message_2] @@ -80,8 +80,8 @@ def test_play_with_timestamps(): p_obj = p() produce_batch_call = p_obj.produce_batch.call_args.args assert dest_topic == produce_batch_call[0] - assert {'key': message_1_key, 'value': message_1_val} in produce_batch_call[1] - assert {'key': message_2_key, 'value': message_2_val} in produce_batch_call[1] + assert {"key": message_1_key, "value": message_1_val} in produce_batch_call[1] + assert {"key": message_2_key, "value": message_2_val} in produce_batch_call[1] def test_play_with_exception_when_consuming_consumer_still_closed(): diff --git a/tests/test_utils.py b/tests/test_utils.py index a1651fd..263c280 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -13,8 +13,9 @@ from saluki.utils import ( _parse_timestamp, _try_to_deserialise_message, + dateutil_parsable_or_unix_timestamp, deserialise_and_print_messages, - parse_kafka_uri, dateutil_parsable_or_unix_timestamp, + parse_kafka_uri, ) @@ -176,21 +177,21 @@ def test_uri_with_no_topic(): with pytest.raises(RuntimeError): parse_kafka_uri(test_broker) -@pytest.mark.parametrize("timestamp", -["2025-11-19T15:27:11", - "2025-11-19T15:27:11Z", -"2025-11-19T15:27:11+00:00" - ] +@pytest.mark.parametrize( + "timestamp", ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"] ) def test_parses_datetime_properly_with_string(timestamp): assert dateutil_parsable_or_unix_timestamp(timestamp) == 1763566031000 -@pytest.mark.parametrize("timestamp", -["1763566031000", - "1763566031", - "1763566031000000", - ] + +@pytest.mark.parametrize( + "timestamp", + [ + "1763566031000", + "1763566031", + "1763566031000000", + ], ) def test_parses_datetime_properly_and_leaves_unix_timestamp_alone(timestamp): assert dateutil_parsable_or_unix_timestamp(timestamp) == int(timestamp) @@ -198,4 +199,4 @@ def test_parses_datetime_properly_and_leaves_unix_timestamp_alone(timestamp): def test_invalid_timestamp_raises(): with pytest.raises(ArgumentTypeError): - dateutil_parsable_or_unix_timestamp("invalid") \ No newline at end of file + dateutil_parsable_or_unix_timestamp("invalid")
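
For reference, example invocations of the subcommands this series ends up with, pieced together from the README text and argparse definitions above. Broker addresses, topic names, offsets and timestamps are placeholders; check `saluki --help` for the authoritative flags and defaults.

```
# inspect a whole broker, or narrow sniff down to a single topic
saluki sniff mybroker:9092
saluki sniff mybroker:9092/MERLIN_runInfo

# consume starting from a timestamp instead of an offset
saluki consume mybroker:9092/MERLIN_runInfo -m 10 -t "2025-11-17 07:00:00"

# replay between two offsets, or between two timestamps (ISO8601/RFC3339 or unix)
saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -o 123 456
saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992
```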