diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f8f78f2..8e67ed2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,6 +34,7 @@ jobs: python -m pip install -e . - name: Test + timeout-minutes: 1 run: python -m pytest --cov-report=xml --cov=. - name: Upload coverage to Codecov diff --git a/requirements-dev.txt b/requirements-dev.txt index 1b4add2..844661d 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,5 +1,5 @@ pytest pytest-asyncio pytest-cov -mock-ssh-server +mock-ssh-server @ git+https://github.com/skshetry/mock-ssh-server.git@command-queue-race importlib-metadata >= 6.0.0 diff --git a/sshfs/spec.py b/sshfs/spec.py index 88f7f2b..5c1fc73 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -8,7 +8,13 @@ import asyncssh from asyncssh.sftp import SFTPOpUnsupported -from fsspec.asyn import AsyncFileSystem, async_methods, sync, sync_wrapper +from fsspec.asyn import ( + AsyncFileSystem, + FSTimeoutError, + async_methods, + sync, + sync_wrapper, +) from fsspec.utils import infer_storage_options from sshfs.file import SSHFile @@ -71,7 +77,7 @@ def __init__( **_client_args, ) weakref.finalize( - self, sync, self.loop, self._finalize, self._pool, self._stack + self, self._finalize, self.loop, self._pool, self._stack ) @classmethod @@ -101,15 +107,29 @@ async def _connect( connect = sync_wrapper(_connect) @staticmethod - async def _finalize(pool, stack): - await pool.close() - - # If an error occurs while the SSHFile is trying to - # open the native file, then the client might get broken - # due to partial initialization. We are just going to ignore - # the errors that arises on the finalization layer - with suppress(BrokenPipeError): - await stack.aclose() + def _finalize(loop, pool, stack): + async def close(): + await pool.close() + # If an error occurs while the SSHFile is trying to + # open the native file, then the client might get broken + # due to partial initialization. We are just going to ignore + # the errors that arises on the finalization layer + with suppress(BrokenPipeError): + await stack.aclose() + + if loop is not None and loop.is_running(): + try: + loop = asyncio.get_running_loop() + loop.create_task(close()) + return + except RuntimeError: + pass + + try: + sync(loop, close, timeout=0.1) + return + except FSTimeoutError: + pass @property def client(self):