Skip to content

Conversation

@sandeshit
Copy link
Contributor

Changes

  • Alert-system app
  • Polling, ETL

Checklist

Things that should succeed before merging.

  • Updated/ran unit tests
  • Updated CHANGELOG.md

Release

If there is a version update, make sure to tag the repository with the latest version.

@sandeshit sandeshit mentioned this pull request Nov 28, 2025
2 tasks
@sandeshit sandeshit changed the base branch from project/alert-system-workflow to develop December 7, 2025 18:22
@sandeshit sandeshit changed the base branch from develop to project/alert-system-workflow December 7, 2025 18:23
@sandeshit sandeshit marked this pull request as draft December 7, 2025 18:24
@sandeshit sandeshit requested a review from susilnem December 8, 2025 09:09
Copy link
Member

@susilnem susilnem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor changes...

Copy link
Member

@susilnem susilnem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small changes!

Copy link
Contributor

@sudip-khanal sudip-khanal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor changes

@sandeshit sandeshit requested a review from susilnem December 12, 2025 09:02
Copy link
Member

@susilnem susilnem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any analytics on usage and memory consumption?
Has any profiling been done so far?

return

try:
eligible_items = LoadItem.objects.filter(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
eligible_items = LoadItem.objects.filter(
eligible_items_qs = LoadItem.objects.filter(

Comment on lines +63 to +74
count = eligible_items.count()

if count == 0:
logger.info(f"[Past Events] No eligible items in run {extraction_run_id}")
return

logger.info(f"[Past Events] Processing {count} items from run {extraction_run_id}")

first_item = eligible_items.first()
if not first_item:
logger.info("No Connector found for the Event")
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are almost same checks.

processed += 1
except Exception as e:
failed += 1
logger.error(f"[Past Events] Failed for item {load_obj.id} in run {extraction_run_id}: {e}", exc_info=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.error(f"[Past Events] Failed for item {load_obj.id} in run {extraction_run_id}: {e}", exc_info=True)
logger.warning(f"[Past Events] Failed for item {load_obj.id} in run {extraction_run_id}")


logger.info(f"[Past Events] Completed run {extraction_run_id}: " f"{processed} processed, {failed} failed")

return {"extraction_run_id": extraction_run_id, "processed": processed, "failed": failed, "total": count}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a type dict for returning dict value

Comment on lines +120 to +124
go_event_ids = list(
Event.objects.filter(field_reports__num_affected__gte=item.total_people_exposed, dtype=connector.dtype)
.values_list("id", flat=True)
.distinct()
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

N+1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

N+M+1

.distinct()
)

item.related_go_events = go_event_ids
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if we have instead use M2M

Use bulk insert to add M2M (Use ignore existing)

return extraction_run_id

except Exception as exc:
logger.exception(f"[ETL] Connector {connector.id} failed: {exc}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.exception(f"[ETL] Connector {connector.id} failed: {exc}")
logger.warning(f"[ETL] Connector {connector.id} failed", exc_info=True)

try:
raise self.retry(exc=exc)
except MaxRetriesExceededError:
logger.error(f"[ETL] Max retries exceeded for connector {connector.id}", exc_info=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.error(f"[ETL] Max retries exceeded for connector {connector.id}", exc_info=True)
logger.error(f"[ETL] Max retries exceeded for connector {connector.label}", exc_info=True)

help_text=_("Unique identifier for the event item"),
)

resp_data = models.JSONField(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a

XXX: This might increase the database size

SOURCE_TYPE = Connector.ConnectorType.USGS_EARTHQUAKE

def handle(self, *args, **options):
if not self.SOURCE_TYPE:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not self.SOURCE_TYPE:

)

action = "Created" if created else "Updated"
logger.info(f"{action} Event for correlation_id={correlation_id}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.info(f"{action} Event for correlation_id={correlation_id}")
logger.info(f"{action} Event for {correlation_id=}")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alert_system/etl/Gdacs_flood/extraction.py
alert_system/etl/gdacs_flood/extraction.py

ISO 8601 datetime range string
"""

now = datetime.now(timezone.utc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
now = datetime.now(timezone.utc)
from django.utils import timezone
...
now = timezeone.now()

Comment on lines +13 to +18
event_collection_type = gdacs_flood_config.event_collection_type
hazard_collection_type = getattr(gdacs_flood_config, "hazard_collection_type", None)
impact_collection_type = getattr(gdacs_flood_config, "impact_collection_type", None)
filter_event = getattr(gdacs_flood_config, "filter_event", None)
transformer_class = GdacsTransformer
loader_class = GdacsLoader
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can replace all this with a single typed dict...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants