diff --git a/app/auth.py b/app/auth.py
index b09db15..d27d605 100644
--- a/app/auth.py
+++ b/app/auth.py
@@ -1,11 +1,14 @@
from typing import Any, Dict
import httpx
import jwt
-from fastapi import Depends, HTTPException, WebSocket, status
+from fastapi import Depends, WebSocket, status
from fastapi.security import OAuth2AuthorizationCodeBearer
from jwt import PyJWKClient
from loguru import logger
+from app.error import AuthException, DispatcherException
+from app.schemas.websockets import WSStatusMessage
+
from .config.settings import settings
# Keycloak OIDC info
@@ -37,9 +40,9 @@ def _decode_token(token: str):
)
return payload
except Exception:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
+ raise AuthException(
+ http_status=status.HTTP_401_UNAUTHORIZED,
+ message="Could not validate credentials. Please retry signing in.",
)
@@ -55,6 +58,7 @@ async def websocket_authenticate(websocket: WebSocket) -> str | None:
"""
logger.debug("Authenticating websocket")
token = websocket.query_params.get("token")
+
if not token:
logger.error("Token is missing from websocket authentication")
await websocket.close(code=1008, reason="Missing token")
@@ -63,9 +67,22 @@ async def websocket_authenticate(websocket: WebSocket) -> str | None:
try:
await websocket.accept()
return token
+ except DispatcherException as ae:
+ logger.error(f"Dispatcher exception detected: {ae.message}")
+ await websocket.send_json(
+ WSStatusMessage(type="error", message=ae.message).model_dump()
+ )
+ await websocket.close(code=1008, reason=ae.error_code)
+ return None
except Exception as e:
- logger.error(f"Invalid token in websocket authentication: {e}")
- await websocket.close(code=1008, reason="Invalid token")
+ logger.error(f"Unexpected error occurred during websocket authentication: {e}")
+ await websocket.send_json(
+ WSStatusMessage(
+ type="error",
+ message="Something went wrong during authentication. Please try again.",
+ ).model_dump()
+ )
+ await websocket.close(code=1008, reason="INTERNAL_ERROR")
return None
@@ -81,15 +98,15 @@ async def exchange_token_for_provider(
:return: The token response (dict) on success.
- :raise: Raises HTTPException with an appropriate status and message on error.
+ :raise: Raises AuthException with an appropriate status and message on error.
"""
token_url = f"{KEYCLOAK_BASE_URL}/protocol/openid-connect/token"
# Check if the necessary settings are in place
if not settings.keycloak_client_id or not settings.keycloak_client_secret:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="Token exchange not configured on the server (missing client credentials).",
+ raise AuthException(
+ http_status=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ message="Token exchange not configured on the server (missing client credentials).",
)
payload = {
@@ -105,9 +122,12 @@ async def exchange_token_for_provider(
resp = await client.post(token_url, data=payload)
except httpx.RequestError as exc:
logger.error(f"Token exchange network error for provider={provider}: {exc}")
- raise HTTPException(
- status_code=status.HTTP_502_BAD_GATEWAY,
- detail="Failed to contact the identity provider for token exchange.",
+ raise AuthException(
+ http_status=status.HTTP_502_BAD_GATEWAY,
+ message=(
+ f"Could not authenticate with {provider}. Please contact APEx support or reach out "
+ "through the APEx User Forum."
+ ),
)
# Parse response
@@ -117,9 +137,12 @@ async def exchange_token_for_provider(
logger.error(
f"Token exchange invalid JSON response (status={resp.status_code})"
)
- raise HTTPException(
- status_code=status.HTTP_502_BAD_GATEWAY,
- detail="Invalid response from identity provider during token exchange.",
+ raise AuthException(
+ http_status=status.HTTP_502_BAD_GATEWAY,
+ message=(
+ f"Could not authenticate with {provider}. Please contact APEx support or reach out "
+ "through the APEx User Forum."
+ ),
)
if resp.status_code != 200:
@@ -136,7 +159,16 @@ async def exchange_token_for_provider(
else status.HTTP_502_BAD_GATEWAY
)
- raise HTTPException(client_status, detail=body)
+ raise AuthException(
+ http_status=client_status,
+ message=(
+ f"Please link your account with {provider} in your "
+ "Account Dashboard"
+ if body.get("error", "") == "not_linked"
+ else f"Could not authenticate with {provider}: {err}"
+ ),
+ )
# Successful exchange, return token response (access_token, expires_in, etc.)
return body
diff --git a/app/database/db.py b/app/database/db.py
index e7cfd0a..1ebf87f 100644
--- a/app/database/db.py
+++ b/app/database/db.py
@@ -35,7 +35,7 @@ def get_db():
yield db
db.commit()
except Exception:
- logger.exception("An error occurred during database retrieval")
+ logger.error("An error occurred during database retrieval")
db.rollback()
raise
finally:
diff --git a/app/error.py b/app/error.py
new file mode 100644
index 0000000..896b1af
--- /dev/null
+++ b/app/error.py
@@ -0,0 +1,68 @@
+from typing import Any, Dict, Optional
+from fastapi import status
+from pydantic import BaseModel
+
+
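+# Uniform JSON error envelope returned by the global exception handlers and
+# reused as the example payload in routers' OpenAPI "responses" declarations.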
+class ErrorResponse(BaseModel):
+ status: str = "error"
+ error_code: str
+ message: str
+ details: Optional[Dict[str, Any]] = None
+ request_id: Optional[str] = None
+
+
+class DispatcherException(Exception):
+ """
+ Base domain exception for the APEx Dispatch API.
+ """
+
+ http_status: int = status.HTTP_400_BAD_REQUEST
+ error_code: str = "APEX_ERROR"
+ message: str = "An error occurred."
+ details: Optional[Dict[str, Any]] = None
+
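+    # Subclasses override these class-level defaults; constructor arguments
+    # replace them only when explicitly provided.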
+ def __init__(
+ self,
+ message: Optional[str] = None,
+ error_code: Optional[str] = None,
+ http_status: Optional[int] = None,
+ details: Optional[Dict[str, Any]] = None,
+ ):
+ if message:
+ self.message = message
+ if error_code:
+ self.error_code = error_code
+ if http_status:
+ self.http_status = http_status
+ if details:
+ self.details = details
+
+ def __str__(self):
+ return f"{self.error_code}: {self.message}"
+
+
+class AuthException(DispatcherException):
+ def __init__(
+ self,
+ http_status: Optional[int] = status.HTTP_401_UNAUTHORIZED,
+ message: Optional[str] = "Authentication failed.",
+ ):
+ super().__init__(message, "AUTHENTICATION_FAILED", http_status)
+
+
+class JobNotFoundException(DispatcherException):
+ http_status: int = status.HTTP_404_NOT_FOUND
+ error_code: str = "JOB_NOT_FOUND"
+ message: str = "The requested job was not found."
+
+
+class TaskNotFoundException(DispatcherException):
+ http_status: int = status.HTTP_404_NOT_FOUND
+ error_code: str = "TASK_NOT_FOUND"
+ message: str = "The requested task was not found."
+
+
+class InternalException(DispatcherException):
+ http_status: int = status.HTTP_500_INTERNAL_SERVER_ERROR
+ error_code: str = "INTERNAL_ERROR"
+ message: str = "An internal server error occurred."
diff --git a/app/main.py b/app/main.py
index ca2ca98..4b5f9a0 100644
--- a/app/main.py
+++ b/app/main.py
@@ -2,6 +2,7 @@
from fastapi.middleware.cors import CORSMiddleware
from app.middleware.correlation_id import add_correlation_id
+from app.middleware.error_handling import register_exception_handlers
from app.platforms.dispatcher import load_processing_platforms
from app.services.tiles.base import load_grids
from app.config.logger import setup_logging
@@ -28,6 +29,7 @@
)
app.middleware("http")(add_correlation_id)
+register_exception_handlers(app)
# include routers
app.include_router(tiles.router)
diff --git a/app/middleware/error_handling.py b/app/middleware/error_handling.py
new file mode 100644
index 0000000..0798e67
--- /dev/null
+++ b/app/middleware/error_handling.py
@@ -0,0 +1,74 @@
+from typing import Any
+from fastapi import Request, status
+from fastapi.exceptions import RequestValidationError
+from fastapi.responses import JSONResponse
+from app.error import DispatcherException, ErrorResponse
+from app.middleware.correlation_id import correlation_id_ctx
+from loguru import logger
+
+
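+# Builds the ErrorResponse payload for a DispatcherException. Shared by the
+# handler below and by routers that embed example error bodies in their
+# OpenAPI "responses" declarations.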
+def get_dispatcher_error_response(
+ exc: DispatcherException, request_id: str
+) -> ErrorResponse:
+ return ErrorResponse(
+ error_code=exc.error_code,
+ message=exc.message,
+ details=exc.details,
+ request_id=request_id,
+ )
+
+
+async def dispatch_exception_handler(request: Request, exc: DispatcherException):
+
+ content = get_dispatcher_error_response(exc, correlation_id_ctx.get())
+ logger.exception(f"DispatcherException raised: {exc.message}")
+    return JSONResponse(status_code=exc.http_status, content=content.model_dump())
+
+
+async def generic_exception_handler(request: Request, exc: Exception):
+
+ # DO NOT expose internal exceptions to the client
+ content = ErrorResponse(
+ error_code="INTERNAL_SERVER_ERROR",
+ message="An unexpected error occurred.",
+ details=None,
+ request_id=correlation_id_ctx.get(),
+ )
+
+ logger.exception(f"GenericException raised: {exc}")
+ return JSONResponse(
+        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=content.model_dump()
+ )
+
+
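+# Pydantic validation errors may carry a "ctx" entry holding objects that are
+# not JSON-serializable (e.g. the original exception), so it is dropped before
+# the errors are echoed back to the client.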
+def _parse_validation_error(err: Any):
+ if "ctx" in err:
+ del err["ctx"]
+ return err
+
+
+async def validation_exception_handler(request: Request, exc: RequestValidationError):
+
+ logger.error(f"Request validation error: {exc.__class__.__name__}: {exc}")
+ content = ErrorResponse(
+ error_code="VALIDATION_ERROR",
+ message="Request validation failed.",
+ details={"errors": [_parse_validation_error(error) for error in exc.errors()]},
+ request_id=correlation_id_ctx.get(),
+ )
+
+    logger.error(content.model_dump())
+
+ return JSONResponse(
+        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=content.model_dump()
+ )
+
+
+def register_exception_handlers(app):
+ """
+ Call this in main.py after creating the FastAPI() instance.
+ """
+
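+    # Starlette resolves handlers by walking the raised exception's MRO, so
+    # every DispatcherException subclass (AuthException, JobNotFoundException,
+    # TaskNotFoundException, InternalException) is caught by
+    # dispatch_exception_handler as well.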
+ app.add_exception_handler(DispatcherException, dispatch_exception_handler)
+ app.add_exception_handler(RequestValidationError, validation_exception_handler)
+ app.add_exception_handler(Exception, generic_exception_handler)
diff --git a/app/routers/jobs_status.py b/app/routers/jobs_status.py
index 16fcd3f..5f24de1 100644
--- a/app/routers/jobs_status.py
+++ b/app/routers/jobs_status.py
@@ -7,6 +7,8 @@
from loguru import logger
from app.database.db import SessionLocal, get_db
+from app.error import DispatcherException, ErrorResponse, InternalException
+from app.middleware.error_handling import get_dispatcher_error_response
from app.schemas.jobs_status import JobsFilter, JobsStatusResponse
from app.schemas.websockets import WSStatusMessage
from app.services.processing import get_processing_jobs_by_user_id
@@ -22,6 +24,19 @@
"/jobs_status",
tags=["Upscale Tasks", "Unit Jobs"],
summary="Get a list of all upscaling tasks & processing jobs for the authenticated user",
+ responses={
+ InternalException.http_status: {
+ "description": "Internal server error",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ InternalException(), "request-id"
+ )
+ }
+ },
+ },
+ },
)
async def get_jobs_status(
db: Session = Depends(get_db),
@@ -34,21 +49,29 @@ async def get_jobs_status(
"""
Return combined list of upscaling tasks and processing jobs for the authenticated user.
"""
- logger.debug("Fetching jobs list")
- upscaling_tasks = (
- await get_upscaling_tasks_by_user_id(token, db)
- if JobsFilter.upscaling in filter
- else []
- )
- processing_jobs = (
- await get_processing_jobs_by_user_id(token, db)
- if JobsFilter.processing in filter
- else []
- )
- return JobsStatusResponse(
- upscaling_tasks=upscaling_tasks,
- processing_jobs=processing_jobs,
- )
+ try:
+ logger.debug("Fetching jobs list")
+ upscaling_tasks = (
+ await get_upscaling_tasks_by_user_id(token, db)
+ if JobsFilter.upscaling in filter
+ else []
+ )
+ processing_jobs = (
+ await get_processing_jobs_by_user_id(token, db)
+ if JobsFilter.processing in filter
+ else []
+ )
+ return JobsStatusResponse(
+ upscaling_tasks=upscaling_tasks,
+ processing_jobs=processing_jobs,
+ )
+ except DispatcherException as de:
+ raise de
+ except Exception as e:
+ logger.error(f"Error retrieving job status: {e}")
+ raise InternalException(
+ message="An error occurred while retrieving the job status."
+ )
@router.websocket(
@@ -91,8 +114,20 @@ async def ws_jobs_status(
except WebSocketDisconnect:
logger.info("WebSocket disconnected")
+ except DispatcherException as ae:
+ logger.error(f"Dispatcher exception detected: {ae.message}")
+ await websocket.send_json(
+ WSStatusMessage(type="error", message=ae.message).model_dump()
+ )
+ await websocket.close(code=1011, reason=ae.error_code)
except Exception as e:
- logger.exception(f"Error in jobs_status_ws: {e}")
- await websocket.close(code=1011, reason="Error in job status websocket: {e}")
+ logger.error(f"Unexpected error occurred during websocket : {e}")
+ await websocket.send_json(
+ WSStatusMessage(
+ type="error",
+ message="An error occurred while monitoring the job status.",
+ ).model_dump()
+ )
+ await websocket.close(code=1011, reason="INTERNAL_ERROR")
finally:
db.close()
diff --git a/app/routers/sync_jobs.py b/app/routers/sync_jobs.py
index 7a83b9c..aba1850 100644
--- a/app/routers/sync_jobs.py
+++ b/app/routers/sync_jobs.py
@@ -1,7 +1,9 @@
from typing import Annotated
-from fastapi import Body, APIRouter, Depends, HTTPException, Response, status
+from fastapi import Body, APIRouter, Depends, Response, status
from loguru import logger
+from app.error import DispatcherException, ErrorResponse, InternalException
+from app.middleware.error_handling import get_dispatcher_error_response
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum
from app.schemas.unit_job import (
BaseJobRequest,
@@ -23,6 +25,19 @@
status_code=status.HTTP_201_CREATED,
tags=["Unit Jobs"],
summary="Create a new processing job",
+ responses={
+ InternalException.http_status: {
+ "description": "Internal server error",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ InternalException(), "request-id"
+ )
+ }
+ },
+ },
+ },
)
async def create_sync_job(
payload: Annotated[
@@ -97,11 +112,10 @@ async def create_sync_job(
"""Initiate a synchronous processing job with the provided data and return the result."""
try:
return await create_synchronous_job(token, payload)
- except HTTPException as e:
- raise e
+ except DispatcherException as de:
+ raise de
except Exception as e:
- logger.exception(f"Error creating synchronous job: {e}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"An error occurred while creating the synchronous job: {e}",
+ logger.error(f"Error creating synchronous job: {e}")
+ raise InternalException(
+ message="An error occurred while creating the synchronous job."
)
diff --git a/app/routers/tiles.py b/app/routers/tiles.py
index 262d836..57fa3b3 100644
--- a/app/routers/tiles.py
+++ b/app/routers/tiles.py
@@ -1,8 +1,10 @@
from typing import Annotated
-from fastapi import APIRouter, HTTPException, status, Body
+from fastapi import APIRouter, status, Body
from geojson_pydantic import GeometryCollection, Polygon
from loguru import logger
+from app.error import DispatcherException, ErrorResponse, InternalException
+from app.middleware.error_handling import get_dispatcher_error_response
from app.schemas.tiles import GridTypeEnum, TileRequest
from app.services.tiles.base import split_polygon_by_grid
@@ -54,7 +56,18 @@
}
}
},
- }
+ },
+ InternalException.http_status: {
+ "description": "Internal server error",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ InternalException(), "request-id"
+ )
+ }
+ },
+ },
},
)
def split_in_tiles(
@@ -90,11 +103,12 @@ def split_in_tiles(
try:
logger.debug(f"Splitting tiles in a {payload.grid} formation")
return split_polygon_by_grid(payload.aoi, payload.grid)
+ except DispatcherException as de:
+ raise de
except Exception as e:
- logger.exception(
- f"An error occurred while calculating tiles for {payload.grid}"
+ logger.error(
+ f"An error occurred while calculating tiles for {payload.grid}: {e}"
)
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"An error occurred while calculating tiles for {payload.grid}: {e}",
+ raise InternalException(
+ message=f"An error occurred while calculating tiles for {payload.grid}"
)
diff --git a/app/routers/unit_jobs.py b/app/routers/unit_jobs.py
index a20019c..d70102b 100644
--- a/app/routers/unit_jobs.py
+++ b/app/routers/unit_jobs.py
@@ -1,10 +1,17 @@
from typing import Annotated
-from fastapi import Body, APIRouter, Depends, HTTPException, status
+from fastapi import Body, APIRouter, Depends, status
from loguru import logger
from sqlalchemy.orm import Session
from app.auth import oauth2_scheme
from app.database.db import get_db
+from app.error import (
+ DispatcherException,
+ ErrorResponse,
+ InternalException,
+ JobNotFoundException,
+)
+from app.middleware.error_handling import get_dispatcher_error_response
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum
from app.schemas.unit_job import (
BaseJobRequest,
@@ -30,6 +37,19 @@
status_code=status.HTTP_201_CREATED,
tags=["Unit Jobs"],
summary="Create a new processing job",
+ responses={
+ InternalException.http_status: {
+ "description": "Internal server error",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ InternalException(), "request-id"
+ )
+ }
+ },
+ },
+ },
)
async def create_unit_job(
payload: Annotated[
@@ -105,20 +125,42 @@ async def create_unit_job(
"""Create a new processing job with the provided data."""
try:
return await create_processing_job(token, db, payload)
- except HTTPException as e:
- raise e
+ except DispatcherException as de:
+ raise de
except Exception as e:
- logger.exception(f"Error creating processing job: {e}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"An error occurred while creating the processing job: {e}",
+ logger.error(f"Error creating processing job: {e}")
+ raise InternalException(
+ message="An error occurred while creating processing job."
)
@router.get(
"/unit_jobs/{job_id}",
tags=["Unit Jobs"],
- responses={404: {"description": "Processing job not found"}},
+ responses={
+ JobNotFoundException.http_status: {
+ "description": "Job not found",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ JobNotFoundException(), "request-id"
+ )
+ }
+ },
+ },
+ InternalException.http_status: {
+ "description": "Internal server error",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ InternalException(), "request-id"
+ )
+ }
+ },
+ },
+ },
)
async def get_job(
job_id: int, db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
@@ -126,26 +168,44 @@ async def get_job(
try:
job = await get_processing_job_by_user_id(token, db, job_id)
if not job:
- logger.error(f"Processing job {job_id} not found")
- raise HTTPException(
- status_code=404,
- detail=f"Processing job {job_id} not found",
- )
+ raise JobNotFoundException()
return job
- except HTTPException as e:
- raise e
+ except DispatcherException as de:
+ raise de
except Exception as e:
- logger.exception(f"Error retrieving processing job {job_id}: {e}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"An error occurred while retrieving processing job {job_id}: {e}",
+ logger.error(f"Error retrieving processing job {job_id}: {e}")
+ raise InternalException(
+ message="An error occurred while retrieving the processing job."
)
@router.get(
"/unit_jobs/{job_id}/results",
tags=["Unit Jobs"],
- responses={404: {"description": "Processing job not found"}},
+ responses={
+ JobNotFoundException.http_status: {
+ "description": "Job not found",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ JobNotFoundException(), "request-id"
+ )
+ }
+ },
+ },
+ InternalException.http_status: {
+ "description": "Internal server error",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ InternalException(), "request-id"
+ )
+ }
+ },
+ },
+ },
)
async def get_job_results(
job_id: int, db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
@@ -153,17 +213,12 @@ async def get_job_results(
try:
result = await get_processing_job_results(token, db, job_id)
if not result:
- logger.error(f"Result for processing job {job_id} not found")
- raise HTTPException(
- status_code=404,
- detail=f"Result for processing job {job_id} not found",
- )
+ raise JobNotFoundException()
return result
- except HTTPException as e:
- raise e
+ except DispatcherException as de:
+ raise de
except Exception as e:
- logger.exception(f"Error getting results for processing job {job_id}: {e}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"An error occurred while retrieving results for processing job {job_id}: {e}",
+ logger.error(f"Error getting results for processing job {job_id}: {e}")
+ raise InternalException(
+ message="An error occurred while retrieving processing job results."
)
diff --git a/app/routers/upscale_tasks.py b/app/routers/upscale_tasks.py
index 4c2d61a..b88028d 100644
--- a/app/routers/upscale_tasks.py
+++ b/app/routers/upscale_tasks.py
@@ -6,7 +6,6 @@
Body,
APIRouter,
Depends,
- HTTPException,
WebSocket,
WebSocketDisconnect,
status,
@@ -16,6 +15,13 @@
from app.auth import oauth2_scheme, websocket_authenticate
from app.database.db import SessionLocal, get_db
+from app.error import (
+ DispatcherException,
+ ErrorResponse,
+ InternalException,
+ TaskNotFoundException,
+)
+from app.middleware.error_handling import get_dispatcher_error_response
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum
from app.schemas.unit_job import (
ServiceDetails,
@@ -43,6 +49,19 @@
status_code=status.HTTP_201_CREATED,
tags=["Upscale Tasks"],
summary="Create a new upscaling task",
+ responses={
+ InternalException.http_status: {
+ "description": "Internal server error",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ InternalException(), "request-id"
+ )
+ }
+ },
+ },
+ },
)
async def create_upscale_task(
payload: Annotated[
@@ -120,20 +139,42 @@ async def create_upscale_task(
upscaling_task_id=task.id,
)
return task
- except HTTPException as e:
- raise e
+ except DispatcherException as de:
+ raise de
except Exception as e:
- logger.exception(f"Error creating upscale task: {e}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"An error occurred while creating the upscale task: {e}",
+ logger.error(f"Error getting creating upscaling task: {e}")
+ raise InternalException(
+ message="An error occurred while retrieving processing job results."
)
@router.get(
"/upscale_tasks/{task_id}",
tags=["Upscale Tasks"],
- responses={404: {"description": "Upscale task not found"}},
+ responses={
+ TaskNotFoundException.http_status: {
+ "description": "Upscaling not found",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ TaskNotFoundException(), "request-id"
+ )
+ }
+ },
+ },
+ InternalException.http_status: {
+ "description": "Internal server error",
+ "model": ErrorResponse,
+ "content": {
+ "application/json": {
+ "example": get_dispatcher_error_response(
+ InternalException(), "request-id"
+ )
+ }
+ },
+ },
+ },
)
async def get_upscale_task(
task_id: int,
@@ -144,24 +185,18 @@ async def get_upscale_task(
job = await get_upscaling_task_by_user_id(token, db, task_id)
if not job:
logger.error(f"Upscale task {task_id} not found")
- raise HTTPException(
- status_code=404,
- detail=f"Upscale task {task_id} not found",
- )
+ raise TaskNotFoundException()
return job
- except HTTPException as e:
- raise e
+ except DispatcherException as de:
+ raise de
except Exception as e:
- logger.exception(f"Error retrieving upscale task {task_id}: {e}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"An error occurred while retrieving the upscale task {task_id}: {e}",
+ logger.error(f"Error retrieving upscale task {task_id}: {e}")
+ raise InternalException(
+ message="An error occurred while retrieving the upscale task."
)
-@router.websocket(
- "/ws/upscale_tasks/{task_id}",
-)
+@router.websocket("/ws/upscale_tasks/{task_id}")
async def ws_task_status(
websocket: WebSocket,
task_id: int,
@@ -212,6 +247,25 @@ async def ws_task_status(
except WebSocketDisconnect:
logger.info("WebSocket disconnected")
+ except DispatcherException as ae:
+ logger.error(f"Dispatcher exception detected: {ae.message}")
+ await websocket.send_json(
+ WSTaskStatusMessage(
+ type="error", task_id=task_id, message=ae.message
+ ).model_dump()
+ )
+ await websocket.close(code=1011, reason=ae.error_code)
except Exception as e:
- logger.exception(f"Error in upscaling task status websocket: {e}")
- await websocket.close(code=1011, reason=f"Error in job status websocket: {e}")
+ logger.error(
+ f"An error occurred while monitoring upscaling task {task_id}: {e}"
+ )
+ await websocket.send_json(
+ WSTaskStatusMessage(
+ type="error",
+ task_id=task_id,
+ message="An error occurred while monitoring upscaling task.",
+ ).model_dump()
+ )
+ await websocket.close(code=1011, reason="INTERNAL_ERROR")
+ finally:
+ db.close()
diff --git a/tests/routers/test_job_status.py b/tests/routers/test_job_status.py
index d0f22c7..22f7b8c 100644
--- a/tests/routers/test_job_status.py
+++ b/tests/routers/test_job_status.py
@@ -103,5 +103,6 @@ async def test_ws_jobs_status_closes_on_error(mock_get_jobs_status, client):
websocket.receive_json()
websocket.receive_json()
websocket.receive_json()
+ websocket.receive_json()
assert exc_info.value.code == 1011
diff --git a/tests/routers/test_sync_jobs.py b/tests/routers/test_sync_jobs.py
index 8575ede..e2ee119 100644
--- a/tests/routers/test_sync_jobs.py
+++ b/tests/routers/test_sync_jobs.py
@@ -1,7 +1,9 @@
import json
from unittest.mock import patch
-from fastapi import HTTPException
+from fastapi import status
+
+from app.error import InternalException
@patch("app.routers.sync_jobs.create_synchronous_job")
@@ -27,21 +29,19 @@ def test_sync_jobs_create_500(
mock_create_sync_job.side_effect = SystemError("Could not launch the job")
r = client.post("/sync_jobs", json=fake_processing_job_request.model_dump())
- assert r.status_code == 500
- assert "could not launch the job" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An error occurred while creating the synchronous job." in r.json().get("message", "")
@patch("app.routers.sync_jobs.create_synchronous_job")
-def test_sync_jobs_create_http_error(
+def test_sync_jobs_create_internal_error(
mock_create_sync_job,
client,
fake_processing_job_request,
):
- mock_create_sync_job.side_effect = HTTPException(
- status_code=503, detail="Oops, service unavailable"
- )
+ mock_create_sync_job.side_effect = InternalException()
r = client.post("/sync_jobs", json=fake_processing_job_request.model_dump())
- assert r.status_code == 503
- assert "service unavailable" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An internal server error occurred." in r.json().get("message", "")
diff --git a/tests/routers/test_tiles.py b/tests/routers/test_tiles.py
index d7af1c2..037ffdf 100644
--- a/tests/routers/test_tiles.py
+++ b/tests/routers/test_tiles.py
@@ -1,6 +1,7 @@
from unittest.mock import patch
import pytest
from app.schemas.tiles import GridTypeEnum
+from fastapi import status
@pytest.fixture
@@ -28,5 +29,5 @@ def test_split_in_tiles_success(client, dummy_payload):
def test_split_in_tiles_unknown_grid(mock_split, client, dummy_payload):
mock_split.side_effect = ValueError("Unknown grid: INVALID_GRID")
response = client.post("/tiles", json=dummy_payload)
- assert response.status_code == 500
- assert "Unknown grid" in response.json()["detail"]
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An error occurred while calculating tiles for 20x20km" in response.json()["message"]
diff --git a/tests/routers/test_unit_jobs.py b/tests/routers/test_unit_jobs.py
index e001d49..de4a585 100644
--- a/tests/routers/test_unit_jobs.py
+++ b/tests/routers/test_unit_jobs.py
@@ -1,7 +1,9 @@
import json
from unittest.mock import patch
-from fastapi import HTTPException
+from fastapi import status
+
+from app.error import InternalException
@patch("app.routers.unit_jobs.create_processing_job")
@@ -29,24 +31,24 @@ def test_unit_jobs_create_500(
mock_create_processing_job.side_effect = SystemError("Could not launch the job")
r = client.post("/unit_jobs", json=fake_processing_job_request.model_dump())
- assert r.status_code == 500
- assert "could not launch the job" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An error occurred while creating processing job." in r.json().get(
+ "message", ""
+ )
@patch("app.routers.unit_jobs.create_processing_job")
-def test_unit_jobs_create_http_error(
+def test_unit_jobs_create_internal_error(
mock_create_processing_job,
client,
fake_processing_job_request,
):
- mock_create_processing_job.side_effect = HTTPException(
- status_code=503, detail="Oops, service unavailable"
- )
+    mock_create_processing_job.side_effect = InternalException()
r = client.post("/unit_jobs", json=fake_processing_job_request.model_dump())
- assert r.status_code == 503
- assert "service unavailable" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An internal server error occurred." in r.json().get("message", "")
@patch("app.routers.unit_jobs.get_processing_job_by_user_id")
@@ -71,8 +73,8 @@ def test_unit_jobs_get_job_404(mock_get_processing_job, client):
mock_get_processing_job.return_value = None
r = client.get("/unit_jobs/1")
- assert r.status_code == 404
- assert "processing job 1 not found" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_404_NOT_FOUND
+ assert "The requested job was not found." in r.json().get("message", "")
@patch("app.routers.unit_jobs.get_processing_job_by_user_id")
@@ -81,20 +83,20 @@ def test_unit_jobs_get_job_500(mock_get_processing_job, client):
mock_get_processing_job.side_effect = RuntimeError("Database connection lost")
r = client.get("/unit_jobs/1")
- assert r.status_code == 500
- assert "database connection lost" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An error occurred while retrieving the processing job." in r.json().get(
+ "message", ""
+ )
@patch("app.routers.unit_jobs.get_processing_job_by_user_id")
-def test_unit_jobs_get_job_http_error(mock_get_processing_job, client):
+def test_unit_jobs_get_job_internal_error(mock_get_processing_job, client):
- mock_get_processing_job.side_effect = HTTPException(
- status_code=503, detail="Oops, service unavailable"
- )
+ mock_get_processing_job.side_effect = InternalException()
r = client.get("/unit_jobs/1")
- assert r.status_code == 503
- assert "service unavailable" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An internal server error occurred." in r.json().get("message", "")
@patch("app.routers.unit_jobs.get_processing_job_results")
@@ -119,8 +121,8 @@ def test_unit_jobs_get_job_results_404(mock_get_processing_job_results, client):
mock_get_processing_job_results.return_value = None
r = client.get("/unit_jobs/1/results")
- assert r.status_code == 404
- assert "result for processing job 1 not found" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_404_NOT_FOUND
+ assert "The requested job was not found." in r.json().get("message", "")
@patch("app.routers.unit_jobs.get_processing_job_results")
@@ -131,5 +133,7 @@ def test_unit_jobs_get_job_results_500(mock_get_processing_job_results, client):
)
r = client.get("/unit_jobs/1/results")
- assert r.status_code == 500
- assert "database connection lost" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An error occurred while retrieving processing job results." in r.json().get(
+ "message", ""
+ )
diff --git a/tests/routers/test_upscale_tasks.py b/tests/routers/test_upscale_tasks.py
index 86e3d53..7c5d01a 100644
--- a/tests/routers/test_upscale_tasks.py
+++ b/tests/routers/test_upscale_tasks.py
@@ -1,9 +1,11 @@
import json
from unittest.mock import AsyncMock, patch
-from fastapi import HTTPException, WebSocketDisconnect
+from fastapi import WebSocketDisconnect, status
import pytest
+from app.error import InternalException
+
@patch("app.routers.upscale_tasks.create_upscaling_processing_jobs")
@patch("app.routers.upscale_tasks.create_upscaling_task")
@@ -18,7 +20,7 @@ def test_upscaling_task_create_201(
mock_create_upscaling_task.return_value = fake_upscaling_task_summary
r = client.post("/upscale_tasks", json=fake_upscaling_task_request.model_dump())
- assert r.status_code == 201
+ assert r.status_code == status.HTTP_201_CREATED
assert r.json() == fake_upscaling_task_summary.model_dump()
assert mock_create_processing_jobs.called_once()
@@ -30,29 +32,27 @@ def test_upscaling_task_create_500(
fake_upscaling_task_request,
):
- mock_create_upscaling_task.side_effect = SystemError(
- "Could not launch the upscale task"
- )
+ mock_create_upscaling_task.side_effect = SystemError("Database connection lost")
r = client.post("/upscale_tasks", json=fake_upscaling_task_request.model_dump())
- assert r.status_code == 500
- assert "could not launch the upscale task" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An error occurred while retrieving processing job results." in r.json().get(
+ "message", ""
+ )
@patch("app.routers.upscale_tasks.create_upscaling_task")
-def test_upscaling_task_create_http_error(
+def test_upscaling_task_create_internal_error(
mock_create_upscaling_task,
client,
fake_upscaling_task_request,
):
- mock_create_upscaling_task.side_effect = HTTPException(
- status_code=503, detail="Oops, service unavailable"
- )
+ mock_create_upscaling_task.side_effect = InternalException()
r = client.post("/upscale_tasks", json=fake_upscaling_task_request.model_dump())
- assert r.status_code == 503
- assert "service unavailable" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An internal server error occurred." in r.json().get("message", "")
@patch("app.routers.upscale_tasks.get_upscaling_task_by_user_id")
@@ -65,7 +65,7 @@ def test_upscaling_task_get_task_200(
mock_get_upscale_task.return_value = fake_upscaling_task
r = client.get("/upscale_tasks/1")
- assert r.status_code == 200
+ assert r.status_code == status.HTTP_200_OK
assert json.dumps(r.json(), indent=1) == fake_upscaling_task.model_dump_json(
indent=1
)
@@ -77,8 +77,8 @@ def test_upscaling_task_get_task_404(mock_get_upscale_task, client):
mock_get_upscale_task.return_value = None
r = client.get("/upscale_tasks/1")
- assert r.status_code == 404
- assert "upscale task 1 not found" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_404_NOT_FOUND
+ assert "The requested task was not found." in r.json().get("message", "")
@patch("app.routers.upscale_tasks.get_upscaling_task_by_user_id")
@@ -87,20 +87,21 @@ def test_upscaling_task_get_task_500(mock_get_upscale_task, client):
mock_get_upscale_task.side_effect = SystemError("Database connection lost")
r = client.get("/upscale_tasks/1")
- assert r.status_code == 500
- assert "database connection lost" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert "An error occurred while retrieving the upscale task" in r.json().get(
+ "message", ""
+ )
@patch("app.routers.upscale_tasks.get_upscaling_task_by_user_id")
-def test_upscaling_task_get_task_http_error(mock_get_upscale_task, client):
+def test_upscaling_task_get_task_internal_error(mock_get_upscale_task, client):
- mock_get_upscale_task.side_effect = HTTPException(
- status_code=503, detail="Oops, service unavailable"
- )
+ error = InternalException()
+ mock_get_upscale_task.side_effect = error
r = client.get("/upscale_tasks/1")
- assert r.status_code == 503
- assert "service unavailable" in r.json().get("detail", "").lower()
+ assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert error.message in r.json().get("message", "")
@pytest.mark.asyncio
@@ -135,6 +136,7 @@ async def test_ws_jobs_status_closes_on_error(
websocket.receive_json()
websocket.receive_json()
websocket.receive_json()
+ websocket.receive_json()
assert exc_info.value.code == 1011