Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added alert_system/__init__.py
Empty file.
34 changes: 34 additions & 0 deletions alert_system/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from django.contrib import admin

from .models import Connector, ExtractionItem, LoadItems


@admin.register(Connector)
class ConnectorAdmin(admin.ModelAdmin):
list_display = ("id", "type", "last_success_run", "status")
readonly_fields = ("last_success_run",)


@admin.register(ExtractionItem)
class EventAdmin(admin.ModelAdmin):
list_display = ("stac_id", "created_at", "collection")
list_filter = ("connector", "collection")
readonly_fields = ("connector",)
search_fields = ("stac_id",)


@admin.register(LoadItems)
class LoadAdmin(admin.ModelAdmin):
list_display = (
"id",
"event_title",
"created_at",
"correlation_id",
"item_eligible",
)
list_filter = (
"connector",
"item_eligible",
)
readonly_fields = ("connector", "item_eligible")
search_fields = ("id",)
6 changes: 6 additions & 0 deletions alert_system/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class AlertSystemConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "alert_system"
10 changes: 10 additions & 0 deletions alert_system/etl/Gdacs_flood/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# NOTE: Store Config files here. Might need to refactor if source supports filtering with hazards.
class GdacsFloodConfig:
def __init__(self):
self.event_endpoint = "/gdacs-events/items"
self.hazard_endpoint = "/gdacs-hazards/items"
self.impact_endpoint = "/gdacs-impacts/items"
self.people_exposed_threshold = 5


gdacs_flood_config = GdacsFloodConfig()
23 changes: 23 additions & 0 deletions alert_system/etl/Gdacs_flood/extraction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging

from alert_system.etl.base.extraction import BaseExtractionClass
from alert_system.etl.base.loader import BaseLoaderClass
from alert_system.etl.base.transform import BaseTransformerClass

from .config import gdacs_flood_config
from .loader import GdacsLoader
from .transform import GdacsTransformer

logger = logging.getLogger(__name__)


class GdacsFloodExtraction(BaseExtractionClass):
event_endpoint = gdacs_flood_config.event_endpoint
hazard_endpoint = getattr(gdacs_flood_config, "hazard_endpoint", None)
impact_endpoint = getattr(gdacs_flood_config, "impact_endpoint", None)

def get_transformer_class(self) -> type[BaseTransformerClass]:
return GdacsTransformer

def get_loader_class(self) -> type[BaseLoaderClass]:
return GdacsLoader
13 changes: 13 additions & 0 deletions alert_system/etl/Gdacs_flood/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from alert_system.etl.base.loader import BaseLoaderClass

from .config import gdacs_flood_config


class GdacsLoader(BaseLoaderClass):
people_exposed_threshold = gdacs_flood_config.people_exposed_threshold

# NOTE: Add additional changes to the filter here. This is example only.
def filter_eligible_items(self, load_obj):
if load_obj.get("people_exposed") > GdacsLoader.people_exposed_threshold:
return True
return False
90 changes: 90 additions & 0 deletions alert_system/etl/Gdacs_flood/transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import logging
from typing import Dict, Tuple

from alert_system.etl.base.transform import BaseTransformerClass

logger = logging.getLogger(__name__)


class GdacsTransformer(BaseTransformerClass):
"""
Transformer for GDACS STAC impacts.
Extracts and normalizes impact fields, computes derived values, and stores metadata.
"""

# Mapping of (category, type) → flattened key
IMPACT_MAP: Dict[Tuple[str, str], str] = {
("people", "potentially_affected"): "people.potentially_affected",
("people", "death"): "people.death",
("people", "affected_total"): "people.affected_total",
("people", "affected_direct"): "people.affected_direct",
("people", "affected_indirect"): "people.affected_indirect",
("people", "highest_risk"): "people.highest_risk",
("buildings", "destroyed"): "buildings.destroyed",
}

# NOTE: This logic might change in future
def compute_people_exposed(self, impacts: dict) -> int:
value = next(
(
impacts.get(key)
for key in ["people.affected_total", "people.potentially_affected", "people.affected_direct"]
if impacts.get(key)
),
0,
)
if not isinstance(value, int):
logger.warning(f"people_exposed value is not int: {value}")
return 0
return value

# NOTE: This logic might change in future
def compute_buildings_exposed(self, impacts: dict) -> int:
"""
Compute the 'buildings_exposed' field.
"""
return impacts.get("buildings.destroyed") or 0

def process_impact(self, impact_items) -> BaseTransformerClass.ImpactType:
raw_impacts, metadata = {}, {}
for item in impact_items:
properties = item.resp_data.get("properties", {})
impact_detail = properties.get("monty:impact_detail", {})
category = impact_detail.get("category")
type_ = impact_detail.get("type")
value = impact_detail.get("value")
if category and type_:
field = self.IMPACT_MAP.get((category, type_)) or f"{category}.{type_}"
raw_impacts[field] = value
metadata[field] = impact_detail

return {
"people_exposed": self.compute_people_exposed(raw_impacts),
"buildings_exposed": self.compute_buildings_exposed(raw_impacts),
"impact_metadata": metadata,
}

def process_hazard(self, hazard_item) -> BaseTransformerClass.HazardType:
if not hazard_item:
return {
"severity_unit": "",
"severity_label": "",
"severity_value": 0,
}

properties = hazard_item.resp_data.get("properties", {})
detail = properties.get("monty:hazard_detail", {})

return {
"severity_unit": detail.get("severity_unit", ""),
"severity_label": detail.get("severity_label", ""),
"severity_value": detail.get("severity_value", 0),
}

def process_event(self, event_item) -> BaseTransformerClass.EventType:
properties = event_item.resp_data.get("properties", {})
return {
"title": properties.get("title", ""),
"description": properties.get("description", ""),
"country": properties.get("monty:country_codes", ""),
}
Loading
Loading