Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Added ability to include environment variables within metadata for runs.
- Added new feature allowing users to log tensors as multidimensional metrics after defining a grid.
- Improved checks on `offline.cache` directory specification in config file.
- Added ability to upload multiple runs as a batch via the low level API.

## [v2.1.2](https://github.com/simvue-io/client/releases/tag/v2.1.2) - 2025-06-25

Expand Down
4 changes: 4 additions & 0 deletions simvue/api/objects/artifact/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def __init__(
# from the initial creation
self._init_data: dict[str, dict] = {}

@classmethod
def new(cls, *_, **__) -> Self:
    """Not supported on the artifact base class.

    Concrete subclasses provide their own ``new`` implementations; the
    base class deliberately rejects direct creation.

    Raises
    ------
    NotImplementedError
        always.
    """
    raise NotImplementedError

def commit(self) -> None:
"""Not applicable, cannot commit single write artifact."""
self._logger.info("Cannot call method 'commit' on write-once type 'Artifact'")
Expand Down
2 changes: 1 addition & 1 deletion simvue/api/objects/artifact/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def new(
_artifact._init_data = {}

else:
_artifact._init_data = _artifact._post(**_artifact._staging)
_artifact._init_data = _artifact._post_single(**_artifact._staging)
_artifact._staging["url"] = _artifact._init_data["url"]

_artifact._init_data["runs"] = kwargs.get("runs") or {}
Expand Down
2 changes: 1 addition & 1 deletion simvue/api/objects/artifact/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def new(
file.write(_serialized)

else:
_artifact._init_data = _artifact._post(**_artifact._staging)
_artifact._init_data = _artifact._post_single(**_artifact._staging)
_artifact._staging["url"] = _artifact._init_data["url"]

_artifact._init_data["runs"] = kwargs.get("runs") or {}
Expand Down
87 changes: 71 additions & 16 deletions simvue/api/objects/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import msgpack
import pydantic

from collections.abc import Generator
from simvue.utilities import staging_merger
from simvue.config.user import SimvueConfiguration
from simvue.exception import ObjectNotFoundError
Expand Down Expand Up @@ -172,6 +173,16 @@ def to_params(self) -> dict[str, str]:
return {"id": self.column, "desc": self.descending}


class VisibilityBatchArgs(pydantic.BaseModel):
    """Visibility options applied to every object in a batch-creation request.

    All fields default to ``None`` meaning "leave unset".
    """

    # Share with all members of the creating user's tenant
    tenant: bool | None = None
    # Explicit list of usernames granted access
    user: list[str] | None = None
    # Make visible to everyone
    public: bool | None = None


class ObjectBatchArgs(pydantic.BaseModel):
    """Base model for per-object batch-creation arguments.

    Carries no fields itself; subclasses (e.g. ``RunBatchArgs``) declare the
    attributes accepted for each object in a batch request.
    """

    pass


class SimvueObject(abc.ABC):
def __init__(
self,
Expand Down Expand Up @@ -361,13 +372,21 @@ def _get_visibility(self) -> dict[str, bool | list[str]]:
return {}

@classmethod
@abc.abstractmethod
def new(cls, **_) -> Self:
pass

@classmethod
@classmethod
def batch_create(
    cls, obj_args: ObjectBatchArgs, visibility: VisibilityBatchArgs
) -> Generator[str]:
    """Create multiple objects of this type in a single request.

    Not supported on the base class; subclasses that allow batch upload
    (e.g. ``Run``) override this method.

    Parameters
    ----------
    obj_args : ObjectBatchArgs
        per-object creation arguments.
    visibility : VisibilityBatchArgs
        visibility options to apply to the created objects.

    Raises
    ------
    NotImplementedError
        always, on the base class.
    """
    # Touch the arguments so linters do not flag them as unused
    _, __ = obj_args, visibility
    raise NotImplementedError

@classmethod
def ids(
cls, count: int | None = None, offset: int | None = None, **kwargs
) -> typing.Generator[str, None, None]:
) -> Generator[str, None, None]:
"""Retrieve a list of all object identifiers.

Parameters
Expand Down Expand Up @@ -402,7 +421,7 @@ def get(
count: pydantic.PositiveInt | None = None,
offset: pydantic.NonNegativeInt | None = None,
**kwargs,
) -> typing.Generator[tuple[str, T | None], None, None]:
) -> Generator[tuple[str, T | None], None, None]:
"""Retrieve items of this object type from the server.

Parameters
Expand Down Expand Up @@ -467,7 +486,7 @@ def _get_all_objects(
endpoint: str | None = None,
expected_type: type = dict,
**kwargs,
) -> typing.Generator[dict, None, None]:
) -> Generator[dict, None, None]:
_class_instance = cls(_read_only=True)

# Allow the possibility of paginating a URL that is not the
Expand Down Expand Up @@ -514,7 +533,7 @@ def read_only(self, is_read_only: bool) -> None:
if not self._read_only:
self._staging = self._get_local_staged()

def commit(self) -> dict | None:
def commit(self) -> dict | list[dict] | None:
"""Send updates to the server, or if offline, store locally."""
if self._read_only:
raise AttributeError("Cannot commit object in 'read-only' mode")
Expand All @@ -526,15 +545,22 @@ def commit(self) -> dict | None:
self._cache()
return

_response: dict | None = None
_response: dict[str, str] | list[dict[str, str]] | None = None

# Initial commit is creation of object
# if staging is empty then we do not need to use PUT
if not self._identifier or self._identifier.startswith("offline_"):
self._logger.debug(
f"Posting from staged data for {self._label} '{self.id}': {self._staging}"
)
_response = self._post(**self._staging)
# If batch upload send as list, else send as dictionary of params
if _batch_commit := self._staging.get("batch"):
self._logger.debug(
f"Posting batched data to server: {len(_batch_commit)} {self._label}s"
)
_response = self._post_batch(batch_data=_batch_commit)
else:
self._logger.debug(
f"Posting from staged data for {self._label} '{self.id}': {self._staging}"
)
_response = self._post_single(**self._staging)
elif self._staging:
self._logger.debug(
f"Pushing updates from staged data for {self._label} '{self.id}': {self._staging}"
Expand Down Expand Up @@ -570,11 +596,45 @@ def url(self) -> URL | None:
"""
return None if self._identifier is None else self._base_url / self._identifier

def _post(
def _post_batch(
    self,
    batch_data: list[ObjectBatchArgs],
) -> list[dict[str, str]]:
    """Create multiple objects of this type with a single POST request.

    Parameters
    ----------
    batch_data : list[ObjectBatchArgs]
        one entry of creation arguments per object to create.

    Returns
    -------
    list[dict[str, str]]
        the server's JSON response, one record per created object.

    Raises
    ------
    RuntimeError
        if the server returns FORBIDDEN, or if the number of records in
        the response does not match the number of entries submitted.
    """
    # NOTE(review): the header advertises msgpack but the payload is sent
    # as JSON (is_json=True) — this mirrors _post_single; confirm the
    # server accepts this combination.
    _response = sv_post(
        url=f"{self._base_url}",
        headers=self._headers | {"Content-Type": "application/msgpack"},
        params=self._params,
        data=batch_data,
        is_json=True,
    )

    if _response.status_code == http.HTTPStatus.FORBIDDEN:
        raise RuntimeError(
            f"Forbidden: You do not have permission to create object of type '{self._label}'"
        )

    # CONFLICT is accepted alongside OK when decoding the response
    _json_response = get_json_from_response(
        response=_response,
        expected_status=[http.HTTPStatus.OK, http.HTTPStatus.CONFLICT],
        scenario=f"Creation of multiple {self._label}s",
        expected_type=list,
    )

    # The server must report exactly one created object per submitted entry
    if not len(batch_data) == (_n_created := len(_json_response)):
        raise RuntimeError(
            f"Expected {len(batch_data)} to be created, but only {_n_created} found."
        )

    self._logger.debug(f"successfully created {_n_created} {self._label}s")

    return _json_response

def _post_single(
self, *, is_json: bool = True, data: list | dict | None = None, **kwargs
) -> dict[str, typing.Any]:
) -> dict[str, typing.Any] | list[dict[str, typing.Any]]:
if not is_json:
kwargs = msgpack.packb(data or kwargs, use_bin_type=True)

_response = sv_post(
url=f"{self._base_url}",
headers=self._headers | {"Content-Type": "application/msgpack"},
Expand All @@ -594,11 +654,6 @@ def _post(
scenario=f"Creation of {self._label}",
)

if isinstance(_json_response, list):
raise RuntimeError(
"Expected dictionary from JSON response but got type list"
)

if _id := _json_response.get("id"):
self._logger.debug("'%s' created successfully", _id)
self._identifier = _id
Expand Down
4 changes: 2 additions & 2 deletions simvue/api/objects/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ def new(
**kwargs,
)

def _post(self, **kwargs) -> dict[str, typing.Any]:
return super()._post(is_json=False, **kwargs)
def _post_single(self, **kwargs) -> dict[str, typing.Any]:
return super()._post_single(is_json=False, **kwargs)

def _put(self, **kwargs) -> dict[str, typing.Any]:
raise NotImplementedError("Method 'put' is not available for type Events")
Expand Down
4 changes: 2 additions & 2 deletions simvue/api/objects/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ def names(self, run_ids: list[str]) -> list[str]:
expected_type=list,
)

def _post(self, **kwargs) -> dict[str, typing.Any]:
return super()._post(is_json=False, **kwargs)
def _post_single(self, **kwargs) -> dict[str, typing.Any]:
return super()._post_single(is_json=False, **kwargs)

def delete(self, **kwargs) -> dict[str, typing.Any]:
"""Metrics cannot be deleted"""
Expand Down
67 changes: 66 additions & 1 deletion simvue/api/objects/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

"""

from collections.abc import Generator, Iterable
import http
import typing
import pydantic
Expand All @@ -19,7 +20,15 @@
except ImportError:
from typing_extensions import Self

from .base import SimvueObject, Sort, staging_check, Visibility, write_only
from .base import (
ObjectBatchArgs,
VisibilityBatchArgs,
SimvueObject,
Sort,
staging_check,
Visibility,
write_only,
)
from simvue.api.request import (
get as sv_get,
put as sv_put,
Expand Down Expand Up @@ -54,6 +63,18 @@ def check_column(cls, column: str) -> str:
return column


class RunBatchArgs(ObjectBatchArgs):
    """Arguments describing a single run within a batch-creation request.

    All optional fields default to ``None`` meaning "not specified".
    """

    # Name of the run
    name: str | None = None
    # Free-text description
    description: str | None = None
    # Tags to attach to the run
    tags: list[str] | None = None
    # Key-value metadata; values limited to JSON scalar types
    metadata: dict[str, str | int | float | bool] | None = None
    # Folder path for the run; must match the server folder pattern
    folder: typing.Annotated[str, pydantic.Field(pattern=FOLDER_REGEX)] | None = None
    # System information payload
    system: dict[str, typing.Any] | None = None
    # Initial run state; new runs default to "created"
    status: typing.Literal[
        "terminated", "created", "failed", "completed", "lost", "running"
    ] = "created"


class Run(SimvueObject):
"""Class for directly interacting with/creating runs on the server."""

Expand Down Expand Up @@ -123,6 +144,50 @@ def new(
**kwargs,
)

@classmethod
@pydantic.validate_call
def batch_create(
    cls,
    entries: Iterable[RunBatchArgs],
    *,
    visibility: VisibilityBatchArgs | None = None,
    folder: typing.Annotated[str, pydantic.StringConstraints(pattern=FOLDER_REGEX)]
    | None = None,
    metadata: dict[str, str | int | float | bool] | None = None,
) -> Generator[str]:
    """Create a batch of Runs as a single request.

    Parameters
    ----------
    entries : Iterable[RunBatchArgs]
        define the runs to be created.
    visibility : VisibilityBatchArgs | None, optional
        specify visibility options for these runs, default is None.
    folder : str, optional
        override folder specification for these runs to be a single folder, default None.
    metadata : dict[str, int | str | float | bool], optional
        override metadata specification for these runs, default None.

    Yields
    ------
    str
        identifiers for created runs
    """
    # Batch-wide overrides applied on top of each entry's own fields
    _shared_visibility: dict[str, object] = (
        {"visibility": visibility.model_dump(exclude_none=True)}
        if visibility
        else {}
    )
    _shared_folder: dict[str, object] = {"folder": folder} if folder else {}

    _payload: list[dict[str, object]] = []
    for _entry in entries:
        _record: dict[str, object] = _entry.model_dump(exclude_none=True)
        _record.update(_shared_visibility)
        _record.update(_shared_folder)
        # Batch-level metadata takes precedence over per-entry values
        _record["metadata"] = (_entry.metadata or {}) | (metadata or {})
        _payload.append(_record)

    # Commit the staged batch in one request and yield server identifiers
    for _created in Run(batch=_payload, _read_only=False).commit() or []:
        _identifier: str = _created["id"]
        yield _identifier

@property
@staging_check
def name(self) -> str:
Expand Down
22 changes: 22 additions & 0 deletions tests/unit/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import datetime
import uuid
from simvue.api.objects.run import RunBatchArgs
from simvue.sender import sender
from simvue.api.objects import Run, Folder
from simvue.client import Client
Expand Down Expand Up @@ -187,3 +188,24 @@ def test_run_get_properties() -> None:

if _failed:
raise AssertionError("\n" + "\n\t- ".join(": ".join(i) for i in _failed))


@pytest.mark.api
@pytest.mark.online
def test_batch_run_creation() -> None:
    """Verify Run.batch_create uploads multiple runs in one request.

    Creates a dedicated folder, batch-creates ten runs into it with shared
    metadata, checks each created run's name and metadata, then removes the
    folder (and its runs) even if an assertion fails mid-loop.
    """
    _uuid: str = f"{uuid.uuid4()}".split("-")[0]
    _folder: Folder = Folder.new(path=f"/simvue_unit_testing/{_uuid}")
    _folder.commit()
    _runs = [RunBatchArgs(name=f"batched_run_{i}") for i in range(10)]
    _counter: int = 0
    try:
        for i, _id in enumerate(
            Run.batch_create(
                entries=_runs,
                folder=f"/simvue_unit_testing/{_uuid}",
                metadata={"batch_id": 0},
            )
        ):
            _run = Run(identifier=_id)
            assert _run.name == f"batched_run_{i}"
            assert _run.metadata["batch_id"] == 0
            _counter += 1
        assert _counter == 10
    finally:
        # Clean up server state regardless of test outcome; deletion is
        # best-effort so cleanup errors do not mask test failures
        with contextlib.suppress(Exception):
            _folder.delete(recursive=True, delete_runs=True)