diff --git a/config.yaml b/config.yaml index e6a3a0e1..af4d4dd8 100644 --- a/config.yaml +++ b/config.yaml @@ -82,3 +82,6 @@ cluster_estimation: communications: timeout: 60.0 # seconds worker_period: 0.5 # seconds + +auto_landing: + enabled: true diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index a8841062..be596d3c 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -4,11 +4,21 @@ """ import math +from enum import Enum from .. import merged_odometry_detections from ..common.modules.logger import logger +class DetectionSelectionStrategy(Enum): + """ + Strategies for selecting which detection to use when multiple targets are detected. + """ + + FIRST_DETECTION = "first_detection" # Use first detection in list (original behavior) + HIGHEST_CONFIDENCE = "highest_confidence" # Choose detection with highest confidence + + class AutoLandingInformation: """ Information necessary for the LANDING_TARGET MAVLink command. @@ -44,17 +54,21 @@ def create( im_h: float, im_w: float, local_logger: logger.Logger, + selection_strategy: DetectionSelectionStrategy = DetectionSelectionStrategy.FIRST_DETECTION, ) -> "tuple [bool, AutoLanding | None ]": """ fov_x: The horizontal camera field of view in degrees. fov_y: The vertical camera field of view in degrees. im_w: Width of image. im_h: Height of image. + selection_strategy: Strategy for selecting which detection to use when multiple targets are detected. Returns an AutoLanding object. """ - return True, AutoLanding(cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger) + return True, AutoLanding( + cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger, selection_strategy + ) def __init__( self, @@ -64,6 +78,7 @@ def __init__( im_h: float, im_w: float, local_logger: logger.Logger, + selection_strategy: DetectionSelectionStrategy, ) -> None: """ Private constructor, use create() method. @@ -75,10 +90,41 @@ def __init__( self.im_h = im_h self.im_w = im_w self.__logger = local_logger + self.__selection_strategy = selection_strategy + + def _select_detection(self, detections: list) -> int | None: + """ + Select which detection to use based on the configured strategy. + + Returns the index of the selected detection, or None if no suitable detection found. + """ + if not detections: + return None + + if ( + len(detections) == 1 + or self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION + ): + return 0 + + if self.__selection_strategy == DetectionSelectionStrategy.HIGHEST_CONFIDENCE: + # Find detection with highest confidence + max_confidence = 0 + selected_index = 0 + + for i, detection in enumerate(detections): + if detection.confidence > max_confidence: + max_confidence = detection.confidence + selected_index = i + + return selected_index + + # Default fallback + return 0 def run( self, odometry_detections: merged_odometry_detections.MergedOdometryDetections - ) -> "tuple[bool, AutoLandingInformation]": + ) -> "tuple[bool, AutoLandingInformation | None]": """ Calculates the x and y angles in radians of the bounding box based on its center. @@ -87,8 +133,26 @@ def run( Returns an AutoLandingInformation object. """ - # TODO: Devise better algorithm to pick which detection to land at if several are detected - x_center, y_center = odometry_detections.detections[0].get_centre() + # Check if we have any detections + if not odometry_detections.detections: + self.__logger.warning("No detections available for auto-landing", True) + return False, None + + # Select which detection to use + selected_index = self._select_detection(odometry_detections.detections) + if selected_index is None: + self.__logger.error("Failed to select detection for auto-landing", True) + return False, None + + selected_detection = odometry_detections.detections[selected_index] + + # Log selection information + self.__logger.info( + f"Selected detection {selected_index + 1}/{len(odometry_detections.detections)} using strategy: {self.__selection_strategy.value}", + True, + ) + + x_center, y_center = selected_detection.get_centre() angle_x = (x_center - self.im_w / 2) * (self.fov_x * (math.pi / 180)) / self.im_w angle_y = (y_center - self.im_h / 2) * (self.fov_y * (math.pi / 180)) / self.im_h diff --git a/modules/auto_landing/auto_landing_worker.py b/modules/auto_landing/auto_landing_worker.py index 62890e04..81fc8ae8 100644 --- a/modules/auto_landing/auto_landing_worker.py +++ b/modules/auto_landing/auto_landing_worker.py @@ -4,6 +4,7 @@ import os import pathlib +import queue import time from utilities.workers import queue_proxy_wrapper @@ -18,16 +19,23 @@ def auto_landing_worker( im_h: float, im_w: float, period: float, + detection_strategy: auto_landing.DetectionSelectionStrategy, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ - Worker process. + Worker process for auto-landing operations. - period: Wait time in seconds. - input_queue and output_queue are data queues. - controller is how the main process communicates to this worker process. + fov_x: Horizontal field of view in degrees. + fov_y: Vertical field of view in degrees. + im_h: Image height in pixels. + im_w: Image width in pixels. + period: Wait time in seconds between processing cycles. + detection_strategy: Strategy for selecting detection when multiple targets are present. + input_queue: Queue for receiving merged odometry detections. + output_queue: Queue for sending auto-landing information. + controller: Worker controller for pause/exit management. """ worker_name = pathlib.Path(__file__).stem @@ -42,25 +50,33 @@ def auto_landing_worker( local_logger.info("Logger initialized", True) - result, auto_lander = auto_landing.AutoLanding.create(fov_x, fov_y, im_h, im_w, local_logger) - + # Create auto-landing instance + result, auto_lander = auto_landing.AutoLanding.create( + fov_x, fov_y, im_h, im_w, local_logger, detection_strategy + ) if not result: - local_logger.error("Worker failed to create class object", True) + local_logger.error("Worker failed to create AutoLanding object", True) return # Get Pylance to stop complaining assert auto_lander is not None + local_logger.info("Auto-landing worker initialized successfully", True) + while not controller.is_exit_requested(): controller.check_pause() - input_data = input_queue.queue.get() - if input_data is None: + # Process detections if available + input_data = None + try: + input_data = input_queue.queue.get_nowait() + except queue.Empty: + # No data available, continue continue - result, value = auto_lander.run(input_data) - if not result: - continue + if input_data is not None: + result, landing_info = auto_lander.run(input_data) + if result and landing_info: + output_queue.queue.put(landing_info) - output_queue.queue.put(value) time.sleep(period) diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py new file mode 100644 index 00000000..727b5a07 --- /dev/null +++ b/tests/integration/test_auto_landing_worker.py @@ -0,0 +1,248 @@ +""" +Test auto-landing worker process. +""" + +import multiprocessing as mp +import pathlib +import time +import math + +import numpy as np + +from modules import detections_and_time +from modules import merged_odometry_detections +from modules.auto_landing import auto_landing +from modules.auto_landing import auto_landing_worker +from modules.common.modules import orientation +from modules.common.modules import position_local +from modules.common.modules.mavlink import drone_odometry_local +from modules.common.modules.read_yaml import read_yaml +from utilities.workers import queue_proxy_wrapper +from utilities.workers import worker_controller + + +# Test-specific parameters +WORKER_PERIOD = 0.1 # seconds +CONFIG_FILE_PATH = pathlib.Path("config.yaml") +detection_strategy = auto_landing.DetectionSelectionStrategy.FIRST_DETECTION +# detection_strategy = auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE + + +def simulate_detection_input( + input_queue: queue_proxy_wrapper.QueueProxyWrapper, + detections: list, + position: tuple, + orientation_angles: tuple, +) -> None: + """ + Create and place merged odometry detections into the input queue. + """ + # Create drone position + result, drone_position = position_local.PositionLocal.create( + position[0], position[1], position[2] + ) + assert result + assert drone_position is not None + + # Create drone orientation + result, drone_orientation = orientation.Orientation.create( + orientation_angles[0], orientation_angles[1], orientation_angles[2] + ) + assert result + assert drone_orientation is not None + + # Create drone odometry + result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create( + drone_position, drone_orientation + ) + assert result + assert drone_odometry is not None + + # Create merged odometry detections + result, merged = merged_odometry_detections.MergedOdometryDetections.create( + drone_odometry, detections + ) + assert result + assert merged is not None + + input_queue.queue.put(merged) + + +def create_test_detection( + bbox: list, label: int, confidence: float +) -> detections_and_time.Detection: + """ + Create a test detection with the given parameters. + """ + result, detection = detections_and_time.Detection.create( + np.array(bbox, dtype=np.float32), label, confidence + ) + assert result + assert detection is not None + return detection + + +def calculate_expected_angles( + detection_bbox: list, fov_x_deg: float, fov_y_deg: float, im_w: float, im_h: float +) -> tuple[float, float]: + """ + Calculates the expected angle_x and angle_y for a given detection bounding box + and camera parameters, mirroring the logic in auto_landing.py. + Assumes fov_x_deg and fov_y_deg are in degrees. + """ + x_center = (detection_bbox[0] + detection_bbox[2]) / 2 + y_center = (detection_bbox[1] + detection_bbox[3]) / 2 + + angle_x = (x_center - im_w / 2) * (fov_x_deg * (math.pi / 180)) / im_w + angle_y = (y_center - im_h / 2) * (fov_y_deg * (math.pi / 180)) / im_h + return angle_x, angle_y + + +def main() -> int: + """ + Main function. + """ + # Read config file + result, config = read_yaml.open_config(CONFIG_FILE_PATH) + assert result and config is not None, "Failed to read config file" + + # Check if the feature is enabled in the config + auto_landing_config = config.get("auto_landing", {}) + if not auto_landing_config.get("enabled", False): + print("Auto-landing is disabled in config.yaml, skipping test.") + return 0 + + # Extract parameters from config + try: + fov_x = config["geolocation"]["fov_x"] + fov_y = config["geolocation"]["fov_y"] + image_height = config["video_input"]["height"] + image_width = config["video_input"]["width"] + except KeyError as e: + print(f"ERROR: Config file missing required key: {e}") + return -1 + + # Setup + controller = worker_controller.WorkerController() + + mp_manager = mp.Manager() + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + # Create worker process using parameters from config + worker = mp.Process( + target=auto_landing_worker.auto_landing_worker, + args=( + fov_x, + fov_y, + image_height, + image_width, + WORKER_PERIOD, + detection_strategy, # Explicitly pass the strategy to be tested + input_queue, + output_queue, + controller, + ), + ) + + # Start worker + worker.start() + + # Give worker time to initialize + time.sleep(0.5) + + # Test 1: Send a single detection and verify it's processed + print("--- Test 1: Processing single detection ---") + detection1_bbox = [200, 200, 400, 400] + detection1 = create_test_detection(detection1_bbox, 1, 0.9) + simulate_detection_input( + input_queue, + [detection1], + (0.0, 0.0, -50.0), # 50 meters above ground + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have output now + assert not output_queue.queue.empty() + landing_info = output_queue.queue.get_nowait() + assert landing_info is not None + assert hasattr(landing_info, "angle_x") + assert hasattr(landing_info, "angle_y") + assert hasattr(landing_info, "target_dist") + print("--- Test 1 Passed ---") + + # Test 2: Verify strategy with multiple detections + print(f"--- Test 2: Verifying {detection_strategy.value} strategy ---") + + # This is the first detection in the list + detection_first_bbox = [100, 100, 200, 200] + detection_first = create_test_detection(detection_first_bbox, 1, 0.7) + + # This detection is at the center and has higher confidence + center_x = image_width / 2 + center_y = image_height / 2 + detection_center_bbox = [center_x, center_y, center_x, center_y] + detection_center = create_test_detection(detection_center_bbox, 2, 0.95) + + # Order matters for FIRST_DETECTION strategy + detections_for_test_2 = [detection_first, detection_center] + + simulate_detection_input( + input_queue, + detections_for_test_2, + (0.0, 0.0, -100.0), # 100 meters above ground + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have output + assert not output_queue.queue.empty() + landing_info2 = output_queue.queue.get_nowait() + assert landing_info2 is not None + + # Determine expected angles based on the strategy + expected_angle_x = 0.0 + expected_angle_y = 0.0 + + if detection_strategy == auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE: + # If HIGHEST_CONFIDENCE, it should pick detection_center + expected_angle_x, expected_angle_y = calculate_expected_angles( + detection_center_bbox, fov_x, fov_y, image_width, image_height + ) + elif detection_strategy == auto_landing.DetectionSelectionStrategy.FIRST_DETECTION: + # If FIRST_DETECTION, it should pick detection_first + expected_angle_x, expected_angle_y = calculate_expected_angles( + detection_first_bbox, fov_x, fov_y, image_width, image_height + ) + + # Verify the calculated angles using a tolerance for float comparison + assert math.isclose( + landing_info2.angle_x, expected_angle_x, rel_tol=1e-7 + ), f"Expected angle_x to be {expected_angle_x}, but got {landing_info2.angle_x}" + assert math.isclose( + landing_info2.angle_y, expected_angle_y, rel_tol=1e-7 + ), f"Expected angle_y to be {expected_angle_y}, but got {landing_info2.angle_y}" + print("--- Test 2 Passed ---") + + # Cleanup + controller.request_exit() + + # Drain queues + input_queue.fill_and_drain_queue() + + # Wait for worker to finish + worker.join(timeout=5.0) + worker.terminate() + + return 0 + + +if __name__ == "__main__": + result_main = main() + if result_main < 0: + print(f"ERROR: Status code: {result_main}") + + print("Done!")