From 675cd43073d9c5d3de53aed3e7584289f85dd20f Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Mon, 4 Aug 2025 14:43:24 -0700
Subject: [PATCH 1/4] refactor: use UploadOptions

---
 mapillary_tools/upload.py   | 10 +++--
 mapillary_tools/uploader.py | 89 +++++++++++++++----------------------
 2 files changed, 41 insertions(+), 58 deletions(-)

diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py
index 9303b813..05676227 100644
--- a/mapillary_tools/upload.py
+++ b/mapillary_tools/upload.py
@@ -82,11 +82,13 @@ def upload(
     _setup_ipc(emitter)
 
     mly_uploader = uploader.Uploader(
-        user_items,
+        uploader.UploadOptions(
+            user_items,
+            dry_run=dry_run,
+            nofinish=nofinish,
+            noresume=noresume,
+        ),
         emitter=emitter,
-        dry_run=dry_run,
-        nofinish=nofinish,
-        noresume=noresume,
     )
 
     results = _gen_upload_everything(
diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index e7186086..dc5a8bc8 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -50,6 +50,15 @@
 LOG = logging.getLogger(__name__)
 
 
+@dataclasses.dataclass(frozen=True)
+class UploadOptions:
+    user_items: config.UserItem
+    chunk_size: int = int(constants.UPLOAD_CHUNK_SIZE_MB * 1024 * 1024)
+    dry_run: bool = False
+    nofinish: bool = False
+    noresume: bool = False
+
+
 class UploaderProgress(T.TypedDict, total=True):
     """
     Progress data that Uploader cares about.
@@ -516,26 +525,14 @@ def _upload_sequence(
         cls,
         uploader: Uploader,
         sequence: T.Sequence[types.ImageMetadata],
-        progress: dict[str, T.Any] | None = None,
+        progress: dict[str, T.Any],
     ) -> str:
-        if progress is None:
-            progress = {}
-
-        # FIXME: This is a hack to disable the event emitter inside the uploader
-        uploader_without_emitter = uploader.copy_uploader_without_emitter()
-
         _validate_metadatas(sequence)
 
         progress["entity_size"] = sum(m.filesize or 0 for m in sequence)
-
-        # TODO: assert sequence is sorted
-
-        single_image_uploader = SingleImageUploader(
-            uploader, uploader_without_emitter, progress=progress
-        )
-
         uploader.emitter.emit("upload_start", progress)
 
+        single_image_uploader = SingleImageUploader(uploader, progress=progress)
         with concurrent.futures.ThreadPoolExecutor(
             max_workers=constants.MAX_IMAGE_UPLOAD_WORKERS
         ) as executor:
@@ -543,9 +540,7 @@
             executor.map(single_image_uploader.upload, sequence)
         )
 
-        manifest_file_handle = cls._upload_manifest(
-            uploader_without_emitter, image_file_handles
-        )
+        manifest_file_handle = cls._upload_manifest(uploader, image_file_handles)
 
         uploader.emitter.emit("upload_end", progress)
 
@@ -559,8 +554,10 @@ def _upload_manifest(
-        cls, uploader_without_emitter: Uploader, image_file_handles: T.Sequence[str]
+        cls, uploader: Uploader, image_file_handles: T.Sequence[str]
     ) -> str:
+        uploader_without_emitter = Uploader(uploader.upload_options)
+
         manifest = {
             "version": "1",
             "upload_type": "images",
@@ -583,14 +580,14 @@ class SingleImageUploader:
     def __init__(
         self,
         uploader: Uploader,
-        uploader_without_emitter: Uploader,
         progress: dict[str, T.Any] | None = None,
    ):
         self.uploader = uploader
-        self.uploader_without_emitter = uploader_without_emitter
         self.progress = progress or {}
         self.lock = threading.Lock()
-        self.cache = self._maybe_create_persistent_cache_instance(uploader.user_items)
+        self.cache = self._maybe_create_persistent_cache_instance(
+            uploader.upload_options.user_items
+        )
 
     def upload(self, image_metadata: types.ImageMetadata) -> str:
         mutable_progress = {
@@ -600,14 +597,16 @@ def upload(self, image_metadata: types.ImageMetadata) -> str:
         image_bytes = self.dump_image_bytes(image_metadata)
 
-        session_key = self.uploader_without_emitter._gen_session_key(
+        uploader_without_emitter = Uploader(self.uploader.upload_options)
+
+        session_key = uploader_without_emitter._gen_session_key(
             io.BytesIO(image_bytes), mutable_progress
         )
 
         file_handle = self._file_handle_cache_get(session_key)
 
         if file_handle is None:
-            file_handle = self.uploader_without_emitter.upload_stream(
+            file_handle = uploader_without_emitter.upload_stream(
                 io.BytesIO(image_bytes),
                 session_key=session_key,
                 progress=mutable_progress,
@@ -690,24 +689,14 @@ def _file_handle_cache_set(self, key: str, value: str) -> None:
 
 class Uploader:
     def __init__(
-        self,
-        user_items: config.UserItem,
-        emitter: EventEmitter | None = None,
-        chunk_size: int = int(constants.UPLOAD_CHUNK_SIZE_MB * 1024 * 1024),
-        dry_run: bool = False,
-        nofinish: bool = False,
-        noresume: bool = False,
+        self, upload_options: UploadOptions, emitter: EventEmitter | None = None
     ):
-        self.user_items = user_items
+        self.upload_options = upload_options
         if emitter is None:
             # An empty event emitter that does nothing
             self.emitter = EventEmitter()
         else:
             self.emitter = emitter
-        self.chunk_size = chunk_size
-        self.dry_run = dry_run
-        self.nofinish = nofinish
-        self.noresume = noresume
 
     def upload_stream(
         self,
@@ -725,7 +714,7 @@ def upload_stream(
         entity_size = fp.tell()
 
         progress["entity_size"] = entity_size
-        progress["chunk_size"] = self.chunk_size
+        progress["chunk_size"] = self.upload_options.chunk_size
         progress["retries"] = 0
         progress["begin_offset"] = None
 
@@ -762,14 +751,16 @@ def finish_upload(
         if progress is None:
             progress = {}
 
-        if self.dry_run or self.nofinish:
+        if self.upload_options.dry_run or self.upload_options.nofinish:
             cluster_id = "0"
         else:
             resp = api_v4.finish_upload(
-                self.user_items["user_upload_token"],
+                self.upload_options.user_items["user_upload_token"],
                 file_handle,
                 cluster_filetype,
-                organization_id=self.user_items.get("MAPOrganizationKey"),
+                organization_id=self.upload_options.user_items.get(
+                    "MAPOrganizationKey"
+                ),
             )
 
             body = api_v4.jsonify_response(resp)
@@ -781,23 +772,13 @@
 
         return cluster_id
 
-    def copy_uploader_without_emitter(self) -> Uploader:
-        return Uploader(
-            self.user_items,
-            emitter=None,
-            chunk_size=self.chunk_size,
-            dry_run=self.dry_run,
-            nofinish=self.nofinish,
-            noresume=self.noresume,
-        )
-
     def _create_upload_service(self, session_key: str) -> upload_api_v4.UploadService:
         upload_service: upload_api_v4.UploadService
 
-        if self.dry_run:
+        if self.upload_options.dry_run:
             upload_path = os.getenv("MAPILLARY_UPLOAD_ENDPOINT")
             upload_service = upload_api_v4.FakeUploadService(
-                user_access_token=self.user_items["user_upload_token"],
+                user_access_token=self.upload_options.user_items["user_upload_token"],
                 session_key=session_key,
                 upload_path=Path(upload_path) if upload_path is not None else None,
             )
@@ -807,7 +788,7 @@ def _create_upload_service(self, session_key: str) -> upload_api_v4.UploadServic
             )
         else:
             upload_service = upload_api_v4.UploadService(
-                user_access_token=self.user_items["user_upload_token"],
+                user_access_token=self.upload_options.user_items["user_upload_token"],
                 session_key=session_key,
             )
 
@@ -846,7 +827,7 @@ def _chunk_with_progress_emitted(
         progress: UploaderProgress,
     ) -> T.Generator[bytes, None, None]:
         for chunk in upload_api_v4.UploadService.chunkize_byte_stream(
-            stream, self.chunk_size
+            stream, self.upload_options.chunk_size
        ):
             yield chunk
 
@@ -879,7 +860,7 @@ def _upload_stream_retryable(
         return upload_service.upload_shifted_chunks(shifted_chunks, begin_offset)
 
     def _gen_session_key(self, fp: T.IO[bytes], progress: dict[str, T.Any]) -> str:
-        if self.noresume:
+        if self.upload_options.noresume:
             # Generate a unique UUID for session_key when noresume is True
             # to prevent resuming from previous uploads
             session_key = f"{_prefixed_uuid4()}"

From e4381aa24a41145bed7c38955168a74d3326d33b Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Mon, 4 Aug 2025 14:45:20 -0700
Subject: [PATCH 2/4] estimate read timeouts

---
 mapillary_tools/api_v4.py        | 12 +++++++++++-
 mapillary_tools/constants.py     |  5 +++++
 mapillary_tools/upload_api_v4.py | 24 ++++++++++++++++------
 mapillary_tools/uploader.py      | 12 +++++++++++-
 4 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/mapillary_tools/api_v4.py b/mapillary_tools/api_v4.py
index ae38f280..40363daf 100644
--- a/mapillary_tools/api_v4.py
+++ b/mapillary_tools/api_v4.py
@@ -121,6 +121,7 @@ def _log_debug_request(
     json: dict | None = None,
     params: dict | None = None,
     headers: dict | None = None,
+    timeout: T.Any = None,
 ):
     if logging.getLogger().getEffectiveLevel() <= logging.DEBUG:
         return
@@ -140,6 +141,9 @@
     if headers:
         msg += f" HEADERS={_sanitize(headers)}"
 
+    if timeout is not None:
+        msg += f" TIMEOUT={timeout}"
+
     msg = msg.replace("\n", "\\n")
     LOG.debug(msg)
 
@@ -202,6 +206,7 @@ def request_post(
             json=json,
             params=kwargs.get("params"),
             headers=kwargs.get("headers"),
+            timeout=kwargs.get("timeout"),
         )
 
     if USE_SYSTEM_CERTS:
@@ -235,7 +240,12 @@ def request_get(
 
     if not disable_debug:
         _log_debug_request(
-            "GET", url, params=kwargs.get("params"), headers=kwargs.get("headers")
+            "GET",
+            url,
+            params=kwargs.get("params"),
+            headers=kwargs.get("headers"),
+            # Do not log timeout here as it's always set to REQUESTS_TIMEOUT
+            timeout=None,
         )
 
     if USE_SYSTEM_CERTS:
diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py
index 09396a8e..cc842b51 100644
--- a/mapillary_tools/constants.py
+++ b/mapillary_tools/constants.py
@@ -151,6 +151,11 @@ def _parse_scaled_integers(
     _ENV_PREFIX + "UPLOAD_CACHE_DIR",
     os.path.join(tempfile.gettempdir(), "mapillary_tools", "upload_cache"),
 )
+# The minimum upload speed is used to calculate the read timeout, to prevent uploads from hanging:
+#   timeout = upload_size / MIN_UPLOAD_SPEED
+MIN_UPLOAD_SPEED: int | None = _parse_filesize(
+    os.getenv(_ENV_PREFIX + "MIN_UPLOAD_SPEED", "50K")  # 50 KiB/s
+)
 MAX_IMAGE_UPLOAD_WORKERS: int = int(
     os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 64)
 )
diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py
index b8c2e27b..7e6819db 100644
--- a/mapillary_tools/upload_api_v4.py
+++ b/mapillary_tools/upload_api_v4.py
@@ -125,23 +125,34 @@ def upload_byte_stream(
         stream: T.IO[bytes],
         offset: int | None = None,
         chunk_size: int = 2 * 1024 * 1024,  # 2MB
+        read_timeout: float | None = None,
     ) -> str:
         if offset is None:
             offset = self.fetch_offset()
-        return self.upload_chunks(self.chunkize_byte_stream(stream, chunk_size), offset)
+        return self.upload_chunks(
+            self.chunkize_byte_stream(stream, chunk_size),
+            offset,
+            read_timeout=read_timeout,
+        )
 
     def upload_chunks(
         self,
         chunks: T.Iterable[bytes],
         offset: int | None = None,
+        read_timeout: float | None = None,
     ) -> str:
         if offset is None:
             offset = self.fetch_offset()
         shifted_chunks = self.shift_chunks(chunks, offset)
-        return self.upload_shifted_chunks(shifted_chunks, offset)
+        return self.upload_shifted_chunks(
+            shifted_chunks, offset, read_timeout=read_timeout
+        )
 
     def upload_shifted_chunks(
-        self, shifted_chunks: T.Iterable[bytes], offset: int
+        self,
+        shifted_chunks: T.Iterable[bytes],
+        offset: int,
+        read_timeout: float | None = None,
     ) -> str:
         """
         Upload the chunks that must already be shifted by the offset (e.g. fp.seek(offset, io.SEEK_SET))
@@ -153,8 +164,6 @@
             "X-Entity-Name": self.session_key,
         }
         url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
-        # TODO: Estimate read timeout based on the data size
-        read_timeout = None
         resp = request_post(
             url,
             headers=headers,
@@ -198,7 +207,10 @@ def __init__(
 
     @override
     def upload_shifted_chunks(
-        self, shifted_chunks: T.Iterable[bytes], offset: int
+        self,
+        shifted_chunks: T.Iterable[bytes],
+        offset: int,
+        read_timeout: float | None = None,
     ) -> str:
         expected_offset = self.fetch_offset()
         if offset != expected_offset:
diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index dc5a8bc8..405a7eae 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -851,13 +851,23 @@ def _upload_stream_retryable(
         progress["begin_offset"] = begin_offset
         progress["offset"] = begin_offset
 
+        if not constants.MIN_UPLOAD_SPEED:
+            read_timeout = None
+        else:
+            remaining_bytes = abs(progress["entity_size"] - begin_offset)
+            read_timeout = max(
+                api_v4.REQUESTS_TIMEOUT, remaining_bytes / constants.MIN_UPLOAD_SPEED
+            )
+
         self.emitter.emit("upload_fetch_offset", progress)
 
         fp.seek(begin_offset, io.SEEK_SET)
 
         shifted_chunks = self._chunk_with_progress_emitted(fp, progress)
 
-        return upload_service.upload_shifted_chunks(shifted_chunks, begin_offset)
+        return upload_service.upload_shifted_chunks(
+            shifted_chunks, begin_offset, read_timeout=read_timeout
+        )
 
     def _gen_session_key(self, fp: T.IO[bytes], progress: dict[str, T.Any]) -> str:
         if self.upload_options.noresume:

From afcd7be31e717451edeb56c9b75ee1818999f42e Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Mon, 4 Aug 2025 15:32:47 -0700
Subject: [PATCH 3/4] fix tests

---
 tests/unit/test_uploader.py | 46 ++++++++++++++++++++++---------------
 1 file changed, 27 insertions(+), 19 deletions(-)

diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py
index 8f10abde..c4a9bbcb 100644
--- a/tests/unit/test_uploader.py
+++ b/tests/unit/test_uploader.py
@@ -26,7 +26,9 @@ def setup_unittest_data(tmpdir: py.path.local):
 
 def test_upload_images(setup_unittest_data: py.path.local, setup_upload: py.path.local):
     mly_uploader = uploader.Uploader(
-        {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"}, dry_run=True
+        uploader.UploadOptions(
+            {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"}, dry_run=True
+        )
     )
     test_exif = setup_unittest_data.join("test_exif.jpg")
     descs: T.List[description.DescriptionOrError] = [
@@ -98,12 +100,14 @@
         },
     ]
     mly_uploader = uploader.Uploader(
-        {
-            "user_upload_token": "YOUR_USER_ACCESS_TOKEN",
-            # will call the API for real
-            # "MAPOrganizationKey": "3011753992432185",
-        },
-        dry_run=True,
+        uploader.UploadOptions(
+            {
+                "user_upload_token": "YOUR_USER_ACCESS_TOKEN",
+                # will call the API for real
+                # "MAPOrganizationKey": "3011753992432185",
+            },
+            dry_run=True,
+        ),
     )
     results = list(
         uploader.ZipUploader.zip_images_and_upload(
@@ -167,12 +171,14 @@ def test_upload_zip(
     assert len(zip_dir.listdir()) == 2, list(zip_dir.listdir())
 
     mly_uploader = uploader.Uploader(
-        {
-            "user_upload_token": "YOUR_USER_ACCESS_TOKEN",
-            # will call the API for real
-            # "MAPOrganizationKey": 3011753992432185,
-        },
-        dry_run=True,
+        uploader.UploadOptions(
+            {
+                "user_upload_token": "YOUR_USER_ACCESS_TOKEN",
+                # will call the API for real
+                # "MAPOrganizationKey": 3011753992432185,
+            },
+            dry_run=True,
+        ),
         emitter=emitter,
     )
     for zip_path in zip_dir.listdir():
@@ -184,12 +190,14 @@
 
 def test_upload_blackvue(tmpdir: py.path.local, setup_upload: py.path.local):
     mly_uploader = uploader.Uploader(
-        {
-            "user_upload_token": "YOUR_USER_ACCESS_TOKEN",
-            # will call the API for real
-            # "MAPOrganizationKey": "3011753992432185",
-        },
-        dry_run=True,
+        uploader.UploadOptions(
+            {
+                "user_upload_token": "YOUR_USER_ACCESS_TOKEN",
+                # will call the API for real
+                # "MAPOrganizationKey": "3011753992432185",
+            },
+            dry_run=True,
+        )
     )
     blackvue_path = tmpdir.join("blackvue.mp4")
     with open(blackvue_path, "wb") as fp:

From bc0a7790dc0a7e030ed727061d47a466bbd9f2e0 Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Mon, 4 Aug 2025 15:33:02 -0700
Subject: [PATCH 4/4] log as json

---
 mapillary_tools/upload.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py
index 05676227..5636dec0 100644
--- a/mapillary_tools/upload.py
+++ b/mapillary_tools/upload.py
@@ -326,13 +326,13 @@ def _setup_ipc(emitter: uploader.EventEmitter):
     @emitter.on("upload_start")
     def upload_start(payload: uploader.Progress):
         type: uploader.EventName = "upload_start"
-        LOG.debug("IPC %s: %s", type.upper(), payload)
+        LOG.debug(f"{type.upper()}: {json.dumps(payload)}")
         ipc.send(type, payload)
 
     @emitter.on("upload_fetch_offset")
     def upload_fetch_offset(payload: uploader.Progress) -> None:
         type: uploader.EventName = "upload_fetch_offset"
-        LOG.debug("IPC %s: %s", type.upper(), payload)
+        LOG.debug(f"{type.upper()}: {json.dumps(payload)}")
         ipc.send(type, payload)
 
     @emitter.on("upload_progress")
@@ -351,7 +351,7 @@ def upload_progress(payload: uploader.Progress):
             last_upload_progress_debug_at is None
             or last_upload_progress_debug_at + INTERVAL_SECONDS < now
         ):
-            LOG.debug("IPC %s: %s", type.upper(), payload)
+            LOG.debug(f"{type.upper()}: {json.dumps(payload)}")
             T.cast(T.Dict, payload)["_last_upload_progress_debug_at"] = now
 
         ipc.send(type, payload)
@@ -359,7 +359,13 @@
     @emitter.on("upload_end")
     def upload_end(payload: uploader.Progress) -> None:
         type: uploader.EventName = "upload_end"
-        LOG.debug("IPC %s: %s", type.upper(), payload)
+        LOG.debug(f"{type.upper()}: {json.dumps(payload)}")
         ipc.send(type, payload)
+
+    @emitter.on("upload_failed")
+    def upload_failed(payload: uploader.Progress) -> None:
+        type: uploader.EventName = "upload_failed"
+        LOG.debug(f"{type.upper()}: {json.dumps(payload)}")
+        ipc.send(type, payload)