Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@
from app.services.tiles.base import load_grids
from app.config.logger import setup_logging
from app.config.settings import settings
from app.routers import jobs_status, unit_jobs, health, tiles, upscale_tasks, sync_jobs
from app.routers import (
jobs_status,
unit_jobs,
health,
tiles,
upscale_tasks,
sync_jobs,
parameters,
)

setup_logging()

Expand Down Expand Up @@ -38,3 +46,4 @@
app.include_router(sync_jobs.router)
app.include_router(upscale_tasks.router)
app.include_router(health.router)
app.include_router(parameters.router)
15 changes: 15 additions & 0 deletions app/platforms/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from abc import ABC, abstractmethod
from typing import List

from fastapi import Response

from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum
from app.schemas.parameters import Parameter
from app.schemas.unit_job import ServiceDetails

from stac_pydantic import Collection
Expand Down Expand Up @@ -83,3 +85,16 @@ async def get_job_results(
:return: STAC collection representing the results.
"""
pass

@abstractmethod
async def get_service_parameters(
self, user_token: str, details: ServiceDetails
) -> List[Parameter]:
"""
Retrieve the parameters required for a specific processing service.

:param user_token: The access token of the user executing the job.
:param details: The service details containing the service ID and application.
:return: Response containing the service parameters.
"""
pass
9 changes: 9 additions & 0 deletions app/platforms/implementations/ogc_api_process.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import List
from fastapi import Response
from app.platforms.base import BaseProcessingPlatform
from app.platforms.dispatcher import register_platform
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum, ProcessingStatusEnum
from app.schemas.parameters import Parameter
from app.schemas.unit_job import ServiceDetails
from stac_pydantic import Collection

Expand Down Expand Up @@ -46,3 +48,10 @@ async def get_job_results(
raise NotImplementedError(
"OGC API Process job result retrieval not implemented yet."
)

async def get_service_parameters(
self, user_token: str, details: ServiceDetails
) -> List[Parameter]:
raise NotImplementedError(
"OGC API Process service parameter retrieval not implemented yet."
)
45 changes: 45 additions & 0 deletions app/platforms/implementations/openeo.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
from typing import List

from fastapi import Response
import jwt
Expand All @@ -13,6 +14,7 @@
from app.platforms.base import BaseProcessingPlatform
from app.platforms.dispatcher import register_platform
from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum, ProcessTypeEnum
from app.schemas.parameters import ParamTypeEnum, Parameter
from app.schemas.unit_job import ServiceDetails

load_dotenv()
Expand Down Expand Up @@ -267,3 +269,46 @@ async def get_job_results(
connection = await self._setup_connection(user_token, details.endpoint)
job = connection.job(job_id)
return Collection(**job.get_results().get_metadata())

async def get_service_parameters(
self, user_token: str, details: ServiceDetails
) -> List[Parameter]:
parameters = []
logger.debug(
f"Fetching service parameters for OpenEO service at {details.application}"
)
udp = requests.get(details.application)
udp.raise_for_status()
udp_params = udp.json().get("parameters", [])

for param in udp_params:
schemas = param.get("schema", {})
if not isinstance(schemas, list):
schemas = [schemas]
parameters.append(
Parameter(
name=param.get("name"),
description=param.get("description"),
default=param.get("default"),
optional=param.get("optional", False),
type=self._get_type_from_schemas(schemas),
)
)

return parameters

def _get_type_from_schemas(self, schemas: List[dict]) -> ParamTypeEnum:
for schema in schemas:
type = schema.get("type")
subtype = schema.get("subtype")
if type == "array" and subtype == "temporal-interval":
return ParamTypeEnum.DATE_INTERVAL
elif subtype == "bounding-box":
return ParamTypeEnum.BOUNDING_BOX
elif type == "boolean":
return ParamTypeEnum.BOOLEAN
elif type == "string":
return ParamTypeEnum.STRING

# If no matching schema found, raise an error
raise ValueError(f"Unsupported parameter schemas: {schemas}")
59 changes: 59 additions & 0 deletions app/routers/parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from typing import Annotated, List
from fastapi import Body, APIRouter, Depends, HTTPException, status
from loguru import logger

from app.schemas.enum import ProcessTypeEnum
from app.schemas.parameters import ParamRequest, Parameter
from app.schemas.unit_job import (
ServiceDetails,
)
from app.auth import oauth2_scheme
from app.services.processing import retrieve_service_parameters


# from app.auth import get_current_user

router = APIRouter()


@router.post(
"/params",
status_code=status.HTTP_200_OK,
tags=["Unit Jobs"],
summary="Get the parameters of a specific processing service.",
)
async def get_job_params(
payload: Annotated[
ParamRequest,
Body(
openapi_examples={
"openEO Example": {
"summary": "Retrieving the parameters for an openEO-based service",
"description": "The following example demonstrates how to retrieve the "
"parameters for a processing job using an openEO-based service.",
"value": ParamRequest(
label=ProcessTypeEnum.OPENEO,
service=ServiceDetails(
endpoint="https://openeofed.dataspace.copernicus.eu",
application="https://raw.githubusercontent.com/ESA-APEx/apex_algorithms"
"/32ea3c9a6fa24fe063cb59164cd318cceb7209b0/openeo_udp/variabilitymap/"
"variabilitymap.json",
),
).model_dump(),
}
},
),
],
token: str = Depends(oauth2_scheme),
) -> List[Parameter]:
"""Retrieve the parameters required for a specific processing service."""
try:
return await retrieve_service_parameters(token, payload)
except HTTPException as e:
raise e
except Exception as e:
logger.exception(f"Error retrieving service parameters: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An error occurred while retrieving service parameters: {e}",
)
47 changes: 47 additions & 0 deletions app/schemas/parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field

from app.schemas.enum import ProcessTypeEnum
from app.schemas.unit_job import ServiceDetails


class ParamTypeEnum(str, Enum):
DATE_INTERVAL = "date-interval"
BOUNDING_BOX = "bounding-box"
BOOLEAN = "boolean"
STRING = "string"


class ParamRequest(BaseModel):
label: ProcessTypeEnum = Field(
...,
description="Label representing the type of the service",
)
service: ServiceDetails = Field(
..., description="Details of the service for which to retrieve the parameters"
)


class Parameter(BaseModel):
name: str = Field(..., description="Name of the parameter", examples=["param1"])
type: ParamTypeEnum = Field(
...,
description="Data type of the parameter",
examples=[ParamTypeEnum.DATE_INTERVAL],
)
optional: bool = Field(
...,
description="Indicates whether the parameter is optional",
examples=[False],
)
description: str = Field(
...,
description="Description of the parameter",
examples=["This parameter specifies the ..."],
)
default: Any = Field(
None,
description="Default value of the parameter, if any",
examples=["default_value"],
)
18 changes: 18 additions & 0 deletions app/services/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from sqlalchemy.orm import Session

from app.schemas.enum import ProcessingStatusEnum
from app.schemas.parameters import ParamRequest, Parameter
from app.schemas.unit_job import (
BaseJobRequest,
ProcessingJob,
Expand Down Expand Up @@ -210,3 +211,20 @@ async def create_synchronous_job(
parameters=request.parameters,
format=request.format,
)


async def retrieve_service_parameters(
user_token: str,
payload: ParamRequest,
) -> List[Parameter]:
logger.info(
f"Retrieving service parameters for service {payload.service.application} at "
f"{payload.service.endpoint}"
)

platform = get_processing_platform(payload.label)

return await platform.get_service_parameters(
user_token=user_token,
details=payload.service,
)
27 changes: 27 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from app.database.models.upscaling_task import UpscalingTaskRecord
from app.main import app
from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum, ProcessTypeEnum
from app.schemas.parameters import ParamRequest, ParamTypeEnum, Parameter
from app.schemas.unit_job import (
BaseJobRequest,
ProcessingJob,
Expand Down Expand Up @@ -198,3 +199,29 @@ def fake_sync_response():
media_type="application/json",
status_code=200,
)


@pytest.fixture
def fake_param_request():
return ParamRequest(
label=ProcessTypeEnum.OPENEO,
service=ServiceDetails(endpoint="foo", application="bar"),
)


@pytest.fixture
def fake_parameter_result():
return [
Parameter(
name="spatial_extent",
type=ParamTypeEnum.BOUNDING_BOX,
optional=True,
description="Spatial extent parameter",
),
Parameter(
name="temporal_extent",
type=ParamTypeEnum.DATE_INTERVAL,
optional=True,
description="Temporal extent parameter",
),
]
5 changes: 4 additions & 1 deletion tests/platforms/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from stac_pydantic import Collection

from tests.conftest import fake_sync_response
from tests.conftest import fake_parameter_result, fake_sync_response


class DummyPlatform(BaseProcessingPlatform):
Expand All @@ -31,6 +31,9 @@ def get_job_status(self, job_id, details):
def get_job_results(self, job_id, details):
return self.fake_result

def get_service_parameters(self, user_token, details):
return fake_parameter_result()


@pytest.fixture(autouse=True)
def clear_registry():
Expand Down
Loading