diff --git a/.gitignore b/.gitignore index 090159801f..752e555076 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,9 @@ conf/st2.travis.conf # generated GitHub Actions conf conf/st2.githubactions.conf +# Vagrant +.vagrant/ + # Installer logs pip-log.txt diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1e48760eef..c5d99d2c85 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -13,6 +13,15 @@ Changed ~~~~~~~ * Bumped `jsonschema` 2.6.0 -> 3.2.0 now that python3.6 is not supported. #6118 +* The FileWatchSensor in the linux pack uses `watchdog` now instead of `logshipper` (`logshipper` and its requirement `pyinotify` are unmaintained). + The `watchdog` project is actively maintained, and has wide support for Linux, MacOS, BSD, Windows, and a polling fallback implementation. + Dropping `pyinotify` has a side benefit of allowing MacOS users to more easily hack on StackStorm's code, since `pyinotify` cannot install on MacOS. + + The FileWatchSensor is now an excellent example of how to write a sensor following this refactor. It breaks up the code into well structured classes + that all have a single focus. You can also run the sensor Python script itself, which makes development and testing much easier. #5096 + + Contributed by @blag + Added ~~~~~ * Continue introducing `pants `_ to improve DX (Developer Experience) diff --git a/contrib/linux/README.md b/contrib/linux/README.md index 6c5100e2e8..b296ad9741 100644 --- a/contrib/linux/README.md +++ b/contrib/linux/README.md @@ -2,32 +2,42 @@ This pack contains actions for commonly used Linux commands and tools. -## Configuration +## Sensors + +### FileWatchSensor + +This sensor monitors files specified in rules (under trigger parameters) for new lines. +Once a new line is detected, a trigger instance is emitted. + +#### Adding a file path to the file watch sensor -* ``file_watch_sensor.file_paths`` - A list of paths to the files to monitor. - Note: Those need to be full paths to the files (e.g. ``/var/log/auth.log``) - and not directories (files don't need to exist yet when the sensor is ran - though). +To tell the FileWatchSensor to start watching a new file, define a rule that +- uses the `linux.file_watch.line` trigger type +- pass the `file_path` to watch under trigger parameters. -Example: +For example, this rule would cause the sensor to start watching `/tmp/st2_test`: ```yaml --- -file_watch_sensor: - file_paths: - - /opt/data/absolute_path_to_file.log -``` +name: sample_rule_file_watch +description: Run echo on changes to /tmp/st2_test +enabled: false -## Sensors +trigger: + type: linux.file_watch.line + parameters: + file_path: /tmp/st2_test -### FileWatchSensor +action: + ref: core.local + parameters: + cmd: echo "{{trigger}}" +``` -This sensor monitors specified files for new new lines. Once a new line is -detected, a trigger is emitted. -### linux.file_watch.line trigger +#### linux.file_watch.line trigger -Example trigger payload: +Example trigger instance payload: ```json { diff --git a/contrib/linux/requirements.txt b/contrib/linux/requirements.txt index d1864ccbb8..e59495eee0 100644 --- a/contrib/linux/requirements.txt +++ b/contrib/linux/requirements.txt @@ -1,3 +1 @@ -# used by file watcher sensor -pyinotify>=0.9.5,<=0.10 ; platform_system=="Linux" -logshipper@ git+https://github.com/StackStorm/logshipper.git@stackstorm_patched ; platform_system=="Linux" +watchdog diff --git a/contrib/linux/sensors/README.md b/contrib/linux/sensors/README.md index 084fcad6a6..743a0a3246 100644 --- a/contrib/linux/sensors/README.md +++ b/contrib/linux/sensors/README.md @@ -1,6 +1,6 @@ ## NOTICE -File watch sensor has been updated to use trigger with parameters supplied via a rule approach. Tailing a file path supplied via a config file is now deprecated. +File watch sensor has been updated to use trigger with parameters supplied via a rule approach. An example rule to supply a file path is as follows: diff --git a/contrib/linux/sensors/file_watch_sensor.py b/contrib/linux/sensors/file_watch_sensor.py index 906dde9d48..77a702ff1c 100644 --- a/contrib/linux/sensors/file_watch_sensor.py +++ b/contrib/linux/sensors/file_watch_sensor.py @@ -13,39 +13,412 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os -import eventlet +import pathlib +import signal +import time +import sys -from logshipper.tail import Tail +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler -from st2reactor.sensor.base import Sensor +try: + from st2reactor.sensor.base import Sensor +except ImportError: + + class Sensor: + def __init__(self, *args, sensor_service=None, config=None, **kwargs): + self.sensor_service = sensor_service + self.config = config + + +class EventHandler(FileSystemEventHandler): + """ + A class to track and route different events to event handlers/callbacks for + files. This allows this EventHandler class to be used for watches on + individual files, since the directory events will include events for + individual files. + """ + + def __init__(self, *args, callbacks=None, **kwargs): + self.callbacks = callbacks or {} + + def dispatch(self, event): + if not event.is_directory: + super().dispatch(event) + + def on_created(self, event): + cb = self.callbacks.get("created") + if cb: + cb(event=event) + + def on_modified(self, event): + cb = self.callbacks.get("modified") + if cb: + cb(event=event) + + def on_moved(self, event): + cb = self.callbacks.get("moved") + if cb: + cb(event=event) + + def on_deleted(self, event): + cb = self.callbacks.get("deleted") + if cb: + cb(event=event) + + +class SingleFileTail(object): + """ + A class to tail a single file, also handling emitting events when the + watched file is created, truncated, or moved. + + If follow is False (the default), then the watch will be removed when the + file is moved, and recreated if/when the file is recreated (with read_all + set to True so each line in the recreated file is handled). This mode + should be useful for logs that are rotated regularly. + + If follow is True, then the cursor position for the old file location will + be saved, the watch for the old file location will be removed, a new watch + for the new file location will be created, and only new lines added after + the previous cursor position will be handled. This mode should be useful + for user files that may be moved or renamed as they are being edited. + + If read_all is False (the default), then the file cursor will be set to the + end of the file and only new lines added after the watch is created will be + handled. This should be useful when you are only interested in lines that + are added to an already existing file while it is watched and you are not + interested in the contents of the file before it is watched. + + If read_all is True, then each line in the file, starting from the + beginning of the file, is handled. This should be useful when you wish to + fully process a file once it is created. + + Note that while the watch events are serialized in a queue, this code does + not attempt to serialize its own file access with locks, so a situation + where one file is quickly created and/or updated may trigger race + conditions and therefore unpredictable behavior. + """ + + def __init__( + self, + path, + handler, + follow=False, + read_all=False, + observer=None, + logger=None, + fd=None, + ): + if logger is None: + raise Exception("SingleFileTail was initialized without a logger") + + self._path = None + self.fd = fd + self.handler = handler + self.follow = follow + self.read_all = read_all + self.buffer = "" + self.observer = observer or Observer() + self.logger = logger + self.watch = None + self.parent_watch = None + + if path: + self.set_path(path) + self.open() + + def get_path(self): + return self._path + + # Set all of these when the path updates + def set_path(self, new_path): + self.logger.debug(f"Setting path to {new_path}") + self._path = pathlib.Path(new_path) + self.abs_path = self._path.absolute().resolve() + self.parent_dir = self.abs_path.parent + + path = property(get_path, set_path) + + def get_event_src_path(self, event): + return pathlib.Path(event.src_path).absolute().resolve() + + def read_chunk(self, fd, chunk_size=1024): + self.logger.debug("Reading chunk") + # Buffer 1024 bytes at a time + try: + buffer = os.read(fd, chunk_size) + except (OSError, FileNotFoundError): + buffer = b"" + else: + self.logger.debug("Read chunk") + + # If the 1024 bytes cuts the line off in the middle of a multi-byte + # utf-8 character then decoding will raise an UnicodeDecodeError. + try: + buffer = buffer.decode(encoding="utf8") + except UnicodeDecodeError as e: + # Grab the first few bytes of the partial character + # e.start is the first byte of the decoding issue + first_byte_of_partial_character = buffer[e.start] + number_of_bytes_read_so_far = e.end - e.start + self.logger.debug(f"Read {number_of_bytes_read_so_far}") + + # Try to read the remainder of the character + # You could replace these conditionals with bit math, but that's a + # lot more difficult to read + if first_byte_of_partial_character & 0xF0 == 0xC0: + char_length = 2 + elif first_byte_of_partial_character & 0xF0 == 0xE0: + char_length = 3 + elif first_byte_of_partial_character & 0xF0 == 0xF0: + char_length = 4 + else: + # We could have run into an issue besides reading a partial + # character, so raise that exception + raise e + + number_of_bytes_to_read = char_length - number_of_bytes_read_so_far + + self.logger.debug(f"Reading {number_of_bytes_to_read} more bytes") + buff = os.read(fd, number_of_bytes_to_read) + if len(buff) == number_of_bytes_to_read: + buffer += buff + return buffer.decode(encoding="utf8") + + # If we did not successfully read a complete character, there's + # nothing else we can really do but reraise the exception + raise e + else: + return buffer + + def read(self, event=None): + self.logger.debug("Reading file") + while True: + # Read a chunk of bytes + buff = self.read_chunk(self.fd) + + if not buff: + return + + # Append to previous buffer + if self.buffer: + self.logger.debug(f"Appending to existing buffer: '{self.buffer}'") + buff = self.buffer + buff + self.buffer = "" + + lines = buff.splitlines(True) + # If the last character of the last line is not a newline + if ( + lines and lines[-1] and lines[-1][-1] != "\n" + ): # Incomplete line in the buffer + self.logger.debug(f"Saving partial line in the buffer: '{lines[-1]}'") + self.buffer = lines[-1] # Save the last line fragment + lines = lines[:-1] + + for line in lines: + self.logger.debug(f"Passing line to callback: '{line[:-1]}'") + self.handler(self.path, line[:-1]) + + def reopen_and_read(self, event=None, skip_to_end=False): + # Directory watches will fire events for unrelated files + # Ignore all events except those for our path + if event and self.get_event_src_path(event) != self.abs_path: + self.logger.debug( + f"Ignoring event for non-tracked file: '{event.src_path}'" + ) + return + + # Guard against this being called twice - happens sometimes with inotify + if self.fd: + # Save our current position into the file (this is a little wonky) + pos = os.lseek(self.fd, 0, os.SEEK_CUR) + self.logger.debug(f"Saving position ({pos}) into file {self.abs_path}") + + # The file was moved and not recreated + if not self.follow: + # If we aren't following then don't reopen the file + # When the file is created again that will be handled by + # open_and_read + # But we do make sure to keep the parent file watch around to + # listen to created events + self.close(event=event, emit_remaining=True, end_parent_watch=False) + return + else: + # If we are following the file, don't emit the remainder of the + # last line + self.close(event=event, emit_remaining=False) + + # Use the file's new location + self.path = event.dest_path + # Seek to where we left off + self.open(event=event, seek_to=pos) + self.read(event=event) + + def open_and_read(self, event=None, seek_to=None): + # Directory watches will fire events for unrelated files + # Ignore all events except those for our path + if event and self.get_event_src_path(event) != self.abs_path: + self.logger.debug(f"Ignoring event for non-tailed file: '{event.src_path}'") + return + + self.read_all = True + + self.open(event=event, seek_to=seek_to) + self.read(event=event) + + def open(self, event=None, seek_to=None): + # Use self.watch as a guard + if not self.watch: + self.logger.debug(f"Opening file '{self.path}'") + try: + self.stat = os.stat(self.path) + except FileNotFoundError: + # If the file doesn't exist when we are asked to monitor it, set + # this flag so we read it all if/when it does appear + self.logger.debug("File does not yet exist, setting read_all=True") + self.read_all = True + else: + self.fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK) + + if self.read_all or seek_to == "start": + self.logger.debug("Seeking to start") + os.lseek(self.fd, 0, os.SEEK_SET) + + if not self.read_all or seek_to == "end": + self.logger.debug("Seeking to end") + os.lseek(self.fd, 0, os.SEEK_END) + + file_event_handler = EventHandler( + callbacks={ + "created": self.open, + "deleted": self.close, + "modified": self.read, + "moved": self.reopen_and_read, + } + ) + + self.logger.debug(f"Scheduling watch on file: '{self.path}'") + self.watch = self.observer.schedule(file_event_handler, self.path) + self.logger.debug(f"Scheduled watch on file: '{self.path}'") + + # Avoid watching this twice + self.logger.debug(f"Parent watch: {self.parent_watch}") + if not self.parent_watch: + dir_event_handler = EventHandler( + callbacks={ + "created": self.open_and_read, + "moved": self.reopen_and_read, + } + ) + + self.logger.debug( + f"Scheduling watch on parent directory: '{self.parent_dir}'" + ) + self.parent_watch = self.observer.schedule( + dir_event_handler, self.parent_dir + ) + self.logger.debug( + f"Scheduled watch on parent directory: '{self.parent_dir}'" + ) + + def close(self, event=None, emit_remaining=True, end_parent_watch=True): + self.logger.debug(f"Closing single file tail on '{self.path}'") + # Reset the guard + if self.buffer and emit_remaining: + self.logger.debug(f"Emitting remaining partial line: '{self.buffer}'") + self.handler(self.path, self.buffer) + self.buffer = "" + if self.parent_watch and end_parent_watch: + self.logger.debug(f"Unscheduling parent directory watch: {self.parent_dir}") + self.observer.unschedule(self.parent_watch) + self.parent_watch = None + self.logger.debug(f"Unscheduled parent directory watch: {self.parent_dir}") + if self.watch: + self.logger.debug(f"Unscheduling file watch: {self._path}") + self.observer.unschedule(self.watch) + self.watch = None + self.logger.debug(f"Unscheduled file watch: {self._path}") + # Unscheduling a watch on a file descriptor requires a non-None fd, so + # we close the fd and set self.fd to None after unscheduling the file + # watch + if self.fd: + self.logger.debug(f"Closing file handle {self.fd}") + os.close(self.fd) + self.fd = None + self.logger.debug("Closed file handle") + + +class TailManager(object): + def __init__(self, *args, logger=None, **kwargs): + if logger is None: + raise Exception("TailManager was initialized without a logger") + + self.logger = logger + self.started = False + self.tails = {} + self.observer = Observer() + + def tail_file(self, path, handler, follow=False, read_all=False): + if handler not in self.tails.setdefault(path, {}): + self.logger.debug(f"Tailing single file: {path}") + sft = SingleFileTail( + path, + handler, + follow=follow, + read_all=read_all, + observer=self.observer, + logger=self.logger, + ) + self.tails[path][handler] = sft + + def stop_tailing_file(self, path, handler): + self.logger.debug(f"Stopping tail on {path}") + tailed_file = self.tails.get(path, {}).pop(handler) + tailed_file.close() + # Amortize some cleanup while we're at it + if not self.tails.get(path): + self.tails.pop(path) + + def run(self): + self.logger.debug("Running TailManager") + while True: + time.sleep(1) + + def start(self): + if self.tails and not self.started: + self.logger.debug("Starting TailManager") + self.observer.start() + self.logger.debug(f"Started Observer, emitters: {self.observer.emitters}") + self.started = True + + def stop(self): + if self.started: + self.logger.debug("Stopping TailManager") + for handlers in self.tails.values(): + for tailed_file in handlers.values(): + tailed_file.close() + self.observer.stop() + self.observer.join() + self.started = False class FileWatchSensor(Sensor): - def __init__(self, sensor_service, config=None): - super(FileWatchSensor, self).__init__( - sensor_service=sensor_service, config=config - ) - self.log = self._sensor_service.get_logger(__name__) - self.tail = None + def __init__(self, *args, logger=None, **kwargs): + super().__init__(*args, **kwargs) + self.log = logger or self.sensor_service.get_logger(__name__) self.file_ref = {} def setup(self): - self.tail = Tail(filenames=[]) - self.tail.handler = self._handle_line - self.tail.should_run = True + self.tail_manager = TailManager(logger=self.log) + self.tail_manager.start() def run(self): - self.tail.run() + self.tail_manager.run() def cleanup(self): - if self.tail: - self.tail.should_run = False - - try: - self.tail.notifier.stop() - except Exception: - self.log.exception("Unable to stop the tail notifier") + self.tail_manager.stop() def add_trigger(self, trigger): file_path = trigger["parameters"].get("file_path", None) @@ -54,21 +427,52 @@ def add_trigger(self, trigger): self.log.error('Received trigger type without "file_path" field.') return - trigger = trigger.get("ref", None) - - if not trigger: - raise Exception(f"Trigger {trigger} did not contain a ref.") + trigger_ref = trigger.get("ref", None) - # Wait a bit to avoid initialization race in logshipper library - eventlet.sleep(1.0) + if not trigger_ref: + raise Exception(f"Trigger {trigger_ref} did not contain a ref.") - self.tail.add_file(filename=file_path) + self.tail_manager.tail_file(file_path, self._handle_line) self.file_ref[file_path] = trigger - self.log.info(f"Added file '{file_path}' ({trigger}) to watch list.") + self.log.info(f"Added file '{file_path}' ({trigger_ref}) to watch list.") + + self.tail_manager.start() def update_trigger(self, trigger): - pass + file_path = trigger["parameters"].get("file_path", None) + + if not file_path: + self.log.error('Received trigger type without "file_path" field.') + return + + trigger_ref = trigger.get("ref", None) + + if file_path in self.file_ref: + self.log.debug( + f"No update required as file '{file_path}' ({trigger_ref}) already in watch list." + ) + return + + if not trigger_ref: + raise Exception(f"Trigger {trigger_ref} did not contain a ref.") + + for old_file_path, ref in self.file_ref.items(): + if ref == trigger_ref: + self.tail_manager.stop_tailing_file(old_file_path, self._handle_line) + self.file_ref.pop(old_file_path) + + self.tail_manager.tail_file(file_path, self._handle_line) + self.file_ref[file_path] = trigger + + self.log.info( + f"Updated to add file '{file_path}' instead of '{old_file_path}' ({trigger_ref}) in watch list." + ) + break + + if file_path not in self.file_ref: + # Maybe the add_trigger message was missed. + self.add_trigger(trigger) def remove_trigger(self, trigger): file_path = trigger["parameters"].get("file_path", None) @@ -77,7 +481,7 @@ def remove_trigger(self, trigger): self.log.error("Received trigger type without 'file_path' field.") return - self.tail.remove_file(filename=file_path) + self.tail_manager.stop_tailing_file(file_path, self._handle_line) self.file_ref.pop(file_path) self.log.info(f"Removed file '{file_path}' ({trigger}) from watch list.") @@ -92,10 +496,23 @@ def _handle_line(self, file_path, line): trigger = self.file_ref[file_path] payload = { "file_path": file_path, - "file_name": os.path.basename(file_path), + "file_name": pathlib.Path(file_path).name, "line": line, } self.log.debug( f"Sending payload {payload} for trigger {trigger} to sensor_service." ) self.sensor_service.dispatch(trigger=trigger, payload=payload) + + +if __name__ == "__main__": + logger = logging.getLogger(__name__) + tm = TailManager(logger=logger) + tm.tail_file(__file__, handler=print) + tm.start() + + def halt(sig, frame): + tm.stop() + sys.exit(0) + + signal.signal(signal.SIGINT, halt) diff --git a/contrib/linux/sensors/file_watch_sensor.yaml b/contrib/linux/sensors/file_watch_sensor.yaml index ba622a9eb7..0550d3e29d 100644 --- a/contrib/linux/sensors/file_watch_sensor.yaml +++ b/contrib/linux/sensors/file_watch_sensor.yaml @@ -16,6 +16,11 @@ description: "Path to the file to monitor" type: "string" required: true + follow: + description: "Whether or not to follow the file when it moves" + type: "boolean" + required: false + default: false additionalProperties: false # This is the schema of the trigger payload the sensor generates payload_schema: diff --git a/contrib/linux/tests/test_file_watch_sensor.py b/contrib/linux/tests/test_file_watch_sensor.py new file mode 100644 index 0000000000..a0f210b524 --- /dev/null +++ b/contrib/linux/tests/test_file_watch_sensor.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python + +# Copyright 2020 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import pathlib + +# import sys +# import threading +import time + +import eventlet +import mock +from file_watch_sensor import FileWatchSensor + +WAIT_TIME = 1 + +logger = logging.getLogger(__name__) + + +def test_file_watch_sensor(): + mock_sensor_service = mock.MagicMock() + mock_logger = mock.MagicMock() + + filename = "test.txt" + filepath = pathlib.Path(filename).absolute().resolve() + filepath.touch() + + fws = FileWatchSensor( + sensor_service=mock_sensor_service, config={}, logger=mock_logger + ) + + time.sleep(WAIT_TIME) + + fws.setup() + + time.sleep(WAIT_TIME) + + # th = threading.Thread(target=fws.run) + th = eventlet.spawn(fws.run) + th.start() + + time.sleep(WAIT_TIME) + + fws.add_trigger( + { + "id": "asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + "pack": "linux", + "name": "asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + "ref": "linux.asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + "uid": "trigger:linux:asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + "type": "linux.file_watch.line", + "parameters": { + "file_path": filepath, + "follow": True, + }, + } + ) + + time.sleep(WAIT_TIME) + + with open(filepath, "a") as f: + f.write("Added line 1\n") + + time.sleep(WAIT_TIME) + + with open(filepath, "a") as f: + f.write("Added line 2\n") + + time.sleep(WAIT_TIME) + + expected_calls = [ + mock.call( + trigger="linux.asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + payload={ + "file_path": pathlib.PosixPath("/vagrant/contrib/linux/test.txt"), + "file_name": "test.txt", + "line": "Added line 1", + }, + ), + mock.call( + trigger="linux.asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + payload={ + "file_path": pathlib.PosixPath("/vagrant/contrib/linux/test.txt"), + "file_name": "test.txt", + "line": "Added line 2", + }, + ), + ] + mock_sensor_service.dispatch.assert_has_calls(expected_calls, any_order=False) + print(mock_logger.method_calls) + # th.join() + + fws.cleanup() + + os.unlink(filepath) + + +def test_file_watch_sensor_without_trigger_filepath(): + mock_sensor_service = mock.MagicMock() + mock_logger = mock.MagicMock() + + filename = "test.txt" + filepath = pathlib.Path(filename).absolute().resolve() + filepath.touch() + + fws = FileWatchSensor( + sensor_service=mock_sensor_service, config={}, logger=mock_logger + ) + + time.sleep(WAIT_TIME) + + fws.setup() + + time.sleep(WAIT_TIME) + + # th = threading.Thread(target=fws.run) + th = eventlet.spawn(fws.run) + th.start() + + time.sleep(WAIT_TIME) + + fws.add_trigger( + { + "id": "asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + "pack": "linux", + "name": "asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + "ref": "linux.asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + "uid": "trigger:linux:asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + "type": "linux.file_watch.line", + "parameters": { + # 'file_path': filepath, + "follow": True, + }, + } + ) + + +def test_file_watch_sensor_without_trigger_ref(): + mock_sensor_service = mock.MagicMock() + mock_logger = mock.MagicMock() + + filename = "test.txt" + filepath = pathlib.Path(filename).absolute().resolve() + filepath.touch() + + fws = FileWatchSensor( + sensor_service=mock_sensor_service, config={}, logger=mock_logger + ) + + time.sleep(WAIT_TIME) + + fws.setup() + + time.sleep(WAIT_TIME) + + # th = threading.Thread(target=fws.run) + th = eventlet.spawn(fws.run) + th.start() + + time.sleep(WAIT_TIME) + + try: + fws.add_trigger( + { + "id": "asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + "pack": "linux", + "name": "asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + # 'ref': 'linux.asdf.adsfasdf-asdf-asdf-asdfasdfasdf', + "uid": "trigger:linux:asdf.adsfasdf-asdf-asdf-asdfasdfasdf", + "type": "linux.file_watch.line", + "parameters": { + "file_path": filepath, + "follow": True, + }, + } + ) + except Exception as e: + # Make sure we ignore the right exception + if "did not contain a ref" not in str(e): + raise e + else: + raise AssertionError( + "FileWatchSensor.add_trigger() did not raise an " + "exception when passed a trigger without a ref" + ) + finally: + os.unlink(filepath) + + +if __name__ == "__main__": + # logger.setLevel(logging.DEBUG) + + # handler = logging.StreamHandler(sys.stderr) + # handler.setLevel(logging.DEBUG) + # formatter = logging.Formatter('%(name)s: %(levelname)s: %(message)s') + + # logger.addHandler(handler) + + test_file_watch_sensor() + test_file_watch_sensor_without_trigger_filepath() + test_file_watch_sensor_without_trigger_ref() diff --git a/contrib/linux/tests/test_single_file_tail.py b/contrib/linux/tests/test_single_file_tail.py new file mode 100644 index 0000000000..c8d573e717 --- /dev/null +++ b/contrib/linux/tests/test_single_file_tail.py @@ -0,0 +1,637 @@ +#!/usr/bin/env python + +# Copyright 2020 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import logging +import os +import pathlib +import time + +from watchdog.observers import Observer + +from file_watch_sensor import SingleFileTail + +WAIT_TIME = 1 + +logger = logging.getLogger(__name__) + + +def test_read_chunk_over_multibyte_character_boundary(): + wide_characters = [None, None, "\u0130", "\u2050", "\U00088080"] + for n in range(2, 5): + yield from _gen_n_byte_character_tests(1024, n, wide_characters[n]) + + for n in range(2, 5): + yield from _gen_n_byte_character_tests(2048, n, wide_characters[n]) + + for n in range(2, 5): + yield from _gen_n_byte_character_tests(4096, n, wide_characters[n]) + + +def _gen_n_byte_character_tests(chunk_size, n, char): + for length in range(chunk_size, chunk_size + n + 1): + yield _run_n_byte_character_tests, chunk_size, n, length, char + + +def _run_n_byte_character_tests(chunk_size, n, length, char): + filename = f"chunk_boundary_{n}u_{length}.txt" + + with open(filename, "wb+") as f: + # Write out a file that is of the given length + # aaaaaa...aaa\x82 + f.write(("a" * (length - n) + char).encode("utf-8")) + + fd = os.open(filename, os.O_RDONLY) + + sft = SingleFileTail(None, None, fd=fd, logger=logger) + + result = sft.read_chunk(fd, chunk_size=chunk_size) + + os.close(fd) + os.unlink(filename) + + if length < chunk_size + n: + assert result == ("a" * (length - n) + char) + else: + assert result == ("a" * chunk_size) + + +def test_read_chunk_with_bad_utf8_character(): + filename = "bad_utf8_character.txt" + + utf8_str = "\U00088080" + utf8_bytes = utf8_str.encode("utf-8") + chopped_utf8_bytes = utf8_bytes[:-1] + + with open(filename, "wb+") as f: + # Write out a file that is of the given length + # aaaaaa...aaa\x82 + f.write(b"a") + f.write(chopped_utf8_bytes) + f.write(b"a") + + fd = os.open(filename, os.O_RDONLY) + + sft = SingleFileTail(None, None, fd=fd, logger=logger) + + err = None + try: + sft.read_chunk(fd) + except Exception as e: + err = e + finally: + assert err is not None + assert isinstance(err, UnicodeDecodeError) + + os.close(fd) + os.unlink(filename) + + +def test_read_chunk_from_nonexistent_file(): + filename = "nonexistent_file.txt" + + with open(filename, "w+") as f: + f.write("This file will not exist in a few moments") + + fd = os.open(filename, os.O_RDONLY) + os.close(fd) + + os.unlink(filename) + + sft = SingleFileTail(None, None, fd=fd, logger=logger) + + assert sft.read_chunk(fd=fd) == "" + + +# Helper function + + +def append_to_list(list_to_append, path, element): + logger.debug(f"Appending ({path}):\n{element} to {list_to_append}") + list_to_append.append(element) + + +def test_initialize_without_logger(): + try: + SingleFileTail(None, None, fd=None) + except Exception as e: + expected_message = "SingleFileTail was initialized without a logger" + exception_message = getattr(e, "message", e.args[0]) + if exception_message != expected_message: + raise e + else: + raise AssertionError( + "SingleFileTail initialized fine without a " "logger parameter" + ) + + +def test_append_to_watched_file_with_absolute_path(): + tailed_filename = (pathlib.Path.cwd() / pathlib.Path("tailed_file.txt")).resolve() + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + observer = Observer() + + sft = SingleFileTail( + tailed_filename, append_to_list_partial, observer=observer, logger=logger + ) + + observer.start() + time.sleep(WAIT_TIME) + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + sft.close() + + observer.stop() + + os.unlink(tailed_filename) + + +def test_append_to_watched_file_with_relative_path(): + tailed_filename = pathlib.Path("tailed_file.txt") + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + observer = Observer() + + sft = SingleFileTail( + tailed_filename, append_to_list_partial, observer=observer, logger=logger + ) + + observer.start() + time.sleep(WAIT_TIME) + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + sft.close() + + observer.stop() + + os.unlink(tailed_filename) + + +def test_append_to_watched_file_observer_start_first(): + tailed_filename = pathlib.Path("tailed_file.txt") + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + observer = Observer() + observer.start() + + sft = SingleFileTail( + tailed_filename, append_to_list_partial, observer=observer, logger=logger + ) + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + sft.close() + + observer.stop() + + os.unlink(tailed_filename) + + +def test_not_watched_file(): + tailed_filename = "tailed_file.txt" + not_tailed_filename = "not_tailed_file.txt" + new_not_tailed_filename = not_tailed_filename.replace(".txt", "_moved.txt") + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + observer = Observer() + + sft = SingleFileTail( + tailed_filename, append_to_list_partial, observer=observer, logger=logger + ) + + observer.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(not_tailed_filename, "a+") as f: + f.write("Added line 1 - not tailed\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.replace(not_tailed_filename, new_not_tailed_filename) + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + sft.close() + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + observer.stop() + + os.unlink(tailed_filename) + os.unlink(new_not_tailed_filename) + + +def test_watch_nonexistent_file(): + tailed_filename = "tailed_file.txt" + + if os.path.exists(tailed_filename): + os.unlink(tailed_filename) + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + observer = Observer() + + sft = SingleFileTail( + tailed_filename, append_to_list_partial, observer=observer, logger=logger + ) + + observer.start() + + assert appended_lines == [] + + assert not os.path.exists(tailed_filename) + + with open(tailed_filename, "w+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert os.path.exists(tailed_filename) + + assert appended_lines == [ + "Added line 1", + ] + + sft.close() + time.sleep(WAIT_TIME) + + observer.stop() + + os.unlink(tailed_filename) + + +def test_follow_watched_file_moved(): + tailed_filename = "tailed_file_to_move.txt" + new_filename = tailed_filename.replace("_to_move.txt", "_moved.txt") + + if os.path.exists(new_filename): + os.unlink(new_filename) + if os.path.exists(tailed_filename): + os.unlink(tailed_filename) + + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + observer = Observer() + + sft = SingleFileTail( + tailed_filename, + append_to_list_partial, + follow=True, + observer=observer, + logger=logger, + ) + + observer.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(tailed_filename, "a+") as f: + f.write("Added line 2") # No newline + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.replace(tailed_filename, new_filename) + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(new_filename, "a+") as f: + f.write(" - end of line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2 - end of line 2", + ] + + with open(tailed_filename, "w+") as f: + f.write("New file - text line 1\n") + f.write("New file - text line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2 - end of line 2", + ] + + sft.close() + + observer.stop() + + os.unlink(new_filename) + os.unlink(tailed_filename) + + +def test_not_followed_watched_file_moved(): + tailed_filename = "tailed_file_to_move.txt" + new_filename = tailed_filename.replace("_to_move.txt", "_moved.txt") + + if os.path.exists(new_filename): + os.unlink(new_filename) + if os.path.exists(tailed_filename): + os.unlink(tailed_filename) + + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + observer = Observer() + + sft = SingleFileTail( + tailed_filename, + append_to_list_partial, + follow=False, + observer=observer, + logger=logger, + ) + + observer.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(tailed_filename, "a+") as f: + f.write("Added line 2") # No newline + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.rename(tailed_filename, new_filename) + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + ] + + with open(new_filename, "a+") as f: + f.write(" - end of line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + ] + + assert not os.path.exists(tailed_filename) + + with open(tailed_filename, "w+") as f: + f.write("Recreated file - text line 1\n") + f.write("Recreated file - text line 2\n") + time.sleep(WAIT_TIME) + + assert os.path.exists(tailed_filename) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + "Recreated file - text line 1", + "Recreated file - text line 2", + ] + + sft.close() + + observer.stop() + + os.unlink(new_filename) + os.unlink(tailed_filename) + + +def test_non_watched_file_moved(): + tailed_filename = "tailed_file_to_move.txt" + not_tailed_filename = f"not_{tailed_filename}" + new_not_tailed_filename = not_tailed_filename.replace("_to_move.txt", "_moved.txt") + + if os.path.exists(not_tailed_filename): + os.unlink(not_tailed_filename) + if os.path.exists(new_not_tailed_filename): + os.unlink(new_not_tailed_filename) + if os.path.exists(tailed_filename): + os.unlink(tailed_filename) + + with open(not_tailed_filename, "w+") as f: + f.write("Text here will not be monitored\n") + + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + observer = Observer() + + sft = SingleFileTail( + tailed_filename, append_to_list_partial, observer=observer, logger=logger + ) + + observer.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.replace(not_tailed_filename, new_not_tailed_filename) + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + sft.close() + + observer.stop() + + os.unlink(new_not_tailed_filename) + os.unlink(tailed_filename) + + +def test_watched_file_deleted(): + tailed_filename = "tailed_file_deleted.txt" + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + observer = Observer() + + sft = SingleFileTail( + tailed_filename, append_to_list_partial, observer=observer, logger=logger + ) + + observer.start() + time.sleep(WAIT_TIME) + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.unlink(tailed_filename) + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + sft.close() + + observer.stop() + + +def test_watched_file_immediately_deleted(): + tailed_filename = "tailed_file_deleted.txt" + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + observer = Observer() + + sft = SingleFileTail( + tailed_filename, append_to_list_partial, observer=observer, logger=logger + ) + + observer.start() + time.sleep(WAIT_TIME) + + os.unlink(tailed_filename) + + assert appended_lines == [] + + sft.close() + + observer.stop() + + +if __name__ == "__main__": + import sys + + logger.setLevel(logging.DEBUG) + + handler = logging.StreamHandler(sys.stderr) + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter("%(name)s: %(levelname)s: %(message)s") + + logger.addHandler(handler) + + test_read_chunk_over_multibyte_character_boundary() + test_read_chunk_with_bad_utf8_character() + test_read_chunk_from_nonexistent_file() + test_initialize_without_logger() + test_append_to_watched_file_with_absolute_path() + test_append_to_watched_file_with_relative_path() + test_append_to_watched_file_observer_start_first() + test_not_watched_file() + test_watch_nonexistent_file() + test_follow_watched_file_moved() + test_not_followed_watched_file_moved() + test_non_watched_file_moved() + test_watched_file_deleted() + test_watched_file_immediately_deleted() diff --git a/contrib/linux/tests/test_tail_manager.py b/contrib/linux/tests/test_tail_manager.py new file mode 100644 index 0000000000..b74a886a88 --- /dev/null +++ b/contrib/linux/tests/test_tail_manager.py @@ -0,0 +1,660 @@ +#!/usr/bin/env python + +# Copyright 2020 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import logging +import os +import pathlib +import time + +from file_watch_sensor import TailManager + +WAIT_TIME = 1 + +logger = logging.getLogger(__name__) + + +# Helper function + + +def append_to_list(list_to_append, path, element): + logger.debug(f"Appending ({path}):\n{element} to {list_to_append}") + list_to_append.append(element) + + +def test_initialized_without_logger(): + try: + TailManager() + except Exception as e: + expected_message = "TailManager was initialized without a logger" + exc_msg = getattr(e, "message", e.args[0]) + if exc_msg != expected_message: + raise e + else: + raise AssertionError( + "TailManager initialized fine without a " "logger parameter" + ) + + +def test_append_to_watched_file_with_absolute_path(): + tailed_filename = (pathlib.Path.cwd() / pathlib.Path("tailed_file.txt")).resolve() + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + tm.stop_tailing_file(tailed_filename, handler=append_to_list_partial) + + tm.stop() + + os.unlink(tailed_filename) + + +def test_not_watched_file(): + tailed_filename = "tailed_file.txt" + not_tailed_filename = "not_tailed_file.txt" + new_not_tailed_filename = not_tailed_filename.replace(".txt", "_moved.txt") + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(not_tailed_filename, "a+") as f: + f.write("Added line 1 - not tailed\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.replace(not_tailed_filename, new_not_tailed_filename) + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + tm.stop_tailing_file(tailed_filename, handler=append_to_list_partial) + + tm.stop() + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.unlink(tailed_filename) + os.unlink(new_not_tailed_filename) + + +def test_watch_nonexistent_file(): + tailed_filename = "tailed_file.txt" + + if os.path.exists(tailed_filename): + os.unlink(tailed_filename) + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.start() + time.sleep(WAIT_TIME) + + assert appended_lines == [] + + with open(tailed_filename, "w+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + tm.stop_tailing_file(tailed_filename, handler=append_to_list_partial) + + tm.stop() + time.sleep(WAIT_TIME) + + os.unlink(tailed_filename) + + +def test_follow_watched_file_moved(): + tailed_filename = "tailed_file_to_move.txt" + new_filename = tailed_filename.replace("_to_move.txt", "_moved.txt") + + if os.path.exists(new_filename): + os.unlink(new_filename) + if os.path.exists(tailed_filename): + os.unlink(tailed_filename) + + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial, follow=True) + tm.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(tailed_filename, "a+") as f: + f.write("Added line 2") # No newline + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.replace(tailed_filename, new_filename) + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(new_filename, "a+") as f: + f.write(" - end of line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2 - end of line 2", + ] + + with open(tailed_filename, "w+") as f: + f.write("New file - text line 1\n") + f.write("New file - text line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2 - end of line 2", + ] + + tm.stop_tailing_file(tailed_filename, handler=append_to_list_partial) + + tm.stop() + + os.unlink(new_filename) + os.unlink(tailed_filename) + + +def test_not_followed_watched_file_moved(): + tailed_filename = "tailed_file_to_move.txt" + new_filename = tailed_filename.replace("_to_move.txt", "_moved.txt") + + if os.path.exists(new_filename): + os.unlink(new_filename) + if os.path.exists(tailed_filename): + os.unlink(tailed_filename) + + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial, follow=False) + tm.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(tailed_filename, "a+") as f: + f.write("Added line 2") # No newline + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.replace(tailed_filename, new_filename) + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + ] + + with open(new_filename, "a+") as f: + f.write(" - end of line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + ] + + with open(tailed_filename, "w+") as f: + f.write("Recreated file - text line 1\n") + f.write("Recreated file - text line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + "Recreated file - text line 1", + "Recreated file - text line 2", + ] + + tm.stop_tailing_file(tailed_filename, handler=append_to_list_partial) + + tm.stop() + + os.unlink(new_filename) + os.unlink(tailed_filename) + + +def test_non_watched_file_moved(): + tailed_filename = "tailed_file_to_move.txt" + not_tailed_filename = f"not_{tailed_filename}" + new_not_tailed_filename = not_tailed_filename.replace("_to_move.txt", "_moved.txt") + + if os.path.exists(not_tailed_filename): + os.unlink(not_tailed_filename) + if os.path.exists(new_not_tailed_filename): + os.unlink(new_not_tailed_filename) + if os.path.exists(tailed_filename): + os.unlink(tailed_filename) + + with open(not_tailed_filename, "w+") as f: + f.write("Text here will not be monitored\n") + + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.replace(not_tailed_filename, new_not_tailed_filename) + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + tm.stop_tailing_file(tailed_filename, handler=append_to_list_partial) + + tm.stop() + + os.unlink(new_not_tailed_filename) + os.unlink(tailed_filename) + + +def test_watched_file_deleted(): + tailed_filename = "tailed_file_deleted.txt" + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + os.unlink(tailed_filename) + + assert appended_lines == [ + "Added line 1", + ] + + tm.stop() + + +def test_watched_file_immediately_deleted(): + tailed_filename = "tailed_file_deleted.txt" + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.start() + + os.unlink(tailed_filename) + + tm.stop() + + +def test_append_to_watched_file(): + tailed_filename = "tailed_file.txt" + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(tailed_filename, "a+") as f: + f.write("Added line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + ] + + with open(tailed_filename, "a+") as f: + f.write("Start of added partial line 1") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + ] + + with open(tailed_filename, "a+") as f: + f.write(" - finished partial line 1\nStart of added partial line 2") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + "Start of added partial line 1 - finished partial line 1", + ] + + with open(tailed_filename, "a+") as f: + f.write(" - finished partial line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + "Start of added partial line 1 - finished partial line 1", + "Start of added partial line 2 - finished partial line 2", + ] + + with open(tailed_filename, "a+") as f: + f.write("Final line without a newline") + time.sleep(WAIT_TIME) + + tm.stop_tailing_file(tailed_filename, handler=append_to_list_partial) + + tm.stop() + + time.sleep(WAIT_TIME) + assert appended_lines == [ + "Added line 1", + "Added line 2", + "Start of added partial line 1 - finished partial line 1", + "Start of added partial line 2 - finished partial line 2", + "Final line without a newline", + ] + + os.unlink(tailed_filename) + + +def test_tail_file_twice(): + tailed_filename = "tailed_file.txt" + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(tailed_filename, "a+") as f: + f.write("Added line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + ] + + with open(tailed_filename, "a+") as f: + f.write("Start of added partial line 1") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + ] + + with open(tailed_filename, "a+") as f: + f.write(" - finished partial line 1\nStart of added partial line 2") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + "Start of added partial line 1 - finished partial line 1", + ] + + with open(tailed_filename, "a+") as f: + f.write(" - finished partial line 2\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + "Added line 2", + "Start of added partial line 1 - finished partial line 1", + "Start of added partial line 2 - finished partial line 2", + ] + + with open(tailed_filename, "a+") as f: + f.write("Final line without a newline") + time.sleep(WAIT_TIME) + + tm.stop_tailing_file(tailed_filename, handler=append_to_list_partial) + + tm.stop() + + time.sleep(WAIT_TIME) + assert appended_lines == [ + "Added line 1", + "Added line 2", + "Start of added partial line 1 - finished partial line 1", + "Start of added partial line 2 - finished partial line 2", + "Final line without a newline", + ] + + os.unlink(tailed_filename) + + +def test_stop(): + tailed_filename = "tailed_file_stop.txt" + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(tailed_filename, "a+") as f: + f.write("Final line without a newline") + time.sleep(WAIT_TIME) + + tm.stop() + + time.sleep(WAIT_TIME) + assert appended_lines == [ + "Added line 1", + "Final line without a newline", + ] + + os.unlink(tailed_filename) + + +def test_stop_twice(): + tailed_filename = "tailed_file_stop.txt" + with open(tailed_filename, "w+") as f: + f.write("Preexisting text line 1\n") + f.write("Preexisting text line 2\n") + + appended_lines = [] + append_to_list_partial = functools.partial(append_to_list, appended_lines) + + tm = TailManager(logger=logger) + tm.tail_file(tailed_filename, handler=append_to_list_partial) + tm.start() + + with open(tailed_filename, "a+") as f: + f.write("Added line 1\n") + time.sleep(WAIT_TIME) + + assert appended_lines == [ + "Added line 1", + ] + + with open(tailed_filename, "a+") as f: + f.write("Final line without a newline") + time.sleep(WAIT_TIME) + + tm.stop() + tm.stop() + + time.sleep(WAIT_TIME) + assert appended_lines == [ + "Added line 1", + "Final line without a newline", + ] + + os.unlink(tailed_filename) + + +if __name__ == "__main__": + import sys + + logger.setLevel(logging.DEBUG) + + handler = logging.StreamHandler(sys.stderr) + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter("%(name)s: %(levelname)s: %(message)s") + + logger.addHandler(handler) + + test_initialized_without_logger() + test_append_to_watched_file_with_absolute_path() + test_not_watched_file() + test_watch_nonexistent_file() + test_follow_watched_file_moved() + test_not_followed_watched_file_moved() + test_non_watched_file_moved() + test_watched_file_deleted() + test_watched_file_immediately_deleted() + test_append_to_watched_file() + test_tail_file_twice() + test_stop() + test_stop_twice() diff --git a/fixed-requirements.txt b/fixed-requirements.txt index 389105c72f..aefa903f15 100644 --- a/fixed-requirements.txt +++ b/fixed-requirements.txt @@ -42,7 +42,6 @@ oslo.utils<5.0,>=4.0.0 paramiko==2.11.0 passlib==1.7.4 prompt-toolkit==1.0.15 -pyinotify==0.9.6 ; platform_system=="Linux" pymongo==3.11.3 pyparsing<3 zstandard==0.15.2 diff --git a/lockfiles/st2.lock b/lockfiles/st2.lock index 0d618ae884..005676efab 100644 --- a/lockfiles/st2.lock +++ b/lockfiles/st2.lock @@ -29,7 +29,6 @@ // "jsonschema<4,>=3", // "kombu", // "lockfile", -// "logshipper@ git+https://github.com/StackStorm/logshipper.git@stackstorm_patched ; platform_system == \"Linux\"", // "mail-parser==3.15.0", // "mock", // "mongoengine", @@ -44,7 +43,6 @@ // "prettytable", // "prompt-toolkit<2", // "psutil", -// "pyinotify<=0.10,>=0.9.5; platform_system == \"Linux\"", // "pymongo", // "pyrabbit", // "pysocks", @@ -74,6 +72,7 @@ // "ujson", // "unittest2", // "virtualenv", +// "watchdog", // "webob", // "webtest", // "wheel", @@ -1693,29 +1692,6 @@ "requires_python": null, "version": "0.12.2" }, - { - "artifacts": [ - { - "algorithm": "sha256", - "hash": "b0fb343d27ee21201dc0002bd48cd7591dfe3cb17914b56326f99344bfb89dc9", - "url": "git+https://github.com/StackStorm/logshipper.git@stackstorm_patched" - } - ], - "project_name": "logshipper", - "requires_dists": [ - "eventlet", - "pika", - "pyinotify", - "python-dateutil", - "python-statsd", - "pytz", - "pyyaml", - "requests", - "six" - ], - "requires_python": null, - "version": "0.0.0" - }, { "artifacts": [ { @@ -2724,19 +2700,6 @@ "requires_python": "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.7", "version": "2.21" }, - { - "artifacts": [ - { - "algorithm": "sha256", - "hash": "9c998a5d7606ca835065cdabc013ae6c66eb9ea76a00a1e3bc6e0cfe2b4f71f4", - "url": "https://files.pythonhosted.org/packages/e3/c0/fd5b18dde17c1249658521f69598f3252f11d9d7a980c5be8619970646e1/pyinotify-0.9.6.tar.gz" - } - ], - "project_name": "pyinotify", - "requires_dists": [], - "requires_python": null, - "version": "0.9.6" - }, { "artifacts": [ { @@ -4452,6 +4415,96 @@ "requires_python": ">=3.7.0", "version": "2.1.2" }, + { + "artifacts": [ + { + "algorithm": "sha256", + "hash": "d429c2430c93b7903914e4db9a966c7f2b068dd2ebdd2fa9b9ce094c7d459f33", + "url": "https://files.pythonhosted.org/packages/2b/f0/456948b865ab259784f774154e7d65844fa9757522fdb11533fbf8ae7aca/watchdog-3.0.0-py3-none-manylinux2014_x86_64.whl" + }, + { + "algorithm": "sha256", + "hash": "c07253088265c363d1ddf4b3cdb808d59a0468ecd017770ed716991620b8f77a", + "url": "https://files.pythonhosted.org/packages/21/72/46fd174352cd88b9157ade77e3b8835125d4b1e5186fc7f1e8c44664e029/watchdog-3.0.0-py3-none-manylinux2014_i686.whl" + }, + { + "algorithm": "sha256", + "hash": "25f70b4aa53bd743729c7475d7ec41093a580528b100e9a8c5b5efe8899592fc", + "url": "https://files.pythonhosted.org/packages/2e/54/48527f3aea4f7ed331072352fee034a7f3d6ec7a2ed873681738b2586498/watchdog-3.0.0-cp38-cp38-macosx_10_9_x86_64.whl" + }, + { + "algorithm": "sha256", + "hash": "c9d8c8ec7efb887333cf71e328e39cffbf771d8f8f95d308ea4125bf5f90ba64", + "url": "https://files.pythonhosted.org/packages/30/65/9e36a3c821d47a22e54a8fc73681586b2d26e82d24ea3af63acf2ef78f97/watchdog-3.0.0-pp39-pypy39_pp73-macosx_10_9_x86_64.whl" + }, + { + "algorithm": "sha256", + "hash": "ba07e92756c97e3aca0912b5cbc4e5ad802f4557212788e72a72a47ff376950d", + "url": "https://files.pythonhosted.org/packages/40/1b/4e6d3e0f587587931f590531b4ed08070d71a9efb35541d792a68d8ee593/watchdog-3.0.0-py3-none-manylinux2014_s390x.whl" + }, + { + "algorithm": "sha256", + "hash": "233b5817932685d39a7896b1090353fc8efc1ef99c9c054e46c8002561252fb8", + "url": "https://files.pythonhosted.org/packages/51/b9/444a984b1667013bac41b31b45d9718e069cc7502a43a924896806605d83/watchdog-3.0.0-cp39-cp39-macosx_11_0_arm64.whl" + }, + { + "algorithm": "sha256", + "hash": "51f90f73b4697bac9c9a78394c3acbbd331ccd3655c11be1a15ae6fe289a8c83", + "url": "https://files.pythonhosted.org/packages/71/3a/b12740f4f60861240d57b42a2ac6ac0a2821db506c4435f7872c1fad867d/watchdog-3.0.0-py3-none-manylinux2014_ppc64le.whl" + }, + { + "algorithm": "sha256", + "hash": "5113334cf8cf0ac8cd45e1f8309a603291b614191c9add34d33075727a967709", + "url": "https://files.pythonhosted.org/packages/74/3c/e4b77f4f069aca2b6e35925db7a1aa6cb600dcb52fc3e962284640ca37f3/watchdog-3.0.0-py3-none-manylinux2014_ppc64.whl" + }, + { + "algorithm": "sha256", + "hash": "7c5f84b5194c24dd573fa6472685b2a27cc5a17fe5f7b6fd40345378ca6812e3", + "url": "https://files.pythonhosted.org/packages/75/fe/d9a37d8df76878853f68dd665ec6d2c7a984645de460164cb880a93ffe6b/watchdog-3.0.0-cp39-cp39-macosx_10_9_universal2.whl" + }, + { + "algorithm": "sha256", + "hash": "8ae9cda41fa114e28faf86cb137d751a17ffd0316d1c34ccf2235e8a84365c7f", + "url": "https://files.pythonhosted.org/packages/7f/6e/7ca8ed16928d7b11da69372f55c64a09dce649d2b24b03f7063cd8683c4b/watchdog-3.0.0-cp38-cp38-macosx_10_9_universal2.whl" + }, + { + "algorithm": "sha256", + "hash": "0e06ab8858a76e1219e68c7573dfeba9dd1c0219476c5a44d5333b01d7e1743a", + "url": "https://files.pythonhosted.org/packages/92/28/631872d7fbc45527037060db8c838b47a129a6c09d2297d6dddcfa283cf2/watchdog-3.0.0-py3-none-manylinux2014_aarch64.whl" + }, + { + "algorithm": "sha256", + "hash": "3aa7f6a12e831ddfe78cdd4f8996af9cf334fd6346531b16cec61c3b3c0d8da0", + "url": "https://files.pythonhosted.org/packages/94/ce/70c65a6c4b0330129c402624d42f67ce82d6a0ba2036de67628aeffda3c1/watchdog-3.0.0-cp39-cp39-macosx_10_9_x86_64.whl" + }, + { + "algorithm": "sha256", + "hash": "4d98a320595da7a7c5a18fc48cb633c2e73cda78f93cac2ef42d42bf609a33f9", + "url": "https://files.pythonhosted.org/packages/95/a6/d6ef450393dac5734c63c40a131f66808d2e6f59f6165ab38c98fbe4e6ec/watchdog-3.0.0.tar.gz" + }, + { + "algorithm": "sha256", + "hash": "d00e6be486affb5781468457b21a6cbe848c33ef43f9ea4a73b4882e5f188a44", + "url": "https://files.pythonhosted.org/packages/c0/a2/4e3230bdc1fb878b152a2c66aa941732776f4545bd68135d490591d66713/watchdog-3.0.0-py3-none-manylinux2014_armv7l.whl" + }, + { + "algorithm": "sha256", + "hash": "4f94069eb16657d2c6faada4624c39464f65c05606af50bb7902e036e3219be3", + "url": "https://files.pythonhosted.org/packages/dc/89/3a3ce6dd01807ff918aec3bbcabc92ed1a7edc5bb2266c720bb39fec1bec/watchdog-3.0.0-cp38-cp38-macosx_11_0_arm64.whl" + }, + { + "algorithm": "sha256", + "hash": "8f3ceecd20d71067c7fd4c9e832d4e22584318983cabc013dbf3f70ea95de346", + "url": "https://files.pythonhosted.org/packages/ea/76/bef1c6f6ac18041234a9f3e8bc995d611e255c44f10433bfaf255968c269/watchdog-3.0.0-pp38-pypy38_pp73-macosx_10_9_x86_64.whl" + } + ], + "project_name": "watchdog", + "requires_dists": [ + "PyYAML>=3.10; extra == \"watchmedo\"" + ], + "requires_python": ">=3.7", + "version": "3.0.0" + }, { "artifacts": [ { @@ -4881,7 +4934,6 @@ "jsonschema<4,>=3", "kombu", "lockfile", - "logshipper", "mail-parser==3.15.0", "mock", "mongoengine", @@ -4896,7 +4948,6 @@ "prettytable", "prompt-toolkit<2", "psutil", - "pyinotify<=0.10,>=0.9.5; platform_system == \"Linux\"", "pymongo", "pyrabbit", "pysocks", @@ -4926,6 +4977,7 @@ "ujson", "unittest2", "virtualenv", + "watchdog", "webob", "webtest", "wheel", diff --git a/pyproject.toml b/pyproject.toml index 0747dbabfc..0ed759b3e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,9 @@ exclude = ''' /( | \.git | \.virtualenv + | venv + | virtualenv + | virtualenv-osx | __pycache__ | test_content_version )/ diff --git a/requirements.txt b/requirements.txt index c6ddc77374..7f28a10638 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,7 +29,6 @@ jsonpath-rw==1.4.0 jsonschema==3.2.0 kombu==5.0.2 lockfile==0.12.2 -logshipper@ git+https://github.com/StackStorm/logshipper.git@stackstorm_patched ; platform_system=="Linux" mock==4.0.3 mongoengine==0.23.0 networkx<3 @@ -46,7 +45,6 @@ prettytable==2.1.0 prompt-toolkit==1.0.15 psutil==5.8.0 pyOpenSSL==23.1.0 -pyinotify==0.9.6 ; platform_system=="Linux" pymongo==3.11.3 pyparsing<3 pyrabbit @@ -75,6 +73,7 @@ tenacity>=3.2.1,<7.0.0 tooz==2.8.0 typing-extensions<4.2 unittest2 +watchdog webob==1.8.7 webtest zake==0.2.2 diff --git a/st2actions/in-requirements.txt b/st2actions/in-requirements.txt index 14cda20b57..21a5c8bc2c 100644 --- a/st2actions/in-requirements.txt +++ b/st2actions/in-requirements.txt @@ -17,8 +17,7 @@ python-json-logger gitpython lockfile # needed by core "linux" pack - TODO: create virtualenv for linux pack on postinst -pyinotify -logshipper@ git+https://github.com/StackStorm/logshipper.git@stackstorm_patched ; platform_system=="Linux" +watchdog # required by pack_mgmt/setup_virtualenv.py#L135 virtualenv # needed by requests diff --git a/st2actions/requirements.txt b/st2actions/requirements.txt index bdfe4e8b1c..4d2ad95273 100644 --- a/st2actions/requirements.txt +++ b/st2actions/requirements.txt @@ -13,13 +13,12 @@ gitpython<=3.1.37 jinja2==2.11.3 kombu==5.0.2 lockfile==0.12.2 -logshipper@ git+https://github.com/StackStorm/logshipper.git@stackstorm_patched ; platform_system=="Linux" oslo.config>=1.12.1,<1.13 oslo.utils<5.0,>=4.0.0 -pyinotify==0.9.6 ; platform_system=="Linux" pyparsing<3 python-dateutil==2.8.1 python-json-logger pyyaml==5.4.1 requests[security]==2.25.1 six==1.13.0 +watchdog diff --git a/test-requirements.txt b/test-requirements.txt index 0145399504..867ef25fce 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -15,8 +15,8 @@ tabulate unittest2 sphinx==1.7.9 sphinx-autobuild -# pin alabaster (sphinx dependency) or pip installs one that is not compatible -alabaster<0.7.14 +# pin alabaster (transitive dep of sphinx) as newer versions of alabaster do not support older sphinx versions +alabaster<0.7.14 ; python_version >= '3.9' # nosetests enhancements rednose nose-timer==1.0.1