diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a02576d..f608bbcd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Added ability to include environment variables within metadata for runs. - Added new feature allowing users to log tensors as multidimensional metrics after defining a grid. - Improves checks on `offline.cache` directory specification in config file. +- Added ability to upload multiple runs as a batch via the low level API. ## [v2.1.2](https://github.com/simvue-io/client/releases/tag/v2.1.2) - 2025-06-25 diff --git a/simvue/api/objects/artifact/base.py b/simvue/api/objects/artifact/base.py index e8e4df92..6261e446 100644 --- a/simvue/api/objects/artifact/base.py +++ b/simvue/api/objects/artifact/base.py @@ -59,6 +59,10 @@ def __init__( # from the initial creation self._init_data: dict[str, dict] = {} + @classmethod + def new(cls, *_, **__) -> Self: + raise NotImplementedError + def commit(self) -> None: """Not applicable, cannot commit single write artifact.""" self._logger.info("Cannot call method 'commit' on write-once type 'Artifact'") diff --git a/simvue/api/objects/artifact/file.py b/simvue/api/objects/artifact/file.py index b2dfd078..664dadde 100644 --- a/simvue/api/objects/artifact/file.py +++ b/simvue/api/objects/artifact/file.py @@ -94,7 +94,7 @@ def new( _artifact._init_data = {} else: - _artifact._init_data = _artifact._post(**_artifact._staging) + _artifact._init_data = _artifact._post_single(**_artifact._staging) _artifact._staging["url"] = _artifact._init_data["url"] _artifact._init_data["runs"] = kwargs.get("runs") or {} diff --git a/simvue/api/objects/artifact/object.py b/simvue/api/objects/artifact/object.py index efe1d9fa..bfbdd46e 100644 --- a/simvue/api/objects/artifact/object.py +++ b/simvue/api/objects/artifact/object.py @@ -114,7 +114,7 @@ def new( file.write(_serialized) else: - _artifact._init_data = _artifact._post(**_artifact._staging) + _artifact._init_data = _artifact._post_single(**_artifact._staging) _artifact._staging["url"] = _artifact._init_data["url"] _artifact._init_data["runs"] = kwargs.get("runs") or {} diff --git a/simvue/api/objects/base.py b/simvue/api/objects/base.py index 15fd0dce..51e9bfad 100644 --- a/simvue/api/objects/base.py +++ b/simvue/api/objects/base.py @@ -18,6 +18,7 @@ import msgpack import pydantic +from collections.abc import Generator from simvue.utilities import staging_merger from simvue.config.user import SimvueConfiguration from simvue.exception import ObjectNotFoundError @@ -172,6 +173,16 @@ def to_params(self) -> dict[str, str]: return {"id": self.column, "desc": self.descending} +class VisibilityBatchArgs(pydantic.BaseModel): + tenant: bool | None = None + user: list[str] | None = None + public: bool | None = None + + +class ObjectBatchArgs(pydantic.BaseModel): + pass + + class SimvueObject(abc.ABC): def __init__( self, @@ -361,13 +372,21 @@ def _get_visibility(self) -> dict[str, bool | list[str]]: return {} @classmethod + @abc.abstractmethod def new(cls, **_) -> Self: pass + @classmethod + def batch_create( + cls, obj_args: ObjectBatchArgs, visibility: VisibilityBatchArgs + ) -> Generator[str]: + _, __ = obj_args, visibility + raise NotImplementedError + @classmethod def ids( cls, count: int | None = None, offset: int | None = None, **kwargs - ) -> typing.Generator[str, None, None]: + ) -> Generator[str, None, None]: """Retrieve a list of all object identifiers. 
Parameters @@ -402,7 +421,7 @@ def get( count: pydantic.PositiveInt | None = None, offset: pydantic.NonNegativeInt | None = None, **kwargs, - ) -> typing.Generator[tuple[str, T | None], None, None]: + ) -> Generator[tuple[str, T | None], None, None]: """Retrieve items of this object type from the server. Parameters @@ -467,7 +486,7 @@ def _get_all_objects( endpoint: str | None = None, expected_type: type = dict, **kwargs, - ) -> typing.Generator[dict, None, None]: + ) -> Generator[dict, None, None]: _class_instance = cls(_read_only=True) # Allow the possibility of paginating a URL that is not the @@ -514,7 +533,7 @@ def read_only(self, is_read_only: bool) -> None: if not self._read_only: self._staging = self._get_local_staged() - def commit(self) -> dict | None: + def commit(self) -> dict | list[dict] | None: """Send updates to the server, or if offline, store locally.""" if self._read_only: raise AttributeError("Cannot commit object in 'read-only' mode") @@ -526,15 +545,22 @@ def commit(self) -> dict | None: self._cache() return - _response: dict | None = None + _response: dict[str, str] | list[dict[str, str]] | None = None # Initial commit is creation of object # if staging is empty then we do not need to use PUT if not self._identifier or self._identifier.startswith("offline_"): - self._logger.debug( - f"Posting from staged data for {self._label} '{self.id}': {self._staging}" - ) - _response = self._post(**self._staging) + # If batch upload send as list, else send as dictionary of params + if _batch_commit := self._staging.get("batch"): + self._logger.debug( + f"Posting batched data to server: {len(_batch_commit)} {self._label}s" + ) + _response = self._post_batch(batch_data=_batch_commit) + else: + self._logger.debug( + f"Posting from staged data for {self._label} '{self.id}': {self._staging}" + ) + _response = self._post_single(**self._staging) elif self._staging: self._logger.debug( f"Pushing updates from staged data for {self._label} '{self.id}': {self._staging}" @@ -570,11 +596,45 @@ def url(self) -> URL | None: """ return None if self._identifier is None else self._base_url / self._identifier - def _post( + def _post_batch( + self, + batch_data: list[ObjectBatchArgs], + ) -> list[dict[str, str]]: + _response = sv_post( + url=f"{self._base_url}", + headers=self._headers | {"Content-Type": "application/msgpack"}, + params=self._params, + data=batch_data, + is_json=True, + ) + + if _response.status_code == http.HTTPStatus.FORBIDDEN: + raise RuntimeError( + f"Forbidden: You do not have permission to create object of type '{self._label}'" + ) + + _json_response = get_json_from_response( + response=_response, + expected_status=[http.HTTPStatus.OK, http.HTTPStatus.CONFLICT], + scenario=f"Creation of multiple {self._label}s", + expected_type=list, + ) + + if not len(batch_data) == (_n_created := len(_json_response)): + raise RuntimeError( + f"Expected {len(batch_data)} to be created, but only {_n_created} found." 
+ ) + + self._logger.debug(f"successfully created {_n_created} {self._label}s") + + return _json_response + + def _post_single( self, *, is_json: bool = True, data: list | dict | None = None, **kwargs - ) -> dict[str, typing.Any]: + ) -> dict[str, typing.Any] | list[dict[str, typing.Any]]: if not is_json: kwargs = msgpack.packb(data or kwargs, use_bin_type=True) + _response = sv_post( url=f"{self._base_url}", headers=self._headers | {"Content-Type": "application/msgpack"}, @@ -594,11 +654,6 @@ def _post( scenario=f"Creation of {self._label}", ) - if isinstance(_json_response, list): - raise RuntimeError( - "Expected dictionary from JSON response but got type list" - ) - if _id := _json_response.get("id"): self._logger.debug("'%s' created successfully", _id) self._identifier = _id diff --git a/simvue/api/objects/events.py b/simvue/api/objects/events.py index f786c99b..2bd50ebd 100644 --- a/simvue/api/objects/events.py +++ b/simvue/api/objects/events.py @@ -78,8 +78,8 @@ def new( **kwargs, ) - def _post(self, **kwargs) -> dict[str, typing.Any]: - return super()._post(is_json=False, **kwargs) + def _post_single(self, **kwargs) -> dict[str, typing.Any]: + return super()._post_single(is_json=False, **kwargs) def _put(self, **kwargs) -> dict[str, typing.Any]: raise NotImplementedError("Method 'put' is not available for type Events") diff --git a/simvue/api/objects/metrics.py b/simvue/api/objects/metrics.py index 11559dea..2226c4fd 100644 --- a/simvue/api/objects/metrics.py +++ b/simvue/api/objects/metrics.py @@ -138,8 +138,8 @@ def names(self, run_ids: list[str]) -> list[str]: expected_type=list, ) - def _post(self, **kwargs) -> dict[str, typing.Any]: - return super()._post(is_json=False, **kwargs) + def _post_single(self, **kwargs) -> dict[str, typing.Any]: + return super()._post_single(is_json=False, **kwargs) def delete(self, **kwargs) -> dict[str, typing.Any]: """Metrics cannot be deleted""" diff --git a/simvue/api/objects/run.py b/simvue/api/objects/run.py index 79d5405a..bda02c93 100644 --- a/simvue/api/objects/run.py +++ b/simvue/api/objects/run.py @@ -7,6 +7,7 @@ """ +from collections.abc import Generator, Iterable import http import typing import pydantic @@ -19,7 +20,15 @@ except ImportError: from typing_extensions import Self -from .base import SimvueObject, Sort, staging_check, Visibility, write_only +from .base import ( + ObjectBatchArgs, + VisibilityBatchArgs, + SimvueObject, + Sort, + staging_check, + Visibility, + write_only, +) from simvue.api.request import ( get as sv_get, put as sv_put, @@ -54,6 +63,18 @@ def check_column(cls, column: str) -> str: return column +class RunBatchArgs(ObjectBatchArgs): + name: str | None = None + description: str | None = None + tags: list[str] | None = None + metadata: dict[str, str | int | float | bool] | None = None + folder: typing.Annotated[str, pydantic.Field(pattern=FOLDER_REGEX)] | None = None + system: dict[str, typing.Any] | None = None + status: typing.Literal[ + "terminated", "created", "failed", "completed", "lost", "running" + ] = "created" + + class Run(SimvueObject): """Class for directly interacting with/creating runs on the server.""" @@ -123,6 +144,50 @@ def new( **kwargs, ) + @classmethod + @pydantic.validate_call + def batch_create( + cls, + entries: Iterable[RunBatchArgs], + *, + visibility: VisibilityBatchArgs | None = None, + folder: typing.Annotated[str, pydantic.StringConstraints(pattern=FOLDER_REGEX)] + | None = None, + metadata: dict[str, str | int | float | bool] | None = None, + ) -> Generator[str]: + """Create a 
batch of Runs as a single request. + + Parameters + ---------- + entries : Iterable[RunBatchArgs] + define the runs to be created. + visibility : VisibilityBatchArgs | None, optional + specify visibility options for these runs, default is None. + folder : str, optional + override folder specification for these runs to be a single folder, default None. + metadata : dict[str, int | str | float | bool], optional + override metadata specification for these runs, default None. + + Yields + ------ + str + identifiers for created runs + """ + _data: list[dict[str, object]] = [ + entry.model_dump(exclude_none=True) + | ( + {"visibility": visibility.model_dump(exclude_none=True)} + if visibility + else {} + ) + | ({"folder": folder} if folder else {}) + | {"metadata": (entry.metadata or {}) | (metadata or {})} + for entry in entries + ] + for entry in Run(batch=_data, _read_only=False).commit() or []: + _id: str = entry["id"] + yield _id + @property @staging_check def name(self) -> str: diff --git a/tests/unit/test_run.py b/tests/unit/test_run.py index 560620ff..366dc7c9 100644 --- a/tests/unit/test_run.py +++ b/tests/unit/test_run.py @@ -4,6 +4,7 @@ import time import datetime import uuid +from simvue.api.objects.run import RunBatchArgs from simvue.sender import sender from simvue.api.objects import Run, Folder from simvue.client import Client @@ -187,3 +188,24 @@ def test_run_get_properties() -> None: if _failed: raise AssertionError("\n" + "\n\t- ".join(": ".join(i) for i in _failed)) + + +@pytest.mark.api +@pytest.mark.online +def test_batch_run_creation() -> None: + _uuid: str = f"{uuid.uuid4()}".split("-")[0] + _folder: Folder = Folder.new(path=f"/simvue_unit_testing/{_uuid}") + _folder.commit() + _runs = [ + RunBatchArgs(name=f"batched_run_{i}") + for i in range(10) + ] + _counter: int = 0 + for i, _id in enumerate(Run.batch_create(entries=_runs, folder=f"/simvue_unit_testing/{_uuid}", metadata={"batch_id": 0})): + _run = Run(identifier=_id) + assert _run.name == f"batched_run_{i}" + assert _run.metadata["batch_id"] == 0 + _counter +=1 + assert _counter == 10 + with contextlib.suppress(Exception): + _folder.delete(recursive=True, delete_runs=True)
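
Usage sketch (not part of the patch): the snippet below shows how the batch upload path added here might be called from the low level API. The class and argument names are taken from the diff itself; the folder path, run names, tags and metadata values are illustrative assumptions only.

from simvue.api.objects import Run
from simvue.api.objects.run import RunBatchArgs
from simvue.api.objects.base import VisibilityBatchArgs

# Describe each run to be created; per-entry metadata is merged with the
# 'metadata' override passed to batch_create.
entries = [
    RunBatchArgs(name=f"sweep_member_{i}", tags=["sweep"], metadata={"index": i})
    for i in range(5)
]

# batch_create stages the entries under a single 'batch' key, so commit()
# issues one POST via _post_batch and the server returns one record per run,
# whose identifiers are yielded back in turn.
run_ids = list(
    Run.batch_create(
        entries=entries,
        folder="/my_project/sweeps",        # single folder applied to every run (illustrative path)
        metadata={"campaign": "demo"},      # merged into each entry's own metadata
        visibility=VisibilityBatchArgs(tenant=True),
    )
)

Design note: Run.batch_create reuses the existing commit() machinery rather than posting directly; staging the payload under "batch" routes commit() to _post_batch, while single-object creation continues through _post_single, consistent with the rename of _post in this change.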