diff --git a/app/main.py b/app/main.py index 4b5f9a0..99feb7b 100644 --- a/app/main.py +++ b/app/main.py @@ -7,7 +7,15 @@ from app.services.tiles.base import load_grids from app.config.logger import setup_logging from app.config.settings import settings -from app.routers import jobs_status, unit_jobs, health, tiles, upscale_tasks, sync_jobs +from app.routers import ( + jobs_status, + unit_jobs, + health, + tiles, + upscale_tasks, + sync_jobs, + parameters, +) setup_logging() @@ -38,3 +46,4 @@ app.include_router(sync_jobs.router) app.include_router(upscale_tasks.router) app.include_router(health.router) +app.include_router(parameters.router) diff --git a/app/platforms/base.py b/app/platforms/base.py index 01b4209..5e0d664 100644 --- a/app/platforms/base.py +++ b/app/platforms/base.py @@ -1,8 +1,10 @@ from abc import ABC, abstractmethod +from typing import List from fastapi import Response from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum +from app.schemas.parameters import Parameter from app.schemas.unit_job import ServiceDetails from stac_pydantic import Collection @@ -83,3 +85,16 @@ async def get_job_results( :return: STAC collection representing the results. """ pass + + @abstractmethod + async def get_service_parameters( + self, user_token: str, details: ServiceDetails + ) -> List[Parameter]: + """ + Retrieve the parameters required for a specific processing service. + + :param user_token: The access token of the user executing the job. + :param details: The service details containing the service ID and application. + :return: Response containing the service parameters. + """ + pass diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py index 8414ddc..620a3b5 100644 --- a/app/platforms/implementations/ogc_api_process.py +++ b/app/platforms/implementations/ogc_api_process.py @@ -1,7 +1,9 @@ +from typing import List from fastapi import Response from app.platforms.base import BaseProcessingPlatform from app.platforms.dispatcher import register_platform from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum, ProcessingStatusEnum +from app.schemas.parameters import Parameter from app.schemas.unit_job import ServiceDetails from stac_pydantic import Collection @@ -46,3 +48,10 @@ async def get_job_results( raise NotImplementedError( "OGC API Process job result retrieval not implemented yet." ) + + async def get_service_parameters( + self, user_token: str, details: ServiceDetails + ) -> List[Parameter]: + raise NotImplementedError( + "OGC API Process service parameter retrieval not implemented yet." + ) diff --git a/app/platforms/implementations/openeo.py b/app/platforms/implementations/openeo.py index d8c3d52..0b7c3a6 100644 --- a/app/platforms/implementations/openeo.py +++ b/app/platforms/implementations/openeo.py @@ -1,4 +1,5 @@ import datetime +from typing import List from fastapi import Response import jwt @@ -13,6 +14,7 @@ from app.platforms.base import BaseProcessingPlatform from app.platforms.dispatcher import register_platform from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum, ProcessTypeEnum +from app.schemas.parameters import ParamTypeEnum, Parameter from app.schemas.unit_job import ServiceDetails load_dotenv() @@ -267,3 +269,46 @@ async def get_job_results( connection = await self._setup_connection(user_token, details.endpoint) job = connection.job(job_id) return Collection(**job.get_results().get_metadata()) + + async def get_service_parameters( + self, user_token: str, details: ServiceDetails + ) -> List[Parameter]: + parameters = [] + logger.debug( + f"Fetching service parameters for OpenEO service at {details.application}" + ) + udp = requests.get(details.application) + udp.raise_for_status() + udp_params = udp.json().get("parameters", []) + + for param in udp_params: + schemas = param.get("schema", {}) + if not isinstance(schemas, list): + schemas = [schemas] + parameters.append( + Parameter( + name=param.get("name"), + description=param.get("description"), + default=param.get("default"), + optional=param.get("optional", False), + type=self._get_type_from_schemas(schemas), + ) + ) + + return parameters + + def _get_type_from_schemas(self, schemas: List[dict]) -> ParamTypeEnum: + for schema in schemas: + type = schema.get("type") + subtype = schema.get("subtype") + if type == "array" and subtype == "temporal-interval": + return ParamTypeEnum.DATE_INTERVAL + elif subtype == "bounding-box": + return ParamTypeEnum.BOUNDING_BOX + elif type == "boolean": + return ParamTypeEnum.BOOLEAN + elif type == "string": + return ParamTypeEnum.STRING + + # If no matching schema found, raise an error + raise ValueError(f"Unsupported parameter schemas: {schemas}") diff --git a/app/routers/parameters.py b/app/routers/parameters.py new file mode 100644 index 0000000..e57f60e --- /dev/null +++ b/app/routers/parameters.py @@ -0,0 +1,59 @@ +from typing import Annotated, List +from fastapi import Body, APIRouter, Depends, HTTPException, status +from loguru import logger + +from app.schemas.enum import ProcessTypeEnum +from app.schemas.parameters import ParamRequest, Parameter +from app.schemas.unit_job import ( + ServiceDetails, +) +from app.auth import oauth2_scheme +from app.services.processing import retrieve_service_parameters + + +# from app.auth import get_current_user + +router = APIRouter() + + +@router.post( + "/params", + status_code=status.HTTP_200_OK, + tags=["Unit Jobs"], + summary="Get the parameters of a specific processing service.", +) +async def get_job_params( + payload: Annotated[ + ParamRequest, + Body( + openapi_examples={ + "openEO Example": { + "summary": "Retrieving the parameters for an openEO-based service", + "description": "The following example demonstrates how to retrieve the " + "parameters for a processing job using an openEO-based service.", + "value": ParamRequest( + label=ProcessTypeEnum.OPENEO, + service=ServiceDetails( + endpoint="https://openeofed.dataspace.copernicus.eu", + application="https://raw.githubusercontent.com/ESA-APEx/apex_algorithms" + "/32ea3c9a6fa24fe063cb59164cd318cceb7209b0/openeo_udp/variabilitymap/" + "variabilitymap.json", + ), + ).model_dump(), + } + }, + ), + ], + token: str = Depends(oauth2_scheme), +) -> List[Parameter]: + """Retrieve the parameters required for a specific processing service.""" + try: + return await retrieve_service_parameters(token, payload) + except HTTPException as e: + raise e + except Exception as e: + logger.exception(f"Error retrieving service parameters: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"An error occurred while retrieving service parameters: {e}", + ) diff --git a/app/schemas/parameters.py b/app/schemas/parameters.py new file mode 100644 index 0000000..04e6f46 --- /dev/null +++ b/app/schemas/parameters.py @@ -0,0 +1,47 @@ +from enum import Enum +from typing import Any +from pydantic import BaseModel, Field + +from app.schemas.enum import ProcessTypeEnum +from app.schemas.unit_job import ServiceDetails + + +class ParamTypeEnum(str, Enum): + DATE_INTERVAL = "date-interval" + BOUNDING_BOX = "bounding-box" + BOOLEAN = "boolean" + STRING = "string" + + +class ParamRequest(BaseModel): + label: ProcessTypeEnum = Field( + ..., + description="Label representing the type of the service", + ) + service: ServiceDetails = Field( + ..., description="Details of the service for which to retrieve the parameters" + ) + + +class Parameter(BaseModel): + name: str = Field(..., description="Name of the parameter", examples=["param1"]) + type: ParamTypeEnum = Field( + ..., + description="Data type of the parameter", + examples=[ParamTypeEnum.DATE_INTERVAL], + ) + optional: bool = Field( + ..., + description="Indicates whether the parameter is optional", + examples=[False], + ) + description: str = Field( + ..., + description="Description of the parameter", + examples=["This parameter specifies the ..."], + ) + default: Any = Field( + None, + description="Default value of the parameter, if any", + examples=["default_value"], + ) diff --git a/app/services/processing.py b/app/services/processing.py index 97d6cc8..92b3cf4 100644 --- a/app/services/processing.py +++ b/app/services/processing.py @@ -15,6 +15,7 @@ from sqlalchemy.orm import Session from app.schemas.enum import ProcessingStatusEnum +from app.schemas.parameters import ParamRequest, Parameter from app.schemas.unit_job import ( BaseJobRequest, ProcessingJob, @@ -210,3 +211,20 @@ async def create_synchronous_job( parameters=request.parameters, format=request.format, ) + + +async def retrieve_service_parameters( + user_token: str, + payload: ParamRequest, +) -> List[Parameter]: + logger.info( + f"Retrieving service parameters for service {payload.service.application} at " + f"{payload.service.endpoint}" + ) + + platform = get_processing_platform(payload.label) + + return await platform.get_service_parameters( + user_token=user_token, + details=payload.service, + ) diff --git a/tests/conftest.py b/tests/conftest.py index 01b24b5..94db998 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,7 @@ from app.database.models.upscaling_task import UpscalingTaskRecord from app.main import app from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum, ProcessTypeEnum +from app.schemas.parameters import ParamRequest, ParamTypeEnum, Parameter from app.schemas.unit_job import ( BaseJobRequest, ProcessingJob, @@ -198,3 +199,29 @@ def fake_sync_response(): media_type="application/json", status_code=200, ) + + +@pytest.fixture +def fake_param_request(): + return ParamRequest( + label=ProcessTypeEnum.OPENEO, + service=ServiceDetails(endpoint="foo", application="bar"), + ) + + +@pytest.fixture +def fake_parameter_result(): + return [ + Parameter( + name="spatial_extent", + type=ParamTypeEnum.BOUNDING_BOX, + optional=True, + description="Spatial extent parameter", + ), + Parameter( + name="temporal_extent", + type=ParamTypeEnum.DATE_INTERVAL, + optional=True, + description="Temporal extent parameter", + ), + ] diff --git a/tests/platforms/test_dispatcher.py b/tests/platforms/test_dispatcher.py index 1154f0b..e02720e 100644 --- a/tests/platforms/test_dispatcher.py +++ b/tests/platforms/test_dispatcher.py @@ -9,7 +9,7 @@ from stac_pydantic import Collection -from tests.conftest import fake_sync_response +from tests.conftest import fake_parameter_result, fake_sync_response class DummyPlatform(BaseProcessingPlatform): @@ -31,6 +31,9 @@ def get_job_status(self, job_id, details): def get_job_results(self, job_id, details): return self.fake_result + def get_service_parameters(self, user_token, details): + return fake_parameter_result() + @pytest.fixture(autouse=True) def clear_registry(): diff --git a/tests/platforms/test_openeo_platform.py b/tests/platforms/test_openeo_platform.py index 6eb35e2..1447e37 100644 --- a/tests/platforms/test_openeo_platform.py +++ b/tests/platforms/test_openeo_platform.py @@ -13,6 +13,7 @@ OpenEOPlatform, ) from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum +from app.schemas.parameters import ParamTypeEnum, Parameter from app.schemas.unit_job import ServiceDetails from stac_pydantic import Collection @@ -235,6 +236,19 @@ def test_connection_expired_no_bearer(platform): assert platform._connection_expired(conn) is True +@patch("app.platforms.implementations.openeo.jwt.decode") +def test_connection_expired_exception(mock_decode, platform): + mock_decode.side_effect = jwt.DecodeError("Invalid token") + exp = int( + ( + datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1) + ).timestamp() + ) + token = jwt.encode({"sub": "user", "exp": exp}, "secret", algorithm="HS256") + conn = _make_conn_with_token(token) + assert platform._connection_expired(conn) is True + + @pytest.mark.asyncio @patch( "app.platforms.implementations.openeo.exchange_token_for_provider", @@ -298,6 +312,154 @@ async def test_authenticate_user_with_client_credentials( assert returned is conn +@pytest.mark.asyncio +@patch( + "app.platforms.implementations.openeo.exchange_token_for_provider", + new_callable=AsyncMock, +) +async def test_authenticate_user_config_missing_url( + mock_exchange, monkeypatch, platform +): + url = "https://openeo.foo.bar" + + # prepare fake connection and spy method + conn = MagicMock() + conn.authenticate_oidc_client_credentials = MagicMock() + + # ensure the exchange mock exists but is not awaited + with pytest.raises( + ValueError, match="No OpenEO backend configuration found for URL" + ): + await platform._authenticate_user("user-token", url, conn) + + mock_exchange.assert_not_awaited() + + +@pytest.mark.asyncio +@patch( + "app.platforms.implementations.openeo.exchange_token_for_provider", + new_callable=AsyncMock, +) +async def test_authenticate_user_config_unsupported_method( + mock_exchange, monkeypatch, platform +): + url = "https://openeo.vito.be" + # disable user credentials path -> use client credentials + settings.openeo_backend_config[url].auth_method = "FOOBAR" + + # prepare fake connection and spy method + conn = MagicMock() + conn.authenticate_oidc_client_credentials = MagicMock() + + # ensure the exchange mock exists but is not awaited + with pytest.raises( + ValueError, match="Unsupported OpenEO authentication method" + ): + await platform._authenticate_user("user-token", url, conn) + + mock_exchange.assert_not_awaited() + + +@pytest.mark.asyncio +@patch( + "app.platforms.implementations.openeo.exchange_token_for_provider", + new_callable=AsyncMock, +) +async def test_authenticate_user_config_missing_credentials( + mock_exchange, monkeypatch, platform +): + url = "https://openeo.vito.be" + # disable user credentials path -> use client credentials + settings.openeo_backend_config[url].auth_method = ( + OpenEOAuthMethod.CLIENT_CREDENTIALS + ) + settings.openeo_backend_config[url].client_credentials = None + + # prepare fake connection and spy method + conn = MagicMock() + conn.authenticate_oidc_client_credentials = MagicMock() + + # ensure the exchange mock exists but is not awaited + with pytest.raises( + ValueError, match="Client credentials not configured for OpenEO backend" + ): + await platform._authenticate_user("user-token", url, conn) + + mock_exchange.assert_not_awaited() + + +@pytest.mark.asyncio +@patch( + "app.platforms.implementations.openeo.exchange_token_for_provider", + new_callable=AsyncMock, +) +async def test_authenticate_user_config_format_issue_credentials( + mock_exchange, monkeypatch, platform +): + url = "https://openeo.vito.be" + # disable user credentials path -> use client credentials + settings.openeo_backend_config[url].auth_method = ( + OpenEOAuthMethod.CLIENT_CREDENTIALS + ) + settings.openeo_backend_config[url].client_credentials = "foobar" + + # prepare fake connection and spy method + conn = MagicMock() + conn.authenticate_oidc_client_credentials = MagicMock() + + # ensure the exchange mock exists but is not awaited + with pytest.raises(ValueError, match="Invalid client credentials format for"): + await platform._authenticate_user("user-token", url, conn) + + mock_exchange.assert_not_awaited() + + +@pytest.mark.asyncio +@patch( + "app.platforms.implementations.openeo.exchange_token_for_provider", + new_callable=AsyncMock, +) +async def test_authenticate_user_config_missing_provider( + mock_exchange, monkeypatch, platform +): + url = "https://openeo.vito.be" + # disable user credentials path -> use client credentials + settings.openeo_backend_config[url].token_provider = None + + # prepare fake connection and spy method + conn = MagicMock() + conn.authenticate_oidc_client_credentials = MagicMock() + + # ensure the exchange mock exists but is not awaited + with pytest.raises(ValueError, match="must define"): + await platform._authenticate_user("user-token", url, conn) + + mock_exchange.assert_not_awaited() + + +@pytest.mark.asyncio +@patch( + "app.platforms.implementations.openeo.exchange_token_for_provider", + new_callable=AsyncMock, +) +async def test_authenticate_user_config_missing_prefix( + mock_exchange, monkeypatch, platform +): + url = "https://openeo.vito.be" + # disable user credentials path -> use client credentials + settings.openeo_backend_config[url].token_prefix = None + + # prepare fake connection and spy method + conn = MagicMock() + conn.authenticate_oidc_client_credentials = MagicMock() + + # ensure the exchange mock exists but is not awaited + with pytest.raises(ValueError, match="must define"): + await platform._authenticate_user("user-token", url, conn) + + mock_exchange.assert_not_awaited() + + @pytest.mark.asyncio @patch("app.platforms.implementations.openeo.openeo.connect") @patch.object(OpenEOPlatform, "_authenticate_user", new_callable=AsyncMock) @@ -405,3 +567,99 @@ async def test_execute_sync_job_success( assert response.status_code == mock_response.status_code assert json.loads(response.body) == json.loads(mock_response.content) mock_connect.assert_called_once_with("fake_token", service_details.endpoint) + + +@pytest.mark.asyncio +@patch("app.platforms.implementations.openeo.requests.get") +async def test_get_parameters_success(mock_udp_request, platform): + + udp_params = [ + { + "name": "flag_test", + "description": "Test for a boolean flag parameter", + "schema": {"type": "boolean"}, + }, + { + "name": "bbox_test", + "description": "Test for a bbox parameter", + "schema": {"type": "object", "subtype": "bounding-box"}, + }, + { + "name": "date_test", + "description": "Test for a date parameter", + "schema": {"type": "array", "subtype": "temporal-interval"}, + "optional": True, + "default": ["2020-01-01", "2020-12-31"], + }, + { + "name": "string_test", + "description": "Test for a string parameter", + "schema": {"type": "string"}, + }, + ] + mock_udp_request.return_value.json.return_value = { + "id": "process123", + "parameters": udp_params, + } + mock_udp_request.return_value.raise_for_status.return_value = None + result = await platform.get_service_parameters( + user_token="fake_token", + details=ServiceDetails( + endpoint="https://openeo.dataspace.copernicus.eu", + application="https://foo.bar/process.json", + ), + ) + parameters = [ + Parameter( + name=udp_params[0]["name"], + description=udp_params[0]["description"], + type=ParamTypeEnum.BOOLEAN, + optional=False, + ), + Parameter( + name=udp_params[1]["name"], + description=udp_params[1]["description"], + type=ParamTypeEnum.BOUNDING_BOX, + optional=False, + ), + Parameter( + name=udp_params[2]["name"], + description=udp_params[2]["description"], + type=ParamTypeEnum.DATE_INTERVAL, + optional=True, + default=udp_params[2]["default"], + ), + Parameter( + name=udp_params[3]["name"], + description=udp_params[3]["description"], + type=ParamTypeEnum.STRING, + optional=False, + ), + ] + assert result == parameters + + +@pytest.mark.asyncio +@patch("app.platforms.implementations.openeo.requests.get") +async def test_get_parameters_unsupported_type(mock_udp_request, platform): + + mock_udp_request.return_value.json.return_value = { + "id": "process123", + "parameters": [ + { + "name": "foobar_test", + "description": "Test for a foobar parameter", + "schema": {"type": "foobar"}, + } + ], + } + mock_udp_request.return_value.raise_for_status.return_value = None + + with pytest.raises(ValueError, match="Unsupported parameter schemas"): + await platform.get_service_parameters( + user_token="fake_token", + details=ServiceDetails( + endpoint="https://openeo.dataspace.copernicus.eu", + application="https://foo.bar/process.json", + ), + ) diff --git a/tests/services/test_processing.py b/tests/services/test_processing.py index 731aa99..dda49b0 100644 --- a/tests/services/test_processing.py +++ b/tests/services/test_processing.py @@ -18,6 +18,7 @@ get_job_status, get_processing_job_by_user_id, get_processing_jobs_by_user_id, + retrieve_service_parameters, ) @@ -534,3 +535,23 @@ async def test_create_sync_job_calls_platform_execute_failure( parameters=fake_processing_job_request.parameters, format=fake_processing_job_request.format, ) + + +@pytest.mark.asyncio +@patch("app.services.processing.get_processing_platform") +async def test_retrieve_service_parameters_success( + mock_get_platform, fake_parameter_result, fake_param_request +): + + fake_platform = MagicMock() + fake_platform.get_service_parameters = AsyncMock(return_value=fake_parameter_result) + mock_get_platform.return_value = fake_platform + + result = await retrieve_service_parameters("foobar-token", fake_param_request) + + mock_get_platform.assert_called_once_with(fake_param_request.label) + fake_platform.get_service_parameters.assert_called_once_with( + user_token="foobar-token", + details=fake_param_request.service, + ) + assert result == fake_parameter_result