Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,6 @@ cluster_estimation:
communications:
timeout: 60.0 # seconds
worker_period: 0.5 # seconds

auto_landing:
enabled: true
72 changes: 68 additions & 4 deletions modules/auto_landing/auto_landing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@
"""

import math
from enum import Enum

from .. import merged_odometry_detections
from ..common.modules.logger import logger


class DetectionSelectionStrategy(Enum):
"""
Strategies for selecting which detection to use when multiple targets are detected.
"""

FIRST_DETECTION = "first_detection" # Use first detection in list (original behavior)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iffy about first detection because it also runs on the assumption that the first detection is correct, but leaving for now

HIGHEST_CONFIDENCE = "highest_confidence" # Choose detection with highest confidence


class AutoLandingInformation:
"""
Information necessary for the LANDING_TARGET MAVLink command.
Expand Down Expand Up @@ -44,17 +54,21 @@ def create(
im_h: float,
im_w: float,
local_logger: logger.Logger,
selection_strategy: DetectionSelectionStrategy = DetectionSelectionStrategy.FIRST_DETECTION,
) -> "tuple [bool, AutoLanding | None ]":
"""
fov_x: The horizontal camera field of view in degrees.
fov_y: The vertical camera field of view in degrees.
im_w: Width of image.
im_h: Height of image.
selection_strategy: Strategy for selecting which detection to use when multiple targets are detected.

Returns an AutoLanding object.
"""

return True, AutoLanding(cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger)
return True, AutoLanding(
cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger, selection_strategy
)

def __init__(
self,
Expand All @@ -64,6 +78,7 @@ def __init__(
im_h: float,
im_w: float,
local_logger: logger.Logger,
selection_strategy: DetectionSelectionStrategy,
) -> None:
"""
Private constructor, use create() method.
Expand All @@ -75,10 +90,41 @@ def __init__(
self.im_h = im_h
self.im_w = im_w
self.__logger = local_logger
self.__selection_strategy = selection_strategy

def _select_detection(self, detections: list) -> int | None:
"""
Select which detection to use based on the configured strategy.

Returns the index of the selected detection, or None if no suitable detection found.
"""
if not detections:
return None

if (
len(detections) == 1
or self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION
):
return 0

if self.__selection_strategy == DetectionSelectionStrategy.HIGHEST_CONFIDENCE:
# Find detection with highest confidence
max_confidence = 0
selected_index = 0

for i, detection in enumerate(detections):
if detection.confidence > max_confidence:
max_confidence = detection.confidence
selected_index = i

return selected_index

# Default fallback
return 0

def run(
self, odometry_detections: merged_odometry_detections.MergedOdometryDetections
) -> "tuple[bool, AutoLandingInformation]":
) -> "tuple[bool, AutoLandingInformation | None]":
"""
Calculates the x and y angles in radians of the bounding box based on its center.

Expand All @@ -87,8 +133,26 @@ def run(
Returns an AutoLandingInformation object.
"""

# TODO: Devise better algorithm to pick which detection to land at if several are detected
x_center, y_center = odometry_detections.detections[0].get_centre()
# Check if we have any detections
if not odometry_detections.detections:
self.__logger.warning("No detections available for auto-landing", True)
return False, None

# Select which detection to use
selected_index = self._select_detection(odometry_detections.detections)
if selected_index is None:
self.__logger.error("Failed to select detection for auto-landing", True)
return False, None

selected_detection = odometry_detections.detections[selected_index]

# Log selection information
self.__logger.info(
f"Selected detection {selected_index + 1}/{len(odometry_detections.detections)} using strategy: {self.__selection_strategy.value}",
True,
)

x_center, y_center = selected_detection.get_centre()

angle_x = (x_center - self.im_w / 2) * (self.fov_x * (math.pi / 180)) / self.im_w
angle_y = (y_center - self.im_h / 2) * (self.fov_y * (math.pi / 180)) / self.im_h
Expand Down
42 changes: 29 additions & 13 deletions modules/auto_landing/auto_landing_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import os
import pathlib
import queue
import time

from utilities.workers import queue_proxy_wrapper
Expand All @@ -18,16 +19,23 @@ def auto_landing_worker(
im_h: float,
im_w: float,
period: float,
detection_strategy: auto_landing.DetectionSelectionStrategy,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Worker process.
Worker process for auto-landing operations.

period: Wait time in seconds.
input_queue and output_queue are data queues.
controller is how the main process communicates to this worker process.
fov_x: Horizontal field of view in degrees.
fov_y: Vertical field of view in degrees.
im_h: Image height in pixels.
im_w: Image width in pixels.
period: Wait time in seconds between processing cycles.
detection_strategy: Strategy for selecting detection when multiple targets are present.
input_queue: Queue for receiving merged odometry detections.
output_queue: Queue for sending auto-landing information.
controller: Worker controller for pause/exit management.
"""

worker_name = pathlib.Path(__file__).stem
Expand All @@ -42,25 +50,33 @@ def auto_landing_worker(

local_logger.info("Logger initialized", True)

result, auto_lander = auto_landing.AutoLanding.create(fov_x, fov_y, im_h, im_w, local_logger)

# Create auto-landing instance
result, auto_lander = auto_landing.AutoLanding.create(
fov_x, fov_y, im_h, im_w, local_logger, detection_strategy
)
if not result:
local_logger.error("Worker failed to create class object", True)
local_logger.error("Worker failed to create AutoLanding object", True)
return

# Get Pylance to stop complaining
assert auto_lander is not None

local_logger.info("Auto-landing worker initialized successfully", True)

while not controller.is_exit_requested():
controller.check_pause()

input_data = input_queue.queue.get()
if input_data is None:
# Process detections if available
input_data = None
try:
input_data = input_queue.queue.get_nowait()
except queue.Empty:
# No data available, continue
continue

result, value = auto_lander.run(input_data)
if not result:
continue
if input_data is not None:
result, landing_info = auto_lander.run(input_data)
if result and landing_info:
output_queue.queue.put(landing_info)

output_queue.queue.put(value)
time.sleep(period)
Loading
Loading