From 2f4f5bf5ccd684f065e11f2441961c1f95515e4f Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Thu, 17 Jul 2025 22:08:57 -0400 Subject: [PATCH 01/10] Complete autonomous landing module --- modules/auto_landing/auto_landing.py | 201 +++++++++++++- modules/auto_landing/auto_landing_worker.py | 128 ++++++++- tests/integration/test_auto_landing_worker.py | 248 ++++++++++++++++++ 3 files changed, 559 insertions(+), 18 deletions(-) create mode 100644 tests/integration/test_auto_landing_worker.py diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index a8841062..5bfcf3c8 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -4,11 +4,25 @@ """ import math +from enum import Enum +from typing import Optional +import threading from .. import merged_odometry_detections from ..common.modules.logger import logger +class DetectionSelectionStrategy(Enum): + """ + Strategies for selecting which detection to use when multiple targets are detected. + """ + + NEAREST_TO_CENTER = "nearest_to_center" # Choose detection closest to image center + LARGEST_AREA = "largest_area" # Choose detection with largest bounding box area + HIGHEST_CONFIDENCE = "highest_confidence" # Choose detection with highest confidence + FIRST_DETECTION = "first_detection" # Use first detection in list (original behavior) + + class AutoLandingInformation: """ Information necessary for the LANDING_TARGET MAVLink command. @@ -44,17 +58,21 @@ def create( im_h: float, im_w: float, local_logger: logger.Logger, + selection_strategy: DetectionSelectionStrategy = DetectionSelectionStrategy.NEAREST_TO_CENTER, ) -> "tuple [bool, AutoLanding | None ]": """ fov_x: The horizontal camera field of view in degrees. fov_y: The vertical camera field of view in degrees. im_w: Width of image. im_h: Height of image. + selection_strategy: Strategy for selecting which detection to use when multiple targets are detected. Returns an AutoLanding object. """ - return True, AutoLanding(cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger) + return True, AutoLanding( + cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger, selection_strategy + ) def __init__( self, @@ -64,6 +82,7 @@ def __init__( im_h: float, im_w: float, local_logger: logger.Logger, + selection_strategy: DetectionSelectionStrategy, ) -> None: """ Private constructor, use create() method. @@ -75,10 +94,75 @@ def __init__( self.im_h = im_h self.im_w = im_w self.__logger = local_logger + self.__selection_strategy = selection_strategy + + def _select_detection(self, detections: list) -> Optional[int]: + """ + Select which detection to use based on the configured strategy. + + Returns the index of the selected detection, or None if no suitable detection found. + """ + if not detections: + return None + + if len(detections) == 1: + return 0 + + if self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION: + return 0 + + if self.__selection_strategy == DetectionSelectionStrategy.NEAREST_TO_CENTER: + # Find detection closest to image center + image_center_x = self.im_w / 2 + image_center_y = self.im_h / 2 + + min_distance = float("inf") + selected_index = 0 + + for i, detection in enumerate(detections): + det_center_x, det_center_y = detection.get_centre() + distance = math.sqrt( + (det_center_x - image_center_x) ** 2 + (det_center_y - image_center_y) ** 2 + ) + if distance < min_distance: + min_distance = distance + selected_index = i + + return selected_index + + if self.__selection_strategy == DetectionSelectionStrategy.LARGEST_AREA: + # Find detection with largest bounding box area + max_area = 0 + selected_index = 0 + + for i, detection in enumerate(detections): + width = detection.x_2 - detection.x_1 + height = detection.y_2 - detection.y_1 + area = width * height + if area > max_area: + max_area = area + selected_index = i + + return selected_index + + if self.__selection_strategy == DetectionSelectionStrategy.HIGHEST_CONFIDENCE: + # Find detection with highest confidence + max_confidence = 0 + selected_index = 0 + + for i, detection in enumerate(detections): + if detection.confidence > max_confidence: + max_confidence = detection.confidence + selected_index = i + + return selected_index + + # Default fallback + return 0 def run( self, odometry_detections: merged_odometry_detections.MergedOdometryDetections - ) -> "tuple[bool, AutoLandingInformation]": + ) -> "tuple[bool, AutoLandingInformation | None]": """ Calculates the x and y angles in radians of the bounding box based on its center. @@ -87,8 +171,26 @@ def run( Returns an AutoLandingInformation object. """ - # TODO: Devise better algorithm to pick which detection to land at if several are detected - x_center, y_center = odometry_detections.detections[0].get_centre() + # Check if we have any detections + if not odometry_detections.detections: + self.__logger.warning("No detections available for auto-landing", True) + return False, None + + # Select which detection to use + selected_index = self._select_detection(odometry_detections.detections) + if selected_index is None: + self.__logger.error("Failed to select detection for auto-landing", True) + return False, None + + selected_detection = odometry_detections.detections[selected_index] + + # Log selection information + self.__logger.info( + f"Selected detection {selected_index + 1}/{len(odometry_detections.detections)} using strategy: {self.__selection_strategy.value}", + True, + ) + + x_center, y_center = selected_detection.get_centre() angle_x = (x_center - self.im_w / 2) * (self.fov_x * (math.pi / 180)) / self.im_w angle_y = (y_center - self.im_h / 2) * (self.fov_y * (math.pi / 180)) / self.im_h @@ -108,3 +210,94 @@ def run( ) return True, AutoLandingInformation(angle_x, angle_y, target_to_vehicle_dist) + + +class AutoLandingController: + """ + Controller for turning auto-landing on/off. + """ + + __create_key = object() + + @classmethod + def create( + cls, + auto_landing: AutoLanding, + local_logger: logger.Logger, + ) -> "tuple[bool, AutoLandingController | None]": + """ + Create an AutoLandingController instance. + + auto_landing: The AutoLanding instance to control. + local_logger: Logger instance for logging. + + Returns an AutoLandingController object. + """ + return True, AutoLandingController(cls.__create_key, auto_landing, local_logger) + + def __init__( + self, + class_private_create_key: object, + auto_landing: AutoLanding, + local_logger: logger.Logger, + ) -> None: + """ + Private constructor, use create() method. + """ + assert class_private_create_key is AutoLandingController.__create_key, "Use create() method" + + self.__auto_landing = auto_landing + self.__logger = local_logger + self.__enabled = False + self.__enabled_lock = threading.Lock() + + def is_enabled(self) -> bool: + """ + Check if auto-landing is enabled. + """ + with self.__enabled_lock: + return self.__enabled + + def enable(self) -> bool: + """ + Enable auto-landing system. + + Returns True if successfully enabled, False otherwise. + """ + with self.__enabled_lock: + if not self.__enabled: + self.__enabled = True + self.__logger.info("Auto-landing system enabled", True) + return True + self.__logger.warning("Auto-landing system already enabled", True) + return False + + def disable(self) -> bool: + """ + Disable auto-landing system. + + Returns True if successfully disabled, False otherwise. + """ + with self.__enabled_lock: + if self.__enabled: + self.__enabled = False + self.__logger.info("Auto-landing system disabled", True) + return True + self.__logger.warning("Auto-landing system already disabled", True) + return False + + def process_detections( + self, odometry_detections: merged_odometry_detections.MergedOdometryDetections + ) -> "tuple[bool, AutoLandingInformation | None]": + """ + Process detections if auto-landing is enabled. + + Returns landing information if processing was successful, None otherwise. + """ + with self.__enabled_lock: + if not self.__enabled: + return False, None + + # Process the detections using the auto-landing module + result, landing_info = self.__auto_landing.run(odometry_detections) + return result, landing_info diff --git a/modules/auto_landing/auto_landing_worker.py b/modules/auto_landing/auto_landing_worker.py index 62890e04..59a45f74 100644 --- a/modules/auto_landing/auto_landing_worker.py +++ b/modules/auto_landing/auto_landing_worker.py @@ -4,6 +4,7 @@ import os import pathlib +import queue import time from utilities.workers import queue_proxy_wrapper @@ -12,22 +13,40 @@ from ..common.modules.logger import logger +class AutoLandingCommand: + """ + Command structure for controlling auto-landing operations. + """ + + def __init__(self, command: str) -> None: + self.command = command + + def auto_landing_worker( fov_x: float, fov_y: float, im_h: float, im_w: float, period: float, + detection_strategy: auto_landing.DetectionSelectionStrategy, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, + command_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ - Worker process. + Worker process for auto-landing operations. - period: Wait time in seconds. - input_queue and output_queue are data queues. - controller is how the main process communicates to this worker process. + fov_x: Horizontal field of view in degrees. + fov_y: Vertical field of view in degrees. + im_h: Image height in pixels. + im_w: Image width in pixels. + period: Wait time in seconds between processing cycles. + detection_strategy: Strategy for selecting detection when multiple targets are present. + input_queue: Queue for receiving merged odometry detections. + output_queue: Queue for sending auto-landing information. + command_queue: Queue for receiving enable/disable commands. + controller: Worker controller for pause/exit management. """ worker_name = pathlib.Path(__file__).stem @@ -42,25 +61,106 @@ def auto_landing_worker( local_logger.info("Logger initialized", True) - result, auto_lander = auto_landing.AutoLanding.create(fov_x, fov_y, im_h, im_w, local_logger) - + # Create auto-landing instance + result, auto_lander = auto_landing.AutoLanding.create( + fov_x, fov_y, im_h, im_w, local_logger, detection_strategy + ) if not result: - local_logger.error("Worker failed to create class object", True) + local_logger.error("Worker failed to create AutoLanding object", True) return # Get Pylance to stop complaining assert auto_lander is not None + # Create auto-landing controller + result, landing_controller = auto_landing.AutoLandingController.create( + auto_lander, local_logger + ) + if not result: + local_logger.error("Worker failed to create AutoLandingController object", True) + return + + # Get Pylance to stop complaining + assert landing_controller is not None + + local_logger.info("Auto-landing worker initialized successfully", True) + while not controller.is_exit_requested(): controller.check_pause() - input_data = input_queue.queue.get() - if input_data is None: - continue + # Process commands first + _process_commands(command_queue, landing_controller, local_logger) - result, value = auto_lander.run(input_data) - if not result: - continue + # Process detections if available + input_data = None + try: + input_data = input_queue.queue.get_nowait() + except queue.Empty: + # No data available, continue + pass + + if input_data is not None: + result, landing_info = landing_controller.process_detections(input_data) + if result and landing_info: + output_queue.queue.put(landing_info) - output_queue.queue.put(value) time.sleep(period) + + +def _process_commands( + command_queue: queue_proxy_wrapper.QueueProxyWrapper, + landing_controller: auto_landing.AutoLandingController, + local_logger: logger.Logger, +) -> None: + """ + Process all available commands in the command queue. + + command_queue: Queue containing AutoLandingCommand objects. + landing_controller: Controller instance to execute commands on. + local_logger: Logger for command processing. + """ + while True: + try: + command = command_queue.queue.get_nowait() + if command is None: + break + + if isinstance(command, AutoLandingCommand): + _execute_command(command, landing_controller, local_logger) + else: + local_logger.warning(f"Received invalid command type: {type(command)}", True) + + except queue.Empty: + # No more commands available + break + + +def _execute_command( + command: AutoLandingCommand, + landing_controller: auto_landing.AutoLandingController, + local_logger: logger.Logger, +) -> None: + """ + Execute an auto-landing command. + + command: Command to execute. + landing_controller: Controller instance to execute command on. + local_logger: Logger for command execution. + """ + local_logger.info(f"Executing command: {command.command}", True) + + success = False + if command.command == "enable": + success = landing_controller.enable() + elif command.command == "disable": + success = landing_controller.disable() + else: + local_logger.error( + f"Unknown command: {command.command}. Only 'enable' and 'disable' are supported.", True + ) + return + + if success: + local_logger.info(f"Command '{command.command}' executed successfully", True) + else: + local_logger.error(f"Command '{command.command}' failed to execute", True) diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py new file mode 100644 index 00000000..07e9edaf --- /dev/null +++ b/tests/integration/test_auto_landing_worker.py @@ -0,0 +1,248 @@ +""" +Test auto-landing worker process. +""" + +import datetime +import multiprocessing as mp +import pathlib +import time + +import numpy as np + +from modules import detections_and_time +from modules import merged_odometry_detections +from modules.auto_landing import auto_landing +from modules.auto_landing import auto_landing_worker +from modules.common.modules import orientation +from modules.common.modules import position_local +from modules.common.modules.mavlink import drone_odometry_local +from modules.common.modules.logger import logger +from utilities.workers import queue_proxy_wrapper +from utilities.workers import worker_controller + +# Worker parameters +FOV_X = 90.0 # degrees +FOV_Y = 90.0 # degrees +IMAGE_HEIGHT = 640.0 # pixels +IMAGE_WIDTH = 640.0 # pixels +WORKER_PERIOD = 0.1 # seconds +DETECTION_STRATEGY = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER +LOG_TIMINGS = False # Disable timing logging for the test + +# Ensure logs directory exists and create timestamped subdirectory +LOG_DIR = pathlib.Path("logs") +LOG_DIR.mkdir(parents=True, exist_ok=True) + +# Create a timestamped subdirectory for this test session +TIMESTAMP_FORMAT = "%Y-%m-%d_%H-%M-%S" +test_session_dir = LOG_DIR / datetime.datetime.now().strftime(TIMESTAMP_FORMAT) +test_session_dir.mkdir(parents=True, exist_ok=True) + + +def simulate_detection_input( + input_queue: queue_proxy_wrapper.QueueProxyWrapper, + detections: list, + position: tuple, + orientation_angles: tuple, +) -> None: + """ + Create and place merged odometry detections into the input queue. + """ + # Create drone position + result, drone_position = position_local.PositionLocal.create( + position[0], position[1], position[2] + ) + assert result + assert drone_position is not None + + # Create drone orientation + result, drone_orientation = orientation.Orientation.create( + orientation_angles[0], orientation_angles[1], orientation_angles[2] + ) + assert result + assert drone_orientation is not None + + # Create drone odometry + result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create( + drone_position, drone_orientation + ) + assert result + assert drone_odometry is not None + + # Create merged odometry detections + result, merged = merged_odometry_detections.MergedOdometryDetections.create( + drone_odometry, detections + ) + assert result + assert merged is not None + + input_queue.queue.put(merged) + + +def create_test_detection( + bbox: list, label: int, confidence: float +) -> detections_and_time.Detection: + """ + Create a test detection with the given parameters. + """ + result, detection = detections_and_time.Detection.create( + np.array(bbox, dtype=np.float32), label, confidence + ) + assert result + assert detection is not None + return detection + + +def main() -> int: + """ + Main function. + """ + # Logger + test_name = pathlib.Path(__file__).stem + result, local_logger = logger.Logger.create(test_name, LOG_TIMINGS) + assert result # Logger initialization should succeed + assert local_logger is not None + + # Setup + controller = worker_controller.WorkerController() + + mp_manager = mp.Manager() + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + # Create worker process + worker = mp.Process( + target=auto_landing_worker.auto_landing_worker, + args=( + FOV_X, + FOV_Y, + IMAGE_HEIGHT, + IMAGE_WIDTH, + WORKER_PERIOD, + DETECTION_STRATEGY, + input_queue, + output_queue, + command_queue, + controller, + ), + ) + + # Start worker + worker.start() + + # Give worker time to initialize + time.sleep(0.5) + + # Test 1: Worker should not process detections when disabled (default state) + detection1 = create_test_detection([200, 200, 400, 400], 1, 0.9) + simulate_detection_input( + input_queue, + [detection1], + (0.0, 0.0, -50.0), # 50 meters above ground + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have no output since auto-landing is disabled by default + assert output_queue.queue.empty() + + # Test 2: Enable auto-landing and verify it processes detections + enable_command = auto_landing_worker.AutoLandingCommand("enable") + command_queue.queue.put(enable_command) + + time.sleep(0.2) + + # Now send the same detection - should be processed + detection2 = create_test_detection([300, 300, 500, 500], 2, 0.85) + simulate_detection_input( + input_queue, + [detection2], + (10.0, 5.0, -75.0), # 75 meters above ground + (0.1, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have output now + assert not output_queue.queue.empty() + landing_info = output_queue.queue.get_nowait() + assert landing_info is not None + assert hasattr(landing_info, "angle_x") + assert hasattr(landing_info, "angle_y") + assert hasattr(landing_info, "target_dist") + + # Test 3: Test with multiple detections (should use NEAREST_TO_CENTER strategy) + detection3 = create_test_detection([100, 100, 200, 200], 1, 0.7) # Far from center + detection4 = create_test_detection([310, 310, 330, 330], 2, 0.8) # Close to center (320, 320) + + simulate_detection_input( + input_queue, + [detection3, detection4], + (0.0, 0.0, -100.0), # 100 meters above ground + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have output for the detection closest to center + assert not output_queue.queue.empty() + landing_info2 = output_queue.queue.get_nowait() + assert landing_info2 is not None + + # Test 4: Disable auto-landing and verify it stops processing + disable_command = auto_landing_worker.AutoLandingCommand("disable") + command_queue.queue.put(disable_command) + + time.sleep(0.2) + + # Send another detection - should not be processed + detection5 = create_test_detection([400, 400, 600, 600], 3, 0.95) + simulate_detection_input( + input_queue, + [detection5], + (0.0, 0.0, -60.0), + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have no new output + assert output_queue.queue.empty() + + # Test 5: Test invalid command handling + invalid_command = auto_landing_worker.AutoLandingCommand("invalid_command") + command_queue.queue.put(invalid_command) + + time.sleep(0.2) + + # Worker should continue running despite invalid command + assert worker.is_alive() + + # Test 6: Test with no detections (empty detection list should not crash) + # This should not create a MergedOdometryDetections object since it requires non-empty detections + # So we just verify the worker continues running + + # Cleanup + controller.request_exit() + + # Drain queues + input_queue.fill_and_drain_queue() + command_queue.fill_and_drain_queue() + + # Wait for worker to finish + worker.join(timeout=5.0) + if worker.is_alive(): + worker.terminate() + worker.join() + + return 0 + + +if __name__ == "__main__": + result_main = main() + if result_main < 0: + print(f"ERROR: Status code: {result_main}") + + print("Done!") From 2e93ec17988109a76836d4fa8bc6b1f7352e78e4 Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Sun, 27 Jul 2025 02:55:01 -0400 Subject: [PATCH 02/10] remove typing import --- modules/auto_landing/auto_landing.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index 5bfcf3c8..a437ab2d 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -5,7 +5,6 @@ import math from enum import Enum -from typing import Optional import threading from .. import merged_odometry_detections @@ -96,7 +95,7 @@ def __init__( self.__logger = local_logger self.__selection_strategy = selection_strategy - def _select_detection(self, detections: list) -> Optional[int]: + def _select_detection(self, detections: list) -> int | None: """ Select which detection to use based on the configured strategy. From 1bc325958676b750f2e7d64c999c5ba84cd7282d Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Sun, 27 Jul 2025 03:11:51 -0400 Subject: [PATCH 03/10] remove largest area and nearest to center --- modules/auto_landing/auto_landing.py | 45 ++-------------------------- 1 file changed, 3 insertions(+), 42 deletions(-) diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index a437ab2d..4ba8ca4b 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -16,10 +16,8 @@ class DetectionSelectionStrategy(Enum): Strategies for selecting which detection to use when multiple targets are detected. """ - NEAREST_TO_CENTER = "nearest_to_center" # Choose detection closest to image center - LARGEST_AREA = "largest_area" # Choose detection with largest bounding box area - HIGHEST_CONFIDENCE = "highest_confidence" # Choose detection with highest confidence FIRST_DETECTION = "first_detection" # Use first detection in list (original behavior) + HIGHEST_CONFIDENCE = "highest_confidence" # Choose detection with highest confidence class AutoLandingInformation: @@ -57,7 +55,7 @@ def create( im_h: float, im_w: float, local_logger: logger.Logger, - selection_strategy: DetectionSelectionStrategy = DetectionSelectionStrategy.NEAREST_TO_CENTER, + selection_strategy: DetectionSelectionStrategy = DetectionSelectionStrategy.FIRST_DETECTION, ) -> "tuple [bool, AutoLanding | None ]": """ fov_x: The horizontal camera field of view in degrees. @@ -104,46 +102,9 @@ def _select_detection(self, detections: list) -> int | None: if not detections: return None - if len(detections) == 1: + if len(detections) == 1 or self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION: return 0 - if self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION: - return 0 - - if self.__selection_strategy == DetectionSelectionStrategy.NEAREST_TO_CENTER: - # Find detection closest to image center - image_center_x = self.im_w / 2 - image_center_y = self.im_h / 2 - - min_distance = float("inf") - selected_index = 0 - - for i, detection in enumerate(detections): - det_center_x, det_center_y = detection.get_centre() - distance = math.sqrt( - (det_center_x - image_center_x) ** 2 + (det_center_y - image_center_y) ** 2 - ) - if distance < min_distance: - min_distance = distance - selected_index = i - - return selected_index - - if self.__selection_strategy == DetectionSelectionStrategy.LARGEST_AREA: - # Find detection with largest bounding box area - max_area = 0 - selected_index = 0 - - for i, detection in enumerate(detections): - width = detection.x_2 - detection.x_1 - height = detection.y_2 - detection.y_1 - area = width * height - if area > max_area: - max_area = area - selected_index = i - - return selected_index - if self.__selection_strategy == DetectionSelectionStrategy.HIGHEST_CONFIDENCE: # Find detection with highest confidence max_confidence = 0 From 41e3923b9bd86021514617479b3d11115b0266fc Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Sun, 27 Jul 2025 03:40:57 -0400 Subject: [PATCH 04/10] Remove command queue and add a constant to config.yaml --- config.yaml | 3 + modules/auto_landing/auto_landing.py | 92 --------------------- modules/auto_landing/auto_landing_worker.py | 86 +------------------ 3 files changed, 4 insertions(+), 177 deletions(-) diff --git a/config.yaml b/config.yaml index e6a3a0e1..af4d4dd8 100644 --- a/config.yaml +++ b/config.yaml @@ -82,3 +82,6 @@ cluster_estimation: communications: timeout: 60.0 # seconds worker_period: 0.5 # seconds + +auto_landing: + enabled: true diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index 4ba8ca4b..8269c2f8 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -5,7 +5,6 @@ import math from enum import Enum -import threading from .. import merged_odometry_detections from ..common.modules.logger import logger @@ -170,94 +169,3 @@ def run( ) return True, AutoLandingInformation(angle_x, angle_y, target_to_vehicle_dist) - - -class AutoLandingController: - """ - Controller for turning auto-landing on/off. - """ - - __create_key = object() - - @classmethod - def create( - cls, - auto_landing: AutoLanding, - local_logger: logger.Logger, - ) -> "tuple[bool, AutoLandingController | None]": - """ - Create an AutoLandingController instance. - - auto_landing: The AutoLanding instance to control. - local_logger: Logger instance for logging. - - Returns an AutoLandingController object. - """ - return True, AutoLandingController(cls.__create_key, auto_landing, local_logger) - - def __init__( - self, - class_private_create_key: object, - auto_landing: AutoLanding, - local_logger: logger.Logger, - ) -> None: - """ - Private constructor, use create() method. - """ - assert class_private_create_key is AutoLandingController.__create_key, "Use create() method" - - self.__auto_landing = auto_landing - self.__logger = local_logger - self.__enabled = False - self.__enabled_lock = threading.Lock() - - def is_enabled(self) -> bool: - """ - Check if auto-landing is enabled. - """ - with self.__enabled_lock: - return self.__enabled - - def enable(self) -> bool: - """ - Enable auto-landing system. - - Returns True if successfully enabled, False otherwise. - """ - with self.__enabled_lock: - if not self.__enabled: - self.__enabled = True - self.__logger.info("Auto-landing system enabled", True) - return True - self.__logger.warning("Auto-landing system already enabled", True) - return False - - def disable(self) -> bool: - """ - Disable auto-landing system. - - Returns True if successfully disabled, False otherwise. - """ - with self.__enabled_lock: - if self.__enabled: - self.__enabled = False - self.__logger.info("Auto-landing system disabled", True) - return True - self.__logger.warning("Auto-landing system already disabled", True) - return False - - def process_detections( - self, odometry_detections: merged_odometry_detections.MergedOdometryDetections - ) -> "tuple[bool, AutoLandingInformation | None]": - """ - Process detections if auto-landing is enabled. - - Returns landing information if processing was successful, None otherwise. - """ - with self.__enabled_lock: - if not self.__enabled: - return False, None - - # Process the detections using the auto-landing module - result, landing_info = self.__auto_landing.run(odometry_detections) - return result, landing_info diff --git a/modules/auto_landing/auto_landing_worker.py b/modules/auto_landing/auto_landing_worker.py index 59a45f74..4271632b 100644 --- a/modules/auto_landing/auto_landing_worker.py +++ b/modules/auto_landing/auto_landing_worker.py @@ -13,15 +13,6 @@ from ..common.modules.logger import logger -class AutoLandingCommand: - """ - Command structure for controlling auto-landing operations. - """ - - def __init__(self, command: str) -> None: - self.command = command - - def auto_landing_worker( fov_x: float, fov_y: float, @@ -31,7 +22,6 @@ def auto_landing_worker( detection_strategy: auto_landing.DetectionSelectionStrategy, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, - command_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -45,7 +35,6 @@ def auto_landing_worker( detection_strategy: Strategy for selecting detection when multiple targets are present. input_queue: Queue for receiving merged odometry detections. output_queue: Queue for sending auto-landing information. - command_queue: Queue for receiving enable/disable commands. controller: Worker controller for pause/exit management. """ @@ -72,25 +61,11 @@ def auto_landing_worker( # Get Pylance to stop complaining assert auto_lander is not None - # Create auto-landing controller - result, landing_controller = auto_landing.AutoLandingController.create( - auto_lander, local_logger - ) - if not result: - local_logger.error("Worker failed to create AutoLandingController object", True) - return - - # Get Pylance to stop complaining - assert landing_controller is not None - local_logger.info("Auto-landing worker initialized successfully", True) while not controller.is_exit_requested(): controller.check_pause() - # Process commands first - _process_commands(command_queue, landing_controller, local_logger) - # Process detections if available input_data = None try: @@ -100,67 +75,8 @@ def auto_landing_worker( pass if input_data is not None: - result, landing_info = landing_controller.process_detections(input_data) + result, landing_info = auto_lander.run(input_data) if result and landing_info: output_queue.queue.put(landing_info) time.sleep(period) - - -def _process_commands( - command_queue: queue_proxy_wrapper.QueueProxyWrapper, - landing_controller: auto_landing.AutoLandingController, - local_logger: logger.Logger, -) -> None: - """ - Process all available commands in the command queue. - - command_queue: Queue containing AutoLandingCommand objects. - landing_controller: Controller instance to execute commands on. - local_logger: Logger for command processing. - """ - while True: - try: - command = command_queue.queue.get_nowait() - if command is None: - break - - if isinstance(command, AutoLandingCommand): - _execute_command(command, landing_controller, local_logger) - else: - local_logger.warning(f"Received invalid command type: {type(command)}", True) - - except queue.Empty: - # No more commands available - break - - -def _execute_command( - command: AutoLandingCommand, - landing_controller: auto_landing.AutoLandingController, - local_logger: logger.Logger, -) -> None: - """ - Execute an auto-landing command. - - command: Command to execute. - landing_controller: Controller instance to execute command on. - local_logger: Logger for command execution. - """ - local_logger.info(f"Executing command: {command.command}", True) - - success = False - if command.command == "enable": - success = landing_controller.enable() - elif command.command == "disable": - success = landing_controller.disable() - else: - local_logger.error( - f"Unknown command: {command.command}. Only 'enable' and 'disable' are supported.", True - ) - return - - if success: - local_logger.info(f"Command '{command.command}' executed successfully", True) - else: - local_logger.error(f"Command '{command.command}' failed to execute", True) From 52f1cf18716eddba345b42baa5cc4421795d36a2 Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Sun, 27 Jul 2025 04:13:15 -0400 Subject: [PATCH 05/10] modify test file --- tests/integration/test_auto_landing_worker.py | 82 +++++-------------- 1 file changed, 20 insertions(+), 62 deletions(-) diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py index 07e9edaf..68b10937 100644 --- a/tests/integration/test_auto_landing_worker.py +++ b/tests/integration/test_auto_landing_worker.py @@ -26,7 +26,8 @@ IMAGE_HEIGHT = 640.0 # pixels IMAGE_WIDTH = 640.0 # pixels WORKER_PERIOD = 0.1 # seconds -DETECTION_STRATEGY = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER +# The worker now defaults to HIGHEST_CONFIDENCE, so we test for that +DETECTION_STRATEGY = auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE LOG_TIMINGS = False # Disable timing logging for the test # Ensure logs directory exists and create timestamped subdirectory @@ -109,7 +110,6 @@ def main() -> int: mp_manager = mp.Manager() input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) # Create worker process worker = mp.Process( @@ -123,7 +123,6 @@ def main() -> int: DETECTION_STRATEGY, input_queue, output_queue, - command_queue, controller, ), ) @@ -134,7 +133,8 @@ def main() -> int: # Give worker time to initialize time.sleep(0.5) - # Test 1: Worker should not process detections when disabled (default state) + # Test 1: Send a single detection and verify it's processed + local_logger.info("--- Test 1: Processing single detection ---") detection1 = create_test_detection([200, 200, 400, 400], 1, 0.9) simulate_detection_input( input_queue, @@ -145,26 +145,6 @@ def main() -> int: time.sleep(0.2) - # Should have no output since auto-landing is disabled by default - assert output_queue.queue.empty() - - # Test 2: Enable auto-landing and verify it processes detections - enable_command = auto_landing_worker.AutoLandingCommand("enable") - command_queue.queue.put(enable_command) - - time.sleep(0.2) - - # Now send the same detection - should be processed - detection2 = create_test_detection([300, 300, 500, 500], 2, 0.85) - simulate_detection_input( - input_queue, - [detection2], - (10.0, 5.0, -75.0), # 75 meters above ground - (0.1, 0.0, 0.0), - ) - - time.sleep(0.2) - # Should have output now assert not output_queue.queue.empty() landing_info = output_queue.queue.get_nowait() @@ -172,64 +152,42 @@ def main() -> int: assert hasattr(landing_info, "angle_x") assert hasattr(landing_info, "angle_y") assert hasattr(landing_info, "target_dist") + local_logger.info("--- Test 1 Passed ---") - # Test 3: Test with multiple detections (should use NEAREST_TO_CENTER strategy) - detection3 = create_test_detection([100, 100, 200, 200], 1, 0.7) # Far from center - detection4 = create_test_detection([310, 310, 330, 330], 2, 0.8) # Close to center (320, 320) + # Test 2: Test with multiple detections (should use HIGHEST_CONFIDENCE strategy) + local_logger.info("--- Test 2: Processing multiple detections with HIGHEST_CONFIDENCE ---") + detection_low_confidence = create_test_detection([100, 100, 200, 200], 1, 0.7) + detection_high_confidence = create_test_detection([320, 320, 320, 320], 2, 0.95) # This one should be chosen simulate_detection_input( input_queue, - [detection3, detection4], + [detection_low_confidence, detection_high_confidence], (0.0, 0.0, -100.0), # 100 meters above ground (0.0, 0.0, 0.0), ) time.sleep(0.2) - # Should have output for the detection closest to center + # Should have output for the detection with the highest confidence assert not output_queue.queue.empty() landing_info2 = output_queue.queue.get_nowait() assert landing_info2 is not None - # Test 4: Disable auto-landing and verify it stops processing - disable_command = auto_landing_worker.AutoLandingCommand("disable") - command_queue.queue.put(disable_command) - - time.sleep(0.2) - - # Send another detection - should not be processed - detection5 = create_test_detection([400, 400, 600, 600], 3, 0.95) - simulate_detection_input( - input_queue, - [detection5], - (0.0, 0.0, -60.0), - (0.0, 0.0, 0.0), - ) - - time.sleep(0.2) - - # Should have no new output - assert output_queue.queue.empty() - - # Test 5: Test invalid command handling - invalid_command = auto_landing_worker.AutoLandingCommand("invalid_command") - command_queue.queue.put(invalid_command) - - time.sleep(0.2) - - # Worker should continue running despite invalid command - assert worker.is_alive() + # To verify the correct detection was chosen, we can check the calculated angles. + # The high confidence detection is at (320, 320), which is the center of the 640x640 image. + # Therefore, the angles should be 0. + assert landing_info2.angle_x == 0.0 + assert landing_info2.angle_y == 0.0 + local_logger.info("--- Test 2 Passed ---") - # Test 6: Test with no detections (empty detection list should not crash) - # This should not create a MergedOdometryDetections object since it requires non-empty detections - # So we just verify the worker continues running + # The case of "no detections" is handled by the worker's queue.get_nowait() exception. + # No specific test is needed for an empty detection list as the data structure does not allow it. # Cleanup controller.request_exit() # Drain queues input_queue.fill_and_drain_queue() - command_queue.fill_and_drain_queue() # Wait for worker to finish worker.join(timeout=5.0) @@ -245,4 +203,4 @@ def main() -> int: if result_main < 0: print(f"ERROR: Status code: {result_main}") - print("Done!") + print("Done!") \ No newline at end of file From d39db205667b23a013c56b3f01dd2bcf0d89ab71 Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Sun, 27 Jul 2025 04:36:40 -0400 Subject: [PATCH 06/10] Add testing using config.yaml --- tests/integration/test_auto_landing_worker.py | 116 +++++++++++++----- 1 file changed, 88 insertions(+), 28 deletions(-) diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py index 68b10937..699be2ac 100644 --- a/tests/integration/test_auto_landing_worker.py +++ b/tests/integration/test_auto_landing_worker.py @@ -6,6 +6,7 @@ import multiprocessing as mp import pathlib import time +import math import numpy as np @@ -17,18 +18,17 @@ from modules.common.modules import position_local from modules.common.modules.mavlink import drone_odometry_local from modules.common.modules.logger import logger +from modules.common.modules.read_yaml import read_yaml from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller -# Worker parameters -FOV_X = 90.0 # degrees -FOV_Y = 90.0 # degrees -IMAGE_HEIGHT = 640.0 # pixels -IMAGE_WIDTH = 640.0 # pixels + +# Test-specific parameters WORKER_PERIOD = 0.1 # seconds -# The worker now defaults to HIGHEST_CONFIDENCE, so we test for that -DETECTION_STRATEGY = auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE LOG_TIMINGS = False # Disable timing logging for the test +CONFIG_FILE_PATH = pathlib.Path("config.yaml") +detection_strategy = auto_landing.DetectionSelectionStrategy.FIRST_DETECTION +# detection_strategy = auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE # Ensure logs directory exists and create timestamped subdirectory LOG_DIR = pathlib.Path("logs") @@ -94,6 +94,20 @@ def create_test_detection( return detection +def calculate_expected_angles(detection_bbox: list, fov_x_deg: float, fov_y_deg: float, im_w: float, im_h: float) -> tuple[float, float]: + """ + Calculates the expected angle_x and angle_y for a given detection bounding box + and camera parameters, mirroring the logic in auto_landing.py. + Assumes fov_x_deg and fov_y_deg are in degrees. + """ + x_center = (detection_bbox[0] + detection_bbox[2]) / 2 + y_center = (detection_bbox[1] + detection_bbox[3]) / 2 + + angle_x = (x_center - im_w / 2) * (fov_x_deg * (math.pi / 180)) / im_w + angle_y = (y_center - im_h / 2) * (fov_y_deg * (math.pi / 180)) / im_h + return angle_x, angle_y + + def main() -> int: """ Main function. @@ -104,6 +118,28 @@ def main() -> int: assert result # Logger initialization should succeed assert local_logger is not None + # Read config file + result, config = read_yaml.open_config(CONFIG_FILE_PATH) + assert result and config is not None, "Failed to read config file" + + # Check if the feature is enabled in the config + auto_landing_config = config.get("auto_landing", {}) + if not auto_landing_config.get("enabled", False): + local_logger.info("Auto-landing is disabled in config.yaml, skipping test.") + return 0 + + # Extract parameters from config + try: + fov_x = config["geolocation"]["fov_x"] + fov_y = config["geolocation"]["fov_y"] + image_height = config["video_input"]["height"] + image_width = config["video_input"]["width"] + except KeyError as e: + local_logger.error(f"Config file missing required key: {e}") + return -1 + + + # Setup controller = worker_controller.WorkerController() @@ -111,16 +147,16 @@ def main() -> int: input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - # Create worker process + # Create worker process using parameters from config worker = mp.Process( target=auto_landing_worker.auto_landing_worker, args=( - FOV_X, - FOV_Y, - IMAGE_HEIGHT, - IMAGE_WIDTH, + fov_x, + fov_y, + image_height, + image_width, WORKER_PERIOD, - DETECTION_STRATEGY, + detection_strategy, # Explicitly pass the strategy to be tested input_queue, output_queue, controller, @@ -135,7 +171,8 @@ def main() -> int: # Test 1: Send a single detection and verify it's processed local_logger.info("--- Test 1: Processing single detection ---") - detection1 = create_test_detection([200, 200, 400, 400], 1, 0.9) + detection1_bbox = [200, 200, 400, 400] + detection1 = create_test_detection(detection1_bbox, 1, 0.9) simulate_detection_input( input_queue, [detection1], @@ -154,35 +191,58 @@ def main() -> int: assert hasattr(landing_info, "target_dist") local_logger.info("--- Test 1 Passed ---") - # Test 2: Test with multiple detections (should use HIGHEST_CONFIDENCE strategy) - local_logger.info("--- Test 2: Processing multiple detections with HIGHEST_CONFIDENCE ---") - detection_low_confidence = create_test_detection([100, 100, 200, 200], 1, 0.7) - detection_high_confidence = create_test_detection([320, 320, 320, 320], 2, 0.95) # This one should be chosen + # Test 2: Verify strategy with multiple detections + local_logger.info(f"--- Test 2: Verifying {detection_strategy.value} strategy ---") + + # This is the first detection in the list + detection_first_bbox = [100, 100, 200, 200] + detection_first = create_test_detection(detection_first_bbox, 1, 0.7) + + # This detection is at the center and has higher confidence + center_x = image_width / 2 + center_y = image_height / 2 + detection_center_bbox = [center_x, center_y, center_x, center_y] + detection_center = create_test_detection(detection_center_bbox, 2, 0.95) + + # Order matters for FIRST_DETECTION strategy + detections_for_test_2 = [detection_first, detection_center] simulate_detection_input( input_queue, - [detection_low_confidence, detection_high_confidence], + detections_for_test_2, (0.0, 0.0, -100.0), # 100 meters above ground (0.0, 0.0, 0.0), ) time.sleep(0.2) - # Should have output for the detection with the highest confidence + # Should have output assert not output_queue.queue.empty() landing_info2 = output_queue.queue.get_nowait() assert landing_info2 is not None - # To verify the correct detection was chosen, we can check the calculated angles. - # The high confidence detection is at (320, 320), which is the center of the 640x640 image. - # Therefore, the angles should be 0. - assert landing_info2.angle_x == 0.0 - assert landing_info2.angle_y == 0.0 + # Determine expected angles based on the strategy + expected_angle_x = 0.0 + expected_angle_y = 0.0 + + if detection_strategy == auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE: + # If HIGHEST_CONFIDENCE, it should pick detection_center + expected_angle_x, expected_angle_y = calculate_expected_angles( + detection_center_bbox, fov_x, fov_y, image_width, image_height + ) + elif detection_strategy == auto_landing.DetectionSelectionStrategy.FIRST_DETECTION: + # If FIRST_DETECTION, it should pick detection_first + expected_angle_x, expected_angle_y = calculate_expected_angles( + detection_first_bbox, fov_x, fov_y, image_width, image_height + ) + + # Verify the calculated angles using a tolerance for float comparison + assert math.isclose(landing_info2.angle_x, expected_angle_x, rel_tol=1e-7), \ + f"Expected angle_x to be {expected_angle_x}, but got {landing_info2.angle_x}" + assert math.isclose(landing_info2.angle_y, expected_angle_y, rel_tol=1e-7), \ + f"Expected angle_y to be {expected_angle_y}, but got {landing_info2.angle_y}" local_logger.info("--- Test 2 Passed ---") - # The case of "no detections" is handled by the worker's queue.get_nowait() exception. - # No specific test is needed for an empty detection list as the data structure does not allow it. - # Cleanup controller.request_exit() From 26a7ecd7fecab9b074d6a8295f47ce062f16a515 Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Sun, 27 Jul 2025 04:40:51 -0400 Subject: [PATCH 07/10] Pass linters --- modules/auto_landing/auto_landing.py | 5 +++- tests/integration/test_auto_landing_worker.py | 24 ++++++++++--------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index 8269c2f8..be596d3c 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -101,7 +101,10 @@ def _select_detection(self, detections: list) -> int | None: if not detections: return None - if len(detections) == 1 or self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION: + if ( + len(detections) == 1 + or self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION + ): return 0 if self.__selection_strategy == DetectionSelectionStrategy.HIGHEST_CONFIDENCE: diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py index 699be2ac..d1a78059 100644 --- a/tests/integration/test_auto_landing_worker.py +++ b/tests/integration/test_auto_landing_worker.py @@ -94,7 +94,9 @@ def create_test_detection( return detection -def calculate_expected_angles(detection_bbox: list, fov_x_deg: float, fov_y_deg: float, im_w: float, im_h: float) -> tuple[float, float]: +def calculate_expected_angles( + detection_bbox: list, fov_x_deg: float, fov_y_deg: float, im_w: float, im_h: float +) -> tuple[float, float]: """ Calculates the expected angle_x and angle_y for a given detection bounding box and camera parameters, mirroring the logic in auto_landing.py. @@ -138,8 +140,6 @@ def main() -> int: local_logger.error(f"Config file missing required key: {e}") return -1 - - # Setup controller = worker_controller.WorkerController() @@ -156,7 +156,7 @@ def main() -> int: image_height, image_width, WORKER_PERIOD, - detection_strategy, # Explicitly pass the strategy to be tested + detection_strategy, # Explicitly pass the strategy to be tested input_queue, output_queue, controller, @@ -193,11 +193,11 @@ def main() -> int: # Test 2: Verify strategy with multiple detections local_logger.info(f"--- Test 2: Verifying {detection_strategy.value} strategy ---") - + # This is the first detection in the list detection_first_bbox = [100, 100, 200, 200] detection_first = create_test_detection(detection_first_bbox, 1, 0.7) - + # This detection is at the center and has higher confidence center_x = image_width / 2 center_y = image_height / 2 @@ -237,10 +237,12 @@ def main() -> int: ) # Verify the calculated angles using a tolerance for float comparison - assert math.isclose(landing_info2.angle_x, expected_angle_x, rel_tol=1e-7), \ - f"Expected angle_x to be {expected_angle_x}, but got {landing_info2.angle_x}" - assert math.isclose(landing_info2.angle_y, expected_angle_y, rel_tol=1e-7), \ - f"Expected angle_y to be {expected_angle_y}, but got {landing_info2.angle_y}" + assert math.isclose( + landing_info2.angle_x, expected_angle_x, rel_tol=1e-7 + ), f"Expected angle_x to be {expected_angle_x}, but got {landing_info2.angle_x}" + assert math.isclose( + landing_info2.angle_y, expected_angle_y, rel_tol=1e-7 + ), f"Expected angle_y to be {expected_angle_y}, but got {landing_info2.angle_y}" local_logger.info("--- Test 2 Passed ---") # Cleanup @@ -263,4 +265,4 @@ def main() -> int: if result_main < 0: print(f"ERROR: Status code: {result_main}") - print("Done!") \ No newline at end of file + print("Done!") From 413e7b1887d3569680ffbee74bd83b43e7b14dad Mon Sep 17 00:00:00 2001 From: Daniel Zhao <94013292+danielquzhao@users.noreply.github.com> Date: Sun, 24 Aug 2025 16:32:34 -0600 Subject: [PATCH 08/10] Update test_auto_landing_worker.py --- tests/integration/test_auto_landing_worker.py | 35 ++++--------------- 1 file changed, 7 insertions(+), 28 deletions(-) diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py index d1a78059..c01e4d06 100644 --- a/tests/integration/test_auto_landing_worker.py +++ b/tests/integration/test_auto_landing_worker.py @@ -2,7 +2,6 @@ Test auto-landing worker process. """ -import datetime import multiprocessing as mp import pathlib import time @@ -17,7 +16,6 @@ from modules.common.modules import orientation from modules.common.modules import position_local from modules.common.modules.mavlink import drone_odometry_local -from modules.common.modules.logger import logger from modules.common.modules.read_yaml import read_yaml from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller @@ -25,21 +23,10 @@ # Test-specific parameters WORKER_PERIOD = 0.1 # seconds -LOG_TIMINGS = False # Disable timing logging for the test CONFIG_FILE_PATH = pathlib.Path("config.yaml") detection_strategy = auto_landing.DetectionSelectionStrategy.FIRST_DETECTION # detection_strategy = auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE -# Ensure logs directory exists and create timestamped subdirectory -LOG_DIR = pathlib.Path("logs") -LOG_DIR.mkdir(parents=True, exist_ok=True) - -# Create a timestamped subdirectory for this test session -TIMESTAMP_FORMAT = "%Y-%m-%d_%H-%M-%S" -test_session_dir = LOG_DIR / datetime.datetime.now().strftime(TIMESTAMP_FORMAT) -test_session_dir.mkdir(parents=True, exist_ok=True) - - def simulate_detection_input( input_queue: queue_proxy_wrapper.QueueProxyWrapper, detections: list, @@ -114,12 +101,6 @@ def main() -> int: """ Main function. """ - # Logger - test_name = pathlib.Path(__file__).stem - result, local_logger = logger.Logger.create(test_name, LOG_TIMINGS) - assert result # Logger initialization should succeed - assert local_logger is not None - # Read config file result, config = read_yaml.open_config(CONFIG_FILE_PATH) assert result and config is not None, "Failed to read config file" @@ -127,7 +108,7 @@ def main() -> int: # Check if the feature is enabled in the config auto_landing_config = config.get("auto_landing", {}) if not auto_landing_config.get("enabled", False): - local_logger.info("Auto-landing is disabled in config.yaml, skipping test.") + print("Auto-landing is disabled in config.yaml, skipping test.") return 0 # Extract parameters from config @@ -137,7 +118,7 @@ def main() -> int: image_height = config["video_input"]["height"] image_width = config["video_input"]["width"] except KeyError as e: - local_logger.error(f"Config file missing required key: {e}") + print(f"ERROR: Config file missing required key: {e}") return -1 # Setup @@ -170,7 +151,7 @@ def main() -> int: time.sleep(0.5) # Test 1: Send a single detection and verify it's processed - local_logger.info("--- Test 1: Processing single detection ---") + print("--- Test 1: Processing single detection ---") detection1_bbox = [200, 200, 400, 400] detection1 = create_test_detection(detection1_bbox, 1, 0.9) simulate_detection_input( @@ -189,10 +170,10 @@ def main() -> int: assert hasattr(landing_info, "angle_x") assert hasattr(landing_info, "angle_y") assert hasattr(landing_info, "target_dist") - local_logger.info("--- Test 1 Passed ---") + print("--- Test 1 Passed ---") # Test 2: Verify strategy with multiple detections - local_logger.info(f"--- Test 2: Verifying {detection_strategy.value} strategy ---") + print(f"--- Test 2: Verifying {detection_strategy.value} strategy ---") # This is the first detection in the list detection_first_bbox = [100, 100, 200, 200] @@ -243,7 +224,7 @@ def main() -> int: assert math.isclose( landing_info2.angle_y, expected_angle_y, rel_tol=1e-7 ), f"Expected angle_y to be {expected_angle_y}, but got {landing_info2.angle_y}" - local_logger.info("--- Test 2 Passed ---") + print("--- Test 2 Passed ---") # Cleanup controller.request_exit() @@ -253,9 +234,7 @@ def main() -> int: # Wait for worker to finish worker.join(timeout=5.0) - if worker.is_alive(): - worker.terminate() - worker.join() + worker.terminate() return 0 From ec1806b4f38efaeab379ad2db1e06ea39ba393c6 Mon Sep 17 00:00:00 2001 From: Daniel Zhao <94013292+danielquzhao@users.noreply.github.com> Date: Sun, 24 Aug 2025 16:34:43 -0600 Subject: [PATCH 09/10] Formatting --- tests/integration/test_auto_landing_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py index c01e4d06..727b5a07 100644 --- a/tests/integration/test_auto_landing_worker.py +++ b/tests/integration/test_auto_landing_worker.py @@ -27,6 +27,7 @@ detection_strategy = auto_landing.DetectionSelectionStrategy.FIRST_DETECTION # detection_strategy = auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE + def simulate_detection_input( input_queue: queue_proxy_wrapper.QueueProxyWrapper, detections: list, From 5c19b44c93a103395a4848a36c4cd22653b3bd9b Mon Sep 17 00:00:00 2001 From: Daniel Zhao <94013292+danielquzhao@users.noreply.github.com> Date: Sun, 7 Sep 2025 13:11:28 -0400 Subject: [PATCH 10/10] pass -> continue --- modules/auto_landing/auto_landing_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/auto_landing/auto_landing_worker.py b/modules/auto_landing/auto_landing_worker.py index 4271632b..81fc8ae8 100644 --- a/modules/auto_landing/auto_landing_worker.py +++ b/modules/auto_landing/auto_landing_worker.py @@ -72,7 +72,7 @@ def auto_landing_worker( input_data = input_queue.queue.get_nowait() except queue.Empty: # No data available, continue - pass + continue if input_data is not None: result, landing_info = auto_lander.run(input_data)